65 lines
1.7 KiB
Python
65 lines
1.7 KiB
Python
import psycopg2
|
|
import psycopg2.extras
|
|
from flask import g, current_app
|
|
|
|
|
|
def init_db(app):
    """Probe the database once at startup and report the outcome.

    Opens a connection using ``app.config["DATABASE_URL"]`` and closes it
    again straight away. The result is printed to stdout; nothing is
    returned and no exception escapes this function.

    Args:
        app: Object exposing a ``config`` mapping that holds
            ``DATABASE_URL``.
    """
    try:
        dsn = app.config["DATABASE_URL"]
        # Connect and immediately release: only reachability matters here.
        psycopg2.connect(dsn).close()
        print(" * Database connection OK")
    except Exception as err:
        # Deliberately broad: any failure (missing config key, bad DSN,
        # server down) is reported rather than aborting startup.
        print(f" * Database connection FAILED: {err}")
|
|
|
|
|
|
def get_db():
    """Return the database connection cached on ``flask.g``.

    The first call opens a connection to
    ``current_app.config["DATABASE_URL"]`` and stores it as ``g.db``;
    later calls hand back that same stored object. Cursors created from
    the connection default to ``psycopg2.extras.RealDictCursor``.

    Returns:
        The connection stored in ``g.db``.
    """
    # Fast path: a connection was already stored on g.
    if "db" in g:
        return g.db

    dsn = current_app.config["DATABASE_URL"]
    g.db = psycopg2.connect(
        dsn,
        cursor_factory=psycopg2.extras.RealDictCursor,
    )
    return g.db
|
|
|
|
|
|
def close_db(exception=None):
    """Close and forget the connection stored on ``flask.g``, if any.

    Args:
        exception: Not used by this function. Presumably present so the
            function can be registered as a Flask teardown callback --
            confirm at the registration site.
    """
    # pop() removes the entry so a later get_db() opens a fresh connection.
    conn = g.pop("db", None)
    if conn is None:
        return
    conn.close()
|
|
|
|
|
|
def query(sql, params=None):
    """Run a SELECT statement and return every resulting row.

    Args:
        sql: SQL text, handed to ``cursor.execute`` unchanged.
        params: Optional bind parameters for ``sql``.

    Returns:
        The result of ``cursor.fetchall()`` for the executed statement.
    """
    with get_db().cursor() as cursor:
        cursor.execute(sql, params)
        rows = cursor.fetchall()
    return rows
|
|
|
|
|
|
def query_one(sql, params=None):
    """Run a SELECT statement and return a single row.

    Args:
        sql: SQL text, handed to ``cursor.execute`` unchanged.
        params: Optional bind parameters for ``sql``.

    Returns:
        The result of ``cursor.fetchone()`` for the executed statement.
    """
    with get_db().cursor() as cursor:
        cursor.execute(sql, params)
        first_row = cursor.fetchone()
    return first_row
|
|
|
|
|
|
def execute(sql, params=None):
    """Execute an INSERT/UPDATE/DELETE and commit.

    On any failure the transaction is rolled back before the exception is
    re-raised, so the connection returned by ``get_db()`` stays usable for
    later statements.

    Args:
        sql: SQL text, handed to ``cursor.execute`` unchanged.
        params: Optional bind parameters for ``sql``.

    Raises:
        Exception: Whatever ``cursor.execute`` or ``commit`` raised,
            re-raised unchanged after the rollback.
    """
    db = get_db()
    try:
        with db.cursor() as cur:
            cur.execute(sql, params)
        db.commit()
    except Exception:
        # A failed statement leaves the open transaction aborted; without
        # a rollback every later statement on this connection would fail.
        db.rollback()
        raise
|
|
|
|
|
|
def execute_returning(sql, params=None):
    """Execute an INSERT/UPDATE/DELETE with RETURNING and commit.

    On any failure -- including ``fetchone()`` failing because the
    statement produced no result set -- the transaction is rolled back
    before the exception is re-raised, so the connection returned by
    ``get_db()`` stays usable for later statements.

    Args:
        sql: SQL text, handed to ``cursor.execute`` unchanged.
        params: Optional bind parameters for ``sql``.

    Returns:
        The result of ``cursor.fetchone()`` for the executed statement.

    Raises:
        Exception: Whatever ``cursor.execute``, ``fetchone`` or ``commit``
            raised, re-raised unchanged after the rollback.
    """
    db = get_db()
    try:
        with db.cursor() as cur:
            cur.execute(sql, params)
            # Read the RETURNING row before committing and closing the cursor.
            row = cur.fetchone()
        db.commit()
    except Exception:
        # Undo the partial work and clear the aborted transaction so the
        # connection is not left unusable for later statements.
        db.rollback()
        raise
    return row
|