Initial commit
This commit is contained in:
106
migrations/runner.py
Normal file
106
migrations/runner.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""
|
||||
Database Migration Runner
|
||||
|
||||
Reads SQL migration files from the migrations/ directory, checks which
|
||||
have already been applied via the schema_migrations table, and runs
|
||||
any unapplied migrations in order.
|
||||
|
||||
Usage:
|
||||
python migrations/runner.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import psycopg2
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
def get_migration_files(migrations_dir):
|
||||
"""Find all SQL migration files and return sorted by version number."""
|
||||
pattern = re.compile(r"^(\d+)_.+\.sql$")
|
||||
files = []
|
||||
for filename in os.listdir(migrations_dir):
|
||||
match = pattern.match(filename)
|
||||
if match:
|
||||
version = int(match.group(1))
|
||||
files.append((version, filename))
|
||||
return sorted(files, key=lambda x: x[0])
|
||||
|
||||
|
||||
def ensure_migrations_table(conn):
|
||||
"""Create schema_migrations table if it doesn't exist."""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
version INTEGER PRIMARY KEY,
|
||||
applied_at TIMESTAMP DEFAULT NOW()
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_applied_versions(conn):
|
||||
"""Get set of already-applied migration versions."""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT version FROM schema_migrations ORDER BY version")
|
||||
return {row[0] for row in cur.fetchall()}
|
||||
|
||||
|
||||
def run_migration(conn, version, filepath):
|
||||
"""Run a single migration file inside a transaction."""
|
||||
print(f" Applying migration {version}: {os.path.basename(filepath)}...")
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
sql = f.read()
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql)
|
||||
cur.execute(
|
||||
"INSERT INTO schema_migrations (version) VALUES (%s)",
|
||||
(version,),
|
||||
)
|
||||
conn.commit()
|
||||
print(f" ✓ Migration {version} applied successfully")
|
||||
|
||||
|
||||
def main():
|
||||
load_dotenv()
|
||||
database_url = os.environ.get("DATABASE_URL")
|
||||
if not database_url:
|
||||
print("ERROR: DATABASE_URL not set in .env")
|
||||
sys.exit(1)
|
||||
|
||||
# Determine migrations directory
|
||||
migrations_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
print(f"Connecting to database...")
|
||||
conn = psycopg2.connect(database_url)
|
||||
|
||||
try:
|
||||
ensure_migrations_table(conn)
|
||||
applied = get_applied_versions(conn)
|
||||
migrations = get_migration_files(migrations_dir)
|
||||
|
||||
pending = [(v, f) for v, f in migrations if v not in applied]
|
||||
|
||||
if not pending:
|
||||
print("All migrations are up to date.")
|
||||
return
|
||||
|
||||
print(f"Found {len(pending)} pending migration(s):")
|
||||
for version, filename in pending:
|
||||
filepath = os.path.join(migrations_dir, filename)
|
||||
run_migration(conn, version, filepath)
|
||||
|
||||
print("\nAll migrations applied successfully!")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
print(f"\nERROR: Migration failed: {e}")
|
||||
sys.exit(1)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user