import json import os from flask import Flask, Response, jsonify, redirect, render_template, render_template_string, request, url_for import jinja_partials from jinja2_fragments import render_block import requests from extensions import db, htmx, init_app from services import create_http_function_view_model, create_http_functions_view_model from flask_login import LoginManager, UserMixin, current_user, login_required, login_user, logout_user from werkzeug.security import check_password_hash, generate_password_hash import os from dotenv import load_dotenv from routes.timer import timer from routes.test import test from routes.home import home from routes.http import http from flask_apscheduler import APScheduler import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor # Load environment variables from .env file in non-production environments if os.environ.get('FLASK_ENV') != 'production': load_dotenv() app = Flask(__name__) app.config.from_pyfile('config.py') app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f') login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" jinja_partials.register_extensions(app) # Remove scheduler configuration and initialization init_app(app) app.register_blueprint(timer, url_prefix='/timer') app.register_blueprint(test, url_prefix='/test') app.register_blueprint(home, url_prefix='/home') app.register_blueprint(http, url_prefix='/http') class User(UserMixin): def __init__(self, id, username, password_hash, created_at): self.id = id self.username = username self.password_hash = password_hash self.created_at = created_at @staticmethod def get(user_id): user_data = db.get_user(int(user_id)) if user_data: return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) return None # Swith to inter app routing, which results in speed up from ~400ms to ~270ms # https://stackoverflow.com/questions/76886643/linking-two-not-exposed-dokku-apps API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute') # 'https://isolator.peterstockings.com/execute' 'http://127.0.0.1:5000/execute' DEFAULT_FUNCTION_NAME = 'foo' DEFAULT_SCRIPT = """async (req) => { console.log(`Method:${req.method}`) console.log(`Generating ${environment.lines} random lines...`) let svgContent = ""; for (let i = 0; i < environment.lines; i++) { console.log(i) let pathD = "M2 " + Math.random() * 79; let circles = ""; for (let x = 12; x <= 202; x += 10) { let y = Math.random() * 79 pathD += ` L${x} ${y}`; circles += ``; } let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`; svgContent += ` ${circles} `; } return HtmlResponse(`${svgContent}`); }""" DEFAULT_ENVIRONMENT = """{ "lines": 3 }""" def map_isolator_response_to_flask_response(response): """ Maps a Node.js response to a Flask response. :param nodejs_response: The response from Node.js, expected to be a dictionary with keys 'body', 'headers', and 'status'. :return: Flask Response object """ result = response.get('result', {}) body = result.get('body', '') headers = result.get('headers', {}) status = result.get('status', 200) # Convert body to JSON if it's a dictionary body = str(body) return Response(response=body, status=status, headers=headers) @ app.route("/", methods=["GET"]) def home(): return render_template("home.html", name='Try me', script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT) @app.route("/documentation", methods=["GET"]) def documentation(): return render_template("documentation.html") def _generate_script_from_natural_language(natural_query): """Generates a Javascript function from natural language using Gemini REST API.""" gemni_model = os.environ.get("GEMINI_MODEL", "gemini-1.5-flash") api_key = os.environ.get("GEMINI_API_KEY") if not api_key: return None, "GEMINI_API_KEY environment variable not set." api_url = f"https://generativelanguage.googleapis.com/v1beta/models/{gemni_model}:generateContent?key={api_key}" headers = {'Content-Type': 'application/json'} try: prompt = f""" You are an expert Javascript developer. Your task is to write a complete, production-ready Javascript async function based on the user's request. **Function Signature:** Your function MUST have the following signature: `async (req, environment) => {{ ... }}` - `req`: An object containing details about the incoming HTTP request (e.g., `req.method`, `req.headers`, `req.body`, `req.query`). - `environment`: A mutable JSON object that persists across executions. You can read and write to it to maintain state. **Environment & Constraints:** - The function will be executed in a simple, sandboxed Javascript environment. - **CRITICAL**: ALL helper functions or variables MUST be defined *inside* the main `async` function. Do not define anything in the global scope. - **DO NOT** use `require()`, `import`, or any other module loading system. - **DO NOT** access the file system (`fs` module) or make network requests. - You have access to a `console.log()` function for logging. **Response Helpers:** You must use one of the following functions to return a response: - `HtmlResponse(body)`: Returns an HTML response. - `JsonResponse(body)`: Returns a JSON response. - `TextResponse(body)`: Returns a plain text response. - `RedirectResponse(url)`: Redirects the user. **Complex Example (Tic-Tac-Toe):** ```javascript async (req, environment) => {{ // Helper function defined INSIDE the main function function checkWinner(board) {{ const winConditions = [ [0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6] ]; for (const condition of winConditions) {{ const [a, b, c] = condition; if (board[a] && board[a] === board[b] && board[a] === board[c]) {{ return board[a]; }} }} return null; }} if (!environment.board) {{ environment.board = Array(9).fill(""); environment.turn = "X"; environment.winner = null; }} if (req.method === "POST" && req.json && req.json.move !== undefined) {{ const move = parseInt(req.json.move); if (environment.board[move] === "" && !environment.winner) {{ environment.board[move] = environment.turn; environment.turn = environment.turn === "X" ? "O" : "X"; environment.winner = checkWinner(environment.board); }} }} const boardHTML = environment.board.map((cell, index) => ``).join(""); const message = environment.winner ? `Player ${{environment.winner}} wins!` : `Turn: ${{environment.turn}}`; return HtmlResponse(` Tic-Tac-Toe

${{message}}

${{boardHTML}}
`); }} ``` **User's request:** "{natural_query}" Return ONLY the complete Javascript function code, without any explanation, comments, or surrounding text/markdown. """ payload = json.dumps({ "contents": [{"parts": [{"text": prompt}]}] }) response = requests.post(api_url, headers=headers, data=payload) response.raise_for_status() response_data = response.json() candidates = response_data.get('candidates', []) if not candidates: return None, "No candidates found in API response." content = candidates[0].get('content', {}) parts = content.get('parts', []) if not parts: return None, "No parts found in API response content." generated_script = parts[0].get('text', '').strip() # More robustly extract from markdown if generated_script.startswith("```javascript"): generated_script = generated_script[12:] if generated_script.endswith("```"): generated_script = generated_script[:-3] generated_script = generated_script.strip() # Remove any leading non-code characters if not generated_script.startswith('async ('): async_start = generated_script.find('async (') if async_start != -1: generated_script = generated_script[async_start:] return generated_script.strip(), None except requests.exceptions.RequestException as e: app.logger.error(f"Gemini API request error: {e}") return None, f"Error communicating with API: {e}" except (KeyError, IndexError, Exception) as e: app.logger.error(f"Error processing Gemini API response: {e} - Response: {response_data if 'response_data' in locals() else 'N/A'}") return None, f"Error processing API response: {e}" @app.route("/api/generate_script", methods=["POST"]) @login_required def generate_script(): try: natural_query = request.json.get('natural_query') if not natural_query: return jsonify({"error": "natural_query is required"}), 400 script_content, error = _generate_script_from_natural_language(natural_query) if error: return jsonify({"error": error}), 500 return jsonify({"script_content": script_content}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/execute', methods=['POST']) def execute_code(): try: # Extract code and convert request to a format acceptable by Node.js app code = request.json.get('code') request_obj = { 'method': request.method, 'headers': dict(request.headers), 'body': request.json, 'url': request.url, 'path': '/', } environment = request.json.get('environment_info') environment_json = json.loads(environment) # Call the Node.js API response = requests.post(API_URL, json={'code': code, 'request': request_obj, 'environment': environment_json, 'name': "anonymous"}) response_data = response.json() # check if playground=true is in the query string if request.args.get('playground') == 'true': return response_data # Map the Node.js response to Flask response flask_response = map_isolator_response_to_flask_response(response_data) return flask_response except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/f//', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD']) def execute_http_function(user_id, function): try: # Split the function_path into the function name and the sub-path parts = function.split('/', 1) function_name = parts[0] sub_path = '/' + parts[1] if len(parts) > 1 else '/' http_function = db.get_http_function(user_id, function_name) if not http_function: return jsonify({'error': 'Function not found'}), 404 code = http_function['script_content'] environment_info = http_function['environment_info'] # Ensure environment is a dictionary if isinstance(environment_info, str) and environment_info: try: environment = json.loads(environment_info) except json.JSONDecodeError: environment = {} elif isinstance(environment_info, dict): environment = environment_info else: environment = {} is_public = http_function['is_public'] log_request = http_function['log_request'] log_response = http_function['log_response'] version_number = http_function['version_number'] # Check if the function is public, if not check if the user is authenticated and owns the function if not is_public: if not current_user.is_authenticated: return login_manager.unauthorized() if int(current_user.id) != user_id: return jsonify({'error': 'Function belongs to another user'}), 404 request_data = { 'method': request.method, 'headers': dict(request.headers), 'url': request.url, 'path': sub_path, } # Add JSON data if it exists if request.is_json: request_data['json'] = request.get_json() # Add form data if it exists if request.form: request_data['form'] = { key.rstrip('[]'): request.form.getlist(key) if key.endswith('[]') or len(request.form.getlist(key)) > 1 else request.form.getlist(key)[0] for key in request.form.keys() } # Add query parameters if they exist if request.args: request_data['query'] = { key.rstrip('[]'): request.args.getlist(key) if key.endswith('[]') or len(request.args.getlist(key)) > 1 else request.args.getlist(key)[0] for key in request.args.keys() } # Add plain text data if it exists if request.data and not request.is_json: request_data['text'] = request.data.decode('utf-8') # Call the Node.js API response = requests.post(API_URL, json={'code': code, 'request': request_data, 'environment': environment, 'name': function_name}) response_data = response.json() db.update_http_function_environment_info_and_invoked_count(user_id, function_name, response_data['environment']) db.add_http_function_invocation( http_function['id'], response_data['status'], request_data if log_request else {}, response_data['result'] if (log_response or response_data['status'] != 'SUCCESS') else {}, response_data['logs'], version_number) if response_data['status'] != 'SUCCESS': return render_template("function_error.html", function_name=function_name ,error=response_data['result'], logs=response_data['logs']) # Map the Node.js response to Flask response flask_response = map_isolator_response_to_flask_response(response_data) return flask_response except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': return render_template("login.html") username = request.form.get('username') password = request.form.get('password') if not username or not password: return render_template("login.html", error="Both username and password must be entered") user_data = db.get_user_by_username(username) if not user_data: return render_template("login.html", error="User does not exist") if not check_password_hash(user_data['password_hash'], password): return render_template("login.html", error="Invalid username or password") user = User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) # user should be an instance of your `User` class login_user(user) #flask.flash('Logged in successfully.') next = request.args.get('next') return redirect(next or url_for('home.index')) @app.route('/signup', methods=['GET', 'POST']) def signup(): if request.method == 'GET': return render_template("signup.html") username = request.form.get('username') password = request.form.get('password') if not username or not password: return render_template("signup.html", error="Both username and password must be entered") if len(username) < 10 or len(password) < 10: return render_template("signup.html", error="Both username and password must be at least 10 characters long") user = db.get_user_by_username(username) if user: return render_template("signup.html", error="User already exists") hashed_password = generate_password_hash(password) user_data = db.create_new_user(username, hashed_password) user = User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) login_user(user) return redirect(url_for('home.index')) @app.route("/logout") @login_required def logout(): logout_user() return redirect(url_for('home')) @login_manager.user_loader def load_user(user_id): user_data = db.get_user(int(user_id)) if user_data: return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at']) return None if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)