Files
function/routes/settings.py
2025-12-02 17:02:17 +11:00

531 lines
19 KiB
Python

from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from extensions import db, environment, htmx
from jinja2_fragments import render_block
import secrets
import json
settings = Blueprint('settings', __name__)
@settings.route("/", methods=["GET"])
@login_required
def index():
return redirect(url_for('settings.api_keys'))
@settings.route("/export", methods=["GET"])
@login_required
def export():
"""Display data export page or download data export"""
# Check if this is a download request
if request.args.get('download') == 'true':
from flask import make_response
from datetime import datetime
user_id = current_user.id
# Get all user data
export_data = db.export_user_data(user_id)
# Add export metadata
export_data['_export_metadata'] = {
'exported_at': datetime.now().isoformat(),
'export_version': '1.0',
'application': 'Functions Platform'
}
# Create JSON response
response = make_response(json.dumps(export_data, indent=2, default=str))
response.headers['Content-Type'] = 'application/json'
# Generate filename with username and timestamp
username = current_user.username
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'user_data_export_{username}_{timestamp}.json'
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
return response
# Otherwise show the export page
if htmx:
return render_block(
environment,
"dashboard/settings/export.html",
"page",
current_user=current_user
)
return render_template("dashboard/settings/export.html")
@settings.route("/api-keys", methods=["GET"])
@login_required
def api_keys():
user_id = current_user.id
api_keys = db.list_api_keys(user_id)
# Parse scopes for display
for key in api_keys:
if isinstance(key['scopes'], str):
key['scopes'] = json.loads(key['scopes'])
# Fetch user's functions for scoping
functions = db.get_http_functions_for_user(user_id)
if htmx:
return render_block(
environment,
"dashboard/settings/api_keys.html",
"page",
api_keys=api_keys,
functions=functions,
current_user=current_user
)
return render_template("dashboard/settings/api_keys.html", api_keys=api_keys, functions=functions)
@settings.route("/api-keys", methods=["POST"])
@login_required
def create_api_key():
user_id = current_user.id
name = request.form.get("name", "My API Key")
scopes_list = request.form.getlist("scopes")
rate_limit_count = request.form.get("rate_limit_count")
rate_limit_period = request.form.get("rate_limit_period")
if rate_limit_count:
try:
rate_limit_count = int(rate_limit_count)
except ValueError:
rate_limit_count = None
if not rate_limit_period in ['minute', 'hour', 'day']:
rate_limit_period = None
if not scopes_list:
scopes = ["*"]
else:
scopes = scopes_list
# Generate a secure random key
key = f"sk_{secrets.token_urlsafe(24)}"
db.create_api_key(user_id, name, key, scopes, rate_limit_count, rate_limit_period)
flash(f"API Key created: {key} - Save it now, you won't see it again!", "success")
return redirect(url_for("settings.api_keys"))
@settings.route("/api-keys/<int:key_id>", methods=["DELETE"])
@login_required
def delete_api_key(key_id):
user_id = current_user.id
db.delete_api_key(user_id, key_id)
return "", 200
@settings.route("/theme", methods=["POST"])
@login_required
def toggle_theme():
user_id = current_user.id
theme = request.form.get("theme")
if theme in ['light', 'dark']:
db.update_user_theme_preference(user_id, theme)
# Return empty string as we'll handle the UI update via client-side JS or just let the class toggle persist
# Actually, for HTMX we might want to return something or just 200 OK.
return "", 200
return "Invalid theme", 400
@settings.route("/database_schema", methods=["GET"])
@login_required
def database_schema():
"""Display database schema with ERD visualization"""
# Fetch database schema information
schema_info = get_database_schema()
if htmx:
return render_block(
environment,
"dashboard/settings/database_schema.html",
"page",
schema_info=schema_info,
current_user=current_user
)
return render_template("dashboard/settings/database_schema.html", schema_info=schema_info)
@settings.route("/login-history", methods=["GET"])
@login_required
def login_history():
"""Display login history for the current user"""
user_id = current_user.id
history = db.get_login_history(user_id, limit=50)
if htmx:
return render_block(
environment,
"dashboard/settings/login_history.html",
"page",
history=history,
current_user=current_user
)
return render_template("dashboard/settings/login_history.html", history=history)
@settings.route("/account", methods=["GET"])
@login_required
def account():
"""Display account settings page"""
if htmx:
return render_block(
environment,
"dashboard/settings/account.html",
"page",
current_user=current_user
)
return render_template("dashboard/settings/account.html")
@settings.route("/account/password", methods=["POST"])
@login_required
def update_password():
"""Update user password"""
from werkzeug.security import generate_password_hash, check_password_hash
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Verify current password
user_data = db.get_user(current_user.id)
if not check_password_hash(user_data['password_hash'], current_password):
return render_template("dashboard/settings/account.html", error="Incorrect current password")
# Validate new password
if len(new_password) < 10:
return render_template("dashboard/settings/account.html", error="New password must be at least 10 characters")
if new_password != confirm_password:
return render_template("dashboard/settings/account.html", error="New passwords do not match")
# Update password
new_hash = generate_password_hash(new_password)
db.update_user_password(current_user.id, new_hash)
return render_template("dashboard/settings/account.html", success="Password updated successfully")
@settings.route("/account/email", methods=["POST"])
@login_required
def update_email():
"""Update user email"""
email = request.form.get('email')
# Basic validation
if email and '@' not in email:
return render_template("dashboard/settings/account.html", error="Invalid email address")
db.update_user_email(current_user.id, email)
# Update current user object in session if needed, or just reload page
return render_template("dashboard/settings/account.html", success="Email updated successfully")
@settings.route("/account/delete", methods=["POST"])
@login_required
def delete_account():
"""Delete user account"""
from werkzeug.security import check_password_hash
from flask_login import logout_user
password = request.form.get('password')
confirm_text = request.form.get('confirm_text')
# Verify password
user_data = db.get_user(current_user.id)
if not check_password_hash(user_data['password_hash'], password):
return render_template("dashboard/settings/account.html", error="Incorrect password")
# Verify confirmation text
if confirm_text != "DELETE":
return render_template("dashboard/settings/account.html", error="Please type DELETE to confirm")
# Delete account
db.delete_user(current_user.id)
logout_user()
return redirect(url_for('landing_page'))
def get_database_schema():
"""Fetch database schema information for ERD generation"""
# Get all tables
tables = db.execute("""
SELECT
table_name,
COALESCE(obj_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass), '') as table_comment
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name
""")
schema_data = []
for table in tables or []:
table_name = table['table_name']
# Get columns for this table
columns = db.execute("""
SELECT
column_name,
data_type,
is_nullable,
column_default,
character_maximum_length
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s
ORDER BY ordinal_position
""", [table_name])
# Get foreign keys for this table
foreign_keys = db.execute("""
SELECT
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = %s
AND tc.table_schema = 'public'
""", [table_name])
# Get primary keys
primary_keys = db.execute("""
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_name = %s
AND tc.table_schema = 'public'
""", [table_name])
pk_columns = [pk['column_name'] for pk in (primary_keys or [])]
schema_data.append({
'table_name': table_name,
'columns': columns or [],
'foreign_keys': foreign_keys or [],
'primary_keys': pk_columns
})
return schema_data
@settings.route("/execute_query", methods=["POST"])
@login_required
def execute_query():
"""Execute a user-scoped SQL query"""
query = request.json.get('query', '').strip()
if not query:
return {"error": "No query provided"}, 400
# Basic validation - must be SELECT only
query_upper = query.upper()
if not query_upper.startswith('SELECT'):
return {"error": "Only SELECT queries are allowed"}, 400
# Check for dangerous keywords
dangerous_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'TRUNCATE', 'GRANT', 'REVOKE']
for keyword in dangerous_keywords:
if keyword in query_upper:
return {"error": f"Keyword '{keyword}' is not allowed"}, 400
user_id = current_user.id
# List of tables that have user_id column
user_scoped_tables = [
'http_functions', 'timer_functions', 'shared_environments',
'api_keys', 'http_function_invocations', 'timer_function_invocations'
]
# Automatically add user_id filter if querying user-scoped tables
modified_query = query
for table in user_scoped_tables:
if table in query.lower():
# Add WHERE clause if not present
if 'WHERE' not in query_upper:
modified_query = f"{query} WHERE {table}.user_id = {user_id}"
# Append to existing WHERE clause
elif f'{table}.user_id' not in query.lower() and 'user_id' not in query.lower():
modified_query = f"{query} AND {table}.user_id = {user_id}"
break
# Limit results to prevent massive queries
if 'LIMIT' not in query_upper:
modified_query = f"{modified_query} LIMIT 100"
try:
results = db.execute(modified_query)
if not results:
return {
"columns": [],
"rows": [],
"row_count": 0,
"message": "Query executed successfully, but returned no results."
}
# Convert results to JSON-serializable format
columns = list(results[0].keys()) if results else []
rows = []
for row in results:
row_data = []
for col in columns:
value = row[col]
# Convert datetime to string
if hasattr(value, 'isoformat'):
value = value.isoformat()
row_data.append(value)
rows.append(row_data)
return {
"columns": columns,
"rows": rows,
"row_count": len(rows),
"message": f"Query executed successfully. {len(rows)} row(s) returned.",
"query_executed": modified_query
}
except Exception as e:
return {"error": f"Query execution failed: {str(e)}"}, 500
@settings.route("/import", methods=["POST"])
@login_required
def import_data():
"""Import user data from JSON file"""
try:
# Check if file was uploaded
if 'import_file' not in request.files:
return {"error": "No file uploaded"}, 400
file = request.files['import_file']
if file.filename == '':
return {"error": "No file selected"}, 400
# Validate file type
if not file.filename.endswith('.json'):
return {"error": "File must be a JSON file"}, 400
# Read and parse JSON
try:
file_content = file.read()
# Check file size (max 10MB)
if len(file_content) > 10 * 1024 * 1024:
return {"error": "File too large (max 10MB)"}, 400
import_data = json.loads(file_content)
except json.JSONDecodeError as e:
return {"error": f"Invalid JSON format: {str(e)}"}, 400
# Validate structure
if not isinstance(import_data, dict):
return {"error": "Invalid data format: expected JSON object"}, 400
user_id = current_user.id
results = {
"http_functions": {"success": [], "skipped": [], "failed": []},
"timer_functions": {"success": [], "skipped": [], "failed": []},
"shared_environments": {"success": [], "skipped": [], "failed": []}
}
# Import HTTP Functions
http_functions = import_data.get('http_functions', [])
for func in http_functions:
# Map old export column names to new import method requirements
func_data = {
'name': func.get('name'),
'code': func.get('script_content'), # Export uses 'script_content'
'environment': func.get('environment_info'), # Export uses 'environment_info'
'runtime': func.get('runtime', 'python')
}
success, message, func_id = db.import_http_function(user_id, func_data)
if success:
results['http_functions']['success'].append(message)
elif 'already exists' in message:
results['http_functions']['skipped'].append(message)
else:
results['http_functions']['failed'].append(message)
# Import Timer Functions
timer_functions = import_data.get('timer_functions', [])
for func in timer_functions:
func_data = {
'name': func.get('name'),
'code': func.get('code'),
'environment': func.get('environment'),
'runtime': func.get('runtime', 'python'),
'trigger_type': func.get('trigger_type'),
'frequency_minutes': func.get('frequency_minutes'),
'run_date': func.get('run_date'),
'cron_expression': func.get('cron_expression'),
'enabled': func.get('enabled', True)
}
success, message, func_id = db.import_timer_function(user_id, func_data)
if success:
results['timer_functions']['success'].append(message)
elif 'already exists' in message:
results['timer_functions']['skipped'].append(message)
else:
results['timer_functions']['failed'].append(message)
# Import Shared Environments
shared_envs = import_data.get('shared_environments', [])
for env in shared_envs:
env_data = {
'name': env.get('name'),
'environment': env.get('environment')
}
success, message, env_id = db.import_shared_environment(user_id, env_data)
if success:
results['shared_environments']['success'].append(message)
elif 'already exists' in message:
results['shared_environments']['skipped'].append(message)
else:
results['shared_environments']['failed'].append(message)
# Calculate totals
total_success = (len(results['http_functions']['success']) +
len(results['timer_functions']['success']) +
len(results['shared_environments']['success']))
total_skipped = (len(results['http_functions']['skipped']) +
len(results['timer_functions']['skipped']) +
len(results['shared_environments']['skipped']))
total_failed = (len(results['http_functions']['failed']) +
len(results['timer_functions']['failed']) +
len(results['shared_environments']['failed']))
return {
"success": True,
"results": results,
"summary": {
"total_success": total_success,
"total_skipped": total_skipped,
"total_failed": total_failed
}
}
except Exception as e:
return {"error": f"Import failed: {str(e)}"}, 500