Files
function/app.py

169 lines
5.5 KiB
Python

import os
from flask import Flask, Response, jsonify, render_template, request
import jinja_partials
from jinja2_fragments import render_block
from flask_htmx import HTMX
import requests
from db import DataBase
from services import create_http_function_view_model, create_http_functions_view_model
app = Flask(__name__)
app.config.from_pyfile('config.py')
jinja_partials.register_extensions(app)
htmx = HTMX(app)
db = DataBase(app)
API_URL = 'https://isolator.peterstockings.com/execute'
DEFAULT_SCRIPT = """async (req) => {
console.log('hello world...')
return HtmlResponse(
`<div>
<h1>Method:${req.method}</h1>
<h3>Headers</h3>
<table>
<tr>
<th>Key</th>
<th>Value</th>
</tr>
${Object.entries(req.headers).map(([key, value]) =>
`<tr>
<td>${key}</td>
<td>${value}</td>
</tr>`)}
</table>
<pre>${JSON.stringify(req.body, null, 2)}</pre>
</div>`)
}"""
def map_isolator_response_to_flask_response(response):
"""
Maps a Node.js response to a Flask response.
:param nodejs_response: The response from Node.js, expected to be a dictionary
with keys 'body', 'headers', and 'status'.
:return: Flask Response object
"""
result = response.get('result', {})
body = result.get('body', '')
headers = result.get('headers', {})
status = result.get('status', 200)
# Convert body to JSON if it's a dictionary
body = str(body)
return Response(response=body, status=status, headers=headers)
@ app.route("/", methods=["GET"])
def home():
return render_template("home.html", script=DEFAULT_SCRIPT)
@ app.route("/client/<function>", methods=["GET"])
def client(function):
http_function = db.get_http_function(function)
if not http_function:
return jsonify({'error': 'Function not found'}), 404
http_function = create_http_function_view_model(http_function)
return render_template("client.html", **http_function)
@ app.route("/dashboard", methods=["GET"])
def dashboard():
http_functions = db.get_http_functions()
http_functions = create_http_functions_view_model(http_functions)
return render_template("dashboard.html", http_functions=http_functions)
@ app.route("/dashboard/http_functions", methods=["GET"])
def dashboard_http_functions():
http_functions = db.get_http_functions()
http_functions = create_http_functions_view_model(http_functions)
return render_template("dashboard/http_functions.html", http_functions=http_functions)
@ app.route("/dashboard/timer_functions", methods=["GET"])
def dashboard_timer_functions():
return render_template("dashboard/timer_functions.html")
@app.route('/execute', methods=['POST'])
def execute_code():
try:
# Extract code and convert request to a format acceptable by Node.js app
code = request.json.get('code')
request_obj = {
'method': request.method,
'headers': dict(request.headers),
'body': request.json,
'url': request.url,
# Add other request properties as needed
}
# Call the Node.js API
response = requests.post(API_URL, json={'code': code, 'request': request_obj})
response_data = response.json()
# check if playground=true is in the query string
if request.args.get('playground') == 'true':
return response_data
# Map the Node.js response to Flask response
flask_response = map_isolator_response_to_flask_response(response_data)
return flask_response
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/f/<function>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
def execute_http_function(function):
try:
http_function = db.get_http_function(function)
if not http_function:
return jsonify({'error': 'Function not found'}), 404
# TODO: Get code from database based on function name
code = http_function['script_content']
print(code)
request_obj = {
'method': request.method,
'headers': dict(request.headers),
'url': request.url,
# Add other request properties as needed
}
# Add JSON data if it exists
if request.is_json:
request_obj['json'] = request.get_json()
# Add form data if it exists
if request.form:
request_obj['form'] = request.form.to_dict()
# Add query parameters if they exist
if request.args:
request_obj['query'] = request.args.to_dict()
# Add plain text data if it exists
if request.data and not request.is_json:
request_obj['text'] = request.data.decode('utf-8')
# Call the Node.js API
response = requests.post(API_URL, json={'code': code, 'request': request_obj})
response_data = response.json()
# Map the Node.js response to Flask response
flask_response = map_isolator_response_to_flask_response(response_data)
return flask_response
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Bind to PORT if defined, otherwise default to 5000.
port = int(os.environ.get('PORT', 5000))
app.run(host='127.0.0.1', port=port)