Files
function/db.py
2023-12-18 20:33:58 +11:00

86 lines
3.6 KiB
Python

import json
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from urllib.parse import urlparse
from flask import g
class DataBase():
    """PostgreSQL access helper for HTTP functions and their invocation logs.

    Connection parameters are read from the ``DATABASE_URL`` environment
    variable. The connection is stored on Flask's ``g`` object as
    ``g.database`` and re-used by every query issued through this class.
    """

    def __init__(self, app):
        """Validate that the database configuration is present.

        Args:
            app: Accepted for interface compatibility; not used by this class.

        Raises:
            Exception: If ``DATABASE_URL`` is unset or empty.
        """
        # A parsed URL object is always truthy, so check the raw value instead
        # of the urlparse() result; .get() avoids a bare KeyError.
        if not os.environ.get('DATABASE_URL'):
            raise Exception("No DATABASE_URL environment variable set")

    def getDB(self):
        """Return the connection stored on ``g``, opening it on first use."""
        db = getattr(g, 'database', None)
        if db is None:
            db_url = urlparse(os.environ['DATABASE_URL'])
            db = g.database = psycopg2.connect(
                database=db_url.path[1:],  # drop the leading "/" of the path
                user=db_url.username,
                password=db_url.password,
                host=db_url.hostname,
                port=db_url.port,
            )
        return db

    @staticmethod
    def close_connection(exception=None):
        """Close the connection stored on ``g``, if there is one.

        Declared static so it works both as ``DataBase.close_connection(exc)``
        and as ``instance.close_connection(exc)`` (e.g. when handed to a
        teardown hook that calls it with a single argument).

        Args:
            exception: Ignored; present so the function can be used as a
                callback that receives the teardown exception.
        """
        # pop() rather than getattr() so a closed connection is never handed
        # out again by getDB().
        db = g.pop('database', None)
        if db is not None:
            db.close()

    def execute(self, query, args=(), one=False, commit=False):
        """Run a single SQL statement and return its rows.

        Args:
            query: SQL text using ``%s`` placeholders.
            args: Sequence of values bound to the placeholders.
            one: If true, return only the first row (or ``None`` if no rows).
            commit: If true, commit the transaction after the statement.

        Returns:
            ``None`` for statements that produce no result set; otherwise a
            list of dict-like rows, or a single row / ``None`` when ``one``
            is set.

        Raises:
            Exception: Any error raised while executing, fetching or
                committing is re-raised after the transaction is rolled back.
        """
        conn = self.getDB()
        try:
            # The context manager closes the cursor even when execute() fails.
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, args)
                # description is None for statements without a result set.
                rows = cur.fetchall() if cur.description is not None else None
            if commit:
                conn.commit()
        except Exception:
            # Leave the connection usable for later statements, and surface
            # the failure instead of silently dropping the write.
            conn.rollback()
            raise
        if one:
            return rows[0] if rows else None
        return rows

    def get_http_functions(self):
        """Return all HTTP functions, newest id first."""
        return self.execute(
            'SELECT id, NAME, script_content, invoked_count, environment_info '
            'FROM http_functions ORDER by id DESC',
            [])

    def get_http_function(self, name):
        """Return the HTTP function called ``name``, or ``None`` if absent."""
        return self.execute(
            'SELECT id, NAME, script_content, invoked_count, environment_info '
            'FROM http_functions WHERE NAME=%s',
            [name], one=True)

    def create_new_http_function(self, name, script_content, environment_info):
        """Insert a new HTTP function row and commit."""
        # NOTE(review): environment_info is stored as passed here, whereas
        # update_http_function_environment_info_and_invoked_count serializes
        # it with json.dumps -- confirm callers pass pre-serialized data.
        self.execute(
            'INSERT INTO http_functions (NAME, script_content, environment_info) '
            'VALUES (%s, %s, %s)',
            [name, script_content, environment_info], commit=True)

    def edit_http_function(self, name, script_content, environment_info):
        """Replace the script and environment info of ``name`` and commit."""
        self.execute(
            'UPDATE http_functions SET script_content=%s, environment_info=%s '
            'WHERE NAME=%s',
            [script_content, environment_info, name], commit=True)

    def update_http_function_environment_info_and_invoked_count(self, name, environment_info):
        """Store JSON-encoded ``environment_info`` and bump ``invoked_count``."""
        self.execute(
            'UPDATE http_functions SET environment_info=%s, '
            'invoked_count = invoked_count + 1 WHERE NAME=%s',
            [json.dumps(environment_info), name], commit=True)

    def delete_http_function(self, name):
        """Delete the HTTP function called ``name`` and commit."""
        self.execute(
            'DELETE FROM http_functions WHERE NAME=%s', [name], commit=True)

    def add_http_function_invocation(self, http_function_id, status, request_data, response_data, logs):
        """Record one invocation; request, response and logs are JSON-encoded."""
        self.execute(
            'INSERT INTO http_function_invocations '
            '(http_function_id, status, request_data, response_data, logs) '
            'VALUES (%s, %s, %s, %s, %s)',
            [http_function_id, status, json.dumps(request_data),
             json.dumps(response_data), json.dumps(logs)],
            commit=True)

    def get_http_function_invocations(self, http_function_id):
        """Return the invocations of one HTTP function, most recent first."""
        # The id must be a bound placeholder: a literal here would ignore the
        # caller's argument and leave the bound value without a placeholder.
        return self.execute(
            """SELECT id, http_function_id, STATUS, invocation_time, request_data, response_data, LOGS
            FROM http_function_invocations
            WHERE http_function_id=%s
            ORDER BY invocation_time DESC""", [http_function_id])