"""Dashboard blueprint for managing a user's HTTP functions.

Provides the overview, create, edit, delete, logs, client, history, editor
and restore views. All views require an authenticated user and scope their
lookups to ``current_user.id``.
"""

import json
import logging
from datetime import datetime, timezone, timedelta

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from flask_login import current_user, login_required
from jinja2_fragments import render_block
from werkzeug.security import generate_password_hash, check_password_hash

from extensions import db, htmx, environment
from services import create_http_function_view_model, create_http_functions_view_model

logger = logging.getLogger(__name__)

# Reference DDL for the versioning table and trigger used by the views below.
# This string is documentation only; it is never executed by this module.
# NOTE(review): the queries in this module also read `commit_message` and
# `updated_at` from http_functions_versions, which this DDL does not declare;
# confirm against the live schema/migrations.
"""
CREATE TABLE http_functions_versions (
    id SERIAL PRIMARY KEY,
    http_function_id INT NOT NULL,
    script_content TEXT NOT NULL,
    version_number INT NOT NULL,
    versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_http_function_versions
        FOREIGN KEY (http_function_id)
        REFERENCES http_functions (id)
        ON DELETE CASCADE
        ON UPDATE CASCADE
);

CREATE OR REPLACE FUNCTION fn_http_functions_versioning()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    next_version INT;
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
        VALUES (NEW.id, NEW.script_content, 1);

        UPDATE http_functions
        SET version_number = 1
        WHERE id = NEW.id;

        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        IF NEW.script_content IS DISTINCT FROM OLD.script_content THEN
            SELECT COALESCE(MAX(version_number), 0) + 1
            INTO next_version
            FROM http_functions_versions
            WHERE http_function_id = NEW.id;

            INSERT INTO http_functions_versions (http_function_id, script_content, version_number)
            VALUES (NEW.id, NEW.script_content, next_version);

            UPDATE http_functions
            SET version_number = next_version
            WHERE id = NEW.id;
        END IF;

        RETURN NEW;
    END IF;

    RETURN NEW;
END;
$$;

CREATE TRIGGER tr_http_functions_versioning
AFTER INSERT OR UPDATE ON http_functions
FOR EACH ROW
EXECUTE PROCEDURE fn_http_functions_versioning();
"""

# Default JavaScript handler shown when creating a new function.
# Statements are kept on separate lines: several of them have no trailing
# semicolon, so the script is only valid JavaScript with the line breaks.
# NOTE(review): `pathD` and `pathColor` are computed but never interpolated and
# the template literals look like their SVG markup is missing; confirm against
# the intended default script.
DEFAULT_SCRIPT = """async (req) => {
  console.log(`Method:${req.method}`)
  console.log(`Generating ${environment.lines} random lines...`)

  let svgContent = "";

  for (let i = 0; i < environment.lines; i++) {
    console.log(i)
    let pathD = "M2 " + Math.random() * 79;
    let circles = "";

    for (let x = 12; x <= 202; x += 10) {
      let y = Math.random() * 79
      pathD += ` L${x} ${y}`;
      circles += ``;
    }

    let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`;
    svgContent += ` ${circles} `;
  }

  return HtmlResponse(`${svgContent}`);
}"""

# Default environment JSON shown when creating a new function.
DEFAULT_ENVIRONMENT = """{
  "lines": 3
}"""

# Default Python handler shown when creating a new function.
DEFAULT_PYTHON_SCRIPT = """def main(request, environment):
    print(f"Method: {request['method']}")
    return {"body": "Hello from Python!"}
"""

_INVALID_BODY_MESSAGE = "Request body must be a JSON object"

http = Blueprint("http", __name__)


def _json_object():
    """Return the request body parsed as a JSON object, or None.

    None is returned when the body is missing, is not valid JSON, or is valid
    JSON that is not an object, so callers can answer with a 400 instead of
    letting the parse error surface as a generic failure.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _error(message, status):
    """Build the JSON error response shape shared by the mutating endpoints."""
    return jsonify({"status": "error", "message": message}), status


def _function_fields(payload):
    """Extract the editable function fields from a JSON payload.

    The tuple order matches the trailing positional parameters passed to
    ``db.create_new_http_function`` and ``db.edit_http_function`` below.
    """
    return (
        payload.get("name"),
        payload.get("path", ""),
        payload.get("script_content"),
        payload.get("environment_info"),
        payload.get("is_public"),
        payload.get("log_request"),
        payload.get("log_response"),
        payload.get("runtime", "node"),
        payload.get("description", ""),
    )


def group_functions_by_path(functions):
    """Arrange function dicts into a nested dict keyed by path segment.

    Each function's ``path`` value is split on ``/`` (empty segments are
    ignored) and the function is stored under its ``name`` at the resulting
    level. Every function dict is tagged in place with ``_is_function = True``
    so consumers can tell functions apart from folder dicts.

    Args:
        functions: iterable of dicts with at least a ``name`` key and an
            optional ``path`` key.

    Returns:
        A nested dict of folders (plain dicts) and function dicts.
    """
    grouped = {}
    for function in functions:
        # Use the explicit path column; a missing or None path means top level.
        full_path = function.get("path", "") or ""
        current_level = grouped
        for part in full_path.split("/"):
            if not part:
                continue
            current_level = current_level.setdefault(part, {})
        function["_is_function"] = True
        current_level[function["name"]] = function
    return grouped


@http.route("/overview", methods=["GET"])
@login_required
def overview():
    """Render the current user's functions, optionally filtered by ``q``.

    Renders only the ``function_list`` block when the HX-Target header is
    ``function-list``, the ``page`` block for other htmx requests, and the
    full template otherwise.
    """
    user_id = current_user.id
    search_query = request.args.get("q", "")

    http_functions = db.get_http_functions_for_user(user_id, search_query)
    http_functions_view_model = create_http_functions_view_model(http_functions)
    grouped_functions = group_functions_by_path(http_functions_view_model)

    template = "dashboard/http_functions/overview.html"

    if request.headers.get("HX-Target") == "function-list":
        return render_block(
            environment, template, "function_list", http_functions=grouped_functions
        )

    if htmx:
        return render_block(
            environment,
            template,
            "page",
            http_functions=grouped_functions,
            search_query=search_query,
        )

    return render_template(
        template, http_functions=grouped_functions, search_query=search_query
    )


@http.route("/new", methods=["GET", "POST"])
@login_required
def new():
    """Show the "new function" form (GET) or create a function (POST).

    POST expects a JSON object with the function fields and returns a JSON
    status payload: 200 on success, 400 when the body is not a JSON object,
    500 when creation fails.
    """
    user_id = current_user.id

    if request.method == "GET":
        context = {
            "user_id": user_id,
            "name": "foo",
            "path": "",
            "script": DEFAULT_SCRIPT,
            "default_python_script": DEFAULT_PYTHON_SCRIPT,
            "environment_info": DEFAULT_ENVIRONMENT,
            "is_public": False,
            "log_request": True,
            "log_response": False,
            "description": "",
        }
        if htmx:
            return render_block(
                environment, "dashboard/http_functions/new.html", "page", **context
            )
        return render_template("dashboard/http_functions/new.html", **context)

    payload = _json_object()
    if payload is None:
        return _error(_INVALID_BODY_MESSAGE, 400)

    try:
        db.create_new_http_function(user_id, *_function_fields(payload))
    except Exception as e:
        logger.exception("Failed to create http function")
        # Report failures with an error status, consistent with delete/restore.
        return _error(str(e), 500)

    return (
        jsonify(
            {"status": "success", "message": "Http function created successfully"}
        ),
        200,
    )


@http.route("/edit/<int:function_id>", methods=["POST"])
@login_required
def edit(function_id):
    """Update a function and optionally attach a commit message.

    Expects a JSON object with the function fields plus an optional
    ``commit_message``. Returns a JSON status payload: 200 on success, 400
    when the body is not a JSON object, 500 when the update fails.
    """
    payload = _json_object()
    if payload is None:
        return _error(_INVALID_BODY_MESSAGE, 400)

    user_id = current_user.id
    fields = _function_fields(payload)
    name = fields[0]
    commit_message = payload.get("commit_message", "")

    try:
        db.edit_http_function(user_id, function_id, *fields)

        if commit_message:
            # Attach the message to the latest version row in one statement.
            # The EXISTS clause restricts the write to functions owned by the
            # current user.
            # NOTE(review): per the reference trigger, a new version row is
            # only created when script_content changes; otherwise this
            # overwrites the message of the previous version. Confirm that is
            # intended.
            db.execute(
                """
                UPDATE http_functions_versions
                SET commit_message = %s
                WHERE http_function_id = %s
                  AND version_number = (
                      SELECT MAX(version_number)
                      FROM http_functions_versions
                      WHERE http_function_id = %s
                  )
                  AND EXISTS (
                      SELECT 1 FROM http_functions WHERE id = %s AND user_id = %s
                  )
                """,
                [commit_message, function_id, function_id, function_id, user_id],
                commit=True,
            )
    except Exception as e:
        logger.exception("Failed to update http function %s", function_id)
        return _error(str(e), 500)

    return {"status": "success", "message": f"{name} updated"}


@http.route("/delete/<int:function_id>", methods=["DELETE"])
@login_required
def delete(function_id):
    """Delete one of the current user's functions.

    Returns a JSON status payload; database failures are reported with 500.
    """
    try:
        db.execute(
            "DELETE FROM http_functions WHERE user_id=%s AND id=%s",
            [current_user.id, function_id],
            commit=True,
        )
    except Exception as e:
        logger.exception("Failed to delete http function %s", function_id)
        return _error(str(e), 500)

    return {"status": "success", "message": "Function deleted"}


@http.route("/logs/<int:function_id>", methods=["GET"])
@login_required
def logs(function_id):
    """Render the invocation log page for one of the user's functions.

    Returns a JSON 404 when the function is not found for the current user.
    """
    user_id = current_user.id
    http_function = db.get_http_function_by_id(user_id, function_id)
    if not http_function:
        return jsonify({"error": "Function not found"}), 404

    context = {
        "user_id": user_id,
        "function_id": function_id,
        "name": http_function["name"],
        "http_function_invocations": db.get_http_function_invocations(function_id),
    }

    if htmx:
        return render_block(
            environment, "dashboard/http_functions/logs.html", "page", **context
        )
    return render_template("dashboard/http_functions/logs.html", **context)


@http.route("/client/<int:function_id>", methods=["GET"])
@login_required
def client(function_id):
    """Render the client page for one of the user's functions.

    Returns a JSON 404 when the function is not found for the current user.
    """
    user_id = current_user.id
    http_function = db.get_http_function_by_id(user_id, function_id)
    if not http_function:
        return jsonify({"error": "Function not found"}), 404

    http_function = create_http_function_view_model(http_function)

    if htmx:
        return render_block(
            environment,
            "dashboard/http_functions/client.html",
            "page",
            function_id=function_id,
            **http_function,
        )
    return render_template(
        "dashboard/http_functions/client.html",
        function_id=function_id,
        **http_function,
    )


@http.route("/history/<int:function_id>")
@login_required
def history(function_id):
    """Render the version history page for one of the user's functions.

    Redirects to the overview with a flash message when the function is not
    found for the current user.
    """
    # Fetch the http function to verify ownership
    http_function = db.execute(
        """
        SELECT id, name, script_content AS code, version_number, runtime
        FROM http_functions
        WHERE id = %s AND user_id = %s
        """,
        [function_id, current_user.id],
        one=True,
    )

    if not http_function:
        flash("Http function not found", "error")
        return redirect(url_for("http.overview"))

    # Fetch all versions, newest first. Fall back to an empty list so a falsy
    # result does not break the loop below.
    versions = db.execute(
        """
        SELECT version_number, script_content AS script,
               updated_at AS versioned_at, commit_message
        FROM http_functions_versions
        WHERE http_function_id = %s
        ORDER BY version_number DESC
        """,
        [function_id],
    ) or []

    # Convert datetime objects to ISO format strings
    for version in versions:
        versioned_at = version["versioned_at"]
        version["versioned_at"] = versioned_at.isoformat() if versioned_at else None

    args = {
        "user_id": current_user.id,
        "function_id": function_id,
        "http_function": http_function,
        "versions": versions,
        "runtime": http_function.get("runtime", "node"),
    }

    if htmx:
        return render_block(
            environment, "dashboard/http_functions/history.html", "page", **args
        )
    return render_template("dashboard/http_functions/history.html", **args)


@http.route("/editor/<int:function_id>", methods=["GET"])
@login_required
def editor(function_id):
    """Render the editor page for one of the user's functions.

    Returns a JSON 404 when the function is not found for the current user.
    """
    user_id = current_user.id
    http_function = db.get_http_function_by_id(user_id, function_id)
    if not http_function:
        return jsonify({"error": "Function not found"}), 404

    # Create a view model with all necessary data for the editor
    editor_data = {
        "id": http_function["id"],
        "name": http_function["name"],
        "path": http_function.get("path", ""),
        "script_content": http_function["script_content"],
        "environment_info": json.dumps(http_function["environment_info"], indent=2),
        "is_public": http_function["is_public"],
        "log_request": http_function["log_request"],
        "log_response": http_function["log_response"],
        "version_number": http_function["version_number"],
        "runtime": http_function.get("runtime", "node"),
        "description": http_function.get("description", ""),
        "user_id": user_id,
        "function_id": function_id,
        # URLs for navigation
        "cancel_url": url_for("http.overview"),
        "edit_url": url_for("http.editor", function_id=function_id),
    }

    if htmx:
        return render_block(
            environment, "dashboard/http_functions/editor.html", "page", **editor_data
        )
    return render_template("dashboard/http_functions/editor.html", **editor_data)


@http.route("/restore/<int:function_id>", methods=["POST"])
@login_required
def restore(function_id):
    """Restore a function's script to the content of an earlier version.

    Expects a JSON object with ``version_number``. Returns a JSON status
    payload: 400 for a missing/invalid body or version number, 404 when the
    function or version is not found, 500 on unexpected failures.
    """
    payload = _json_object()
    if payload is None:
        return _error(_INVALID_BODY_MESSAGE, 400)

    version_number = payload.get("version_number")
    if not version_number:
        return _error("Version number is required", 400)

    user_id = current_user.id

    try:
        # Verify ownership and existence of the function
        http_function = db.execute(
            "SELECT id FROM http_functions WHERE id = %s AND user_id = %s",
            [function_id, user_id],
            one=True,
        )
        if not http_function:
            return _error("Function not found", 404)

        # Fetch the content of the selected version
        version_data = db.execute(
            "SELECT script_content FROM http_functions_versions "
            "WHERE http_function_id = %s AND version_number = %s",
            [function_id, version_number],
            one=True,
        )
        if not version_data:
            return _error("Version not found", 404)

        # Update the function with the old script content. Per the reference
        # trigger, this records a new version entry when the content differs.
        # The user_id filter repeats the ownership check at write time.
        db.execute(
            "UPDATE http_functions SET script_content = %s "
            "WHERE id = %s AND user_id = %s",
            [version_data["script_content"], function_id, user_id],
            commit=True,
        )
    except Exception as e:
        logger.exception("Failed to restore http function %s", function_id)
        return _error(str(e), 500)

    return jsonify(
        {"status": "success", "message": f"Restored to version {version_number}"}
    )