182 lines
6.1 KiB
Python
182 lines
6.1 KiB
Python
import os
|
|
from flask import Flask, Response, jsonify, render_template, request
|
|
import jinja_partials
|
|
from jinja2_fragments import render_block
|
|
from flask_htmx import HTMX
|
|
import requests
|
|
from db import DataBase
|
|
from services import create_http_function_view_model, create_http_functions_view_model
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_pyfile('config.py')
|
|
jinja_partials.register_extensions(app)
|
|
htmx = HTMX(app)
|
|
db = DataBase(app)
|
|
|
|
API_URL = 'https://isolator.peterstockings.com/execute'
|
|
|
|
DEFAULT_SCRIPT = """async (req) => {
|
|
console.log('hello world...')
|
|
return HtmlResponse(
|
|
`<div>
|
|
<h1>Method:${req.method}</h1>
|
|
<h3>Headers</h3>
|
|
<table>
|
|
<tr>
|
|
<th>Key</th>
|
|
<th>Value</th>
|
|
</tr>
|
|
${Object.entries(req.headers).map(([key, value]) =>
|
|
`<tr>
|
|
<td>${key}</td>
|
|
<td>${value}</td>
|
|
</tr>`)}
|
|
</table>
|
|
<pre>${JSON.stringify(req.body, null, 2)}</pre>
|
|
</div>`)
|
|
}"""
|
|
|
|
def map_isolator_response_to_flask_response(response):
|
|
"""
|
|
Maps a Node.js response to a Flask response.
|
|
|
|
:param nodejs_response: The response from Node.js, expected to be a dictionary
|
|
with keys 'body', 'headers', and 'status'.
|
|
:return: Flask Response object
|
|
"""
|
|
result = response.get('result', {})
|
|
body = result.get('body', '')
|
|
headers = result.get('headers', {})
|
|
status = result.get('status', 200)
|
|
|
|
|
|
# Convert body to JSON if it's a dictionary
|
|
body = str(body)
|
|
|
|
return Response(response=body, status=status, headers=headers)
|
|
|
|
|
|
@ app.route("/", methods=["GET"])
|
|
def home():
|
|
return render_template("home.html", script=DEFAULT_SCRIPT)
|
|
|
|
@ app.route("/client/<function>", methods=["GET"])
|
|
def client(function):
|
|
http_function = db.get_http_function(function)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
|
|
http_function = create_http_function_view_model(http_function)
|
|
return render_template("client.html", **http_function)
|
|
|
|
@ app.route("/dashboard", methods=["GET"])
|
|
def dashboard():
|
|
http_functions = db.get_http_functions()
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
return render_template("dashboard.html", http_functions=http_functions)
|
|
|
|
@ app.route("/dashboard/http_functions", methods=["GET"])
|
|
def dashboard_http_functions():
|
|
http_functions = db.get_http_functions()
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
return render_template("dashboard/http_functions.html", http_functions=http_functions)
|
|
|
|
@ app.route("/dashboard/http_functions/add_form", methods=["GET"])
|
|
def get_http_function_add_form():
|
|
script=DEFAULT_SCRIPT
|
|
name = "foo"
|
|
return render_template("dashboard/http_functions/new.html", name=name, script=script)
|
|
|
|
@ app.route("/dashboard/http_functions/create", methods=["POST"])
|
|
def create_http_function():
|
|
name = request.json.get('name')
|
|
script_content = request.json.get('script_content')
|
|
db.create_new_http_function(name, script_content)
|
|
return render_template("dashboard/http_functions/new.html", name=name, script=script_content)
|
|
|
|
@ app.route("/dashboard/timer_functions", methods=["GET"])
|
|
def dashboard_timer_functions():
|
|
return render_template("dashboard/timer_functions.html")
|
|
|
|
|
|
@app.route('/execute', methods=['POST'])
|
|
def execute_code():
|
|
try:
|
|
# Extract code and convert request to a format acceptable by Node.js app
|
|
code = request.json.get('code')
|
|
request_obj = {
|
|
'method': request.method,
|
|
'headers': dict(request.headers),
|
|
'body': request.json,
|
|
'url': request.url,
|
|
# Add other request properties as needed
|
|
}
|
|
|
|
# Call the Node.js API
|
|
response = requests.post(API_URL, json={'code': code, 'request': request_obj})
|
|
response_data = response.json()
|
|
|
|
# check if playground=true is in the query string
|
|
if request.args.get('playground') == 'true':
|
|
return response_data
|
|
|
|
# Map the Node.js response to Flask response
|
|
flask_response = map_isolator_response_to_flask_response(response_data)
|
|
return flask_response
|
|
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/f/<function>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
|
|
def execute_http_function(function):
|
|
try:
|
|
http_function = db.get_http_function(function)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
# TODO: Get code from database based on function name
|
|
code = http_function['script_content']
|
|
|
|
print(code)
|
|
|
|
request_obj = {
|
|
'method': request.method,
|
|
'headers': dict(request.headers),
|
|
'url': request.url,
|
|
# Add other request properties as needed
|
|
}
|
|
|
|
# Add JSON data if it exists
|
|
if request.is_json:
|
|
request_obj['json'] = request.get_json()
|
|
|
|
# Add form data if it exists
|
|
if request.form:
|
|
request_obj['form'] = request.form.to_dict()
|
|
|
|
# Add query parameters if they exist
|
|
if request.args:
|
|
request_obj['query'] = request.args.to_dict()
|
|
|
|
# Add plain text data if it exists
|
|
if request.data and not request.is_json:
|
|
request_obj['text'] = request.data.decode('utf-8')
|
|
|
|
# Call the Node.js API
|
|
response = requests.post(API_URL, json={'code': code, 'request': request_obj})
|
|
response_data = response.json()
|
|
|
|
# Map the Node.js response to Flask response
|
|
flask_response = map_isolator_response_to_flask_response(response_data)
|
|
return flask_response
|
|
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Bind to PORT if defined, otherwise default to 5000.
|
|
port = int(os.environ.get('PORT', 5000))
|
|
app.run(host='127.0.0.1', port=port)
|