import os import psycopg2 from psycopg2.extras import RealDictCursor from datetime import datetime from dateutil.relativedelta import relativedelta from urllib.parse import urlparse from flask import g import pandas as pd from features.exercises import Exercises from features.people_graphs import PeopleGraphs from features.person_overview import PersonOverview from features.stats import Stats from features.dashboard import Dashboard from utils import get_exercise_graph_model class DataBase(): def __init__(self, app=None): self.stats = Stats(self.execute) self.exercises = Exercises(self.execute) self.person_overview = PersonOverview(self.execute) self.people_graphs = PeopleGraphs(self.execute) self.dashboard = Dashboard(self.execute) db_url = urlparse(os.environ['DATABASE_URL']) # if db_url is null then throw error if not db_url: raise Exception("No DATABASE_URL environment variable set") def getDB(self): db = getattr(g, 'database', None) if db is None: db_url = urlparse(os.environ['DATABASE_URL']) g.database = psycopg2.connect( database=db_url.path[1:], user=db_url.username, password=db_url.password, host=db_url.hostname, port=db_url.port ) db = g.database return db def close_connection(exception): db = getattr(g, 'database', None) if db is not None: db.close() def execute(self, query, args=(), one=False, commit=False): conn = self.getDB() cur = conn.cursor(cursor_factory=RealDictCursor) cur.execute(query, args) rv = None if cur.description is not None: rv = cur.fetchall() if commit: try: conn.commit() except: conn.rollback() cur.close() return (rv[0] if rv else None) if one else rv def read_sql_as_df(self, query, params=None): conn = self.getDB() try: df = pd.read_sql(query, conn, params=params) return df except Exception as e: raise e def get_exercise(self, exercise_id): exercise = self.execute( 'SELECT exercise_id, name FROM exercise WHERE exercise_id=%s LIMIT 1', [exercise_id], one=True) return exercise def create_exercise(self, name): new_exercise = self.execute('INSERT INTO exercise (name) VALUES (%s) RETURNING exercise_id AS "ExerciseId"', [name], commit=True, one=True) return new_exercise['ExerciseId'] def delete_exercise(self, exercise_id): self.execute('DELETE FROM exercise WHERE exercise_id=%s', [ exercise_id], commit=True) def update_exercise(self, exercise_id, name): self.execute('UPDATE Exercise SET Name=%s WHERE exercise_id=%s', [ name, exercise_id], commit=True) def get_people(self): people = self.execute( 'SELECT person_id AS "PersonId", name AS "Name" FROM person') return people def is_valid_person(self, person_id): person = self.execute( 'SELECT person_id AS "PersonId" FROM person WHERE person_id=%s LIMIT 1', [person_id], one=True) return person def create_person(self, name): new_person = self.execute('INSERT INTO person (name) VALUES (%s) RETURNING person_id AS "PersonId"', [ name], commit=True, one=True) return new_person['PersonId'] def delete_person(self, person_id): self.execute('DELETE FROM topset WHERE workout_id IN (SELECT workout_id FROM workout WHERE person_id=%s)', [ person_id], commit=True) self.execute('DELETE FROM workout WHERE person_id=%s', [person_id], commit=True) self.execute('DELETE FROM person WHERE person_id=%s', [person_id], commit=True) def update_person_name(self, person_id, name): self.execute('UPDATE person SET name=%s WHERE person_id=%s', [ name, person_id], commit=True) def is_valid_workout(self, person_id, workout_id): workout = self.execute('SELECT W.workout_id AS "WorkoutId" FROM Person P, Workout W WHERE P.person_id=W.person_id AND P.person_id=%s AND W.workout_id=%s LIMIT 1', [ person_id, workout_id], one=True) return workout def is_valid_topset(self, person_id, workout_id, topset_id): topset = self.execute(""" SELECT T.topset_id AS "TopSetId" FROM Person P, Workout W, TopSet T WHERE W.person_id=W.person_id AND W.workout_id=T.workout_id AND P.person_id=%s AND W.workout_id = %s AND T.topset_id = %s LIMIT 1""", [person_id, workout_id, topset_id], one=True) return topset def delete_workout(self, workout_id): self.execute('DELETE FROM topset WHERE workout_id=%s', [workout_id], commit=True) self.execute('DELETE FROM workout WHERE workout_id=%s', [workout_id], commit=True) def update_topset(self, exercise_id, repetitions, weight, topset_id): self.execute('UPDATE topset SET exercise_id=%s, repetitions=%s, weight=%s WHERE topSet_id=%s', [ exercise_id, repetitions, weight, topset_id], commit=True) def create_topset(self, workout_id, exercise_id, repetitions, weight): new_top_set = self.execute('INSERT INTO topset (workout_id, exercise_id, repetitions, weight) VALUES (%s, %s, %s, %s) RETURNING topset_id AS "TopSetId"', [ workout_id, exercise_id, repetitions, weight], commit=True, one=True) return new_top_set['TopSetId'] def delete_topset(self, topset_id): self.execute('DELETE FROM topset WHERE topset_id=%s', [ topset_id], commit=True) def create_workout(self, person_id): now = datetime.now() date_string = now.strftime('%Y-%m-%d') # check if a workout has already been created for this person today that doesnt contain any topsets and if so return the WorkoutId workout = self.execute('SELECT workout_id AS "WorkoutId" FROM workout WHERE person_id=%s AND start_date=%s AND workout_id NOT IN (SELECT workout_id FROM topset)', [ person_id, date_string], one=True) if workout: print( f'Workout already created for PersonId {person_id} starting at {date_string} so returning WorkoutId {workout["WorkoutId"]} rather then creating new workout') return workout['WorkoutId'] print( f'Creating workout for PersonId {person_id} starting at {date_string}') new_workout = self.execute('INSERT INTO workout (person_id, start_date) VALUES (%s, %s) RETURNING workout_id AS "WorkoutId"', [ person_id, date_string], commit=True, one=True) return new_workout['WorkoutId'] def get_people_and_workout_count(self, person_id): return self.execute(""" SELECT P.person_id AS "PersonId", P.name AS "Name", COUNT(W.workout_id) AS "NumberOfWorkouts", CASE P.person_id WHEN %s THEN 1 ELSE 0 END "IsActive" FROM Person P LEFT JOIN Workout W ON P.person_id = W.person_id GROUP BY P.person_id ORDER BY P.person_id""", [person_id]) def update_workout_start_date(self, workout_id, start_date): self.execute('UPDATE workout SET start_date=%s WHERE workout_id=%s', [ start_date, workout_id], commit=True) def get_person_name(self, person_id): result = self.execute("""SELECT name from Person WHERE person_id=%s""", [person_id], one=True) return result["name"] def get_workout(self, person_id, workout_id): topsets = self.execute(""" SELECT P.person_id, P.name AS "person_name", W.workout_id, W.start_date, T.topset_id, E.exercise_id, E.name AS "exercise_name", T.repetitions, T.weight, W.note FROM Person P LEFT JOIN Workout W ON P.person_id=W.person_id LEFT JOIN TopSet T ON W.workout_id=T.workout_id LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id WHERE P.person_id=%s AND W.workout_id = %s ORDER BY T.topset_id""", [person_id, workout_id]) return { 'person_id': person_id, 'person_name': topsets[0]['person_name'], 'workout_id': workout_id, 'start_date': topsets[0]['start_date'], 'top_sets': [{"topset_id": t['topset_id'], "exercise_id": t['exercise_id'], "exercise_name": t['exercise_name'], "weight": t['weight'], "repetitions": t['repetitions']} for t in topsets if t['topset_id'] is not None], 'note': topsets[0]['note'] } def get_topset(self, topset_id): topset = self.execute(""" SELECT T.topset_id, E.exercise_id, E.name AS "exercise_name", T.repetitions, T.weight FROM TopSet T INNER JOIN Exercise E ON T.exercise_id=E.exercise_id WHERE T.topset_id = %s""", [topset_id], one=True) return { "topset_id": topset['topset_id'], "exercise_id": topset['exercise_id'], "exercise_name": topset['exercise_name'], "weight": topset['weight'], "repetitions": topset['repetitions'] } def get_all_topsets(self): all_topsets = self.execute(""" SELECT P.person_id AS "PersonId", P.name AS "PersonName", W.workout_id AS "WorkoutId", W.start_date AS "StartDate", T.topset_id AS "TopSetId", E.exercise_id AS "ExerciseId", E.name AS "ExerciseName", T.repetitions AS "Repetitions", T.weight AS "Weight", round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM" FROM Person P LEFT JOIN Workout W ON P.person_id=W.person_id LEFT JOIN TopSet T ON W.workout_id=T.workout_id LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id""") return all_topsets def get_tags_for_person(self, person_id): # Fetch tags from the database tags = self.execute(""" SELECT T.tag_id as "tag_id", T.person_id as "person_id", T.name AS "tag_name", T.filter AS "tag_filter" FROM Tag T WHERE T.person_id = %s ORDER BY T.name """, [person_id]) # Add the static 'All' entry at the beginning all_tag = { "tag_id": -1, # No specific ID for 'All' "person_id": person_id, "tag_name": "All", # Static name "tag_filter": "" # No filter } return [all_tag] + tags def add_or_update_tag_for_person(self, person_id, tag_name, tag_filter): # check if a tag exists for dashboard with the same tag_name tag = self.execute('SELECT tag_id AS "TagId" FROM Tag WHERE person_id=%s AND name=%s LIMIT 1', [ person_id, tag_name], one=True) if tag: # update the tag self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s', [ tag_filter, tag['TagId']], commit=True) else: self.execute('INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s)', [ person_id, tag_name, tag_filter], commit=True) def delete_tag_for_person(self, person_id, tag_id): self.execute('DELETE FROM Tag WHERE person_id=%s AND tag_id=%s', [ person_id, tag_id], commit=True) def get_tags_for_dashboard(self): tags = self.execute(""" SELECT T.tag_id AS "tag_id", T.person_id AS "person_id", T.name AS "tag_name", T.filter AS "tag_filter" FROM Tag T WHERE T.person_id IS NULL ORDER BY T.name""", []) # Add the static 'All' entry at the beginning all_tag = { "tag_id": -1, # No specific ID for 'All' "person_id": None, "tag_name": "All", # Static name "tag_filter": "" # No filter } return [all_tag] + tags def add_or_update_tag_for_dashboard(self, tag_name, tag_filter): # check if a tag exists for dashboard with the same tag_name tag = self.execute('SELECT tag_id AS "tag_id" FROM Tag WHERE person_id IS NULL AND name=%s LIMIT 1', [ tag_name], one=True) if tag: # update the tag self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s', [ tag_filter, tag['tag_id']], commit=True) else: self.execute('INSERT INTO Tag (name, filter) VALUES (%s, %s)', [ tag_name, tag_filter], commit=True) def delete_tag_for_dashboard(self, tag_id): self.execute('DELETE FROM Tag WHERE tag_id=%s', [tag_id], commit=True) def get_workout_tags(self, person_id, workout_id): person_tags = self.execute(""" SELECT T.tag_id AS "tag_id", T.name AS "tag_name", T.filter AS "tag_filter" FROM Tag T WHERE T.person_id=%s""", [person_id]) workout_tags = self.execute(""" SELECT T.tag_id AS "tag_id", T.person_id AS "person_id", T.name AS "tag_name", T.filter AS "tag_filter" FROM Workout_Tag WT LEFT JOIN Tag T ON WT.tag_id=T.tag_id WHERE WT.workout_id=%s""", [workout_id]) selected_workout_tag_ids = [wt['tag_id'] for wt in workout_tags] return (person_tags, workout_tags, selected_workout_tag_ids) def get_most_recent_topset_for_exercise(self, person_id, exercise_id): topset = self.execute(""" SELECT t.repetitions, t.weight, e.name AS "exercise_name" FROM exercise e LEFT JOIN topset t ON e.exercise_id = t.exercise_id LEFT JOIN workout w ON t.workout_id = w.workout_id WHERE e.exercise_id = %s AND (w.person_id = %s OR w.person_id IS NULL) ORDER BY w.start_date DESC LIMIT 1; """, [exercise_id, person_id], one=True) if not topset: return None else: return (topset.get('repetitions'), topset.get('weight'), topset['exercise_name']) def get_all_exercises(self): exercises = self.execute( 'SELECT exercise_id, name FROM exercise') return exercises def get_exercise_progress_for_user(self, person_id, exercise_id, min_date=None, max_date=None, epoch='all', degree=1): today = datetime.now() if epoch == '1M': min_date = today - relativedelta(months=1) elif epoch == '3M': min_date = today - relativedelta(months=3) elif epoch == '6M': min_date = today - relativedelta(months=6) # Execute SQL query to fetch topset data for a specific person and exercise topsets = self.execute(""" SELECT T.topset_id, E.name AS exercise_name, W.person_id, T.workout_id, T.repetitions, T.weight, ROUND((100 * T.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * T.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm, W.start_date FROM topset T JOIN exercise E ON T.exercise_id = E.exercise_id JOIN workout W ON T.workout_id = W.workout_id WHERE W.person_id = %s AND E.exercise_id = %s AND (%s IS NULL OR W.start_date >= %s) AND (%s IS NULL OR W.start_date <= %s) ORDER BY W.start_date; """, [person_id, exercise_id, min_date, min_date, max_date, max_date]) # Return None if no topsets found if not topsets: return None # Extracting values and calculating value ranges for SVG dimensions exercise_name = topsets[0]['exercise_name'] estimated_1rm = [t['estimated_1rm'] for t in topsets] repetitions = [t['repetitions'] for t in topsets] weight = [t['weight'] for t in topsets] start_dates = [t['start_date'] for t in topsets] messages = [f'{t["repetitions"]} x {t["weight"]}kg ({t["estimated_1rm"]}kg E1RM) on {t["start_date"].strftime("%d %b %y")}' for t in topsets] exercise_progress = get_exercise_graph_model( exercise_name, estimated_1rm, repetitions, weight, start_dates, messages, epoch, person_id, exercise_id, min_date, max_date, degree) return exercise_progress # Note fetching logic moved to routes/notes.py def get_exercise_earliest_and_latest_dates(self, person_id, exercise_id): sql_query = """ SELECT MIN(w.start_date) AS earliest_date, MAX(w.start_date) AS latest_date FROM workout w INNER JOIN topset t ON w.workout_id = t.workout_id INNER JOIN exercise e ON t.exercise_id = e.exercise_id WHERE w.person_id = %s AND e.exercise_id = %s; """ # Execute the SQL query result = self.execute(sql_query, [person_id, exercise_id]) if not result or not result[0]: return None, None return result[0]['earliest_date'], result[0]['latest_date']