Files
function/routes/http.py
2025-09-28 13:32:42 +10:00

283 lines
10 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from jinja2_fragments import render_block
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import current_user, login_required
from extensions import db, htmx, environment
from datetime import datetime, timezone, timedelta
import json
from services import create_http_function_view_model, create_http_functions_view_model
'''
CREATE TABLE http_functions_versions (
id SERIAL PRIMARY KEY,
http_function_id INT NOT NULL,
script_content TEXT NOT NULL,
version_number INT NOT NULL,
versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_http_function_versions
FOREIGN KEY (http_function_id)
REFERENCES http_functions (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
CREATE OR REPLACE FUNCTION fn_http_functions_versioning()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
next_version INT;
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
VALUES (NEW.id, NEW.script_content, 1);
UPDATE http_functions
SET version_number = 1
WHERE id = NEW.id;
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
IF NEW.script_content IS DISTINCT FROM OLD.script_content THEN
SELECT COALESCE(MAX(version_number), 0) + 1
INTO next_version
FROM http_functions_versions
WHERE http_function_id = NEW.id;
INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
VALUES (NEW.id, NEW.script_content, next_version);
UPDATE http_functions
SET version_number = next_version
WHERE id = NEW.id;
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END;
$$;
CREATE TRIGGER tr_http_functions_versioning
AFTER INSERT OR UPDATE
ON http_functions
FOR EACH ROW
EXECUTE PROCEDURE fn_http_functions_versioning();
'''
# JavaScript source offered as the starting script for a new HTTP function
# (passed to the "new" template as `script`). It is an async arrow function
# taking `req` that logs the request method, draws `environment.lines`
# randomly generated polylines into an SVG, and returns it via HtmlResponse.
DEFAULT_SCRIPT = """async (req) => {
console.log(`Method:${req.method}`)
console.log(`Generating ${environment.lines} random lines...`)
let svgContent = "";
for (let i = 0; i < environment.lines; i++) {
console.log(i)
let pathD = "M2 " + Math.random() * 79;
let circles = "";
for (let x = 12; x <= 202; x += 10) {
let y = Math.random() * 79
pathD += ` L${x} ${y}`;
circles += `<circle cx="${x}" cy="${y}" r="1"></circle>`;
}
let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`;
svgContent += `
<g style="fill: ${pathColor}; stroke: ${pathColor};">
<path d="${pathD}" fill="none" stroke="${pathColor}"></path>
${circles}
</g>`;
}
return HtmlResponse(`<svg viewBox="0 0 204 79" preserveAspectRatio="none">${svgContent}</svg>`);
}"""
# JSON text offered as the starting environment for a new HTTP function
# (passed to the "new" template as `environment_info`). Its single key,
# "lines", is the value DEFAULT_SCRIPT reads as `environment.lines`.
DEFAULT_ENVIRONMENT = """{
"lines": 3
}"""
# Python source passed to the "new" template as `default_python_script`.
# NOTE(review): as shown here the lines after `def main(...)` carry no
# indentation -- confirm the stored template is valid Python.
DEFAULT_PYTHON_SCRIPT = """def main(request, environment):
print(f"Method: {request['method']}")
return {"body": "Hello from Python!"}
"""
# Blueprint on which every route in this module is registered.
http = Blueprint('http', __name__)
@http.route("/overview", methods=["GET"])
@login_required
def overview():
    """Render the overview page of the current user's HTTP functions.

    The functions are loaded with ``db.get_http_functions_for_user`` and
    converted with ``create_http_functions_view_model`` before rendering.

    Returns:
        Only the ``page`` block of the template when ``htmx`` is truthy
        (presumably an HTMX-initiated request); the full template otherwise.
    """
    template = "dashboard/http_functions/overview.html"
    rows = db.get_http_functions_for_user(current_user.id)
    view_models = create_http_functions_view_model(rows)
    if not htmx:
        return render_template(template, http_functions=view_models)
    return render_block(environment, template, "page", http_functions=view_models)
@http.route("/new", methods=["GET", "POST"])
@login_required
def new():
    """Show the "new HTTP function" form (GET) or create a function (POST).

    GET renders the form pre-filled with the module's default scripts and
    environment; only the ``page`` block is rendered when ``htmx`` is truthy.

    POST reads the JSON request body and forwards its fields to
    ``db.create_new_http_function``. ``runtime`` falls back to ``'node'``
    when absent; every other field is passed through as ``None`` if missing.

    Returns:
        GET: the rendered template (or its ``page`` block).
        POST: a success JSON payload with status 200, or, on any exception,
        an error dict carrying the exception text. No explicit status code
        is set on the error path, so clients must inspect ``status``.
    """
    user_id = current_user.id

    if request.method == "GET":
        template = "dashboard/http_functions/new.html"
        form_defaults = dict(
            user_id=user_id,
            name='foo',
            script=DEFAULT_SCRIPT,
            default_python_script=DEFAULT_PYTHON_SCRIPT,
            environment_info=DEFAULT_ENVIRONMENT,
            is_public=False,
            log_request=True,
            log_response=False,
        )
        if not htmx:
            return render_template(template, **form_defaults)
        return render_block(environment, template, 'page', **form_defaults)

    # Positional order expected by db.create_new_http_function after user_id.
    field_names = (
        'name',
        'script_content',
        'environment_info',
        'is_public',
        'log_request',
        'log_response',
    )
    try:
        payload = request.json
        field_values = [payload.get(key) for key in field_names]
        runtime = payload.get('runtime', 'node')
        db.create_new_http_function(user_id, *field_values, runtime)
        return jsonify({
            "status": "success",
            "message": "Http function created successfully"
        }), 200
    except Exception as e:
        print(e)
        return {"status": "error", "message": str(e)}
@http.route("/edit/<int:function_id>", methods=["POST"])
@login_required
def edit(function_id):
    """Update the HTTP function ``function_id`` on behalf of the current user.

    The JSON request body supplies ``name``, ``script_content``,
    ``environment_info``, ``is_public``, ``log_request``, ``log_response``
    and ``runtime`` (defaulting to ``'node'``); they are forwarded together
    with the current user's id to ``db.edit_http_function``.

    Args:
        function_id: Id of the function to update, taken from the URL.

    Returns:
        A success dict naming the updated function, or, on any exception,
        an error dict carrying the exception text. No explicit status code
        is set on either path, so clients must inspect ``status``.
    """
    try:
        user_id = current_user.id
        payload = request.json
        name = payload.get('name')
        # The value returned by edit_http_function was previously bound to
        # an unused local; it is not needed for the response.
        db.edit_http_function(
            user_id,
            function_id,
            name,
            payload.get('script_content'),
            payload.get('environment_info'),
            payload.get('is_public'),
            payload.get('log_request'),
            payload.get('log_response'),
            payload.get('runtime', 'node'),
        )
        return {"status": "success", "message": f'{name} updated'}
    except Exception as e:
        print(e)
        return {"status": "error", "message": str(e)}
@http.route("/delete/<int:function_id>", methods=["DELETE"])
@login_required
def delete(function_id):
    """Delete the HTTP function ``function_id`` belonging to the current user.

    The DELETE statement filters on both the user id and the function id,
    so a function owned by another user is not removed.

    Args:
        function_id: Id of the function to delete, taken from the URL.

    Returns:
        A success dict, or an error JSON payload with status 500 if the
        database call raises.
    """
    try:
        db.execute(
            'DELETE FROM http_functions WHERE user_id=%s AND id=%s',
            [current_user.id, function_id],
            commit=True,
        )
        # Plain literal: the message has no placeholders, so no f-prefix.
        return {"status": "success", "message": "Function deleted"}
    except Exception as e:
        return jsonify({"status": 'error', "message": str(e)}), 500
@http.route("/logs/<int:function_id>", methods=["GET"])
@login_required
def logs(function_id):
    """Render the invocation log page for one of the user's HTTP functions.

    Args:
        function_id: Id of the function whose invocations are shown.

    Returns:
        A 404 JSON error when ``db.get_http_function_by_id`` finds nothing
        for this user and id; otherwise the rendered logs template (only
        its ``page`` block when ``htmx`` is truthy).
    """
    user_id = current_user.id
    record = db.get_http_function_by_id(user_id, function_id)
    if not record:
        return jsonify({'error': 'Function not found'}), 404

    template = "dashboard/http_functions/logs.html"
    context = {
        'user_id': user_id,
        'function_id': function_id,
        'name': record['name'],
        'http_function_invocations': db.get_http_function_invocations(function_id),
    }
    if not htmx:
        return render_template(template, **context)
    return render_block(environment, template, 'page', **context)
@http.route("/client/<int:function_id>", methods=["GET"])
@login_required
def client(function_id):
    """Render the client page for one of the user's HTTP functions.

    Args:
        function_id: Id of the function to show.

    Returns:
        A 404 JSON error when ``db.get_http_function_by_id`` finds nothing
        for this user and id; otherwise the client template rendered with
        the function's view model (only its ``page`` block when ``htmx``
        is truthy).
    """
    record = db.get_http_function_by_id(current_user.id, function_id)
    if not record:
        return jsonify({'error': 'Function not found'}), 404

    template = "dashboard/http_functions/client.html"
    view_model = create_http_function_view_model(record)
    if not htmx:
        return render_template(template, function_id=function_id, **view_model)
    return render_block(environment, template, 'page', function_id=function_id, **view_model)
@http.route('/history/<int:function_id>')
@login_required
def history(function_id):
    """Render the version history page for one of the user's HTTP functions.

    Args:
        function_id: Id of the function whose versions are listed.

    Returns:
        A redirect to the overview (after flashing an error) when no
        function matches this id and the current user; otherwise the
        history template (only its ``page`` block when ``htmx`` is truthy).
    """
    # Selecting by both id and user_id doubles as the ownership check.
    function_sql = """
SELECT id, name, script_content AS code, version_number
FROM http_functions
WHERE id = %s AND user_id = %s
"""
    http_function = db.execute(function_sql, [function_id, current_user.id], one=True)
    if not http_function:
        flash('Http function not found', 'error')
        return redirect(url_for('http.overview'))

    # NOTE(review): the DDL documented at the top of this module names the
    # timestamp column `versioned_at`, while this query selects `updated_at`
    # -- confirm against the live schema.
    versions_sql = """
SELECT version_number, script_content AS script, updated_at AS versioned_at
FROM http_functions_versions
WHERE http_function_id = %s
ORDER BY version_number DESC
"""
    versions = db.execute(versions_sql, [function_id])

    # Replace each timestamp in place with its ISO-8601 string (None if unset).
    for row in versions:
        stamp = row['versioned_at']
        row['versioned_at'] = stamp.isoformat() if stamp else None

    template = 'dashboard/http_functions/history.html'
    context = dict(
        user_id=current_user.id,
        function_id=function_id,
        http_function=http_function,
        versions=versions,
    )
    if not htmx:
        return render_template(template, **context)
    return render_block(environment, template, 'page', **context)
@http.route("/editor/<int:function_id>", methods=["GET"])
@login_required
def editor(function_id):
    """Render the editor page for one of the user's HTTP functions.

    Args:
        function_id: Id of the function to edit.

    Returns:
        A 404 JSON error when ``db.get_http_function_by_id`` finds nothing
        for this user and id; otherwise the editor template (only its
        ``page`` block when ``htmx`` is truthy).
    """
    user_id = current_user.id
    record = db.get_http_function_by_id(user_id, function_id)
    if not record:
        return jsonify({'error': 'Function not found'}), 404

    # Everything the editor template needs, including navigation URLs.
    template_args = dict(
        id=record['id'],
        name=record['name'],
        script_content=record['script_content'],
        # Pretty-printed so the environment is readable in the editor.
        environment_info=json.dumps(record['environment_info'], indent=2),
        is_public=record['is_public'],
        log_request=record['log_request'],
        log_response=record['log_response'],
        version_number=record['version_number'],
        runtime=record.get('runtime', 'node'),
        user_id=user_id,
        function_id=function_id,
        cancel_url=url_for('http.overview'),
        edit_url=url_for('http.editor', function_id=function_id),
    )

    template = "dashboard/http_functions/editor.html"
    if not htmx:
        return render_template(template, **template_args)
    return render_block(environment, template, "page", **template_args)