import json import os from flask import Flask, Response, jsonify, redirect, render_template, request, url_for import jinja_partials from jinja2_fragments import render_block from flask_htmx import HTMX import requests from db import DataBase from services import create_http_function_view_model, create_http_functions_view_model app = Flask(__name__) app.config.from_pyfile('config.py') jinja_partials.register_extensions(app) htmx = HTMX(app) db = DataBase(app) API_URL = 'https://isolator.peterstockings.com/execute' DEFAULT_SCRIPT = """async (req) => { console.log('hello world...') return HtmlResponse( `

Method:${req.method}

Headers

${Object.entries(req.headers).map(([key, value]) => ``)}
Key Value
${key} ${value}
${JSON.stringify(req.body, null, 2)}
`) }""" def map_isolator_response_to_flask_response(response): """ Maps a Node.js response to a Flask response. :param nodejs_response: The response from Node.js, expected to be a dictionary with keys 'body', 'headers', and 'status'. :return: Flask Response object """ result = response.get('result', {}) body = result.get('body', '') headers = result.get('headers', {}) status = result.get('status', 200) # Convert body to JSON if it's a dictionary body = str(body) return Response(response=body, status=status, headers=headers) @ app.route("/", methods=["GET"]) def home(): return render_template("home.html", script=DEFAULT_SCRIPT) @ app.route("/client/", methods=["GET"]) def client(function): http_function = db.get_http_function(function) if not http_function: return jsonify({'error': 'Function not found'}), 404 http_function = create_http_function_view_model(http_function) return render_template("client.html", **http_function) @ app.route("/dashboard", methods=["GET"]) def dashboard(): http_functions = db.get_http_functions() http_functions = create_http_functions_view_model(http_functions) return render_template("dashboard.html", http_functions=http_functions) @ app.route("/dashboard/http_functions", methods=["GET"]) def dashboard_http_functions(): http_functions = db.get_http_functions() http_functions = create_http_functions_view_model(http_functions) return render_template("dashboard/http_functions.html", http_functions=http_functions) @ app.route("/dashboard/http_functions/add_form", methods=["GET"]) def get_http_function_add_form(): script=DEFAULT_SCRIPT name = "foo" return render_template("dashboard/http_functions/new.html", name=name, script=script) @ app.route("/dashboard/http_functions/create", methods=["POST"]) def create_http_function(): try: name = request.json.get('name') script_content = request.json.get('script_content') environment_info = json.dumps(eval(request.json.get('environment_info'))) db.create_new_http_function(name, script_content, environment_info) http_functions = db.get_http_functions() http_functions = create_http_functions_view_model(http_functions) return render_template("dashboard/http_functions.html", http_functions=http_functions) except Exception as e: print(e) return { "status": "error", "message": str(e) } @ app.route("/dashboard/http_functions/edit_form", methods=["GET"]) def get_http_function_edit_form(): name = request.args.get('name') http_function = db.get_http_function(name) if not http_function: return jsonify({'error': 'Function not found'}), 404 script = http_function['script_content'] environment_info = json.dumps(http_function['environment_info'], indent=2) return render_template("dashboard/http_functions/edit.html", name=name, script=script, environment_info=environment_info) @ app.route("/dashboard/http_functions/edit", methods=["POST"]) def edit_http_function(): try: name = request.json.get('name') script_content = request.json.get('script_content') environment_info = json.dumps(eval(request.json.get('environment_info'))) db.edit_http_function(name, script_content, environment_info) return { "status": "success", "message": f'{name} updated' } except Exception as e: print(e) return { "status": "error", "message": str(e) } @ app.route("/dashboard/http_functions/delete", methods=["DELETE"]) def delete_http_function(): try: name = request.args.get('name') db.delete_http_function(name) http_functions = db.get_http_functions() http_functions = create_http_functions_view_model(http_functions) return render_template("dashboard/http_functions.html", http_functions=http_functions) except Exception as e: return jsonify({"status": 'error', "message": str(e)}), 500 @ app.route("/dashboard/http_functions/logs", methods=["GET"]) def get_http_function_logs(): name = request.args.get('name') http_function = db.get_http_function(name) if not http_function: return jsonify({'error': 'Function not found'}), 404 http_function_id = http_function['id'] http_function_invocations = db.get_http_function_invocations(http_function_id) return render_template("dashboard/http_functions/logs.html", name=name, http_function_invocations=http_function_invocations) @ app.route("/dashboard/timer_functions", methods=["GET"]) def dashboard_timer_functions(): return render_template("dashboard/timer_functions.html") @app.route('/execute', methods=['POST']) def execute_code(): try: # Extract code and convert request to a format acceptable by Node.js app code = request.json.get('code') request_obj = { 'method': request.method, 'headers': dict(request.headers), 'body': request.json, 'url': request.url, # Add other request properties as needed } # Call the Node.js API response = requests.post(API_URL, json={'code': code, 'request': request_obj}) response_data = response.json() # check if playground=true is in the query string if request.args.get('playground') == 'true': return response_data # Map the Node.js response to Flask response flask_response = map_isolator_response_to_flask_response(response_data) return flask_response except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/f/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD']) def execute_http_function(function): try: http_function = db.get_http_function(function) if not http_function: return jsonify({'error': 'Function not found'}), 404 code = http_function['script_content'] environment = http_function['environment_info'] request_data = { 'method': request.method, 'headers': dict(request.headers), 'url': request.url, # Add other request properties as needed } # Add JSON data if it exists if request.is_json: request_data['json'] = request.get_json() # Add form data if it exists if request.form: request_data['form'] = request.form.to_dict() # Add query parameters if they exist if request.args: request_data['query'] = request.args.to_dict() # Add plain text data if it exists if request.data and not request.is_json: request_data['text'] = request.data.decode('utf-8') # Call the Node.js API response = requests.post(API_URL, json={'code': code, 'request': request_data, 'environment': environment}) response_data = response.json() db.update_http_function_environment_info_and_invoked_count(function, response_data['environment']) db.add_http_function_invocation(http_function['id'], response_data['status'], request_data, response_data['result'], response_data['logs']) # Map the Node.js response to Flask response flask_response = map_isolator_response_to_flask_response(response_data) return flask_response except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)