From 1b7cfcc8b83d9f5b977729ef97bac6f5573089bd Mon Sep 17 00:00:00 2001 From: Peter Stockings Date: Sun, 23 Feb 2025 16:48:01 +1100 Subject: [PATCH] Add background job scheduler for timer functions - Implement asynchronous timer function execution using APScheduler - Add support for concurrent timer function invocations with asyncio - Create background job to check and run enabled timer functions at specified intervals - Update requirements.txt to include aiohttp and flask-apscheduler - Configure thread pool executor for scheduler - Add error handling and logging for timer function executions --- app.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++- requirements.txt | 4 +- 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/app.py b/app.py index 84572e5..91642bb 100644 --- a/app.py +++ b/app.py @@ -13,18 +13,34 @@ from dotenv import load_dotenv from routes.timer import timer from routes.test import test from routes.home import home +from flask_apscheduler import APScheduler +import asyncio +import aiohttp +from concurrent.futures import ThreadPoolExecutor # Load environment variables from .env file in non-production environments if os.environ.get('FLASK_ENV') != 'production': load_dotenv() -login_manager = LoginManager() app = Flask(__name__) +# Initialize scheduler +scheduler = APScheduler() +# Get timer check interval from environment (default 5 minutes) +TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 5)) + app.config.from_pyfile('config.py') app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f') +login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" jinja_partials.register_extensions(app) + +# Configure scheduler with thread pool +app.config['SCHEDULER_API_ENABLED'] = True +app.config['SCHEDULER_EXECUTORS'] = { + 'default': {'type': 'threadpool', 'max_workers': 20} # Configure thread pool executor +} +scheduler.init_app(app) init_app(app) app.register_blueprint(timer, url_prefix='/timer') @@ -549,6 +565,125 @@ def load_user(user_id): return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) return None +async def execute_timer_function_async(timer_function): + """ + Execute a timer function asynchronously and record the invocation + """ + try: + code = timer_function['code'] + environment = timer_function['environment'] + name = timer_function['name'] + version_number = timer_function['version_number'] + + async with aiohttp.ClientSession() as session: + async with session.post(API_URL, json={ + 'code': code, + 'request': {'method': 'TIMER'}, + 'environment': environment, + 'name': name + }) as response: + response_data = await response.json() + + # Update environment and record invocation + db.execute(""" + UPDATE timer_functions + SET environment = %s::jsonb, + last_run = NOW(), + next_run = CASE + WHEN trigger_type = 'interval' + THEN NOW() + (frequency_minutes || ' minutes')::interval + ELSE NULL + END + WHERE id = %s + """, [json.dumps(response_data['environment']), timer_function['id']], commit=True) + + # Record the invocation + db.execute(""" + INSERT INTO timer_function_invocations + (timer_function_id, status, logs, version_number) + VALUES (%s, %s, %s, %s) + """, [ + timer_function['id'], + response_data['status'], + json.dumps(response_data['logs']), + version_number + ], commit=True) + + except Exception as e: + print(f"Error executing timer function {timer_function['id']}: {str(e)}") + # Record the failed invocation + db.execute(""" + INSERT INTO timer_function_invocations + (timer_function_id, status, logs, version_number) + VALUES (%s, %s, %s, %s) + """, [ + timer_function['id'], + 'ERROR', + json.dumps({'error': str(e)}), + timer_function['version_number'] + ], commit=True) + +async def execute_batch_timer_functions(timer_functions): + """ + Execute multiple timer functions concurrently + """ + tasks = [execute_timer_function_async(tf) for tf in timer_functions] + await asyncio.gather(*tasks, return_exceptions=True) + +def check_and_execute_timer_functions(): + """ + Background job to check and execute timer functions + """ + try: + with app.app_context(): + print("Starting timer check job...") + timer_functions = db.execute(""" + SELECT + id, + name, + code, + environment, + version_number, + trigger_type, + frequency_minutes, + run_date, + next_run, + enabled, + EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run + FROM timer_functions + WHERE enabled = true + AND next_run <= NOW() + ORDER BY next_run ASC + """) + + if timer_functions: + print(f"Found {len(timer_functions)} timer functions to execute:") + for tf in timer_functions: + print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)") + + # Create a new event loop in this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + print("Starting async execution...") + # Run the async execution + loop.run_until_complete(execute_batch_timer_functions(timer_functions)) + loop.close() + print("Completed async execution") + else: + print("No timer functions due for execution") + + except Exception as e: + print(f"Error in timer check job: {str(e)}") + print(f"Error details: {type(e).__name__}") + import traceback + print(traceback.format_exc()) + +scheduler.add_job(id='timer_check', + func=check_and_execute_timer_functions, + trigger='interval', + minutes=TIMER_CHECK_INTERVAL) +scheduler.start() if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. diff --git a/requirements.txt b/requirements.txt index 4e4e434..ff229d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,6 @@ jinja2-fragments==0.3.0 Werkzeug==2.2.2 requests==2.26.0 Flask-Login==0.6.3 -python-dotenv==1.0.1 \ No newline at end of file +python-dotenv==1.0.1 +aiohttp==3.11.12 +flask-apscheduler==1.13.1 \ No newline at end of file