# Python source: 128 lines, 6.0 KiB
import json
import os
from urllib.parse import urlparse

import psycopg2
from flask import g
from psycopg2 import pool
from psycopg2.extras import RealDictCursor


class DataBase:
    """Data-access helper backed by a psycopg2 connection pool.

    A connection is borrowed from the pool the first time it is needed,
    cached on Flask's ``g`` object, and handed back to the pool by
    :meth:`close_conn`, which :meth:`init_app` registers as an app-context
    teardown handler. All queries go through :meth:`execute`.
    """

    def __init__(self):
        # The pool is only created by init_app(); it stays None until then.
        self.pool = None

    def init_app(self, app, min_conn=1, max_conn=20):
        """Create the connection pool from ``DATABASE_URL`` and hook teardown.

        Args:
            app: Application object exposing ``teardown_appcontext``.
            min_conn: Minimum number of pooled connections (default 1).
            max_conn: Maximum number of pooled connections (default 20).

        Raises:
            Exception: If ``DATABASE_URL`` is unset or empty.
        """
        # Validate the raw value *before* parsing: urlparse() always returns
        # a truthy 6-field tuple, and os.environ[...] would raise KeyError
        # first, so a check on the parsed result could never fire.
        raw_url = os.environ.get('DATABASE_URL')
        if not raw_url:
            raise Exception("No DATABASE_URL environment variable set")
        db_url = urlparse(raw_url)

        # NOTE(review): psycopg2 documents SimpleConnectionPool as not
        # shareable across threads — confirm the deployment is
        # single-threaded, or consider ThreadedConnectionPool.
        self.pool = psycopg2.pool.SimpleConnectionPool(
            min_conn, max_conn,
            database=db_url.path[1:],  # drop the leading "/" of the URL path
            user=db_url.username,
            password=db_url.password,
            host=db_url.hostname,
            port=db_url.port,
        )

        app.teardown_appcontext(self.close_conn)

    def get_conn(self):
        """Return the connection cached on ``g``, borrowing one if needed."""
        if 'db_conn' not in g:
            g.db_conn = self.pool.getconn()
        return g.db_conn

    def close_conn(self, e=None):
        """Give the connection cached on ``g`` (if any) back to the pool.

        Args:
            e: Value supplied by the teardown hook; not used here.
        """
        db_conn = g.pop('db_conn', None)
        if db_conn is not None:
            self.pool.putconn(db_conn)

    def close_all_connections(self):
        """Close every pooled connection; a no-op if no pool was created."""
        if self.pool:
            self.pool.closeall()

    def execute(self, query, args=(), one=False, commit=False):
        """Run a single SQL statement on the current connection.

        Args:
            query: SQL text using ``%s`` placeholders.
            args: Sequence of values bound to the placeholders.
            one: If true, return only the first row, or ``None`` when the
                statement produced no rows.
            commit: If true, commit once the statement has succeeded.

        Returns:
            ``None`` when the statement yields no result set; otherwise the
            list of rows fetched through ``RealDictCursor`` (or just the
            first row / ``None`` when ``one`` is true).

        Raises:
            Exception: Any error raised while executing, fetching or
                committing is re-raised after the connection is rolled back.
        """
        conn = self.get_conn()
        # The cursor context manager closes the cursor on exit, on both the
        # success and the error path.
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            try:
                cur.execute(query, args)
                # description is None for statements without a result set
                # (e.g. INSERT/UPDATE/DELETE without RETURNING).
                rows = cur.fetchall() if cur.description is not None else None
                if commit:
                    conn.commit()
            except Exception:
                conn.rollback()
                raise  # bare raise keeps the original traceback intact

        if one:
            return rows[0] if rows else None
        return rows

    def get_http_functions_for_user(self, user_id):
        """Return all ``http_functions`` rows of a user, highest id first."""
        return self.execute(
            'SELECT id, user_id, NAME, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number '
            'FROM http_functions WHERE user_id=%s ORDER by id DESC',
            [user_id],
        )

    def get_http_function(self, user_id, name):
        """Return the user's function with the given name, or ``None``."""
        return self.execute(
            'SELECT id, user_id, NAME, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number, created_at '
            'FROM http_functions WHERE user_id=%s AND NAME=%s',
            [user_id, name],
            one=True,
        )

    def get_http_function_by_id(self, user_id, http_function_id):
        """Return the user's function with the given id, or ``None``."""
        return self.execute(
            'SELECT id, user_id, NAME, script_content, invoked_count, '
            'environment_info, is_public, log_request, log_response, '
            'version_number, created_at '
            'FROM http_functions WHERE user_id=%s AND id=%s',
            [user_id, http_function_id],
            one=True,
        )

    def create_new_http_function(self, user_id, name, script_content,
                                 environment_info, is_public, log_request,
                                 log_response):
        """Insert a new ``http_functions`` row and commit."""
        # NOTE(review): environment_info is passed through as-is here, while
        # update_http_function_environment_info_and_invoked_count()
        # JSON-encodes it — confirm callers pre-serialize the value.
        self.execute(
            'INSERT INTO http_functions (user_id, NAME, script_content, '
            'environment_info, is_public, log_request, log_response) '
            'VALUES (%s, %s, %s, %s, %s, %s, %s)',
            [user_id, name, script_content, environment_info, is_public,
             log_request, log_response],
            commit=True,
        )

    def edit_http_function(self, user_id, function_id, name, script_content,
                           environment_info, is_public, log_request,
                           log_response):
        """Update one of the user's functions and commit.

        Returns:
            The row produced by ``RETURNING version_number``, or ``None``
            when no row matched ``user_id`` and ``function_id``.
        """
        # NOTE(review): environment_info is passed through as-is here, while
        # update_http_function_environment_info_and_invoked_count()
        # JSON-encodes it — confirm callers pre-serialize the value.
        return self.execute(
            'UPDATE http_functions SET NAME=%s, script_content=%s, '
            'environment_info=%s, is_public=%s, log_request=%s, '
            'log_response=%s '
            'WHERE user_id=%s AND id=%s RETURNING version_number',
            [name, script_content, environment_info, is_public, log_request,
             log_response, user_id, function_id],
            commit=True,
            one=True,
        )

    def update_http_function_environment_info_and_invoked_count(
            self, user_id, name, environment_info):
        """Store JSON-encoded ``environment_info`` and bump ``invoked_count``."""
        self.execute(
            'UPDATE http_functions SET environment_info=%s, '
            'invoked_count = invoked_count + 1 '
            'WHERE user_id=%s AND NAME=%s',
            [json.dumps(environment_info), user_id, name],
            commit=True,
        )

    def delete_http_function(self, user_id, function_id):
        """Delete the user's function with the given id and commit."""
        self.execute(
            'DELETE FROM http_functions WHERE user_id=%s AND id=%s',
            [user_id, function_id],
            commit=True,
        )

    def add_http_function_invocation(self, http_function_id, status,
                                     request_data, response_data, logs,
                                     version_number):
        """Record one invocation of a function and commit.

        ``request_data``, ``response_data`` and ``logs`` are JSON-encoded
        before being stored.
        """
        self.execute(
            'INSERT INTO http_function_invocations (http_function_id, '
            'status, request_data, response_data, logs, version_number) '
            'VALUES (%s, %s, %s, %s, %s, %s)',
            [http_function_id, status, json.dumps(request_data),
             json.dumps(response_data), json.dumps(logs), version_number],
            commit=True,
        )

    def get_http_function_invocations(self, http_function_id):
        """Return a function's invocation rows, most recent first."""
        return self.execute(
            """SELECT id, http_function_id, STATUS, invocation_time, request_data, response_data, LOGS, version_number
            FROM http_function_invocations
            WHERE http_function_id=%s
            ORDER BY invocation_time DESC""",
            [http_function_id],
        )

    def get_user(self, user_id):
        """Return the ``users`` row with this id, or ``None``.

        ``user_id`` is coerced with ``int()``, so a value that cannot be
        converted raises ``ValueError``/``TypeError`` before any query runs.
        """
        return self.execute(
            'SELECT id, username, password_hash, created_at '
            'FROM users WHERE id=%s',
            [int(user_id)],
            one=True,
        )

    def get_user_by_username(self, username):
        """Return the ``users`` row with this username, or ``None``."""
        return self.execute(
            'SELECT id, username, password_hash, created_at '
            'FROM users WHERE username=%s',
            [username],
            one=True,
        )

    def create_new_user(self, username, password_hash):
        """Insert a user, commit, and return the newly created row."""
        return self.execute(
            'INSERT INTO users (username, password_hash) VALUES (%s, %s) '
            'RETURNING id, username, password_hash, created_at',
            [username, password_hash],
            commit=True,
            one=True,
        )

    def get_http_function_history(self, function_id):
        """Return a function's stored versions, highest version first."""
        return self.execute(
            'SELECT version_id, http_function_id, script_content, '
            'version_number, updated_at '
            'FROM http_functions_versions WHERE http_function_id=%s '
            'ORDER BY version_number DESC',
            [function_id],
        )