Files
function/routes/http.py
2025-11-21 10:30:14 +11:00

396 lines
13 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from jinja2_fragments import render_block
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import current_user, login_required
from extensions import db, htmx, environment
from datetime import datetime, timezone, timedelta
import json
from services import create_http_function_view_model, create_http_functions_view_model
"""
CREATE TABLE http_functions_versions (
id SERIAL PRIMARY KEY,
http_function_id INT NOT NULL,
script_content TEXT NOT NULL,
version_number INT NOT NULL,
versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_http_function_versions
FOREIGN KEY (http_function_id)
REFERENCES http_functions (id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
CREATE OR REPLACE FUNCTION fn_http_functions_versioning()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
next_version INT;
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
VALUES (NEW.id, NEW.script_content, 1);
UPDATE http_functions
SET version_number = 1
WHERE id = NEW.id;
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
IF NEW.script_content IS DISTINCT FROM OLD.script_content THEN
SELECT COALESCE(MAX(version_number), 0) + 1
INTO next_version
FROM http_functions_versions
WHERE http_function_id = NEW.id;
INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
VALUES (NEW.id, NEW.script_content, next_version);
UPDATE http_functions
SET version_number = next_version
WHERE id = NEW.id;
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END;
$$;
CREATE TRIGGER tr_http_functions_versioning
AFTER INSERT OR UPDATE
ON http_functions
FOR EACH ROW
EXECUTE PROCEDURE fn_http_functions_versioning();
"""
# Starter JavaScript source handed to the "new function" template as `script`.
# It is an async handler that logs the request method, draws
# `environment.lines` random polylines and returns them wrapped in an <svg>
# element via HtmlResponse. The text is emitted verbatim at runtime.
DEFAULT_SCRIPT = """async (req) => {
console.log(`Method:${req.method}`)
console.log(`Generating ${environment.lines} random lines...`)
let svgContent = "";
for (let i = 0; i < environment.lines; i++) {
console.log(i)
let pathD = "M2 " + Math.random() * 79;
let circles = "";
for (let x = 12; x <= 202; x += 10) {
let y = Math.random() * 79
pathD += ` L${x} ${y}`;
circles += `<circle cx="${x}" cy="${y}" r="1"></circle>`;
}
let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`;
svgContent += `
<g style="fill: ${pathColor}; stroke: ${pathColor};">
<path d="${pathD}" fill="none" stroke="${pathColor}"></path>
${circles}
</g>`;
}
return HtmlResponse(`<svg viewBox="0 0 204 79" preserveAspectRatio="none">${svgContent}</svg>`);
}"""
# Starter environment JSON handed to the "new function" template as
# `environment_info`; it defines the `lines` value the starter script reads.
DEFAULT_ENVIRONMENT = """{
"lines": 3
}"""
# Starter Python source handed to the "new function" template as
# `default_python_script`. The body of `main` must be indented for the
# template to be valid Python, so the indentation is part of the string.
DEFAULT_PYTHON_SCRIPT = """def main(request, environment):
    print(f"Method: {request['method']}")
    return {"body": "Hello from Python!"}
"""
# Blueprint the routes in this file are registered on; its endpoints are
# addressed as "http.<view>" (e.g. url_for("http.overview")).
http = Blueprint("http", __name__)
def group_functions_by_path(functions):
    """Arrange function records into a nested dict keyed by path segment.

    Each record's ``path`` value is split on "/" (empty segments are
    dropped) and one nested dict is created per segment. The record itself
    is stored under its ``name`` at the deepest level. A missing, ``None``
    or empty path places the record at the top level.

    Every record is modified in place: ``_is_function`` is set to ``True``
    on it (presumably so consumers can tell leaves from folder dicts).

    Args:
        functions: iterable of dict-like records with a ``name`` key and an
            optional ``path`` key.

    Returns:
        The nested dict of folders and function records.
    """
    tree = {}
    for record in functions:
        # The explicit path column decides where the record is filed.
        segments = filter(None, (record.get("path", "") or "").split("/"))
        node = tree
        for segment in segments:
            node = node.setdefault(segment, {})
        record["_is_function"] = True
        node[record["name"]] = record
    return tree
@http.route("/overview", methods=["GET"])
@login_required
def overview():
    """Render the HTTP-function overview for the logged-in user.

    The optional ``q`` query parameter is forwarded to
    ``db.get_http_functions_for_user``; the result is converted with
    ``create_http_functions_view_model`` and nested by path.

    Three response shapes are produced:
    * ``HX-Target: function-list`` header -> only the ``function_list`` block;
    * ``htmx`` truthy -> the ``page`` block;
    * otherwise -> the full template.
    """
    template = "dashboard/http_functions/overview.html"
    user_id = current_user.id
    search_query = request.args.get("q", "")

    records = db.get_http_functions_for_user(user_id, search_query)
    grouped_functions = group_functions_by_path(
        create_http_functions_view_model(records)
    )

    # Partial refresh of just the list: no search_query is passed here.
    if request.headers.get("HX-Target") == "function-list":
        return render_block(
            environment, template, "function_list", http_functions=grouped_functions
        )

    page_context = {
        "http_functions": grouped_functions,
        "search_query": search_query,
    }
    if htmx:
        return render_block(environment, template, "page", **page_context)
    return render_template(template, **page_context)
@http.route("/new", methods=["GET", "POST"])
@login_required
def new():
    """Show the "new HTTP function" form (GET) or create a function (POST).

    GET renders ``dashboard/http_functions/new.html`` (only its ``page``
    block when ``htmx`` is truthy) pre-filled with the module's starter
    scripts and default flags.

    POST reads the function's fields from the JSON body and passes them to
    ``db.create_new_http_function``. It returns a JSON success message with
    status 200, or a ``{"status": "error", ...}`` dict carrying the
    exception text if anything raises.
    """
    user_id = current_user.id

    if request.method == "GET":
        template = "dashboard/http_functions/new.html"
        form_defaults = dict(
            user_id=user_id,
            name="foo",
            path="",
            script=DEFAULT_SCRIPT,
            default_python_script=DEFAULT_PYTHON_SCRIPT,
            environment_info=DEFAULT_ENVIRONMENT,
            is_public=False,
            log_request=True,
            log_response=False,
            description="",
        )
        if htmx:
            return render_block(environment, template, "page", **form_defaults)
        return render_template(template, **form_defaults)

    try:
        payload = request.json
        db.create_new_http_function(
            user_id,
            payload.get("name"),
            payload.get("path", ""),
            payload.get("script_content"),
            payload.get("environment_info"),
            payload.get("is_public"),
            payload.get("log_request"),
            payload.get("log_response"),
            payload.get("runtime", "node"),
            payload.get("description", ""),
        )
        success_body = jsonify(
            {"status": "success", "message": "Http function created successfully"}
        )
        return (success_body, 200)
    except Exception as e:
        # NOTE(review): the error dict is returned without an explicit status
        # code, unlike delete() in this file which answers 500 - confirm that
        # clients rely on the "status" field rather than the HTTP status.
        print(e)
        return {"status": "error", "message": str(e)}
@http.route("/edit/<int:function_id>", methods=["POST"])
@login_required
def edit(function_id):
    """Update one of the current user's HTTP functions from a JSON body.

    The fields ``name``, ``path``, ``script_content``, ``environment_info``,
    ``is_public``, ``log_request``, ``log_response``, ``runtime`` and
    ``description`` are read from the request JSON (``path`` and
    ``description`` default to "", ``runtime`` to "node") and handed to
    ``db.edit_http_function`` together with the user and function ids.

    Args:
        function_id: id of the function to update, taken from the URL.

    Returns:
        ``{"status": "success", "message": "<name> updated"}`` on success, or
        ``{"status": "error", "message": <exception text>}`` if anything
        raises. Neither response sets an explicit status code.
    """
    try:
        user_id = current_user.id
        name = request.json.get("name")
        path = request.json.get("path", "")
        script_content = request.json.get("script_content")
        environment_info = request.json.get("environment_info")
        is_public = request.json.get("is_public")
        log_request = request.json.get("log_request")
        log_response = request.json.get("log_response")
        runtime = request.json.get("runtime", "node")
        description = request.json.get("description", "")
        # The call's return value is not needed here, so it is not bound to
        # a local.
        db.edit_http_function(
            user_id,
            function_id,
            name,
            path,
            script_content,
            environment_info,
            is_public,
            log_request,
            log_response,
            runtime,
            description,
        )
        return {"status": "success", "message": f"{name} updated"}
    except Exception as e:
        print(e)
        return {"status": "error", "message": str(e)}
@http.route("/delete/<int:function_id>", methods=["DELETE"])
@login_required
def delete(function_id):
    """Delete one of the current user's HTTP functions.

    The DELETE statement is scoped to both the logged-in user's id and the
    given function id, so another user's function cannot be removed.

    Args:
        function_id: id of the function to delete, taken from the URL.

    Returns:
        ``{"status": "success", "message": "Function deleted"}`` when the
        statement runs without raising, or a JSON error body with status
        500 otherwise.
        NOTE(review): success is reported without checking whether a row was
        actually removed - confirm whether that matters to callers.
    """
    try:
        user_id = current_user.id
        db.execute(
            "DELETE FROM http_functions WHERE user_id=%s AND id=%s",
            [user_id, function_id],
            commit=True,
        )
        # Plain literal: the message has no placeholders, so no f-string.
        return {"status": "success", "message": "Function deleted"}
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@http.route("/logs/<int:function_id>", methods=["GET"])
@login_required
def logs(function_id):
    """Render the invocation log page for one of the user's HTTP functions.

    The function is first looked up with the current user's id; a falsy
    result yields a JSON 404. Otherwise its invocations are fetched and the
    logs template is rendered (only the ``page`` block when ``htmx`` is
    truthy).

    Args:
        function_id: id of the function whose invocations are shown.
    """
    user_id = current_user.id
    record = db.get_http_function_by_id(user_id, function_id)
    if not record:
        return jsonify({"error": "Function not found"}), 404

    template = "dashboard/http_functions/logs.html"
    context = {
        "user_id": user_id,
        "function_id": function_id,
        "name": record["name"],
        "http_function_invocations": db.get_http_function_invocations(function_id),
    }
    if htmx:
        return render_block(environment, template, "page", **context)
    return render_template(template, **context)
@http.route("/client/<int:function_id>", methods=["GET"])
@login_required
def client(function_id):
    """Render the client page for one of the user's HTTP functions.

    The function is looked up with the current user's id; a falsy result
    yields a JSON 404. The record is converted with
    ``create_http_function_view_model`` and its entries are passed to the
    template as keyword arguments alongside ``function_id``.

    Args:
        function_id: id of the function to show.
    """
    record = db.get_http_function_by_id(current_user.id, function_id)
    if not record:
        return jsonify({"error": "Function not found"}), 404

    template = "dashboard/http_functions/client.html"
    view_model = create_http_function_view_model(record)
    if not htmx:
        return render_template(template, function_id=function_id, **view_model)
    return render_block(
        environment, template, "page", function_id=function_id, **view_model
    )
@http.route("/history/<int:function_id>")
@login_required
def history(function_id):
    """Render the version history of one of the user's HTTP functions.

    The function row is selected by id and current user id, which doubles as
    the ownership check. When nothing is found an error is flashed and the
    user is redirected to the overview. Otherwise all rows for the function
    are read from ``http_functions_versions`` (newest version first), their
    ``versioned_at`` values are converted to ISO strings in place, and the
    history template is rendered (only the ``page`` block when ``htmx`` is
    truthy).

    Args:
        function_id: id of the function whose versions are listed.
    """
    owner_id = current_user.id

    # Selecting by id AND user_id verifies ownership.
    function_sql = (
        "\n"
        "SELECT id, name, script_content AS code, version_number\n"
        "FROM http_functions\n"
        "WHERE id = %s AND user_id = %s\n"
    )
    http_function = db.execute(function_sql, [function_id, owner_id], one=True)
    if not http_function:
        flash("Http function not found", "error")
        return redirect(url_for("http.overview"))

    # NOTE(review): the schema notes at the top of this file call the
    # timestamp column versioned_at, while this query reads updated_at -
    # confirm which one the live table has.
    versions_sql = (
        "\n"
        "SELECT version_number, script_content AS script, updated_at AS versioned_at\n"
        "FROM http_functions_versions\n"
        "WHERE http_function_id = %s\n"
        "ORDER BY version_number DESC\n"
    )
    versions = db.execute(versions_sql, [function_id])

    # Replace datetime values with ISO-format strings (None stays None).
    for row in versions:
        stamp = row["versioned_at"]
        row["versioned_at"] = stamp.isoformat() if stamp else None

    template = "dashboard/http_functions/history.html"
    args = dict(
        user_id=owner_id,
        function_id=function_id,
        http_function=http_function,
        versions=versions,
    )
    if htmx:
        return render_block(environment, template, "page", **args)
    return render_template(template, **args)
@http.route("/editor/<int:function_id>", methods=["GET"])
@login_required
def editor(function_id):
    """Render the editor page for one of the user's HTTP functions.

    The function is looked up with the current user's id; a falsy result
    yields a JSON 404. The template receives the stored columns, the
    environment info serialised as indented JSON, the user and function
    ids, and two navigation URLs.

    Args:
        function_id: id of the function to open in the editor.
    """
    user_id = current_user.id
    record = db.get_http_function_by_id(user_id, function_id)
    if not record:
        return jsonify({"error": "Function not found"}), 404

    # Columns handed to the template unchanged; each must be present.
    required_columns = (
        "id",
        "name",
        "script_content",
        "is_public",
        "log_request",
        "log_response",
        "version_number",
    )
    editor_data = {column: record[column] for column in required_columns}
    editor_data.update(
        # Optional columns fall back to defaults when absent.
        path=record.get("path", ""),
        runtime=record.get("runtime", "node"),
        description=record.get("description", ""),
        environment_info=json.dumps(record["environment_info"], indent=2),
        user_id=user_id,
        function_id=function_id,
        # URLs for navigation; edit_url points at this editor view.
        cancel_url=url_for("http.overview"),
        edit_url=url_for("http.editor", function_id=function_id),
    )

    template = "dashboard/http_functions/editor.html"
    if htmx:
        return render_block(environment, template, "page", **editor_data)
    return render_template(template, **editor_data)