Files
function/routes/timer.py
2025-12-02 20:36:55 +11:00

659 lines
23 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from jinja2_fragments import render_block
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import current_user, login_required
from extensions import db, htmx, environment
from datetime import datetime, timezone, timedelta
import json
'''
CREATE TABLE timer_functions (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
code TEXT NOT NULL,
environment JSONB NOT NULL DEFAULT '{}'::jsonb,
version_number INT NOT NULL DEFAULT 1,
user_id INT NOT NULL, -- the referencing column
trigger_type VARCHAR(20) NOT NULL CHECK (
trigger_type IN ('interval', 'date', 'cron')
),
frequency_minutes INT, -- used if trigger_type = 'interval'
run_date TIMESTAMPTZ, -- used if trigger_type = 'date' (one-off)
cron_expression TEXT, -- used if trigger_type = 'cron'
next_run TIMESTAMPTZ,
last_run TIMESTAMPTZ,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
invocation_count INT NOT NULL DEFAULT 0,
-- Define the foreign key constraint
CONSTRAINT fk_timer_functions_user
FOREIGN KEY (user_id)
REFERENCES users (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
CREATE TABLE timer_function_versions (
id SERIAL PRIMARY KEY,
timer_function_id INT NOT NULL,
script TEXT NOT NULL,
version_number INT NOT NULL,
versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_timer_function_versions
FOREIGN KEY (timer_function_id)
REFERENCES timer_functions (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
CREATE OR REPLACE FUNCTION fn_timer_functions_versioning()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
next_version INT;
BEGIN
IF TG_OP = 'INSERT' THEN
-- In an AFTER INSERT, the row already has been inserted with version_number default (1).
-- We can optionally override that or ensure an initial version is recorded:
INSERT INTO timer_function_versions (timer_function_id, script, version_number)
VALUES (NEW.id, NEW.code, 1);
-- If desired, ensure timer_functions.version_number is set explicitly:
UPDATE timer_functions
SET version_number = 1
WHERE id = NEW.id;
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
-- Only version if the 'code' changed
IF NEW.code IS DISTINCT FROM OLD.code THEN
-- Determine the next version number based on existing versions
SELECT COALESCE(MAX(version_number), 0) + 1
INTO next_version
FROM timer_function_versions
WHERE timer_function_id = NEW.id;
-- Insert new version record
INSERT INTO timer_function_versions (timer_function_id, script, version_number)
VALUES (NEW.id, NEW.code, next_version);
-- Manually update timer_functions to set version_number
-- This second UPDATE will cause the trigger to fire again,
-- but because code won't change, the trigger won't do another version bump.
UPDATE timer_functions
SET version_number = next_version
WHERE id = NEW.id;
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END;
$$;
CREATE TRIGGER tr_timer_functions_versioning
AFTER INSERT OR UPDATE
ON timer_functions
FOR EACH ROW
EXECUTE PROCEDURE fn_timer_functions_versioning();
CREATE TABLE timer_function_invocations (
id SERIAL PRIMARY KEY,
timer_function_id INT NOT NULL,
status TEXT,
invocation_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
logs JSONB,
version_number INT NOT NULL,
execution_time FLOAT,
CONSTRAINT fk_timer_function_invocations
FOREIGN KEY (timer_function_id)
REFERENCES timer_functions (id)
ON DELETE CASCADE
);
CREATE OR REPLACE FUNCTION fn_increment_invocation_count()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE timer_functions
SET invocation_count = invocation_count + 1
WHERE id = NEW.timer_function_id;
RETURN NEW;
END;
$$;
CREATE TRIGGER tr_increment_invocation_count
AFTER INSERT ON timer_function_invocations
FOR EACH ROW
EXECUTE PROCEDURE fn_increment_invocation_count();
'''
DEFAULT_SCRIPT = """async () => {
environment.count += 1
console.log(`Executing ${environment.count}`)
}"""
DEFAULT_ENVIRONMENT = """{
"count": 0
}"""
timer = Blueprint('timer', __name__)
def calculate_next_run(trigger_type, frequency_minutes=None, run_date=None, cron_expression=None, base_time=None):
"""
Calculate the next execution time based on trigger configuration.
Args:
trigger_type: One of 'interval', 'date', or 'cron'
frequency_minutes: Minutes between executions (for interval type)
run_date: Specific datetime to run (for date type)
cron_expression: Cron expression string (for cron type)
base_time: Base datetime to calculate from (defaults to now)
Returns:
tuple: (next_run datetime, error_response dict or None)
If error_response is not None, it contains the error to return to client
"""
if base_time is None:
base_time = datetime.now(timezone.utc)
next_run = None
if trigger_type == 'interval':
if frequency_minutes is None:
return None, {
"status": "error",
"message": "frequency_minutes is required for interval trigger type"
}
next_run = base_time + timedelta(minutes=int(frequency_minutes))
elif trigger_type == 'date':
if run_date is None:
return None, {
"status": "error",
"message": "run_date is required for date trigger type"
}
# run_date should already be a datetime object
if isinstance(run_date, str):
run_date = datetime.fromisoformat(run_date)
next_run = run_date
elif trigger_type == 'cron':
if cron_expression is None:
return None, {
"status": "error",
"message": "cron_expression is required for cron trigger type"
}
from croniter import croniter
# Validate cron expression
try:
if not croniter.is_valid(cron_expression):
return None, {
"status": "error",
"message": "Invalid cron expression format"
}
# Calculate next run time from base_time
next_run = croniter(cron_expression, base_time).get_next(datetime)
except Exception as e:
return None, {
"status": "error",
"message": f"Invalid cron expression: {str(e)}"
}
else:
return None, {
"status": "error",
"message": f"Invalid trigger type: {trigger_type}"
}
return next_run, None
@timer.route('/overview')
@login_required
def overview():
timer_functions = db.execute("""
SELECT id, name, code, environment, trigger_type,
frequency_minutes, run_date, next_run,
last_run, enabled, invocation_count, runtime, cron_expression
FROM timer_functions
WHERE user_id = %s
ORDER BY id DESC
""", [current_user.id])
if htmx:
return render_block(environment, 'dashboard/timer_functions/overview.html', 'page', timer_functions=timer_functions)
return render_template('dashboard/timer_functions/overview.html', timer_functions=timer_functions)
@timer.route('/new', methods=['GET', 'POST'])
@login_required
def new():
if request.method == 'GET':
args = {
'name': 'foo',
'script': DEFAULT_SCRIPT,
'environment_info': DEFAULT_ENVIRONMENT,
'user_id': current_user.id
}
if htmx:
return render_block(environment, 'dashboard/timer_functions/new.html', 'page', **args)
return render_template('dashboard/timer_functions/new.html', **args)
# Handle POST request
try:
data = request.json
trigger_type = data.get('trigger_type')
runtime = data.get('runtime', 'node')
# Validate trigger type
if trigger_type not in ('interval', 'date', 'cron'):
return jsonify({
"status": "error",
"message": "Invalid trigger type"
}), 400
# Calculate next_run based on trigger type using centralized function
next_run, error = calculate_next_run(
trigger_type,
frequency_minutes=data.get('frequency_minutes'),
run_date=data.get('run_date'),
cron_expression=data.get('cron_expression')
)
if error:
return jsonify(error), 400
# Extract individual variables for database storage
frequency_minutes = None
run_date = None
cron_expression = None
if trigger_type == 'interval':
frequency_minutes = int(data.get('frequency_minutes'))
elif trigger_type == 'date':
run_date = datetime.fromisoformat(data.get('run_date'))
elif trigger_type == 'cron':
cron_expression = data.get('cron_expression')
# Insert new timer function
db.execute("""
INSERT INTO timer_functions
(name, code, environment, user_id, trigger_type,
frequency_minutes, run_date, cron_expression, next_run, enabled, runtime)
VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", [
data.get('name'),
data.get('script_content'),
data.get('environment_info'),
current_user.id,
trigger_type,
frequency_minutes,
run_date,
cron_expression,
next_run,
True,
runtime
],
commit=True)
return jsonify({
"status": "success",
"message": "Timer function created successfully"
}), 200
except Exception as e:
return jsonify({
"status": "error",
"message": f"Error creating timer function: {str(e)}"
}), 400
@timer.route('/edit/<int:function_id>', methods=['GET', 'POST'])
@login_required
def edit(function_id):
if request.method == 'GET':
# Fetch the timer function
timer_function = db.execute("""
SELECT id, name, code, environment, version_number, trigger_type,
frequency_minutes, run_date, cron_expression, next_run,
last_run, enabled, invocation_count, runtime
FROM timer_functions
WHERE id = %s AND user_id = %s
""", [function_id, current_user.id], one=True)
if not timer_function:
return jsonify({
"status": "error",
"message": "Timer function not found"
}), 404
# Format the environment JSON with indentation
timer_function['environment'] = json.dumps(timer_function['environment'], indent=2)
args = {
'function_id': function_id,
'timer_function': timer_function,
'runtime': timer_function.get('runtime', 'node')
}
if htmx:
return render_block(environment, 'dashboard/timer_functions/edit.html', 'page', **args)
return render_template('dashboard/timer_functions/edit.html', **args)
# Handle POST request
try:
data = request.json
trigger_type = data.get('trigger_type')
runtime = data.get('runtime', 'node')
commit_message = data.get('commit_message', '')
# Validate trigger type
if trigger_type not in ('interval', 'date', 'cron'):
return jsonify({
"status": "error",
"message": "Invalid trigger type"
}), 400
# Calculate next_run based on trigger type using centralized function
next_run, error = calculate_next_run(
trigger_type,
frequency_minutes=data.get('frequency_minutes'),
run_date=data.get('run_date'),
cron_expression=data.get('cron_expression')
)
if error:
return jsonify(error), 400
# Extract individual variables for database storage
frequency_minutes = None
run_date = None
cron_expression = None
if trigger_type == 'interval':
frequency_minutes = int(data.get('frequency_minutes'))
elif trigger_type == 'date':
run_date = datetime.fromisoformat(data.get('run_date'))
elif trigger_type == 'cron':
cron_expression = data.get('cron_expression')
# Update timer function
db.execute("""
UPDATE timer_functions
SET name = %s,
code = %s,
environment = %s::jsonb,
trigger_type = %s,
frequency_minutes = %s,
run_date = %s,
cron_expression = %s,
next_run = %s,
enabled = %s,
runtime = %s
WHERE id = %s AND user_id = %s
RETURNING id
""", [
data.get('name'),
data.get('script_content'),
data.get('environment_info'),
trigger_type,
frequency_minutes,
run_date,
cron_expression,
next_run,
data.get('is_enabled', True), # Default to True if not provided
runtime,
function_id,
current_user.id
],
commit=True)
# Update the commit message for the newly created version
# Note: The database trigger creates a new version after the UPDATE,
# so we need to get the latest version number
if commit_message:
latest_version = db.execute(
"SELECT MAX(version_number) as version_number FROM timer_function_versions WHERE timer_function_id = %s",
[function_id],
one=True
)
if latest_version:
db.execute(
"UPDATE timer_function_versions SET commit_message = %s WHERE timer_function_id = %s AND version_number = %s",
[commit_message, function_id, latest_version['version_number']],
commit=True
)
return jsonify({
"status": "success",
"message": "Timer function updated successfully"
}), 200
except Exception as e:
return jsonify({
"status": "error",
"message": f"Error updating timer function: {str(e)}"
}), 400
@timer.route('/delete/<int:function_id>', methods=['DELETE'])
@login_required
def delete(function_id):
try:
# Delete the timer function, but only if it belongs to the current user
result = db.execute("""
DELETE FROM timer_functions
WHERE id = %s AND user_id = %s
RETURNING id
""", [function_id, current_user.id], commit=True)
if not result:
return jsonify({
"status": "error",
"message": "Timer function not found or unauthorized"
}), 404
return jsonify({
"status": "success",
"message": "Timer function deleted successfully"
}), 200
except Exception as e:
return jsonify({
"status": "error",
"message": f"Error deleting timer function: {str(e)}"
}), 400
@timer.route('/toggle/<int:function_id>', methods=['POST'])
@login_required
def toggle(function_id):
try:
# Fetch timer function details first to get trigger configuration
timer_function = db.execute("""
SELECT enabled, trigger_type, frequency_minutes, run_date, cron_expression
FROM timer_functions
WHERE id = %s AND user_id = %s
""", [function_id, current_user.id], one=True)
if not timer_function:
return jsonify({
"status": "error",
"message": "Timer function not found or unauthorized"
}), 404
# Determine new enabled state (toggling)
new_enabled = not timer_function['enabled']
# If enabling the timer, recalculate next_run to prevent stale execution times
if new_enabled:
next_run, error = calculate_next_run(
timer_function['trigger_type'],
frequency_minutes=timer_function['frequency_minutes'],
run_date=timer_function['run_date'],
cron_expression=timer_function['cron_expression']
)
if error:
return jsonify(error), 400
# Update both enabled status and next_run
db.execute("""
UPDATE timer_functions
SET enabled = %s, next_run = %s
WHERE id = %s AND user_id = %s
""", [new_enabled, next_run, function_id, current_user.id], commit=True)
else:
# Just toggle the enabled status
db.execute("""
UPDATE timer_functions
SET enabled = %s
WHERE id = %s AND user_id = %s
""", [new_enabled, function_id, current_user.id], commit=True)
# Fetch updated timer functions for the overview template
timer_functions = db.execute("""
SELECT id, name, code, environment, trigger_type,
frequency_minutes, run_date, next_run,
last_run, enabled, invocation_count, runtime, cron_expression
FROM timer_functions
WHERE user_id = %s
ORDER BY id DESC
""", [current_user.id])
if htmx:
return render_block(environment, 'dashboard/timer_functions/overview.html', 'page',
timer_functions=timer_functions)
return render_template('dashboard/timer_functions/overview.html', timer_functions=timer_functions)
except Exception as e:
return jsonify({
"status": "error",
"message": f"Error toggling timer function: {str(e)}"
}), 400
@timer.route('/logs/<int:function_id>')
@login_required
def logs(function_id):
# Fetch the timer function to verify ownership
timer_function = db.execute("""
SELECT id, name
FROM timer_functions
WHERE id = %s AND user_id = %s
""", [function_id, current_user.id], one=True)
if not timer_function:
flash('Timer function not found', 'error')
return redirect(url_for('timer.overview'))
# Fetch the invocation logs
timer_function_invocations = db.execute("""
SELECT id, timer_function_id, status, invocation_time,
logs, version_number, execution_time
FROM timer_function_invocations
WHERE timer_function_id = %s
ORDER BY invocation_time DESC
LIMIT 100
""", [function_id])
args = {
'user_id': current_user.id,
'function_id': function_id,
'timer_function_invocations': timer_function_invocations
}
if htmx:
return render_block(environment, 'dashboard/timer_functions/logs.html', 'page', **args)
return render_template('dashboard/timer_functions/logs.html', **args)
@timer.route('/history/<int:function_id>')
@login_required
def history(function_id):
# Fetch the timer function to verify ownership
timer_function = db.execute("""
SELECT id, name, code AS script, version_number, runtime
FROM timer_functions
WHERE id = %s AND user_id = %s
""", [function_id, current_user.id], one=True)
if not timer_function:
flash('Timer function not found', 'error')
return redirect(url_for('timer.overview'))
# Fetch all versions
versions = db.execute("""
SELECT version_number, script, versioned_at, commit_message
FROM timer_function_versions
WHERE timer_function_id = %s
ORDER BY version_number DESC
""", [function_id])
# Convert datetime objects to ISO format strings
for version in versions:
version['versioned_at'] = version['versioned_at'].isoformat() if version['versioned_at'] else None
args = {
'user_id': current_user.id,
'function_id': function_id,
'timer_function': timer_function,
'versions': versions,
'title': timer_function['name'],
'runtime': timer_function.get('runtime', 'node'),
}
if htmx:
return render_block(environment, 'dashboard/timer_functions/history.html', 'page', **args)
return render_template('dashboard/timer_functions/history.html', **args)
@timer.route('/restore/<int:function_id>', methods=['POST'])
@login_required
def restore(function_id):
try:
user_id = current_user.id
version_number = request.json.get('version_number')
if not version_number:
return jsonify({"status": "error", "message": "Version number is required"}), 400
# Verify ownership and existence of the function
timer_function = db.execute(
"SELECT id FROM timer_functions WHERE id = %s AND user_id = %s",
[function_id, user_id],
one=True
)
if not timer_function:
return jsonify({"status": "error", "message": "Timer function not found"}), 404
# Fetch the content of the selected version
version_data = db.execute(
"SELECT script FROM timer_function_versions WHERE timer_function_id = %s AND version_number = %s",
[function_id, version_number],
one=True
)
if not version_data:
return jsonify({"status": "error", "message": "Version not found"}), 404
# Update the function with the old script content
# This will trigger the database function to create a new version entry
db.execute(
"UPDATE timer_functions SET code = %s WHERE id = %s",
[version_data["script"], function_id],
commit=True
)
return jsonify({"status": "success", "message": f"Restored to version {version_number}"})
except Exception as e:
print(e)
return jsonify({"status": "error", "message": str(e)}), 500