"""
Database Migration Runner

Reads SQL migration files from the migrations/ directory, checks which have
already been applied via the schema_migrations table, and runs any unapplied
migrations in order.

Usage:
    python migrations/runner.py
"""

import os
import re
import sys

import psycopg2
from dotenv import load_dotenv


def get_migration_files(migrations_dir):
    """Find all SQL migration files and return them sorted by version number.

    A file counts as a migration when its name looks like
    ``<digits>_<anything>.sql``; the leading digits are parsed as the integer
    version.

    Args:
        migrations_dir: Directory to scan (not recursive).

    Returns:
        A list of ``(version, filename)`` tuples ordered by version, with the
        filename as tie-breaker so the order never depends on the arbitrary
        order in which ``os.listdir`` yields entries.
    """
    pattern = re.compile(r"^(\d+)_.+\.sql$")
    files = []
    for filename in os.listdir(migrations_dir):
        match = pattern.match(filename)
        if match:
            files.append((int(match.group(1)), filename))
    # Tuples compare element-wise: version first, then filename.
    return sorted(files)


def ensure_migrations_table(conn):
    """Create the schema_migrations table if it doesn't exist, then commit."""
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version INTEGER PRIMARY KEY,
                applied_at TIMESTAMP DEFAULT NOW()
            )
        """)
    conn.commit()


def get_applied_versions(conn):
    """Return the set of versions recorded in schema_migrations."""
    with conn.cursor() as cur:
        cur.execute("SELECT version FROM schema_migrations ORDER BY version")
        return {row[0] for row in cur.fetchall()}


def run_migration(conn, version, filepath):
    """Run a single migration file inside a transaction.

    The file's SQL and the matching schema_migrations row are committed
    together. If either statement (or the commit) fails, the transaction is
    rolled back so the connection is left usable, and the original exception
    is re-raised.

    Args:
        conn: Open database connection.
        version: Integer version recorded in schema_migrations.
        filepath: Path to the UTF-8 encoded SQL file to execute.
    """
    print(f" Applying migration {version}: {os.path.basename(filepath)}...")

    with open(filepath, "r", encoding="utf-8") as f:
        sql = f.read()

    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            cur.execute(
                "INSERT INTO schema_migrations (version) VALUES (%s)",
                (version,),
            )
        conn.commit()
    except Exception:
        # Undo the partial migration here rather than relying on the caller.
        conn.rollback()
        raise

    print(f" ✓ Migration {version} applied successfully")


def _check_unique_versions(pending):
    """Raise ValueError if two pending migration files share a version."""
    seen = {}
    for version, filename in pending:
        if version in seen:
            raise ValueError(
                f"Duplicate migration version {version}: "
                f"{seen[version]} and {filename}"
            )
        seen[version] = filename


def main():
    """Apply every pending migration found next to this script.

    Loads environment variables via ``load_dotenv()``, connects using
    ``DATABASE_URL``, and applies each migration whose version is not yet in
    schema_migrations. Exits with status 1 when ``DATABASE_URL`` is missing,
    the connection cannot be opened, or any migration step fails.
    """
    load_dotenv()

    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        print("ERROR: DATABASE_URL not set in .env")
        sys.exit(1)

    # Migration files live in the same directory as this script.
    migrations_dir = os.path.dirname(os.path.abspath(__file__))

    print("Connecting to database...")
    try:
        conn = psycopg2.connect(database_url)
    except psycopg2.Error as e:
        print(f"\nERROR: Could not connect to database: {e}")
        sys.exit(1)

    try:
        ensure_migrations_table(conn)
        applied = get_applied_versions(conn)
        migrations = get_migration_files(migrations_dir)
        pending = [(v, f) for v, f in migrations if v not in applied]

        if not pending:
            print("All migrations are up to date.")
            return

        # version is the primary key, so two pending files with the same
        # version can never both be recorded: the second would fail now and
        # be silently skipped on every later run. Abort before applying any.
        _check_unique_versions(pending)

        print(f"Found {len(pending)} pending migration(s):")
        for version, filename in pending:
            filepath = os.path.join(migrations_dir, filename)
            run_migration(conn, version, filepath)

        print("\nAll migrations applied successfully!")
    except Exception as e:
        # Report first: if the rollback itself fails (e.g. the connection
        # dropped), the root cause has already been shown.
        print(f"\nERROR: Migration failed: {e}")
        conn.rollback()
        sys.exit(1)
    finally:
        conn.close()


if __name__ == "__main__":
    main()