Separate timer function execution into dedicated worker process

- Move timer function scheduling and execution logic from `app.py` to new `worker.py`
- Update `Procfile` to launch worker process alongside web application
- Simplify main application startup by removing scheduler configuration
- Maintain existing timer function execution and logging behavior
This commit is contained in:
Peter Stockings
2025-02-23 17:57:16 +11:00
parent df9378ac23
commit 525471d8c0
3 changed files with 144 additions and 131 deletions

View File

@@ -1 +1,2 @@
web: gunicorn app:app --workers=4
worker: python worker.py

131
app.py
View File

@@ -23,10 +23,6 @@ if os.environ.get('FLASK_ENV') != 'production':
load_dotenv()
app = Flask(__name__)
# Initialize scheduler
scheduler = APScheduler()
# Get timer check interval from environment (default 5 minutes)
TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 5))
app.config.from_pyfile('config.py')
app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f')
@@ -35,12 +31,7 @@ login_manager.init_app(app)
login_manager.login_view = "login"
jinja_partials.register_extensions(app)
# Configure scheduler with thread pool
app.config['SCHEDULER_API_ENABLED'] = True
app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'threadpool', 'max_workers': 20} # Configure thread pool executor
}
scheduler.init_app(app)
# Remove scheduler configuration and initialization
init_app(app)
app.register_blueprint(timer, url_prefix='/timer')
@@ -565,126 +556,6 @@ def load_user(user_id):
return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
return None
async def execute_timer_function_async(timer_function):
"""
Execute a timer function asynchronously and record the invocation
"""
try:
code = timer_function['code']
environment = timer_function['environment']
name = timer_function['name']
version_number = timer_function['version_number']
async with aiohttp.ClientSession() as session:
async with session.post(API_URL, json={
'code': code,
'request': {'method': 'TIMER'},
'environment': environment,
'name': name
}) as response:
response_data = await response.json()
# Update environment and record invocation
db.execute("""
UPDATE timer_functions
SET environment = %s::jsonb,
last_run = NOW(),
next_run = CASE
WHEN trigger_type = 'interval'
THEN NOW() + (frequency_minutes || ' minutes')::interval
ELSE NULL
END
WHERE id = %s
""", [json.dumps(response_data['environment']), timer_function['id']], commit=True)
# Record the invocation
db.execute("""
INSERT INTO timer_function_invocations
(timer_function_id, status, logs, version_number)
VALUES (%s, %s, %s, %s)
""", [
timer_function['id'],
response_data['status'],
json.dumps(response_data['logs']),
version_number
], commit=True)
except Exception as e:
print(f"Error executing timer function {timer_function['id']}: {str(e)}")
# Record the failed invocation
db.execute("""
INSERT INTO timer_function_invocations
(timer_function_id, status, logs, version_number)
VALUES (%s, %s, %s, %s)
""", [
timer_function['id'],
'ERROR',
json.dumps({'error': str(e)}),
timer_function['version_number']
], commit=True)
async def execute_batch_timer_functions(timer_functions):
"""
Execute multiple timer functions concurrently
"""
tasks = [execute_timer_function_async(tf) for tf in timer_functions]
await asyncio.gather(*tasks, return_exceptions=True)
def check_and_execute_timer_functions():
"""
Background job to check and execute timer functions
"""
try:
with app.app_context():
print("Starting timer check job...")
timer_functions = db.execute("""
SELECT
id,
name,
code,
environment,
version_number,
trigger_type,
frequency_minutes,
run_date,
next_run,
enabled,
EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run
FROM timer_functions
WHERE enabled = true
AND next_run <= NOW()
ORDER BY next_run ASC
""")
if timer_functions:
print(f"Found {len(timer_functions)} timer functions to execute:")
for tf in timer_functions:
print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)")
# Create a new event loop in this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print("Starting async execution...")
# Run the async execution
loop.run_until_complete(execute_batch_timer_functions(timer_functions))
loop.close()
print("Completed async execution")
else:
print("No timer functions due for execution")
except Exception as e:
print(f"Error in timer check job: {str(e)}")
print(f"Error details: {type(e).__name__}")
import traceback
print(traceback.format_exc())
scheduler.add_job(id='timer_check',
func=check_and_execute_timer_functions,
trigger='interval',
minutes=TIMER_CHECK_INTERVAL)
scheduler.start()
if __name__ == '__main__':
# Bind to PORT if defined, otherwise default to 5000.
port = int(os.environ.get('PORT', 5000))

141
worker.py Normal file
View File

@@ -0,0 +1,141 @@
import os
from dotenv import load_dotenv
# Load environment variables from .env file first, before any other imports
if os.environ.get('FLASK_ENV') != 'production':
load_dotenv()
import asyncio
import aiohttp
import json
from flask import Flask
from extensions import db, init_app
from flask_apscheduler import APScheduler
app = Flask(__name__)
app.config.from_pyfile('config.py')
init_app(app)
# Initialize scheduler
scheduler = APScheduler()
TIMER_CHECK_INTERVAL = int(os.environ.get('TIMER_CHECK_INTERVAL_MINUTES', 1)) # Change back to 5 minutes
API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute')
async def execute_timer_function_async(timer_function):
"""
Execute a timer function asynchronously and record the invocation
"""
try:
code = timer_function['code']
environment = timer_function['environment']
name = timer_function['name']
version_number = timer_function['version_number']
async with aiohttp.ClientSession() as session:
async with session.post(API_URL, json={
'code': code,
'request': {'method': 'TIMER'},
'environment': environment,
'name': name
}) as response:
response_data = await response.json()
# Update environment and record invocation
db.execute("""
UPDATE timer_functions
SET environment = %s::jsonb,
last_run = NOW(),
next_run = CASE
WHEN trigger_type = 'interval'
THEN NOW() + (frequency_minutes || ' minutes')::interval
ELSE NULL
END
WHERE id = %s
""", [json.dumps(response_data['environment']), timer_function['id']], commit=True)
# Record the invocation
db.execute("""
INSERT INTO timer_function_invocations
(timer_function_id, status, logs, version_number)
VALUES (%s, %s, %s, %s)
""", [
timer_function['id'],
response_data['status'],
json.dumps(response_data['logs']),
version_number
], commit=True)
except Exception as e:
print(f"Error executing timer function {timer_function['id']}: {str(e)}")
db.execute("""
INSERT INTO timer_function_invocations
(timer_function_id, status, logs, version_number)
VALUES (%s, %s, %s, %s)
""", [
timer_function['id'],
'ERROR',
json.dumps({'error': str(e)}),
timer_function['version_number']
], commit=True)
async def execute_batch_timer_functions(timer_functions):
"""Execute multiple timer functions concurrently"""
tasks = [execute_timer_function_async(tf) for tf in timer_functions]
await asyncio.gather(*tasks, return_exceptions=True)
def check_and_execute_timer_functions():
"""Background job to check and execute timer functions"""
try:
with app.app_context():
print("Starting timer check job...")
timer_functions = db.execute("""
SELECT
id, name, code, environment, version_number,
trigger_type, frequency_minutes, run_date,
next_run, enabled,
EXTRACT(EPOCH FROM (NOW() - next_run)) as seconds_since_next_run
FROM timer_functions
WHERE enabled = true
AND next_run <= NOW()
ORDER BY next_run ASC
""")
if timer_functions:
print(f"Found {len(timer_functions)} timer functions to execute:")
for tf in timer_functions:
print(f" - {tf['name']} (ID: {tf['id']}, {tf['seconds_since_next_run']:.1f} seconds overdue)")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print("Starting async execution...")
loop.run_until_complete(execute_batch_timer_functions(timer_functions))
loop.close()
print("Completed async execution")
else:
print("No timer functions due for execution")
except Exception as e:
print(f"Error in timer check job: {str(e)}")
print(f"Error details: {type(e).__name__}")
import traceback
print(traceback.format_exc())
if __name__ == '__main__':
with app.app_context():
# Configure and start scheduler
app.config['SCHEDULER_API_ENABLED'] = True
scheduler.init_app(app)
scheduler.add_job(
id='timer_check',
func=check_and_execute_timer_functions,
trigger='interval',
minutes=TIMER_CHECK_INTERVAL
)
scheduler.start()
# Keep the process running
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass