556 lines
22 KiB
Python
556 lines
22 KiB
Python
import os
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
from urllib.parse import urlparse
|
|
from flask import g
|
|
from features.exercises import Exercises
|
|
from features.people_graphs import PeopleGraphs
|
|
from features.person_overview import PersonOverview
|
|
from features.stats import Stats
|
|
from features.dashboard import Dashboard
|
|
from features.schema import Schema
|
|
from utils import get_exercise_graph_model
|
|
|
|
|
|
class DataBase():
|
|
def __init__(self, app=None):
    """Create the feature helpers and verify that the database is configured.

    Every feature object (stats, exercises, person overview, people graphs,
    dashboard, schema) receives ``self.execute`` as its query runner.

    Args:
        app: Accepted for call-site compatibility; not used by this method.

    Raises:
        Exception: If the ``DATABASE_URL`` environment variable is missing
            or empty.
    """
    self.stats = Stats(self.execute)
    self.exercises = Exercises(self.execute)
    self.person_overview = PersonOverview(self.execute)
    self.people_graphs = PeopleGraphs(self.execute)
    self.dashboard = Dashboard(self.execute)
    self.schema = Schema(self.execute)

    # Check the raw value: indexing os.environ raises a bare KeyError before
    # any friendly message can be produced, and urlparse() always returns a
    # non-empty (truthy) tuple, so testing its result can never fail.
    if not os.environ.get('DATABASE_URL'):
        raise Exception("No DATABASE_URL environment variable set")
|
|
|
|
def getDB(self):
    """Return the connection stored on flask's ``g``, opening it if needed.

    The connection parameters are parsed from the ``DATABASE_URL``
    environment variable; the database name is the URL path without its
    leading slash. A newly opened connection is cached as ``g.database``.
    """
    existing = getattr(g, 'database', None)
    if existing is not None:
        return existing

    parsed = urlparse(os.environ['DATABASE_URL'])
    g.database = psycopg2.connect(
        database=parsed.path[1:],
        user=parsed.username,
        password=parsed.password,
        host=parsed.hostname,
        port=parsed.port,
    )
    return g.database
|
|
|
|
@staticmethod
def close_connection(exception=None):
    """Close the connection cached on flask's ``g``, if there is one.

    Declared as a static method because the function never uses instance
    state and was written without a ``self`` parameter; as a plain method,
    ``instance.close_connection(exc)`` failed with a ``TypeError``. With the
    default value it can be called with or without an argument, on the class
    or on an instance.

    Args:
        exception: Ignored. Present so the function can be used as a
            teardown callback that is handed the exception, if any.
    """
    db = getattr(g, 'database', None)
    if db is not None:
        db.close()
|
|
|
|
def execute(self, query, args=(), one=False, commit=False):
    """Run one SQL statement on the connection returned by ``getDB``.

    Args:
        query: SQL text using ``%s`` placeholders.
        args: Sequence of values bound to the placeholders.
        one: When true, return only the first row (``None`` if no rows).
        commit: When true, commit the transaction after the statement.

    Returns:
        ``None`` when the statement produces no result set. Otherwise the
        fetched rows (dict-like, via ``RealDictCursor``), or just the first
        row / ``None`` when ``one`` is true.

    Raises:
        Exception: Any error raised while executing, fetching or committing
            is re-raised after the transaction has been rolled back.
    """
    conn = self.getDB()
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute(query, args)
        # description is None for statements that return no rows.
        rv = cur.fetchall() if cur.description is not None else None
        if commit:
            conn.commit()
    except Exception:
        # Roll back so the shared connection is not left in a failed
        # transaction, then surface the error instead of hiding it.
        conn.rollback()
        raise
    finally:
        cur.close()

    if one:
        return rv[0] if rv else None
    return rv
|
|
|
|
|
|
|
|
def get_exercise(self, exercise_id):
    """Delegate to ``self.exercises.get_exercise`` and return its result."""
    exercise = self.exercises.get_exercise(exercise_id)
    return exercise
|
|
|
|
def create_exercise(self, name, attribute_ids=None):
    """Delegate to ``self.exercises.add_exercise`` and return its result.

    Args:
        name: Name of the new exercise.
        attribute_ids: Optional value forwarded unchanged to the helper.
    """
    created = self.exercises.add_exercise(name, attribute_ids)
    return created
|
|
|
|
def delete_exercise(self, exercise_id):
    """Delete the exercise row with the given id and commit."""
    sql = 'DELETE FROM exercise WHERE exercise_id=%s'
    self.execute(sql, [exercise_id], commit=True)
|
|
|
|
def update_exercise(self, exercise_id, name, attribute_ids=None):
    """Delegate to ``self.exercises.update_exercise`` and return its result.

    Args:
        exercise_id: Id of the exercise to change.
        name: New exercise name.
        attribute_ids: Optional value forwarded unchanged to the helper.
    """
    updated = self.exercises.update_exercise(exercise_id, name, attribute_ids)
    return updated
|
|
|
|
def get_people(self):
    """Return all rows of ``person`` with ``PersonId`` and ``Name`` keys."""
    sql = 'SELECT person_id AS "PersonId", name AS "Name" FROM person'
    return self.execute(sql)
|
|
|
|
def is_valid_person(self, person_id):
    """Return the matching ``person`` row (truthy) or ``None`` if absent."""
    sql = 'SELECT person_id AS "PersonId" FROM person WHERE person_id=%s LIMIT 1'
    return self.execute(sql, [person_id], one=True)
|
|
|
|
def create_person(self, name):
    """Insert a person with the given name, commit, and return the new id."""
    sql = 'INSERT INTO person (name) VALUES (%s) RETURNING person_id AS "PersonId"'
    inserted = self.execute(sql, [name], commit=True, one=True)
    return inserted['PersonId']
|
|
|
|
def delete_person(self, person_id):
    """Delete a person together with their workouts and top sets.

    Rows are removed child-first (top sets, then workouts, then the person).
    Only the final statement commits, so the three deletes are meant to
    take effect together instead of being committed one by one, which could
    leave a partially deleted person if a later statement failed.

    NOTE(review): this relies on statements run with ``commit=False``
    staying pending on the connection that ``execute`` uses until the next
    commit (psycopg2's default, non-autocommit mode) - confirm the
    connection is not switched to autocommit elsewhere.

    Args:
        person_id: Id of the person to remove.
    """
    self.execute(
        'DELETE FROM topset WHERE workout_id IN (SELECT workout_id FROM workout WHERE person_id=%s)',
        [person_id])
    self.execute('DELETE FROM workout WHERE person_id=%s', [person_id])
    self.execute('DELETE FROM person WHERE person_id=%s',
                 [person_id], commit=True)
|
|
|
|
def update_person_name(self, person_id, name):
    """Set the name of the given person and commit."""
    sql = 'UPDATE person SET name=%s WHERE person_id=%s'
    params = [name, person_id]
    self.execute(sql, params, commit=True)
|
|
|
|
def is_valid_workout(self, person_id, workout_id):
    """Return the workout row if it belongs to the person, else ``None``."""
    sql = 'SELECT W.workout_id AS "WorkoutId" FROM Person P, Workout W WHERE P.person_id=W.person_id AND P.person_id=%s AND W.workout_id=%s LIMIT 1'
    return self.execute(sql, [person_id, workout_id], one=True)
|
|
|
|
def is_valid_topset(self, person_id, workout_id, topset_id):
    """Check that a top set belongs to a workout owned by the person.

    Args:
        person_id: Id of the person expected to own the workout.
        workout_id: Id of the workout expected to contain the top set.
        topset_id: Id of the top set being validated.

    Returns:
        The matching row (with a ``TopSetId`` key) or ``None`` when the
        person / workout / top set combination does not exist.
    """
    # The person-to-workout join must compare P with W. The previous
    # predicate compared W.person_id with itself, which is always true, so
    # a workout owned by a different person was accepted.
    topset = self.execute("""
        SELECT T.topset_id AS "TopSetId"
        FROM Person P, Workout W, TopSet T
        WHERE P.person_id=W.person_id AND W.workout_id=T.workout_id AND P.person_id=%s AND W.workout_id = %s AND T.topset_id = %s
        LIMIT 1""", [person_id, workout_id, topset_id], one=True)
    return topset
|
|
|
|
def delete_workout(self, workout_id):
    """Delete a workout and the top sets recorded in it.

    The top sets are removed first, then the workout. Only the second
    statement commits, so both deletes are meant to take effect together
    rather than being committed separately.

    NOTE(review): this relies on a statement run with ``commit=False``
    staying pending on the connection that ``execute`` uses until the next
    commit (psycopg2's default, non-autocommit mode) - confirm the
    connection is not switched to autocommit elsewhere.

    Args:
        workout_id: Id of the workout to remove.
    """
    self.execute('DELETE FROM topset WHERE workout_id=%s', [workout_id])
    self.execute('DELETE FROM workout WHERE workout_id=%s',
                 [workout_id], commit=True)
|
|
|
|
def update_topset(self, exercise_id, repetitions, weight, topset_id):
    """Overwrite exercise, repetitions and weight of a top set and commit."""
    sql = 'UPDATE topset SET exercise_id=%s, repetitions=%s, weight=%s WHERE topSet_id=%s'
    params = [exercise_id, repetitions, weight, topset_id]
    self.execute(sql, params, commit=True)
|
|
|
|
def create_topset(self, workout_id, exercise_id, repetitions, weight):
    """Insert a top set into a workout, commit, and return the new id."""
    sql = 'INSERT INTO topset (workout_id, exercise_id, repetitions, weight) VALUES (%s, %s, %s, %s) RETURNING topset_id AS "TopSetId"'
    params = [workout_id, exercise_id, repetitions, weight]
    inserted = self.execute(sql, params, commit=True, one=True)
    return inserted['TopSetId']
|
|
|
|
def delete_topset(self, topset_id):
    """Delete the top set with the given id and commit."""
    sql = 'DELETE FROM topset WHERE topset_id=%s'
    self.execute(sql, [topset_id], commit=True)
|
|
|
|
def create_workout(self, person_id):
    """Return a workout id for the person dated today, creating one if needed.

    A workout that already exists for the person with today's start date
    and no top sets is reused; otherwise a new workout row is inserted and
    committed. Both outcomes are reported with ``print``.

    Args:
        person_id: Id of the person the workout belongs to.

    Returns:
        The ``WorkoutId`` of the reused or newly inserted workout.
    """
    today = datetime.now().strftime('%Y-%m-%d')

    # Reuse today's workout for this person if it does not contain any
    # top sets yet, instead of piling up empty workouts.
    existing = self.execute(
        'SELECT workout_id AS "WorkoutId" FROM workout WHERE person_id=%s AND start_date=%s AND workout_id NOT IN (SELECT workout_id FROM topset)',
        [person_id, today],
        one=True)
    if existing:
        print(f'Workout already created for PersonId {person_id} starting at {today} so returning WorkoutId {existing["WorkoutId"]} rather then creating new workout')
        return existing['WorkoutId']

    print(f'Creating workout for PersonId {person_id} starting at {today}')
    created = self.execute(
        'INSERT INTO workout (person_id, start_date) VALUES (%s, %s) RETURNING workout_id AS "WorkoutId"',
        [person_id, today],
        commit=True,
        one=True)
    return created['WorkoutId']
|
|
|
|
def get_people_and_workout_count(self, person_id):
    """List every person with their workout count and an active flag.

    Args:
        person_id: Id of the person whose row gets ``IsActive`` = 1; all
            other rows get 0.

    Returns:
        Rows ordered by person id with ``PersonId``, ``Name``,
        ``NumberOfWorkouts`` and ``IsActive`` keys.
    """
    sql = """
        SELECT
            P.person_id AS "PersonId",
            P.name AS "Name",
            COUNT(W.workout_id) AS "NumberOfWorkouts",
            CASE P.person_id
                WHEN %s
                THEN 1
                ELSE 0
            END "IsActive"
        FROM
            Person P LEFT JOIN Workout W ON P.person_id = W.person_id
        GROUP BY
            P.person_id
        ORDER BY
            P.person_id"""
    rows = self.execute(sql, [person_id])
    return rows
|
|
|
|
def update_workout_start_date(self, workout_id, start_date):
    """Set the start date of the given workout and commit."""
    sql = 'UPDATE workout SET start_date=%s WHERE workout_id=%s'
    params = [start_date, workout_id]
    self.execute(sql, params, commit=True)
|
|
|
|
def get_person_name(self, person_id):
    """Return the name of the given person.

    Args:
        person_id: Id of the person to look up.

    Returns:
        The person's name, or ``None`` when no such person exists
        (previously this case raised ``TypeError`` from subscripting
        ``None``).
    """
    result = self.execute("""SELECT name from Person WHERE person_id=%s""", [person_id], one=True)
    if result is None:
        return None
    return result["name"]
|
|
|
|
|
|
def get_workout(self, person_id, workout_id):
    """Load one workout of a person together with its top sets.

    Args:
        person_id: Id of the person who owns the workout.
        workout_id: Id of the workout to load.

    Returns:
        A dict with ``person_id``, ``person_name``, ``workout_id``,
        ``start_date``, ``top_sets`` (list of dicts, ordered by top set id)
        and ``note``; or ``None`` when the query returns no rows, i.e. the
        workout does not exist for that person (previously this case raised
        ``IndexError``).
    """
    topsets = self.execute("""
        SELECT
            P.person_id,
            P.name AS "person_name",
            W.workout_id,
            W.start_date,
            T.topset_id,
            E.exercise_id,
            E.name AS "exercise_name",
            T.repetitions,
            T.weight,
            W.note
        FROM Person P
            LEFT JOIN Workout W ON P.person_id=W.person_id
            LEFT JOIN TopSet T ON W.workout_id=T.workout_id
            LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id
        WHERE P.person_id=%s
            AND W.workout_id = %s
        ORDER BY T.topset_id""", [person_id, workout_id])

    if not topsets:
        return None

    # Person / workout columns repeat on every row; read them from the first.
    first = topsets[0]
    return {
        'person_id': person_id,
        'person_name': first['person_name'],
        'workout_id': workout_id,
        'start_date': first['start_date'],
        # A workout without top sets yields one row whose topset_id is NULL
        # because of the LEFT JOIN; skip it.
        'top_sets': [
            {
                "topset_id": t['topset_id'],
                "exercise_id": t['exercise_id'],
                "exercise_name": t['exercise_name'],
                "weight": t['weight'],
                "repetitions": t['repetitions'],
            }
            for t in topsets
            if t['topset_id'] is not None
        ],
        'note': first['note'],
    }
|
|
|
|
def get_topset(self, topset_id):
    """Load a single top set together with its exercise name.

    Args:
        topset_id: Id of the top set to load.

    Returns:
        A dict with ``topset_id``, ``exercise_id``, ``exercise_name``,
        ``weight`` and ``repetitions``; or ``None`` when no row matches
        (previously this case raised ``TypeError`` from subscripting
        ``None``).
    """
    topset = self.execute("""
        SELECT
            T.topset_id,
            E.exercise_id,
            E.name AS "exercise_name",
            T.repetitions,
            T.weight
        FROM TopSet T
            INNER JOIN Exercise E ON T.exercise_id=E.exercise_id
        WHERE T.topset_id = %s""", [topset_id], one=True)

    if topset is None:
        return None

    return {
        "topset_id": topset['topset_id'],
        "exercise_id": topset['exercise_id'],
        "exercise_name": topset['exercise_name'],
        "weight": topset['weight'],
        "repetitions": topset['repetitions'],
    }
|
|
|
|
def get_all_topsets(self):
    """Return every person joined to their workouts, top sets and exercises.

    All joins are LEFT JOINs starting from ``Person``, so people without
    workouts (and workouts without top sets) still produce a row. Each row
    also carries an ``Estimated1RM`` value computed in SQL from weight and
    repetitions.
    """
    sql = """
        SELECT
            P.person_id AS "PersonId",
            P.name AS "PersonName",
            W.workout_id AS "WorkoutId",
            W.start_date AS "StartDate",
            T.topset_id AS "TopSetId",
            E.exercise_id AS "ExerciseId",
            E.name AS "ExerciseName",
            T.repetitions AS "Repetitions",
            T.weight AS "Weight",
            round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"
        FROM Person P
            LEFT JOIN Workout W ON P.person_id=W.person_id
            LEFT JOIN TopSet T ON W.workout_id=T.workout_id
            LEFT JOIN Exercise E ON T.exercise_id=E.exercise_id"""
    return self.execute(sql)
|
|
|
|
def get_tags_for_person(self, person_id):
    """Return the person's tags, ordered by name, preceded by an 'All' tag.

    Args:
        person_id: Id of the person whose tags are listed.

    Returns:
        A list of dict-like rows with ``tag_id``, ``person_id``,
        ``tag_name`` and ``tag_filter`` keys. The first entry is a
        synthetic 'All' tag with id -1 and an empty filter.
    """
    stored_tags = self.execute("""
        SELECT
            T.tag_id as "tag_id",
            T.person_id as "person_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM
            Tag T
        WHERE
            T.person_id = %s
        ORDER BY
            T.name
        """, [person_id])

    # Static entry that is not stored in the database.
    everything = {
        "tag_id": -1,
        "person_id": person_id,
        "tag_name": "All",
        "tag_filter": "",
    }
    return [everything] + stored_tags
|
|
|
|
def add_or_update_tag_for_person(self, person_id, tag_name, tag_filter):
    """Create a tag for the person, or update its filter if the name exists.

    Args:
        person_id: Id of the person who owns the tag.
        tag_name: Name used to look up an existing tag of that person.
        tag_filter: Filter value to store on the tag.
    """
    existing = self.execute(
        'SELECT tag_id AS "TagId" FROM Tag WHERE person_id=%s AND name=%s LIMIT 1',
        [person_id, tag_name],
        one=True)

    if not existing:
        # No tag with this name yet for the person: insert a new one.
        self.execute(
            'INSERT INTO Tag (person_id, name, filter) VALUES (%s, %s, %s)',
            [person_id, tag_name, tag_filter],
            commit=True)
        return

    self.execute(
        'UPDATE Tag SET filter=%s WHERE tag_id=%s',
        [tag_filter, existing['TagId']],
        commit=True)
|
|
|
|
def delete_tag_for_person(self, person_id, tag_id):
    """Delete the tag with the given id if it belongs to the person; commit."""
    sql = 'DELETE FROM Tag WHERE person_id=%s AND tag_id=%s'
    params = [person_id, tag_id]
    self.execute(sql, params, commit=True)
|
|
|
|
def get_tags_for_dashboard(self):
    """Return the tags that have no person, by name, preceded by 'All'.

    Returns:
        A list of dict-like rows with ``tag_id``, ``person_id``,
        ``tag_name`` and ``tag_filter`` keys. The first entry is a
        synthetic 'All' tag with id -1, no person and an empty filter.
    """
    stored_tags = self.execute("""
        SELECT
            T.tag_id AS "tag_id",
            T.person_id AS "person_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM
            Tag T
        WHERE
            T.person_id IS NULL
        ORDER BY
            T.name""", [])

    # Static entry that is not stored in the database.
    everything = {
        "tag_id": -1,
        "person_id": None,
        "tag_name": "All",
        "tag_filter": "",
    }
    return [everything] + stored_tags
|
|
|
|
def add_or_update_tag_for_dashboard(self, tag_name, tag_filter):
    """Create a person-less tag, or update its filter if the name exists.

    Args:
        tag_name: Name used to look up an existing tag without a person.
        tag_filter: Filter value to store on the tag.
    """
    existing = self.execute(
        'SELECT tag_id AS "tag_id" FROM Tag WHERE person_id IS NULL AND name=%s LIMIT 1',
        [tag_name],
        one=True)

    if not existing:
        # No dashboard tag with this name yet: insert a new one.
        self.execute(
            'INSERT INTO Tag (name, filter) VALUES (%s, %s)',
            [tag_name, tag_filter],
            commit=True)
        return

    self.execute(
        'UPDATE Tag SET filter=%s WHERE tag_id=%s',
        [tag_filter, existing['tag_id']],
        commit=True)
|
|
|
|
def delete_tag_for_dashboard(self, tag_id):
    """Delete the tag with the given id and commit.

    NOTE(review): unlike the per-person variant, the statement does not
    restrict the delete to tags without a person - confirm that is intended.
    """
    sql = 'DELETE FROM Tag WHERE tag_id=%s'
    self.execute(sql, [tag_id], commit=True)
|
|
|
|
def get_workout_tags(self, person_id, workout_id):
    """Return the person's tags and the tags attached to one workout.

    Args:
        person_id: Id of the person whose tags are listed.
        workout_id: Id of the workout whose attached tags are listed.

    Returns:
        A 3-tuple ``(person_tags, workout_tags, selected_workout_tag_ids)``
        where the last element is the list of ``tag_id`` values taken from
        ``workout_tags``.
    """
    all_person_tags = self.execute("""
        SELECT
            T.tag_id AS "tag_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM Tag T
        WHERE T.person_id=%s""", [person_id])

    attached_tags = self.execute("""
        SELECT
            T.tag_id AS "tag_id",
            T.person_id AS "person_id",
            T.name AS "tag_name",
            T.filter AS "tag_filter"
        FROM Workout_Tag WT
            LEFT JOIN Tag T ON WT.tag_id=T.tag_id
        WHERE WT.workout_id=%s""", [workout_id])

    attached_ids = []
    for attached in attached_tags:
        attached_ids.append(attached['tag_id'])

    return (all_person_tags, attached_tags, attached_ids)
|
|
|
|
def get_most_recent_topset_for_exercise(self, person_id, exercise_id):
    """Return the first top set row for an exercise, newest workout first.

    The query starts from ``exercise`` and LEFT JOINs top sets and
    workouts, keeping rows whose workout belongs to the person or has no
    person id, ordered by workout start date descending.

    Args:
        person_id: Id of the person whose history is searched.
        exercise_id: Id of the exercise to look up.

    Returns:
        ``None`` when no row matches; otherwise a tuple
        ``(repetitions, weight, exercise_name)``.
    """
    latest = self.execute("""
        SELECT
            t.repetitions,
            t.weight,
            e.name AS "exercise_name"
        FROM
            exercise e
            LEFT JOIN topset t ON e.exercise_id = t.exercise_id
            LEFT JOIN workout w ON t.workout_id = w.workout_id
        WHERE
            e.exercise_id = %s AND (w.person_id = %s OR w.person_id IS NULL)
        ORDER BY
            w.start_date DESC
        LIMIT 1;
        """, [exercise_id, person_id], one=True)

    if latest:
        return (latest.get('repetitions'), latest.get('weight'), latest['exercise_name'])
    return None
|
|
|
|
def get_all_exercises(self):
    """Call ``self.exercises.get`` with an empty string and return the result.

    NOTE(review): the empty string is presumably an empty search filter -
    confirm against the ``Exercises`` helper.
    """
    everything = self.exercises.get("")
    return everything
|
|
|
|
def get_exercise_progress_for_user(self, person_id, exercise_id, min_date=None, max_date=None, epoch='all', degree=1):
    """Build the progress graph model for one person and exercise.

    Args:
        person_id: Id of the person whose top sets are loaded.
        exercise_id: Id of the exercise to chart.
        min_date: Optional lower bound on workout start date. Replaced by
            "now minus N months" when ``epoch`` is '1M', '3M' or '6M'.
        max_date: Optional upper bound on workout start date.
        epoch: Time-window selector; values other than '1M', '3M' and '6M'
            leave ``min_date`` untouched.
        degree: Forwarded unchanged to ``get_exercise_graph_model``.

    Returns:
        ``None`` when no top sets match; otherwise whatever
        ``get_exercise_graph_model`` returns for the collected series.
    """
    now = datetime.now()
    months_back = {'1M': 1, '3M': 3, '6M': 6}.get(epoch)
    if months_back is not None:
        min_date = now - relativedelta(months=months_back)

    # Each bound appears twice so a NULL parameter switches that filter off.
    rows = self.execute("""
        SELECT
            T.topset_id, E.name AS exercise_name, W.person_id, T.workout_id,
            T.repetitions, T.weight,
            ROUND((100 * T.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * T.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm,
            W.start_date
        FROM
            topset T
        JOIN
            exercise E ON T.exercise_id = E.exercise_id
        JOIN
            workout W ON T.workout_id = W.workout_id
        WHERE
            W.person_id = %s
            AND E.exercise_id = %s AND
            (%s IS NULL OR W.start_date >= %s) AND
            (%s IS NULL OR W.start_date <= %s)
        ORDER BY
            W.start_date;
        """, [person_id, exercise_id, min_date, min_date, max_date, max_date])

    if not rows:
        return None

    # Split the rows into the parallel series the graph model expects.
    estimated_1rm = []
    repetitions = []
    weight = []
    start_dates = []
    messages = []
    for row in rows:
        estimated_1rm.append(row['estimated_1rm'])
        repetitions.append(row['repetitions'])
        weight.append(row['weight'])
        start_dates.append(row['start_date'])
        messages.append(f'{row["repetitions"]} x {row["weight"]}kg ({row["estimated_1rm"]}kg E1RM) on {row["start_date"].strftime("%d %b %y")}')

    return get_exercise_graph_model(
        rows[0]['exercise_name'],
        estimated_1rm,
        repetitions,
        weight,
        start_dates,
        messages,
        epoch,
        person_id,
        exercise_id,
        min_date,
        max_date,
        degree)
|
|
|
|
# Note fetching logic moved to routes/notes.py
|
|
|
|
def get_exercise_earliest_and_latest_dates(self, person_id, exercise_id):
    """Return the first and last workout dates on which an exercise was done.

    Args:
        person_id: Id of the person whose workouts are searched.
        exercise_id: Id of the exercise to look for.

    Returns:
        A tuple ``(earliest_date, latest_date)``; ``(None, None)`` when the
        query yields no usable row.
    """
    rows = self.execute("""
        SELECT
            MIN(w.start_date) AS earliest_date,
            MAX(w.start_date) AS latest_date
        FROM workout w
            INNER JOIN topset t ON w.workout_id = t.workout_id
            INNER JOIN exercise e ON t.exercise_id = e.exercise_id
        WHERE w.person_id = %s AND e.exercise_id = %s;
        """, [person_id, exercise_id])

    if rows and rows[0]:
        span = rows[0]
        return span['earliest_date'], span['latest_date']
    return None, None
|
|
|
|
def get_topset_achievements(self, topset_id):
    """Compare a top set with the person's earlier history for that exercise.

    Only workouts with a start date strictly before the top set's workout
    are treated as history.

    Args:
        topset_id: Id of the top set to evaluate.

    Returns:
        ``{}`` when the top set does not exist; otherwise a dict with
        ``is_pr_weight``, ``is_pr_e1rm``, ``is_pr_reps`` (booleans),
        ``weight_increase``, ``rep_increase`` (difference to the best set of
        the most recent earlier workout) and ``stalled_sessions`` (number of
        leading history rows, newest first, with identical weight and
        repetitions).
    """
    # Step 1: the top set being evaluated.
    current = self.execute("""
        SELECT
            t.weight, t.repetitions, t.exercise_id, w.person_id, w.start_date, w.workout_id,
            ROUND((100 * t.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * t.repetitions), 0)::NUMERIC::INTEGER AS estimated_1rm
        FROM topset t
            JOIN workout w ON t.workout_id = w.workout_id
        WHERE t.topset_id = %s
        """, [topset_id], one=True)
    if not current:
        return {}

    owner_id = current['person_id']
    exercise = current['exercise_id']
    performed_on = current['start_date']
    weight_now = current['weight']
    reps_now = current['repetitions']
    e1rm_now = current['estimated_1rm']

    # Step 2: best set (by estimated 1RM) of the most recent earlier workout.
    previous_best = self.execute("""
        SELECT t.weight, t.repetitions
        FROM topset t
            JOIN workout w ON t.workout_id = w.workout_id
        WHERE w.person_id = %s AND t.exercise_id = %s AND w.start_date < %s
        ORDER BY w.start_date DESC, (100 * t.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * t.repetitions) DESC
        LIMIT 1
        """, [owner_id, exercise, performed_on], one=True)

    # Step 3: all-time bests over the earlier history.
    records = self.execute("""
        SELECT
            MAX(t.weight) as max_weight,
            MAX(ROUND((100 * t.weight::NUMERIC::INTEGER) / (101.3 - 2.67123 * t.repetitions), 0)) as max_e1rm,
            MAX(t.repetitions) FILTER (WHERE t.weight >= %s) as max_reps_at_weight
        FROM topset t
            JOIN workout w ON t.workout_id = w.workout_id
        WHERE w.person_id = %s AND t.exercise_id = %s AND w.start_date < %s
        """, [weight_now, owner_id, exercise, performed_on], one=True)

    result = {
        'is_pr_weight': False,
        'is_pr_e1rm': False,
        'is_pr_reps': False,
        'weight_increase': 0,
        'rep_increase': 0,
        'stalled_sessions': 0,
    }

    # A record only counts when an earlier value exists to be beaten.
    if records:
        best_weight = records['max_weight']
        best_e1rm = records['max_e1rm']
        best_reps = records['max_reps_at_weight']
        if best_weight and weight_now > best_weight:
            result['is_pr_weight'] = True
        if best_e1rm and e1rm_now > best_e1rm:
            result['is_pr_e1rm'] = True
        if best_reps and reps_now > best_reps:
            result['is_pr_reps'] = True

    # Step 4: count leading history rows (newest first) that repeat the
    # current weight and repetitions exactly.
    history = self.execute("""
        SELECT t.weight, t.repetitions
        FROM topset t
            JOIN workout w ON t.workout_id = w.workout_id
        WHERE w.person_id = %s AND t.exercise_id = %s AND w.start_date < %s
        ORDER BY w.start_date DESC
        """, [owner_id, exercise, performed_on])

    repeats = 0
    for earlier in history:
        if not (earlier['weight'] == weight_now and earlier['repetitions'] == reps_now):
            break
        repeats += 1
    result['stalled_sessions'] = repeats

    # Step 5: improvement over the previous workout's best set.
    if previous_best:
        prior_weight = previous_best['weight']
        prior_reps = previous_best['repetitions']
        if weight_now > prior_weight:
            result['weight_increase'] = weight_now - prior_weight
        elif weight_now == prior_weight and reps_now > prior_reps:
            result['rep_increase'] = reps_now - prior_reps

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|