Files
function/app.py
2025-07-21 22:05:58 +10:00

468 lines
18 KiB
Python

import json
import os
from flask import Flask, Response, jsonify, redirect, render_template, render_template_string, request, url_for
import jinja_partials
from jinja2_fragments import render_block
import requests
from extensions import db, htmx, init_app
from services import create_http_function_view_model, create_http_functions_view_model
from flask_login import LoginManager, UserMixin, current_user, login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash
import os
from dotenv import load_dotenv
from routes.timer import timer
from routes.test import test
from routes.home import home
from routes.http import http
from flask_apscheduler import APScheduler
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
# Load environment variables from .env file in non-production environments
if os.environ.get('FLASK_ENV') != 'production':
    load_dotenv()

app = Flask(__name__)
app.config.from_pyfile('config.py')

# SECURITY: this fallback value is committed to source control, so any session
# signed with it can be forged. It is kept only so existing environments without
# SECRET_KEY keep starting; a warning makes the insecure configuration visible.
_FALLBACK_SECRET_KEY = '2a661781919643cb8a5a8bc57642d99f'
app.secret_key = os.environ.get('SECRET_KEY', _FALLBACK_SECRET_KEY)
if app.secret_key == _FALLBACK_SECRET_KEY:
    app.logger.warning(
        'SECRET_KEY is not set; falling back to the built-in development key. '
        'Set the SECRET_KEY environment variable before exposing this app.'
    )

# Flask-Login: unauthenticated visitors are redirected to the "login" endpoint.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"

jinja_partials.register_extensions(app)

# Remove scheduler configuration and initialization
# Initialise the shared objects exposed by the local `extensions` module.
init_app(app)

# Mount the feature blueprints under their URL prefixes.
app.register_blueprint(timer, url_prefix='/timer')
app.register_blueprint(test, url_prefix='/test')
app.register_blueprint(home, url_prefix='/home')
app.register_blueprint(http, url_prefix='/dashboard/http_functions')
class User(UserMixin):
    """Flask-Login user object built from a row returned by the `db` layer."""

    def __init__(self, id, username, password_hash, created_at):
        # `id` is stored exactly as given; callers in this file pass it as a string.
        self.id, self.username = id, username
        self.password_hash, self.created_at = password_hash, created_at

    @staticmethod
    def get(user_id):
        """Return the User whose id is `user_id`, or None when the lookup finds nothing."""
        record = db.get_user(int(user_id))
        if not record:
            return None
        return User(
            id=str(record['id']),
            username=record['username'],
            password_hash=record['password_hash'],
            created_at=record['created_at'],
        )
# Switched to inter-app routing, which results in a speed up from ~400ms to ~270ms
# https://stackoverflow.com/questions/76886643/linking-two-not-exposed-dokku-apps
# Endpoint that executes scripts; can be overridden through the API_URL environment variable.
# Other endpoints noted by the original author:
#   'https://isolator.peterstockings.com/execute'  'http://127.0.0.1:5000/execute'
API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute')
# Default function name (not referenced in the visible part of this file).
DEFAULT_FUNCTION_NAME = 'foo'
# Sample script handed to the home page template as `script`; it reads `environment.lines`.
DEFAULT_SCRIPT = """async (req) => {
console.log(`Method:${req.method}`)
console.log(`Generating ${environment.lines} random lines...`)
let svgContent = "";
for (let i = 0; i < environment.lines; i++) {
console.log(i)
let pathD = "M2 " + Math.random() * 79;
let circles = "";
for (let x = 12; x <= 202; x += 10) {
let y = Math.random() * 79
pathD += ` L${x} ${y}`;
circles += `<circle cx="${x}" cy="${y}" r="1"></circle>`;
}
let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`;
svgContent += `
<g style="fill: ${pathColor}; stroke: ${pathColor};">
<path d="${pathD}" fill="none" stroke="${pathColor}"></path>
${circles}
</g>`;
}
return HtmlResponse(`<svg viewBox="0 0 204 79" preserveAspectRatio="none">${svgContent}</svg>`);
}"""
# Sample environment JSON handed to the home page template as `environment_info`;
# it supplies the `lines` value that DEFAULT_SCRIPT reads.
DEFAULT_ENVIRONMENT = """{
"lines": 3
}"""
def map_isolator_response_to_flask_response(response):
    """
    Map an isolator (Node.js) execution response to a Flask response.

    :param response: Parsed isolator payload, expected to be a dictionary whose
                     'result' entry holds the keys 'body', 'headers' and 'status'.
    :return: Flask Response object
    """
    # `or {}` also covers an explicit null result/headers, not just a missing key.
    result = response.get('result') or {}
    body = result.get('body', '')
    headers = result.get('headers') or {}
    status = result.get('status', 200)

    if isinstance(body, (dict, list)):
        # str() on a container yields a Python repr (single quotes, True/None),
        # which is not valid JSON; serialise it properly instead.
        body = json.dumps(body)
    elif body is None:
        # Avoid sending the literal text "None" for an absent body.
        body = ''
    else:
        body = str(body)

    return Response(response=body, status=status, headers=headers)
@app.route("/", methods=["GET"])
def home():
    """Render the landing page pre-filled with the sample script and environment."""
    playground_defaults = {
        'name': 'Try me',
        'script': DEFAULT_SCRIPT,
        'environment_info': DEFAULT_ENVIRONMENT,
    }
    return render_template("home.html", **playground_defaults)
@app.route("/documentation", methods=["GET"])
def documentation():
    """Render the static documentation page."""
    template_name = "documentation.html"
    return render_template(template_name)
@app.route("/dashboard", methods=["GET"])
@login_required
def dashboard():
    """Render the HTTP functions overview for the logged-in user."""
    owned_functions = db.get_http_functions_for_user(current_user.id)
    overview_model = create_http_functions_view_model(owned_functions)
    return render_template(
        "dashboard/http_functions/overview.html",
        http_functions=overview_model,
    )
def _strip_markdown_fence(text):
    """Remove a surrounding Markdown code fence (with optional language tag) from text."""
    text = text.strip()
    if text.startswith("```"):
        text = text[3:]
        info, newline, remainder = text.partition("\n")
        # The rest of the opening fence line is a language tag only when it is
        # empty or a single bare word (e.g. "javascript", "js"); otherwise it is
        # already code and must be kept.
        if newline and (not info.strip() or info.strip().isalnum()):
            text = remainder
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _generate_script_from_natural_language(natural_query):
    """Generates a Javascript function from natural language using Gemini REST API.

    :param natural_query: The user's plain-language description of the function.
    :return: A ``(script, error)`` tuple. On success ``script`` is the generated
             source and ``error`` is None; on failure ``script`` is None and
             ``error`` is a human-readable message.
    """
    gemini_model = os.environ.get("GEMINI_MODEL", "gemini-1.5-flash")
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        return None, "GEMINI_API_KEY environment variable not set."

    # The key travels in a header rather than the query string so that it cannot
    # leak through exception messages, which embed the request URL and are both
    # logged and returned to the caller below.
    api_url = f"https://generativelanguage.googleapis.com/v1beta/models/{gemini_model}:generateContent"
    headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}

    prompt = f"""
You are an expert Javascript developer. Your task is to write a complete, production-ready Javascript async function based on the user's request.
**Function Signature:**
Your function MUST have the following signature: `async (req, environment) => {{ ... }}`
- `req`: An object containing details about the incoming HTTP request (e.g., `req.method`, `req.headers`, `req.body`, `req.query`).
- `environment`: A mutable JSON object that persists across executions. You can read and write to it to maintain state.
**Environment & Constraints:**
- The function will be executed in a simple, sandboxed Javascript environment.
- **CRITICAL**: ALL helper functions or variables MUST be defined *inside* the main `async` function. Do not define anything in the global scope.
- **DO NOT** use `require()`, `import`, or any other module loading system.
- **DO NOT** access the file system (`fs` module) or make network requests.
- You have access to a `console.log()` function for logging.
**Response Helpers:**
You must use one of the following functions to return a response:
- `HtmlResponse(body)`: Returns an HTML response.
- `JsonResponse(body)`: Returns a JSON response.
- `TextResponse(body)`: Returns a plain text response.
- `RedirectResponse(url)`: Redirects the user.
**Complex Example (Tic-Tac-Toe):**
```javascript
async (req, environment) => {{
// Helper function defined INSIDE the main function
function checkWinner(board) {{
const winConditions = [
[0, 1, 2], [3, 4, 5], [6, 7, 8],
[0, 3, 6], [1, 4, 7], [2, 5, 8],
[0, 4, 8], [2, 4, 6]
];
for (const condition of winConditions) {{
const [a, b, c] = condition;
if (board[a] && board[a] === board[b] && board[a] === board[c]) {{
return board[a];
}}
}}
return null;
}}
if (!environment.board) {{
environment.board = Array(9).fill("");
environment.turn = "X";
environment.winner = null;
}}
if (req.method === "POST" && req.json && req.json.move !== undefined) {{
const move = parseInt(req.json.move);
if (environment.board[move] === "" && !environment.winner) {{
environment.board[move] = environment.turn;
environment.turn = environment.turn === "X" ? "O" : "X";
environment.winner = checkWinner(environment.board);
}}
}}
const boardHTML = environment.board.map((cell, index) => `<button hx-post="/f/${{req.path.split('/')[2]}}/${{req.path.split('/')[3]}}" hx-target="body" hx-swap="innerHTML" name="move" value="${{index}}">${{cell || '&nbsp;'}}</button>`).join("");
const message = environment.winner ? `Player ${{environment.winner}} wins!` : `Turn: ${{environment.turn}}`;
return HtmlResponse(`
<html>
<head><title>Tic-Tac-Toe</title><script src="https://unpkg.com/htmx.org@1.9.9"></script></head>
<body><h1>${{message}}</h1><div>${{boardHTML}}</div></body>
</html>
`);
}}
```
**User's request:** "{natural_query}"
Return ONLY the complete Javascript function code, without any explanation, comments, or surrounding text/markdown.
"""
    payload = json.dumps({
        "contents": [{"parts": [{"text": prompt}]}]
    })

    response_data = None
    try:
        # A timeout keeps a stalled upstream call from blocking the worker forever.
        response = requests.post(api_url, headers=headers, data=payload, timeout=60)
        response.raise_for_status()
        response_data = response.json()

        candidates = response_data.get('candidates', [])
        if not candidates:
            return None, "No candidates found in API response."
        content = candidates[0].get('content', {})
        parts = content.get('parts', [])
        if not parts:
            return None, "No parts found in API response content."

        generated_script = _strip_markdown_fence(parts[0].get('text', ''))

        # Remove any leading non-code characters
        if not generated_script.startswith('async ('):
            async_start = generated_script.find('async (')
            if async_start != -1:
                generated_script = generated_script[async_start:]
        return generated_script.strip(), None
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Gemini API request error: {e}")
        return None, f"Error communicating with API: {e}"
    except Exception as e:
        logged_response = response_data if response_data is not None else 'N/A'
        app.logger.error(f"Error processing Gemini API response: {e} - Response: {logged_response}")
        return None, f"Error processing API response: {e}"
@app.route("/api/generate_script", methods=["POST"])
@login_required
def generate_script():
    """Generate a script from the posted ``natural_query`` and return it as JSON.

    Responds with 400 when ``natural_query`` is missing (including when the body
    is absent, not JSON, or not a JSON object) and with 500 when generation fails.
    """
    # silent=True: a missing or malformed JSON body is a client error, so it must
    # surface as the 400 below instead of an exception that turns into a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    natural_query = payload.get('natural_query')
    if not natural_query:
        return jsonify({"error": "natural_query is required"}), 400

    try:
        script_content, error = _generate_script_from_natural_language(natural_query)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    if error:
        return jsonify({"error": error}), 500
    return jsonify({"script_content": script_content})
@app.route('/execute', methods=['POST'])
def execute_code():
    """Run the posted ``code`` through the isolator and relay its response.

    The JSON body supplies ``code`` and ``environment_info``. ``environment_info``
    may be a JSON string or an already-parsed object; when missing or blank it
    defaults to an empty object. With ``?playground=true`` the raw isolator
    payload is returned; otherwise it is mapped to a Flask response.

    Responds with 400 for a body that is not a JSON object or an
    ``environment_info`` string that is not valid JSON, and with 500 when the
    isolator call fails.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Same shape handling as execute_http_function, except that invalid JSON is
    # reported to the caller, who is editing the environment interactively.
    environment = payload.get('environment_info')
    if isinstance(environment, str) and environment.strip():
        try:
            environment_json = json.loads(environment)
        except json.JSONDecodeError as e:
            return jsonify({'error': f'environment_info is not valid JSON: {e}'}), 400
    elif isinstance(environment, dict):
        environment_json = environment
    else:
        environment_json = {}

    # Extract code and convert request to a format acceptable by Node.js app
    code = payload.get('code')
    request_obj = {
        'method': request.method,
        'headers': dict(request.headers),
        'body': payload,
        'url': request.url,
        'path': '/',
    }

    try:
        # Call the Node.js API
        response = requests.post(
            API_URL,
            json={'code': code, 'request': request_obj, 'environment': environment_json, 'name': "anonymous"},
        )
        response_data = response.json()
        # check if playground=true is in the query string
        if request.args.get('playground') == 'true':
            return response_data
        # Map the Node.js response to Flask response
        return map_isolator_response_to_flask_response(response_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def _flatten_multidict(multi_dict):
    """Convert a request multi-dict (form or query args) into a plain dict.

    Keys ending in the literal ``[]`` suffix, and keys that occur more than once,
    map to a list of values; every other key maps to its single value. Only the
    exact ``[]`` suffix is removed, so a key such as ``x[0]`` is kept intact.
    """
    flattened = {}
    for key in multi_dict.keys():
        values = multi_dict.getlist(key)
        is_array_key = key.endswith('[]')
        name = key[:-2] if is_array_key else key
        flattened[name] = values if is_array_key or len(values) > 1 else values[0]
    return flattened


@app.route('/f/<int:user_id>/<path:function>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
def execute_http_function(user_id, function):
    """Execute a stored HTTP function through the isolator and relay its response.

    :param user_id: Id of the user the function is looked up under (from the URL).
    :param function: Function name, optionally followed by ``/sub/path``.
    :return: The mapped isolator response on success, an error page when the
             isolator reports a non-SUCCESS status, 404 when the function is
             missing or owned by another user, or a JSON 500 on unexpected errors.
    """
    try:
        # Split the function_path into the function name and the sub-path
        parts = function.split('/', 1)
        function_name = parts[0]
        sub_path = '/' + parts[1] if len(parts) > 1 else '/'

        http_function = db.get_http_function(user_id, function_name)
        if not http_function:
            return jsonify({'error': 'Function not found'}), 404

        code = http_function['script_content']
        environment_info = http_function['environment_info']
        # Ensure environment is a dictionary
        if isinstance(environment_info, str) and environment_info:
            try:
                environment = json.loads(environment_info)
            except json.JSONDecodeError:
                environment = {}
        elif isinstance(environment_info, dict):
            environment = environment_info
        else:
            environment = {}

        is_public = http_function['is_public']
        log_request = http_function['log_request']
        log_response = http_function['log_response']
        version_number = http_function['version_number']

        # Check if the function is public, if not check if the user is authenticated and owns the function
        if not is_public:
            if not current_user.is_authenticated:
                return login_manager.unauthorized()
            if int(current_user.id) != user_id:
                return jsonify({'error': 'Function belongs to another user'}), 404

        request_data = {
            'method': request.method,
            'headers': dict(request.headers),
            'url': request.url,
            'path': sub_path,
        }
        # Add JSON data if it exists
        if request.is_json:
            # silent=True: a malformed JSON body reaches the function as None
            # instead of aborting the whole invocation with a 500.
            request_data['json'] = request.get_json(silent=True)
        # Add form data if it exists
        if request.form:
            request_data['form'] = _flatten_multidict(request.form)
        # Add query parameters if they exist
        if request.args:
            request_data['query'] = _flatten_multidict(request.args)
        # Add plain text data if it exists
        if request.data and not request.is_json:
            # errors='replace' keeps non-UTF-8 bodies from raising.
            request_data['text'] = request.data.decode('utf-8', errors='replace')

        # Call the Node.js API
        response = requests.post(
            API_URL,
            json={'code': code, 'request': request_data, 'environment': environment, 'name': function_name},
        )
        response_data = response.json()

        # Persist the environment returned by the isolator and record the invocation.
        db.update_http_function_environment_info_and_invoked_count(user_id, function_name, response_data['environment'])
        db.add_http_function_invocation(
            http_function['id'],
            response_data['status'],
            request_data if log_request else {},
            # Failed invocations always keep their result, even with response logging off.
            response_data['result'] if (log_response or response_data['status'] != 'SUCCESS') else {},
            response_data['logs'],
            version_number)

        if response_data['status'] != 'SUCCESS':
            return render_template("function_error.html", function_name=function_name, error=response_data['result'], logs=response_data['logs'])

        # Map the Node.js response to Flask response
        return map_isolator_response_to_flask_response(response_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def _is_safe_redirect_target(target):
    """Return True only for same-site relative paths such as ``/dashboard``."""
    from urllib.parse import urlparse

    if not target or not target.startswith('/'):
        return False
    # "//host" is scheme-relative and browsers treat "/\host" the same way.
    if target.startswith('//') or target.startswith('/\\'):
        return False
    parsed = urlparse(target)
    return not parsed.scheme and not parsed.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the posted credentials (POST).

    On success the user is logged in and redirected to the ``next`` query
    parameter when it is a same-site relative path, otherwise to ``home.index``.
    On failure the login form is re-rendered with an error message.
    """
    if request.method == 'GET':
        return render_template("login.html")

    username = request.form.get('username')
    password = request.form.get('password')
    if not username or not password:
        return render_template("login.html", error="Both username and password must be entered")

    user_data = db.get_user_by_username(username)
    # One message for both failure modes, so the form does not reveal which
    # usernames exist.
    if not user_data or not check_password_hash(user_data['password_hash'], password):
        return render_template("login.html", error="Invalid username or password")

    user = User(
        id=str(user_data['id']),
        username=user_data['username'],
        password_hash=user_data['password_hash'],
        created_at=user_data['created_at'],
    )
    login_user(user)

    # `next` is attacker-controllable; redirecting to it unchecked would be an
    # open redirect, so anything but a local path falls back to the home page.
    next_url = request.args.get('next')
    if not _is_safe_redirect_target(next_url):
        next_url = url_for('home.index')
    return redirect(next_url)
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Show the signup form (GET) or create an account from the posted form (POST).

    A successful signup logs the new user in and redirects to the dashboard;
    any validation failure re-renders the form with an error message.
    """
    form_template = "signup.html"
    if request.method == 'GET':
        return render_template(form_template)

    def reject(message):
        # Re-render the form with the given validation message.
        return render_template(form_template, error=message)

    username = request.form.get('username')
    password = request.form.get('password')

    if not (username and password):
        return reject("Both username and password must be entered")
    if min(len(username), len(password)) < 10:
        return reject("Both username and password must be at least 10 characters long")
    if db.get_user_by_username(username):
        return reject("User already exists")

    created = db.create_new_user(username, generate_password_hash(password))
    new_user = User(
        id=str(created['id']),
        username=created['username'],
        password_hash=created['password_hash'],
        created_at=created['created_at'],
    )
    login_user(new_user)
    return redirect(url_for('dashboard'))
@app.route("/logout")
@login_required
def logout():
    """Log the current user out and redirect to the `home` endpoint."""
    logout_user()
    landing_url = url_for('home')
    return redirect(landing_url)
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: return the User for ``user_id`` or None.

    A user loader must return None rather than raise for an invalid id, so a
    malformed or tampered session value is treated as "no such user".
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None

    user_data = db.get_user(numeric_id)
    if not user_data:
        return None
    return User(
        id=str(user_data['id']),
        username=user_data['username'],
        password_hash=user_data['password_hash'],
        created_at=user_data['created_at'],
    )
if __name__ == '__main__':
    # Use the PORT environment variable when present; otherwise listen on 5000.
    app.run(host='127.0.0.1', port=int(os.environ.get('PORT', 5000)))