"""PostgreSQL data-access layer for the Flask application.

Exposes a single :class:`DataBase` helper that owns a psycopg2 connection
pool, hands out one connection per Flask application context (stored on
``flask.g``) and wraps every SQL statement used by the application.
"""

import json
import os

import psycopg2
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from urllib.parse import urlparse

from flask import g


class DataBase:
    """Thin wrapper around a psycopg2 connection pool plus the app's queries.

    Rows are returned as dict-like objects (``RealDictCursor``), keyed by the
    column names as PostgreSQL reports them.
    """

    def __init__(self):
        # Created lazily by init_app(); None until then.
        self.pool = None

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    def init_app(self, app):
        """Create the connection pool from ``DATABASE_URL`` and hook teardown.

        Args:
            app: The Flask application; ``close_conn`` is registered as an
                app-context teardown handler on it.

        Raises:
            Exception: If ``DATABASE_URL`` is unset or empty.
        """
        # The variable must be validated *before* parsing: indexing
        # os.environ raises KeyError, and a ParseResult is a non-empty named
        # tuple, so testing the parsed value can never detect a missing URL.
        raw_url = os.environ.get('DATABASE_URL')
        if not raw_url:
            raise Exception("No DATABASE_URL environment variable set")
        db_url = urlparse(raw_url)

        self.pool = psycopg2.pool.SimpleConnectionPool(
            1, 20,
            database=db_url.path[1:],  # strip the leading "/" of the URL path
            user=db_url.username,
            password=db_url.password,
            host=db_url.hostname,
            port=db_url.port
        )

        app.teardown_appcontext(self.close_conn)

    def get_conn(self):
        """Return the connection bound to the current app context.

        A connection is taken from the pool on first use and cached on
        ``flask.g`` as ``db_conn``.
        """
        if 'db_conn' not in g:
            g.db_conn = self.pool.getconn()
        return g.db_conn

    def close_conn(self, e=None):
        """Return the context's connection (if any) to the pool.

        Args:
            e: Unused; accepted because Flask passes the teardown exception.
        """
        db_conn = g.pop('db_conn', None)
        if db_conn is not None:
            self.pool.putconn(db_conn)

    def close_all_connections(self):
        """Close every pooled connection, if the pool was created."""
        if self.pool:
            self.pool.closeall()

    def execute(self, query, args=(), one=False, commit=False):
        """Run a single SQL statement on the context's connection.

        Args:
            query: SQL text using ``%s`` placeholders.
            args: Sequence of parameters bound to the placeholders.
            one: When true, return only the first row (or ``None``).
            commit: When true, commit after the statement succeeds.

        Returns:
            ``None`` for statements that produce no result set; otherwise a
            list of dict-like rows, or a single row / ``None`` when ``one``
            is true.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back before the exception propagates.
        """
        conn = self.get_conn()
        cur = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cur.execute(query, args)
            rv = None
            # description is None for statements without a result set.
            if cur.description is not None:
                rv = cur.fetchone() if one else cur.fetchall()
            if commit:
                conn.commit()
            return rv
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()

    # ------------------------------------------------------------------
    # HTTP functions
    # ------------------------------------------------------------------

    def get_http_functions_for_user(self, user_id, search_query=None):
        """List a user's HTTP functions, newest id first.

        Args:
            user_id: Owner of the functions.
            search_query: Optional substring matched case-insensitively
                against the function name and path.
        """
        select_clause = (
            'SELECT id, user_id, NAME, path, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number, runtime, description FROM http_functions '
        )
        if search_query:
            search_pattern = f"%{search_query}%"
            return self.execute(
                select_clause
                + 'WHERE user_id=%s AND (NAME ILIKE %s OR path ILIKE %s) '
                'ORDER by id DESC',
                [user_id, search_pattern, search_pattern]
            )
        return self.execute(
            select_clause + 'WHERE user_id=%s ORDER by id DESC',
            [user_id]
        )

    def get_public_http_functions(self, search_query=None):
        """List public HTTP functions with their owner's username.

        Results are ordered by creation time, newest first.

        Args:
            search_query: Optional substring matched case-insensitively
                against the function name and description.
        """
        select_clause = (
            'SELECT h.id, h.user_id, h.NAME, h.path, h.script_content, '
            'h.invoked_count, h.environment_info, h.is_public, '
            'h.log_request, h.log_response, h.version_number, h.runtime, '
            'h.description, h.created_at, u.username '
            'FROM http_functions h JOIN users u ON h.user_id = u.id '
        )
        if search_query:
            search_pattern = f"%{search_query}%"
            return self.execute(
                select_clause
                + 'WHERE h.is_public=TRUE '
                'AND (h.NAME ILIKE %s OR h.description ILIKE %s) '
                'ORDER by h.created_at DESC',
                [search_pattern, search_pattern]
            )
        return self.execute(
            select_clause
            + 'WHERE h.is_public=TRUE ORDER by h.created_at DESC'
        )

    def get_http_function(self, user_id, name):
        """Return the user's HTTP function called ``name``, or ``None``."""
        return self.execute(
            'SELECT id, user_id, NAME, path, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number, created_at, runtime, description '
            'FROM http_functions WHERE user_id=%s AND NAME=%s',
            [user_id, name],
            one=True
        )

    def get_http_function_by_id(self, user_id, http_function_id):
        """Return the user's HTTP function with the given id, or ``None``."""
        return self.execute(
            'SELECT id, user_id, NAME, path, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number, created_at, runtime, description '
            'FROM http_functions WHERE user_id=%s AND id=%s',
            [user_id, http_function_id],
            one=True
        )

    def get_public_http_function_by_id(self, http_function_id):
        """Return a public HTTP function (with owner username), or ``None``."""
        return self.execute(
            'SELECT h.id, h.user_id, h.NAME, h.path, h.script_content, '
            'h.invoked_count, h.environment_info, h.is_public, '
            'h.log_request, h.log_response, h.version_number, h.created_at, '
            'h.runtime, h.description, u.username '
            'FROM http_functions h JOIN users u ON h.user_id = u.id '
            'WHERE h.id=%s AND h.is_public=TRUE',
            [http_function_id],
            one=True
        )

    def create_new_http_function(self, user_id, name, path, script_content,
                                 environment_info, is_public, log_request,
                                 log_response, runtime, description=""):
        """Insert a new HTTP function row and commit.

        ``environment_info`` is passed to the driver unchanged.
        """
        self.execute(
            'INSERT INTO http_functions (user_id, NAME, path, script_content, '
            'environment_info, is_public, log_request, log_response, '
            'runtime, description) '
            'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
            [user_id, name, path, script_content, environment_info,
             is_public, log_request, log_response, runtime, description],
            commit=True
        )

    def edit_http_function(self, user_id, function_id, name, path,
                           script_content, environment_info, is_public,
                           log_request, log_response, runtime,
                           description=""):
        """Update an HTTP function owned by ``user_id`` and commit.

        Returns:
            The row produced by ``RETURNING version_number``, or ``None``
            when no row matched ``user_id`` / ``function_id``.
        """
        return self.execute(
            'UPDATE http_functions SET NAME=%s, path=%s, script_content=%s, '
            'environment_info=%s, is_public=%s, log_request=%s, '
            'log_response=%s, runtime=%s, description=%s '
            'WHERE user_id=%s AND id=%s RETURNING version_number',
            [name, path, script_content, environment_info, is_public,
             log_request, log_response, runtime, description,
             user_id, function_id],
            commit=True,
            one=True
        )

    def update_http_function_environment_info_and_invoked_count(
            self, user_id, name, environment_info):
        """Store new environment info (JSON-encoded) and bump invoked_count."""
        self.execute(
            'UPDATE http_functions SET environment_info=%s, '
            'invoked_count = invoked_count + 1 '
            'WHERE user_id=%s AND NAME=%s',
            [json.dumps(environment_info), user_id, name],
            commit=True
        )

    def delete_http_function(self, user_id, function_id):
        """Delete an HTTP function owned by ``user_id`` and commit."""
        self.execute(
            'DELETE FROM http_functions WHERE user_id=%s AND id=%s',
            [user_id, function_id],
            commit=True
        )

    def add_http_function_invocation(self, http_function_id, status,
                                     request_data, response_data, logs,
                                     version_number, execution_time):
        """Record one invocation; request, response and logs are JSON-encoded."""
        self.execute(
            'INSERT INTO http_function_invocations (http_function_id, '
            'status, request_data, response_data, logs, version_number, '
            'execution_time) VALUES (%s, %s, %s, %s, %s, %s, %s)',
            [http_function_id, status, json.dumps(request_data),
             json.dumps(response_data), json.dumps(logs), version_number,
             execution_time],
            commit=True
        )

    def get_http_function_invocations(self, http_function_id):
        """Return all invocations of a function, most recent first."""
        return self.execute(
            "SELECT id, http_function_id, STATUS, invocation_time, "
            "request_data, response_data, LOGS, version_number, "
            "execution_time FROM http_function_invocations "
            "WHERE http_function_id=%s ORDER BY invocation_time DESC",
            [http_function_id]
        )

    def _http_function_name_exists(self, user_id, name):
        """Return True if ``user_id`` already owns a function called ``name``."""
        row = self.execute(
            'SELECT 1 FROM http_functions WHERE user_id=%s AND NAME=%s',
            [user_id, name],
            one=True
        )
        return bool(row)

    def fork_http_function(self, user_id, function_id):
        """Copy an existing HTTP function into ``user_id``'s account.

        The copy is private, logs requests and does not log responses. If
        the user already owns a function with the original name, the copy is
        named ``<name>-fork``; if that is taken too, ``<name>-fork-2``,
        ``<name>-fork-3``, ... are tried until a free name is found.

        Returns:
            The name given to the new function.

        Raises:
            Exception: If no function with ``function_id`` exists.
        """
        # NOTE(review): the source row is looked up by id only, with no
        # visibility or ownership filter. Presumably the caller has already
        # checked that the function may be forked; confirm at the call site.
        original = self.execute(
            'SELECT NAME, path, script_content, environment_info, runtime, '
            'description FROM http_functions WHERE id=%s',
            [function_id],
            one=True
        )
        if not original:
            raise Exception("Function not found")

        new_name = original['name']
        if self._http_function_name_exists(user_id, new_name):
            fork_base = f"{new_name}-fork"
            new_name = fork_base
            suffix = 2
            # A single "-fork" suffix is not enough when the same function
            # is forked more than once; keep probing until the name is free.
            while self._http_function_name_exists(user_id, new_name):
                new_name = f"{fork_base}-{suffix}"
                suffix += 1

        self.create_new_http_function(
            user_id,
            new_name,
            original['path'],
            original['script_content'],
            original['environment_info'],
            False,  # is_public
            True,   # log_request
            False,  # log_response
            original['runtime'],
            original['description']
        )
        return new_name

    def get_http_function_history(self, function_id):
        """Return stored versions of an HTTP function, newest version first."""
        return self.execute(
            'SELECT version_id, http_function_id, script_content, '
            'version_number, updated_at FROM http_functions_versions '
            'WHERE http_function_id=%s ORDER BY version_number DESC',
            [function_id]
        )

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def get_user(self, user_id):
        """Return the user row for ``user_id`` (coerced with ``int``), or ``None``."""
        return self.execute(
            'SELECT id, username, password_hash, created_at, '
            'theme_preference FROM users WHERE id=%s',
            [int(user_id)],
            one=True
        )

    def get_user_by_username(self, username):
        """Return the user row with the given username, or ``None``."""
        return self.execute(
            'SELECT id, username, password_hash, created_at, '
            'theme_preference FROM users WHERE username=%s',
            [username],
            one=True
        )

    def create_new_user(self, username, password_hash):
        """Insert a user with the ``'light'`` theme and return the new row."""
        return self.execute(
            'INSERT INTO users (username, password_hash, theme_preference) '
            'VALUES (%s, %s, %s) RETURNING id, username, password_hash, '
            'created_at, theme_preference',
            [username, password_hash, 'light'],
            commit=True,
            one=True
        )

    def update_user_theme_preference(self, user_id, theme):
        """Persist the user's theme preference and commit."""
        self.execute(
            'UPDATE users SET theme_preference=%s WHERE id=%s',
            [theme, user_id],
            commit=True
        )

    def update_user_password(self, user_id, new_password_hash):
        """Replace the user's stored password hash and commit."""
        self.execute(
            'UPDATE users SET password_hash=%s WHERE id=%s',
            [new_password_hash, user_id],
            commit=True
        )

    def update_user_email(self, user_id, new_email):
        """Replace the user's email address and commit."""
        self.execute(
            'UPDATE users SET email=%s WHERE id=%s',
            [new_email, user_id],
            commit=True
        )

    def delete_user(self, user_id):
        """Delete the user row and commit.

        NOTE(review): only the ``users`` row is deleted here; removal of
        associated data presumably relies on cascading foreign keys, which
        cannot be confirmed from this file.
        """
        self.execute(
            'DELETE FROM users WHERE id=%s',
            [user_id],
            commit=True
        )

    # ------------------------------------------------------------------
    # API keys
    # ------------------------------------------------------------------

    def create_api_key(self, user_id, name, key, scopes):
        """Insert an API key (scopes JSON-encoded) and commit."""
        self.execute(
            'INSERT INTO api_keys (user_id, name, key, scopes) '
            'VALUES (%s, %s, %s, %s)',
            [user_id, name, key, json.dumps(scopes)],
            commit=True
        )

    def get_api_key(self, key):
        """Return the API key row matching ``key``, or ``None``."""
        return self.execute(
            'SELECT id, user_id, name, key, scopes, created_at, last_used_at '
            'FROM api_keys WHERE key=%s',
            [key],
            one=True
        )

    def delete_api_key(self, user_id, key_id):
        """Delete an API key owned by ``user_id`` and commit."""
        self.execute(
            'DELETE FROM api_keys WHERE user_id=%s AND id=%s',
            [user_id, key_id],
            commit=True
        )

    def list_api_keys(self, user_id):
        """Return the user's API keys, newest first."""
        return self.execute(
            'SELECT id, user_id, name, key, scopes, created_at, last_used_at '
            'FROM api_keys WHERE user_id=%s ORDER BY created_at DESC',
            [user_id]
        )

    def update_api_key_last_used(self, key_id):
        """Set the key's ``last_used_at`` to the database's ``NOW()``."""
        self.execute(
            'UPDATE api_keys SET last_used_at=NOW() WHERE id=%s',
            [key_id],
            commit=True
        )

    # ------------------------------------------------------------------
    # Export / import
    # ------------------------------------------------------------------

    @staticmethod
    def _isoformat_fields(rows, fields):
        """Convert truthy values under ``fields`` to ISO strings, in place.

        Args:
            rows: List of dict-like rows, or ``None``.
            fields: Keys whose values expose ``isoformat()``.

        Returns:
            ``rows`` itself, or a new empty list when ``rows`` is falsy.
        """
        if not rows:
            return []
        for row in rows:
            for field in fields:
                if row.get(field):
                    row[field] = row[field].isoformat()
        return rows

    def export_user_data(self, user_id):
        """Collect all of a user's data into one dictionary.

        Date/time values are converted to ISO-format strings and API keys
        are truncated to their first eight characters. Invocation history is
        limited to the 100 most recent entries per function.

        Returns:
            A dict with the keys ``user`` (only when the user exists),
            ``http_functions``, ``timer_functions``, ``shared_environments``,
            ``api_keys``, ``http_function_invocations``,
            ``timer_function_invocations``, ``http_function_versions``,
            ``timer_function_versions``, ``shared_environment_versions``,
            ``http_function_shared_env_links`` and
            ``timer_function_shared_env_links``.
        """
        export_data = {}

        # User profile (the password hash is deliberately left out).
        user = self.get_user(user_id)
        if user:
            export_data['user'] = {
                'id': user['id'],
                'username': user['username'],
                'created_at': (user['created_at'].isoformat()
                               if user.get('created_at') else None),
                'theme_preference': user.get('theme_preference')
            }

        # HTTP functions
        http_functions = self._isoformat_fields(
            self.execute(
                'SELECT id, user_id, name, path, script_content, '
                'invoked_count, environment_info, is_public, log_request, '
                'log_response, version_number, runtime, description, '
                'created_at FROM http_functions WHERE user_id=%s ORDER BY id',
                [user_id]
            ),
            ('created_at',)
        )
        export_data['http_functions'] = http_functions

        # Timer functions
        timer_functions = self._isoformat_fields(
            self.execute(
                'SELECT id, name, code, environment, version_number, '
                'user_id, trigger_type, frequency_minutes, run_date, '
                'cron_expression, next_run, last_run, enabled, '
                'invocation_count, runtime FROM timer_functions '
                'WHERE user_id=%s ORDER BY id',
                [user_id]
            ),
            ('run_date', 'next_run', 'last_run')
        )
        export_data['timer_functions'] = timer_functions

        # Shared environments
        shared_envs = self._isoformat_fields(
            self.execute(
                'SELECT id, user_id, name, environment, description, '
                'created_at, updated_at, version_number '
                'FROM shared_environments WHERE user_id=%s ORDER BY name',
                [user_id]
            ),
            ('created_at', 'updated_at')
        )
        export_data['shared_environments'] = shared_envs

        # API keys: expose only a short prefix of each secret.
        api_keys = self.list_api_keys(user_id) or []
        for key in api_keys:
            key['key'] = (key['key'][:8] + '...') if 'key' in key else None
        export_data['api_keys'] = self._isoformat_fields(
            api_keys, ('created_at', 'last_used_at')
        )

        # HTTP function invocations (last 100 per function)
        http_invocations = []
        for func in http_functions:
            invocations = self.execute(
                'SELECT id, http_function_id, status, invocation_time, '
                'request_data, response_data, logs, version_number, '
                'execution_time FROM http_function_invocations '
                'WHERE http_function_id=%s '
                'ORDER BY invocation_time DESC LIMIT 100',
                [func['id']]
            )
            http_invocations.extend(
                self._isoformat_fields(invocations, ('invocation_time',))
            )
        export_data['http_function_invocations'] = http_invocations

        # Timer function invocations (last 100 per function)
        timer_invocations = []
        for func in timer_functions:
            invocations = self.execute(
                'SELECT id, timer_function_id, status, invocation_time, '
                'logs, version_number, execution_time '
                'FROM timer_function_invocations '
                'WHERE timer_function_id=%s '
                'ORDER BY invocation_time DESC LIMIT 100',
                [func['id']]
            )
            timer_invocations.extend(
                self._isoformat_fields(invocations, ('invocation_time',))
            )
        export_data['timer_function_invocations'] = timer_invocations

        # HTTP function version history
        http_versions = []
        for func in http_functions:
            versions = self.get_http_function_history(func['id'])
            http_versions.extend(
                self._isoformat_fields(versions, ('updated_at',))
            )
        export_data['http_function_versions'] = http_versions

        # Timer function version history
        timer_versions = []
        for func in timer_functions:
            versions = self.execute(
                'SELECT id as version_id, timer_function_id, script, '
                'version_number, versioned_at FROM timer_function_versions '
                'WHERE timer_function_id=%s ORDER BY version_number DESC',
                [func['id']]
            )
            timer_versions.extend(
                self._isoformat_fields(versions, ('versioned_at',))
            )
        export_data['timer_function_versions'] = timer_versions

        # Shared environment version history
        shared_env_versions = []
        for env in shared_envs:
            versions = self.execute(
                'SELECT id as version_id, shared_env_id, environment, '
                'version_number, versioned_at '
                'FROM shared_environment_versions '
                'WHERE shared_env_id=%s ORDER BY version_number DESC',
                [env['id']]
            )
            shared_env_versions.extend(
                self._isoformat_fields(versions, ('versioned_at',))
            )
        export_data['shared_environment_versions'] = shared_env_versions

        # HTTP function <-> shared environment links
        export_data['http_function_shared_env_links'] = self._isoformat_fields(
            self.execute(
                'SELECT hfse.id, hfse.http_function_id, hfse.shared_env_id, '
                'hfse.created_at FROM http_function_shared_envs hfse '
                'JOIN http_functions hf ON hfse.http_function_id = hf.id '
                'WHERE hf.user_id=%s',
                [user_id]
            ),
            ('created_at',)
        )

        # Timer function <-> shared environment links
        export_data['timer_function_shared_env_links'] = self._isoformat_fields(
            self.execute(
                'SELECT tfse.id, tfse.timer_function_id, tfse.shared_env_id, '
                'tfse.created_at FROM timer_function_shared_envs tfse '
                'JOIN timer_functions tf ON tfse.timer_function_id = tf.id '
                'WHERE tf.user_id=%s',
                [user_id]
            ),
            ('created_at',)
        )

        return export_data

    def import_http_function(self, user_id, func_data):
        """Import a single HTTP function.

        ``func_data`` may use the keys produced by ``export_user_data``
        (``script_content``, ``environment_info``, ``path``,
        ``description``) or the legacy ``code`` / ``environment`` keys.

        Returns:
            A ``(success, message, function_id)`` tuple; ``function_id`` is
            ``None`` on failure. Errors are reported through the tuple
            rather than raised.
        """
        try:
            name = func_data['name']

            # Refuse to overwrite an existing function of the same name.
            existing = self.execute(
                "SELECT id FROM http_functions WHERE user_id = %s AND name = %s",
                (user_id, name),
                one=True
            )
            if existing:
                return (False, f"Function '{name}' already exists", None)

            if 'script_content' in func_data:
                script_content = func_data['script_content']
            else:
                # Legacy key; a missing key is reported by the handler below.
                script_content = func_data['code']

            environment = func_data.get(
                'environment_info', func_data.get('environment', {})
            )
            # Exported values may already be JSON text; avoid double-encoding.
            if not isinstance(environment, str):
                environment = json.dumps(environment)

            # Column names are fixed literals (never caller-supplied), so
            # assembling the column list below cannot inject SQL. The table
            # stores the source in script_content / environment_info, as in
            # every other http_functions statement in this class.
            values = {
                'name': name,
                'script_content': script_content,
                'environment_info': environment,
                'version_number': 1,  # imported functions start at version 1
                'user_id': user_id,
                'runtime': func_data.get('runtime', 'python'),
            }
            # Optional columns are only set when supplied, so the database
            # defaults still apply otherwise.
            for optional in ('path', 'description'):
                if func_data.get(optional) is not None:
                    values[optional] = func_data[optional]

            column_list = ', '.join(values)
            placeholders = ', '.join(['%s'] * len(values))
            result = self.execute(
                f"INSERT INTO http_functions ({column_list}) "
                f"VALUES ({placeholders}) RETURNING id",
                tuple(values.values()),
                one=True,
                commit=True
            )
            return (True, f"Imported function '{name}'", result['id'])
        except Exception as e:
            return (
                False,
                f"Error importing '{func_data.get('name', 'unknown')}': {str(e)}",
                None
            )

    def import_timer_function(self, user_id, func_data):
        """Import a single timer function.

        Returns:
            A ``(success, message, function_id)`` tuple; ``function_id`` is
            ``None`` on failure. Errors are reported through the tuple
            rather than raised.
        """
        try:
            # Refuse to overwrite an existing function of the same name.
            existing = self.execute(
                "SELECT id FROM timer_functions WHERE user_id = %s AND name = %s",
                (user_id, func_data['name']),
                one=True
            )
            if existing:
                return (
                    False,
                    f"Timer function '{func_data['name']}' already exists",
                    None
                )

            # Imported inside the method rather than at module level,
            # presumably to avoid a circular import with the routes package.
            from routes.timer import calculate_next_run
            next_run = calculate_next_run(
                func_data['trigger_type'],
                func_data.get('frequency_minutes'),
                func_data.get('run_date'),
                func_data.get('cron_expression')
            )

            result = self.execute(
                "INSERT INTO timer_functions (name, code, environment, "
                "version_number, user_id, runtime, trigger_type, "
                "frequency_minutes, run_date, cron_expression, next_run, "
                "enabled) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
                "%s, %s) RETURNING id",
                (
                    func_data['name'],
                    func_data['code'],
                    json.dumps(func_data.get('environment', {})),
                    1,  # imported functions start at version 1
                    user_id,
                    func_data.get('runtime', 'python'),
                    func_data['trigger_type'],
                    func_data.get('frequency_minutes'),
                    func_data.get('run_date'),
                    func_data.get('cron_expression'),
                    next_run,
                    func_data.get('enabled', True)
                ),
                one=True,
                commit=True
            )
            return (
                True,
                f"Imported timer function '{func_data['name']}'",
                result['id']
            )
        except Exception as e:
            return (
                False,
                f"Error importing timer '{func_data.get('name', 'unknown')}': {str(e)}",
                None
            )

    def import_shared_environment(self, user_id, env_data):
        """Import a single shared environment.

        Returns:
            A ``(success, message, env_id)`` tuple; ``env_id`` is ``None``
            on failure. Errors are reported through the tuple rather than
            raised.
        """
        try:
            # Refuse to overwrite an existing environment of the same name.
            existing = self.execute(
                "SELECT id FROM shared_environments WHERE user_id = %s AND name = %s",
                (user_id, env_data['name']),
                one=True
            )
            if existing:
                return (
                    False,
                    f"Shared environment '{env_data['name']}' already exists",
                    None
                )

            result = self.execute(
                "INSERT INTO shared_environments (name, environment, "
                "user_id, version_number) VALUES (%s, %s, %s, %s) "
                "RETURNING id",
                (
                    env_data['name'],
                    json.dumps(env_data.get('environment', {})),
                    user_id,
                    1  # imported environments start at version 1
                ),
                one=True,
                commit=True
            )
            return (
                True,
                f"Imported shared environment '{env_data['name']}'",
                result['id']
            )
        except Exception as e:
            return (
                False,
                f"Error importing environment '{env_data.get('name', 'unknown')}': {str(e)}",
                None
            )

    # ------------------------------------------------------------------
    # Login history
    # ------------------------------------------------------------------

    def record_login(self, user_id, ip_address, user_agent, success=True,
                     failure_reason=None):
        """Record a login attempt.

        Best-effort: a failure to write the row is printed and reported via
        the return value instead of being raised.

        Returns:
            ``True`` if the row was stored, ``False`` otherwise.
        """
        try:
            self.execute(
                "INSERT INTO login_history (user_id, ip_address, user_agent, "
                "success, failure_reason) VALUES (%s, %s, %s, %s, %s)",
                (user_id, ip_address, user_agent, success, failure_reason),
                commit=True
            )
            return True
        except Exception as e:
            print(f"Error recording login: {e}")
            return False

    def get_login_history(self, user_id, limit=50):
        """Return up to ``limit`` login attempts for a user, newest first."""
        return self.execute(
            "SELECT id, login_time, ip_address, user_agent, success, "
            "failure_reason FROM login_history WHERE user_id = %s "
            "ORDER BY login_time DESC LIMIT %s",
            (user_id, limit)
        )