107 lines
3.1 KiB
Python
107 lines
3.1 KiB
Python
"""
|
|
Database Migration Runner
|
|
|
|
Reads SQL migration files from the migrations/ directory, checks which
|
|
have already been applied via the schema_migrations table, and runs
|
|
any unapplied migrations in order.
|
|
|
|
Usage:
|
|
python migrations/runner.py
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
def get_migration_files(migrations_dir):
    """Find SQL migration files and return them ordered by version number.

    A migration file is a regular file directly inside ``migrations_dir``
    whose name matches ``<digits>_<anything>.sql`` (for example
    ``001_create_users.sql``). The leading digits are parsed as an integer,
    so ``2_b.sql`` sorts before ``010_c.sql``.

    Args:
        migrations_dir: Path of the directory to scan (not recursive).

    Returns:
        A list of ``(version, filename)`` tuples sorted by version, with the
        filename as tie-breaker so the result does not depend on the
        arbitrary order in which ``os.listdir`` reports entries.

    Raises:
        OSError: If ``migrations_dir`` cannot be listed.
    """
    pattern = re.compile(r"^(\d+)_.+\.sql$")
    files = []
    for filename in os.listdir(migrations_dir):
        match = pattern.match(filename)
        if match is None:
            continue
        # A directory whose name happens to match is not a migration; letting
        # it through would only fail later when the runner tries to open it.
        if not os.path.isfile(os.path.join(migrations_dir, filename)):
            continue
        files.append((int(match.group(1)), filename))
    # Tuples compare element-wise: version first, then filename.
    return sorted(files)
|
|
|
|
|
|
def ensure_migrations_table(conn):
    """Make sure the schema_migrations bookkeeping table exists.

    Runs ``CREATE TABLE IF NOT EXISTS`` on a cursor obtained from ``conn``
    and then commits, so calling this when the table is already present
    changes nothing.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version INTEGER PRIMARY KEY,
            applied_at TIMESTAMP DEFAULT NOW()
        )
    """
    with conn.cursor() as cursor:
        cursor.execute(create_table_sql)
    conn.commit()
|
|
|
|
|
|
def get_applied_versions(conn):
    """Return the versions recorded in schema_migrations as a set.

    Args:
        conn: Open database connection providing ``cursor()``.

    Returns:
        A set holding the first column (``version``) of every row in the
        ``schema_migrations`` table.
    """
    with conn.cursor() as cursor:
        cursor.execute("SELECT version FROM schema_migrations ORDER BY version")
        rows = cursor.fetchall()
    # Each row has a single column; collect it for fast membership checks.
    return {row[0] for row in rows}
|
|
|
|
|
|
def run_migration(conn, version, filepath):
    """Run a single migration file and record it, committing both together.

    The file's SQL and the ``schema_migrations`` insert are executed on the
    same cursor and committed with one ``conn.commit()``. If anything fails
    before the commit completes, the connection is rolled back here and the
    exception is re-raised, so the caller never receives a connection stuck
    in a failed transaction.
    NOTE(review): atomicity assumes ``conn`` is not in autocommit mode.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.
        version: Integer version number to record for this migration.
        filepath: Path of the UTF-8 encoded SQL file to execute.

    Raises:
        OSError: If the migration file cannot be read.
        Exception: Whatever the database driver raises while executing or
            committing; re-raised unchanged after the rollback.
    """
    print(f" Applying migration {version}: {os.path.basename(filepath)}...")
    with open(filepath, "r", encoding="utf-8") as f:
        sql = f.read()

    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            cur.execute(
                "INSERT INTO schema_migrations (version) VALUES (%s)",
                (version,),
            )
        conn.commit()
    except Exception:
        # Discard the partial work so the connection stays usable, then let
        # the caller decide how to report the failure.
        conn.rollback()
        raise
    print(f" ✓ Migration {version} applied successfully")
|
|
|
|
|
|
def main():
    """Apply all pending migrations found in this script's directory.

    Loads environment variables via ``load_dotenv()``, reads
    ``DATABASE_URL``, connects, makes sure the ``schema_migrations`` table
    exists, and runs every migration file whose version is not yet recorded,
    in ascending version order.

    Exits the process with status 1 (after printing an ``ERROR:`` line) when
    ``DATABASE_URL`` is missing, when the database connection cannot be
    established, or when any step of the migration run fails. The connection
    is always closed before returning or exiting.
    """
    load_dotenv()
    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        print("ERROR: DATABASE_URL not set in .env")
        sys.exit(1)

    # Migration files live in the same directory as this script.
    migrations_dir = os.path.dirname(os.path.abspath(__file__))

    print("Connecting to database...")
    try:
        conn = psycopg2.connect(database_url)
    except psycopg2.Error as e:
        # Report connection problems the same way as migration failures
        # instead of dumping a raw traceback.
        print(f"\nERROR: Could not connect to database: {e}")
        sys.exit(1)

    try:
        ensure_migrations_table(conn)
        applied = get_applied_versions(conn)
        migrations = get_migration_files(migrations_dir)

        pending = [(v, f) for v, f in migrations if v not in applied]

        if not pending:
            print("All migrations are up to date.")
            return

        print(f"Found {len(pending)} pending migration(s):")
        for version, filename in pending:
            filepath = os.path.join(migrations_dir, filename)
            run_migration(conn, version, filepath)

        print("\nAll migrations applied successfully!")

    except Exception as e:
        # Top-level boundary: undo any uncommitted work, report, and signal
        # failure through the exit status.
        conn.rollback()
        print(f"\nERROR: Migration failed: {e}")
        sys.exit(1)
    finally:
        conn.close()
|
|
|
|
|
|
# Run the migrations only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|