# Timer scheduler service: periodically polls the database for due timer
# functions and executes them against the per-runtime isolator services.
import os
|
|
import time
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables from .env file first, before any other imports,
# so that module-level config reads below (API URLs, poll interval) see them.
# Skipped in production, where variables are expected to come from the real
# process environment rather than a .env file.
if os.environ.get('FLASK_ENV') != 'production':
    load_dotenv()
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import json
|
|
from datetime import datetime, timezone, timedelta
|
|
from flask import Flask
|
|
from extensions import db, init_app
|
|
from flask_apscheduler import APScheduler
|
|
|
|
# Flask app is used only for its app context (DB access) and scheduler config;
# this service does not serve HTTP routes itself.
app = Flask(__name__)
app.config.from_pyfile('config.py')
init_app(app)  # wires up extensions (db, ...) from extensions.py

# Initialize scheduler
scheduler = APScheduler()
# How often (seconds) the background job polls for due timer functions.
TIMER_CHECK_INTERVAL_SECONDS = int(os.environ.get('TIMER_CHECK_INTERVAL_SECONDS', 10))

# Runtime-specific API URLs matching app.py configuration
NODE_API_URL = os.environ.get('NODE_API_URL', 'http://isolator.web:5000/execute')
DENO_API_URL = os.environ.get('DENO_API_URL', 'http://deno-isolator.web:5000/execute')
PYTHON_API_URL = os.environ.get('PYTHON_API_URL', 'http://python-isolator.web:5000/execute')
|
|
|
|
async def execute_timer_function_async(timer_function):
    """
    Execute a timer function asynchronously and record the invocation.

    Steps, in order:
      1. Pick the isolator URL for the function's runtime (node/deno/python).
      2. Load any shared environments linked to this timer and merge them,
         namespaced by name, into the function's own environment.
      3. POST code + merged environment to the isolator and await its JSON
         response.
      4. Write back mutated shared environments, save the function-specific
         environment, compute and store next_run, and insert a success row
         into timer_function_invocations.

    On any exception an ERROR invocation row is inserted instead (with the
    exception text as the logs and no execution_time).

    Args:
        timer_function: dict-like timer_functions row; reads 'id', 'code',
            'environment', 'name', 'version_number', and optionally
            'runtime', 'trigger_type', 'frequency_minutes', 'cron_expression'.
    """
    try:
        start_time = time.time()
        code = timer_function['code']
        environment = timer_function['environment']
        name = timer_function['name']
        version_number = timer_function['version_number']
        runtime = timer_function.get('runtime', 'node')  # Default to node if not specified

        # Select the appropriate API URL based on runtime
        if runtime == 'deno':
            api_url = DENO_API_URL
        elif runtime == 'python':
            api_url = PYTHON_API_URL
        else:
            # Any unrecognized runtime value also falls back to node.
            api_url = NODE_API_URL

        # Load and inject shared environments (namespaced)
        shared_envs = db.execute('''
            SELECT se.id, se.name, se.environment
            FROM timer_function_shared_envs tfse
            JOIN shared_environments se ON tfse.shared_env_id = se.id
            WHERE tfse.timer_function_id = %s
            ORDER BY se.name
        ''', [timer_function['id']])

        # Inject shared environments as nested objects.
        # .copy() so the row's own environment dict is never mutated.
        combined_environment = environment.copy() if environment else {}
        shared_env_map = {}  # Track shared env IDs for later extraction
        if shared_envs:
            for se in shared_envs:
                # Shared env may be stored as a JSON string or already decoded.
                env_data = json.loads(se['environment']) if isinstance(se['environment'], str) else se['environment']
                # NOTE: a shared env whose name collides with a function-specific
                # key shadows that key in the merged payload.
                combined_environment[se['name']] = env_data
                shared_env_map[se['name']] = se['id']

        # NOTE(review): no explicit timeout is set on this request -- an
        # unresponsive isolator would block this invocation (and the batch
        # awaiting it) indefinitely; consider aiohttp.ClientTimeout.
        async with aiohttp.ClientSession() as session:
            async with session.post(api_url, json={
                'code': code,
                'request': {'method': 'TIMER'},
                'environment': combined_environment,
                'name': name
            }) as response:
                response_data = await response.json()

        # Extract and persist shared environment mutations.
        # NOTE(review): assumes the isolator response always contains
        # 'environment', 'status' and 'logs'; a malformed response raises
        # KeyError and is recorded as an ERROR invocation by the handler below.
        returned_env = response_data['environment']
        function_specific_env = {}

        # Separate function-specific properties from shared environments
        for key, value in returned_env.items():
            if key in shared_env_map:
                # This is a shared environment - save it back
                db.execute(
                    'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s',
                    [json.dumps(value), shared_env_map[key]],
                    commit=True
                )
            else:
                # This is function-specific - keep it
                function_specific_env[key] = value

        # Update environment and record invocation
        # Calculate next run time based on trigger type
        next_run = None
        if timer_function.get('trigger_type') == 'interval' and timer_function.get('frequency_minutes'):
            next_run = datetime.now(timezone.utc) + timedelta(minutes=timer_function['frequency_minutes'])
        elif timer_function.get('trigger_type') == 'cron' and timer_function.get('cron_expression'):
            from croniter import croniter
            try:
                next_run = croniter(timer_function['cron_expression'], datetime.now(timezone.utc)).get_next(datetime)
            except Exception as e:
                # A bad cron expression disables further scheduling rather
                # than failing the whole invocation.
                print(f"Error calculating next cron run for timer {timer_function['id']}: {str(e)}")
                next_run = None
        # For 'date' trigger type, next_run should be NULL (one-time execution)

        db.execute("""
            UPDATE timer_functions
            SET environment = %s::jsonb,
                last_run = NOW(),
                next_run = %s
            WHERE id = %s
        """, [json.dumps(function_specific_env), next_run, timer_function['id']], commit=True)

        # Calculate execution time in milliseconds
        execution_time = (time.time() - start_time) * 1000

        # Record the invocation
        db.execute("""
            INSERT INTO timer_function_invocations
            (timer_function_id, status, logs, version_number, execution_time)
            VALUES (%s, %s, %s, %s, %s)
        """, [
            timer_function['id'],
            response_data['status'],
            json.dumps(response_data['logs']),
            version_number,
            execution_time
        ], commit=True)

    except Exception as e:
        # Best-effort failure record; execution_time is left NULL for errors.
        print(f"Error executing timer function {timer_function['id']}: {str(e)}")
        db.execute("""
            INSERT INTO timer_function_invocations
            (timer_function_id, status, logs, version_number)
            VALUES (%s, %s, %s, %s)
        """, [
            timer_function['id'],
            'ERROR',
            json.dumps({'error': str(e)}),
            timer_function['version_number']
        ], commit=True)
|
|
|
|
async def execute_batch_timer_functions(timer_functions):
    """Run all given timer functions concurrently in one gather."""
    # return_exceptions=True keeps one failing timer from cancelling the rest.
    await asyncio.gather(
        *(execute_timer_function_async(tf) for tf in timer_functions),
        return_exceptions=True,
    )
|
|
|
|
def check_and_execute_timer_functions():
    """Background job to check and execute timer functions.

    Queries for enabled timer functions whose next_run is due, then runs
    them all concurrently on a fresh asyncio event loop. Intended to be
    invoked by APScheduler on a fixed interval; any exception is caught
    and logged so the job keeps firing.
    """
    try:
        with app.app_context():
            print("Starting timer check job...")
            timer_functions = db.execute("""
                SELECT
                    id, name, code, environment, version_number,
                    trigger_type, frequency_minutes, run_date, cron_expression,
                    next_run, enabled, runtime,
                    EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run
                FROM timer_functions
                WHERE enabled = true
                AND next_run <= NOW()
                ORDER BY next_run ASC
            """)

            if timer_functions:
                print(f"Found {len(timer_functions)} timer functions to execute:")
                for tf in timer_functions:
                    print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)")

                print("Starting async execution...")
                # asyncio.run() creates, runs, and ALWAYS closes a fresh event
                # loop. The previous new_event_loop()/run_until_complete()/
                # close() sequence leaked the loop (and left it installed as
                # the thread's current loop) whenever execution raised,
                # because close() was skipped on exception.
                asyncio.run(execute_batch_timer_functions(timer_functions))
                print("Completed async execution")
            else:
                print("No timer functions due for execution")

    except Exception as e:
        # Never let a single bad cycle kill the recurring scheduler job.
        print(f"Error in timer check job: {str(e)}")
        print(f"Error details: {type(e).__name__}")
        import traceback
        print(traceback.format_exc())
|
|
|
|
if __name__ == '__main__':
    with app.app_context():
        # Configure and start scheduler
        app.config['SCHEDULER_API_ENABLED'] = True
        scheduler.init_app(app)
        scheduler.add_job(
            id='timer_check',
            func=check_and_execute_timer_functions,
            trigger='interval',
            seconds=TIMER_CHECK_INTERVAL_SECONDS
        )
        scheduler.start()

        # Keep the main thread alive so the background scheduler keeps firing.
        # A plain sleep loop replaces asyncio.get_event_loop().run_forever():
        # the implicit-loop form is deprecated (3.10+) and the loop ran
        # nothing anyway -- each job cycle builds its own loop via asyncio.run.
        try:
            while True:
                time.sleep(1)
        except (KeyboardInterrupt, SystemExit):
            # Stop pending jobs cleanly before the process exits.
            scheduler.shutdown()
|