import pandas as pd

from utils import get_distinct_colors


class PeopleGraphs:
    """Build per-person graph models (workouts per week, PRs per week).

    The class is constructed with a callable that executes a SQL query with
    positional parameters and returns the resulting rows. The rows must be
    convertible by ``pandas.DataFrame`` into columns named after the SELECT
    aliases used in :meth:`get` (``PersonId``, ``StartDate``, ...).
    """

    def __init__(self, db_connection_method):
        """Store the query-executing callable, invoked as ``execute(query, params)``."""
        self.execute = db_connection_method

    def get(self, selected_people_ids=None, min_date=None, max_date=None, selected_exercise_ids=None):
        """Query top sets with the optional filters and return the graph models.

        Args:
            selected_people_ids: Optional sized collection of person ids to
                restrict the query to. Ignored when empty or ``None``.
            min_date: Optional inclusive lower bound on the workout start date.
            max_date: Optional inclusive upper bound on the workout start date.
            selected_exercise_ids: Optional sized collection of exercise ids
                to restrict the query to. Ignored when empty or ``None``.

        Returns:
            A list with two graph models (see
            :meth:`get_weekly_pr_graph_model`): 'Workouts per week' followed
            by 'PRs per week'.
        """
        # Base query. Adjacent literals are concatenated by the parser, so the
        # resulting text is a single line of SQL.
        query = (
            ' SELECT'
            ' P.person_id AS "PersonId",'
            ' P.name AS "PersonName",'
            ' W.workout_id AS "WorkoutId",'
            ' W.start_date AS "StartDate",'
            ' T.topset_id AS "TopSetId",'
            ' E.exercise_id AS "ExerciseId",'
            ' E.name AS "ExerciseName",'
            ' T.repetitions AS "Repetitions",'
            ' T.weight AS "Weight",'
            ' round((100 * T.Weight::numeric::integer)/(101.3-2.67123 * T.Repetitions),0)::numeric::integer'
            ' AS "Estimated1RM"'
            ' FROM Person P'
            ' LEFT JOIN Workout W ON P.person_id = W.person_id'
            ' LEFT JOIN TopSet T ON W.workout_id = T.workout_id'
            ' LEFT JOIN Exercise E ON T.exercise_id = E.exercise_id'
            ' WHERE TRUE '
        )

        # Values are always passed as parameters; only "%s" placeholders are
        # ever interpolated into the SQL text.
        params = []

        if selected_people_ids:
            placeholders = ", ".join(["%s"] * len(selected_people_ids))
            query += f" AND P.person_id IN ({placeholders})"
            params.extend(selected_people_ids)
        if min_date:
            query += " AND W.start_date >= %s"
            params.append(min_date)
        if max_date:
            query += " AND W.start_date <= %s"
            params.append(max_date)
        if selected_exercise_ids:
            placeholders = ", ".join(["%s"] * len(selected_exercise_ids))
            query += f" AND E.exercise_id IN ({placeholders})"
            params.extend(selected_exercise_ids)

        topsets = self.execute(query, params)

        weekly_counts = self.get_workout_counts(topsets, 'week')
        weekly_pr_counts = self.count_prs_over_time(topsets, 'week')

        return [
            self.get_weekly_pr_graph_model('Workouts per week', weekly_counts),
            self.get_weekly_pr_graph_model('PRs per week', weekly_pr_counts),
        ]

    def get_weekly_pr_graph_model(self, title, weekly_pr_data):
        """Turn per-person period counts into a plot model.

        Args:
            title: Title stored in the returned model.
            weekly_pr_data: Mapping of person id to
                ``{"PersonName": str, "PRCounts": {timestamp: count, ...}}``,
                as produced by :meth:`get_workout_counts` and
                :meth:`count_prs_over_time`.

        Returns:
            A dict with ``title``, ``vb_width``, ``vb_height`` and ``plots``.
            Each plot has ``label``, ``color``, ``points`` (a list of
            ``(scaled_value, relative_position)`` pairs) and ``plot_labels``
            (a list of ``(scaled_value, relative_position, message)``
            triples). ``plots`` is empty when there is no dated data.
        """
        vb_width = 200
        vb_height = 75

        # Overall date range across all users.
        all_dates = [
            date
            for user_data in weekly_pr_data.values()
            for date in user_data["PRCounts"]
        ]
        if not all_dates:
            # Nothing to plot; min()/max() below would raise on empty input.
            return {
                'title': title,
                'vb_width': vb_width,
                'vb_height': vb_height,
                'plots': [],
            }

        min_date, max_date = min(all_dates), max(all_dates)
        # "or 1" avoids dividing by zero when every point is on the same date.
        total_span = (max_date - min_date).days or 1

        # "or 1" avoids dividing by zero when every count is zero.
        max_value = max(
            value
            for user_data in weekly_pr_data.values()
            for value in user_data["PRCounts"].values()
        ) or 1
        min_value = 0
        value_range = max_value - min_value

        colors = get_distinct_colors(len(weekly_pr_data))
        plots = []
        for count, user_data in enumerate(weekly_pr_data.values()):
            pr_counts = user_data["PRCounts"]
            person_name = user_data["PersonName"]

            dates = list(pr_counts.keys())
            values = list(pr_counts.values())

            values_scaled = [
                ((value - min_value) / value_range) * vb_height
                for value in values
            ]
            # Positions come from this user's own dates so that every value is
            # paired with the date it belongs to.
            relative_positions = [
                (date - min_date).days / total_span for date in dates
            ]
            messages = [
                f'{value} for {person_name} at {date.strftime("%d %b %y")}'
                for value, date in zip(values, dates)
            ]

            plots.append({
                'label': person_name,  # Use PersonName instead of User ID
                'color': colors[count],
                'points': list(zip(values_scaled, relative_positions)),
                # A list (not a bare zip iterator) so it can be iterated more
                # than once by the consumer.
                'plot_labels': list(zip(values_scaled, relative_positions, messages)),
            })

        return {
            'title': title,
            'vb_width': vb_width,
            'vb_height': vb_height,
            'plots': plots,
        }

    @staticmethod
    def _frame_with_period(workouts, period):
        """Load rows into a DataFrame and add a 'Period' column from 'StartDate'.

        Any ``period`` other than ``'week'`` is bucketed by month.
        """
        df = pd.DataFrame(workouts)
        if df.empty:
            return df
        df['StartDate'] = pd.to_datetime(df['StartDate'])
        df['Period'] = df['StartDate'].dt.to_period('W' if period == 'week' else 'M')
        return df

    def get_workout_counts(self, workouts, period='week'):
        """Count distinct workouts per person per period.

        Args:
            workouts: Rows with at least ``PersonId``, ``PersonName``,
                ``WorkoutId`` and ``StartDate``.
            period: ``'week'`` for weekly buckets; anything else is monthly.

        Returns:
            ``{person_id: {'PersonName': name, 'PRCounts': {period_start: count}}}``.
            The counts live under ``'PRCounts'`` so the result can be fed to
            :meth:`get_weekly_pr_graph_model`. Returns ``{}`` when there are
            no rows.
        """
        df = self._frame_with_period(workouts, period)
        if df.empty:
            return {}

        # Group by PersonId, Period and count unique workouts.
        workout_counts = (
            df.groupby(['PersonId', 'Period'])['WorkoutId'].nunique().reset_index()
        )

        # Represent each period by the timestamp at which it starts.
        workout_counts['Period'] = workout_counts['Period'].apply(lambda x: x.start_time)

        # One row per person, one column per period.
        workout_counts_pivot = workout_counts.pivot(
            index='PersonId', columns='Period', values='WorkoutId'
        ).fillna(0)

        # Left-join onto the names so people without any counted workout are
        # still present (with zero counts).
        names = df[['PersonId', 'PersonName']].drop_duplicates().set_index('PersonId')
        workout_counts_final = names.join(workout_counts_pivot, how='left').fillna(0)

        formatted_result = {}
        for record in workout_counts_final.reset_index().to_dict('records'):
            person_id = record.pop('PersonId')
            person_name = record.pop('PersonName')
            # Whatever remains in the record is the period -> count mapping.
            formatted_result[person_id] = {
                'PersonName': person_name,
                'PRCounts': dict(record),
            }

        return formatted_result

    def count_prs_over_time(self, workouts, period='week'):
        """Count personal records (PRs) per person per period.

        A (person, exercise, period) entry is a PR when its best
        ``Estimated1RM`` is strictly greater than the best value of all
        earlier periods for the same person and exercise. The first period
        for a person/exercise has nothing to compare against and is not
        counted as a PR.

        Args:
            workouts: Rows with at least ``PersonId``, ``PersonName``,
                ``ExerciseId``, ``StartDate`` and ``Estimated1RM``.
            period: ``'week'`` for weekly buckets; anything else is monthly.

        Returns:
            ``{person_id: {'PersonName': name, 'PRCounts': {period_start: count}}}``,
            or ``{}`` when there are no rows.
        """
        df = self._frame_with_period(workouts, period)
        if df.empty:
            return {}

        group_keys = ['PersonId', 'ExerciseId']

        # Best Estimated1RM per person/exercise/period. groupby sorts by its
        # keys, so rows within each person/exercise are in period order.
        period_max = (
            df.groupby(group_keys + ['Period'])['Estimated1RM'].max().reset_index()
        )

        # Running best within each person/exercise, then shifted by one period
        # *within the same group*. Shifting the whole column instead would
        # compare the first period of a group against the previous group's
        # best (a different exercise or person).
        period_max['RunningMax'] = period_max.groupby(group_keys)['Estimated1RM'].cummax()
        period_max['AllTimeMax'] = period_max.groupby(group_keys)['RunningMax'].shift(1)

        # Comparisons against NaN are False, so first-ever entries are not PRs.
        period_max['IsPR'] = period_max['Estimated1RM'] > period_max['AllTimeMax']

        # Count PRs in each period for each person.
        pr_counts = period_max.groupby(['PersonId', 'Period'])['IsPR'].sum().reset_index()

        # Represent each period by the timestamp at which it starts.
        pr_counts['Period'] = pr_counts['Period'].apply(lambda x: x.start_time)

        # One row per person, one column per period, integer counts.
        output = pr_counts.pivot(index='PersonId', columns='Period', values='IsPR').fillna(0)
        output = output.astype(int)

        # Left-join onto the names so people without any PR rows are still
        # present (with zero counts).
        names = df[['PersonId', 'PersonName']].drop_duplicates().set_index('PersonId')
        output = names.join(output, how='left').fillna(0)
        output.reset_index(inplace=True)

        result = {}
        for _, row in output.iterrows():
            result[row['PersonId']] = {
                "PersonName": row['PersonName'],
                "PRCounts": row.drop(['PersonId', 'PersonName']).to_dict(),
            }

        return result