"""Request-scoped PostgreSQL helpers built on psycopg2 and Flask's ``g``."""

import psycopg2
import psycopg2.extras
from flask import g, current_app


def init_db(app):
    """Test the database connection on startup.

    Opens a connection using ``app.config["DATABASE_URL"]`` and closes it
    again immediately. The outcome is only printed; a failure is reported
    but never raised, so application startup continues either way.
    """
    try:
        conn = psycopg2.connect(app.config["DATABASE_URL"])
        conn.close()
        print(" * Database connection OK")
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic and must not
        # prevent the application from starting.
        print(f" * Database connection FAILED: {e}")


def get_db():
    """Get a database connection for the current request.

    The connection is created on first use and cached on ``flask.g`` under
    the ``"db"`` key, so repeated calls within the same application context
    return the same connection. Cursors created from it use
    ``RealDictCursor``, so rows come back as dict-like objects.
    """
    if "db" not in g:
        g.db = psycopg2.connect(
            current_app.config["DATABASE_URL"],
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
    return g.db


def close_db(exception=None):
    """Close database connection at end of request.

    Removes the cached connection from ``flask.g`` (if any) and closes it.
    Calling this when no connection was opened is a no-op.

    NOTE(review): the ``exception`` parameter is unused here; the signature
    looks like a Flask teardown handler -- confirm it is registered with
    ``app.teardown_appcontext`` elsewhere.
    """
    db = g.pop("db", None)
    if db is not None:
        db.close()


def _rollback_quietly(db):
    """Roll back *db*, ignoring driver errors raised by the rollback itself.

    Used from error paths so that the original exception, not a secondary
    rollback failure (e.g. on a dropped connection), reaches the caller.
    """
    try:
        db.rollback()
    except psycopg2.Error:
        pass


def query(sql, params=None):
    """Execute a SELECT query and return all rows as dicts.

    ``params`` is passed to ``cursor.execute`` unchanged, so values are
    bound by the driver rather than interpolated into ``sql``.
    """
    db = get_db()
    with db.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchall()


def query_one(sql, params=None):
    """Execute a SELECT query and return one row as a dict.

    Returns ``None`` when the query produces no rows.
    """
    db = get_db()
    with db.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchone()


def execute(sql, params=None):
    """Execute an INSERT/UPDATE/DELETE and commit.

    If the statement or the commit fails, the transaction is rolled back
    before the exception is re-raised. Without the rollback the cached
    request connection would stay in a failed transaction and every later
    statement in the same request would be rejected.
    """
    db = get_db()
    try:
        with db.cursor() as cur:
            cur.execute(sql, params)
        db.commit()
    except Exception:
        _rollback_quietly(db)
        raise


def execute_returning(sql, params=None):
    """Execute an INSERT/UPDATE/DELETE with RETURNING and commit.

    Returns the first row produced by the ``RETURNING`` clause (``None`` if
    the statement affected no rows). On any failure the transaction is
    rolled back before the exception is re-raised, so the cached request
    connection remains usable.
    """
    db = get_db()
    try:
        with db.cursor() as cur:
            cur.execute(sql, params)
            row = cur.fetchone()
        db.commit()
    except Exception:
        _rollback_quietly(db)
        raise
    return row