from flask import Blueprint, render_template, request, redirect, url_for, flash from flask_login import login_required, current_user from extensions import db, environment, htmx from jinja2_fragments import render_block import secrets import json settings = Blueprint('settings', __name__) @settings.route("/", methods=["GET"]) @login_required def index(): return redirect(url_for('settings.api_keys')) @settings.route("/export", methods=["GET"]) @login_required def export(): """Display data export page or download data export""" # Check if this is a download request if request.args.get('download') == 'true': from flask import make_response from datetime import datetime user_id = current_user.id # Get all user data export_data = db.export_user_data(user_id) # Add export metadata export_data['_export_metadata'] = { 'exported_at': datetime.now().isoformat(), 'export_version': '1.0', 'application': 'Functions Platform' } # Create JSON response response = make_response(json.dumps(export_data, indent=2, default=str)) response.headers['Content-Type'] = 'application/json' # Generate filename with username and timestamp username = current_user.username timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'user_data_export_{username}_{timestamp}.json' response.headers['Content-Disposition'] = f'attachment; filename={filename}' return response # Otherwise show the export page if htmx: return render_block( environment, "dashboard/settings/export.html", "page" ) return render_template("dashboard/settings/export.html") @settings.route("/api-keys", methods=["GET"]) @login_required def api_keys(): user_id = current_user.id api_keys = db.list_api_keys(user_id) # Parse scopes for display for key in api_keys: if isinstance(key['scopes'], str): key['scopes'] = json.loads(key['scopes']) # Fetch user's functions for scoping functions = db.get_http_functions_for_user(user_id) if htmx: return render_block( environment, "dashboard/settings/api_keys.html", "page", api_keys=api_keys, functions=functions ) return render_template("dashboard/settings/api_keys.html", api_keys=api_keys, functions=functions) @settings.route("/api-keys", methods=["POST"]) @login_required def create_api_key(): user_id = current_user.id name = request.form.get("name", "My API Key") scopes_list = request.form.getlist("scopes") if not scopes_list: scopes = ["*"] else: scopes = scopes_list # Generate a secure random key key = f"sk_{secrets.token_urlsafe(24)}" db.create_api_key(user_id, name, key, scopes) flash(f"API Key created: {key} - Save it now, you won't see it again!", "success") return redirect(url_for("settings.api_keys")) @settings.route("/api-keys/", methods=["DELETE"]) @login_required def delete_api_key(key_id): user_id = current_user.id db.delete_api_key(user_id, key_id) return "", 200 @settings.route("/theme", methods=["POST"]) @login_required def toggle_theme(): user_id = current_user.id theme = request.form.get("theme") if theme in ['light', 'dark']: db.update_user_theme_preference(user_id, theme) # Return empty string as we'll handle the UI update via client-side JS or just let the class toggle persist # Actually, for HTMX we might want to return something or just 200 OK. return "", 200 return "Invalid theme", 400 @settings.route("/database_schema", methods=["GET"]) @login_required def database_schema(): """Display database schema with ERD visualization""" # Fetch database schema information schema_info = get_database_schema() if htmx: return render_block( environment, "dashboard/settings/database_schema.html", "page", schema_info=schema_info ) return render_template("dashboard/settings/database_schema.html", schema_info=schema_info) def get_database_schema(): """Fetch database schema information for ERD generation""" # Get all tables tables = db.execute(""" SELECT table_name, COALESCE(obj_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass), '') as table_comment FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' ORDER BY table_name """) schema_data = [] for table in tables or []: table_name = table['table_name'] # Get columns for this table columns = db.execute(""" SELECT column_name, data_type, is_nullable, column_default, character_maximum_length FROM information_schema.columns WHERE table_schema = 'public' AND table_name = %s ORDER BY ordinal_position """, [table_name]) # Get foreign keys for this table foreign_keys = db.execute(""" SELECT kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = %s AND tc.table_schema = 'public' """, [table_name]) # Get primary keys primary_keys = db.execute(""" SELECT kcu.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_name = %s AND tc.table_schema = 'public' """, [table_name]) pk_columns = [pk['column_name'] for pk in (primary_keys or [])] schema_data.append({ 'table_name': table_name, 'columns': columns or [], 'foreign_keys': foreign_keys or [], 'primary_keys': pk_columns }) return schema_data @settings.route("/execute_query", methods=["POST"]) @login_required def execute_query(): """Execute a user-scoped SQL query""" query = request.json.get('query', '').strip() if not query: return {"error": "No query provided"}, 400 # Basic validation - must be SELECT only query_upper = query.upper() if not query_upper.startswith('SELECT'): return {"error": "Only SELECT queries are allowed"}, 400 # Check for dangerous keywords dangerous_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'TRUNCATE', 'GRANT', 'REVOKE'] for keyword in dangerous_keywords: if keyword in query_upper: return {"error": f"Keyword '{keyword}' is not allowed"}, 400 user_id = current_user.id # List of tables that have user_id column user_scoped_tables = [ 'http_functions', 'timer_functions', 'shared_environments', 'api_keys', 'http_function_invocations', 'timer_function_invocations' ] # Automatically add user_id filter if querying user-scoped tables modified_query = query for table in user_scoped_tables: if table in query.lower(): # Add WHERE clause if not present if 'WHERE' not in query_upper: modified_query = f"{query} WHERE {table}.user_id = {user_id}" # Append to existing WHERE clause elif f'{table}.user_id' not in query.lower() and 'user_id' not in query.lower(): modified_query = f"{query} AND {table}.user_id = {user_id}" break # Limit results to prevent massive queries if 'LIMIT' not in query_upper: modified_query = f"{modified_query} LIMIT 100" try: results = db.execute(modified_query) if not results: return { "columns": [], "rows": [], "row_count": 0, "message": "Query executed successfully, but returned no results." } # Convert results to JSON-serializable format columns = list(results[0].keys()) if results else [] rows = [] for row in results: row_data = [] for col in columns: value = row[col] # Convert datetime to string if hasattr(value, 'isoformat'): value = value.isoformat() row_data.append(value) rows.append(row_data) return { "columns": columns, "rows": rows, "row_count": len(rows), "message": f"Query executed successfully. {len(rows)} row(s) returned.", "query_executed": modified_query } except Exception as e: return {"error": f"Query execution failed: {str(e)}"}, 500