Files
workout/features/people_graphs.py

184 lines
8.0 KiB
Python

import pandas as pd
from utils import get_distinct_colors
class PeopleGraphs:
    """Build per-person weekly graph models from top-set rows.

    The class runs one SQL query through the injected ``execute`` callable,
    aggregates the rows with pandas and returns plain dictionaries that a
    view can render (title, viewBox size and one plot per person).
    """

    def __init__(self, db_connection_method):
        """Store the callable used to run SQL.

        Args:
            db_connection_method: Called as ``execute(query, params)``. Its
                return value is handed straight to ``pandas.DataFrame``;
                presumably a list of dict-like rows keyed by the quoted
                column aliases of the query (verify against the caller).
        """
        self.execute = db_connection_method

    def get(self, selected_people_ids=None, min_date=None, max_date=None, selected_exercise_ids=None):
        """Query top sets and return the weekly graph models.

        Every filter is optional; a falsy value (``None``, empty list)
        leaves that filter out. All values are passed as query parameters,
        never interpolated into the SQL text.

        Args:
            selected_people_ids: Person ids to include.
            min_date: Inclusive lower bound on the workout start date.
            max_date: Inclusive upper bound on the workout start date.
            selected_exercise_ids: Exercise ids to include.

        Returns:
            A list with two graph models: ``'Workouts per week'`` and
            ``'PRs per week'`` (see ``get_weekly_pr_graph_model``).
        """
        query = """
            SELECT
                P.person_id AS "PersonId",
                P.name AS "PersonName",
                W.workout_id AS "WorkoutId",
                W.start_date AS "StartDate",
                T.topset_id AS "TopSetId",
                E.exercise_id AS "ExerciseId",
                E.name AS "ExerciseName",
                T.repetitions AS "Repetitions",
                T.weight AS "Weight",
                round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer AS "Estimated1RM"
            FROM Person P
            LEFT JOIN Workout W ON P.person_id = W.person_id
            LEFT JOIN TopSet T ON W.workout_id = T.workout_id
            LEFT JOIN Exercise E ON T.exercise_id = E.exercise_id
            WHERE TRUE
        """
        params = []

        # Each optional filter appends one clause and its parameters, in
        # the same order, so placeholders and values stay aligned.
        if selected_people_ids:
            placeholders = ", ".join(["%s"] * len(selected_people_ids))
            query += f" AND P.person_id IN ({placeholders})"
            params.extend(selected_people_ids)
        if min_date:
            query += " AND W.start_date >= %s"
            params.append(min_date)
        if max_date:
            query += " AND W.start_date <= %s"
            params.append(max_date)
        if selected_exercise_ids:
            placeholders = ", ".join(["%s"] * len(selected_exercise_ids))
            query += f" AND E.exercise_id IN ({placeholders})"
            params.extend(selected_exercise_ids)

        topsets = self.execute(query, params)

        weekly_counts = self.get_workout_counts(topsets, 'week')
        weekly_pr_counts = self.count_prs_over_time(topsets, 'week')
        return [
            self.get_weekly_pr_graph_model('Workouts per week', weekly_counts),
            self.get_weekly_pr_graph_model('PRs per week', weekly_pr_counts),
        ]

    def get_weekly_pr_graph_model(self, title, weekly_pr_data):
        """Turn per-person count series into a graph model.

        Args:
            title: Title stored in the returned model.
            weekly_pr_data: Mapping of person id to
                ``{"PersonName": str, "PRCounts": {timestamp: count}}``,
                as produced by ``get_workout_counts`` and
                ``count_prs_over_time``.

        Returns:
            ``{'title', 'vb_width', 'vb_height', 'plots'}``. Each plot has
            ``label``, ``color``, ``points`` (a list of
            ``(scaled_value, relative_position)`` tuples) and
            ``plot_labels`` (a list of
            ``(scaled_value, relative_position, message)`` tuples).
            ``plots`` is empty when no person has any data point.
        """
        vb_width = 200
        vb_height = 75
        model = {
            'title': title,
            'vb_width': vb_width,
            'vb_height': vb_height,
            'plots': [],
        }

        all_dates = [
            date
            for user_data in weekly_pr_data.values()
            for date in user_data["PRCounts"]
        ]
        if not all_dates:
            # Nothing to plot: min()/max() below would raise on empty input.
            return model

        # Shared axes: every person is scaled against the overall date
        # range and the overall maximum so the plots are comparable.
        min_date = min(all_dates)
        total_span = (max(all_dates) - min_date).days or 1  # avoid /0 for a single date
        max_value = max(
            value
            for user_data in weekly_pr_data.values()
            for value in user_data["PRCounts"].values()
        ) or 1  # avoid /0 when every count is zero
        min_value = 0
        value_range = max_value - min_value

        colors = get_distinct_colors(len(weekly_pr_data.items()))
        for count, user_data in enumerate(weekly_pr_data.values()):
            pr_counts = user_data["PRCounts"]
            person_name = user_data["PersonName"]
            dates = list(pr_counts.keys())
            values = list(pr_counts.values())

            # Positions come from this person's own dates, so the x
            # coordinates stay right even if people have different keys.
            positions = [(date - min_date).days / total_span for date in dates]
            values_scaled = [((value - min_value) / value_range) * vb_height for value in values]
            messages = [
                f'{value} for {person_name} at {date.strftime("%d %b %y")}'
                for value, date in zip(values, dates)
            ]

            model['plots'].append({
                'label': person_name,
                'color': colors[count],
                'points': list(zip(values_scaled, positions)),
                # A list, not a bare zip, so it can be iterated repeatedly.
                'plot_labels': list(zip(values_scaled, positions, messages)),
            })

        return model

    def get_workout_counts(self, workouts, period='week'):
        """Count distinct workouts per person and period.

        Args:
            workouts: Rows with at least ``PersonId``, ``PersonName``,
                ``WorkoutId`` and ``StartDate``.
            period: ``'week'`` for weekly buckets; any other value gives
                monthly buckets.

        Returns:
            ``{person_id: {'PersonName': name, 'PRCounts': {period_start:
            count}}}`` with integer counts. Every person gets an entry for
            every period seen in the data; missing periods are 0. Returns
            ``{}`` when ``workouts`` is empty.
        """
        df = self._load_frame(workouts, period)
        if df is None:
            return {}

        # Rows without a workout (NULLs from the LEFT JOINs) have no period
        # and are dropped by groupby; those people still appear via names.
        workout_counts = (
            df.groupby(['PersonId', 'Period'])['WorkoutId']
            .nunique()
            .reset_index()
        )
        return self._counts_by_person(df, workout_counts, 'WorkoutId')

    def count_prs_over_time(self, workouts, period='week'):
        """Count estimated-1RM personal records per person and period.

        For each person and exercise the best ``Estimated1RM`` of a period
        is compared with the best of all earlier periods of that same
        person and exercise; a strictly higher value counts as one PR. The
        first period of an exercise has no earlier best and is never
        counted.

        Args:
            workouts: Rows with at least ``PersonId``, ``PersonName``,
                ``ExerciseId``, ``StartDate`` and ``Estimated1RM``.
            period: ``'week'`` for weekly buckets; any other value gives
                monthly buckets.

        Returns:
            Same structure as ``get_workout_counts``, with PR counts as the
            values. Returns ``{}`` when ``workouts`` is empty.
        """
        df = self._load_frame(workouts, period)
        if df is None:
            return {}

        group_keys = ['PersonId', 'ExerciseId']
        # groupby sorts its keys, so periods are ascending within a group.
        period_max = (
            df.groupby(group_keys + ['Period'])['Estimated1RM']
            .max()
            .reset_index()
        )

        # The shift must happen inside each group: a plain Series.shift
        # would leak the previous group's running max into the first row
        # of the next person/exercise.
        period_max['RunningMax'] = period_max.groupby(group_keys)['Estimated1RM'].cummax()
        period_max['AllTimeMax'] = period_max.groupby(group_keys)['RunningMax'].shift(1)

        # Comparison with NaN (no earlier best) is False.
        period_max['IsPR'] = period_max['Estimated1RM'] > period_max['AllTimeMax']

        pr_counts = period_max.groupby(['PersonId', 'Period'])['IsPR'].sum().reset_index()
        return self._counts_by_person(df, pr_counts, 'IsPR')

    @staticmethod
    def _load_frame(workouts, period):
        """Return rows as a DataFrame with a 'Period' column, or None if empty."""
        df = pd.DataFrame(workouts)
        if df.empty:
            return None
        df['StartDate'] = pd.to_datetime(df['StartDate'])
        df['Period'] = df['StartDate'].dt.to_period('W' if period == 'week' else 'M')
        return df

    @staticmethod
    def _counts_by_person(df, counts, value_column):
        """Reshape long (PersonId, Period, value) counts into the nested result dict."""
        names = (
            df[['PersonId', 'PersonName']]
            .drop_duplicates(subset='PersonId')
            .set_index('PersonId')['PersonName']
        )
        if counts.empty:
            # No dated rows at all: everyone gets an empty series.
            return {
                person_id: {'PersonName': person_name, 'PRCounts': {}}
                for person_id, person_name in names.items()
            }

        # Key each bucket by the timestamp at which the period starts.
        counts = counts.assign(Period=counts['Period'].apply(lambda p: p.start_time))
        table = counts.pivot(index='PersonId', columns='Period', values=value_column)

        # Align to every known person, then fill and cast last so people
        # without rows do not turn the counts back into floats.
        table = table.reindex(names.index).fillna(0).astype(int)

        return {
            person_id: {
                'PersonName': person_name,
                'PRCounts': table.loc[person_id].to_dict(),
            }
            for person_id, person_name in names.items()
        }