"""Settings blueprint: API keys, theme preference, data export and schema viewer."""

import json
import secrets
from datetime import datetime

from flask import (
    Blueprint,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_login import current_user, login_required
from jinja2_fragments import render_block

from extensions import db, environment, htmx

settings = Blueprint('settings', __name__)

# Name used when the API-key form supplies no (or a blank) name.
DEFAULT_API_KEY_NAME = "My API Key"

# Themes accepted by the theme endpoint.
VALID_THEMES = ('light', 'dark')

# --- Schema-introspection queries (PostgreSQL catalog / information_schema) ---

_TABLES_SQL = """
    SELECT
        table_name,
        COALESCE(obj_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass), '') as table_comment
    FROM information_schema.tables
    WHERE table_schema = 'public'
    AND table_type = 'BASE TABLE'
    ORDER BY table_name
"""

_COLUMNS_SQL = """
    SELECT
        column_name,
        data_type,
        is_nullable,
        column_default,
        character_maximum_length
    FROM information_schema.columns
    WHERE table_schema = 'public'
    AND table_name = %s
    ORDER BY ordinal_position
"""

_FOREIGN_KEYS_SQL = """
    SELECT
        kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM information_schema.table_constraints AS tc
    JOIN information_schema.key_column_usage AS kcu
        ON tc.constraint_name = kcu.constraint_name
        AND tc.table_schema = kcu.table_schema
    JOIN information_schema.constraint_column_usage AS ccu
        ON ccu.constraint_name = tc.constraint_name
        AND ccu.table_schema = tc.table_schema
    WHERE tc.constraint_type = 'FOREIGN KEY'
    AND tc.table_name = %s
    AND tc.table_schema = 'public'
"""

_PRIMARY_KEYS_SQL = """
    SELECT kcu.column_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
        AND tc.table_schema = kcu.table_schema
    WHERE tc.constraint_type = 'PRIMARY KEY'
    AND tc.table_name = %s
    AND tc.table_schema = 'public'
"""


def _render_page(template, **context):
    """Render *template*: only its "page" block for htmx requests, else the full page."""
    if htmx:
        return render_block(environment, template, "page", **context)
    return render_template(template, **context)


def _filename_safe(value):
    """Return *value* with every character outside ``[A-Za-z0-9._-]`` replaced by ``_``.

    Keeps the result usable as an unquoted ``filename=`` token in a
    Content-Disposition header (no spaces, quotes, separators or non-ASCII).
    """
    return "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "-_." else "_"
        for ch in str(value)
    )


@settings.route("/", methods=["GET"])
@login_required
def index():
    """Redirect the settings root to the API keys page."""
    return redirect(url_for('settings.api_keys'))


@settings.route("/export", methods=["GET"])
@login_required
def export():
    """Display data export page or download data export.

    With ``?download=true`` the current user's data is returned as a JSON
    attachment; any other request renders the export page.
    """
    if request.args.get('download') != 'true':
        return _render_page("dashboard/settings/export.html")

    # One instant for both the metadata and the filename so they always agree.
    now = datetime.now()

    export_data = db.export_user_data(current_user.id)
    export_data['_export_metadata'] = {
        'exported_at': now.isoformat(),
        'export_version': '1.0',
        'application': 'Functions Platform'
    }

    # default=str stringifies any value json cannot serialise natively.
    response = make_response(json.dumps(export_data, indent=2, default=str))
    response.headers['Content-Type'] = 'application/json'

    # The username is user-controlled text; sanitise it before it goes into a header.
    username = _filename_safe(current_user.username)
    timestamp = now.strftime('%Y%m%d_%H%M%S')
    filename = f'user_data_export_{username}_{timestamp}.json'
    response.headers['Content-Disposition'] = f'attachment; filename={filename}'
    return response


@settings.route("/api-keys", methods=["GET"])
@login_required
def api_keys():
    """List the current user's API keys together with their HTTP functions."""
    user_id = current_user.id
    keys = db.list_api_keys(user_id)

    # Scopes may come back as a JSON-encoded string; decode them for display.
    for key in keys:
        if isinstance(key['scopes'], str):
            key['scopes'] = json.loads(key['scopes'])

    # The user's functions are offered as possible scopes for new keys.
    functions = db.get_http_functions_for_user(user_id)

    return _render_page(
        "dashboard/settings/api_keys.html",
        api_keys=keys,
        functions=functions,
    )


@settings.route("/api-keys", methods=["POST"])
@login_required
def create_api_key():
    """Create an API key from the submitted form and redirect to the key list.

    Form fields: ``name`` (optional; blank falls back to the default name) and
    zero or more ``scopes`` values (none selected means the wildcard ``"*"``).
    """
    user_id = current_user.id

    # .get's default only covers a missing field; also cover blank submissions.
    name = (request.form.get("name") or "").strip() or DEFAULT_API_KEY_NAME
    scopes = request.form.getlist("scopes") or ["*"]

    # Generate a secure random key
    key = f"sk_{secrets.token_urlsafe(24)}"
    db.create_api_key(user_id, name, key, scopes)

    # NOTE(review): the plaintext key is passed through flash(); confirm the
    # session storage behind flash messages is acceptable for secrets.
    flash(f"API Key created: {key} - Save it now, you won't see it again!", "success")
    return redirect(url_for("settings.api_keys"))


# The rule must declare <key_id>, otherwise Flask calls the view with no
# arguments and the request fails. The default (string) converter is used
# because the id type is not visible here -- TODO confirm whether <int:key_id>
# is more appropriate.
@settings.route("/api-keys/<key_id>", methods=["DELETE"])
@login_required
def delete_api_key(key_id):
    """Delete the API key *key_id* on behalf of the current user."""
    user_id = current_user.id
    db.delete_api_key(user_id, key_id)
    return "", 200


@settings.route("/theme", methods=["POST"])
@login_required
def toggle_theme():
    """Store the submitted ``theme`` form value as the user's preference.

    Returns an empty 200 response on success and 400 for any value other
    than ``light`` or ``dark``.
    """
    theme = request.form.get("theme")
    if theme not in VALID_THEMES:
        return "Invalid theme", 400

    db.update_user_theme_preference(current_user.id, theme)
    # Empty body: the UI update is left to the client side.
    return "", 200


@settings.route("/database_schema", methods=["GET"])
@login_required
def database_schema():
    """Display database schema with ERD visualization"""
    schema_info = get_database_schema()
    return _render_page(
        "dashboard/settings/database_schema.html",
        schema_info=schema_info,
    )


def get_database_schema():
    """Fetch database schema information for ERD generation.

    Returns a list with one dict per base table in the ``public`` schema,
    ordered by table name, with the keys ``table_name``, ``columns``,
    ``foreign_keys`` and ``primary_keys`` (a list of column names). A falsy
    result from any query is treated as "no rows".
    """
    schema_data = []

    for table in db.execute(_TABLES_SQL) or []:
        table_name = table['table_name']

        columns = db.execute(_COLUMNS_SQL, [table_name])
        foreign_keys = db.execute(_FOREIGN_KEYS_SQL, [table_name])
        primary_keys = db.execute(_PRIMARY_KEYS_SQL, [table_name])

        schema_data.append({
            'table_name': table_name,
            'columns': columns or [],
            'foreign_keys': foreign_keys or [],
            'primary_keys': [pk['column_name'] for pk in (primary_keys or [])]
        })

    return schema_data