from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify from jinja2_fragments import render_block from werkzeug.security import generate_password_hash, check_password_hash from flask_login import current_user, login_required from extensions import db, htmx, environment from datetime import datetime, timezone, timedelta import json from services import create_http_function_view_model, create_http_functions_view_model ''' CREATE TABLE http_function_versions ( id SERIAL PRIMARY KEY, http_function_id INT NOT NULL, script_content TEXT NOT NULL, version_number INT NOT NULL, versioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT fk_http_function_versions FOREIGN KEY (http_function_id) REFERENCES http_functions (id) ON DELETE CASCADE ON UPDATE CASCADE ); CREATE OR REPLACE FUNCTION fn_http_functions_versioning() RETURNS TRIGGER LANGUAGE plpgsql AS $$ DECLARE next_version INT; BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO http_functions_versions (http_function_id, script_content, version_number) VALUES (NEW.id, NEW.script_content, 1); UPDATE http_functions SET version_number = 1 WHERE id = NEW.id; RETURN NEW; ELSIF TG_OP = 'UPDATE' THEN IF NEW.script_content IS DISTINCT FROM OLD.script_content THEN SELECT COALESCE(MAX(version_number), 0) + 1 INTO next_version FROM http_functions_versions WHERE http_function_id = NEW.id; INSERT INTO http_functions_versions (http_function_id, script_content, version_number) VALUES (NEW.id, NEW.script_content, next_version); UPDATE http_functions SET version_number = next_version WHERE id = NEW.id; END IF; RETURN NEW; END IF; RETURN NEW; END; $$; CREATE TRIGGER tr_http_functions_versioning AFTER INSERT OR UPDATE ON http_functions FOR EACH ROW EXECUTE PROCEDURE fn_http_functions_versioning(); ''' DEFAULT_SCRIPT = """async (req) => { console.log(`Method:${req.method}`) console.log(`Generating ${environment.lines} random lines...`) let svgContent = ""; for (let i = 0; i < environment.lines; i++) { console.log(i) let pathD = "M2 " + Math.random() * 79; let circles = ""; for (let x = 12; x <= 202; x += 10) { let y = Math.random() * 79 pathD += ` L${x} ${y}`; circles += ``; } let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`; svgContent += ` ${circles} `; } return HtmlResponse(`${svgContent}`); }""" DEFAULT_ENVIRONMENT = """{ "lines": 3 }""" http = Blueprint('http', __name__) @http.route("/overview", methods=["GET"]) @login_required def overview(): user_id = current_user.id http_functions = db.execute( 'SELECT id, user_id, NAME, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number FROM http_functions WHERE user_id=%s ORDER by id DESC', [user_id]) http_functions = create_http_functions_view_model(http_functions) if htmx: return render_block(environment, "dashboard/http_functions/overview.html", "page", http_functions=http_functions) return render_template("dashboard/http_functions/overview.html", http_functions=http_functions) @http.route("/new", methods=["GET", "POST"]) @login_required def new(): user_id = current_user.id if request.method == "GET": if htmx: return render_block(environment, 'dashboard/http_functions/new.html', 'page', user_id=user_id, name='foo', script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT, is_public=False, log_request=True, log_response=False) return render_template("dashboard/http_functions/new.html", user_id=user_id, name='foo', script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT, is_public=False, log_request=True, log_response=False) try: name = request.json.get('name') script_content = request.json.get('script_content') environment_info = request.json.get('environment_info') is_public = request.json.get('is_public') log_request = request.json.get('log_request') log_response = request.json.get('log_response') db.execute( 'INSERT INTO http_functions (user_id, NAME, script_content, environment_info, is_public, log_request, log_response) VALUES (%s, %s, %s, %s, %s, %s, %s)', [user_id, name, script_content, environment_info, is_public, log_request, log_response], commit=True ) return jsonify({ "status": "success", "message": "Http function created successfully" }), 200 except Exception as e: print(e) return { "status": "error", "message": str(e) } @http.route("/edit/", methods=["POST"]) @login_required def edit(function_id): try: user_id = current_user.id name = request.json.get('name') script_content = request.json.get('script_content') environment_info = request.json.get('environment_info') is_public = request.json.get('is_public') log_request = request.json.get('log_request') log_response = request.json.get('log_response') updated_version = db.execute( 'UPDATE http_functions SET NAME=%s, script_content=%s, environment_info=%s, is_public=%s, log_request=%s, log_response=%s WHERE user_id=%s AND id=%s RETURNING version_number', [name, script_content, environment_info, is_public, log_request, log_response, user_id, function_id], commit=True, one=True ) return { "status": "success", "message": f'{name} updated' } except Exception as e: print(e) return { "status": "error", "message": str(e) } @http.route("/delete/", methods=["DELETE"]) @login_required def delete(function_id): try: user_id = current_user.id db.execute( 'DELETE FROM http_functions WHERE user_id=%s AND id=%s', [user_id, function_id], commit=True) return { "status": "success", "message": f'Function deleted' } except Exception as e: return jsonify({"status": 'error', "message": str(e)}), 500 @http.route("/logs/", methods=["GET"]) @login_required def logs(function_id): user_id = current_user.id http_function = db.execute( 'SELECT id, user_id, NAME, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, created_at FROM http_functions WHERE user_id=%s AND id=%s', [user_id, function_id], one=True) if not http_function: return jsonify({'error': 'Function not found'}), 404 name = http_function['name'] http_function_invocations = db.get_http_function_invocations(function_id) if htmx: return render_block(environment, 'dashboard/http_functions/logs.html', 'page', user_id=user_id, function_id=function_id, name=name, http_function_invocations=http_function_invocations) return render_template("dashboard/http_functions/logs.html", user_id=user_id, name=name, function_id=function_id, http_function_invocations=http_function_invocations) @http.route("/client/", methods=["GET"]) @login_required def client(function_id): user_id = current_user.id http_function = db.execute( 'SELECT id, user_id, NAME, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, created_at FROM http_functions WHERE user_id=%s AND id=%s', [user_id, function_id], one=True) if not http_function: return jsonify({'error': 'Function not found'}), 404 http_function = create_http_function_view_model(http_function) if htmx: return render_block(environment, 'dashboard/http_functions/client.html', 'page', function_id=function_id, **http_function) return render_template("dashboard/http_functions/client.html", function_id=function_id, **http_function) @http.route('/history/') @login_required def history(function_id): # Fetch the http function to verify ownership http_function = db.execute(""" SELECT id, name, script_content AS code, version_number FROM http_functions WHERE id = %s AND user_id = %s """, [function_id, current_user.id], one=True) if not http_function: flash('Http function not found', 'error') return redirect(url_for('http.overview')) # Fetch all versions versions = db.execute(""" SELECT version_number, script_content AS script, updated_at AS versioned_at FROM http_functions_versions WHERE http_function_id = %s ORDER BY version_number DESC """, [function_id]) # Convert datetime objects to ISO format strings for version in versions: version['versioned_at'] = version['versioned_at'].isoformat() if version['versioned_at'] else None args = { 'user_id': current_user.id, 'function_id': function_id, 'http_function': http_function, 'versions': versions } if htmx: return render_block(environment, 'dashboard/http_functions/history.html', 'page', **args) return render_template('dashboard/http_functions/history.html', **args) @http.route("/editor/", methods=["GET"]) @login_required def editor(function_id): user_id = current_user.id http_function = db.get_http_function_by_id(user_id, function_id) if not http_function: return jsonify({'error': 'Function not found'}), 404 # Create a view model with all necessary data for the editor editor_data = { 'id': http_function['id'], 'name': http_function['name'], 'script_content': http_function['script_content'], 'environment_info': json.dumps(http_function['environment_info'], indent=2), 'is_public': http_function['is_public'], 'log_request': http_function['log_request'], 'log_response': http_function['log_response'], 'version_number': http_function['version_number'], 'user_id': user_id, 'function_id': function_id, # Add new URLs for navigation 'cancel_url': url_for('http.overview'), 'edit_url': url_for('http.editor', function_id=function_id), } if htmx: return render_block(environment, "dashboard/http_functions/editor.html", "page", **editor_data) return render_template("dashboard/http_functions/editor.html", **editor_data)