import pandas as pd from utils import get_distinct_colors, calculate_estimated_1rm class PeopleGraphs: def __init__(self, db_connection_method): self.execute = db_connection_method def get(self, selected_people_ids=None, min_date=None, max_date=None, selected_exercise_ids=None): """ Fetch workout topsets, calculate Estimated1RM in Python, then generate weekly workout & PR graphs. """ # Build query (no in-SQL 1RM calculation). query = """ SELECT P.person_id AS "PersonId", P.name AS "PersonName", W.workout_id AS "WorkoutId", W.start_date AS "StartDate", T.topset_id AS "TopSetId", E.exercise_id AS "ExerciseId", E.name AS "ExerciseName", T.repetitions AS "Repetitions", T.weight AS "Weight" FROM Person P LEFT JOIN Workout W ON P.person_id = W.person_id LEFT JOIN TopSet T ON W.workout_id = T.workout_id LEFT JOIN Exercise E ON T.exercise_id = E.exercise_id WHERE TRUE """ params = [] if selected_people_ids: query += f" AND P.person_id IN ({', '.join(['%s'] * len(selected_people_ids))})" params.extend(selected_people_ids) if min_date: query += " AND W.start_date >= %s" params.append(min_date) if max_date: query += " AND W.start_date <= %s" params.append(max_date) if selected_exercise_ids: query += f" AND E.exercise_id IN ({', '.join(['%s'] * len(selected_exercise_ids))})" params.extend(selected_exercise_ids) # Execute and convert to DataFrame raw_data = self.execute(query, params) if not raw_data: # Return empty graphs if no data at all return [ self.get_graph_model("Workouts per week", {}), self.get_graph_model("PRs per week", {}) ] df = pd.DataFrame(raw_data) # Calculate Estimated1RM in Python df['Estimated1RM'] = df.apply( lambda row: calculate_estimated_1rm(row["Weight"], row["Repetitions"]), axis=1 ) # Build the weekly data models weekly_counts = self.get_workout_counts(df, period='week') weekly_pr_counts = self.count_prs_over_time(df, period='week') return [ self.get_graph_model("Workouts per week", weekly_counts), self.get_graph_model("PRs per week", weekly_pr_counts) ] def _prepare_period_column(self, df, period='week'): """ Convert StartDate to datetime and add a Period column based on 'week' or 'month' as needed. """ df['StartDate'] = pd.to_datetime(df['StartDate'], errors='coerce') freq = 'W' if period == 'week' else 'M' df['Period'] = df['StartDate'].dt.to_period(freq) return df def get_workout_counts(self, df, period='week'): """ Returns a dictionary: { person_id: { 'PersonName': 'Alice', 'PRCounts': { Timestamp('2023-01-02'): 2, ... } }, ... } representing how many workouts each person performed per time period. """ # Make a copy and prepare Period column df = self._prepare_period_column(df.copy(), period) # Count unique workouts per (PersonId, PersonName, Period) grp = ( df.groupby(['PersonId', 'PersonName', 'Period'], as_index=False)['WorkoutId'] .nunique() .rename(columns={'WorkoutId': 'Count'}) ) # Convert each Period to its start time grp['Period'] = grp['Period'].apply(lambda p: p.start_time) return self._pivot_to_graph_dict( grp, index_col='PersonId', name_col='PersonName', period_col='Period', value_col='Count' ) def count_prs_over_time(self, df, period='week'): """ Returns a dictionary: { person_id: { 'PersonName': 'Alice', 'PRCounts': { Timestamp('2023-01-02'): 1, ... } }, ... } representing how many PRs each person hit per time period. """ # Make a copy and prepare Period column df = self._prepare_period_column(df.copy(), period) # Max 1RM per (Person, Exercise, Period) grouped = ( df.groupby(['PersonId', 'PersonName', 'ExerciseId', 'Period'], as_index=False)['Estimated1RM'] .max() .rename(columns={'Estimated1RM': 'PeriodMax'}) ) # Sort so we can track "all-time max" up to that row grouped.sort_values(by=['PersonId', 'ExerciseId', 'Period'], inplace=True) # For each person & exercise, track the cumulative max (shifted by 1) grouped['AllTimeMax'] = grouped.groupby(['PersonId', 'ExerciseId'])['PeriodMax'].cummax().shift(1) grouped['IsPR'] = (grouped['PeriodMax'] > grouped['AllTimeMax']).astype(int) # Sum PRs across exercises for (Person, Period) pr_counts = ( grouped.groupby(['PersonId', 'PersonName', 'Period'], as_index=False)['IsPR'] .sum() .rename(columns={'IsPR': 'Count'}) ) pr_counts['Period'] = pr_counts['Period'].apply(lambda p: p.start_time) return self._pivot_to_graph_dict( pr_counts, index_col='PersonId', name_col='PersonName', period_col='Period', value_col='Count' ) def _pivot_to_graph_dict(self, df, index_col, name_col, period_col, value_col): """ Convert [index_col, name_col, period_col, value_col] into a nested dictionary for plotting: { person_id: { 'PersonName': <...>, 'PRCounts': { : , ... } }, ... } """ if df.empty: return {} pivoted = df.pivot( index=[index_col, name_col], columns=period_col, values=value_col ).fillna(0) pivoted.reset_index(inplace=True) result = {} for _, row in pivoted.iterrows(): pid = row[index_col] pname = row[name_col] # Remaining columns = date -> count period_counts = row.drop([index_col, name_col]).to_dict() result[pid] = { 'PersonName': pname, 'PRCounts': period_counts } return result def get_graph_model(self, title, data_dict): """ Builds a line-graph model from a dictionary of the form: { person_id: { 'PersonName': 'Alice', 'PRCounts': { Timestamp('2023-01-02'): 2, Timestamp('2023-01-09'): 1, ... } }, ... } """ if not data_dict: return { 'title': title, 'vb_width': 200, 'vb_height': 75, 'plots': [] } # Gather all dates & values all_dates = [] all_values = [] for user_data in data_dict.values(): all_dates.extend(user_data['PRCounts'].keys()) all_values.extend(user_data['PRCounts'].values()) min_date = min(all_dates) max_date = max(all_dates) date_span = max((max_date - min_date).days, 1) max_val = max(all_values) min_val = 0 val_range = max_val - min_val if max_val != min_val else 1 vb_width, vb_height = 200, 75 colors = get_distinct_colors(len(data_dict)) plots = [] for i, (pid, user_data) in enumerate(data_dict.items()): name = user_data['PersonName'] pr_counts = user_data['PRCounts'] # Sort by date so points are in chronological order sorted_pr = sorted(pr_counts.items(), key=lambda x: x[0]) points = [] labels = [] for d, val in sorted_pr: # Scale x,y to fit [0..1], then we multiply y by vb_height x = (d - min_date).days / date_span y = (val - min_val) / val_range * vb_height points.append((y, x)) labels.append((y, x, f'{val} for {name} at {d.strftime("%d %b %y")}')) plots.append({ 'label': name, 'color': colors[i], 'points': points, 'plot_labels': labels }) return { 'title': title, 'vb_width': vb_width, 'vb_height': vb_height, 'plots': plots }