Files
function/routes/shared_env.py

560 lines
20 KiB
Python

from flask import Blueprint, request, jsonify, render_template
from flask_login import login_required, current_user
from extensions import db, htmx
from jinja2_fragments import render_block
import json
'''
-- 1. Create shared_environments table
CREATE TABLE IF NOT EXISTS shared_environments (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL,
name VARCHAR(255) NOT NULL,
environment JSONB NOT NULL DEFAULT '{}',
description TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_shared_env_user
FOREIGN KEY (user_id)
REFERENCES users (id)
ON DELETE CASCADE,
CONSTRAINT unique_shared_env_name_per_user
UNIQUE (user_id, name)
);
CREATE INDEX idx_shared_env_user ON shared_environments(user_id);
-- 2. Create junction table for HTTP functions
CREATE TABLE IF NOT EXISTS http_function_shared_envs (
id SERIAL PRIMARY KEY,
http_function_id INT NOT NULL,
shared_env_id INT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_http_function
FOREIGN KEY (http_function_id)
REFERENCES http_functions (id)
ON DELETE CASCADE,
CONSTRAINT fk_shared_env_http
FOREIGN KEY (shared_env_id)
REFERENCES shared_environments (id)
ON DELETE CASCADE,
CONSTRAINT unique_http_function_shared_env
UNIQUE (http_function_id, shared_env_id)
);
CREATE INDEX idx_http_func_shared_env ON http_function_shared_envs(http_function_id);
-- 3. Create junction table for Timer functions
CREATE TABLE IF NOT EXISTS timer_function_shared_envs (
id SERIAL PRIMARY KEY,
timer_function_id INT NOT NULL,
shared_env_id INT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_timer_function
FOREIGN KEY (timer_function_id)
REFERENCES timer_functions (id)
ON DELETE CASCADE,
CONSTRAINT fk_shared_env_timer
FOREIGN KEY (shared_env_id)
REFERENCES shared_environments (id)
ON DELETE CASCADE,
CONSTRAINT unique_timer_function_shared_env
UNIQUE (timer_function_id, shared_env_id)
);
CREATE INDEX idx_timer_func_shared_env ON timer_function_shared_envs(timer_function_id);
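-- NOTE: shared_environment_versions is not created anywhere in this script and must
-- exist before the statements below are run. The trigger and routes in this file use
-- its columns shared_env_id, environment, version_number and versioned_at.
CREATE INDEX idx_shared_env_versions ON shared_environment_versions(shared_env_id);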
-- 2. Add version_number column to main table
ALTER TABLE shared_environments
ADD COLUMN IF NOT EXISTS version_number INT NOT NULL DEFAULT 1;
-- 3. Create trigger function for automatic versioning
CREATE OR REPLACE FUNCTION fn_shared_env_versioning()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
next_version INT;
BEGIN
IF TG_OP = 'INSERT' THEN
-- Create initial version record
INSERT INTO shared_environment_versions (shared_env_id, environment, version_number)
VALUES (NEW.id, NEW.environment, 1);
-- Ensure version_number is set explicitly
UPDATE shared_environments
SET version_number = 1
WHERE id = NEW.id;
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
-- Only create new version if environment JSON changed
IF NEW.environment IS DISTINCT FROM OLD.environment THEN
-- Get next version number
SELECT COALESCE(MAX(version_number), 0) + 1
INTO next_version
FROM shared_environment_versions
WHERE shared_env_id = NEW.id;
-- Insert new version record
INSERT INTO shared_environment_versions (shared_env_id, environment, version_number)
VALUES (NEW.id, NEW.environment, next_version);
-- Update main table version number
UPDATE shared_environments
SET version_number = next_version
WHERE id = NEW.id;
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END;
$$;
-- 4. Create trigger to fire on INSERT or UPDATE
CREATE TRIGGER tr_shared_env_versioning
AFTER INSERT OR UPDATE
ON shared_environments
FOR EACH ROW
EXECUTE PROCEDURE fn_shared_env_versioning();
'''
# Blueprint on which every shared-environment route in this module is registered.
shared_env = Blueprint('shared_env', __name__)
@shared_env.route('/', methods=['GET'])
@login_required
def list_shared_environments():
    """Return the current user's shared environments as HTML or JSON.

    HTMX requests and ordinary page loads receive the rendered index
    template. Clients whose Accept header allows JSON but not HTML receive
    the rows as a JSON array instead.
    """
    rows = db.execute(
        'SELECT id, name, environment, description, created_at, updated_at FROM shared_environments WHERE user_id=%s ORDER BY name',
        [current_user.id]
    )

    is_htmx_request = request.headers.get('HX-Request')
    accepted = request.accept_mimetypes

    # JSON is served only to non-HTMX clients that accept JSON and not HTML;
    # every other caller gets the same index template.
    if not is_htmx_request and accepted.accept_json and not accepted.accept_html:
        return jsonify(rows or [])

    return render_template('dashboard/shared_environments/index.html', environments=rows)
@shared_env.route('/new', methods=['GET', 'POST'])
@login_required
def create():
    """Show the creation form (GET) or create a shared environment (POST).

    POST expects a JSON object with:
        name: required, non-blank string.
        environment: a JSON object, or a string containing one. Defaults to
            an empty object when omitted.
        description: optional text, defaults to ''.

    Returns:
        GET: the rendered form, or only its ``page`` block for HTMX requests.
        POST: a JSON status payload -- 201 with the new ``id`` on success,
        400 for invalid input, 500 if the database call raises.
    """
    if request.method == 'GET':
        form_template = 'dashboard/shared_environments/new.html'
        if htmx:
            # The top-level jinja2_fragments.render_block needs a Jinja
            # Environment as its first argument; the Flask wrapper resolves
            # the application's environment and template context itself.
            from jinja2_fragments.flask import render_block as render_flask_block
            return render_flask_block(form_template, 'page')
        return render_template(form_template)

    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so bad requests are answered with a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'status': 'error', 'message': 'Request body must be a JSON object'}), 400

        name = data.get('name')
        environment = data.get('environment')
        description = data.get('description', '')

        if not isinstance(name, str) or not name.strip():
            return jsonify({'status': 'error', 'message': 'Name is required'}), 400

        # The environment may arrive as an object or as a JSON-encoded string.
        if isinstance(environment, str):
            try:
                environment_dict = json.loads(environment)
            except json.JSONDecodeError:
                return jsonify({'status': 'error', 'message': 'Invalid JSON in environment'}), 400
        else:
            environment_dict = environment

        # Mirror the column default rather than storing a JSON null.
        if environment_dict is None:
            environment_dict = {}
        if not isinstance(environment_dict, dict):
            return jsonify({'status': 'error', 'message': 'Environment must be a JSON object'}), 400

        result = db.execute(
            'INSERT INTO shared_environments (user_id, name, environment, description) VALUES (%s, %s, %s, %s) RETURNING id',
            [current_user.id, name, json.dumps(environment_dict), description],
            commit=True,
            one=True
        )
        env_id = result['id'] if result else None

        return jsonify({
            'status': 'success',
            'message': f'Shared environment "{name}" created successfully',
            'id': env_id
        }), 201
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@shared_env.route('/<int:env_id>', methods=['GET'])
@login_required
def get(env_id):
    """Return one of the current user's shared environments as JSON.

    Responds with 404 when no row matches both ``env_id`` and the current
    user.
    """
    lookup_sql = 'SELECT id, name, environment, description FROM shared_environments WHERE id=%s AND user_id=%s'
    record = db.execute(lookup_sql, [env_id, current_user.id], one=True)

    if record:
        return jsonify(record)

    return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404
@shared_env.route('/<int:env_id>', methods=['PUT'])
@login_required
def update(env_id):
    """Replace the name, environment and description of a shared environment.

    Expects a JSON object with ``name`` (required, non-blank string),
    ``environment`` (a JSON object, or a string containing one) and an
    optional ``description`` (defaults to '').

    Returns:
        A JSON status payload: 200 on success, 404 when the environment does
        not belong to the current user, 400 for invalid input, 500 if a
        database call raises.
    """
    try:
        # Verify ownership before looking at the payload.
        existing = db.execute(
            'SELECT id, name FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not existing:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so bad requests are answered with a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'status': 'error', 'message': 'Request body must be a JSON object'}), 400

        name = data.get('name')
        environment = data.get('environment')
        description = data.get('description', '')

        # Same rule as on creation; without it a missing name would only be
        # rejected by the database.
        if not isinstance(name, str) or not name.strip():
            return jsonify({'status': 'error', 'message': 'Name is required'}), 400

        # The environment may arrive as an object or as a JSON-encoded string.
        if isinstance(environment, str):
            try:
                environment_dict = json.loads(environment)
            except json.JSONDecodeError:
                return jsonify({'status': 'error', 'message': 'Invalid JSON in environment'}), 400
        else:
            environment_dict = environment

        if not isinstance(environment_dict, dict):
            return jsonify({'status': 'error', 'message': 'Environment must be a JSON object'}), 400

        db.execute(
            'UPDATE shared_environments SET name=%s, environment=%s, description=%s, updated_at=NOW() WHERE id=%s AND user_id=%s',
            [name, json.dumps(environment_dict), description, env_id, current_user.id],
            commit=True
        )

        return jsonify({
            'status': 'success',
            'message': f'Shared environment "{name}" updated successfully'
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@shared_env.route('/<int:env_id>', methods=['DELETE'])
@login_required
def delete(env_id):
    """Delete one of the current user's shared environments.

    Responds with 404 when the environment is not owned by the current user
    and with 500 (carrying the exception text) when a database call raises.
    """
    try:
        # The name is fetched first: it proves ownership and is echoed back
        # in the success message.
        row = db.execute(
            'SELECT name FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )

        if row:
            db.execute(
                'DELETE FROM shared_environments WHERE id=%s AND user_id=%s',
                [env_id, current_user.id],
                commit=True
            )
            confirmation = f'Shared environment "{row["name"]}" deleted successfully'
            return jsonify({'status': 'success', 'message': confirmation})

        return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404
    except Exception as exc:
        return jsonify({'status': 'error', 'message': str(exc)}), 500
@shared_env.route('/<int:env_id>/linked-functions', methods=['GET'])
@login_required
def get_linked_functions(env_id):
    """Return the HTTP and timer functions linked to a shared environment.

    Only functions owned by the current user are listed. Responds with 404
    when the environment is not owned by the current user and with 500 when
    a database call raises.
    """
    linked_http_sql = (
        "\n"
        "SELECT hf.id, hf.name, 'http' as type\n"
        "FROM http_function_shared_envs hfse\n"
        "JOIN http_functions hf ON hfse.http_function_id = hf.id\n"
        "WHERE hfse.shared_env_id = %s AND hf.user_id = %s\n"
        "ORDER BY hf.name\n"
    )
    linked_timer_sql = (
        "\n"
        "SELECT tf.id, tf.name, 'timer' as type\n"
        "FROM timer_function_shared_envs tfse\n"
        "JOIN timer_functions tf ON tfse.timer_function_id = tf.id\n"
        "WHERE tfse.shared_env_id = %s AND tf.user_id = %s\n"
        "ORDER BY tf.name\n"
    )

    try:
        owned_env = db.execute(
            'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not owned_env:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        http_rows = db.execute(linked_http_sql, [env_id, current_user.id])
        timer_rows = db.execute(linked_timer_sql, [env_id, current_user.id])

        # An empty result is normalised to an empty list in the response.
        payload = {
            'http_functions': http_rows or [],
            'timer_functions': timer_rows or []
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({'status': 'error', 'message': str(exc)}), 500
@shared_env.route('/<int:env_id>/available-functions', methods=['GET'])
@login_required
def get_available_functions(env_id):
    """Return the current user's functions not yet linked to an environment.

    Returns:
        JSON with ``http_functions`` and ``timer_functions`` lists of
        ``id``/``name`` rows. Responds with 404 when the environment is not
        owned by the current user and with 500 when a database call raises.
    """
    try:
        # Verify ownership, as the other per-environment routes do, so an
        # unknown or foreign env_id yields 404 instead of a full listing.
        existing = db.execute(
            'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not existing:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        all_http = db.execute(
            'SELECT id, name FROM http_functions WHERE user_id=%s ORDER BY name',
            [current_user.id]
        )
        linked_http = db.execute(
            "\n"
            "SELECT http_function_id\n"
            "FROM http_function_shared_envs\n"
            "WHERE shared_env_id = %s\n",
            [env_id]
        )
        # Sets keep the membership tests below constant-time.
        linked_http_ids = {row['http_function_id'] for row in (linked_http or [])}
        available_http = [func for func in (all_http or []) if func['id'] not in linked_http_ids]

        all_timer = db.execute(
            'SELECT id, name FROM timer_functions WHERE user_id=%s ORDER BY name',
            [current_user.id]
        )
        linked_timer = db.execute(
            "\n"
            "SELECT timer_function_id\n"
            "FROM timer_function_shared_envs\n"
            "WHERE shared_env_id = %s\n",
            [env_id]
        )
        linked_timer_ids = {row['timer_function_id'] for row in (linked_timer or [])}
        available_timer = [func for func in (all_timer or []) if func['id'] not in linked_timer_ids]

        return jsonify({
            'http_functions': available_http,
            'timer_functions': available_timer
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@shared_env.route('/<int:env_id>/link-function', methods=['POST'])
@login_required
def link_function(env_id):
    """Link one of the current user's functions to a shared environment.

    Reads ``function_id`` and ``function_type`` ('http' or 'timer') from the
    JSON body. Both the environment and the function must belong to the
    current user. Linking an already linked function is a no-op because the
    insert uses ON CONFLICT DO NOTHING.
    """
    try:
        owned_env = db.execute(
            'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not owned_env:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        payload = request.json
        function_id = payload.get('function_id')
        function_type = payload.get('function_type')  # 'http' or 'timer'

        if not (function_id and function_type):
            return jsonify({'status': 'error', 'message': 'Missing function_id or function_type'}), 400

        # Pick the statements for the requested kind of function; the
        # verification and insert steps below are shared by both kinds.
        if function_type == 'http':
            ownership_sql = 'SELECT id FROM http_functions WHERE id=%s AND user_id=%s'
            link_sql = 'INSERT INTO http_function_shared_envs (http_function_id, shared_env_id) VALUES (%s, %s) ON CONFLICT DO NOTHING'
        elif function_type == 'timer':
            ownership_sql = 'SELECT id FROM timer_functions WHERE id=%s AND user_id=%s'
            link_sql = 'INSERT INTO timer_function_shared_envs (timer_function_id, shared_env_id) VALUES (%s, %s) ON CONFLICT DO NOTHING'
        else:
            return jsonify({'status': 'error', 'message': 'Invalid function_type'}), 400

        owned_function = db.execute(ownership_sql, [function_id, current_user.id], one=True)
        if not owned_function:
            return jsonify({'status': 'error', 'message': 'Function not found'}), 404

        db.execute(link_sql, [function_id, env_id], commit=True)

        return jsonify({'status': 'success', 'message': 'Function linked successfully'})
    except Exception as exc:
        return jsonify({'status': 'error', 'message': str(exc)}), 500
@shared_env.route('/<int:env_id>/unlink-function', methods=['POST'])
@login_required
def unlink_function(env_id):
    """Unlink a function from one of the current user's shared environments.

    Reads ``function_id`` and ``function_type`` ('http' or 'timer') from the
    JSON body.

    Returns:
        A JSON status payload: 200 on success, 404 when the environment is
        not owned by the current user, 400 for invalid input, 500 if a
        database call raises.
    """
    try:
        # Verify ownership first: without this check any logged-in user could
        # remove links from another user's environment by guessing ids.
        existing = db.execute(
            'SELECT id FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not existing:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so bad requests are answered with a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'status': 'error', 'message': 'Request body must be a JSON object'}), 400

        function_id = data.get('function_id')
        function_type = data.get('function_type')  # 'http' or 'timer'

        if not function_id or not function_type:
            return jsonify({'status': 'error', 'message': 'Missing function_id or function_type'}), 400

        if function_type == 'http':
            db.execute(
                'DELETE FROM http_function_shared_envs WHERE http_function_id=%s AND shared_env_id=%s',
                [function_id, env_id],
                commit=True
            )
        elif function_type == 'timer':
            db.execute(
                'DELETE FROM timer_function_shared_envs WHERE timer_function_id=%s AND shared_env_id=%s',
                [function_id, env_id],
                commit=True
            )
        else:
            return jsonify({'status': 'error', 'message': 'Invalid function_type'}), 400

        return jsonify({'status': 'success', 'message': 'Function unlinked successfully'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@shared_env.route('/<int:env_id>/history', methods=['GET'])
@login_required
def history(env_id):
    """Return the stored versions of a shared environment, newest first.

    Responds with 404 when the environment is not owned by the current user
    and with 500 when a database call or the timestamp conversion raises.
    """
    versions_sql = (
        "\n"
        "SELECT version_number, environment, versioned_at\n"
        "FROM shared_environment_versions\n"
        "WHERE shared_env_id = %s\n"
        "ORDER BY version_number DESC\n"
    )

    try:
        owned_env = db.execute(
            'SELECT id, name FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not owned_env:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        rows = db.execute(versions_sql, [env_id])

        # Replace each timestamp with its ISO-8601 text (or None) in place.
        for row in rows or []:
            stamp = row.get('versioned_at')
            row['versioned_at'] = stamp.isoformat() if stamp else None

        return jsonify({
            'status': 'success',
            'env_name': owned_env['name'],
            'versions': rows or []
        })
    except Exception as exc:
        return jsonify({'status': 'error', 'message': str(exc)}), 500
@shared_env.route('/<int:env_id>/restore', methods=['POST'])
@login_required
def restore(env_id):
    """Restore a shared environment to the contents of an earlier version.

    Reads ``version_number`` from the JSON body; an integer or a string of
    digits is accepted.

    Returns:
        A JSON status payload: 200 on success, 400 for a missing or
        non-integer version number, 404 when the environment is not owned by
        the current user or the version does not exist, 500 if a database
        call raises.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so bad requests are answered with a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'status': 'error', 'message': 'Request body must be a JSON object'}), 400

        raw_version = data.get('version_number')
        if not raw_version:
            return jsonify({'status': 'error', 'message': 'Version number is required'}), 400

        # Reject values that cannot be a version number before they reach
        # the database.
        try:
            version_number = int(raw_version)
        except (TypeError, ValueError):
            return jsonify({'status': 'error', 'message': 'Version number must be an integer'}), 400

        # Verify ownership
        existing = db.execute(
            'SELECT id, name FROM shared_environments WHERE id=%s AND user_id=%s',
            [env_id, current_user.id],
            one=True
        )
        if not existing:
            return jsonify({'status': 'error', 'message': 'Shared environment not found'}), 404

        # Fetch the selected version's environment data
        version_data = db.execute(
            'SELECT environment FROM shared_environment_versions WHERE shared_env_id=%s AND version_number=%s',
            [env_id, version_number],
            one=True
        )
        if not version_data:
            return jsonify({'status': 'error', 'message': 'Version not found'}), 404

        # Writing the old contents back is an ordinary update. The write is
        # scoped by user_id as well, like the other write queries.
        db.execute(
            'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s AND user_id=%s',
            [json.dumps(version_data['environment']), env_id, current_user.id],
            commit=True
        )

        return jsonify({
            'status': 'success',
            'message': f'Restored to version {version_number}'
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500