diff --git a/Procfile b/Procfile index 9af4d6b..266baa5 100644 --- a/Procfile +++ b/Procfile @@ -1 +1,2 @@ -web: gunicorn app:app --workers=4 \ No newline at end of file +web: gunicorn app:app --workers=4 +worker: python worker.py \ No newline at end of file diff --git a/app.py b/app.py index 91642bb..761f0aa 100644 --- a/app.py +++ b/app.py @@ -23,10 +23,6 @@ if os.environ.get('FLASK_ENV') != 'production': load_dotenv() app = Flask(__name__) -# Initialize scheduler -scheduler = APScheduler() -# Get timer check interval from environment (default 5 minutes) -TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 5)) app.config.from_pyfile('config.py') app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f') @@ -35,12 +31,7 @@ login_manager.init_app(app) login_manager.login_view = "login" jinja_partials.register_extensions(app) -# Configure scheduler with thread pool -app.config['SCHEDULER_API_ENABLED'] = True -app.config['SCHEDULER_EXECUTORS'] = { - 'default': {'type': 'threadpool', 'max_workers': 20} # Configure thread pool executor -} -scheduler.init_app(app) +# Remove scheduler configuration and initialization init_app(app) app.register_blueprint(timer, url_prefix='/timer') @@ -565,126 +556,6 @@ def load_user(user_id): return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) return None -async def execute_timer_function_async(timer_function): - """ - Execute a timer function asynchronously and record the invocation - """ - try: - code = timer_function['code'] - environment = timer_function['environment'] - name = timer_function['name'] - version_number = timer_function['version_number'] - - async with aiohttp.ClientSession() as session: - async with session.post(API_URL, json={ - 'code': code, - 'request': {'method': 'TIMER'}, - 'environment': environment, - 'name': name - }) as response: - response_data = await response.json() - - # Update environment and record invocation - db.execute(""" - UPDATE timer_functions - SET environment = %s::jsonb, - last_run = NOW(), - next_run = CASE - WHEN trigger_type = 'interval' - THEN NOW() + (frequency_minutes || ' minutes')::interval - ELSE NULL - END - WHERE id = %s - """, [json.dumps(response_data['environment']), timer_function['id']], commit=True) - - # Record the invocation - db.execute(""" - INSERT INTO timer_function_invocations - (timer_function_id, status, logs, version_number) - VALUES (%s, %s, %s, %s) - """, [ - timer_function['id'], - response_data['status'], - json.dumps(response_data['logs']), - version_number - ], commit=True) - - except Exception as e: - print(f"Error executing timer function {timer_function['id']}: {str(e)}") - # Record the failed invocation - db.execute(""" - INSERT INTO timer_function_invocations - (timer_function_id, status, logs, version_number) - VALUES (%s, %s, %s, %s) - """, [ - timer_function['id'], - 'ERROR', - json.dumps({'error': str(e)}), - timer_function['version_number'] - ], commit=True) - -async def execute_batch_timer_functions(timer_functions): - """ - Execute multiple timer functions concurrently - """ - tasks = [execute_timer_function_async(tf) for tf in timer_functions] - await asyncio.gather(*tasks, return_exceptions=True) - -def check_and_execute_timer_functions(): - """ - Background job to check and execute timer functions - """ - try: - with app.app_context(): - print("Starting timer check job...") - timer_functions = db.execute(""" - SELECT - id, - name, - code, - environment, - version_number, - trigger_type, - frequency_minutes, - run_date, - next_run, - enabled, - EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run - FROM timer_functions - WHERE enabled = true - AND next_run <= NOW() - ORDER BY next_run ASC - """) - - if timer_functions: - print(f"Found {len(timer_functions)} timer functions to execute:") - for tf in timer_functions: - print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)") - - # Create a new event loop in this thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - print("Starting async execution...") - # Run the async execution - loop.run_until_complete(execute_batch_timer_functions(timer_functions)) - loop.close() - print("Completed async execution") - else: - print("No timer functions due for execution") - - except Exception as e: - print(f"Error in timer check job: {str(e)}") - print(f"Error details: {type(e).__name__}") - import traceback - print(traceback.format_exc()) - -scheduler.add_job(id='timer_check', - func=check_and_execute_timer_functions, - trigger='interval', - minutes=TIMER_CHECK_INTERVAL) -scheduler.start() - if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..7370dbb --- /dev/null +++ b/worker.py @@ -0,0 +1,141 @@ +import os +from dotenv import load_dotenv + +# Load environment variables from .env file first, before any other imports +if os.environ.get('FLASK_ENV') != 'production': + load_dotenv() + +import asyncio +import aiohttp +import json +from flask import Flask +from extensions import db, init_app +from flask_apscheduler import APScheduler + +app = Flask(__name__) +app.config.from_pyfile('config.py') +init_app(app) + +# Initialize scheduler +scheduler = APScheduler() +TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 1)) # Change back to 5 minutes +API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute') + +async def execute_timer_function_async(timer_function): + """ + Execute a timer function asynchronously and record the invocation + """ + try: + code = timer_function['code'] + environment = timer_function['environment'] + name = timer_function['name'] + version_number = timer_function['version_number'] + + async with aiohttp.ClientSession() as session: + async with session.post(API_URL, json={ + 'code': code, + 'request': {'method': 'TIMER'}, + 'environment': environment, + 'name': name + }) as response: + response_data = await response.json() + + # Update environment and record invocation + db.execute(""" + UPDATE timer_functions + SET environment = %s::jsonb, + last_run = NOW(), + next_run = CASE + WHEN trigger_type = 'interval' + THEN NOW() + (frequency_minutes || ' minutes')::interval + ELSE NULL + END + WHERE id = %s + """, [json.dumps(response_data['environment']), timer_function['id']], commit=True) + + # Record the invocation + db.execute(""" + INSERT INTO timer_function_invocations + (timer_function_id, status, logs, version_number) + VALUES (%s, %s, %s, %s) + """, [ + timer_function['id'], + response_data['status'], + json.dumps(response_data['logs']), + version_number + ], commit=True) + + except Exception as e: + print(f"Error executing timer function {timer_function['id']}: {str(e)}") + db.execute(""" + INSERT INTO timer_function_invocations + (timer_function_id, status, logs, version_number) + VALUES (%s, %s, %s, %s) + """, [ + timer_function['id'], + 'ERROR', + json.dumps({'error': str(e)}), + timer_function['version_number'] + ], commit=True) + +async def execute_batch_timer_functions(timer_functions): + """Execute multiple timer functions concurrently""" + tasks = [execute_timer_function_async(tf) for tf in timer_functions] + await asyncio.gather(*tasks, return_exceptions=True) + +def check_and_execute_timer_functions(): + """Background job to check and execute timer functions""" + try: + with app.app_context(): + print("Starting timer check job...") + timer_functions = db.execute(""" + SELECT + id, name, code, environment, version_number, + trigger_type, frequency_minutes, run_date, + next_run, enabled, + EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run + FROM timer_functions + WHERE enabled = true + AND next_run <= NOW() + ORDER BY next_run ASC + """) + + if timer_functions: + print(f"Found {len(timer_functions)} timer functions to execute:") + for tf in timer_functions: + print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + print("Starting async execution...") + loop.run_until_complete(execute_batch_timer_functions(timer_functions)) + loop.close() + print("Completed async execution") + else: + print("No timer functions due for execution") + + except Exception as e: + print(f"Error in timer check job: {str(e)}") + print(f"Error details: {type(e).__name__}") + import traceback + print(traceback.format_exc()) + +if __name__ == '__main__': + with app.app_context(): + # Configure and start scheduler + app.config['SCHEDULER_API_ENABLED'] = True + scheduler.init_app(app) + scheduler.add_job( + id='timer_check', + func=check_and_execute_timer_functions, + trigger='interval', + minutes=TIMER_CHECK_INTERVAL + ) + scheduler.start() + + # Keep the process running + try: + asyncio.get_event_loop().run_forever() + except (KeyboardInterrupt, SystemExit): + pass \ No newline at end of file