diff --git a/app.py b/app.py index a9b6946..dea2af3 100644 --- a/app.py +++ b/app.py @@ -18,6 +18,7 @@ from routes.llm import llm from routes.auth import auth from routes.settings import settings from routes.community import community +from routes.shared_env import shared_env from constants import DEFAULT_FUNCTION_NAME, DEFAULT_SCRIPT, DEFAULT_ENVIRONMENT from flask_apscheduler import APScheduler import asyncio @@ -47,6 +48,7 @@ app.register_blueprint(llm, url_prefix='/llm') app.register_blueprint(auth, url_prefix='/auth') app.register_blueprint(settings, url_prefix='/settings') app.register_blueprint(community, url_prefix='/community') +app.register_blueprint(shared_env, url_prefix='/shared_env') # Swith to inter app routing, which results in speed up from ~400ms to ~270ms # https://stackoverflow.com/questions/76886643/linking-two-not-exposed-dokku-apps @@ -212,6 +214,24 @@ async def execute_http_function(user_id, function): if request.data and not request.is_json: request_data['text'] = request.data.decode('utf-8') + # Load and inject shared environments (namespaced) + shared_envs = db.execute(''' + SELECT se.id, se.name, se.environment + FROM http_function_shared_envs hfse + JOIN shared_environments se ON hfse.shared_env_id = se.id + WHERE hfse.http_function_id = %s + ORDER BY se.name + ''', [http_function['id']]) + + # Inject shared environments as nested objects + combined_environment = environment.copy() + shared_env_map = {} # Track shared env IDs for later extraction + if shared_envs: + for se in shared_envs: + env_data = json.loads(se['environment']) if isinstance(se['environment'], str) else se['environment'] + combined_environment[se['name']] = env_data + shared_env_map[se['name']] = se['id'] + # Call the Node.js API asynchronously if runtime == 'deno': api_url = DENO_API_URL @@ -220,10 +240,29 @@ async def execute_http_function(user_id, function): else: api_url = NODE_API_URL async with aiohttp.ClientSession() as session: - async with session.post(api_url, json={'code': code, 'request': request_data, 'environment': environment, 'name': function_name}) as response: + async with session.post(api_url, json={'code': code, 'request': request_data, 'environment': combined_environment, 'name': function_name}) as response: response_data = await response.json() - db.update_http_function_environment_info_and_invoked_count(user_id, function_name, response_data['environment']) + # Extract and persist shared environment mutations + returned_env = response_data['environment'] + function_specific_env = {} + + # Separate function-specific properties from shared environments + for key, value in returned_env.items(): + if key in shared_env_map: + # This is a shared environment - save it back + db.execute( + 'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s', + [json.dumps(value), shared_env_map[key]], + commit=True + ) + else: + # This is function-specific - keep it + function_specific_env[key] = value + + # Update function's own environment (without shared envs) + db.update_http_function_environment_info_and_invoked_count(user_id, function_name, function_specific_env) + db.add_http_function_invocation( http_function['id'], response_data['status'], diff --git a/routes/shared_env.py b/routes/shared_env.py new file mode 100644 index 0000000..f3a273f --- /dev/null +++ b/routes/shared_env.py @@ -0,0 +1,417 @@ +from flask import Blueprint, request, jsonify, render_template +from flask_login import login_required, current_user +from extensions import db, htmx +from jinja2_fragments import render_block +import json + +''' +-- 1. Create shared_environments table +CREATE TABLE IF NOT EXISTS shared_environments ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL, + name VARCHAR(255) NOT NULL, + environment JSONB NOT NULL DEFAULT '{}', + description TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT fk_shared_env_user + FOREIGN KEY (user_id) + REFERENCES users (id) + ON DELETE CASCADE, + + CONSTRAINT unique_shared_env_name_per_user + UNIQUE (user_id, name) +); + +CREATE INDEX idx_shared_env_user ON shared_environments(user_id); + +-- 2. Create junction table for HTTP functions +CREATE TABLE IF NOT EXISTS http_function_shared_envs ( + id SERIAL PRIMARY KEY, + http_function_id INT NOT NULL, + shared_env_id INT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT fk_http_function + FOREIGN KEY (http_function_id) + REFERENCES http_functions (id) + ON DELETE CASCADE, + + CONSTRAINT fk_shared_env_http + FOREIGN KEY (shared_env_id) + REFERENCES shared_environments (id) + ON DELETE CASCADE, + + CONSTRAINT unique_http_function_shared_env + UNIQUE (http_function_id, shared_env_id) +); + +CREATE INDEX idx_http_func_shared_env ON http_function_shared_envs(http_function_id); + +-- 3. Create junction table for Timer functions +CREATE TABLE IF NOT EXISTS timer_function_shared_envs ( + id SERIAL PRIMARY KEY, + timer_function_id INT NOT NULL, + shared_env_id INT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT fk_timer_function + FOREIGN KEY (timer_function_id) + REFERENCES timer_functions (id) + ON DELETE CASCADE, + + CONSTRAINT fk_shared_env_timer + FOREIGN KEY (shared_env_id) + REFERENCES shared_environments (id) + ON DELETE CASCADE, + + CONSTRAINT unique_timer_function_shared_env + UNIQUE (timer_function_id, shared_env_id) +); + +CREATE INDEX idx_timer_func_shared_env ON timer_function_shared_envs(timer_function_id); +''' + +shared_env = Blueprint('shared_env', __name__) + +@shared_env.route('/', methods=['GET']) +@login_required +def list_shared_environments(): + """List all shared environments for the current user""" + envs = db.execute( + 'SELECT id, name, environment, description, created_at, updated_at FROM shared_environments WHERE user_id=%s ORDER BY name', + [current_user.id] + ) + + # Check if HTMX request + if request.headers.get('HX-Request'): + return render_template('dashboard/shared_environments/index.html', environments=envs) + + # For API/fetch requests, return JSON + if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: + return jsonify(envs if envs else []) + + # For regular page loads + return render_template('dashboard/shared_environments/index.html', environments=envs) + +@shared_env.route('/new', methods=['GET', 'POST']) +@login_required +def create(): + """Create a new shared environment""" + if request.method == 'GET': + # Show creation form + if htmx: + return render_block('environment', 'dashboard/shared_environments/new.html', 'page') + return render_template('dashboard/shared_environments/new.html') + + # Handle POST - create new shared environment + try: + data = request.json + name = data.get('name') + environment = data.get('environment') + description = data.get('description', '') + + # Validate name + if not name: + return jsonify({'status': 'error', 'message': 'Name is required'}), 400 + + # Validate environment JSON + if isinstance(environment, str): + try: + environment_dict = json.loads(environment) + except json.JSONDecodeError: + return jsonify({'status': 'error', 'message': 'Invalid JSON in environment'}), 400 + else: + environment_dict = environment + + # Create shared environment + result = db.execute( + 'INSERT INTO shared_environments (user_id, name, environment, description) VALUES (%s, %s, %s, %s) RETURNING id', + [current_user.id, name, json.dumps(environment_dict), description], + commit=True, + one=True + ) + + env_id = result['id'] if result else None + + return jsonify({ + 'status': 'success', + 'message': f'Shared environment "{name}" created successfully', + 'id': env_id + }), 201 + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('/', methods=['GET']) +@login_required +def get(env_id): + """Get a specific shared environment""" + env = db.execute( + 'SELECT id, name, environment, description FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + one=True + ) + + if not env: + return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404 + + return jsonify(env) + +@shared_env.route('/', methods=['PUT']) +@login_required +def update(env_id): + """Update a shared environment""" + try: + # Verify ownership + existing = db.execute( + 'SELECT id, name FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + one=True + ) + + if not existing: + return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404 + + data = request.json + name = data.get('name') + environment = data.get('environment') + description = data.get('description', '') + + # Validate environment JSON + if isinstance(environment, str): + try: + environment_dict = json.loads(environment) + except json.JSONDecodeError: + return jsonify({'status': 'error', 'message': 'Invalid JSON in environment'}), 400 + else: + environment_dict = environment + + db.execute( + 'UPDATE shared_environments SET name=%s, environment=%s, description=%s, updated_at=NOW() WHERE id=%s AND user_id=%s', + [name, json.dumps(environment_dict), description, env_id, current_user.id], + commit=True + ) + + return jsonify({ + 'status': 'success', + 'message': f'Shared environment "{name}" updated successfully' + }) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('/', methods=['DELETE']) +@login_required +def delete(env_id): + """Delete a shared environment""" + try: + # Verify ownership + existing = db.execute( + 'SELECT name FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + one=True + ) + + if not existing: + return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404 + + db.execute( + 'DELETE FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + commit=True + ) + + return jsonify({ + 'status': 'success', + 'message': f'Shared environment "{existing["name"]}" deleted successfully' + }) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('//linked-functions', methods=['GET']) +@login_required +def get_linked_functions(env_id): + """Get all functions linked to this shared environment""" + try: + # Verify ownership + existing = db.execute( + 'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + one=True + ) + + if not existing: + return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404 + + # Get linked HTTP functions + http_functions = db.execute(''' + SELECT hf.id, hf.name, 'http' as type + FROM http_function_shared_envs hfse + JOIN http_functions hf ON hfse.http_function_id = hf.id + WHERE hfse.shared_env_id = %s AND hf.user_id = %s + ORDER BY hf.name + ''', [env_id, current_user.id]) + + # Get linked Timer functions + timer_functions = db.execute(''' + SELECT tf.id, tf.name, 'timer' as type + FROM timer_function_shared_envs tfse + JOIN timer_functions tf ON tfse.timer_function_id = tf.id + WHERE tfse.shared_env_id = %s AND tf.user_id = %s + ORDER BY tf.name + ''', [env_id, current_user.id]) + + return jsonify({ + 'http_functions': http_functions if http_functions else [], + 'timer_functions': timer_functions if timer_functions else [] + }) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('//available-functions', methods=['GET']) +@login_required +def get_available_functions(env_id): + """Get all functions that can be linked (not already linked)""" + try: + # Get all HTTP functions for this user + all_http = db.execute( + 'SELECT id, name FROM http_functions WHERE user_id=%s ORDER BY name', + [current_user.id] + ) + + # Get already linked HTTP functions + linked_http = db.execute(''' + SELECT http_function_id + FROM http_function_shared_envs + WHERE shared_env_id = %s + ''', [env_id]) + + linked_http_ids = [row['http_function_id'] for row in (linked_http or [])] + + # Filter out already linked + available_http = [func for func in (all_http or []) if func['id'] not in linked_http_ids] + + # Get all Timer functions for this user + all_timer = db.execute( + 'SELECT id, name FROM timer_functions WHERE user_id=%s ORDER BY name', + [current_user.id] + ) + + # Get already linked Timer functions + linked_timer = db.execute(''' + SELECT timer_function_id + FROM timer_function_shared_envs + WHERE shared_env_id = %s + ''', [env_id]) + + linked_timer_ids = [row['timer_function_id'] for row in (linked_timer or [])] + + # Filter out already linked + available_timer = [func for func in (all_timer or []) if func['id'] not in linked_timer_ids] + + return jsonify({ + 'http_functions': available_http, + 'timer_functions': available_timer + }) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('//link-function', methods=['POST']) +@login_required +def link_function(env_id): + """Link a function to this shared environment""" + try: + # Verify ownership + existing = db.execute( + 'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s', + [env_id, current_user.id], + one=True + ) + + if not existing: + return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404 + + data = request.json + function_id = data.get('function_id') + function_type = data.get('function_type') # 'http' or 'timer' + + if not function_id or not function_type: + return jsonify({'status': 'error', 'message': 'Missing function_id or function_type'}), 400 + + if function_type == 'http': + # Verify function ownership + func = db.execute( + 'SELECT id FROM http_functions WHERE id=%s AND user_id=%s', + [function_id, current_user.id], + one=True + ) + if not func: + return jsonify({'status': 'error', 'message': 'Function not found'}), 404 + + # Link it + db.execute( + 'INSERT INTO http_function_shared_envs (http_function_id, shared_env_id) VALUES (%s, %s) ON CONFLICT DO NOTHING', + [function_id, env_id], + commit=True + ) + elif function_type == 'timer': + # Verify function ownership + func = db.execute( + 'SELECT id FROM timer_functions WHERE id=%s AND user_id=%s', + [function_id, current_user.id], + one=True + ) + if not func: + return jsonify({'status': 'error', 'message': 'Function not found'}), 404 + + # Link it + db.execute( + 'INSERT INTO timer_function_shared_envs (timer_function_id, shared_env_id) VALUES (%s, %s) ON CONFLICT DO NOTHING', + [function_id, env_id], + commit=True + ) + else: + return jsonify({'status': 'error', 'message': 'Invalid function_type'}), 400 + + return jsonify({'status': 'success', 'message': 'Function linked successfully'}) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + +@shared_env.route('//unlink-function', methods=['POST']) +@login_required +def unlink_function(env_id): + """Unlink a function from this shared environment""" + try: + data = request.json + function_id = data.get('function_id') + function_type = data.get('function_type') # 'http' or 'timer' + + if not function_id or not function_type: + return jsonify({'status': 'error', 'message': 'Missing function_id or function_type'}), 400 + + if function_type == 'http': + db.execute( + 'DELETE FROM http_function_shared_envs WHERE http_function_id=%s AND shared_env_id=%s', + [function_id, env_id], + commit=True + ) + elif function_type == 'timer': + db.execute( + 'DELETE FROM timer_function_shared_envs WHERE timer_function_id=%s AND shared_env_id=%s', + [function_id, env_id], + commit=True + ) + else: + return jsonify({'status': 'error', 'message': 'Invalid function_type'}), 400 + + return jsonify({'status': 'success', 'message': 'Function unlinked successfully'}) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + diff --git a/templates/dashboard.html b/templates/dashboard.html index 408aa5e..fd4a8a0 100644 --- a/templates/dashboard.html +++ b/templates/dashboard.html @@ -53,6 +53,16 @@ d="M12 6v6h4.5m4.5 0a9 9 0 1 1-18 0 9 9 0 0 1 18 0Z" /> Scheduled Jobs + + + + + + Shared Environments
@@ -243,7 +253,17 @@ d="M12 6v6h4.5m4.5 0a9 9 0 1 1-18 0 9 9 0 0 1 18 0Z" /> Scheduled Jobs - + + + + + + Shared Environments +
@@ -305,4 +325,4 @@ if (mobileSidebarOverlay) mobileSidebarOverlay.addEventListener('click', toggleSidebar); -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/templates/dashboard/shared_environments/index.html b/templates/dashboard/shared_environments/index.html new file mode 100644 index 0000000..e03903e --- /dev/null +++ b/templates/dashboard/shared_environments/index.html @@ -0,0 +1,458 @@ +{% extends 'dashboard.html' %} + +{% block page %} +
+
+
+

Shared Environments

+

Manage reusable environment configurations that can be + injected into your functions

+
+ +
+ + {% if environments and environments|length > 0 %} +
+ {% for env in environments %} +
+
+
+

{{ env.name }}

+ {% if env.description %} +

{{ env.description }}

+ {% endif %} +
+
{{ env.environment | tojson(indent=2) }}
+
+
+ Created: {{ env.created_at.strftime('%Y-%m-%d %H:%M') if env.created_at else 'Unknown' }} + {% if env.updated_at and env.updated_at != env.created_at %} + | Updated: {{ env.updated_at.strftime('%Y-%m-%d %H:%M') }} + {% endif %} +
+ + +
+
+ + +
+ +
+
+
+ + +
+
+
+ {% endfor %} +
+ {% else %} +
+ + + + +

No shared environments

+

Get started by creating a new shared environment

+
+ +
+
+ {% endif %} +
+ + + + + + + + + +{% endblock %} \ No newline at end of file diff --git a/worker.py b/worker.py index 3cb8d84..dd7b5ff 100644 --- a/worker.py +++ b/worker.py @@ -44,16 +44,51 @@ async def execute_timer_function_async(timer_function): api_url = PYTHON_API_URL else: api_url = NODE_API_URL + + # Load and inject shared environments (namespaced) + shared_envs = db.execute(''' + SELECT se.id, se.name, se.environment + FROM timer_function_shared_envs tfse + JOIN shared_environments se ON tfse.shared_env_id = se.id + WHERE tfse.timer_function_id = %s + ORDER BY se.name + ''', [timer_function['id']]) + + # Inject shared environments as nested objects + combined_environment = environment.copy() if environment else {} + shared_env_map = {} # Track shared env IDs for later extraction + if shared_envs: + for se in shared_envs: + env_data = json.loads(se['environment']) if isinstance(se['environment'], str) else se['environment'] + combined_environment[se['name']] = env_data + shared_env_map[se['name']] = se['id'] async with aiohttp.ClientSession() as session: async with session.post(api_url, json={ 'code': code, 'request': {'method': 'TIMER'}, - 'environment': environment, + 'environment': combined_environment, 'name': name }) as response: response_data = await response.json() + # Extract and persist shared environment mutations + returned_env = response_data['environment'] + function_specific_env = {} + + # Separate function-specific properties from shared environments + for key, value in returned_env.items(): + if key in shared_env_map: + # This is a shared environment - save it back + db.execute( + 'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s', + [json.dumps(value), shared_env_map[key]], + commit=True + ) + else: + # This is function-specific - keep it + function_specific_env[key] = value + # Update environment and record invocation # Calculate next run time based on trigger type next_run = None @@ -74,7 +109,7 @@ async def execute_timer_function_async(timer_function): last_run = NOW(), next_run = %s WHERE id = %s - """, [json.dumps(response_data['environment']), next_run, timer_function['id']], commit=True) + """, [json.dumps(function_specific_env), next_run, timer_function['id']], commit=True) # Record the invocation db.execute(""" @@ -162,4 +197,4 @@ if __name__ == '__main__': try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): - pass \ No newline at end of file + pass