Add rate limiting support to API keys

This commit is contained in:
Peter Stockings
2025-12-02 17:02:17 +11:00
parent 814691c235
commit d04b7f2120
5 changed files with 152 additions and 44 deletions

84
db.py
View File

@@ -182,47 +182,89 @@ ORDER BY invocation_time DESC""", [http_function_id])
def update_user_theme_preference(self, user_id, theme):
self.execute(
'UPDATE users SET theme_preference=%s WHERE id=%s', [theme, user_id], commit=True)
def get_http_function_history(self, function_id):
http_function_history = self.execute(
'SELECT version_id, http_function_id, script_content, version_number, updated_at FROM http_functions_versions WHERE http_function_id=%s ORDER BY version_number DESC', [function_id])
return http_function_history
def create_api_key(self, user_id, name, key, scopes):
self.execute(
'INSERT INTO api_keys (user_id, name, key, scopes) VALUES (%s, %s, %s, %s)',
[user_id, name, key, json.dumps(scopes)],
commit=True
)
def create_api_key(self, user_id, name, key, scopes, rate_limit_count=None, rate_limit_period=None):
new_key = self.execute(
'INSERT INTO api_keys (user_id, name, key, scopes, rate_limit_count, rate_limit_period) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period',
[user_id, name, key, json.dumps(scopes), rate_limit_count, rate_limit_period], commit=True, one=True)
return new_key
def get_api_key(self, key):
api_key = self.execute(
'SELECT id, user_id, name, key, scopes, created_at, last_used_at FROM api_keys WHERE key=%s',
[key],
one=True
)
'SELECT id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE key=%s', [key], one=True)
if api_key and api_key.get('scopes'):
if isinstance(api_key['scopes'], str):
api_key['scopes'] = json.loads(api_key['scopes'])
return api_key
def delete_api_key(self, user_id, key_id):
self.execute(
'DELETE FROM api_keys WHERE user_id=%s AND id=%s',
[user_id, key_id],
commit=True
)
'DELETE FROM api_keys WHERE user_id=%s AND id=%s', [user_id, key_id], commit=True)
def list_api_keys(self, user_id):
api_keys = self.execute(
'SELECT id, user_id, name, key, scopes, created_at, last_used_at FROM api_keys WHERE user_id=%s ORDER BY created_at DESC',
[user_id]
)
'SELECT id, user_id, name, key, scopes, created_at, last_used_at, rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE user_id=%s ORDER BY created_at DESC', [user_id])
return api_keys
def update_api_key_last_used(self, key_id):
self.execute(
'UPDATE api_keys SET last_used_at=NOW() WHERE id=%s',
[key_id],
commit=True
'UPDATE api_keys SET last_used_at=NOW() WHERE id=%s', [key_id], commit=True)
def check_and_increment_api_key_usage(self, key_id):
"""
Check if API key has exceeded rate limit and increment usage.
Returns True if allowed, False if rate limited.
"""
# Get current key data
key_data = self.execute(
'SELECT rate_limit_count, rate_limit_period, usage_count, usage_reset_at FROM api_keys WHERE id=%s',
[key_id], one=True
)
if not key_data or not key_data['rate_limit_count']:
return True # No limit set
limit = key_data['rate_limit_count']
period = key_data['rate_limit_period']
usage = key_data['usage_count']
reset_at = key_data['usage_reset_at']
import datetime
now = datetime.datetime.now()
# Check if we need to reset the counter
if not reset_at or now >= reset_at:
# Calculate new reset time
if period == 'minute':
new_reset = now + datetime.timedelta(minutes=1)
elif period == 'hour':
new_reset = now + datetime.timedelta(hours=1)
elif period == 'day':
new_reset = now + datetime.timedelta(days=1)
else:
new_reset = now + datetime.timedelta(minutes=1) # Default
# Reset usage and set new reset time
self.execute(
'UPDATE api_keys SET usage_count=1, usage_reset_at=%s, last_used_at=NOW() WHERE id=%s',
[new_reset, key_id], commit=True
)
return True
# Check limit
if usage >= limit:
return False
# Increment usage
self.execute(
'UPDATE api_keys SET usage_count=usage_count+1, last_used_at=NOW() WHERE id=%s',
[key_id], commit=True
)
return True
def export_user_data(self, user_id):
"""