import csv
import datetime
import io

from flask import Blueprint, Response

from extensions import db

export_bp = Blueprint('export', __name__, url_prefix='/export')


# --- CSV Export Logic ---

def _fetch_all_workout_data_for_csv():
    """Fetches all workout set data across all users for CSV export."""
    query = """
        SELECT
            p.name AS person_name,
            w.start_date,
            e.name AS exercise_name,
            t.repetitions,
            t.weight,
            w.note AS workout_note
        FROM topset t
        JOIN workout w ON t.workout_id = w.workout_id
        JOIN person p ON w.person_id = p.person_id
        JOIN exercise e ON t.exercise_id = e.exercise_id
        ORDER BY p.name, w.start_date, t.topset_id;
    """
    return db.execute(query)


@export_bp.route('/workouts.csv')
def export_workouts_csv():
    """Generates and returns a CSV file of all workout sets."""
    data = _fetch_all_workout_data_for_csv()
    if not data:
        return Response(
            "",
            mimetype='text/csv',
            headers={"Content-disposition": "attachment; filename=workout_export_empty.csv"}
        )

    si = io.StringIO()
    fieldnames = ['person_name', 'start_date', 'exercise_name', 'repetitions', 'weight', 'workout_note']
    writer = csv.DictWriter(si, fieldnames=fieldnames, quoting=csv.QUOTE_ALL)  # Quote all fields for safety
    writer.writeheader()

    # Format date objects to strings for CSV
    formatted_data = []
    for row in data:
        new_row = row.copy()
        if isinstance(new_row.get('start_date'), (datetime.date, datetime.datetime)):
            new_row['start_date'] = new_row['start_date'].isoformat()
        formatted_data.append(new_row)

    writer.writerows(formatted_data)
    output = si.getvalue()
    return Response(
        output,
        mimetype='text/csv',
        headers={"Content-disposition": "attachment; filename=workout_export.csv"}
    )


# --- SQL Export Logic ---

# Helper functions adapted from sql_explorer
def _get_schema_info(schema='public'):
    """Fetches schema information directly."""
    tables_result = db.execute("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE';
    """, [schema])
    tables = [row['table_name'] for row in tables_result]

    schema_info = {}
    for table in tables:
        columns_result = db.execute("""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position;
        """, [schema, table])
        columns = [(row['column_name'], row['data_type']) for row in columns_result]

        primary_keys_result = db.execute("""
            SELECT kcu.column_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            WHERE tc.constraint_type = 'PRIMARY KEY'
                AND tc.table_schema = %s
                AND tc.table_name = %s;
        """, [schema, table])
        primary_keys = [row['column_name'] for row in primary_keys_result]

        foreign_keys_result = db.execute("""
            SELECT
                kcu.column_name AS fk_column,
                ccu.table_name AS referenced_table,
                ccu.column_name AS referenced_column
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
                AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
                AND tc.table_schema = %s
                AND tc.table_name = %s;
        """, [schema, table])
        foreign_keys = [
            (row['fk_column'], row['referenced_table'], row['referenced_column'])
            for row in foreign_keys_result
        ]

        schema_info[table] = {
            'columns': columns,
            'primary_keys': primary_keys,
            'foreign_keys': foreign_keys
        }
    return schema_info
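
# For illustration, _get_schema_info() returns a dict shaped roughly like the
# sketch below. The exact columns are assumptions based on the CSV query above,
# not a verified dump of this project's schema:
#
#   {
#       'topset': {
#           'columns': [('topset_id', 'integer'), ('repetitions', 'integer'), ...],
#           'primary_keys': ['topset_id'],
#           'foreign_keys': [('workout_id', 'workout', 'workout_id'), ...]
#       },
#       ...
#   }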
def _map_data_type_for_sql(postgres_type):
    """Maps PostgreSQL types to standard SQL types (simplified)."""
    return {
        'character varying': 'VARCHAR',
        'varchar': 'VARCHAR',
        'text': 'TEXT',
        'integer': 'INTEGER',
        'bigint': 'BIGINT',
        'boolean': 'BOOLEAN',
        'timestamp without time zone': 'TIMESTAMP',
        'timestamp with time zone': 'TIMESTAMPTZ',
        'numeric': 'NUMERIC',
        'real': 'REAL',
        'date': 'DATE'
    }.get(postgres_type, postgres_type.upper())


def _generate_create_script(schema_info):
    """Generates SQL CREATE TABLE scripts from schema info."""
    lines = []
    for table, info in schema_info.items():
        columns = info['columns']
        pks = info.get('primary_keys', [])
        fks = info['foreign_keys']

        column_defs = []
        for column_name, data_type in columns:
            sql_type = _map_data_type_for_sql(data_type)
            # Quote column names in case they are keywords or contain special chars
            column_defs.append(f'    "{column_name}" {sql_type}')

        if pks:
            pk_columns = ", ".join(f'"{pk}"' for pk in pks)
            column_defs.append(f'    PRIMARY KEY ({pk_columns})')

        columns_sql = ",\n".join(column_defs)
        # Quote table names as well
        create_stmt = f'CREATE TABLE "{table}" (\n{columns_sql}\n);'
        lines.append(create_stmt)

        # Add FK constraints separately for clarity and to sidestep circular dependencies
        for fk_column, ref_table, ref_col in fks:
            alter_stmt = (
                f'ALTER TABLE "{table}" ADD CONSTRAINT "fk_{table}_{fk_column}" '
                f'FOREIGN KEY ("{fk_column}") REFERENCES "{ref_table}" ("{ref_col}");'
            )
            lines.append(alter_stmt)

        lines.append("\n-- ----------------------------\n")  # Separator
    return "\n".join(lines)


def _format_sql_value(value):
    """Formats Python values for SQL INSERT statements."""
    if value is None:
        return "NULL"
    elif isinstance(value, bool):
        # Check bool before int/float: bool is a subclass of int in Python,
        # so the numeric branch would otherwise swallow booleans
        return "TRUE" if value else "FALSE"
    elif isinstance(value, (int, float)):
        return str(value)
    elif isinstance(value, (datetime.date, datetime.datetime)):
        # Format dates/timestamps in ISO 8601, which PostgreSQL accepts
        return f"'{value.isoformat()}'"
    else:
        # Assume string; escape embedded single quotes by doubling them
        escaped_value = str(value).replace("'", "''")
        return f"'{escaped_value}'"


def _fetch_and_format_data_for_sql_insert():
    """Fetches data from all tables and formats it as SQL INSERT statements."""
    # Insert parent tables before children so FK constraints are satisfied
    # (e.g., insert persons before workouts)
    table_order = ['person', 'exercise', 'tag', 'workout', 'topset', 'workout_tag', 'saved_query']

    all_insert_statements = []
    for table_name in table_order:
        all_insert_statements.append(f"\n-- Data for table: {table_name}\n")
        try:
            # Fetch all rows; db.execute returns a list of dicts
            rows = db.execute(f'SELECT * FROM "{table_name}"')  # Quote table name
            if not rows:
                all_insert_statements.append(f"-- No data found for table {table_name}.\n")
                continue

            # Column names come from the keys of the first row; keep the raw
            # names for dict access and quote them only in the generated SQL
            raw_columns = list(rows[0].keys())
            columns_sql = ", ".join(f'"{col}"' for col in raw_columns)

            # Generate an INSERT statement for each row
            for row in rows:
                values_sql = ", ".join(_format_sql_value(row[col]) for col in raw_columns)
                insert_stmt = f'INSERT INTO "{table_name}" ({columns_sql}) VALUES ({values_sql});'
                all_insert_statements.append(insert_stmt)
        except Exception as e:
            # Record the failure in the script itself rather than aborting the export
            all_insert_statements.append(f"-- Error fetching/formatting data for table {table_name}: {e}\n")

    return "\n".join(all_insert_statements)
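
# Illustrative _format_sql_value outputs (the inputs are made-up examples):
#
#   _format_sql_value(None)                       -> NULL
#   _format_sql_value(True)                       -> TRUE
#   _format_sql_value(42)                         -> 42
#   _format_sql_value(datetime.date(2024, 1, 5))  -> '2024-01-05'
#   _format_sql_value("O'Brien")                  -> 'O''Brien'
#
# which yields statements of the form:
#
#   INSERT INTO "person" ("person_id", "name") VALUES (1, 'O''Brien');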
@export_bp.route('/database.sql')
def export_database_sql():
    """Generates and returns a .sql file with schema and data."""
    try:
        # Generate schema
        schema_info = _get_schema_info()
        create_script = _generate_create_script(schema_info)

        # Generate data inserts
        insert_script = _fetch_and_format_data_for_sql_insert()

        # Combine scripts
        full_script = "-- WorkoutTracker Database Export\n"
        full_script += f"-- Generated on: {datetime.datetime.now().isoformat()}\n\n"
        full_script += "-- Schema Definition --\n"
        full_script += create_script
        full_script += "\n-- Data Inserts --\n"
        full_script += insert_script

        return Response(
            full_script,
            mimetype='application/sql',
            headers={"Content-disposition": "attachment; filename=workout_tracker_export.sql"}
        )
    except Exception as e:
        # Log the error properly in a real application
        print(f"Error generating SQL export: {e}")
        return Response(f"-- Error generating SQL export: {e}", status=500, mimetype='text/plain')
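
# Example wiring and restore (a minimal sketch; the module name `export` and the
# database name are assumptions, not defined in this file):
#
#   from flask import Flask
#   from export import export_bp
#
#   app = Flask(__name__)
#   app.register_blueprint(export_bp)
#
# The downloaded dump can then be replayed into an empty database with psql:
#
#   psql -d workout_tracker -f workout_tracker_export.sql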