Files
workout/db.py
2024-11-02 23:09:26 +11:00

578 lines
24 KiB
Python

import os
import psycopg2
import numpy as np
from psycopg2.extras import RealDictCursor
from datetime import datetime
from dateutil.relativedelta import relativedelta
from urllib.parse import urlparse
from flask import g
from features.calendar import Calendar
from features.stats import Stats
from features.workout import Workout
from utils import count_prs_over_time, get_all_exercises_from_topsets, get_exercise_graph_model, get_stats_from_topsets, get_topsets_for_person, get_weekly_pr_graph_model, get_workout_counts, get_workouts
class DataBase():
def __init__(self, app):
    """Verify the database is configured and wire up the feature helpers.

    Args:
        app: Accepted for interface compatibility; this constructor does
            not read it.

    Raises:
        Exception: If the DATABASE_URL environment variable is missing or
            empty.
    """
    # urlparse() always returns a non-empty (hence truthy) result tuple and
    # os.environ[...] raises KeyError on a missing key, so the check has to
    # be made on the raw environment value to ever fire.
    if not os.environ.get('DATABASE_URL'):
        raise Exception("No DATABASE_URL environment variable set")
    self.calendar = Calendar(self.execute)
    self.stats = Stats(self.execute)
    self.workout = Workout(self.execute)
def getDB(self):
    """Return the connection stored on ``g.database``, opening it if needed.

    The connection parameters are parsed from the DATABASE_URL environment
    variable.
    """
    cached = getattr(g, 'database', None)
    if cached is not None:
        return cached
    parts = urlparse(os.environ['DATABASE_URL'])
    g.database = psycopg2.connect(
        database=parts.path[1:],  # drop the leading "/" of the URL path
        user=parts.username,
        password=parts.password,
        host=parts.hostname,
        port=parts.port,
    )
    return g.database
def close_connection(self, exception=None):
    """Close the connection stored on ``g.database``, if there is one.

    Args:
        exception: Unused. Optional so the method can be called with or
            without an argument.
    """
    # Remove the attribute as well as closing it, so a later getDB() call
    # opens a fresh connection instead of handing back a closed one.
    db = g.pop('database', None)
    if db is not None:
        db.close()
def execute(self, query, args=(), one=False, commit=False):
    """Run one SQL statement on the current connection.

    Args:
        query: SQL text with ``%s`` placeholders.
        args: Values bound to the placeholders.
        one: When true, return only the first row (or None if there is
            none) instead of the full list.
        commit: When true, commit the transaction after the statement.

    Returns:
        The fetched rows (as produced by a RealDictCursor), the first row
        when ``one`` is true, or None when the statement returns no result
        set.

    Raises:
        Exception: Whatever the statement or the commit raised. The
            transaction is rolled back before the error propagates.
    """
    conn = self.getDB()
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute(query, args)
        # description is None for statements that produce no result set.
        rv = cur.fetchall() if cur.description is not None else None
        if commit:
            conn.commit()
    except Exception:
        # Roll back so the connection is usable again, then re-raise so a
        # failed write is never reported to the caller as a success.
        conn.rollback()
        raise
    finally:
        cur.close()
    return (rv[0] if rv else None) if one else rv
def get_exercise(self, exercise_id):
    """Look up one exercise row (exercise_id, name) by its id."""
    sql = 'SELECT exercise_id, name FROM exercise WHERE exercise_id=%s LIMIT 1'
    return self.execute(sql, [exercise_id], one=True)
def create_exercise(self, name):
    """Insert an exercise called *name* and return its generated id."""
    inserted = self.execute(
        'INSERT INTO exercise (name) VALUES (%s) RETURNING exercise_id AS "ExerciseId"',
        [name],
        one=True,
        commit=True,
    )
    return inserted['ExerciseId']
def delete_exercise(self, exercise_id):
    """Delete the exercise with the given id and commit."""
    sql = 'DELETE FROM exercise WHERE exercise_id=%s'
    self.execute(sql, [exercise_id], commit=True)
def update_exercise(self, exercise_id, name):
    """Rename the exercise with the given id and commit."""
    sql = 'UPDATE Exercise SET Name=%s WHERE exercise_id=%s'
    params = [name, exercise_id]
    self.execute(sql, params, commit=True)
def get_people(self):
    """Return every person as rows with "PersonId" and "Name" columns."""
    return self.execute(
        'SELECT person_id AS "PersonId", name AS "Name" FROM person')
def is_valid_person(self, person_id):
    """Return the matching person row ("PersonId"), or None if there is none."""
    sql = 'SELECT person_id AS "PersonId" FROM person WHERE person_id=%s LIMIT 1'
    return self.execute(sql, [person_id], one=True)
def create_person(self, name):
    """Insert a person called *name* and return the generated id."""
    inserted = self.execute(
        'INSERT INTO person (name) VALUES (%s) RETURNING person_id AS "PersonId"',
        [name],
        one=True,
        commit=True,
    )
    return inserted['PersonId']
def delete_person(self, person_id):
self.execute('DELETE FROM topset WHERE workout_id IN (SELECT workout_id FROM workout WHERE person_id=%s)', [
person_id], commit=True)
self.execute('DELETE FROM workout WHERE person_id=%s',
[person_id], commit=True)
self.execute('DELETE FROM person WHERE person_id=%s',
[person_id], commit=True)
def update_person_name(self, person_id, name):
    """Set the name of the person with the given id and commit."""
    sql = 'UPDATE person SET name=%s WHERE person_id=%s'
    params = [name, person_id]
    self.execute(sql, params, commit=True)
def is_valid_workout(self, person_id, workout_id):
    """Return the workout row ("WorkoutId") if it belongs to the person, else None."""
    sql = 'SELECT W.workout_id AS "WorkoutId" FROM Person P, Workout W WHERE P.person_id=W.person_id AND P.person_id=%s AND W.workout_id=%s LIMIT 1'
    return self.execute(sql, [person_id, workout_id], one=True)
def is_valid_topset(self, person_id, workout_id, topset_id):
    """Check that a topset belongs to the given workout of the given person.

    Args:
        person_id: Id of the person expected to own the workout.
        workout_id: Id of the workout expected to contain the topset.
        topset_id: Id of the topset being checked.

    Returns:
        The matching row (key "TopSetId"), or None when the combination
        does not exist.
    """
    # The person/workout join must compare P with W. Comparing W with
    # itself is always true and would accept a workout owned by anyone.
    return self.execute("""
        SELECT T.topset_id AS "TopSetId"
        FROM Person P, Workout W, TopSet T
        WHERE P.person_id=W.person_id AND W.workout_id=T.workout_id AND P.person_id=%s AND W.workout_id = %s AND T.topset_id = %s
        LIMIT 1""", [person_id, workout_id, topset_id], one=True)
def delete_workout(self, workout_id):
    """Delete a workout and its topsets as a single transaction.

    Only the final statement commits, so an error between the two deletes
    does not persist a workout that has lost its topsets.

    Args:
        workout_id: Id of the workout to remove.
    """
    # NOTE(review): workout_tag rows for this workout are not removed here;
    # presumably the schema handles them - confirm.
    self.execute('DELETE FROM topset WHERE workout_id=%s', [workout_id])
    self.execute('DELETE FROM workout WHERE workout_id=%s',
                 [workout_id], commit=True)
def update_topset(self, exercise_id, repetitions, weight, topset_id):
    """Overwrite the exercise, repetitions and weight of a topset and commit."""
    sql = 'UPDATE topset SET exercise_id=%s, repetitions=%s, weight=%s WHERE topSet_id=%s'
    params = [exercise_id, repetitions, weight, topset_id]
    self.execute(sql, params, commit=True)
def create_topset(self, workout_id, exercise_id, repetitions, weight):
    """Insert a topset for the workout and return the generated topset id."""
    inserted = self.execute(
        'INSERT INTO topset (workout_id, exercise_id, repetitions, weight) VALUES (%s, %s, %s, %s) RETURNING topset_id AS "TopSetId"',
        [workout_id, exercise_id, repetitions, weight],
        one=True,
        commit=True,
    )
    return inserted['TopSetId']
def delete_topset(self, topset_id):
    """Delete the topset with the given id and commit."""
    sql = 'DELETE FROM topset WHERE topset_id=%s'
    self.execute(sql, [topset_id], commit=True)
def create_workout(self, person_id):
    """Return a workout id for the person dated today.

    If the person already has a workout starting today that has no
    topsets, that workout's id is returned; otherwise a new workout row is
    inserted and its id returned.
    """
    date_string = datetime.now().strftime('%Y-%m-%d')
    # Look for an empty workout (one with no topsets) created today.
    existing = self.execute(
        'SELECT workout_id AS "WorkoutId" FROM workout WHERE person_id=%s AND start_date=%s AND workout_id NOT IN (SELECT workout_id FROM topset)',
        [person_id, date_string],
        one=True,
    )
    if not existing:
        print(
            f'Creating workout for PersonId {person_id} starting at {date_string}')
        created = self.execute(
            'INSERT INTO workout (person_id, start_date) VALUES (%s, %s) RETURNING workout_id AS "WorkoutId"',
            [person_id, date_string],
            one=True,
            commit=True,
        )
        return created['WorkoutId']
    print(
        f'Workout already created for PersonId {person_id} starting at {date_string} so returning WorkoutId {existing["WorkoutId"]} rather then creating new workout')
    return existing['WorkoutId']
def get_people_and_workout_count(self, person_id):
    """List every person with a workout count and an "IsActive" flag.

    "IsActive" is 1 on the row whose person_id equals *person_id* and 0 on
    every other row. Rows are ordered by person id.
    """
    sql = """
SELECT
P.person_id AS "PersonId",
P.name AS "Name",
COUNT(W.workout_id) AS "NumberOfWorkouts",
CASE P.person_id
WHEN %s
THEN 1
ELSE 0
END "IsActive"
FROM
Person P LEFT JOIN Workout W ON P.person_id = W.person_id
GROUP BY
P.person_id
ORDER BY
P.person_id"""
    return self.execute(sql, [person_id])
def update_workout_start_date(self, workout_id, start_date):
    """Set the start date of the given workout and commit."""
    sql = 'UPDATE workout SET start_date=%s WHERE workout_id=%s'
    params = [start_date, workout_id]
    self.execute(sql, params, commit=True)
def get_person(self, person_id):
    """Build the view model for one person from their topset rows.

    Selects one row per person/workout/topset/exercise combination (LEFT
    JOINs, so a person without workouts still yields a row) and hands the
    rows to the imported helper functions that derive stats, exercises,
    workouts and graphs.

    Returns:
        A dict with keys 'PersonId', 'PersonName', 'Stats', 'Exercises',
        'Workouts', 'ExerciseProgressGraphs' and 'PersonGraphs'. 'PersonId'
        is -1 and 'PersonName' is 'Unknown' when the query returns no rows.
    """
    sql = """
SELECT
P.person_id AS "PersonId",
P.name AS "PersonName",
W.workout_id AS "WorkoutId",
W.start_date AS "StartDate",
T.topset_id AS "TopSetId",
E.exercise_id AS "ExerciseId",
E.name AS "ExerciseName",
T.repetitions AS "Repetitions",
T.weight AS "Weight",
round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"
FROM Person P
LEFT JOIN Workout W ON P.person_id=W.person_id
LEFT JOIN TopSet T ON W.workout_id=T.workout_id
LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id
WHERE P.person_id=%s"""
    rows = self.execute(sql, [person_id])

    workouts_per_week = get_workout_counts(rows, 'week')
    prs_per_week = count_prs_over_time(rows, 'week')
    graphs = [
        get_weekly_pr_graph_model('Workouts per week', workouts_per_week),
        get_weekly_pr_graph_model('PRs per week', prs_per_week),
    ]

    # Identity fields come from the first row; fall back when there is none.
    found_id = next((row['PersonId'] for row in rows), -1)
    found_name = next((row['PersonName'] for row in rows), 'Unknown')
    return {
        'PersonId': found_id,
        'PersonName': found_name,
        'Stats': get_stats_from_topsets(rows),
        'Exercises': get_all_exercises_from_topsets(rows),
        'Workouts': get_workouts(rows),
        'ExerciseProgressGraphs': get_topsets_for_person(rows),
        'PersonGraphs': graphs,
    }
def get_workout(self, person_id, workout_id):
    """Load one workout of a person together with its topsets.

    Args:
        person_id: Id of the person who owns the workout.
        workout_id: Id of the workout to load.

    Returns:
        A dict with keys 'person_id', 'person_name', 'workout_id',
        'start_date', 'top_sets' (list of topset dicts, empty when the
        workout has none) and 'note'. None when no workout matches the
        person/workout pair.
    """
    topsets = self.execute("""
        SELECT
            P.person_id,
            P.name AS "person_name",
            W.workout_id,
            W.start_date,
            T.topset_id,
            E.exercise_id,
            E.name AS "exercise_name",
            T.repetitions,
            T.weight,
            W.note
        FROM Person P
        LEFT JOIN Workout W ON P.person_id=W.person_id
        LEFT JOIN TopSet T ON W.workout_id=T.workout_id
        LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id
        WHERE P.person_id=%s
        AND W.workout_id = %s
        ORDER BY T.topset_id""", [person_id, workout_id])
    # No rows means the workout does not exist for this person; report that
    # as None instead of failing with IndexError on topsets[0].
    if not topsets:
        return None
    first = topsets[0]
    return {
        'person_id': person_id,
        'person_name': first['person_name'],
        'workout_id': workout_id,
        'start_date': first['start_date'],
        # A workout without topsets yields one row whose topset columns are
        # NULL (LEFT JOIN); skip it.
        'top_sets': [
            {
                "topset_id": t['topset_id'],
                "exercise_id": t['exercise_id'],
                "exercise_name": t['exercise_name'],
                "weight": t['weight'],
                "repetitions": t['repetitions'],
            }
            for t in topsets if t['topset_id'] is not None
        ],
        'note': first['note'],
    }
def get_topset(self, topset_id):
    """Load one topset together with the name of its exercise.

    Args:
        topset_id: Id of the topset to load.

    Returns:
        A dict with keys 'topset_id', 'exercise_id', 'exercise_name',
        'weight' and 'repetitions', or None when no such topset exists.
    """
    topset = self.execute("""
        SELECT
            T.topset_id,
            E.exercise_id,
            E.name AS "exercise_name",
            T.repetitions,
            T.weight
        FROM TopSet T
        INNER JOIN Exercise E ON T.exercise_id=E.exercise_id
        WHERE T.topset_id = %s""", [topset_id], one=True)
    # A missing topset is reported as None instead of failing with a
    # TypeError when subscripting the empty result.
    if not topset:
        return None
    return {
        "topset_id": topset['topset_id'],
        "exercise_id": topset['exercise_id'],
        "exercise_name": topset['exercise_name'],
        "weight": topset['weight'],
        "repetitions": topset['repetitions'],
    }
def get_all_topsets(self):
    """Return the person/workout/topset/exercise rows for every person.

    Each row also carries an "Estimated1RM" column computed in SQL from
    the weight and repetitions.
    """
    sql = """
SELECT
P.person_id AS "PersonId",
P.name AS "PersonName",
W.workout_id AS "WorkoutId",
W.start_date AS "StartDate",
T.topset_id AS "TopSetId",
E.exercise_id AS "ExerciseId",
E.name AS "ExerciseName",
T.repetitions AS "Repetitions",
T.weight AS "Weight",
round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"
FROM Person P
LEFT JOIN Workout W ON P.person_id=W.person_id
LEFT JOIN TopSet T ON W.workout_id=T.workout_id
LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id"""
    return self.execute(sql)
def get_tags_for_person(self, person_id):
    """Return the person's tags (tag_id, person_id, tag_name, tag_filter) ordered by name."""
    sql = """
SELECT
T.tag_id as "tag_id",
T.person_id as "person_id",
T.name AS "tag_name",
T.filter AS "tag_filter"
FROM
Tag T
WHERE
T.person_id = %s
ORDER BY
T.name"""
    return self.execute(sql, [person_id])
def add_or_update_tag_for_person(self, person_id, tag_name, tag_filter):
    """Store *tag_filter* under *tag_name* for the person.

    If the person already has a tag with that name its filter is
    overwritten; otherwise a new tag row is inserted.
    """
    existing = self.execute(
        'SELECT tag_id AS "TagId" FROM Tag WHERE person_id=%s AND name=%s LIMIT 1',
        [person_id, tag_name],
        one=True,
    )
    if not existing:
        # No tag with this name yet: create it.
        self.execute(
            'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s)',
            [person_id, tag_name, tag_filter],
            commit=True,
        )
        return
    # Same name already present: only the filter changes.
    self.execute(
        'UPDATE Tag SET filter=%s WHERE tag_id=%s',
        [tag_filter, existing['TagId']],
        commit=True,
    )
def delete_tag_for_person(self, person_id, tag_id):
    """Delete the tag with the given id if it belongs to the person, then commit."""
    sql = 'DELETE FROM Tag WHERE person_id=%s AND tag_id=%s'
    params = [person_id, tag_id]
    self.execute(sql, params, commit=True)
def get_tags_for_dashboard(self):
    """Return the tags that have no person (person_id IS NULL), ordered by name."""
    sql = """
SELECT
T.tag_id AS "tag_id",
T.person_id AS "person_id",
T.name AS "tag_name",
T.filter AS "tag_filter"
FROM
Tag T
WHERE
T.person_id IS NULL
ORDER BY
T.name"""
    no_params = []
    return self.execute(sql, no_params)
def add_or_update_tag_for_dashboard(self, tag_name, tag_filter):
    """Store *tag_filter* under *tag_name* among the person-less tags.

    If a tag with that name and no person already exists its filter is
    overwritten; otherwise a new tag row without a person is inserted.
    """
    existing = self.execute(
        'SELECT tag_id AS "tag_id" FROM Tag WHERE person_id IS NULL AND name=%s LIMIT 1',
        [tag_name],
        one=True,
    )
    if not existing:
        # No such tag yet: create it.
        self.execute(
            'INSERT INTO Tag (name, filter) VALUES (%s, %s)',
            [tag_name, tag_filter],
            commit=True,
        )
        return
    # Same name already present: only the filter changes.
    self.execute(
        'UPDATE Tag SET filter=%s WHERE tag_id=%s',
        [tag_filter, existing['tag_id']],
        commit=True,
    )
def delete_tag_for_dashboard(self, tag_id):
    """Delete the tag with the given id and commit."""
    # NOTE(review): unlike the other dashboard queries this is not limited
    # to rows with person_id IS NULL - confirm that is intended.
    sql = 'DELETE FROM Tag WHERE tag_id=%s'
    self.execute(sql, [tag_id], commit=True)
def update_workout_note_for_person(self, person_id, workout_id, note):
    """Set the note of the given workout of the given person and commit."""
    sql = 'UPDATE workout SET note=%s WHERE person_id=%s AND workout_id=%s'
    params = [note, person_id, workout_id]
    self.execute(sql, params, commit=True)
def add_tag_for_workout(self, workout_id, tags_id):
    """Make the workout's tags exactly the given selection.

    Tags not in the selection are detached, tags in the selection are
    attached (already attached ones are left alone), and all writes are
    committed together by the last write statement.

    Args:
        workout_id: Id of the workout whose tags are being set.
        tags_id: Iterable of tag ids to keep on the workout. May be empty,
            in which case every tag is detached.

    Returns:
        The workout's tag rows (tag_id, person_id, tag_name, tag_filter)
        after the update.
    """
    # Materialize once: the ids are needed for both the delete and the
    # inserts, and an iterator could only be consumed a single time.
    tag_ids = tuple(tags_id)

    if tag_ids:
        writes = [(
            """
            DELETE FROM workout_tag
            WHERE workout_id = %s AND tag_id NOT IN %s
            """,
            [workout_id, tag_ids],
        )]
    else:
        # An empty tuple would render as "NOT IN ()", which is invalid
        # SQL; with nothing selected every tag is simply removed.
        writes = [(
            """
            DELETE FROM workout_tag
            WHERE workout_id = %s
            """,
            [workout_id],
        )]
    writes.extend(
        (
            """
            INSERT INTO workout_tag (workout_id, tag_id)
            VALUES (%s, %s)
            ON CONFLICT (workout_id, tag_id) DO NOTHING
            """,
            [workout_id, tag_id],
        )
        for tag_id in tag_ids
    )

    # Commit only on the final write so the change is all-or-nothing.
    last = len(writes) - 1
    for position, (sql, params) in enumerate(writes):
        self.execute(sql, params, commit=(position == last))

    # Now fetch the updated list of workout tags.
    return self.execute("""
        SELECT
            T.tag_id AS "tag_id",
            T.person_id AS "person_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM Workout_Tag WT
        LEFT JOIN Tag T ON WT.tag_id=T.tag_id
        WHERE WT.workout_id=%s""", [workout_id])
def create_tag_for_workout(self, person_id, workout_id, tag_name):
    """Create a tag from the workout's exercises and attach it to the workout.

    The tag's filter is a query string of the form
    "?exercise_id=A&exercise_id=B" listing each exercise of the workout
    once, in the order returned by the query. The tag row and its link to
    the workout are committed together.

    Args:
        person_id: Id of the person who will own the new tag.
        workout_id: Id of the workout the tag is derived from and attached to.
        tag_name: Name of the new tag.

    Returns:
        The workout's tag rows (tag_id, person_id, tag_name, tag_filter)
        after the new tag was attached.
    """
    workout_exercises = self.execute("""
        SELECT
            E.exercise_id AS "exercise_id",
            E.name AS "exercise_name"
        FROM Workout W
        LEFT JOIN TopSet T ON W.workout_id=T.workout_id
        LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id
        WHERE W.workout_id=%s""", [workout_id])

    # The LEFT JOINs yield a NULL exercise_id for a workout without
    # topsets, and one row per topset for a repeated exercise; drop the
    # former and de-duplicate the latter (dict keys keep first-seen order).
    exercise_ids = dict.fromkeys(
        e['exercise_id'] for e in workout_exercises
        if e['exercise_id'] is not None)
    tag_filter = "?" + "&".join(
        f"exercise_id={exercise_id}" for exercise_id in exercise_ids)

    # Create the tag; the commit is deferred to the next statement so the
    # tag and its link to the workout are stored together.
    row = self.execute(
        'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s) RETURNING tag_id AS "tag_id"',
        [person_id, tag_name, tag_filter], one=True)
    # Attach the tag to the workout and commit both inserts.
    self.execute(
        'INSERT INTO Workout_Tag (workout_id, tag_id) VALUES (%s, %s)',
        [workout_id, row['tag_id']], commit=True)

    # Now fetch the updated list of workout tags.
    return self.execute("""
        SELECT
            T.tag_id AS "tag_id",
            T.person_id AS "person_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM Workout_Tag WT
        LEFT JOIN Tag T ON WT.tag_id=T.tag_id
        WHERE WT.workout_id=%s""", [workout_id])
def get_workout_tags(self, person_id, workout_id):
    """Return the person's tags, the workout's tags and the workout's tag ids.

    Returns:
        A 3-tuple ``(person_tags, workout_tags, selected_workout_tag_ids)``
        where the last element lists the tag_id of every workout tag row.
    """
    person_sql = """
SELECT
T.tag_id AS "tag_id",
T.name AS "tag_name",
T.filter AS "tag_filter"
FROM Tag T
WHERE T.person_id=%s"""
    workout_sql = """
SELECT
T.tag_id AS "tag_id",
T.person_id AS "person_id",
T.name AS "tag_name",
T.filter AS "tag_filter"
FROM Workout_Tag WT
LEFT JOIN Tag T ON WT.tag_id=T.tag_id
WHERE WT.workout_id=%s"""
    person_tags = self.execute(person_sql, [person_id])
    workout_tags = self.execute(workout_sql, [workout_id])
    selected_ids = []
    for tag_row in workout_tags:
        selected_ids.append(tag_row['tag_id'])
    return (person_tags, workout_tags, selected_ids)
def get_most_recent_topset_for_exercise(self, person_id, exercise_id):
    """Return ``(repetitions, weight)`` of the person's latest topset for an exercise.

    "Latest" is the topset whose workout has the greatest start_date.
    Returns None when the person has no topset for the exercise.
    """
    sql = """
SELECT
t.repetitions,
t.weight
FROM
topset t JOIN workout w ON t.workout_id = w.workout_id
WHERE
w.person_id = %s AND t.exercise_id = %s
ORDER BY
w.start_date DESC
LIMIT 1;
"""
    latest = self.execute(sql, [person_id, exercise_id], one=True)
    if latest:
        return (latest['repetitions'], latest['weight'])
    return None
def get_all_exercises(self):
    """Return every exercise as rows with exercise_id and name."""
    return self.execute('SELECT exercise_id, name FROM exercise')
def get_exercise_progress_for_user(self, person_id, exercise_id, min_date=None, max_date=None, epoch='all'):
    """Build the progress graph model of one exercise for one person.

    Args:
        person_id: Id of the person.
        exercise_id: Id of the exercise.
        min_date: Optional lower bound on the workout start date.
        max_date: Optional upper bound on the workout start date.
        epoch: '1M', '3M' or '6M' replaces *min_date* with now minus that
            many months; any other value leaves *min_date* as given.

    Returns:
        The value produced by ``get_exercise_graph_model`` for the matching
        topsets, or None when there are no matching topsets.
    """
    today = datetime.now()
    # A recognised epoch label overrides the caller's lower bound.
    for label, months in (('1M', 1), ('3M', 3), ('6M', 6)):
        if epoch == label:
            min_date = today - relativedelta(months=months)
            break

    sql = """
SELECT
T.topset_id, E.name AS exercise_name, W.person_id, T.workout_id,
T.repetitions, T.weight,
ROUND((100 * T.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * T.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm,
W.start_date
FROM
topset T
JOIN
exercise E ON T.exercise_id = E.exercise_id
JOIN
workout W ON T.workout_id = W.workout_id
WHERE
W.person_id = %s
AND E.exercise_id = %s AND
(%s IS NULL OR W.start_date >= %s) AND
(%s IS NULL OR W.start_date <= %s)
ORDER BY
W.start_date;
"""
    # Each date bound is bound twice: once for the NULL test, once for the
    # comparison.
    params = [person_id, exercise_id, min_date, min_date, max_date, max_date]
    topsets = self.execute(sql, params)
    if not topsets:
        return None

    # Split the rows into the parallel series the graph model takes.
    estimated_1rm = []
    repetitions = []
    weight = []
    start_dates = []
    messages = []
    for t in topsets:
        estimated_1rm.append(t['estimated_1rm'])
        repetitions.append(t['repetitions'])
        weight.append(t['weight'])
        start_dates.append(t['start_date'])
        messages.append(
            f'{t["repetitions"]} x {t["weight"]}kg ({t["estimated_1rm"]}kg E1RM) on {t["start_date"].strftime("%d %b %y")}')

    return get_exercise_graph_model(
        topsets[0]['exercise_name'], estimated_1rm, repetitions, weight,
        start_dates, messages, epoch, person_id, exercise_id, min_date,
        max_date)
def get_workout_notes_for_person(self, person_id):
    """Collect the person's workouts that have a note, with their tags.

    Returns:
        A tuple ``(person_name, workout_notes)``. ``workout_notes`` is a
        list of dicts with keys 'workout_id', 'formatted_start_date',
        'note' and 'tags', in the order the workouts first appear in the
        query result. ``person_name`` is None when the query returns no
        rows.
    """
    sql_query = """
SELECT
p.name AS person_name,
w.workout_id,
to_char(w.start_date, 'Mon DD YYYY') AS formatted_start_date,
w.note,
t.filter as tag_filter,
t.name AS tag_name
FROM person p
LEFT JOIN workout w ON p.person_id = w.person_id AND w.note IS NOT NULL AND w.note <> ''
LEFT JOIN workout_tag wt ON w.workout_id = wt.workout_id
LEFT JOIN tag t ON wt.tag_id = t.tag_id
WHERE p.person_id = %s
ORDER BY w.start_date DESC, w.workout_id, t.name;
"""
    person_name = None
    notes_by_workout = {}
    for row in self.execute(sql_query, [person_id]):
        # The name is identical on every row; take the first one available.
        if person_name is None:
            person_name = row['person_name']

        workout_id = row['workout_id']
        if not (workout_id and row['note']):
            # Placeholder row produced by the LEFT JOIN: no noted workout.
            continue

        # One entry per workout; a workout repeats once per attached tag.
        entry = notes_by_workout.setdefault(workout_id, {
            'workout_id': workout_id,
            'formatted_start_date': row['formatted_start_date'],
            'note': row['note'],
            'tags': []
        })
        if row['tag_name']:
            entry['tags'].append({'tag_filter': row['tag_filter'], 'tag_name': row['tag_name'], 'person_id': person_id})

    return (person_name, list(notes_by_workout.values()))
def get_exercise_earliest_and_latest_dates(self, person_id, exercise_id):
    """Return ``(earliest, latest)`` workout start dates for a person's exercise.

    Returns ``(None, None)`` when the person has no topset for the
    exercise.
    """
    sql_query = """
SELECT
w.start_date
FROM workout w
INNER JOIN topset t on w.workout_id = t.workout_id
INNER JOIN exercise e on t.exercise_id = e.exercise_id
WHERE w.person_id = %s AND e.exercise_id = %s
ORDER BY w.start_date DESC;
"""
    dated_rows = self.execute(sql_query, [person_id, exercise_id])
    if not dated_rows:
        return None, None
    # Rows arrive newest first, so the ends of the list are the extremes.
    newest, oldest = dated_rows[0], dated_rows[-1]
    return oldest['start_date'], newest['start_date']