# Settings blueprint: API keys, theme, account management, data export/import,
# database schema viewer and the user-scoped SQL query endpoint.
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
|
from flask_login import login_required, current_user
|
|
from extensions import db, environment, htmx
|
|
from jinja2_fragments import render_block
|
|
import secrets
|
|
import json
|
|
|
|
# Blueprint that every route in this module registers on; endpoint names are
# therefore prefixed with "settings." (e.g. url_for('settings.api_keys')).
settings = Blueprint('settings', __name__)
|
|
|
|
@settings.route("/", methods=["GET"])
@login_required
def index():
    """Send requests for the settings root to the API keys page."""
    api_keys_url = url_for('settings.api_keys')
    return redirect(api_keys_url)
|
|
|
|
@settings.route("/export", methods=["GET"])
@login_required
def export():
    """Show the data export page, or stream the export as a JSON download.

    When the query string contains ``download=true`` the current user's data
    is fetched with ``db.export_user_data``, an ``_export_metadata`` entry is
    added, and the result is returned as a JSON attachment. Any other request
    renders the export page (only the ``page`` block for HTMX requests).
    """
    if request.args.get('download') == 'true':
        import re
        from flask import make_response
        from datetime import datetime

        user_id = current_user.id

        # Get all user data
        export_data = db.export_user_data(user_id)

        # Read the clock once so the metadata and the filename always agree.
        now = datetime.now()
        export_data['_export_metadata'] = {
            'exported_at': now.isoformat(),
            'export_version': '1.0',
            'application': 'Functions Platform'
        }

        # default=str stringifies any value json cannot encode natively.
        response = make_response(json.dumps(export_data, indent=2, default=str))
        response.headers['Content-Type'] = 'application/json'

        # The username is user-chosen text that ends up in a response header:
        # keep only filename-safe characters and quote the value so spaces,
        # quotes or semicolons cannot corrupt the Content-Disposition header.
        safe_username = re.sub(r'[^A-Za-z0-9_.-]', '_', str(current_user.username))
        timestamp = now.strftime('%Y%m%d_%H%M%S')
        filename = f'user_data_export_{safe_username}_{timestamp}.json'
        response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'

        return response

    # Otherwise show the export page
    if htmx:
        return render_block(
            environment,
            "dashboard/settings/export.html",
            "page",
            current_user=current_user
        )
    return render_template("dashboard/settings/export.html")
|
|
|
|
@settings.route("/api-keys", methods=["GET"])
@login_required
def api_keys():
    """List the current user's API keys together with their HTTP functions.

    Scopes that come back from the database as a JSON string are decoded in
    place so the template always receives a parsed value.
    """
    template = "dashboard/settings/api_keys.html"
    owner_id = current_user.id

    key_rows = db.list_api_keys(owner_id)
    for row in key_rows:
        stored_scopes = row['scopes']
        if isinstance(stored_scopes, str):
            row['scopes'] = json.loads(stored_scopes)

    # The functions are offered as scoping targets in the template.
    user_functions = db.get_http_functions_for_user(owner_id)

    if not htmx:
        return render_template(template, api_keys=key_rows, functions=user_functions)

    return render_block(
        environment,
        template,
        "page",
        api_keys=key_rows,
        functions=user_functions,
        current_user=current_user,
    )
|
|
|
|
@settings.route("/api-keys", methods=["POST"])
@login_required
def create_api_key():
    """Create an API key for the current user from the submitted form.

    Form fields read:
        name: label for the key; a missing or blank value becomes "My API Key".
        scopes: repeated field; when none are sent the key gets ``["*"]``.
        rate_limit_count: optional integer; blank or non-numeric becomes None.
        rate_limit_period: ``minute``, ``hour`` or ``day``; anything else
            becomes None.

    The generated key is shown once through a flash message, then the user
    is redirected to the API keys page.
    """
    user_id = current_user.id

    # A form field submitted empty arrives as "", not as a missing key, so
    # the default has to cover blank values as well.
    name = (request.form.get("name") or "").strip() or "My API Key"
    scopes = request.form.getlist("scopes") or ["*"]

    raw_count = request.form.get("rate_limit_count")
    rate_limit_period = request.form.get("rate_limit_period")

    # Normalise every unusable count (missing, blank, non-numeric) to None so
    # an empty string is never handed to the database layer.
    try:
        rate_limit_count = int(raw_count) if raw_count else None
    except ValueError:
        rate_limit_count = None

    if rate_limit_period not in ('minute', 'hour', 'day'):
        rate_limit_period = None

    # Generate a secure random key
    key = f"sk_{secrets.token_urlsafe(24)}"

    db.create_api_key(user_id, name, key, scopes, rate_limit_count, rate_limit_period)

    # NOTE(review): flash() stores the message in the session, so the raw key
    # travels through the session store once - confirm that is acceptable.
    flash(f"API Key created: {key} - Save it now, you won't see it again!", "success")
    return redirect(url_for("settings.api_keys"))
|
|
|
|
@settings.route("/api-keys/<int:key_id>", methods=["DELETE"])
@login_required
def delete_api_key(key_id):
    """Delete one of the current user's API keys and reply with an empty 200.

    The user id is passed along with ``key_id`` to ``db.delete_api_key``.
    """
    owner_id = current_user.id
    db.delete_api_key(owner_id, key_id)
    empty_body, ok_status = "", 200
    return empty_body, ok_status
|
|
|
|
@settings.route("/theme", methods=["POST"])
@login_required
def toggle_theme():
    """Store the theme posted by the current user.

    Replies with an empty 200 body for ``light`` or ``dark`` and with
    ``"Invalid theme"`` / 400 for anything else.
    """
    requested_theme = request.form.get("theme")

    if requested_theme not in ('light', 'dark'):
        return "Invalid theme", 400

    db.update_user_theme_preference(current_user.id, requested_theme)
    # No markup is sent back: the visual switch is left to the client side.
    return "", 200
|
|
|
|
@settings.route("/database_schema", methods=["GET"])
@login_required
def database_schema():
    """Render the database schema page from ``get_database_schema()``.

    HTMX requests receive only the ``page`` block; other requests receive
    the full template.
    """
    template = "dashboard/settings/database_schema.html"
    schema = get_database_schema()

    if not htmx:
        return render_template(template, schema_info=schema)

    return render_block(
        environment,
        template,
        "page",
        schema_info=schema,
        current_user=current_user,
    )
|
|
|
|
@settings.route("/login-history", methods=["GET"])
@login_required
def login_history():
    """Show up to 50 login history entries for the current user.

    HTMX requests receive only the ``page`` block; other requests receive
    the full template.
    """
    template = "dashboard/settings/login_history.html"
    entries = db.get_login_history(current_user.id, limit=50)

    if not htmx:
        return render_template(template, history=entries)

    return render_block(
        environment,
        template,
        "page",
        history=entries,
        current_user=current_user,
    )
|
|
|
|
@settings.route("/account", methods=["GET"])
@login_required
def account():
    """Render the account settings page.

    HTMX requests receive only the ``page`` block; other requests receive
    the full template.
    """
    template = "dashboard/settings/account.html"

    if not htmx:
        return render_template(template)

    return render_block(environment, template, "page", current_user=current_user)
|
|
|
|
@settings.route("/account/password", methods=["POST"])
@login_required
def update_password():
    """Change the current user's password after verifying the old one.

    Form fields read: ``current_password``, ``new_password`` and
    ``confirm_password``. The account page is re-rendered with an ``error``
    message when the current password is wrong, the new password is shorter
    than 10 characters, or the two new values differ; otherwise the new hash
    is stored and the page is rendered with a ``success`` message.
    """
    from werkzeug.security import generate_password_hash, check_password_hash

    # Missing fields arrive as None; normalise them to "" so the checks below
    # reject the request cleanly instead of raising on None.
    current_password = request.form.get('current_password') or ''
    new_password = request.form.get('new_password') or ''
    confirm_password = request.form.get('confirm_password') or ''

    # Verify current password (a missing user row is treated as a mismatch)
    user_data = db.get_user(current_user.id)
    if not user_data or not check_password_hash(user_data['password_hash'], current_password):
        return render_template("dashboard/settings/account.html", error="Incorrect current password")

    # Validate new password
    if len(new_password) < 10:
        return render_template("dashboard/settings/account.html", error="New password must be at least 10 characters")

    if new_password != confirm_password:
        return render_template("dashboard/settings/account.html", error="New passwords do not match")

    # Update password
    new_hash = generate_password_hash(new_password)
    db.update_user_password(current_user.id, new_hash)

    return render_template("dashboard/settings/account.html", success="Password updated successfully")
|
|
|
|
@settings.route("/account/email", methods=["POST"])
@login_required
def update_email():
    """Store the email address posted by the current user.

    A non-empty value without ``@`` is rejected with an error message on the
    account page. A missing or empty value skips that check and is passed to
    ``db.update_user_email`` unchanged.
    """
    template = "dashboard/settings/account.html"
    submitted_email = request.form.get('email')

    has_value = bool(submitted_email)
    if has_value and '@' not in submitted_email:
        return render_template(template, error="Invalid email address")

    db.update_user_email(current_user.id, submitted_email)

    # The page is simply re-rendered; nothing is refreshed on current_user here.
    return render_template(template, success="Email updated successfully")
|
|
|
|
@settings.route("/account/delete", methods=["POST"])
@login_required
def delete_account():
    """Delete the current user's account after two confirmations.

    Form fields read: ``password`` (must match the stored hash) and
    ``confirm_text`` (must be exactly ``DELETE``). On failure the account
    page is re-rendered with an ``error`` message; on success the user row
    is deleted, the session is logged out, and the browser is redirected to
    the ``landing_page`` endpoint.
    """
    from werkzeug.security import check_password_hash
    from flask_login import logout_user

    # A missing field arrives as None; use "" so the hash check fails cleanly
    # instead of raising.
    password = request.form.get('password') or ''
    confirm_text = request.form.get('confirm_text')

    # Verify password (a missing user row is treated as a mismatch)
    user_data = db.get_user(current_user.id)
    if not user_data or not check_password_hash(user_data['password_hash'], password):
        return render_template("dashboard/settings/account.html", error="Incorrect password")

    # Verify confirmation text
    if confirm_text != "DELETE":
        return render_template("dashboard/settings/account.html", error="Please type DELETE to confirm")

    # Delete account, then end the session that belonged to it
    db.delete_user(current_user.id)
    logout_user()

    return redirect(url_for('landing_page'))
|
|
|
|
|
|
def get_database_schema():
    """Collect table, column and key metadata for the ``public`` schema.

    Returns a list with one dict per base table, ordered by table name, with
    the keys ``table_name``, ``columns`` (rows from
    ``information_schema.columns``), ``foreign_keys`` (column plus referenced
    table/column) and ``primary_keys`` (list of column names). Three
    follow-up queries are issued for every table.
    """
    tables_sql = """
        SELECT
            table_name,
            COALESCE(obj_description((quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass), '') as table_comment
        FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_type = 'BASE TABLE'
        ORDER BY table_name
    """

    columns_sql = """
        SELECT
            column_name,
            data_type,
            is_nullable,
            column_default,
            character_maximum_length
        FROM information_schema.columns
        WHERE table_schema = 'public'
        AND table_name = %s
        ORDER BY ordinal_position
    """

    foreign_keys_sql = """
        SELECT
            kcu.column_name,
            ccu.table_name AS foreign_table_name,
            ccu.column_name AS foreign_column_name
        FROM information_schema.table_constraints AS tc
        JOIN information_schema.key_column_usage AS kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage AS ccu
            ON ccu.constraint_name = tc.constraint_name
            AND ccu.table_schema = tc.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_name = %s
        AND tc.table_schema = 'public'
    """

    primary_keys_sql = """
        SELECT kcu.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        WHERE tc.constraint_type = 'PRIMARY KEY'
        AND tc.table_name = %s
        AND tc.table_schema = 'public'
    """

    def rows_for(sql, name):
        # A falsy result from db.execute is reported as an empty list.
        return db.execute(sql, [name]) or []

    def describe(name):
        # Query order matches the output keys: columns, foreign keys, primary keys.
        column_rows = rows_for(columns_sql, name)
        fk_rows = rows_for(foreign_keys_sql, name)
        pk_rows = rows_for(primary_keys_sql, name)
        return {
            'table_name': name,
            'columns': column_rows,
            'foreign_keys': fk_rows,
            'primary_keys': [pk['column_name'] for pk in pk_rows],
        }

    table_rows = db.execute(tables_sql) or []
    return [describe(entry['table_name']) for entry in table_rows]
|
|
|
|
@settings.route("/execute_query", methods=["POST"])
@login_required
def execute_query():
    """Run a single read-only SELECT restricted to the current user's rows.

    Expects a JSON body ``{"query": "SELECT ..."}``.

    Safeguards applied before execution:
      * the statement must start with SELECT and contain no further
        statement (``;``) or SQL comment, since a comment could hide
        whatever is appended to the query;
      * write/DDL keywords are rejected as whole words, so column names such
        as ``created_at`` or ``updated_at`` are accepted;
      * every table in ``user_scoped_tables`` is shadowed by a CTE of the
        same name that only exposes rows with the caller's ``user_id``, so
        the restriction does not depend on the shape of the user's WHERE
        clause, on joins, or on OR precedence;
      * ``LIMIT 100`` is appended when the query has no LIMIT clause.

    Returns:
        A dict with ``columns``, ``rows``, ``row_count`` and ``message``
        (plus ``query_executed`` when rows were returned), or an
        ``({"error": ...}, status)`` tuple: 400 for rejected input, 500 when
        the database raises.

    NOTE(review): tables outside ``user_scoped_tables`` (for example a users
    table) are still readable without any filter, and text inspection can
    never be a complete SQL sandbox. This endpoint should run on a dedicated
    read-only database role (ideally with row-level security); treat the
    checks here as defence in depth only.
    """
    import re

    payload = request.get_json(silent=True)
    query = payload.get('query', '') if isinstance(payload, dict) else ''
    if not isinstance(query, str):
        query = ''

    # A trailing semicolon is harmless; drop it so clauses can be appended.
    query = query.strip().rstrip(';').strip()

    if not query:
        return {"error": "No query provided"}, 400

    # Basic validation - must be SELECT only
    query_upper = query.upper()
    query_lower = query.lower()
    if not query_upper.startswith('SELECT'):
        return {"error": "Only SELECT queries are allowed"}, 400

    if ';' in query:
        return {"error": "Multiple statements are not allowed"}, 400

    if '--' in query or '/*' in query:
        return {"error": "SQL comments are not allowed"}, 400

    # U&"..." identifiers can spell a table name without it appearing in the
    # text, which would defeat the qualified-name check below.
    if re.search(r'\bu&\s*["\']', query_lower):
        return {"error": "Unicode escape syntax is not allowed"}, 400

    # Check for dangerous keywords (whole words only). INTO is included
    # because SELECT ... INTO creates a table.
    dangerous_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE',
                          'TRUNCATE', 'GRANT', 'REVOKE', 'INTO']
    for keyword in dangerous_keywords:
        if re.search(rf'\b{keyword}\b', query_upper):
            return {"error": f"Keyword '{keyword}' is not allowed"}, 400

    # List of tables that have user_id column
    user_scoped_tables = [
        'http_functions', 'timer_functions', 'shared_environments',
        'api_keys', 'http_function_invocations', 'timer_function_invocations'
    ]

    # A schema-qualified name (public.api_keys) would bypass the CTEs below,
    # so qualified references to scoped tables are refused.
    for table in user_scoped_tables:
        if re.search(rf'\.\s*"?{table}\b', query_lower):
            return {"error": f"Schema-qualified references to '{table}' are not allowed"}, 400

    # int() guarantees that only digits are spliced into the SQL text.
    user_id = int(current_user.id)

    # Shadow each scoped table with a same-named CTE limited to this user;
    # unqualified references in the user's query then resolve to the CTE.
    scoped_ctes = ", ".join(
        f"{table} AS (SELECT * FROM public.{table} WHERE user_id = {user_id})"
        for table in user_scoped_tables
    )
    modified_query = f"WITH {scoped_ctes} {query}"

    # Limit results to prevent massive queries (whole word, so a column such
    # as rate_limit_count does not count as a LIMIT clause).
    if not re.search(r'\bLIMIT\b', query_upper):
        modified_query = f"{modified_query} LIMIT 100"

    try:
        results = db.execute(modified_query)

        if not results:
            return {
                "columns": [],
                "rows": [],
                "row_count": 0,
                "message": "Query executed successfully, but returned no results."
            }

        # Convert results to JSON-serializable format
        columns = list(results[0].keys())
        rows = []
        for row in results:
            row_data = []
            for col in columns:
                value = row[col]
                # Convert datetime-like values to ISO strings
                if hasattr(value, 'isoformat'):
                    value = value.isoformat()
                row_data.append(value)
            rows.append(row_data)

        return {
            "columns": columns,
            "rows": rows,
            "row_count": len(rows),
            "message": f"Query executed successfully. {len(rows)} row(s) returned.",
            "query_executed": modified_query
        }

    except Exception as e:
        # Request boundary: report the database error to the caller as JSON.
        return {"error": f"Query execution failed: {str(e)}"}, 500
|
|
|
|
def _http_function_payload(func):
    """Map an exported HTTP function entry to the fields the importer expects."""
    return {
        'name': func.get('name'),
        'code': func.get('script_content'),  # Export uses 'script_content'
        'environment': func.get('environment_info'),  # Export uses 'environment_info'
        'runtime': func.get('runtime', 'python')
    }


def _timer_function_payload(func):
    """Map an exported timer function entry to the fields the importer expects."""
    return {
        'name': func.get('name'),
        'code': func.get('code'),
        'environment': func.get('environment'),
        'runtime': func.get('runtime', 'python'),
        'trigger_type': func.get('trigger_type'),
        'frequency_minutes': func.get('frequency_minutes'),
        'run_date': func.get('run_date'),
        'cron_expression': func.get('cron_expression'),
        'enabled': func.get('enabled', True)
    }


def _shared_environment_payload(env):
    """Map an exported shared environment entry to the fields the importer expects."""
    return {
        'name': env.get('name'),
        'environment': env.get('environment')
    }


def _import_section(entries, build_payload, importer, user_id, outcome):
    """Import every entry of one section and file each message under
    ``success``, ``skipped`` or ``failed`` in ``outcome``."""
    if entries is None:
        return
    if not isinstance(entries, list):
        outcome['failed'].append("Invalid section format: expected a list")
        return

    for entry in entries:
        if not isinstance(entry, dict):
            outcome['failed'].append("Invalid entry: expected JSON object")
            continue

        success, message, _new_id = importer(user_id, build_payload(entry))

        if success:
            outcome['success'].append(message)
        elif message and 'already exists' in message:
            outcome['skipped'].append(message)
        else:
            outcome['failed'].append(message)


@settings.route("/import", methods=["POST"])
@login_required
def import_data():
    """Import HTTP functions, timer functions and shared environments from an
    uploaded JSON export.

    Expects a multipart upload named ``import_file`` with a ``.json``
    filename (any letter case), at most 10 MB, containing a JSON object. The
    sections ``http_functions``, ``timer_functions`` and
    ``shared_environments`` are imported entry by entry; a malformed section
    or entry is recorded under ``failed`` and does not abort the import.

    Returns:
        ``{"success": True, "results": ..., "summary": ...}`` with
        per-section message lists and overall totals, or an
        ``({"error": ...}, status)`` tuple (400 for invalid uploads, 500 for
        unexpected failures).
    """
    max_bytes = 10 * 1024 * 1024

    try:
        # Check if file was uploaded
        if 'import_file' not in request.files:
            return {"error": "No file uploaded"}, 400

        file = request.files['import_file']

        if not file.filename:
            return {"error": "No file selected"}, 400

        # Validate file type
        if not file.filename.lower().endswith('.json'):
            return {"error": "File must be a JSON file"}, 400

        # Read one byte past the cap: enough to detect an oversized upload
        # without loading all of it into memory first.
        file_content = file.read(max_bytes + 1)
        if len(file_content) > max_bytes:
            return {"error": "File too large (max 10MB)"}, 400

        try:
            payload = json.loads(file_content)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Undecodable bytes are a client error too, not a server failure.
            return {"error": f"Invalid JSON format: {str(e)}"}, 400

        # Validate structure
        if not isinstance(payload, dict):
            return {"error": "Invalid data format: expected JSON object"}, 400

        user_id = current_user.id
        sections = (
            ('http_functions', _http_function_payload, db.import_http_function),
            ('timer_functions', _timer_function_payload, db.import_timer_function),
            ('shared_environments', _shared_environment_payload, db.import_shared_environment),
        )

        results = {
            section: {"success": [], "skipped": [], "failed": []}
            for section, _, _ in sections
        }

        for section, build_payload, importer in sections:
            _import_section(payload.get(section, []), build_payload, importer,
                            user_id, results[section])

        # Calculate totals
        totals = {
            status: sum(len(outcome[status]) for outcome in results.values())
            for status in ('success', 'skipped', 'failed')
        }

        return {
            "success": True,
            "results": results,
            "summary": {
                "total_success": totals['success'],
                "total_skipped": totals['skipped'],
                "total_failed": totals['failed']
            }
        }

    except Exception as e:
        # Request boundary: report unexpected failures as a JSON error.
        return {"error": f"Import failed: {str(e)}"}, 500
|
|
|