import json
import os
from flask import Flask, Response, jsonify, redirect, render_template, request, url_for
import jinja_partials
from jinja2_fragments import render_block
from flask_htmx import HTMX
import requests
from db import DataBase
from services import create_http_function_view_model, create_http_functions_view_model
from flask_login import LoginManager, UserMixin, current_user, login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash
# Flask-Login manager; attached to the application further down via init_app().
login_manager = LoginManager()
app = Flask(__name__)
# Settings are read from config.py before the secret key is assigned, so the
# assignment below always wins over any SECRET_KEY defined in that file.
app.config.from_pyfile('config.py')
# NOTE(review): the fallback value is committed in source. Whenever the
# SECRET_KEY environment variable is unset, anyone who can read this file knows
# the signing key — confirm every deployment sets the variable.
app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f')
login_manager.init_app(app)
# Endpoint name of the login() view defined in this file; presumably where
# Flask-Login sends anonymous visitors of @login_required routes.
login_manager.login_view = "login"
# Registers the jinja_partials extension on this app's template environment.
jinja_partials.register_extensions(app)
htmx = HTMX(app)
# Project-local database wrapper imported from db; every route goes through it.
db = DataBase(app)
class User(UserMixin):
    """Account object handed to Flask-Login.

    Instances are plain value holders built from a user row returned by the
    ``db`` wrapper; nothing is written back to the database from here.
    """

    def __init__(self, id, username, password_hash, created_at):
        """Store the four user fields unchanged.

        :param id: User identifier. The name shadows the builtin on purpose:
            callers in this file pass it by keyword (``id=...``), so it is
            part of the public interface.
        :param username: Login name.
        :param password_hash: Stored password hash.
        :param created_at: Creation timestamp as returned by the database.
        """
        self.id = id
        self.username = username
        self.password_hash = password_hash
        self.created_at = created_at

    @staticmethod
    def get(user_id):
        """Look up a user by id.

        :param user_id: Identifier as an int or a numeric string.
        :return: A ``User`` instance, or ``None`` when the id is malformed or
            no matching row exists.
        """
        # A malformed id (None, empty or non-numeric string) means "no such
        # user"; report that as None instead of leaking a ValueError/TypeError.
        try:
            numeric_id = int(user_id)
        except (TypeError, ValueError):
            return None
        user_data = db.get_user(numeric_id)
        if not user_data:
            return None
        return User(
            id=str(user_data['id']),
            username=user_data['username'],
            password_hash=user_data['password_hash'],
            created_at=user_data['created_at'],
        )
# URL that the execution routes in this file POST code, request data and
# environment to (see the requests.post calls).
API_URL = 'https://isolator.peterstockings.com/execute'
# Function name pre-filled in the "new HTTP function" form.
DEFAULT_FUNCTION_NAME = 'foo'
# Sample script pre-filled on the home page and in the "new HTTP function"
# form. `environment` and `HtmlResponse` are presumably globals supplied by the
# isolator runtime — TODO confirm against the isolator service.
DEFAULT_SCRIPT = """async (req) => {
console.log(`Hello ${environment.name}`)
return HtmlResponse(
`
Method:${req.method}
Hello ${environment.name}
`)
}"""
# Sample environment shown next to DEFAULT_SCRIPT; kept as JSON text because
# the forms edit it as a string.
DEFAULT_ENVIRONMENT = """{
"name": "Peter"
}"""
def map_isolator_response_to_flask_response(response):
    """
    Maps an isolator execution payload to a Flask response.
    :param response: The decoded JSON returned by the isolator, expected to be
    a dictionary whose 'result' entry is a dictionary with optional keys
    'body', 'headers', and 'status'.
    :return: Flask Response object
    """
    # `or {}` also covers an explicit null result/headers, which the previous
    # `.get(key, {})` form let through and then crashed on.
    result = response.get('result') or {}
    body = result.get('body', '')
    headers = result.get('headers') or {}
    status = result.get('status') or 200
    if isinstance(body, (dict, list)):
        # Structured bodies must go out as JSON; str() would emit a Python
        # repr (single quotes, True/None) that JSON clients cannot parse.
        body = json.dumps(body)
    elif body is None:
        # An explicit null body means "no content", not the text "None".
        body = ''
    else:
        body = str(body)
    return Response(response=body, status=status, headers=headers)
@app.route("/", methods=["GET"])
def home():
    """Render home.html pre-populated with the default script and environment."""
    playground_defaults = {
        'name': 'Try me',
        'script': DEFAULT_SCRIPT,
        'environment_info': DEFAULT_ENVIRONMENT,
    }
    return render_template("home.html", **playground_defaults)
# The URL rule must declare both variables the view receives; `user_id` is an
# int because the sibling execution route compares it with int(current_user.id).
@app.route("/client/<int:user_id>/<function>", methods=["GET"])
@login_required
def client(user_id, function):
    """Render the client page for one stored HTTP function.

    :param user_id: Id of the user that owns the function (from the URL).
    :param function: Name of the function (from the URL).
    :return: The rendered client.html page, or a JSON 404 when the lookup
        returns nothing.
    """
    http_function = db.get_http_function(user_id, function)
    if not http_function:
        return jsonify({'error': 'Function not found'}), 404
    # The view model is unpacked straight into the template context.
    view_model = create_http_function_view_model(http_function)
    return render_template("client.html", **view_model)
@app.route("/dashboard", methods=["GET"])
@login_required
def dashboard():
    """Render dashboard.html with the signed-in user's HTTP functions."""
    stored_functions = db.get_http_functions_for_user(current_user.id)
    return render_template(
        "dashboard.html",
        http_functions=create_http_functions_view_model(stored_functions),
    )
@app.route("/dashboard/http_functions", methods=["GET"])
@login_required
def dashboard_http_functions():
    """Render the HTTP-function list template for the signed-in user."""
    stored_functions = db.get_http_functions_for_user(current_user.id)
    return render_template(
        "dashboard/http_functions.html",
        http_functions=create_http_functions_view_model(stored_functions),
    )
@app.route("/dashboard/http_functions/add_form", methods=["GET"])
@login_required
def get_http_function_add_form():
    """Render the "new HTTP function" form filled with the module defaults."""
    form_defaults = {
        'name': DEFAULT_FUNCTION_NAME,
        'script': DEFAULT_SCRIPT,
        'environment_info': DEFAULT_ENVIRONMENT,
        'is_public': False,
        'log_request': True,
        'log_response': False,
    }
    return render_template("dashboard/http_functions/new.html", **form_defaults)
@app.route("/dashboard/http_functions/create", methods=["POST"])
@login_required
def create_http_function():
    """Create an HTTP function for the signed-in user from a JSON request body.

    Reads ``name``, ``script_content``, ``environment_info`` (text),
    ``is_public``, ``log_request`` and ``log_response`` from the JSON payload.

    :return: The re-rendered function list on success; on any failure a
        ``{"status": "error", "message": ...}`` dictionary.
    """
    # Function-scope import: only the literal fallback below needs it.
    import ast

    try:
        user_id = current_user.id
        payload = request.json
        name = payload.get('name')
        script_content = payload.get('script_content')
        raw_environment = payload.get('environment_info')
        # SECURITY: this text comes from the client and used to be passed to
        # eval(), which runs arbitrary Python and also rejects valid JSON such
        # as true/false/null. Parse it as JSON, as /execute already does.
        try:
            environment = json.loads(raw_environment)
        except (TypeError, ValueError):
            # Backward compatibility for Python-literal input that eval()
            # used to accept (e.g. single-quoted keys). literal_eval only
            # evaluates literals, never calls or names.
            environment = ast.literal_eval(raw_environment)
        environment_info = json.dumps(environment)
        is_public = payload.get('is_public')
        log_request = payload.get('log_request')
        log_response = payload.get('log_response')
        db.create_new_http_function(user_id, name, script_content, environment_info, is_public, log_request, log_response)
        http_functions = db.get_http_functions_for_user(user_id)
        http_functions = create_http_functions_view_model(http_functions)
        return render_template("dashboard/http_functions.html", http_functions=http_functions)
    except Exception as e:
        # Best-effort boundary kept from the original: report the failure to
        # the caller instead of raising.
        print(e)
        return { "status": "error", "message": str(e) }
@app.route("/dashboard/http_functions/edit_form", methods=["GET"])
@login_required
def get_http_function_edit_form():
    """Render the edit form for the function named by the ``name`` query arg.

    Responds with a JSON 404 when the signed-in user has no such function.
    """
    owner_id = current_user.id
    function_name = request.args.get('name')
    record = db.get_http_function(owner_id, function_name)
    if not record:
        return jsonify({'error': 'Function not found'}), 404
    template_context = {
        'user_id': owner_id,
        'name': function_name,
        'script': record['script_content'],
        # Pretty-printed so the form shows readable JSON text.
        'environment_info': json.dumps(record['environment_info'], indent=2),
        'is_public': record['is_public'],
        'log_request': record['log_request'],
        'log_response': record['log_response'],
    }
    return render_template("dashboard/http_functions/edit.html", **template_context)
@app.route("/dashboard/http_functions/edit", methods=["POST"])
@login_required
def edit_http_function():
    """Update one of the signed-in user's HTTP functions from a JSON body.

    Reads ``name``, ``script_content``, ``environment_info`` (text),
    ``is_public``, ``log_request`` and ``log_response`` from the JSON payload.

    :return: ``{"status": "success", ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    # Function-scope import: only the literal fallback below needs it.
    import ast

    try:
        user_id = current_user.id
        payload = request.json
        name = payload.get('name')
        script_content = payload.get('script_content')
        raw_environment = payload.get('environment_info')
        # SECURITY: this text comes from the client and used to be passed to
        # eval(), which runs arbitrary Python and also rejects valid JSON such
        # as true/false/null. Parse it as JSON, as /execute already does.
        try:
            environment = json.loads(raw_environment)
        except (TypeError, ValueError):
            # Backward compatibility for Python-literal input that eval()
            # used to accept (e.g. single-quoted keys). literal_eval only
            # evaluates literals, never calls or names.
            environment = ast.literal_eval(raw_environment)
        environment_info = json.dumps(environment)
        is_public = payload.get('is_public')
        log_request = payload.get('log_request')
        log_response = payload.get('log_response')
        db.edit_http_function(user_id, name, script_content, environment_info, is_public, log_request, log_response)
        return { "status": "success", "message": f'{name} updated' }
    except Exception as e:
        # Best-effort boundary kept from the original: report the failure to
        # the caller instead of raising.
        print(e)
        return { "status": "error", "message": str(e) }
@app.route("/dashboard/http_functions/delete", methods=["DELETE"])
@login_required
def delete_http_function():
    """Delete the function named by the ``name`` query arg, then re-render the list.

    Any exception is reported as a JSON error with status 500.
    """
    try:
        owner_id = current_user.id
        db.delete_http_function(owner_id, request.args.get('name'))
        remaining = create_http_functions_view_model(
            db.get_http_functions_for_user(owner_id)
        )
        return render_template("dashboard/http_functions.html", http_functions=remaining)
    except Exception as exc:
        error_body = {"status": 'error', "message": str(exc)}
        return jsonify(error_body), 500
@app.route("/dashboard/http_functions/logs", methods=["GET"])
@login_required
def get_http_function_logs():
    """Render the invocation log page for the function named by ``name``.

    Responds with a JSON 404 when the signed-in user has no such function.
    """
    owner_id = current_user.id
    function_name = request.args.get('name')
    record = db.get_http_function(owner_id, function_name)
    if not record:
        return jsonify({'error': 'Function not found'}), 404
    # Invocations are keyed by the function's database id, not its name.
    invocations = db.get_http_function_invocations(record['id'])
    return render_template(
        "dashboard/http_functions/logs.html",
        user_id=owner_id,
        name=function_name,
        http_function_invocations=invocations,
    )
@app.route("/dashboard/timer_functions", methods=["GET"])
@login_required
def dashboard_timer_functions():
    """Render the timer-functions dashboard template (no context is passed)."""
    template_name = "dashboard/timer_functions.html"
    return render_template(template_name)
@app.route('/execute', methods=['POST'])
def execute_code():
    """Run ad-hoc code from the JSON body on the isolator service.

    The JSON body supplies ``code`` and ``environment_info``. The environment
    may be JSON text, an already-decoded object, or omitted (treated as an
    empty environment).

    :return: The raw isolator payload when the query string contains
        ``playground=true``; otherwise the isolator result mapped to a Flask
        response. Any exception becomes a JSON error with status 500.
    """
    try:
        payload = request.json
        # Extract code and convert request to a format acceptable by the isolator
        code = payload.get('code')
        request_obj = {
            'method': request.method,
            'headers': dict(request.headers),
            'body': payload,
            'url': request.url,
            # Add other request properties as needed
        }
        environment = payload.get('environment_info')
        if environment is None or environment == '':
            # json.loads(None) / json.loads('') would fail with an unhelpful
            # error; an absent environment simply means "no variables".
            environment_json = {}
        elif isinstance(environment, (str, bytes, bytearray)):
            environment_json = json.loads(environment)
        else:
            # Already decoded by the client (sent as a JSON object).
            environment_json = environment
        # Call the isolator API. Without a timeout a stalled isolator would
        # block this worker indefinitely.
        response = requests.post(
            API_URL,
            json={'code': code, 'request': request_obj, 'environment': environment_json},
            timeout=30,
        )
        response_data = response.json()
        # check if playground=true is in the query string
        if request.args.get('playground') == 'true':
            return response_data
        # Map the isolator response to a Flask response
        return map_isolator_response_to_flask_response(response_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# The URL rule must declare both variables the view receives; `user_id` uses
# the int converter because it is compared with int(current_user.id) below.
@app.route('/f/<int:user_id>/<function>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
def execute_http_function(user_id, function):
    """Execute a stored HTTP function on the isolator and relay its response.

    :param user_id: Id of the user that owns the function (from the URL).
    :param function: Name of the function (from the URL).
    :return: The isolator result mapped to a Flask response; a JSON 404 when
        the function is missing or private and owned by someone else; the
        login manager's unauthorized response for anonymous access to a
        private function; a JSON 500 on any exception.
    """
    try:
        http_function = db.get_http_function(user_id, function)
        if not http_function:
            return jsonify({'error': 'Function not found'}), 404
        code = http_function['script_content']
        environment = http_function['environment_info']
        is_public = http_function['is_public']
        log_request = http_function['log_request']
        log_response = http_function['log_response']
        # Check if the function is public, if not check if the user is authenticated and owns the function
        if not is_public:
            if not current_user.is_authenticated:
                return login_manager.unauthorized()
            if int(current_user.id) != user_id:
                return jsonify({'error': 'Function belongs to another user', 'current_user_id': current_user.id, 'user_id': user_id}), 404
        # NOTE(review): the full header set, including the caller's Cookie
        # header, is handed to the function's script and may be stored in the
        # invocation log — confirm that exposing it to function authors is intended.
        request_data = {
            'method': request.method,
            'headers': dict(request.headers),
            'url': request.url,
            # Add other request properties as needed
        }
        # Add JSON data if it exists
        if request.is_json:
            request_data['json'] = request.get_json()
        # Add form data if it exists
        if request.form:
            request_data['form'] = request.form.to_dict()
        # Add query parameters if they exist
        if request.args:
            request_data['query'] = request.args.to_dict()
        # Add plain text data if it exists
        if request.data and not request.is_json:
            request_data['text'] = request.data.decode('utf-8')
        # Call the isolator API. Without a timeout a stalled isolator would
        # block this worker indefinitely.
        response = requests.post(
            API_URL,
            json={'code': code, 'request': request_data, 'environment': environment},
            timeout=30,
        )
        response_data = response.json()
        # Persist the environment returned by the run and bump the invocation count.
        db.update_http_function_environment_info_and_invoked_count(user_id, function, response_data['environment'])
        # Request/response bodies are stored only when the matching flag is set.
        db.add_http_function_invocation(
            http_function['id'],
            response_data['status'],
            request_data if log_request else {},
            response_data['result'] if log_response else {},
            response_data['logs'])
        # Map the isolator response to a Flask response
        return map_isolator_response_to_flask_response(response_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def _is_safe_redirect_target(target):
    """Return True only when *target* is a path on this site."""
    from urllib.parse import urlparse

    # Must be an absolute path; backslashes are rejected because some
    # browsers treat "/\host" like "//host".
    if not target or not target.startswith('/') or '\\' in target:
        return False
    parsed = urlparse(target)
    # "//host/path" parses with a netloc and would leave the site.
    return not parsed.scheme and not parsed.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    :return: login.html (with an ``error`` message on failure), or a redirect
        to the ``next`` query parameter when it is a same-site path, else to
        the dashboard.
    """
    if request.method == 'GET':
        return render_template("login.html")
    username = request.form.get('username')
    password = request.form.get('password')
    if not username or not password:
        return render_template("login.html", error="Both username and password must be entered")
    user_data = db.get_user_by_username(username)
    if not user_data:
        return render_template("login.html", error="User does not exist")
    if not check_password_hash(user_data['password_hash'], password):
        return render_template("login.html", error="Invalid username or password")
    user = User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
    login_user(user)
    # `next` is attacker-controllable; redirecting to it unchecked is an open
    # redirect, so anything that is not a local path falls back to the dashboard.
    next_url = request.args.get('next')
    if not _is_safe_redirect_target(next_url):
        next_url = None
    return redirect(next_url or url_for('dashboard'))
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Show the signup form (GET) or register and sign in a new user (POST).

    Validation failures re-render signup.html with an ``error`` message;
    success redirects to the dashboard.
    """
    def reject(message):
        # Every validation failure re-renders the same form with a message.
        return render_template("signup.html", error=message)

    if request.method == 'GET':
        return render_template("signup.html")

    form = request.form
    username = form.get('username')
    password = form.get('password')

    if not (username and password):
        return reject("Both username and password must be entered")
    if min(len(username), len(password)) < 10:
        return reject("Both username and password must be at least 10 characters long")
    if db.get_user_by_username(username):
        return reject("User already exists")

    new_row = db.create_new_user(username, generate_password_hash(password))
    account = User(
        id=str(new_row['id']),
        username=new_row['username'],
        password_hash=new_row['password_hash'],
        created_at=new_row['created_at'],
    )
    login_user(account)
    return redirect(url_for('dashboard'))
@app.route("/logout")
@login_required
def logout():
    """Sign the current user out and redirect to the home page."""
    logout_user()
    home_url = url_for('home')
    return redirect(home_url)
@login_manager.user_loader
def load_user(user_id):
    """Rebuild the ``User`` for the id handed over by Flask-Login.

    :param user_id: User id as provided by Flask-Login (normally a numeric string).
    :return: A ``User`` instance, or ``None`` when the id is malformed or no
        matching row exists.
    """
    # A user loader is expected to answer None for an id it cannot resolve,
    # so a malformed value must not surface as a ValueError/TypeError.
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    user_data = db.get_user(numeric_id)
    if not user_data:
        return None
    return User(
        id=str(user_data['id']),
        username=user_data['username'],
        password_hash=user_data['password_hash'],
        created_at=user_data['created_at'],
    )
if __name__ == '__main__':
    # Development entry point: listen on loopback, on $PORT when it is set
    # and on 5000 otherwise.
    listen_port = int(os.environ.get('PORT', 5000))
    app.run(host='127.0.0.1', port=listen_port)