676 lines
27 KiB
Python
676 lines
27 KiB
Python
import json
|
|
import os
|
|
from flask import Flask, Response, jsonify, redirect, render_template, render_template_string, request, url_for
|
|
import jinja_partials
|
|
from jinja2_fragments import render_block
|
|
import requests
|
|
from extensions import db, htmx, init_app
|
|
from services import create_http_function_view_model, create_http_functions_view_model
|
|
from flask_login import LoginManager, UserMixin, current_user, login_required, login_user, logout_user
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from routes.timer import timer
|
|
from routes.test import test
|
|
from routes.home import home
|
|
from flask_apscheduler import APScheduler
|
|
import asyncio
|
|
import aiohttp
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
# Load environment variables from .env file in non-production environments
|
|
if os.environ.get('FLASK_ENV') != 'production':
|
|
load_dotenv()
|
|
|
|
app = Flask(__name__)
|
|
|
|
app.config.from_pyfile('config.py')
|
|
app.secret_key = os.environ.get('SECRET_KEY', '2a661781919643cb8a5a8bc57642d99f')
|
|
login_manager = LoginManager()
|
|
login_manager.init_app(app)
|
|
login_manager.login_view = "login"
|
|
jinja_partials.register_extensions(app)
|
|
|
|
# Remove scheduler configuration and initialization
|
|
init_app(app)
|
|
|
|
app.register_blueprint(timer, url_prefix='/timer')
|
|
app.register_blueprint(test, url_prefix='/test')
|
|
app.register_blueprint(home, url_prefix='/home')
|
|
|
|
class User(UserMixin):
|
|
def __init__(self, id, username, password_hash, created_at):
|
|
self.id = id
|
|
self.username = username
|
|
self.password_hash = password_hash
|
|
self.created_at = created_at
|
|
|
|
@staticmethod
|
|
def get(user_id):
|
|
user_data = db.get_user(int(user_id))
|
|
|
|
if user_data:
|
|
return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
|
|
return None
|
|
|
|
# Swith to inter app routing, which results in speed up from ~400ms to ~270ms
|
|
# https://stackoverflow.com/questions/76886643/linking-two-not-exposed-dokku-apps
|
|
API_URL = os.environ.get('API_URL', 'http://isolator.web:5000/execute') # 'https://isolator.peterstockings.com/execute' 'http://127.0.0.1:5000/execute'
|
|
|
|
DEFAULT_FUNCTION_NAME = 'foo'
|
|
|
|
DEFAULT_SCRIPT = """async (req) => {
|
|
console.log(`Method:${req.method}`)
|
|
console.log(`Generating ${environment.lines} random lines...`)
|
|
let svgContent = "";
|
|
|
|
for (let i = 0; i < environment.lines; i++) {
|
|
console.log(i)
|
|
let pathD = "M2 " + Math.random() * 79;
|
|
let circles = "";
|
|
|
|
for (let x = 12; x <= 202; x += 10) {
|
|
let y = Math.random() * 79
|
|
pathD += ` L${x} ${y}`;
|
|
circles += `<circle cx="${x}" cy="${y}" r="1"></circle>`;
|
|
}
|
|
|
|
let pathColor = `rgb(${Math.random() * 255}, ${Math.random() * 255}, ${Math.random() * 255})`;
|
|
svgContent += `
|
|
<g style="fill: ${pathColor}; stroke: ${pathColor};">
|
|
<path d="${pathD}" fill="none" stroke="${pathColor}"></path>
|
|
${circles}
|
|
</g>`;
|
|
}
|
|
|
|
return HtmlResponse(`<svg viewBox="0 0 204 79" preserveAspectRatio="none">${svgContent}</svg>`);
|
|
}"""
|
|
|
|
DEFAULT_ENVIRONMENT = """{
|
|
"lines": 3
|
|
}"""
|
|
|
|
def map_isolator_response_to_flask_response(response):
|
|
"""
|
|
Maps a Node.js response to a Flask response.
|
|
|
|
:param nodejs_response: The response from Node.js, expected to be a dictionary
|
|
with keys 'body', 'headers', and 'status'.
|
|
:return: Flask Response object
|
|
"""
|
|
result = response.get('result', {})
|
|
body = result.get('body', '')
|
|
headers = result.get('headers', {})
|
|
status = result.get('status', 200)
|
|
|
|
|
|
# Convert body to JSON if it's a dictionary
|
|
body = str(body)
|
|
|
|
return Response(response=body, status=status, headers=headers)
|
|
|
|
|
|
@ app.route("/", methods=["GET"])
|
|
def home():
|
|
return render_template("home.html", name='Try me', script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT)
|
|
|
|
@app.route("/documentation", methods=["GET"])
|
|
def documentation():
|
|
return render_template("documentation.html")
|
|
|
|
@ app.route("/dashboard", methods=["GET"])
|
|
@login_required
|
|
def dashboard():
|
|
user_id = current_user.id
|
|
http_functions = db.get_http_functions_for_user(user_id)
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
return render_template("dashboard/http_functions/overview.html", http_functions=http_functions)
|
|
|
|
@ app.route("/dashboard/http_functions", methods=["GET"])
|
|
@login_required
|
|
def dashboard_http_functions():
|
|
user_id = current_user.id
|
|
http_functions = db.get_http_functions_for_user(user_id)
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
if htmx:
|
|
return render_block(app.jinja_env, "dashboard/http_functions/overview.html", "page", http_functions=http_functions)
|
|
return render_template("dashboard/http_functions/overview.html", http_functions=http_functions)
|
|
|
|
|
|
@ app.route("/dashboard/http_functions/add_form", methods=["GET"])
|
|
@login_required
|
|
def get_http_function_add_form():
|
|
user_id = current_user.id
|
|
if htmx:
|
|
return render_block(app.jinja_env, 'dashboard/http_functions/new.html', 'page', user_id=user_id, name=DEFAULT_FUNCTION_NAME, script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT, is_public=False, log_request=True, log_response=False)
|
|
return render_template("dashboard/http_functions/new.html", user_id=user_id, name=DEFAULT_FUNCTION_NAME, script=DEFAULT_SCRIPT, environment_info=DEFAULT_ENVIRONMENT, is_public=False, log_request=True, log_response=False)
|
|
|
|
|
|
@ app.route("/dashboard/http_functions/create", methods=["POST"])
|
|
@login_required
|
|
def create_http_function():
|
|
try:
|
|
user_id = current_user.id
|
|
name = request.form.get('name')
|
|
script_content = request.form.get('script_content')
|
|
environment_info = request.form.get('environment_info')
|
|
is_public = request.form.get('is_public')
|
|
log_request = request.form.get('log_request')
|
|
log_response = request.form.get('log_response')
|
|
|
|
db.create_new_http_function(user_id, name, script_content, environment_info, is_public, log_request, log_response)
|
|
|
|
http_functions = db.get_http_functions_for_user(user_id)
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
return render_block(app.jinja_env, "dashboard/http_functions/overview.html", "page", http_functions=http_functions), 200, {"HX-Push-Url": url_for('dashboard_http_functions')}
|
|
except Exception as e:
|
|
print(e)
|
|
return { "status": "error", "message": str(e) }
|
|
|
|
|
|
@ app.route("/dashboard/http_functions/<int:function_id>/edit", methods=["POST"])
|
|
@login_required
|
|
def edit_http_function(function_id):
|
|
try:
|
|
user_id = current_user.id
|
|
name = request.json.get('name')
|
|
script_content = request.json.get('script_content')
|
|
environment_info = request.json.get('environment_info')
|
|
is_public = request.json.get('is_public')
|
|
log_request = request.json.get('log_request')
|
|
log_response = request.json.get('log_response')
|
|
|
|
updated_version = db.edit_http_function(user_id, function_id, name, script_content, environment_info, is_public, log_request, log_response)
|
|
|
|
return { "status": "success", "message": f'{name} updated' }
|
|
except Exception as e:
|
|
print(e)
|
|
return { "status": "error", "message": str(e) }
|
|
|
|
@ app.route("/dashboard/http_functions/<int:function_id>/delete", methods=["DELETE"])
|
|
@login_required
|
|
def delete_http_function(function_id):
|
|
try:
|
|
user_id = current_user.id
|
|
db.delete_http_function(user_id, function_id)
|
|
|
|
http_functions = db.get_http_functions_for_user(user_id)
|
|
http_functions = create_http_functions_view_model(http_functions)
|
|
return render_block(app.jinja_env, "dashboard/http_functions/overview.html", "page", http_functions=http_functions), 200, {"HX-Push-Url": url_for('dashboard_http_functions')}
|
|
except Exception as e:
|
|
return jsonify({"status": 'error', "message": str(e)}), 500
|
|
|
|
@ app.route("/dashboard/http_functions/<int:function_id>/logs", methods=["GET"])
|
|
@login_required
|
|
def get_http_function_logs(function_id):
|
|
user_id = current_user.id
|
|
http_function = db.get_http_function_by_id(user_id, function_id)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
name = http_function['name']
|
|
http_function_invocations = db.get_http_function_invocations(function_id)
|
|
if htmx:
|
|
return render_block(app.jinja_env, 'dashboard/http_functions/logs.html', 'page', user_id=user_id, function_id=function_id, name=name, http_function_invocations=http_function_invocations)
|
|
return render_template("dashboard/http_functions/logs.html", user_id=user_id, name=name, function_id=function_id, http_function_invocations=http_function_invocations)
|
|
|
|
@ app.route("/dashboard/http_functions/<int:function_id>/client", methods=["GET"])
|
|
@login_required
|
|
def client(function_id):
|
|
user_id = current_user.id
|
|
http_function = db.get_http_function_by_id(user_id, function_id)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
|
|
http_function = create_http_function_view_model(http_function)
|
|
if htmx:
|
|
return render_block(app.jinja_env, 'dashboard/http_functions/client.html', 'page', function_id=function_id, **http_function)
|
|
return render_template("dashboard/http_functions/client.html", function_id=function_id, **http_function)
|
|
|
|
@app.route("/dashboard/http_functions/<int:function_id>/history", methods=["GET"])
|
|
@login_required
|
|
def get_http_function_history(function_id):
|
|
user_id = current_user.id
|
|
http_function = db.get_http_function_by_id(user_id, function_id)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
|
|
name = http_function['name']
|
|
version_number = http_function['version_number']
|
|
original_script = http_function['script_content'] if version_number == 1 else None
|
|
|
|
http_function_history = []
|
|
if version_number > 1:
|
|
raw_history = db.get_http_function_history(function_id)
|
|
|
|
for i in range(len(raw_history) - 1):
|
|
pre_version = raw_history[i + 1]
|
|
post_version = raw_history[i]
|
|
|
|
http_function_history.append({
|
|
'pre': pre_version['script_content'],
|
|
'post': post_version['script_content'],
|
|
'version_id': post_version['version_id'],
|
|
'version_number': post_version['version_number'],
|
|
'updated_at': post_version['updated_at']
|
|
})
|
|
|
|
if raw_history:
|
|
original_script = raw_history[-1]['script_content']
|
|
|
|
if htmx:
|
|
return render_block(app.jinja_env, 'dashboard/http_functions/history.html', 'page', user_id=user_id, function_id=function_id, name=name, http_function=http_function, http_function_history=http_function_history, original_script=original_script)
|
|
return render_template("dashboard/http_functions/history.html", user_id=user_id, name=name, function_id=function_id, http_function=http_function, http_function_history=http_function_history, original_script=original_script)
|
|
|
|
def _generate_script_from_natural_language(natural_query):
|
|
"""Generates a Javascript function from natural language using Gemini REST API."""
|
|
gemni_model = os.environ.get("GEMINI_MODEL", "gemini-1.5-flash")
|
|
api_key = os.environ.get("GEMINI_API_KEY")
|
|
if not api_key:
|
|
return None, "GEMINI_API_KEY environment variable not set."
|
|
|
|
api_url = f"https://generativelanguage.googleapis.com/v1beta/models/{gemni_model}:generateContent?key={api_key}"
|
|
headers = {'Content-Type': 'application/json'}
|
|
|
|
try:
|
|
prompt = f"""
|
|
You are an expert Javascript developer. Your task is to write a complete, production-ready Javascript async function based on the user's request.
|
|
|
|
**Environment & Constraints:**
|
|
- The function will be executed in a simple, sandboxed Javascript environment.
|
|
- **DO NOT** use `require()`, `import`, or any other module loading system. The environment does not support it.
|
|
- **DO NOT** access the file system (`fs` module) or make network requests.
|
|
- The function must be a single `async` arrow function.
|
|
- You have access to a persistent JSON object called `environment`. You can read from and write to it. For example: `environment.my_variable = 'hello'`.
|
|
- You also have access to a `console.log()` function for logging.
|
|
|
|
**Input:**
|
|
The function receives one argument: `req`, an object containing details about the incoming HTTP request. It has properties like `req.method`, `req.headers`, `req.body`, `req.query`, etc.
|
|
|
|
**Output:**
|
|
You must use one of the following helper functions to return a response:
|
|
- `HtmlResponse(body)`: Returns an HTML response.
|
|
- `JsonResponse(body)`: Returns a JSON response. The `body` will be automatically stringified.
|
|
- `TextResponse(body)`: Returns a plain text response.
|
|
- `RedirectResponse(url)`: Redirects the user to a different URL.
|
|
|
|
**Example:**
|
|
```javascript
|
|
async (req) => {{
|
|
if (!environment.counter) {{
|
|
environment.counter = 0;
|
|
}}
|
|
environment.counter++;
|
|
console.log("The counter is now " + environment.counter);
|
|
return HtmlResponse(`<h1>Counter: ${{environment.counter}}</h1>`);
|
|
}}
|
|
```
|
|
|
|
**User's request:** "{natural_query}"
|
|
|
|
Return ONLY the complete Javascript function code, without any explanation, comments, or surrounding text/markdown.
|
|
"""
|
|
payload = json.dumps({
|
|
"contents": [{"parts": [{"text": prompt}]}]
|
|
})
|
|
|
|
response = requests.post(api_url, headers=headers, data=payload)
|
|
response.raise_for_status()
|
|
|
|
response_data = response.json()
|
|
|
|
candidates = response_data.get('candidates', [])
|
|
if not candidates:
|
|
return None, "No candidates found in API response."
|
|
|
|
content = candidates[0].get('content', {})
|
|
parts = content.get('parts', [])
|
|
if not parts:
|
|
return None, "No parts found in API response content."
|
|
|
|
generated_script = parts[0].get('text', '').strip()
|
|
|
|
# More robustly extract from markdown
|
|
if generated_script.startswith("```javascript"):
|
|
generated_script = generated_script[12:]
|
|
if generated_script.endswith("```"):
|
|
generated_script = generated_script[:-3]
|
|
|
|
generated_script = generated_script.strip()
|
|
|
|
# Remove any leading non-code characters
|
|
if not generated_script.startswith('async ('):
|
|
async_start = generated_script.find('async (')
|
|
if async_start != -1:
|
|
generated_script = generated_script[async_start:]
|
|
|
|
return generated_script.strip(), None
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
app.logger.error(f"Gemini API request error: {e}")
|
|
return None, f"Error communicating with API: {e}"
|
|
except (KeyError, IndexError, Exception) as e:
|
|
app.logger.error(f"Error processing Gemini API response: {e} - Response: {response_data if 'response_data' in locals() else 'N/A'}")
|
|
return None, f"Error processing API response: {e}"
|
|
|
|
@app.route("/api/generate_script", methods=["POST"])
|
|
@login_required
|
|
def generate_script():
|
|
try:
|
|
natural_query = request.json.get('natural_query')
|
|
if not natural_query:
|
|
return jsonify({"error": "natural_query is required"}), 400
|
|
|
|
script_content, error = _generate_script_from_natural_language(natural_query)
|
|
|
|
if error:
|
|
return jsonify({"error": error}), 500
|
|
|
|
return jsonify({"script_content": script_content})
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/execute', methods=['POST'])
|
|
def execute_code():
|
|
try:
|
|
# Extract code and convert request to a format acceptable by Node.js app
|
|
code = request.json.get('code')
|
|
request_obj = {
|
|
'method': request.method,
|
|
'headers': dict(request.headers),
|
|
'body': request.json,
|
|
'url': request.url,
|
|
'path': '/',
|
|
}
|
|
environment = request.json.get('environment_info')
|
|
environment_json = json.loads(environment)
|
|
|
|
# Call the Node.js API
|
|
response = requests.post(API_URL, json={'code': code, 'request': request_obj, 'environment': environment_json, 'name': "anonymous"})
|
|
response_data = response.json()
|
|
|
|
# check if playground=true is in the query string
|
|
if request.args.get('playground') == 'true':
|
|
return response_data
|
|
|
|
# Map the Node.js response to Flask response
|
|
flask_response = map_isolator_response_to_flask_response(response_data)
|
|
return flask_response
|
|
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/f/<int:user_id>/<path:function>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
|
|
def execute_http_function(user_id, function):
|
|
try:
|
|
# Split the function_path into the function name and the sub-path
|
|
parts = function.split('/', 1)
|
|
function_name = parts[0]
|
|
sub_path = '/' + parts[1] if len(parts) > 1 else '/'
|
|
|
|
http_function = db.get_http_function(user_id, function_name)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
|
|
code = http_function['script_content']
|
|
environment = http_function['environment_info']
|
|
is_public = http_function['is_public']
|
|
log_request = http_function['log_request']
|
|
log_response = http_function['log_response']
|
|
version_number = http_function['version_number']
|
|
|
|
# Check if the function is public, if not check if the user is authenticated and owns the function
|
|
if not is_public:
|
|
if not current_user.is_authenticated:
|
|
return login_manager.unauthorized()
|
|
|
|
if int(current_user.id) != user_id:
|
|
return jsonify({'error': 'Function belongs to another user'}), 404
|
|
|
|
request_data = {
|
|
'method': request.method,
|
|
'headers': dict(request.headers),
|
|
'url': request.url,
|
|
'path': sub_path,
|
|
}
|
|
|
|
# Add JSON data if it exists
|
|
if request.is_json:
|
|
request_data['json'] = request.get_json()
|
|
|
|
# Add form data if it exists
|
|
if request.form:
|
|
request_data['form'] = {
|
|
key.rstrip('[]'): request.form.getlist(key) if key.endswith('[]') or len(request.form.getlist(key)) > 1 else request.form.getlist(key)[0]
|
|
for key in request.form.keys()
|
|
}
|
|
|
|
# Add query parameters if they exist
|
|
if request.args:
|
|
request_data['query'] = {
|
|
key.rstrip('[]'): request.args.getlist(key) if key.endswith('[]') or len(request.args.getlist(key)) > 1 else request.args.getlist(key)[0]
|
|
for key in request.args.keys()
|
|
}
|
|
|
|
# Add plain text data if it exists
|
|
if request.data and not request.is_json:
|
|
request_data['text'] = request.data.decode('utf-8')
|
|
|
|
# Call the Node.js API
|
|
response = requests.post(API_URL, json={'code': code, 'request': request_data, 'environment': environment, 'name': function_name})
|
|
response_data = response.json()
|
|
|
|
db.update_http_function_environment_info_and_invoked_count(user_id, function_name, response_data['environment'])
|
|
db.add_http_function_invocation(
|
|
http_function['id'],
|
|
response_data['status'],
|
|
request_data if log_request else {},
|
|
response_data['result'] if (log_response or response_data['status'] != 'SUCCESS') else {},
|
|
response_data['logs'],
|
|
version_number)
|
|
|
|
if response_data['status'] != 'SUCCESS':
|
|
return render_template("function_error.html", function_name=function_name ,error=response_data['result'], logs=response_data['logs'])
|
|
|
|
# Map the Node.js response to Flask response
|
|
flask_response = map_isolator_response_to_flask_response(response_data)
|
|
return flask_response
|
|
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'GET':
|
|
return render_template("login.html")
|
|
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
if not username or not password:
|
|
return render_template("login.html", error="Both username and password must be entered")
|
|
|
|
user_data = db.get_user_by_username(username)
|
|
if not user_data:
|
|
return render_template("login.html", error="User does not exist")
|
|
|
|
if not check_password_hash(user_data['password_hash'], password):
|
|
return render_template("login.html", error="Invalid username or password")
|
|
|
|
user = User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
|
|
|
|
# user should be an instance of your `User` class
|
|
login_user(user)
|
|
#flask.flash('Logged in successfully.')
|
|
|
|
next = request.args.get('next')
|
|
return redirect(next or url_for('home.index'))
|
|
|
|
@app.route('/signup', methods=['GET', 'POST'])
|
|
def signup():
|
|
if request.method == 'GET':
|
|
return render_template("signup.html")
|
|
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
if not username or not password:
|
|
return render_template("signup.html", error="Both username and password must be entered")
|
|
|
|
if len(username) < 10 or len(password) < 10:
|
|
return render_template("signup.html", error="Both username and password must be at least 10 characters long")
|
|
|
|
user = db.get_user_by_username(username)
|
|
if user:
|
|
return render_template("signup.html", error="User already exists")
|
|
|
|
hashed_password = generate_password_hash(password)
|
|
user_data = db.create_new_user(username, hashed_password)
|
|
|
|
user = User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
|
|
login_user(user)
|
|
|
|
return redirect(url_for('dashboard'))
|
|
|
|
@app.route("/logout")
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('home'))
|
|
|
|
@app.route("/http_function_editor/<int:function_id>", methods=["GET"])
|
|
@login_required
|
|
def http_function_editor(function_id):
|
|
user_id = current_user.id
|
|
http_function = db.get_http_function_by_id(user_id, function_id)
|
|
if not http_function:
|
|
return jsonify({'error': 'Function not found'}), 404
|
|
|
|
# Create a view model with all necessary data for the editor
|
|
editor_data = {
|
|
'id': http_function['id'],
|
|
'name': http_function['name'],
|
|
'script_content': http_function['script_content'],
|
|
'environment_info': json.dumps(http_function['environment_info'], indent=2),
|
|
'is_public': http_function['is_public'],
|
|
'log_request': http_function['log_request'],
|
|
'log_response': http_function['log_response'],
|
|
'version_number': http_function['version_number'],
|
|
'user_id': user_id,
|
|
'function_id': function_id,
|
|
# Add new URLs for navigation
|
|
'cancel_url': url_for('dashboard_http_functions'),
|
|
'edit_url': url_for('http_function_editor', function_id=function_id),
|
|
}
|
|
|
|
if htmx:
|
|
return render_block(app.jinja_env, "dashboard/http_functions/editor.html", "page", **editor_data)
|
|
|
|
return render_template("dashboard/http_functions/editor.html", **editor_data)
|
|
|
|
@app.route("/api/http_functions", methods=["POST"])
|
|
@login_required
|
|
def api_create_http_function():
|
|
try:
|
|
user_id = current_user.id
|
|
data = request.get_json()
|
|
name = data.get('name')
|
|
script_content = data.get('script_content')
|
|
environment_info = data.get('environment_info')
|
|
is_public = data.get('is_public')
|
|
log_request = data.get('log_request')
|
|
log_response = data.get('log_response')
|
|
|
|
# Check if function with same name already exists for this user
|
|
existing_function = db.get_http_function(user_id, name)
|
|
if existing_function:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": f"A function with the name '{name}' already exists"
|
|
}), 400
|
|
|
|
http_function = db.create_new_http_function(
|
|
user_id,
|
|
name,
|
|
script_content,
|
|
environment_info,
|
|
is_public,
|
|
log_request,
|
|
log_response
|
|
)
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": f'{name} created',
|
|
"function": http_function
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": str(e)
|
|
}), 400
|
|
|
|
@app.route("/api/http_functions/<int:function_id>", methods=["POST"])
|
|
@login_required
|
|
def api_update_http_function(function_id):
|
|
try:
|
|
user_id = current_user.id
|
|
data = request.get_json()
|
|
name = data.get('name')
|
|
script_content = data.get('script_content')
|
|
environment_info = data.get('environment_info')
|
|
is_public = data.get('is_public')
|
|
log_request = data.get('log_request')
|
|
log_response = data.get('log_response')
|
|
|
|
updated_function = db.edit_http_function(
|
|
user_id,
|
|
function_id,
|
|
name,
|
|
script_content,
|
|
environment_info,
|
|
is_public,
|
|
log_request,
|
|
log_response
|
|
)
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": f'{name} updated',
|
|
"function": updated_function
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": str(e)
|
|
}), 400
|
|
|
|
@app.route("/api/http_functions/<int:function_id>", methods=["DELETE"])
|
|
@login_required
|
|
def api_delete_http_function(function_id):
|
|
try:
|
|
user_id = current_user.id
|
|
db.delete_http_function(user_id, function_id)
|
|
|
|
return jsonify({
|
|
"status": "success",
|
|
"message": "Function deleted successfully"
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"status": "error",
|
|
"message": str(e)
|
|
}), 400
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
user_data = db.get_user(int(user_id))
|
|
if user_data:
|
|
return User(id=str(user_data['id']), username=user_data['username'], password_hash=user_data['password_hash'], created_at=user_data['created_at'])
|
|
return None
|
|
|
|
if __name__ == '__main__':
|
|
# Bind to PORT if defined, otherwise default to 5000.
|
|
port = int(os.environ.get('PORT', 5000))
|
|
app.run(host='127.0.0.1', port=port)
|