Files
workout/db.py
2025-04-21 20:13:30 +10:00

479 lines
19 KiB
Python

import os
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime
from dateutil.relativedelta import relativedelta
from urllib.parse import urlparse
from flask import g
import pandas as pd
from features.exercises import Exercises
from features.people_graphs import PeopleGraphs
from features.person_overview import PersonOverview
from features.stats import Stats
from features.dashboard import Dashboard
from utils import get_exercise_graph_model
class DataBase():
    """Data-access layer for the workout tracker, backed by PostgreSQL.

    Connection settings come from the ``DATABASE_URL`` environment variable.
    A single psycopg2 connection is opened lazily and cached on ``flask.g``
    (see :meth:`getDB`); every query in this class goes through
    :meth:`execute`, which returns rows as dict-like ``RealDictCursor`` rows.

    Attributes:
        stats, exercises, person_overview, people_graphs, dashboard:
            Feature helpers, each constructed with this object's
            :meth:`execute` as its only argument.
    """

    def __init__(self, app=None):
        """Validate configuration and build the feature helpers.

        Args:
            app: Accepted for callers that pass the Flask app; not used here.

        Raises:
            Exception: If ``DATABASE_URL`` is unset or empty.
        """
        # Fail fast, before anything else is built. The raw value is checked
        # because ``urlparse`` always returns a truthy result object, even
        # for an empty string.
        if not os.environ.get('DATABASE_URL'):
            raise Exception("No DATABASE_URL environment variable set")

        self.stats = Stats(self.execute)
        self.exercises = Exercises(self.execute)
        self.person_overview = PersonOverview(self.execute)
        self.people_graphs = PeopleGraphs(self.execute)
        self.dashboard = Dashboard(self.execute)

    def getDB(self):
        """Return the connection cached on ``flask.g``, opening it on first use.

        Returns:
            The psycopg2 connection stored as ``g.database``.

        Raises:
            KeyError: If ``DATABASE_URL`` is not set when a connection must
                be opened.
        """
        db = getattr(g, 'database', None)
        if db is None:
            db_url = urlparse(os.environ['DATABASE_URL'])
            db = g.database = psycopg2.connect(
                database=db_url.path[1:],  # drop the leading '/' of the path
                user=db_url.username,
                password=db_url.password,
                host=db_url.hostname,
                port=db_url.port
            )
        return db

    @staticmethod
    def close_connection(exception=None):
        """Close the connection cached on ``flask.g``, if there is one.

        Declared as a static method so that every call form works:
        ``db.close_connection()``, ``db.close_connection(exc)`` and
        ``DataBase.close_connection(exc)``.

        Args:
            exception: Ignored; present so the function can be used directly
                as a teardown callback that receives the pending exception.
        """
        # Pop rather than read, so a later getDB() in the same context opens
        # a fresh connection instead of reusing the closed one.
        db = g.pop('database', None)
        if db is not None:
            db.close()

    def execute(self, query, args=(), one=False, commit=False):
        """Run one SQL statement on the shared connection.

        Args:
            query: SQL text with ``%s`` placeholders.
            args: Parameters bound to the placeholders.
            one: When true, return only the first row (or ``None``).
            commit: When true, commit the transaction after the statement.

        Returns:
            A list of dict-like rows when the statement produces a result
            set, otherwise ``None``. With ``one=True``, the first row or
            ``None``.

        Raises:
            Exception: Whatever the driver raises while executing, fetching
                or committing. The transaction is rolled back first so the
                shared connection stays usable.
        """
        conn = self.getDB()
        cur = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cur.execute(query, args)
            # ``description`` is None for statements without a result set.
            rv = cur.fetchall() if cur.description is not None else None
            if commit:
                conn.commit()
        except Exception:
            # Never report a failed write as success, and never leave the
            # connection stuck inside an aborted transaction.
            conn.rollback()
            raise
        finally:
            cur.close()
        return (rv[0] if rv else None) if one else rv

    def read_sql_as_df(self, query, params=None):
        """Run ``query`` through ``pandas.read_sql`` on the shared connection.

        Args:
            query: SQL text.
            params: Optional parameters forwarded to ``pandas.read_sql``.

        Returns:
            The ``DataFrame`` produced by ``pandas.read_sql``.
        """
        return pd.read_sql(query, self.getDB(), params=params)

    def get_exercise(self, exercise_id):
        """Return the ``exercise_id``/``name`` row for an exercise, or ``None``."""
        return self.execute(
            'SELECT exercise_id, name FROM exercise WHERE exercise_id=%s LIMIT 1',
            [exercise_id], one=True)

    def create_exercise(self, name):
        """Insert an exercise and return its new ``exercise_id``."""
        new_exercise = self.execute(
            'INSERT INTO exercise (name) VALUES (%s) RETURNING exercise_id AS "ExerciseId"',
            [name], commit=True, one=True)
        return new_exercise['ExerciseId']

    def delete_exercise(self, exercise_id):
        """Delete the exercise with the given id."""
        self.execute('DELETE FROM exercise WHERE exercise_id=%s',
                     [exercise_id], commit=True)

    def update_exercise(self, exercise_id, name):
        """Rename the exercise with the given id."""
        self.execute('UPDATE Exercise SET Name=%s WHERE exercise_id=%s',
                     [name, exercise_id], commit=True)

    def get_people(self):
        """Return every person as rows with ``PersonId`` and ``Name``."""
        return self.execute(
            'SELECT person_id AS "PersonId", name AS "Name" FROM person')

    def is_valid_person(self, person_id):
        """Return a row containing ``PersonId`` if the person exists, else ``None``."""
        return self.execute(
            'SELECT person_id AS "PersonId" FROM person WHERE person_id=%s LIMIT 1',
            [person_id], one=True)

    def create_person(self, name):
        """Insert a person and return the new ``person_id``."""
        new_person = self.execute(
            'INSERT INTO person (name) VALUES (%s) RETURNING person_id AS "PersonId"',
            [name], commit=True, one=True)
        return new_person['PersonId']

    def delete_person(self, person_id):
        """Delete a person together with their workouts and top sets.

        The three deletes run in one transaction and are committed together,
        so a failure part-way through leaves nothing half-deleted.
        """
        self.execute(
            'DELETE FROM topset WHERE workout_id IN (SELECT workout_id FROM workout WHERE person_id=%s)',
            [person_id])
        self.execute('DELETE FROM workout WHERE person_id=%s', [person_id])
        self.execute('DELETE FROM person WHERE person_id=%s',
                     [person_id], commit=True)

    def update_person_name(self, person_id, name):
        """Rename the person with the given id."""
        self.execute('UPDATE person SET name=%s WHERE person_id=%s',
                     [name, person_id], commit=True)

    def is_valid_workout(self, person_id, workout_id):
        """Return a row containing ``WorkoutId`` if the workout belongs to the person, else ``None``."""
        return self.execute(
            'SELECT W.workout_id AS "WorkoutId" FROM Person P, Workout W WHERE P.person_id=W.person_id AND P.person_id=%s AND W.workout_id=%s LIMIT 1',
            [person_id, workout_id], one=True)

    def is_valid_topset(self, person_id, workout_id, topset_id):
        """Check that a top set belongs to the workout and the workout to the person.

        Returns:
            A row containing ``TopSetId`` when the chain person -> workout ->
            top set holds, otherwise ``None``.
        """
        # The person/workout join must compare P to W; comparing W to itself
        # would accept a top set from any person's workout.
        return self.execute("""
            SELECT T.topset_id AS "TopSetId"
            FROM Person P, Workout W, TopSet T
            WHERE P.person_id=W.person_id AND W.workout_id=T.workout_id AND P.person_id=%s AND W.workout_id = %s AND T.topset_id = %s
            LIMIT 1""", [person_id, workout_id, topset_id], one=True)

    def delete_workout(self, workout_id):
        """Delete a workout and its top sets in a single transaction."""
        self.execute('DELETE FROM topset WHERE workout_id=%s', [workout_id])
        self.execute('DELETE FROM workout WHERE workout_id=%s',
                     [workout_id], commit=True)

    def update_topset(self, exercise_id, repetitions, weight, topset_id):
        """Overwrite the exercise, repetitions and weight of a top set."""
        self.execute(
            'UPDATE topset SET exercise_id=%s, repetitions=%s, weight=%s WHERE topSet_id=%s',
            [exercise_id, repetitions, weight, topset_id], commit=True)

    def create_topset(self, workout_id, exercise_id, repetitions, weight):
        """Insert a top set into a workout and return the new ``topset_id``."""
        new_top_set = self.execute(
            'INSERT INTO topset (workout_id, exercise_id, repetitions, weight) VALUES (%s, %s, %s, %s) RETURNING topset_id AS "TopSetId"',
            [workout_id, exercise_id, repetitions, weight], commit=True, one=True)
        return new_top_set['TopSetId']

    def delete_topset(self, topset_id):
        """Delete the top set with the given id."""
        self.execute('DELETE FROM topset WHERE topset_id=%s',
                     [topset_id], commit=True)

    def create_workout(self, person_id):
        """Return a workout id for today, reusing an empty one if it exists.

        If the person already has a workout dated today that has no top
        sets, its id is returned; otherwise a new workout is inserted with
        today's date.

        Returns:
            The ``workout_id`` of the reused or newly created workout.
        """
        date_string = datetime.now().strftime('%Y-%m-%d')

        # Reuse today's workout only while it is still empty.
        workout = self.execute(
            'SELECT workout_id AS "WorkoutId" FROM workout WHERE person_id=%s AND start_date=%s AND workout_id NOT IN (SELECT workout_id FROM topset)',
            [person_id, date_string], one=True)
        if workout:
            print(
                f'Workout already created for PersonId {person_id} starting at {date_string} so returning WorkoutId {workout["WorkoutId"]} rather then creating new workout')
            return workout['WorkoutId']

        print(
            f'Creating workout for PersonId {person_id} starting at {date_string}')
        new_workout = self.execute(
            'INSERT INTO workout (person_id, start_date) VALUES (%s, %s) RETURNING workout_id AS "WorkoutId"',
            [person_id, date_string], commit=True, one=True)
        return new_workout['WorkoutId']

    def get_people_and_workout_count(self, person_id):
        """List every person with their workout count.

        Args:
            person_id: The person whose row gets ``IsActive`` = 1; all other
                rows get 0.

        Returns:
            Rows with ``PersonId``, ``Name``, ``NumberOfWorkouts`` and
            ``IsActive``, ordered by person id.
        """
        return self.execute("""
            SELECT
                P.person_id AS "PersonId",
                P.name AS "Name",
                COUNT(W.workout_id) AS "NumberOfWorkouts",
                CASE P.person_id
                    WHEN %s
                    THEN 1
                    ELSE 0
                END "IsActive"
            FROM
                Person P LEFT JOIN Workout W ON P.person_id = W.person_id
            GROUP BY
                P.person_id
            ORDER BY
                P.person_id""", [person_id])

    def update_workout_start_date(self, workout_id, start_date):
        """Set the start date of a workout."""
        self.execute('UPDATE workout SET start_date=%s WHERE workout_id=%s',
                     [start_date, workout_id], commit=True)

    def get_person_name(self, person_id):
        """Return the person's name, or ``None`` if no such person exists."""
        result = self.execute(
            """SELECT name from Person WHERE person_id=%s""", [person_id], one=True)
        return result["name"] if result else None

    def get_workout(self, person_id, workout_id):
        """Return one workout with its top sets.

        Returns:
            A dict with ``person_id``, ``person_name``, ``workout_id``,
            ``start_date``, ``top_sets`` (a list of dicts) and ``note``, or
            ``None`` when the person/workout pair matches no row.
        """
        topsets = self.execute("""
            SELECT
                P.person_id,
                P.name AS "person_name",
                W.workout_id,
                W.start_date,
                T.topset_id,
                E.exercise_id,
                E.name AS "exercise_name",
                T.repetitions,
                T.weight,
                W.note
            FROM Person P
                LEFT JOIN Workout W ON P.person_id=W.person_id
                LEFT JOIN TopSet T ON W.workout_id=T.workout_id
                LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id
            WHERE P.person_id=%s
                AND W.workout_id = %s
            ORDER BY T.topset_id""", [person_id, workout_id])
        if not topsets:
            return None

        # Person and workout columns repeat on every row; read them once.
        first = topsets[0]
        return {
            'person_id': person_id,
            'person_name': first['person_name'],
            'workout_id': workout_id,
            'start_date': first['start_date'],
            # A workout without top sets still yields one row whose
            # topset_id is NULL because of the LEFT JOIN; skip it.
            'top_sets': [
                {
                    "topset_id": t['topset_id'],
                    "exercise_id": t['exercise_id'],
                    "exercise_name": t['exercise_name'],
                    "weight": t['weight'],
                    "repetitions": t['repetitions']
                }
                for t in topsets if t['topset_id'] is not None
            ],
            'note': first['note']
        }

    def get_topset(self, topset_id):
        """Return one top set with its exercise name, or ``None`` if not found.

        Returns:
            A dict with ``topset_id``, ``exercise_id``, ``exercise_name``,
            ``weight`` and ``repetitions``.
        """
        topset = self.execute("""
            SELECT
                T.topset_id,
                E.exercise_id,
                E.name AS "exercise_name",
                T.repetitions,
                T.weight
            FROM TopSet T
                INNER JOIN Exercise E ON T.exercise_id=E.exercise_id
            WHERE T.topset_id = %s""", [topset_id], one=True)
        if not topset:
            return None
        return {
            "topset_id": topset['topset_id'],
            "exercise_id": topset['exercise_id'],
            "exercise_name": topset['exercise_name'],
            "weight": topset['weight'],
            "repetitions": topset['repetitions']
        }

    def get_all_topsets(self):
        """Return every person joined to their workouts, top sets and exercises.

        LEFT JOINs are used throughout, so people without workouts and
        workouts without top sets still appear, with NULLs in the missing
        columns. ``Estimated1RM`` is computed in SQL as
        ``100 * weight / (101.3 - 2.67123 * repetitions)``, rounded to an
        integer.
        """
        return self.execute("""
            SELECT
                P.person_id AS "PersonId",
                P.name AS "PersonName",
                W.workout_id AS "WorkoutId",
                W.start_date AS "StartDate",
                T.topset_id AS "TopSetId",
                E.exercise_id AS "ExerciseId",
                E.name AS "ExerciseName",
                T.repetitions AS "Repetitions",
                T.weight AS "Weight",
                round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"
            FROM Person P
                LEFT JOIN Workout W ON P.person_id=W.person_id
                LEFT JOIN TopSet T ON W.workout_id=T.workout_id
                LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id""")

    def get_tags_for_person(self, person_id):
        """Return the person's tags, preceded by a synthetic "All" entry.

        Returns:
            A list of dicts with ``tag_id``, ``person_id``, ``tag_name`` and
            ``tag_filter``. The first entry is always the static "All" tag
            (``tag_id`` -1, empty filter); stored tags follow, ordered by name.
        """
        tags = self.execute("""
            SELECT
                T.tag_id as "tag_id",
                T.person_id as "person_id",
                T.name AS "tag_name",
                T.filter AS "tag_filter"
            FROM
                Tag T
            WHERE
                T.person_id = %s
            ORDER BY
                T.name
            """, [person_id])

        all_tag = {
            "tag_id": -1,  # not a stored tag, so it has no real id
            "person_id": person_id,
            "tag_name": "All",
            "tag_filter": ""  # empty filter
        }
        return [all_tag] + tags

    def add_or_update_tag_for_person(self, person_id, tag_name, tag_filter):
        """Create the person's tag, or update its filter if the name exists."""
        tag = self.execute(
            'SELECT tag_id AS "TagId" FROM Tag WHERE person_id=%s AND name=%s LIMIT 1',
            [person_id, tag_name], one=True)
        if tag:
            self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s',
                         [tag_filter, tag['TagId']], commit=True)
        else:
            self.execute(
                'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s)',
                [person_id, tag_name, tag_filter], commit=True)

    def delete_tag_for_person(self, person_id, tag_id):
        """Delete a tag, but only if it belongs to the given person."""
        self.execute('DELETE FROM Tag WHERE person_id=%s AND tag_id=%s',
                     [person_id, tag_id], commit=True)

    def get_tags_for_dashboard(self):
        """Return dashboard tags (``person_id IS NULL``), preceded by "All".

        Returns:
            A list of dicts with ``tag_id``, ``person_id``, ``tag_name`` and
            ``tag_filter``. The first entry is always the static "All" tag
            (``tag_id`` -1, empty filter); stored tags follow, ordered by name.
        """
        tags = self.execute("""
            SELECT
                T.tag_id AS "tag_id",
                T.person_id AS "person_id",
                T.name AS "tag_name",
                T.filter AS "tag_filter"
            FROM
                Tag T
            WHERE
                T.person_id IS NULL
            ORDER BY
                T.name""", [])

        all_tag = {
            "tag_id": -1,  # not a stored tag, so it has no real id
            "person_id": None,
            "tag_name": "All",
            "tag_filter": ""  # empty filter
        }
        return [all_tag] + tags

    def add_or_update_tag_for_dashboard(self, tag_name, tag_filter):
        """Create a dashboard tag, or update its filter if the name exists."""
        tag = self.execute(
            'SELECT tag_id AS "tag_id" FROM Tag WHERE person_id IS NULL AND name=%s LIMIT 1',
            [tag_name], one=True)
        if tag:
            self.execute('UPDATE Tag SET filter=%s WHERE tag_id=%s',
                         [tag_filter, tag['tag_id']], commit=True)
        else:
            self.execute('INSERT INTO Tag (name, filter) VALUES (%s, %s)',
                         [tag_name, tag_filter], commit=True)

    def delete_tag_for_dashboard(self, tag_id):
        """Delete the tag with the given id."""
        # NOTE(review): unlike the dashboard read/update queries, this does
        # not restrict to person_id IS NULL, so it can delete a person's tag
        # too. Confirm whether callers rely on that.
        self.execute('DELETE FROM Tag WHERE tag_id=%s', [tag_id], commit=True)

    def get_workout_tags(self, person_id, workout_id):
        """Return the person's tags and the tags attached to one workout.

        Returns:
            A tuple ``(person_tags, workout_tags, selected_workout_tag_ids)``:
            all tags of the person, the tags linked to the workout through
            ``Workout_Tag``, and the ``tag_id`` values of the latter.
        """
        person_tags = self.execute("""
            SELECT
                T.tag_id AS "tag_id",
                T.name AS "tag_name",
                T.filter AS "tag_filter"
            FROM Tag T
            WHERE T.person_id=%s""", [person_id])
        workout_tags = self.execute("""
            SELECT
                T.tag_id AS "tag_id",
                T.person_id AS "person_id",
                T.name AS "tag_name",
                T.filter AS "tag_filter"
            FROM Workout_Tag WT
                LEFT JOIN Tag T ON WT.tag_id=T.tag_id
            WHERE WT.workout_id=%s""", [workout_id])
        selected_workout_tag_ids = [wt['tag_id'] for wt in workout_tags]
        return (person_tags, workout_tags, selected_workout_tag_ids)

    def get_most_recent_topset_for_exercise(self, person_id, exercise_id):
        """Return the person's latest top set for an exercise.

        Returns:
            A tuple ``(repetitions, weight, exercise_name)`` taken from the
            row with the greatest workout ``start_date``, or ``None`` when
            the query matches no row. ``repetitions`` and ``weight`` are
            ``None`` when the matched row has no top set.
        """
        # NOTE(review): the person filter sits in WHERE rather than in the
        # join, so when only other people have logged this exercise no row
        # survives and None is returned instead of (None, None, name).
        # Presumably unintended; confirm before changing the SQL.
        topset = self.execute("""
            SELECT
                t.repetitions,
                t.weight,
                e.name AS "exercise_name"
            FROM
                exercise e
                LEFT JOIN topset t ON e.exercise_id = t.exercise_id
                LEFT JOIN workout w ON t.workout_id = w.workout_id
            WHERE
                e.exercise_id = %s AND (w.person_id = %s OR w.person_id IS NULL)
            ORDER BY
                w.start_date DESC
            LIMIT 1;
            """, [exercise_id, person_id], one=True)
        if not topset:
            return None
        return (topset.get('repetitions'), topset.get('weight'), topset['exercise_name'])

    def get_all_exercises(self):
        """Return every exercise as rows with ``exercise_id`` and ``name``."""
        return self.execute('SELECT exercise_id, name FROM exercise')

    def get_exercise_progress_for_user(self, person_id, exercise_id, min_date=None, max_date=None, epoch='all', degree=1):
        """Build the progress-graph model for one person and exercise.

        Args:
            person_id: Person whose top sets are read.
            exercise_id: Exercise to chart.
            min_date: Optional lower bound on the workout start date.
                Replaced when ``epoch`` is '1M', '3M' or '6M'.
            max_date: Optional upper bound on the workout start date.
            epoch: '1M', '3M' or '6M' sets ``min_date`` to that many months
                before now; any other value leaves ``min_date`` as given.
            degree: Forwarded unchanged to ``get_exercise_graph_model``.

        Returns:
            Whatever ``get_exercise_graph_model`` returns, or ``None`` when
            there are no matching top sets.
        """
        months_back = {'1M': 1, '3M': 3, '6M': 6}.get(epoch)
        if months_back is not None:
            min_date = datetime.now() - relativedelta(months=months_back)

        # Each bound is passed twice: once for the NULL check, once for the
        # comparison.
        topsets = self.execute("""
            SELECT
                T.topset_id, E.name AS exercise_name, W.person_id, T.workout_id,
                T.repetitions, T.weight,
                ROUND((100 * T.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * T.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm,
                W.start_date
            FROM
                topset T
            JOIN
                exercise E ON T.exercise_id = E.exercise_id
            JOIN
                workout W ON T.workout_id = W.workout_id
            WHERE
                W.person_id = %s
                AND E.exercise_id = %s AND
                (%s IS NULL OR W.start_date >= %s) AND
                (%s IS NULL OR W.start_date <= %s)
            ORDER BY
                W.start_date;
            """, [person_id, exercise_id, min_date, min_date, max_date, max_date])
        if not topsets:
            return None

        # Split the rows into the parallel series the graph model expects.
        exercise_name = topsets[0]['exercise_name']
        estimated_1rm = [t['estimated_1rm'] for t in topsets]
        repetitions = [t['repetitions'] for t in topsets]
        weight = [t['weight'] for t in topsets]
        start_dates = [t['start_date'] for t in topsets]
        messages = [f'{t["repetitions"]} x {t["weight"]}kg ({t["estimated_1rm"]}kg E1RM) on {t["start_date"].strftime("%d %b %y")}' for t in topsets]

        return get_exercise_graph_model(
            exercise_name,
            estimated_1rm,
            repetitions,
            weight,
            start_dates,
            messages,
            epoch,
            person_id,
            exercise_id,
            min_date,
            max_date,
            degree)

    # Note fetching logic moved to routes/notes.py

    def get_exercise_earliest_and_latest_dates(self, person_id, exercise_id):
        """Return the first and last workout dates on which the exercise was done.

        Returns:
            A tuple ``(earliest_date, latest_date)``. Both are ``None`` when
            the person has no top set for the exercise.
        """
        sql_query = """
            SELECT
                MIN(w.start_date) AS earliest_date,
                MAX(w.start_date) AS latest_date
            FROM workout w
            INNER JOIN topset t ON w.workout_id = t.workout_id
            INNER JOIN exercise e ON t.exercise_id = e.exercise_id
            WHERE w.person_id = %s AND e.exercise_id = %s;
        """
        row = self.execute(sql_query, [person_id, exercise_id], one=True)
        if not row:
            return None, None
        return row['earliest_date'], row['latest_date']