659 lines
23 KiB
Python
659 lines
23 KiB
Python
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
|
|
from jinja2_fragments import render_block
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from flask_login import current_user, login_required
|
|
from extensions import db, htmx, environment
|
|
from datetime import datetime, timezone, timedelta
|
|
import json
|
|
|
|
'''
|
|
CREATE TABLE timer_functions (
|
|
id SERIAL PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
code TEXT NOT NULL,
|
|
environment JSONB NOT NULL DEFAULT '{}'::jsonb,
|
|
version_number INT NOT NULL DEFAULT 1,
|
|
|
|
user_id INT NOT NULL, -- the referencing column
|
|
|
|
trigger_type VARCHAR(20) NOT NULL CHECK (
|
|
trigger_type IN ('interval', 'date', 'cron')
|
|
),
|
|
frequency_minutes INT, -- used if trigger_type = 'interval'
|
|
run_date TIMESTAMPTZ, -- used if trigger_type = 'date' (one-off)
|
|
cron_expression TEXT, -- used if trigger_type = 'cron'
|
|
|
|
next_run TIMESTAMPTZ,
|
|
last_run TIMESTAMPTZ,
|
|
enabled BOOLEAN NOT NULL DEFAULT TRUE,
|
|
invocation_count INT NOT NULL DEFAULT 0,
|
|
|
|
-- Define the foreign key constraint
|
|
CONSTRAINT fk_timer_functions_user
|
|
FOREIGN KEY (user_id)
|
|
REFERENCES users (id)
|
|
ON DELETE CASCADE
|
|
ON UPDATE CASCADE
|
|
);
|
|
|
|
CREATE TABLE timer_function_versions (
|
|
id SERIAL PRIMARY KEY,
|
|
timer_function_id INT NOT NULL,
|
|
script TEXT NOT NULL,
|
|
version_number INT NOT NULL,
|
|
versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
|
|
CONSTRAINT fk_timer_function_versions
|
|
FOREIGN KEY (timer_function_id)
|
|
REFERENCES timer_functions (id)
|
|
ON DELETE CASCADE
|
|
ON UPDATE CASCADE
|
|
);
|
|
|
|
CREATE OR REPLACE FUNCTION fn_timer_functions_versioning()
|
|
RETURNS TRIGGER
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
DECLARE
|
|
next_version INT;
|
|
BEGIN
|
|
IF TG_OP = 'INSERT' THEN
|
|
-- In an AFTER INSERT, the row already has been inserted with version_number default (1).
|
|
-- We can optionally override that or ensure an initial version is recorded:
|
|
INSERT INTO timer_function_versions (timer_function_id, script, version_number)
|
|
VALUES (NEW.id, NEW.code, 1);
|
|
|
|
-- If desired, ensure timer_functions.version_number is set explicitly:
|
|
UPDATE timer_functions
|
|
SET version_number = 1
|
|
WHERE id = NEW.id;
|
|
|
|
RETURN NEW;
|
|
|
|
ELSIF TG_OP = 'UPDATE' THEN
|
|
-- Only version if the 'code' changed
|
|
IF NEW.code IS DISTINCT FROM OLD.code THEN
|
|
-- Determine the next version number based on existing versions
|
|
SELECT COALESCE(MAX(version_number), 0) + 1
|
|
INTO next_version
|
|
FROM timer_function_versions
|
|
WHERE timer_function_id = NEW.id;
|
|
|
|
-- Insert new version record
|
|
INSERT INTO timer_function_versions (timer_function_id, script, version_number)
|
|
VALUES (NEW.id, NEW.code, next_version);
|
|
|
|
-- Manually update timer_functions to set version_number
|
|
-- This second UPDATE will cause the trigger to fire again,
|
|
-- but because code won't change, the trigger won't do another version bump.
|
|
UPDATE timer_functions
|
|
SET version_number = next_version
|
|
WHERE id = NEW.id;
|
|
END IF;
|
|
|
|
RETURN NEW;
|
|
END IF;
|
|
|
|
RETURN NEW;
|
|
END;
|
|
$$;
|
|
|
|
CREATE TRIGGER tr_timer_functions_versioning
|
|
AFTER INSERT OR UPDATE
|
|
ON timer_functions
|
|
FOR EACH ROW
|
|
EXECUTE PROCEDURE fn_timer_functions_versioning();
|
|
|
|
CREATE TABLE timer_function_invocations (
|
|
id SERIAL PRIMARY KEY,
|
|
timer_function_id INT NOT NULL,
|
|
status TEXT,
|
|
invocation_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
logs JSONB,
|
|
version_number INT NOT NULL,
|
|
execution_time FLOAT,
|
|
|
|
CONSTRAINT fk_timer_function_invocations
|
|
FOREIGN KEY (timer_function_id)
|
|
REFERENCES timer_functions (id)
|
|
ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE OR REPLACE FUNCTION fn_increment_invocation_count()
|
|
RETURNS TRIGGER
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
BEGIN
|
|
UPDATE timer_functions
|
|
SET invocation_count = invocation_count + 1
|
|
WHERE id = NEW.timer_function_id;
|
|
|
|
RETURN NEW;
|
|
END;
|
|
$$;
|
|
|
|
CREATE TRIGGER tr_increment_invocation_count
|
|
AFTER INSERT ON timer_function_invocations
|
|
FOR EACH ROW
|
|
EXECUTE PROCEDURE fn_increment_invocation_count();
|
|
'''
|
|
|
|
DEFAULT_SCRIPT = """async () => {
|
|
environment.count += 1
|
|
console.log(`Executing ${environment.count}`)
|
|
}"""
|
|
|
|
DEFAULT_ENVIRONMENT = """{
|
|
"count": 0
|
|
}"""
|
|
|
|
timer = Blueprint('timer', __name__)
|
|
|
|
def calculate_next_run(trigger_type, frequency_minutes=None, run_date=None, cron_expression=None, base_time=None):
|
|
"""
|
|
Calculate the next execution time based on trigger configuration.
|
|
|
|
Args:
|
|
trigger_type: One of 'interval', 'date', or 'cron'
|
|
frequency_minutes: Minutes between executions (for interval type)
|
|
run_date: Specific datetime to run (for date type)
|
|
cron_expression: Cron expression string (for cron type)
|
|
base_time: Base datetime to calculate from (defaults to now)
|
|
|
|
Returns:
|
|
tuple: (next_run datetime, error_response dict or None)
|
|
If error_response is not None, it contains the error to return to client
|
|
"""
|
|
if base_time is None:
|
|
base_time = datetime.now(timezone.utc)
|
|
|
|
next_run = None
|
|
|
|
if trigger_type == 'interval':
|
|
if frequency_minutes is None:
|
|
return None, {
|
|
"status": "error",
|
|
"message": "frequency_minutes is required for interval trigger type"
|
|
}
|
|
next_run = base_time + timedelta(minutes=int(frequency_minutes))
|
|
|
|
elif trigger_type == 'date':
|
|
if run_date is None:
|
|
return None, {
|
|
"status": "error",
|
|
"message": "run_date is required for date trigger type"
|
|
}
|
|
# run_date should already be a datetime object
|
|
if isinstance(run_date, str):
|
|
run_date = datetime.fromisoformat(run_date)
|
|
next_run = run_date
|
|
|
|
elif trigger_type == 'cron':
|
|
if cron_expression is None:
|
|
return None, {
|
|
"status": "error",
|
|
"message": "cron_expression is required for cron trigger type"
|
|
}
|
|
from croniter import croniter
|
|
|
|
# Validate cron expression
|
|
try:
|
|
if not croniter.is_valid(cron_expression):
|
|
return None, {
|
|
"status": "error",
|
|
"message": "Invalid cron expression format"
|
|
}
|
|
# Calculate next run time from base_time
|
|
next_run = croniter(cron_expression, base_time).get_next(datetime)
|
|
except Exception as e:
|
|
return None, {
|
|
"status": "error",
|
|
"message": f"Invalid cron expression: {str(e)}"
|
|
}
|
|
else:
|
|
return None, {
|
|
"status": "error",
|
|
"message": f"Invalid trigger type: {trigger_type}"
|
|
}
|
|
|
|
return next_run, None
|
|
|
|
@timer.route('/overview')
|
|
@login_required
|
|
def overview():
|
|
timer_functions = db.execute("""
|
|
SELECT id, name, code, environment, trigger_type,
|
|
frequency_minutes, run_date, next_run,
|
|
last_run, enabled, invocation_count, runtime, cron_expression
|
|
FROM timer_functions
|
|
WHERE user_id = %s
|
|
ORDER BY id DESC
|
|
""", [current_user.id])
|
|
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/overview.html', 'page', timer_functions=timer_functions)
|
|
return render_template('dashboard/timer_functions/overview.html', timer_functions=timer_functions)
|
|
|
|
@timer.route('/new', methods=['GET', 'POST'])
|
|
@login_required
|
|
def new():
|
|
if request.method == 'GET':
|
|
args = {
|
|
'name': 'foo',
|
|
'script': DEFAULT_SCRIPT,
|
|
'environment_info': DEFAULT_ENVIRONMENT,
|
|
'user_id': current_user.id
|
|
}
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/new.html', 'page', **args)
|
|
return render_template('dashboard/timer_functions/new.html', **args)
|
|
|
|
# Handle POST request
|
|
try:
|
|
data = request.json
|
|
trigger_type = data.get('trigger_type')
|
|
runtime = data.get('runtime', 'node')
|
|
|
|
# Validate trigger type
|
|
if trigger_type not in ('interval', 'date', 'cron'):
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": "Invalid trigger type"
|
|
}), 400
|
|
|
|
# Calculate next_run based on trigger type using centralized function
|
|
next_run, error = calculate_next_run(
|
|
trigger_type,
|
|
frequency_minutes=data.get('frequency_minutes'),
|
|
run_date=data.get('run_date'),
|
|
cron_expression=data.get('cron_expression')
|
|
)
|
|
|
|
if error:
|
|
return jsonify(error), 400
|
|
|
|
# Extract individual variables for database storage
|
|
frequency_minutes = None
|
|
run_date = None
|
|
cron_expression = None
|
|
|
|
if trigger_type == 'interval':
|
|
frequency_minutes = int(data.get('frequency_minutes'))
|
|
elif trigger_type == 'date':
|
|
run_date = datetime.fromisoformat(data.get('run_date'))
|
|
elif trigger_type == 'cron':
|
|
cron_expression = data.get('cron_expression')
|
|
|
|
# Insert new timer function
|
|
db.execute("""
|
|
INSERT INTO timer_functions
|
|
(name, code, environment, user_id, trigger_type,
|
|
frequency_minutes, run_date, cron_expression, next_run, enabled, runtime)
|
|
VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""", [
|
|
data.get('name'),
|
|
data.get('script_content'),
|
|
data.get('environment_info'),
|
|
current_user.id,
|
|
trigger_type,
|
|
frequency_minutes,
|
|
run_date,
|
|
cron_expression,
|
|
next_run,
|
|
True,
|
|
runtime
|
|
],
|
|
commit=True)
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": "Timer function created successfully"
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": f"Error creating timer function: {str(e)}"
|
|
}), 400
|
|
|
|
@timer.route('/edit/<int:function_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit(function_id):
|
|
if request.method == 'GET':
|
|
# Fetch the timer function
|
|
timer_function = db.execute("""
|
|
SELECT id, name, code, environment, version_number, trigger_type,
|
|
frequency_minutes, run_date, cron_expression, next_run,
|
|
last_run, enabled, invocation_count, runtime
|
|
FROM timer_functions
|
|
WHERE id = %s AND user_id = %s
|
|
""", [function_id, current_user.id], one=True)
|
|
|
|
if not timer_function:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": "Timer function not found"
|
|
}), 404
|
|
|
|
# Format the environment JSON with indentation
|
|
timer_function['environment'] = json.dumps(timer_function['environment'], indent=2)
|
|
|
|
args = {
|
|
'function_id': function_id,
|
|
'timer_function': timer_function,
|
|
'runtime': timer_function.get('runtime', 'node')
|
|
}
|
|
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/edit.html', 'page', **args)
|
|
return render_template('dashboard/timer_functions/edit.html', **args)
|
|
|
|
# Handle POST request
|
|
try:
|
|
data = request.json
|
|
trigger_type = data.get('trigger_type')
|
|
runtime = data.get('runtime', 'node')
|
|
commit_message = data.get('commit_message', '')
|
|
|
|
# Validate trigger type
|
|
if trigger_type not in ('interval', 'date', 'cron'):
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": "Invalid trigger type"
|
|
}), 400
|
|
|
|
# Calculate next_run based on trigger type using centralized function
|
|
next_run, error = calculate_next_run(
|
|
trigger_type,
|
|
frequency_minutes=data.get('frequency_minutes'),
|
|
run_date=data.get('run_date'),
|
|
cron_expression=data.get('cron_expression')
|
|
)
|
|
|
|
if error:
|
|
return jsonify(error), 400
|
|
|
|
# Extract individual variables for database storage
|
|
frequency_minutes = None
|
|
run_date = None
|
|
cron_expression = None
|
|
|
|
if trigger_type == 'interval':
|
|
frequency_minutes = int(data.get('frequency_minutes'))
|
|
elif trigger_type == 'date':
|
|
run_date = datetime.fromisoformat(data.get('run_date'))
|
|
elif trigger_type == 'cron':
|
|
cron_expression = data.get('cron_expression')
|
|
|
|
# Update timer function
|
|
db.execute("""
|
|
UPDATE timer_functions
|
|
SET name = %s,
|
|
code = %s,
|
|
environment = %s::jsonb,
|
|
trigger_type = %s,
|
|
frequency_minutes = %s,
|
|
run_date = %s,
|
|
cron_expression = %s,
|
|
next_run = %s,
|
|
enabled = %s,
|
|
runtime = %s
|
|
WHERE id = %s AND user_id = %s
|
|
RETURNING id
|
|
""", [
|
|
data.get('name'),
|
|
data.get('script_content'),
|
|
data.get('environment_info'),
|
|
trigger_type,
|
|
frequency_minutes,
|
|
run_date,
|
|
cron_expression,
|
|
next_run,
|
|
data.get('is_enabled', True), # Default to True if not provided
|
|
runtime,
|
|
function_id,
|
|
current_user.id
|
|
],
|
|
commit=True)
|
|
|
|
# Update the commit message for the newly created version
|
|
# Note: The database trigger creates a new version after the UPDATE,
|
|
# so we need to get the latest version number
|
|
if commit_message:
|
|
latest_version = db.execute(
|
|
"SELECT MAX(version_number) as version_number FROM timer_function_versions WHERE timer_function_id = %s",
|
|
[function_id],
|
|
one=True
|
|
)
|
|
if latest_version:
|
|
db.execute(
|
|
"UPDATE timer_function_versions SET commit_message = %s WHERE timer_function_id = %s AND version_number = %s",
|
|
[commit_message, function_id, latest_version['version_number']],
|
|
commit=True
|
|
)
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": "Timer function updated successfully"
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": f"Error updating timer function: {str(e)}"
|
|
}), 400
|
|
|
|
@timer.route('/delete/<int:function_id>', methods=['DELETE'])
|
|
@login_required
|
|
def delete(function_id):
|
|
try:
|
|
# Delete the timer function, but only if it belongs to the current user
|
|
result = db.execute("""
|
|
DELETE FROM timer_functions
|
|
WHERE id = %s AND user_id = %s
|
|
RETURNING id
|
|
""", [function_id, current_user.id], commit=True)
|
|
|
|
if not result:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": "Timer function not found or unauthorized"
|
|
}), 404
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": "Timer function deleted successfully"
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": f"Error deleting timer function: {str(e)}"
|
|
}), 400
|
|
|
|
@timer.route('/toggle/<int:function_id>', methods=['POST'])
|
|
@login_required
|
|
def toggle(function_id):
|
|
try:
|
|
# Fetch timer function details first to get trigger configuration
|
|
timer_function = db.execute("""
|
|
SELECT enabled, trigger_type, frequency_minutes, run_date, cron_expression
|
|
FROM timer_functions
|
|
WHERE id = %s AND user_id = %s
|
|
""", [function_id, current_user.id], one=True)
|
|
|
|
if not timer_function:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": "Timer function not found or unauthorized"
|
|
}), 404
|
|
|
|
# Determine new enabled state (toggling)
|
|
new_enabled = not timer_function['enabled']
|
|
|
|
# If enabling the timer, recalculate next_run to prevent stale execution times
|
|
if new_enabled:
|
|
next_run, error = calculate_next_run(
|
|
timer_function['trigger_type'],
|
|
frequency_minutes=timer_function['frequency_minutes'],
|
|
run_date=timer_function['run_date'],
|
|
cron_expression=timer_function['cron_expression']
|
|
)
|
|
|
|
if error:
|
|
return jsonify(error), 400
|
|
|
|
# Update both enabled status and next_run
|
|
db.execute("""
|
|
UPDATE timer_functions
|
|
SET enabled = %s, next_run = %s
|
|
WHERE id = %s AND user_id = %s
|
|
""", [new_enabled, next_run, function_id, current_user.id], commit=True)
|
|
else:
|
|
# Just toggle the enabled status
|
|
db.execute("""
|
|
UPDATE timer_functions
|
|
SET enabled = %s
|
|
WHERE id = %s AND user_id = %s
|
|
""", [new_enabled, function_id, current_user.id], commit=True)
|
|
|
|
# Fetch updated timer functions for the overview template
|
|
timer_functions = db.execute("""
|
|
SELECT id, name, code, environment, trigger_type,
|
|
frequency_minutes, run_date, next_run,
|
|
last_run, enabled, invocation_count, runtime, cron_expression
|
|
FROM timer_functions
|
|
WHERE user_id = %s
|
|
ORDER BY id DESC
|
|
""", [current_user.id])
|
|
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/overview.html', 'page',
|
|
timer_functions=timer_functions)
|
|
return render_template('dashboard/timer_functions/overview.html', timer_functions=timer_functions)
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": f"Error toggling timer function: {str(e)}"
|
|
}), 400
|
|
|
|
@timer.route('/logs/<int:function_id>')
|
|
@login_required
|
|
def logs(function_id):
|
|
# Fetch the timer function to verify ownership
|
|
timer_function = db.execute("""
|
|
SELECT id, name
|
|
FROM timer_functions
|
|
WHERE id = %s AND user_id = %s
|
|
""", [function_id, current_user.id], one=True)
|
|
|
|
if not timer_function:
|
|
flash('Timer function not found', 'error')
|
|
return redirect(url_for('timer.overview'))
|
|
|
|
# Fetch the invocation logs
|
|
timer_function_invocations = db.execute("""
|
|
SELECT id, timer_function_id, status, invocation_time,
|
|
logs, version_number, execution_time
|
|
FROM timer_function_invocations
|
|
WHERE timer_function_id = %s
|
|
ORDER BY invocation_time DESC
|
|
LIMIT 100
|
|
""", [function_id])
|
|
|
|
args = {
|
|
'user_id': current_user.id,
|
|
'function_id': function_id,
|
|
'timer_function_invocations': timer_function_invocations
|
|
}
|
|
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/logs.html', 'page', **args)
|
|
return render_template('dashboard/timer_functions/logs.html', **args)
|
|
|
|
@timer.route('/history/<int:function_id>')
|
|
@login_required
|
|
def history(function_id):
|
|
# Fetch the timer function to verify ownership
|
|
timer_function = db.execute("""
|
|
SELECT id, name, code AS script, version_number, runtime
|
|
FROM timer_functions
|
|
WHERE id = %s AND user_id = %s
|
|
""", [function_id, current_user.id], one=True)
|
|
|
|
if not timer_function:
|
|
flash('Timer function not found', 'error')
|
|
return redirect(url_for('timer.overview'))
|
|
|
|
# Fetch all versions
|
|
versions = db.execute("""
|
|
SELECT version_number, script, versioned_at, commit_message
|
|
FROM timer_function_versions
|
|
WHERE timer_function_id = %s
|
|
ORDER BY version_number DESC
|
|
""", [function_id])
|
|
|
|
# Convert datetime objects to ISO format strings
|
|
for version in versions:
|
|
version['versioned_at'] = version['versioned_at'].isoformat() if version['versioned_at'] else None
|
|
|
|
args = {
|
|
'user_id': current_user.id,
|
|
'function_id': function_id,
|
|
'timer_function': timer_function,
|
|
'versions': versions,
|
|
'title': timer_function['name'],
|
|
'runtime': timer_function.get('runtime', 'node'),
|
|
}
|
|
|
|
if htmx:
|
|
return render_block(environment, 'dashboard/timer_functions/history.html', 'page', **args)
|
|
return render_template('dashboard/timer_functions/history.html', **args)
|
|
|
|
|
|
@timer.route('/restore/<int:function_id>', methods=['POST'])
|
|
@login_required
|
|
def restore(function_id):
|
|
try:
|
|
user_id = current_user.id
|
|
version_number = request.json.get('version_number')
|
|
|
|
if not version_number:
|
|
return jsonify({"status": "error", "message": "Version number is required"}), 400
|
|
|
|
# Verify ownership and existence of the function
|
|
timer_function = db.execute(
|
|
"SELECT id FROM timer_functions WHERE id = %s AND user_id = %s",
|
|
[function_id, user_id],
|
|
one=True
|
|
)
|
|
if not timer_function:
|
|
return jsonify({"status": "error", "message": "Timer function not found"}), 404
|
|
|
|
# Fetch the content of the selected version
|
|
version_data = db.execute(
|
|
"SELECT script FROM timer_function_versions WHERE timer_function_id = %s AND version_number = %s",
|
|
[function_id, version_number],
|
|
one=True
|
|
)
|
|
|
|
if not version_data:
|
|
return jsonify({"status": "error", "message": "Version not found"}), 404
|
|
|
|
# Update the function with the old script content
|
|
# This will trigger the database function to create a new version entry
|
|
db.execute(
|
|
"UPDATE timer_functions SET code = %s WHERE id = %s",
|
|
[version_data["script"], function_id],
|
|
commit=True
|
|
)
|
|
|
|
return jsonify({"status": "success", "message": f"Restored to version {version_number}"})
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
return jsonify({"status": "error", "message": str(e)}), 500
|
|
|