import os
from datetime import datetime
from urllib.parse import urlparse

import numpy as np
import psycopg2
from dateutil.relativedelta import relativedelta
from flask import g
from psycopg2.extras import RealDictCursor

from features.calendar import Calendar
from features.exercises import Exercises
from features.stats import Stats
from features.workout import Workout
from utils import (
    count_prs_over_time,
    get_all_exercises_from_topsets,
    get_exercise_graph_model,
    get_stats_from_topsets,
    get_topsets_for_person,
    get_weekly_pr_graph_model,
    get_workout_counts,
    get_workouts,
)

# Person -> workout -> topset -> exercise rows, shared by get_person() (which
# appends a WHERE clause) and get_all_topsets().
_ALL_TOPSETS_SQL = (
    ' SELECT P.person_id AS "PersonId", P.name AS "PersonName",'
    ' W.workout_id AS "WorkoutId", W.start_date AS "StartDate",'
    ' T.topset_id AS "TopSetId", E.exercise_id AS "ExerciseId",'
    ' E.name AS "ExerciseName", T.repetitions AS "Repetitions",'
    ' T.weight AS "Weight",'
    ' round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"'
    ' FROM Person P'
    ' LEFT JOIN Workout W ON P.person_id=W.person_id'
    ' LEFT JOIN TopSet T ON W.workout_id=T.workout_id'
    ' LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id'
)

# Tags attached to a single workout, shared by create_tag_for_workout() and
# get_workout_tags().
_WORKOUT_TAGS_SQL = (
    ' SELECT T.tag_id AS "tag_id", T.person_id AS "person_id",'
    ' T.name AS "tag_name", T.filter AS "tag_filter"'
    ' FROM Workout_Tag WT'
    ' LEFT JOIN Tag T ON WT.tag_id=T.tag_id'
    ' WHERE WT.workout_id=%s'
)

# Look-back window, in months, for each epoch code accepted by
# get_exercise_progress_for_user(); any other code keeps the caller's min_date.
_EPOCH_MONTHS = {'1M': 1, '3M': 3, '6M': 6}


class DataBase:
    """Data-access layer that runs SQL through a per-context psycopg2 connection."""

    def __init__(self, app):
        """Create the feature helpers and verify that DATABASE_URL is configured.

        Args:
            app: Accepted for interface compatibility; not used here.

        Raises:
            Exception: If the DATABASE_URL environment variable is missing or
                empty.
        """
        self.calendar = Calendar(self.execute)
        self.stats = Stats(self.execute)
        self.workout = Workout(self.execute)
        self.exercises = Exercises(self.execute)

        # Check the raw value: urlparse() always returns a truthy result, and
        # os.environ[...] would raise a bare KeyError before any check ran.
        if not os.environ.get('DATABASE_URL'):
            raise Exception("No DATABASE_URL environment variable set")

    def getDB(self):
        """Return the connection cached on ``flask.g``, opening it on first use."""
        db = getattr(g, 'database', None)
        if db is None:
            db_url = urlparse(os.environ['DATABASE_URL'])
            db = g.database = psycopg2.connect(
                database=db_url.path[1:],
                user=db_url.username,
                password=db_url.password,
                host=db_url.hostname,
                port=db_url.port,
            )
        return db

    @staticmethod
    def close_connection(exception=None):
        """Close and forget the connection cached on ``flask.g``, if any.

        Declared static with a defaulted argument so it works whether it is
        called on the class or on an instance, with or without the exception
        argument.
        """
        # pop() rather than getattr() so a closed connection is never reused.
        db = g.pop('database', None)
        if db is not None:
            db.close()

    def execute(self, query, args=(), one=False, commit=False):
        """Run a single SQL statement and return its rows.

        Args:
            query: SQL text with ``%s`` placeholders.
            args: Parameters bound to the placeholders.
            one: When true, return only the first row (or None when there are
                no rows) instead of the full list.
            commit: When true, commit the transaction after the statement.

        Returns:
            The fetched rows (dict-like, via ``RealDictCursor``), or None when
            the statement produces no result set.

        Raises:
            Any exception raised while executing or committing; the
            transaction is rolled back before the exception propagates.
        """
        conn = self.getDB()
        cur = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cur.execute(query, args)
            rv = cur.fetchall() if cur.description is not None else None
            if commit:
                conn.commit()
        except Exception:
            # Leave the connection usable and surface the failure instead of
            # pretending the write succeeded.
            conn.rollback()
            raise
        finally:
            cur.close()
        return (rv[0] if rv else None) if one else rv

    def get_exercise(self, exercise_id):
        """Return the exercise row with the given id, or None."""
        return self.execute(
            'SELECT exercise_id, name FROM exercise WHERE exercise_id=%s LIMIT 1',
            [exercise_id], one=True)

    def create_exercise(self, name):
        """Insert an exercise and return its new id."""
        new_exercise = self.execute(
            'INSERT INTO exercise (name) VALUES (%s) RETURNING exercise_id AS "ExerciseId"',
            [name], commit=True, one=True)
        return new_exercise['ExerciseId']

    def delete_exercise(self, exercise_id):
        """Delete the exercise with the given id."""
        self.execute('DELETE FROM exercise WHERE exercise_id=%s',
                     [exercise_id], commit=True)

    def update_exercise(self, exercise_id, name):
        """Rename the exercise with the given id."""
        self.execute('UPDATE Exercise SET Name=%s WHERE exercise_id=%s',
                     [name, exercise_id], commit=True)

    def get_people(self):
        """Return every person as rows with ``PersonId`` and ``Name``."""
        return self.execute(
            'SELECT person_id AS "PersonId", name AS "Name" FROM person')

    def is_valid_person(self, person_id):
        """Return a row (truthy) when the person exists, otherwise None."""
        return self.execute(
            'SELECT person_id AS "PersonId" FROM person WHERE person_id=%s LIMIT 1',
            [person_id], one=True)

    def create_person(self, name):
        """Insert a person and return the new id."""
        new_person = self.execute(
            'INSERT INTO person (name) VALUES (%s) RETURNING person_id AS "PersonId"',
            [name], commit=True, one=True)
        return new_person['PersonId']

    def delete_person(self, person_id):
        """Delete a person together with their workouts and topsets.

        The three deletes share one transaction: only the last statement
        commits, and execute() rolls back on failure, so a partial delete is
        never persisted.
        """
        self.execute(
            'DELETE FROM topset WHERE workout_id IN (SELECT workout_id FROM workout WHERE person_id=%s)',
            [person_id])
        self.execute('DELETE FROM workout WHERE person_id=%s', [person_id])
        self.execute('DELETE FROM person WHERE person_id=%s',
                     [person_id], commit=True)

    def update_person_name(self, person_id, name):
        """Rename the person with the given id."""
        self.execute('UPDATE person SET name=%s WHERE person_id=%s',
                     [name, person_id], commit=True)

    def is_valid_workout(self, person_id, workout_id):
        """Return a row (truthy) when the workout belongs to the person, else None."""
        return self.execute(
            'SELECT W.workout_id AS "WorkoutId" FROM Person P, Workout W WHERE P.person_id=W.person_id AND P.person_id=%s AND W.workout_id=%s LIMIT 1',
            [person_id, workout_id], one=True)

    def is_valid_topset(self, person_id, workout_id, topset_id):
        """Return a row (truthy) when the topset belongs to the person's workout."""
        # The join must tie Person to Workout (P.person_id=W.person_id).
        # Comparing W.person_id with itself cross-joined Person and let any
        # existing person id validate any topset.
        return self.execute(
            ' SELECT T.topset_id AS "TopSetId"'
            ' FROM Person P, Workout W, TopSet T'
            ' WHERE P.person_id=W.person_id AND W.workout_id=T.workout_id'
            ' AND P.person_id=%s AND W.workout_id = %s AND T.topset_id = %s'
            ' LIMIT 1',
            [person_id, workout_id, topset_id], one=True)

    def delete_workout(self, workout_id):
        """Delete a workout and its topsets in a single transaction."""
        self.execute('DELETE FROM topset WHERE workout_id=%s', [workout_id])
        self.execute('DELETE FROM workout WHERE workout_id=%s',
                     [workout_id], commit=True)

    def update_topset(self, exercise_id, repetitions, weight, topset_id):
        """Overwrite the exercise, repetitions and weight of a topset."""
        self.execute(
            'UPDATE topset SET exercise_id=%s, repetitions=%s, weight=%s WHERE topSet_id=%s',
            [exercise_id, repetitions, weight, topset_id], commit=True)

    def create_topset(self, workout_id, exercise_id, repetitions, weight):
        """Insert a topset for a workout and return the new topset id."""
        new_top_set = self.execute(
            'INSERT INTO topset (workout_id, exercise_id, repetitions, weight) VALUES (%s, %s, %s, %s) RETURNING topset_id AS "TopSetId"',
            [workout_id, exercise_id, repetitions, weight],
            commit=True, one=True)
        return new_top_set['TopSetId']

    def delete_topset(self, topset_id):
        """Delete the topset with the given id."""
        self.execute('DELETE FROM topset WHERE topset_id=%s',
                     [topset_id], commit=True)

    def create_workout(self, person_id):
        """Return today's empty workout for the person, creating one if needed.

        A workout dated today that has no topsets is reused so that repeated
        calls do not pile up empty workouts.
        """
        date_string = datetime.now().strftime('%Y-%m-%d')

        workout = self.execute(
            'SELECT workout_id AS "WorkoutId" FROM workout WHERE person_id=%s AND start_date=%s AND workout_id NOT IN (SELECT workout_id FROM topset)',
            [person_id, date_string], one=True)
        if workout:
            print(
                f'Workout already created for PersonId {person_id} starting at {date_string} so returning WorkoutId {workout["WorkoutId"]} rather then creating new workout')
            return workout['WorkoutId']

        print(
            f'Creating workout for PersonId {person_id} starting at {date_string}')
        new_workout = self.execute(
            'INSERT INTO workout (person_id, start_date) VALUES (%s, %s) RETURNING workout_id AS "WorkoutId"',
            [person_id, date_string], commit=True, one=True)
        return new_workout['WorkoutId']

    def get_people_and_workout_count(self, person_id):
        """Return every person with a workout count and an ``IsActive`` flag.

        ``IsActive`` is 1 for the row whose id equals ``person_id`` and 0 for
        all others.
        """
        return self.execute(
            ' SELECT P.person_id AS "PersonId", P.name AS "Name",'
            ' COUNT(W.workout_id) AS "NumberOfWorkouts",'
            ' CASE P.person_id WHEN %s THEN 1 ELSE 0 END "IsActive"'
            ' FROM Person P'
            ' LEFT JOIN Workout W ON P.person_id = W.person_id'
            ' GROUP BY P.person_id'
            ' ORDER BY P.person_id',
            [person_id])

    def update_workout_start_date(self, workout_id, start_date):
        """Set the start date of a workout."""
        self.execute('UPDATE workout SET start_date=%s WHERE workout_id=%s',
                     [start_date, workout_id], commit=True)

    def get_person(self, person_id):
        """Build the person view model (stats, exercises, workouts, graphs).

        When no row matches, ``PersonId`` falls back to -1 and ``PersonName``
        to ``'Unknown'``.
        """
        topsets = self.execute(
            _ALL_TOPSETS_SQL + ' WHERE P.person_id=%s', [person_id])

        weekly_counts = get_workout_counts(topsets, 'week')
        weekly_pr_counts = count_prs_over_time(topsets, 'week')
        person_graphs = [
            get_weekly_pr_graph_model('Workouts per week', weekly_counts),
            get_weekly_pr_graph_model('PRs per week', weekly_pr_counts),
        ]

        return {
            'PersonId': next((t['PersonId'] for t in topsets), -1),
            'PersonName': next((t['PersonName'] for t in topsets), 'Unknown'),
            'Stats': get_stats_from_topsets(topsets),
            'Exercises': get_all_exercises_from_topsets(topsets),
            'Workouts': get_workouts(topsets),
            'ExerciseProgressGraphs': get_topsets_for_person(topsets),
            'PersonGraphs': person_graphs,
        }

    def get_workout(self, person_id, workout_id):
        """Return a workout with its topsets and note.

        Raises:
            IndexError: If no row matches the person/workout pair.
        """
        topsets = self.execute(
            ' SELECT P.person_id, P.name AS "person_name", W.workout_id,'
            ' W.start_date, T.topset_id, E.exercise_id,'
            ' E.name AS "exercise_name", T.repetitions, T.weight, W.note'
            ' FROM Person P'
            ' LEFT JOIN Workout W ON P.person_id=W.person_id'
            ' LEFT JOIN TopSet T ON W.workout_id=T.workout_id'
            ' LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id'
            ' WHERE P.person_id=%s AND W.workout_id = %s'
            ' ORDER BY T.topset_id',
            [person_id, workout_id])

        first = topsets[0]
        return {
            'person_id': person_id,
            'person_name': first['person_name'],
            'workout_id': workout_id,
            'start_date': first['start_date'],
            # The LEFT JOIN yields one all-NULL topset row for an empty workout.
            'top_sets': [
                {
                    "topset_id": t['topset_id'],
                    "exercise_id": t['exercise_id'],
                    "exercise_name": t['exercise_name'],
                    "weight": t['weight'],
                    "repetitions": t['repetitions'],
                }
                for t in topsets if t['topset_id'] is not None
            ],
            'note': first['note'],
        }

    def get_topset(self, topset_id):
        """Return one topset with its exercise name.

        Raises:
            TypeError: If no topset has the given id (the row is None).
        """
        topset = self.execute(
            ' SELECT T.topset_id, E.exercise_id, E.name AS "exercise_name",'
            ' T.repetitions, T.weight'
            ' FROM TopSet T'
            ' INNER JOIN Exercise E ON T.exercise_id=E.exercise_id'
            ' WHERE T.topset_id = %s',
            [topset_id], one=True)

        return {
            "topset_id": topset['topset_id'],
            "exercise_id": topset['exercise_id'],
            "exercise_name": topset['exercise_name'],
            "weight": topset['weight'],
            "repetitions": topset['repetitions'],
        }

    def get_all_topsets(self):
        """Return the person/workout/topset/exercise rows for every person."""
        return self.execute(_ALL_TOPSETS_SQL)

    def get_tags_for_person(self, person_id):
        """Return the person's tags ordered by name."""
        return self.execute(
            ' SELECT T.tag_id as "tag_id", T.person_id as "person_id",'
            ' T.name AS "tag_name", T.filter AS "tag_filter"'
            ' FROM Tag T'
            ' WHERE T.person_id = %s'
            ' ORDER BY T.name',
            [person_id])

    def add_or_update_tag_for_person(self, person_id, tag_name, tag_filter):
        """Update the filter of the person's tag with this name, or insert it."""
        tag = self.execute(
            'SELECT tag_id AS "TagId" FROM Tag WHERE person_id=%s AND name=%s LIMIT 1',
            [person_id, tag_name], one=True)
        if tag:
            self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s',
                         [tag_filter, tag['TagId']], commit=True)
        else:
            self.execute(
                'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s)',
                [person_id, tag_name, tag_filter], commit=True)

    def delete_tag_for_person(self, person_id, tag_id):
        """Delete one of the person's tags."""
        self.execute('DELETE FROM Tag WHERE person_id=%s AND tag_id=%s',
                     [person_id, tag_id], commit=True)

    def get_tags_for_dashboard(self):
        """Return the tags that have no person (dashboard tags), ordered by name."""
        return self.execute(
            ' SELECT T.tag_id AS "tag_id", T.person_id AS "person_id",'
            ' T.name AS "tag_name", T.filter AS "tag_filter"'
            ' FROM Tag T'
            ' WHERE T.person_id IS NULL'
            ' ORDER BY T.name',
            [])

    def add_or_update_tag_for_dashboard(self, tag_name, tag_filter):
        """Update the filter of the dashboard tag with this name, or insert it."""
        tag = self.execute(
            'SELECT tag_id AS "tag_id" FROM Tag WHERE person_id IS NULL AND name=%s LIMIT 1',
            [tag_name], one=True)
        if tag:
            self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s',
                         [tag_filter, tag['tag_id']], commit=True)
        else:
            self.execute('INSERT INTO Tag (name, filter) VALUES (%s, %s)',
                         [tag_name, tag_filter], commit=True)

    def delete_tag_for_dashboard(self, tag_id):
        """Delete the tag with the given id."""
        self.execute('DELETE FROM Tag WHERE tag_id=%s', [tag_id], commit=True)

    def update_workout_note_for_person(self, person_id, workout_id, note):
        """Set the note on one of the person's workouts."""
        self.execute(
            'UPDATE workout SET note=%s WHERE person_id=%s AND workout_id=%s',
            [note, person_id, workout_id], commit=True)

    def add_tag_for_workout(self, workout_id, tags_id):
        """Make the workout's tags equal to ``tags_id`` and return them.

        Args:
            workout_id: Workout whose tag set is replaced.
            tags_id: Tag ids to keep or add; an empty or None value removes
                every tag from the workout.

        Returns:
            The workout's tag rows, each with ``is_selected`` set to TRUE.
        """
        if tags_id:
            # psycopg2 adapts a Python tuple to a parenthesised SQL list.
            self.execute(
                " DELETE FROM workout_tag WHERE workout_id = %s AND tag_id NOT IN %s ",
                [workout_id, tuple(tags_id)], commit=True)
        else:
            self.execute(
                " DELETE FROM workout_tag WHERE workout_id = %s ",
                [workout_id], commit=True)

        # ``or []`` keeps a None selection from crashing the loop.
        for tag_id in tags_id or []:
            self.execute(
                " INSERT INTO workout_tag (workout_id, tag_id) VALUES (%s, %s) ON CONFLICT (workout_id, tag_id) DO NOTHING ",
                [workout_id, tag_id], commit=True)

        return self.execute(
            ' SELECT T.tag_id AS "tag_id", T.person_id AS "person_id",'
            ' T.name AS "tag_name", T.filter AS "tag_filter",'
            ' TRUE AS "is_selected"'
            ' FROM Workout_Tag WT'
            ' LEFT JOIN Tag T ON WT.tag_id=T.tag_id'
            ' WHERE WT.workout_id=%s',
            [workout_id])

    def create_tag_for_workout(self, person_id, workout_id, tag_name):
        """Create a tag filtering on the workout's exercises and attach it.

        Returns:
            The workout's tag rows after the new tag has been attached.
        """
        workout_exercises = self.execute(
            ' SELECT E.exercise_id AS "exercise_id", E.name AS "exercise_name"'
            ' FROM Workout W'
            ' LEFT JOIN TopSet T ON W.workout_id=T.workout_id'
            ' LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id'
            ' WHERE W.workout_id=%s',
            [workout_id])

        # A workout without topsets yields a NULL exercise id from the LEFT
        # JOIN; skip it so the filter never contains "exercise_id=None".
        tag_filter = "?" + "&".join(
            f"exercise_id={e['exercise_id']}"
            for e in workout_exercises
            if e['exercise_id'] is not None)

        row = self.execute(
            'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s) RETURNING tag_id AS "tag_id"',
            [person_id, tag_name, tag_filter], commit=True, one=True)

        self.execute(
            'INSERT INTO Workout_Tag (workout_id, tag_id) VALUES (%s, %s)',
            [workout_id, row['tag_id']], commit=True)

        return self.execute(_WORKOUT_TAGS_SQL, [workout_id])

    def get_workout_tags(self, person_id, workout_id):
        """Return ``(person_tags, workout_tags, selected_workout_tag_ids)``."""
        person_tags = self.execute(
            ' SELECT T.tag_id AS "tag_id", T.name AS "tag_name",'
            ' T.filter AS "tag_filter"'
            ' FROM Tag T'
            ' WHERE T.person_id=%s',
            [person_id])

        workout_tags = self.execute(_WORKOUT_TAGS_SQL, [workout_id])
        selected_workout_tag_ids = [wt['tag_id'] for wt in workout_tags]

        return (person_tags, workout_tags, selected_workout_tag_ids)

    def get_most_recent_topset_for_exercise(self, person_id, exercise_id):
        """Return ``(repetitions, weight)`` of the latest matching topset, or None."""
        topset = self.execute(
            ' SELECT t.repetitions, t.weight'
            ' FROM topset t'
            ' JOIN workout w ON t.workout_id = w.workout_id'
            ' WHERE w.person_id = %s AND t.exercise_id = %s'
            ' ORDER BY w.start_date DESC'
            ' LIMIT 1; ',
            [person_id, exercise_id], one=True)

        if not topset:
            return None
        return (topset['repetitions'], topset['weight'])

    def get_all_exercises(self):
        """Return every exercise row (``exercise_id``, ``name``)."""
        return self.execute('SELECT exercise_id, name FROM exercise')

    def get_exercise_progress_for_user(self, person_id, exercise_id, min_date=None, max_date=None, epoch='all'):
        """Build the progress graph model for one person and exercise.

        Args:
            person_id: Person whose topsets are read.
            exercise_id: Exercise to chart.
            min_date: Optional lower bound on the workout start date.
            max_date: Optional upper bound on the workout start date.
            epoch: ``'1M'``, ``'3M'`` or ``'6M'`` replaces ``min_date`` with
                that many months before now; any other value leaves
                ``min_date`` untouched.

        Returns:
            The model from ``get_exercise_graph_model``, or None when no
            topset matches.
        """
        months_back = _EPOCH_MONTHS.get(epoch)
        if months_back is not None:
            min_date = datetime.now() - relativedelta(months=months_back)

        topsets = self.execute(
            ' SELECT T.topset_id, E.name AS exercise_name, W.person_id,'
            ' T.workout_id, T.repetitions, T.weight,'
            ' ROUND((100 * T.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * T.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm,'
            ' W.start_date'
            ' FROM topset T'
            ' JOIN exercise E ON T.exercise_id = E.exercise_id'
            ' JOIN workout W ON T.workout_id = W.workout_id'
            ' WHERE W.person_id = %s AND E.exercise_id = %s'
            ' AND (%s IS NULL OR W.start_date >= %s)'
            ' AND (%s IS NULL OR W.start_date <= %s)'
            ' ORDER BY W.start_date; ',
            [person_id, exercise_id, min_date, min_date, max_date, max_date])

        if not topsets:
            return None

        # Parallel per-topset series handed to the graph model builder.
        estimated_1rm = [t['estimated_1rm'] for t in topsets]
        repetitions = [t['repetitions'] for t in topsets]
        weight = [t['weight'] for t in topsets]
        start_dates = [t['start_date'] for t in topsets]
        messages = [
            f'{t["repetitions"]} x {t["weight"]}kg ({t["estimated_1rm"]}kg E1RM) on {t["start_date"].strftime("%d %b %y")}'
            for t in topsets
        ]

        return get_exercise_graph_model(
            topsets[0]['exercise_name'], estimated_1rm, repetitions, weight,
            start_dates, messages, epoch, person_id, exercise_id,
            min_date, max_date)

    def get_workout_notes_for_person(self, person_id):
        """Return ``(person_name, workout_notes)`` for workouts that have a note.

        Each entry of ``workout_notes`` has ``workout_id``,
        ``formatted_start_date``, ``note`` and a ``tags`` list.
        ``person_name`` is None when the query returns no rows.
        """
        sql_query = (
            " SELECT p.name AS person_name, w.workout_id,"
            " to_char(w.start_date, 'Mon DD YYYY') AS formatted_start_date,"
            " w.note, t.filter as tag_filter, t.name AS tag_name"
            " FROM person p"
            " LEFT JOIN workout w ON p.person_id = w.person_id"
            " AND w.note IS NOT NULL AND w.note <> ''"
            " LEFT JOIN workout_tag wt ON w.workout_id = wt.workout_id"
            " LEFT JOIN tag t ON wt.tag_id = t.tag_id"
            " WHERE p.person_id = %s"
            " ORDER BY w.start_date DESC, w.workout_id, t.name; "
        )
        raw_workout_notes = self.execute(sql_query, [person_id])

        person_name = None
        workout_notes = {}

        for row in raw_workout_notes:
            # The name is the same on every row; take it from the first one.
            if person_name is None:
                person_name = row['person_name']

            # A person without noted workouts still yields one all-NULL row.
            if not (row['workout_id'] and row['note']):
                continue

            workout_id = row['workout_id']
            if workout_id not in workout_notes:
                workout_notes[workout_id] = {
                    'workout_id': workout_id,
                    'formatted_start_date': row['formatted_start_date'],
                    'note': row['note'],
                    'tags': [],
                }
            if row['tag_name']:
                workout_notes[workout_id]['tags'].append({
                    'tag_filter': row['tag_filter'],
                    'tag_name': row['tag_name'],
                    'person_id': person_id,
                })

        return (person_name, list(workout_notes.values()))

    def get_exercise_earliest_and_latest_dates(self, person_id, exercise_id):
        """Return ``(earliest, latest)`` workout dates for the exercise.

        Returns ``(None, None)`` when the person has no topset for it.
        """
        sql_query = (
            ' SELECT w.start_date'
            ' FROM workout w'
            ' INNER JOIN topset t on w.workout_id = t.workout_id'
            ' INNER JOIN exercise e on t.exercise_id = e.exercise_id'
            ' WHERE w.person_id = %s AND e.exercise_id = %s'
            ' ORDER BY w.start_date DESC; '
        )
        workout_exercise_dates = self.execute(
            sql_query, [person_id, exercise_id])

        if not workout_exercise_dates:
            return None, None

        # Rows are sorted newest first.
        latest_date = workout_exercise_dates[0]['start_date']
        earliest_date = workout_exercise_dates[-1]['start_date']
        return earliest_date, latest_date