# Files
# workout/features/dashboard.py
# Peter Stockings f70438e4e4 Refactor dashboard
# 2025-01-27 14:46:20 +11:00
#
# 271 lines
# 9.4 KiB
# Python

from utils import calculate_estimated_1rm, get_exercise_graph_model
class Dashboard:
    """Query helpers that assemble the data shown on the workout dashboard.

    Every query is run through the callable supplied at construction time.
    It is invoked as ``execute(sql)`` or ``execute(sql, params)`` and its
    result is treated as a sequence of rows indexable by column name.
    """

    def __init__(self, db_connection_method):
        """Store the callable used to run SQL statements."""
        self.execute = db_connection_method

    @staticmethod
    def _placeholders(values):
        """Return one ``%s`` placeholder per value, comma separated."""
        return ", ".join(["%s"] * len(values))

    def get_people_ids(self):
        """Return every ``person_id`` in ascending order."""
        query = """
            SELECT person_id
            FROM Person
            ORDER BY person_id
        """
        result = self.execute(query)
        # Tolerate a falsy result (e.g. None) like the sibling methods do,
        # instead of failing while iterating.
        return [row["person_id"] for row in result or []]

    def get_earliest_and_latest_workout_dates(self, selected_people_ids):
        """Return ``(earliest, latest)`` workout start dates for the people.

        Only workouts that have at least one topset are considered (inner
        join on ``topset``). Returns ``(None, None)`` when no people are
        selected or the query yields no row.
        """
        people_ids = list(selected_people_ids or [])
        if not people_ids:
            # "IN ()" is not valid SQL, so do not query at all.
            return None, None

        sql_query = f"""
            SELECT
                MIN(w.start_date) AS earliest_date,
                MAX(w.start_date) AS latest_date
            FROM workout w
            INNER JOIN topset t ON w.workout_id = t.workout_id
            WHERE w.person_id IN ({self._placeholders(people_ids)});
        """
        result = self.execute(sql_query, people_ids)
        if not result or not result[0]:
            return None, None
        return result[0]['earliest_date'], result[0]['latest_date']

    def list_of_performed_exercise_ids(self, selected_people_ids, min_date, max_date):
        """Return the distinct exercise ids performed in the date range.

        Always returns a list: an empty selection, an empty result, or a
        NULL aggregate all yield ``[]``.
        """
        people_ids = list(selected_people_ids or [])
        if not people_ids:
            return []

        sql_query = f"""
            SELECT
                ARRAY_AGG(DISTINCT e.exercise_id) AS exercise_ids
            FROM workout w
            LEFT JOIN topset t ON w.workout_id = t.workout_id
            LEFT JOIN exercise e ON t.exercise_id = e.exercise_id
            WHERE w.start_date BETWEEN %s AND %s
            AND w.person_id IN ({self._placeholders(people_ids)})
        """
        # Parameter order follows placeholder order: dates first, then ids.
        params = [min_date, max_date] + people_ids
        result = self.execute(sql_query, params)
        if not result or not result[0]:
            return []

        exercise_ids = result[0]['exercise_ids']
        if not exercise_ids:
            # The aggregate is NULL/None when no workout matched.
            return []
        # The LEFT JOINs can contribute a NULL id for workouts that have no
        # topset; such entries are not real exercises, so drop them.
        return [eid for eid in exercise_ids if eid is not None]

    def get_exercises_with_selection(self, selected_people_ids, start_date, end_date, selected_exercise_ids):
        """Return exercises performed by the people in the date range.

        Each entry is ``{"id", "name", "selected"}`` where ``selected`` tells
        whether the id is in ``selected_exercise_ids``. Entries are ordered
        by exercise name. Returns ``[]`` when no people are selected or
        nothing matched.
        """
        people_ids = list(selected_people_ids or [])
        if not people_ids:
            return []

        sql_query = f"""
            SELECT DISTINCT
                e.exercise_id,
                e.name AS exercise_name
            FROM
                workout w
            JOIN
                topset t ON w.workout_id = t.workout_id
            JOIN
                exercise e ON t.exercise_id = e.exercise_id
            WHERE
                w.person_id IN ({self._placeholders(people_ids)})
                AND w.start_date BETWEEN %s AND %s
            ORDER BY
                e.name ASC;
        """
        # Parameter order follows placeholder order: ids first, then dates.
        params = people_ids + [start_date, end_date]
        result = self.execute(sql_query, params)
        if not result:
            return []

        # Build the lookup once rather than scanning a list for every row.
        selected = set(selected_exercise_ids or ())
        return [
            {
                "id": row["exercise_id"],
                "name": row["exercise_name"],
                "selected": row["exercise_id"] in selected,
            }
            for row in result
        ]

    def get_people_with_selection(self, selected_people_ids):
        """Return all people ordered by name, flagged with ``selected``.

        Each entry is ``{"id", "name", "selected"}`` where ``selected`` tells
        whether the id is in ``selected_people_ids``.
        """
        sql_query = """
            SELECT DISTINCT
                p.person_id AS id,
                p.name AS name
            FROM
                person p
            ORDER BY
                p.name ASC;
        """
        result = self.execute(sql_query)
        if not result:
            return []

        # Build the lookup once rather than scanning a list for every row.
        selected = set(selected_people_ids or ())
        return [
            {
                "id": row["id"],
                "name": row["name"],
                "selected": row["id"] in selected,
            }
            for row in result
        ]

    def generate_exercise_progress_graphs(self, person_id, exercise_id, exercise_name, exercise_sets):
        """Build the progress-graph model for one person's exercise.

        ``exercise_sets`` is a list of dicts with the keys ``estimated_1rm``,
        ``reps``, ``weight`` and ``workout_start_date`` (the latter must
        support ``strftime``). Returns whatever ``get_exercise_graph_model``
        returns, or ``None`` when there is no exercise name or no sets.
        """
        if not exercise_name or not exercise_sets:
            # Nothing to plot; return explicitly instead of falling through
            # with an unset result.
            return None

        estimated_1rm = [t["estimated_1rm"] for t in exercise_sets]
        repetitions = [t["reps"] for t in exercise_sets]
        weight = [t["weight"] for t in exercise_sets]
        start_dates = [t["workout_start_date"] for t in exercise_sets]
        messages = [
            f'{t["reps"]} x {t["weight"]}kg ({t["estimated_1rm"]}kg E1RM) on {t["workout_start_date"].strftime("%d %b %y")}'
            for t in exercise_sets
        ]
        epoch = "All"

        return get_exercise_graph_model(
            title=exercise_name,
            estimated_1rm=estimated_1rm,
            repetitions=repetitions,
            weight=weight,
            start_dates=start_dates,
            messages=messages,
            epoch=epoch,
            person_id=person_id,
            exercise_id=exercise_id,
        )

    def get(self, selected_people_ids, start_date, end_date, selected_exercise_ids):
        """Return the dashboard data for the given people, dates and exercises.

        The result is ``{"dashboard": [person, ...]}``. Each person is
        ``{"id", "name", "exercises": [...]}`` and each exercise is
        ``{"id", "name", "graph", "sets"}`` with sets ordered by topset id,
        newest id first.

        When either selection is empty or the query yields no rows, the
        ``"dashboard"`` list is empty. That empty result additionally keeps
        the ``"people"`` key, because the empty case used to be returned as
        ``{"people": []}`` and existing callers may still read it.
        """
        people_ids = list(selected_people_ids or [])
        exercise_ids = list(selected_exercise_ids or [])
        if not people_ids or not exercise_ids:
            # "IN ()" is not valid SQL, and an empty selection can match
            # nothing anyway.
            return {"dashboard": [], "people": []}

        sql_query = f"""
            SELECT
                p.person_id,
                p.name AS person_name,
                e.exercise_id,
                e.name AS exercise_name,
                t.topset_id,
                t.repetitions,
                t.weight,
                w.start_date AS workout_date,
                w.workout_id
            FROM
                person p
            JOIN
                workout w ON p.person_id = w.person_id
            JOIN
                topset t ON w.workout_id = t.workout_id
            JOIN
                exercise e ON t.exercise_id = e.exercise_id
            WHERE
                p.person_id IN ({self._placeholders(people_ids)})
                AND w.start_date BETWEEN %s AND %s
                AND e.exercise_id IN ({self._placeholders(exercise_ids)})
            ORDER BY
                p.person_id ASC, e.exercise_id ASC, t.topset_id DESC;
        """
        # Parameter order follows placeholder order: people, dates, exercises.
        params = people_ids + [start_date, end_date] + exercise_ids
        result = self.execute(sql_query, params)
        if not result:
            return {"dashboard": [], "people": []}

        # Group the flat rows as person -> exercise -> sets. Plain dicts keep
        # insertion order, so the query's ordering of people and exercises
        # carries through to the output.
        people_map = {}
        for row in result:
            person_id = row["person_id"]
            if person_id not in people_map:
                people_map[person_id] = {
                    "id": person_id,
                    "name": row["person_name"],
                    "exercises": {},
                }

            exercise_id = row["exercise_id"]
            person_exercises = people_map[person_id]["exercises"]
            if exercise_id not in person_exercises:
                person_exercises[exercise_id] = {
                    "id": exercise_id,
                    "name": row["exercise_name"],
                    "sets": [],
                }

            person_exercises[exercise_id]["sets"].append({
                "id": row["topset_id"],
                "reps": row["repetitions"],
                "weight": row["weight"],
                "exercise_id": row["exercise_id"],
                "exercise_name": row["exercise_name"],
                "workout_id": row["workout_id"],
                "workout_start_date": row["workout_date"],
                "estimated_1rm": calculate_estimated_1rm(row["weight"], row["repetitions"]),
            })

        # Flatten each person's exercise map into a list and attach a graph.
        people = []
        for person_id, person_data in people_map.items():
            exercises = []
            for exercise_id, exercise_data in person_data["exercises"].items():
                # Order sets by topset id, highest first. The query already
                # requests this order; sorting again keeps the guarantee
                # independent of the executor.
                exercise_data["sets"] = sorted(
                    exercise_data["sets"], key=lambda x: x["id"], reverse=True
                )
                graph = self.generate_exercise_progress_graphs(
                    person_id=person_id,
                    exercise_id=exercise_id,
                    exercise_name=exercise_data["name"],
                    exercise_sets=exercise_data["sets"],
                )
                exercises.append({
                    "id": exercise_data["id"],
                    "name": exercise_data["name"],
                    "graph": graph,
                    "sets": exercise_data["sets"],
                })
            person_data["exercises"] = exercises
            people.append(person_data)

        return {"dashboard": people}