import os from dotenv import load_dotenv # Load environment variables from .env file first, before any other imports if os.environ.get('FLASK_ENV') != 'production': load_dotenv() import asyncio import aiohttp import json from datetime import datetime, timezone, timedelta from flask import Flask from extensions import db, init_app from flask_apscheduler import APScheduler app = Flask(__name__) app.config.from_pyfile('config.py') init_app(app) # Initialize scheduler scheduler = APScheduler() TIMER_CHECK_INTERVAL_SECONDS = int(os.environ.get('TIMER_CHECK_INTERVAL_SECONDS', 10)) # Runtime-specific API URLs matching app.py configuration NODE_API_URL = os.environ.get('NODE_API_URL', 'http://isolator.web:5000/execute') DENO_API_URL = os.environ.get('DENO_API_URL', 'http://deno-isolator.web:5000/execute') PYTHON_API_URL = os.environ.get('PYTHON_API_URL', 'http://python-isolator.web:5000/execute') async def execute_timer_function_async(timer_function): """ Execute a timer function asynchronously and record the invocation """ try: code = timer_function['code'] environment = timer_function['environment'] name = timer_function['name'] version_number = timer_function['version_number'] runtime = timer_function.get('runtime', 'node') # Default to node if not specified # Select the appropriate API URL based on runtime if runtime == 'deno': api_url = DENO_API_URL elif runtime == 'python': api_url = PYTHON_API_URL else: api_url = NODE_API_URL # Load and inject shared environments (namespaced) shared_envs = db.execute(''' SELECT se.id, se.name, se.environment FROM timer_function_shared_envs tfse JOIN shared_environments se ON tfse.shared_env_id = se.id WHERE tfse.timer_function_id = %s ORDER BY se.name ''', [timer_function['id']]) # Inject shared environments as nested objects combined_environment = environment.copy() if environment else {} shared_env_map = {} # Track shared env IDs for later extraction if shared_envs: for se in shared_envs: env_data = json.loads(se['environment']) if isinstance(se['environment'], str) else se['environment'] combined_environment[se['name']] = env_data shared_env_map[se['name']] = se['id'] async with aiohttp.ClientSession() as session: async with session.post(api_url, json={ 'code': code, 'request': {'method': 'TIMER'}, 'environment': combined_environment, 'name': name }) as response: response_data = await response.json() # Extract and persist shared environment mutations returned_env = response_data['environment'] function_specific_env = {} # Separate function-specific properties from shared environments for key, value in returned_env.items(): if key in shared_env_map: # This is a shared environment - save it back db.execute( 'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s', [json.dumps(value), shared_env_map[key]], commit=True ) else: # This is function-specific - keep it function_specific_env[key] = value # Update environment and record invocation # Calculate next run time based on trigger type next_run = None if timer_function.get('trigger_type') == 'interval' and timer_function.get('frequency_minutes'): next_run = datetime.now(timezone.utc) + timedelta(minutes=timer_function['frequency_minutes']) elif timer_function.get('trigger_type') == 'cron' and timer_function.get('cron_expression'): from croniter import croniter try: next_run = croniter(timer_function['cron_expression'], datetime.now(timezone.utc)).get_next(datetime) except Exception as e: print(f"Error calculating next cron run for timer {timer_function['id']}: {str(e)}") next_run = None # For 'date' trigger type, next_run should be NULL (one-time execution) db.execute(""" UPDATE timer_functions SET environment = %s::jsonb, last_run = NOW(), next_run = %s WHERE id = %s """, [json.dumps(function_specific_env), next_run, timer_function['id']], commit=True) # Record the invocation db.execute(""" INSERT INTO timer_function_invocations (timer_function_id, status, logs, version_number, execution_time) VALUES (%s, %s, %s, %s, %s) """, [ timer_function['id'], response_data['status'], json.dumps(response_data['logs']), version_number, response_data.get('execution_time') ], commit=True) except Exception as e: print(f"Error executing timer function {timer_function['id']}: {str(e)}") db.execute(""" INSERT INTO timer_function_invocations (timer_function_id, status, logs, version_number) VALUES (%s, %s, %s, %s) """, [ timer_function['id'], 'ERROR', json.dumps({'error': str(e)}), timer_function['version_number'] ], commit=True) async def execute_batch_timer_functions(timer_functions): """Execute multiple timer functions concurrently""" tasks = [execute_timer_function_async(tf) for tf in timer_functions] await asyncio.gather(*tasks, return_exceptions=True) def check_and_execute_timer_functions(): """Background job to check and execute timer functions""" try: with app.app_context(): print("Starting timer check job...") timer_functions = db.execute(""" SELECT id, name, code, environment, version_number, trigger_type, frequency_minutes, run_date, cron_expression, next_run, enabled, runtime, EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run FROM timer_functions WHERE enabled = true AND next_run <= NOW() ORDER BY next_run ASC """) if timer_functions: print(f"Found {len(timer_functions)} timer functions to execute:") for tf in timer_functions: print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) print("Starting async execution...") loop.run_until_complete(execute_batch_timer_functions(timer_functions)) loop.close() print("Completed async execution") else: print("No timer functions due for execution") except Exception as e: print(f"Error in timer check job: {str(e)}") print(f"Error details: {type(e).__name__}") import traceback print(traceback.format_exc()) if __name__ == '__main__': with app.app_context(): # Configure and start scheduler app.config['SCHEDULER_API_ENABLED'] = True scheduler.init_app(app) scheduler.add_job( id='timer_check', func=check_and_execute_timer_functions, trigger='interval', seconds=TIMER_CHECK_INTERVAL_SECONDS ) scheduler.start() # Keep the process running try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass