import os
from dotenv import load_dotenv

# Load environment variables from .env file first, before any other imports
if os.environ.get('FLASK_ENV') != 'production':
    load_dotenv()

import asyncio
import aiohttp
import json
import traceback
from flask import Flask
from extensions import db, init_app
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config.from_pyfile('config.py')
init_app(app)

# Initialize scheduler
scheduler = APScheduler()

# How often (in minutes) the scheduler polls for due timer functions.
TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 1))  # Change back to 5 minutes
# Endpoint that receives the function code to execute.
API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute')


async def execute_timer_function_async(timer_function):
    """Execute one timer function via API_URL and record the invocation.

    The function's code, environment and name are POSTed to ``API_URL`` with
    a ``TIMER`` request method. On success the returned environment is saved
    on the ``timer_functions`` row, ``last_run``/``next_run`` are updated and
    a row is inserted into ``timer_function_invocations``.

    On any exception the error is printed and an ``ERROR`` invocation row is
    inserted instead. If that insert itself fails, the exception propagates
    to the caller.

    Args:
        timer_function: Row-like mapping providing ``id``, ``name``, ``code``,
            ``environment`` and ``version_number``.
    """
    try:
        code = timer_function['code']
        environment = timer_function['environment']
        name = timer_function['name']
        version_number = timer_function['version_number']

        payload = {
            'code': code,
            'request': {'method': 'TIMER'},
            'environment': environment,
            'name': name
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(API_URL, json=payload) as response:
                response_data = await response.json()

        # NOTE(review): db.execute is called without await; if it blocks, it
        # stalls the other timer tasks sharing this event loop - confirm.

        # Update environment and schedule the next run. Non-interval triggers
        # get next_run = NULL, so they no longer match the "due" query.
        db.execute("""
            UPDATE timer_functions
            SET environment = %s::jsonb,
                last_run = NOW(),
                next_run = CASE
                    WHEN trigger_type = 'interval'
                    THEN NOW() + (frequency_minutes || ' minutes')::interval
                    ELSE NULL
                END
            WHERE id = %s
        """, [json.dumps(response_data['environment']), timer_function['id']], commit=True)

        # Record the invocation
        db.execute("""
            INSERT INTO timer_function_invocations
            (timer_function_id, status, logs, version_number, execution_time)
            VALUES (%s, %s, %s, %s, %s)
        """, [
            timer_function['id'],
            response_data['status'],
            json.dumps(response_data['logs']),
            version_number,
            response_data.get('execution_time')
        ], commit=True)

    except Exception as e:
        # NOTE(review): this path does not touch next_run, so a function that
        # keeps failing stays due and is retried on every check - confirm
        # that this retry behaviour is intended.
        print(f"Error executing timer function {timer_function['id']}: {str(e)}")
        db.execute("""
            INSERT INTO timer_function_invocations
            (timer_function_id, status, logs, version_number)
            VALUES (%s, %s, %s, %s)
        """, [
            timer_function['id'],
            'ERROR',
            json.dumps({'error': str(e)}),
            timer_function['version_number']
        ], commit=True)


async def execute_batch_timer_functions(timer_functions):
    """Execute multiple timer functions concurrently.

    Each function runs in its own task. An exception escaping one task (for
    example a failure while recording its ERROR invocation) does not cancel
    the others; it is printed here instead of being silently discarded.

    Args:
        timer_functions: Sequence of row-like mappings accepted by
            ``execute_timer_function_async``.
    """
    tasks = [execute_timer_function_async(tf) for tf in timer_functions]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # With return_exceptions=True, failures come back as result values;
    # surface them so they are not lost.
    for tf, result in zip(timer_functions, results):
        if isinstance(result, BaseException):
            print(f"Unhandled error in timer function {tf['id']}: {result!r}")


def check_and_execute_timer_functions():
    """Background job to check and execute timer functions.

    Inside an application context, selects every enabled timer function whose
    ``next_run`` is in the past (oldest first) and executes them concurrently.
    Any exception is printed together with its traceback and not re-raised.
    """
    try:
        with app.app_context():
            print("Starting timer check job...")

            timer_functions = db.execute("""
                SELECT id, name, code, environment, version_number,
                       trigger_type, frequency_minutes, run_date, next_run, enabled,
                       EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run
                FROM timer_functions
                WHERE enabled = true
                AND next_run <= NOW()
                ORDER BY next_run ASC
            """)

            if not timer_functions:
                print("No timer functions due for execution")
                return

            print(f"Found {len(timer_functions)} timer functions to execute:")
            for tf in timer_functions:
                print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)")

            print("Starting async execution...")
            # asyncio.run creates a fresh event loop and always closes it,
            # even when the batch raises.
            asyncio.run(execute_batch_timer_functions(timer_functions))
            print("Completed async execution")

    except Exception as e:
        print(f"Error in timer check job: {str(e)}")
        print(f"Error details: {type(e).__name__}")
        print(traceback.format_exc())


def main():
    """Start the scheduler and block until interrupted."""
    with app.app_context():
        # Configure and start scheduler
        app.config['SCHEDULER_API_ENABLED'] = True
        scheduler.init_app(app)
        scheduler.add_job(
            id='timer_check',
            func=check_and_execute_timer_functions,
            trigger='interval',
            minutes=TIMER_CHECK_INTERVAL
        )
        scheduler.start()

        # Keep the process running. An explicit loop is created because
        # asyncio.get_event_loop() without a current loop is deprecated.
        keepalive_loop = asyncio.new_event_loop()
        try:
            keepalive_loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            keepalive_loop.close()


if __name__ == '__main__':
    main()