"""Settings blueprint.

User-settings routes: API key management, JSON data export/import,
theme preference, database schema browsing (ERD), login history, and a
restricted read-only SQL console.
"""
import json
import re
import secrets
from datetime import datetime

from flask import (
    Blueprint,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_login import current_user, login_required

from extensions import db, environment, htmx
from jinja2_fragments import render_block

settings = Blueprint('settings', __name__)

# Keywords that must never appear in a user-submitted query (word-boundary
# matched so column names like "created_at" or "deleted" don't trip it).
_FORBIDDEN_SQL = re.compile(
    r'\b(DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\b'
)

# Tables that carry a user_id column and must be scoped to the caller.
_USER_SCOPED_TABLES = [
    'http_functions',
    'timer_functions',
    'shared_environments',
    'api_keys',
    'http_function_invocations',
    'timer_function_invocations',
]


@settings.route("/", methods=["GET"])
@login_required
def index():
    """Settings root: redirect to the API keys page."""
    return redirect(url_for('settings.api_keys'))


@settings.route("/export", methods=["GET"])
@login_required
def export():
    """Display the data-export page, or download a JSON export.

    With ``?download=true`` the user's full data set is serialized to JSON
    and returned as an attachment; otherwise the export page is rendered
    (as an HTMX fragment when the request came via HTMX).
    """
    if request.args.get('download') == 'true':
        user_id = current_user.id
        export_data = db.export_user_data(user_id)
        export_data['_export_metadata'] = {
            'exported_at': datetime.now().isoformat(),
            'export_version': '1.0',
            'application': 'Functions Platform',
        }

        # default=str stringifies non-JSON types (datetimes, UUIDs, ...).
        response = make_response(json.dumps(export_data, indent=2, default=str))
        response.headers['Content-Type'] = 'application/json'

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'user_data_export_{current_user.username}_{timestamp}.json'
        # BUG FIX: the computed filename was built but never interpolated
        # into the Content-Disposition header (a literal placeholder was
        # sent instead), so downloads had a useless filename.
        response.headers['Content-Disposition'] = f'attachment; filename={filename}'
        return response

    if htmx:
        return render_block(environment, "dashboard/settings/export.html", "page")
    return render_template("dashboard/settings/export.html")


@settings.route("/api-keys", methods=["GET"])
@login_required
def api_keys():
    """List the current user's API keys plus functions usable as scopes."""
    user_id = current_user.id
    api_keys = db.list_api_keys(user_id)

    # Scopes are stored as a JSON string; decode for template display.
    for key in api_keys:
        if isinstance(key['scopes'], str):
            key['scopes'] = json.loads(key['scopes'])

    # The user's HTTP functions are offered as scope targets for new keys.
    functions = db.get_http_functions_for_user(user_id)

    if htmx:
        return render_block(
            environment, "dashboard/settings/api_keys.html", "page",
            api_keys=api_keys, functions=functions
        )
    return render_template("dashboard/settings/api_keys.html",
                           api_keys=api_keys, functions=functions)


@settings.route("/api-keys", methods=["POST"])
@login_required
def create_api_key():
    """Create a new API key; no selected scopes means the wildcard scope."""
    user_id = current_user.id
    name = request.form.get("name", "My API Key")
    scopes = request.form.getlist("scopes") or ["*"]

    # secrets (not random) for a cryptographically secure token.
    key = f"sk_{secrets.token_urlsafe(24)}"
    db.create_api_key(user_id, name, key, scopes)

    flash(f"API Key created: {key} - Save it now, you won't see it again!", "success")
    return redirect(url_for("settings.api_keys"))


@settings.route("/api-keys/<int:key_id>", methods=["DELETE"])
@login_required
def delete_api_key(key_id):
    """Delete one of the current user's API keys.

    BUG FIX: the route previously lacked the ``<int:key_id>`` segment, so
    Flask could never supply the view's required ``key_id`` argument and
    DELETE requests could not reach this handler with a key id.
    """
    db.delete_api_key(current_user.id, key_id)
    return "", 200


@settings.route("/theme", methods=["POST"])
@login_required
def toggle_theme():
    """Persist the user's light/dark theme preference."""
    theme = request.form.get("theme")
    if theme in ('light', 'dark'):
        db.update_user_theme_preference(current_user.id, theme)
        # Empty 200 body: the client toggles the CSS class itself.
        return "", 200
    return "Invalid theme", 400


@settings.route("/database_schema", methods=["GET"])
@login_required
def database_schema():
    """Display the database schema with an ERD visualization."""
    schema_info = get_database_schema()

    if htmx:
        return render_block(
            environment, "dashboard/settings/database_schema.html", "page",
            schema_info=schema_info
        )
    return render_template("dashboard/settings/database_schema.html",
                           schema_info=schema_info)


@settings.route("/login-history", methods=["GET"])
@login_required
def login_history():
    """Display the 50 most recent logins for the current user."""
    history = db.get_login_history(current_user.id, limit=50)

    if htmx:
        return render_block(
            environment, "dashboard/settings/login_history.html", "page",
            history=history
        )
    return render_template("dashboard/settings/login_history.html",
                           history=history)


def get_database_schema():
    """Fetch table/column/key metadata for ERD generation.

    Returns a list of dicts, one per public base table, with keys
    'table_name', 'columns', 'foreign_keys', and 'primary_keys'.
    Queries target PostgreSQL's information_schema / pg catalog.
    """
    tables = db.execute("""
        SELECT table_name,
               COALESCE(obj_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass), '') as table_comment
        FROM information_schema.tables
        WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
        ORDER BY table_name
    """)

    schema_data = []
    for table in tables or []:
        table_name = table['table_name']

        columns = db.execute("""
            SELECT column_name, data_type, is_nullable, column_default,
                   character_maximum_length
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = %s
            ORDER BY ordinal_position
        """, [table_name])

        foreign_keys = db.execute("""
            SELECT kcu.column_name,
                   ccu.table_name AS foreign_table_name,
                   ccu.column_name AS foreign_column_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
              ON tc.constraint_name = kcu.constraint_name
             AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
              ON ccu.constraint_name = tc.constraint_name
             AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
              AND tc.table_name = %s
              AND tc.table_schema = 'public'
        """, [table_name])

        primary_keys = db.execute("""
            SELECT kcu.column_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
              ON tc.constraint_name = kcu.constraint_name
             AND tc.table_schema = kcu.table_schema
            WHERE tc.constraint_type = 'PRIMARY KEY'
              AND tc.table_name = %s
              AND tc.table_schema = 'public'
        """, [table_name])

        schema_data.append({
            'table_name': table_name,
            'columns': columns or [],
            'foreign_keys': foreign_keys or [],
            'primary_keys': [pk['column_name'] for pk in (primary_keys or [])],
        })

    return schema_data


@settings.route("/execute_query", methods=["POST"])
@login_required
def execute_query():
    """Execute a restricted, read-only, user-scoped SQL query.

    Accepts JSON ``{"query": "SELECT ..."}``. Only SELECT statements are
    allowed; mutating keywords are rejected; queries over user-scoped
    tables get a user_id predicate appended; results are capped at 100
    rows unless the query specifies its own LIMIT.
    """
    # BUG FIX: request.json raised a 500 when the body was missing or not
    # valid JSON; get_json(silent=True) degrades that to a clean 400 below.
    payload = request.get_json(silent=True) or {}
    query = (payload.get('query') or '').strip()
    if not query:
        return {"error": "No query provided"}, 400

    query_upper = query.upper()
    if not query_upper.startswith('SELECT'):
        return {"error": "Only SELECT queries are allowed"}, 400

    # BUG FIX: the previous substring check rejected harmless identifiers
    # ("created_at" contains CREATE, "deleted" contains DELETE); the regex
    # matches whole keywords only.
    forbidden = _FORBIDDEN_SQL.search(query_upper)
    if forbidden:
        return {"error": f"Keyword '{forbidden.group(1)}' is not allowed"}, 400

    # Safe to interpolate only while user ids are integers (current_user.id
    # comes from the session-backed user loader, not from request input).
    user_id = current_user.id
    query_lower = query.lower()

    modified_query = query
    for table in _USER_SCOPED_TABLES:
        if table in query_lower:
            if 'WHERE' not in query_upper:
                # NOTE(review): appending WHERE at the end yields invalid SQL
                # when the query already has ORDER BY / GROUP BY — known
                # limitation carried over from the original implementation.
                modified_query = f"{query} WHERE {table}.user_id = {user_id}"
            elif 'user_id' not in query_lower:
                # Query has a WHERE clause but no user_id predicate: scope it.
                modified_query = f"{query} AND {table}.user_id = {user_id}"
            break

    # Cap unbounded result sets.
    if 'LIMIT' not in query_upper:
        modified_query = f"{modified_query} LIMIT 100"

    try:
        results = db.execute(modified_query)
    except Exception as e:
        return {"error": f"Query execution failed: {str(e)}"}, 500

    if not results:
        return {
            "columns": [],
            "rows": [],
            "row_count": 0,
            "message": "Query executed successfully, but returned no results."
        }

    # Flatten rows to JSON-serializable lists in column order.
    columns = list(results[0].keys())
    rows = []
    for row in results:
        row_data = []
        for col in columns:
            value = row[col]
            # Datetimes (anything exposing isoformat) become strings.
            if hasattr(value, 'isoformat'):
                value = value.isoformat()
            row_data.append(value)
        rows.append(row_data)

    return {
        "columns": columns,
        "rows": rows,
        "row_count": len(rows),
        "message": f"Query executed successfully. {len(rows)} row(s) returned.",
        "query_executed": modified_query
    }


def _record_import_result(bucket, success, message):
    """File one import outcome under success / skipped / failed.

    'already exists' in the failure message marks a duplicate, which is
    reported as skipped rather than failed.
    """
    if success:
        bucket['success'].append(message)
    elif 'already exists' in message:
        bucket['skipped'].append(message)
    else:
        bucket['failed'].append(message)


@settings.route("/import", methods=["POST"])
@login_required
def import_data():
    """Import user data (functions, environments) from an uploaded JSON file.

    Expects a multipart upload named 'import_file' containing a JSON object
    produced by the export endpoint. Returns a per-category breakdown of
    successes, skips (already exists), and failures plus summary totals.
    """
    try:
        if 'import_file' not in request.files:
            return {"error": "No file uploaded"}, 400

        file = request.files['import_file']
        if file.filename == '':
            return {"error": "No file selected"}, 400

        if not file.filename.endswith('.json'):
            return {"error": "File must be a JSON file"}, 400

        try:
            file_content = file.read()
            # Cap uploads at 10 MB to bound memory usage.
            if len(file_content) > 10 * 1024 * 1024:
                return {"error": "File too large (max 10MB)"}, 400
            # Renamed from 'import_data' — the original shadowed this view
            # function's own name.
            data = json.loads(file_content)
        except json.JSONDecodeError as e:
            return {"error": f"Invalid JSON format: {str(e)}"}, 400

        if not isinstance(data, dict):
            return {"error": "Invalid data format: expected JSON object"}, 400

        user_id = current_user.id
        results = {
            "http_functions": {"success": [], "skipped": [], "failed": []},
            "timer_functions": {"success": [], "skipped": [], "failed": []},
            "shared_environments": {"success": [], "skipped": [], "failed": []},
        }

        # --- HTTP functions -----------------------------------------------
        for func in data.get('http_functions', []):
            # Export and import use different field names; map them here.
            func_data = {
                'name': func.get('name'),
                'code': func.get('script_content'),           # export: 'script_content'
                'environment': func.get('environment_info'),  # export: 'environment_info'
                'runtime': func.get('runtime', 'python'),
            }
            success, message, func_id = db.import_http_function(user_id, func_data)
            _record_import_result(results['http_functions'], success, message)

        # --- Timer functions ----------------------------------------------
        for func in data.get('timer_functions', []):
            func_data = {
                'name': func.get('name'),
                'code': func.get('code'),
                'environment': func.get('environment'),
                'runtime': func.get('runtime', 'python'),
                'trigger_type': func.get('trigger_type'),
                'frequency_minutes': func.get('frequency_minutes'),
                'run_date': func.get('run_date'),
                'cron_expression': func.get('cron_expression'),
                'enabled': func.get('enabled', True),
            }
            success, message, func_id = db.import_timer_function(user_id, func_data)
            _record_import_result(results['timer_functions'], success, message)

        # --- Shared environments ------------------------------------------
        for env in data.get('shared_environments', []):
            env_data = {
                'name': env.get('name'),
                'environment': env.get('environment'),
            }
            success, message, env_id = db.import_shared_environment(user_id, env_data)
            _record_import_result(results['shared_environments'], success, message)

        totals = {
            kind: sum(len(results[cat][kind]) for cat in results)
            for kind in ('success', 'skipped', 'failed')
        }
        return {
            "success": True,
            "results": results,
            "summary": {
                "total_success": totals['success'],
                "total_skipped": totals['skipped'],
                "total_failed": totals['failed'],
            }
        }

    except Exception as e:
        return {"error": f"Import failed: {str(e)}"}, 500