700 lines
32 KiB
Python
700 lines
32 KiB
Python
import json
|
|
import os
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from psycopg2.extras import RealDictCursor
|
|
from urllib.parse import urlparse
|
|
from flask import g
|
|
|
|
class DataBase():
|
|
def __init__(self):
|
|
self.pool = None
|
|
|
|
def init_app(self, app):
|
|
db_url = urlparse(os.environ['DATABASE_URL'])
|
|
if not db_url:
|
|
raise Exception("No DATABASE_URL environment variable set")
|
|
|
|
self.pool = psycopg2.pool.SimpleConnectionPool(
|
|
1, 20,
|
|
database=db_url.path[1:],
|
|
user=db_url.username,
|
|
password=db_url.password,
|
|
host=db_url.hostname,
|
|
port=db_url.port
|
|
)
|
|
|
|
app.teardown_appcontext(self.close_conn)
|
|
|
|
def get_conn(self):
|
|
if 'db_conn' not in g:
|
|
g.db_conn = self.pool.getconn()
|
|
return g.db_conn
|
|
|
|
def close_conn(self, e=None):
|
|
db_conn = g.pop('db_conn', None)
|
|
if db_conn is not None:
|
|
self.pool.putconn(db_conn)
|
|
|
|
def close_all_connections(self):
|
|
if self.pool:
|
|
self.pool.closeall()
|
|
|
|
def execute(self, query, args=(), one=False, commit=False):
|
|
conn = self.get_conn()
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
try:
|
|
cur.execute(query, args)
|
|
rv = None
|
|
if cur.description is not None:
|
|
rv = cur.fetchall()
|
|
if commit:
|
|
conn.commit()
|
|
return (rv[0] if rv else None) if one else rv
|
|
except Exception as e:
|
|
conn.rollback()
|
|
raise e
|
|
finally:
|
|
cur.close()
|
|
|
|
def get_http_functions_for_user(self, user_id, search_query=None):
|
|
if search_query:
|
|
search_pattern = f"%{search_query}%"
|
|
http_functions = self.execute(
|
|
'SELECT id, user_id, NAME, path, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, runtime, description FROM http_functions WHERE user_id=%s AND (NAME ILIKE %s OR path ILIKE %s) ORDER by id DESC',
|
|
[user_id, search_pattern, search_pattern]
|
|
)
|
|
else:
|
|
http_functions = self.execute(
|
|
'SELECT id, user_id, NAME, path, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, runtime, description FROM http_functions WHERE user_id=%s ORDER by id DESC',
|
|
[user_id]
|
|
)
|
|
return http_functions
|
|
|
|
def get_public_http_functions(self, search_query=None):
|
|
if search_query:
|
|
search_pattern = f"%{search_query}%"
|
|
http_functions = self.execute(
|
|
'SELECT h.id, h.user_id, h.NAME, h.path, h.script_content, h.invoked_count, h.environment_info, h.is_public, h.log_request, h.log_response, h.version_number, h.runtime, h.description, h.created_at, u.username FROM http_functions h JOIN users u ON h.user_id = u.id WHERE h.is_public=TRUE AND (h.NAME ILIKE %s OR h.description ILIKE %s) ORDER by h.created_at DESC',
|
|
[search_pattern, search_pattern]
|
|
)
|
|
else:
|
|
http_functions = self.execute(
|
|
'SELECT h.id, h.user_id, h.NAME, h.path, h.script_content, h.invoked_count, h.environment_info, h.is_public, h.log_request, h.log_response, h.version_number, h.runtime, h.description, h.created_at, u.username FROM http_functions h JOIN users u ON h.user_id = u.id WHERE h.is_public=TRUE ORDER by h.created_at DESC'
|
|
)
|
|
return http_functions
|
|
|
|
def get_http_function(self, user_id, name):
|
|
http_function = self.execute(
|
|
'SELECT id, user_id, NAME, path, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, created_at, runtime, description FROM http_functions WHERE user_id=%s AND NAME=%s', [user_id, name], one=True)
|
|
return http_function
|
|
|
|
def get_http_function_by_id(self, user_id, http_function_id):
|
|
http_function = self.execute(
|
|
'SELECT id, user_id, NAME, path, script_content, invoked_count, environment_info, is_public, log_request, log_response, version_number, created_at, runtime, description FROM http_functions WHERE user_id=%s AND id=%s', [user_id, http_function_id], one=True)
|
|
return http_function
|
|
|
|
def get_public_http_function_by_id(self, http_function_id):
|
|
http_function = self.execute(
|
|
'SELECT h.id, h.user_id, h.NAME, h.path, h.script_content, h.invoked_count, h.environment_info, h.is_public, h.log_request, h.log_response, h.version_number, h.created_at, h.runtime, h.description, u.username FROM http_functions h JOIN users u ON h.user_id = u.id WHERE h.id=%s AND h.is_public=TRUE', [http_function_id], one=True)
|
|
return http_function
|
|
|
|
def create_new_http_function(self, user_id, name, path, script_content, environment_info, is_public, log_request, log_response, runtime, description=""):
|
|
self.execute(
|
|
'INSERT INTO http_functions (user_id, NAME, path, script_content, environment_info, is_public, log_request, log_response, runtime, description) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
|
|
[user_id, name, path, script_content, environment_info, is_public, log_request, log_response, runtime, description],
|
|
commit=True
|
|
)
|
|
|
|
def edit_http_function(self, user_id, function_id, name, path, script_content, environment_info, is_public, log_request, log_response, runtime, description=""):
|
|
updated_version = self.execute(
|
|
'UPDATE http_functions SET NAME=%s, path=%s, script_content=%s, environment_info=%s, is_public=%s, log_request=%s, log_response=%s, runtime=%s, description=%s WHERE user_id=%s AND id=%s RETURNING version_number',
|
|
[name, path, script_content, environment_info, is_public, log_request, log_response, runtime, description, user_id, function_id],
|
|
commit=True, one=True
|
|
)
|
|
return updated_version
|
|
|
|
def update_http_function_environment_info_and_invoked_count(self, user_id, name, environment_info):
|
|
self.execute(
|
|
'UPDATE http_functions SET environment_info=%s, invoked_count = invoked_count + 1 WHERE user_id=%s AND NAME=%s', [json.dumps(environment_info), user_id, name], commit=True)
|
|
|
|
def delete_http_function(self, user_id, function_id):
|
|
self.execute(
|
|
'DELETE FROM http_functions WHERE user_id=%s AND id=%s', [user_id, function_id], commit=True)
|
|
|
|
def add_http_function_invocation(self, http_function_id, status, request_data, response_data, logs, version_number, execution_time):
|
|
self.execute(
|
|
'INSERT INTO http_function_invocations (http_function_id, status, request_data, response_data, logs, version_number, execution_time) VALUES (%s, %s, %s, %s, %s, %s, %s)', [http_function_id, status, json.dumps(request_data), json.dumps(response_data), json.dumps(logs), version_number, execution_time], commit=True)
|
|
|
|
def get_http_function_invocations(self, http_function_id):
|
|
http_function_invocations = self.execute(
|
|
"""SELECT id, http_function_id, STATUS, invocation_time, request_data, response_data, LOGS, version_number, execution_time
|
|
FROM http_function_invocations
|
|
WHERE http_function_id=%s
|
|
ORDER BY invocation_time DESC""", [http_function_id])
|
|
return http_function_invocations
|
|
|
|
def fork_http_function(self, user_id, function_id):
|
|
# Get the original function
|
|
original = self.execute(
|
|
'SELECT NAME, path, script_content, environment_info, runtime, description FROM http_functions WHERE id=%s',
|
|
[function_id],
|
|
one=True
|
|
)
|
|
if not original:
|
|
raise Exception("Function not found")
|
|
|
|
new_name = original['name']
|
|
# Check if name exists for this user
|
|
exists = self.execute('SELECT 1 FROM http_functions WHERE user_id=%s AND NAME=%s', [user_id, new_name], one=True)
|
|
if exists:
|
|
new_name = f"{new_name}-fork"
|
|
|
|
self.create_new_http_function(
|
|
user_id,
|
|
new_name,
|
|
original['path'],
|
|
original['script_content'],
|
|
original['environment_info'],
|
|
False, # is_public
|
|
True, # log_request
|
|
False, # log_response
|
|
original['runtime'],
|
|
original['description']
|
|
)
|
|
return new_name
|
|
|
|
def get_user(self, user_id):
|
|
user = self.execute(
|
|
'SELECT id, username, password_hash, created_at, theme_preference FROM users WHERE id=%s', [int(user_id)], one=True)
|
|
return user
|
|
|
|
def get_user_by_username(self, username):
|
|
user = self.execute(
|
|
'SELECT id, username, password_hash, created_at, theme_preference FROM users WHERE username=%s', [username], one=True)
|
|
return user
|
|
|
|
def create_new_user(self, username, password_hash):
|
|
new_user = self.execute(
|
|
'INSERT INTO users (username, password_hash, theme_preference) VALUES (%s, %s, %s) RETURNING id, username, password_hash, created_at, theme_preference', [username, password_hash, 'light'], commit=True, one=True)
|
|
return new_user
|
|
|
|
def update_user_theme_preference(self, user_id, theme):
|
|
self.execute(
|
|
'UPDATE users SET theme_preference=%s WHERE id=%s', [theme, user_id], commit=True)
|
|
def get_http_function_history(self, function_id):
|
|
http_function_history = self.execute(
|
|
'SELECT version_id, http_function_id, script_content, version_number, updated_at, commit_message FROM http_functions_versions WHERE http_function_id=%s ORDER BY version_number DESC', [function_id])
|
|
return http_function_history
|
|
|
|
def create_api_key(self, user_id, name, key, scopes, rate_limit_count=None, rate_limit_period=None):
|
|
new_key = self.execute(
|
|
'INSERT INTO api_keys (user_id, name, key, scopes, rate_limit_count, rate_limit_period) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period',
|
|
[user_id, name, key, json.dumps(scopes), rate_limit_count, rate_limit_period], commit=True, one=True)
|
|
return new_key
|
|
|
|
def get_api_key(self, key):
|
|
api_key = self.execute(
|
|
'SELECT id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE key=%s', [key], one=True)
|
|
if api_key and api_key.get('scopes'):
|
|
if isinstance(api_key['scopes'], str):
|
|
api_key['scopes'] = json.loads(api_key['scopes'])
|
|
return api_key
|
|
|
|
def delete_api_key(self, user_id, key_id):
|
|
self.execute(
|
|
'DELETE FROM api_keys WHERE user_id=%s AND id=%s', [user_id, key_id], commit=True)
|
|
|
|
def list_api_keys(self, user_id):
|
|
api_keys = self.execute(
|
|
'SELECT id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE user_id=%s ORDER BY created_at DESC', [user_id])
|
|
return api_keys
|
|
|
|
def update_api_key_last_used(self, key_id):
|
|
self.execute(
|
|
'UPDATE api_keys SET last_used_at=NOW() WHERE id=%s', [key_id], commit=True)
|
|
|
|
def check_and_increment_api_key_usage(self, key_id):
|
|
"""
|
|
Check if API key has exceeded rate limit and increment usage.
|
|
Returns True if allowed, False if rate limited.
|
|
"""
|
|
# Get current key data
|
|
key_data = self.execute(
|
|
'SELECT rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE id=%s',
|
|
[key_id], one=True
|
|
)
|
|
|
|
if not key_data or not key_data['rate_limit_count']:
|
|
return True # No limit set
|
|
|
|
limit = key_data['rate_limit_count']
|
|
period = key_data['rate_limit_period']
|
|
usage = key_data['usage_count']
|
|
reset_at = key_data['usage_reset_at']
|
|
|
|
import datetime
|
|
now = datetime.datetime.now()
|
|
|
|
# Check if we need to reset the counter
|
|
if not reset_at or now >= reset_at:
|
|
# Calculate new reset time
|
|
if period == 'minute':
|
|
new_reset = now + datetime.timedelta(minutes=1)
|
|
elif period == 'hour':
|
|
new_reset = now + datetime.timedelta(hours=1)
|
|
elif period == 'day':
|
|
new_reset = now + datetime.timedelta(days=1)
|
|
else:
|
|
new_reset = now + datetime.timedelta(minutes=1) # Default
|
|
|
|
# Reset usage and set new reset time
|
|
self.execute(
|
|
'UPDATE api_keys SET usage_count=1, usage_reset_at=%s, last_used_at=NOW() WHERE id=%s',
|
|
[new_reset, key_id], commit=True
|
|
)
|
|
return True
|
|
|
|
# Check limit
|
|
if usage >= limit:
|
|
return False
|
|
|
|
# Increment usage
|
|
self.execute(
|
|
'UPDATE api_keys SET usage_count=usage_count+1, last_used_at=NOW() WHERE id=%s',
|
|
[key_id], commit=True
|
|
)
|
|
return True
|
|
|
|
def export_user_data(self, user_id):
|
|
"""
|
|
Export all user data for backup/migration purposes.
|
|
Returns a comprehensive dictionary with all user's data.
|
|
"""
|
|
export_data = {}
|
|
|
|
# User profile
|
|
user = self.get_user(user_id)
|
|
if user:
|
|
export_data['user'] = {
|
|
'id': user['id'],
|
|
'username': user['username'],
|
|
'created_at': user['created_at'].isoformat() if user.get('created_at') else None,
|
|
'theme_preference': user.get('theme_preference')
|
|
}
|
|
|
|
# HTTP Functions
|
|
http_functions = self.execute(
|
|
'''SELECT id, user_id, name, path, script_content, invoked_count,
|
|
environment_info, is_public, log_request, log_response,
|
|
version_number, runtime, description, created_at
|
|
FROM http_functions
|
|
WHERE user_id=%s
|
|
ORDER BY id''',
|
|
[user_id]
|
|
)
|
|
|
|
if http_functions:
|
|
for func in http_functions:
|
|
if func.get('created_at'):
|
|
func['created_at'] = func['created_at'].isoformat()
|
|
export_data['http_functions'] = http_functions or []
|
|
|
|
# Timer Functions
|
|
timer_functions = self.execute(
|
|
'''SELECT id, name, code, environment, version_number, user_id,
|
|
trigger_type, frequency_minutes, run_date, cron_expression,
|
|
next_run, last_run, enabled, invocation_count, runtime
|
|
FROM timer_functions
|
|
WHERE user_id=%s
|
|
ORDER BY id''',
|
|
[user_id]
|
|
)
|
|
|
|
if timer_functions:
|
|
for func in timer_functions:
|
|
if func.get('run_date'):
|
|
func['run_date'] = func['run_date'].isoformat()
|
|
if func.get('next_run'):
|
|
func['next_run'] = func['next_run'].isoformat()
|
|
if func.get('last_run'):
|
|
func['last_run'] = func['last_run'].isoformat()
|
|
export_data['timer_functions'] = timer_functions or []
|
|
|
|
# Shared Environments
|
|
shared_envs = self.execute(
|
|
'''SELECT id, user_id, name, environment, description,
|
|
created_at, updated_at, version_number
|
|
FROM shared_environments
|
|
WHERE user_id=%s
|
|
ORDER BY name''',
|
|
[user_id]
|
|
)
|
|
|
|
if shared_envs:
|
|
for env in shared_envs:
|
|
if env.get('created_at'):
|
|
env['created_at'] = env['created_at'].isoformat()
|
|
if env.get('updated_at'):
|
|
env['updated_at'] = env['updated_at'].isoformat()
|
|
export_data['shared_environments'] = shared_envs or []
|
|
|
|
# API Keys (masked for security)
|
|
api_keys = self.list_api_keys(user_id)
|
|
if api_keys:
|
|
for key in api_keys:
|
|
# Only include partial key for security
|
|
key['key'] = key['key'][:8] + '...' if 'key' in key else None
|
|
if key.get('created_at'):
|
|
key['created_at'] = key['created_at'].isoformat()
|
|
if key.get('last_used_at'):
|
|
key['last_used_at'] = key['last_used_at'].isoformat()
|
|
export_data['api_keys'] = api_keys or []
|
|
|
|
# HTTP Function Invocations (limited to last 100 per function)
|
|
http_invocations = []
|
|
if http_functions:
|
|
for func in http_functions:
|
|
invocations = self.execute(
|
|
'''SELECT id, http_function_id, status, invocation_time,
|
|
request_data, response_data, logs, version_number, execution_time
|
|
FROM http_function_invocations
|
|
WHERE http_function_id=%s
|
|
ORDER BY invocation_time DESC
|
|
LIMIT 100''',
|
|
[func['id']]
|
|
)
|
|
if invocations:
|
|
for inv in invocations:
|
|
if inv.get('invocation_time'):
|
|
inv['invocation_time'] = inv['invocation_time'].isoformat()
|
|
http_invocations.extend(invocations)
|
|
export_data['http_function_invocations'] = http_invocations
|
|
|
|
# Timer Function Invocations (limited to last 100 per function)
|
|
timer_invocations = []
|
|
if timer_functions:
|
|
for func in timer_functions:
|
|
invocations = self.execute(
|
|
'''SELECT id, timer_function_id, status, invocation_time,
|
|
logs, version_number, execution_time
|
|
FROM timer_function_invocations
|
|
WHERE timer_function_id=%s
|
|
ORDER BY invocation_time DESC
|
|
LIMIT 100''',
|
|
[func['id']]
|
|
)
|
|
if invocations:
|
|
for inv in invocations:
|
|
if inv.get('invocation_time'):
|
|
inv['invocation_time'] = inv['invocation_time'].isoformat()
|
|
timer_invocations.extend(invocations)
|
|
export_data['timer_function_invocations'] = timer_invocations
|
|
|
|
# HTTP Function Version History
|
|
http_versions = []
|
|
if http_functions:
|
|
for func in http_functions:
|
|
versions = self.get_http_function_history(func['id'])
|
|
if versions:
|
|
for ver in versions:
|
|
if ver.get('updated_at'):
|
|
ver['updated_at'] = ver['updated_at'].isoformat()
|
|
http_versions.extend(versions)
|
|
export_data['http_function_versions'] = http_versions
|
|
|
|
# Timer Function Version History
|
|
timer_versions = []
|
|
if timer_functions:
|
|
for func in timer_functions:
|
|
versions = self.execute(
|
|
'''SELECT id as version_id, timer_function_id, script,
|
|
version_number, versioned_at
|
|
FROM timer_function_versions
|
|
WHERE timer_function_id=%s
|
|
ORDER BY version_number DESC''',
|
|
[func['id']]
|
|
)
|
|
if versions:
|
|
for ver in versions:
|
|
if ver.get('versioned_at'):
|
|
ver['versioned_at'] = ver['versioned_at'].isoformat()
|
|
timer_versions.extend(versions)
|
|
export_data['timer_function_versions'] = timer_versions
|
|
|
|
# Shared Environment Version History
|
|
shared_env_versions = []
|
|
if shared_envs:
|
|
for env in shared_envs:
|
|
versions = self.execute(
|
|
'''SELECT id as version_id, shared_env_id, environment,
|
|
version_number, versioned_at
|
|
FROM shared_environment_versions
|
|
WHERE shared_env_id=%s
|
|
ORDER BY version_number DESC''',
|
|
[env['id']]
|
|
)
|
|
if versions:
|
|
for ver in versions:
|
|
if ver.get('versioned_at'):
|
|
ver['versioned_at'] = ver['versioned_at'].isoformat()
|
|
shared_env_versions.extend(versions)
|
|
export_data['shared_environment_versions'] = shared_env_versions
|
|
|
|
# HTTP Function to Shared Environment Linkages
|
|
http_shared_env_links = self.execute(
|
|
'''SELECT hfse.id, hfse.http_function_id, hfse.shared_env_id, hfse.created_at
|
|
FROM http_function_shared_envs hfse
|
|
JOIN http_functions hf ON hfse.http_function_id = hf.id
|
|
WHERE hf.user_id=%s''',
|
|
[user_id]
|
|
)
|
|
if http_shared_env_links:
|
|
for link in http_shared_env_links:
|
|
if link.get('created_at'):
|
|
link['created_at'] = link['created_at'].isoformat()
|
|
export_data['http_function_shared_env_links'] = http_shared_env_links or []
|
|
|
|
# Timer Function to Shared Environment Linkages
|
|
timer_shared_env_links = self.execute(
|
|
'''SELECT tfse.id, tfse.timer_function_id, tfse.shared_env_id, tfse.created_at
|
|
FROM timer_function_shared_envs tfse
|
|
JOIN timer_functions tf ON tfse.timer_function_id = tf.id
|
|
WHERE tf.user_id=%s''',
|
|
[user_id]
|
|
)
|
|
if timer_shared_env_links:
|
|
for link in timer_shared_env_links:
|
|
if link.get('created_at'):
|
|
link['created_at'] = link['created_at'].isoformat()
|
|
export_data['timer_function_shared_env_links'] = timer_shared_env_links or []
|
|
|
|
return export_data
|
|
|
|
def import_http_function(self, user_id, func_data):
|
|
"""Import a single HTTP function, returns (success, message, function_id)"""
|
|
try:
|
|
# Check if function with same name exists
|
|
existing = self.execute(
|
|
"SELECT id FROM http_functions WHERE user_id = %s AND name = %s",
|
|
(user_id, func_data['name']),
|
|
one=True
|
|
)
|
|
|
|
if existing:
|
|
return (False, f"Function '{func_data['name']}' already exists", None)
|
|
|
|
# Insert the function
|
|
result = self.execute(
|
|
"""INSERT INTO http_functions
|
|
(name, code, environment, version_number, user_id, runtime)
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
RETURNING id""",
|
|
(
|
|
func_data['name'],
|
|
func_data['code'],
|
|
json.dumps(func_data.get('environment', {})),
|
|
1, # Start at version 1
|
|
user_id,
|
|
func_data.get('runtime', 'python')
|
|
),
|
|
one=True,
|
|
commit=True
|
|
)
|
|
|
|
return (True, f"Imported function '{func_data['name']}'", result['id'])
|
|
except Exception as e:
|
|
return (False, f"Error importing '{func_data.get('name', 'unknown')}': {str(e)}", None)
|
|
|
|
def import_timer_function(self,user_id, func_data):
|
|
"""Import a single timer function, returns (success, message, function_id)"""
|
|
try:
|
|
# Check if function with same name exists
|
|
existing = self.execute(
|
|
"SELECT id FROM timer_functions WHERE user_id = %s AND name = %s",
|
|
(user_id, func_data['name']),
|
|
one=True
|
|
)
|
|
|
|
if existing:
|
|
return (False, f"Timer function '{func_data['name']}' already exists", None)
|
|
|
|
# Calculate next_run based on trigger type
|
|
from routes.timer import calculate_next_run
|
|
next_run = calculate_next_run(
|
|
func_data['trigger_type'],
|
|
func_data.get('frequency_minutes'),
|
|
func_data.get('run_date'),
|
|
func_data.get('cron_expression')
|
|
)
|
|
|
|
# Insert the function
|
|
result = self.execute(
|
|
"""INSERT INTO timer_functions
|
|
(name, code, environment, version_number, user_id, runtime,
|
|
trigger_type, frequency_minutes, run_date, cron_expression,
|
|
next_run, enabled)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id""",
|
|
(
|
|
func_data['name'],
|
|
func_data['code'],
|
|
json.dumps(func_data.get('environment', {})),
|
|
1, # Start at version 1
|
|
user_id,
|
|
func_data.get('runtime', 'python'),
|
|
func_data['trigger_type'],
|
|
func_data.get('frequency_minutes'),
|
|
func_data.get('run_date'),
|
|
func_data.get('cron_expression'),
|
|
next_run,
|
|
func_data.get('enabled', True)
|
|
),
|
|
one=True,
|
|
commit=True
|
|
)
|
|
|
|
return (True, f"Imported timer function '{func_data['name']}'", result['id'])
|
|
except Exception as e:
|
|
return (False, f"Error importing timer '{func_data.get('name', 'unknown')}': {str(e)}", None)
|
|
|
|
def import_shared_environment(self, user_id, env_data):
|
|
"""Import a single shared environment, returns (success, message, env_id)"""
|
|
try:
|
|
# Check if environment with same name exists
|
|
existing = self.execute(
|
|
"SELECT id FROM shared_environments WHERE user_id = %s AND name = %s",
|
|
(user_id, env_data['name']),
|
|
one=True
|
|
)
|
|
|
|
if existing:
|
|
return (False, f"Shared environment '{env_data['name']}' already exists", None)
|
|
|
|
# Insert the environment
|
|
result = self.execute(
|
|
"""INSERT INTO shared_environments
|
|
(name, environment, user_id, version_number)
|
|
VALUES (%s, %s, %s, %s)
|
|
RETURNING id""",
|
|
(
|
|
env_data['name'],
|
|
json.dumps(env_data.get('environment', {})),
|
|
user_id,
|
|
1 # Start at version 1
|
|
),
|
|
one=True,
|
|
commit=True
|
|
)
|
|
|
|
return (True, f"Imported shared environment '{env_data['name']}'", result['id'])
|
|
except Exception as e:
|
|
return (False, f"Error importing environment '{env_data.get('name', 'unknown')}': {str(e)}", None)
|
|
|
|
def record_login(self, user_id, ip_address, user_agent, success=True, failure_reason=None):
|
|
"""Record a login attempt"""
|
|
try:
|
|
self.execute(
|
|
"""INSERT INTO login_history
|
|
(user_id, ip_address, user_agent, success, failure_reason)
|
|
VALUES (%s, %s, %s, %s, %s)""",
|
|
(user_id, ip_address, user_agent, success, failure_reason),
|
|
commit=True
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error recording login: {e}")
|
|
return False
|
|
|
|
def get_login_history(self, user_id, limit=50):
|
|
"""Get login history for a user"""
|
|
return self.execute(
|
|
"""SELECT id, login_time, ip_address, user_agent, success, failure_reason
|
|
FROM login_history
|
|
WHERE user_id = %s
|
|
ORDER BY login_time DESC
|
|
LIMIT %s""",
|
|
(user_id, limit)
|
|
)
|
|
|
|
def update_user_password(self, user_id, new_password_hash):
|
|
"""Update user's password"""
|
|
self.execute(
|
|
'UPDATE users SET password_hash=%s WHERE id=%s',
|
|
[new_password_hash, user_id],
|
|
commit=True
|
|
)
|
|
|
|
def update_user_email(self, user_id, new_email):
|
|
"""Update user's email address"""
|
|
self.execute(
|
|
'UPDATE users SET email=%s WHERE id=%s',
|
|
[new_email, user_id],
|
|
commit=True
|
|
)
|
|
|
|
def delete_user(self, user_id):
|
|
"""Delete a user account and all associated data"""
|
|
self.execute(
|
|
'DELETE FROM users WHERE id=%s',
|
|
[user_id],
|
|
commit=True
|
|
)
|
|
|
|
# Function Testing Methods
|
|
def create_function_test(self, http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions=None):
|
|
"""Create a new test case for a function"""
|
|
test = self.execute(
|
|
'''INSERT INTO http_function_tests
|
|
(http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id, http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions, created_at, updated_at''',
|
|
[http_function_id, name, description, request_method, json.dumps(request_headers), json.dumps(request_body), expected_status, json.dumps(expected_output) if expected_output else None, json.dumps(assertions) if assertions else '[]'],
|
|
commit=True,
|
|
one=True
|
|
)
|
|
return test
|
|
|
|
def get_function_tests(self, http_function_id):
|
|
"""Get all test cases for a function"""
|
|
tests = self.execute(
|
|
'''SELECT id, http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions, created_at, updated_at
|
|
FROM http_function_tests
|
|
WHERE http_function_id = %s
|
|
ORDER BY created_at DESC''',
|
|
[http_function_id]
|
|
)
|
|
return tests if tests else []
|
|
|
|
def get_function_test(self, test_id):
|
|
"""Get a single test case"""
|
|
test = self.execute(
|
|
'''SELECT id, http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions, created_at, updated_at
|
|
FROM http_function_tests
|
|
WHERE id = %s''',
|
|
[test_id],
|
|
one=True
|
|
)
|
|
return test
|
|
|
|
def update_function_test(self, test_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions=None):
|
|
"""Update an existing test case"""
|
|
test = self.execute(
|
|
'''UPDATE http_function_tests
|
|
SET name = %s, description = %s, request_method = %s, request_headers = %s, request_body = %s, expected_status = %s, expected_output = %s, assertions = %s, updated_at = NOW()
|
|
WHERE id = %s
|
|
RETURNING id, http_function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions, created_at, updated_at''',
|
|
[name, description, request_method, json.dumps(request_headers), json.dumps(request_body), expected_status, json.dumps(expected_output) if expected_output else None, json.dumps(assertions) if assertions else '[]', test_id],
|
|
commit=True,
|
|
one=True
|
|
)
|
|
return test
|
|
|
|
def delete_function_test(self, test_id):
|
|
"""Delete a test case"""
|
|
self.execute(
|
|
'DELETE FROM http_function_tests WHERE id = %s',
|
|
[test_id],
|
|
commit=True
|
|
)
|