from datetime import date, datetime, timedelta from dateutil.relativedelta import relativedelta from collections import defaultdict from flask import Blueprint, render_template, redirect, url_for, request, current_app from flask_htmx import HTMX from jinja2_fragments import render_block from utils import convert_str_to_date calendar_bp = Blueprint('calendar', __name__) htmx = HTMX() # --- Helper Functions --- def _get_date_range_and_links(date_obj, view): """Calculates start/end dates and prev/next links based on view.""" start_date, end_date, prev_date, next_date = None, None, None, None if view == 'month': first_day_of_month = date_obj.replace(day=1) # Day of week: Monday is 0 and Sunday is 6 # Calculate the first day to display on the calendar grid (previous Sunday) start_date = first_day_of_month - timedelta(days=first_day_of_month.weekday() + 1) # Calculate the last day (start + 6 weeks - 1 day) end_date = start_date + timedelta(days=41) # 6*7 - 1 = 41 days total in 6 weeks prev_date = first_day_of_month - relativedelta(months=1) next_date = first_day_of_month + relativedelta(months=1) elif view == 'year': start_date = date_obj.replace(month=1, day=1) end_date = date_obj.replace(year=date_obj.year + 1, month=1, day=1) - timedelta(days=1) prev_date = date_obj - relativedelta(years=1) next_date = date_obj + relativedelta(years=1) else: raise ValueError('Invalid view type specified.') return start_date, end_date, prev_date, next_date def _fetch_workout_data(db_executor, person_id, start_date, end_date, include_details=True): """Fetches workout data for a person within a date range.""" if include_details: query = """ WITH workout_stats AS ( SELECT w.workout_id, w.start_date, t.topset_id, t.repetitions, t.weight, e.name AS exercise_name, p.name AS person_name, -- Max weight ever for this exercise before this set MAX(t.weight) OVER ( PARTITION BY p.person_id, e.exercise_id ORDER BY w.start_date, t.topset_id ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING ) as prev_max_weight, -- Weight from the last time this exercise was performed LAG(t.weight) OVER ( PARTITION BY p.person_id, e.exercise_id ORDER BY w.start_date, t.topset_id ) as prev_session_weight, -- Reps from the last time this exercise was performed LAG(t.repetitions) OVER ( PARTITION BY p.person_id, e.exercise_id ORDER BY w.start_date, t.topset_id ) as prev_session_reps FROM person p LEFT JOIN workout w ON p.person_id = w.person_id LEFT JOIN topset t ON w.workout_id = t.workout_id LEFT JOIN exercise e ON t.exercise_id = e.exercise_id WHERE p.person_id = %s ) SELECT * FROM workout_stats WHERE start_date BETWEEN %s AND %s ORDER BY start_date, topset_id; """ return db_executor(query, [person_id, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')]) else: query = """ SELECT w.workout_id, w.start_date, p.name AS person_name FROM person p LEFT JOIN workout w ON p.person_id = w.person_id AND w.start_date BETWEEN %s AND %s WHERE p.person_id = %s ORDER BY w.start_date; """ return db_executor(query, [start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'), person_id]) def _group_workouts_by_date(workouts_data): """Groups workout data by date and workout ID.""" # Structure: { date_object: { workout_id: { workout_details..., 'sets': [...] } } } workouts_by_date = defaultdict(lambda: defaultdict(lambda: {'sets': []})) person_name = 'Unknown' if workouts_data: person_name = workouts_data[0].get('person_name', 'Unknown') for row in workouts_data: # Basic validation for row data if not row or row.get('workout_id') is None or row.get('start_date') is None: continue workout_date = row['start_date'] workout_id = row['workout_id'] # Initialize workout details if this workout_id hasn't been seen for this date if workout_id not in workouts_by_date[workout_date]: workouts_by_date[workout_date][workout_id].update({ 'workout_id': workout_id, 'start_date': workout_date, }) # Add set details if topset_id exists if row.get('topset_id'): weight = row.get('weight') or 0 reps = row.get('repetitions') or 0 prev_max = row.get('prev_max_weight') or 0 prev_weight = row.get('prev_session_weight') or 0 prev_reps = row.get('prev_session_reps') or 0 is_pr = weight > prev_max and prev_max > 0 is_improvement = (weight > prev_weight) or (weight == prev_weight and reps > prev_reps) if prev_weight > 0 else False workouts_by_date[workout_date][workout_id]['sets'].append({ 'repetitions': reps, 'weight': weight, 'exercise_name': row.get('exercise_name'), 'is_pr': is_pr, 'is_improvement': is_improvement }) # Convert nested defaultdict to regular dict processed_workouts = { d: dict(w) for d, w in workouts_by_date.items() } return processed_workouts, person_name def _process_workouts_for_month_view(grouped_workouts, start_date, end_date, selected_date): """Formats grouped workout data for the monthly calendar view.""" days_data = [] today = datetime.today().date() current_date = start_date while current_date <= end_date: day_workouts_dict = grouped_workouts.get(current_date, {}) day_workouts_list = list(day_workouts_dict.values()) # Convert workout dicts to list total_sets = 0 has_pr = False has_improvement = False pr_count = 0 improvement_count = 0 unique_exercise_names = [] for workout in day_workouts_list: total_sets += len(workout.get('sets', [])) for s in workout.get('sets', []): if s.get('is_pr'): has_pr = True pr_count += 1 if s.get('is_improvement'): has_improvement = True improvement_count += 1 name = s.get('exercise_name') if name and name not in unique_exercise_names: unique_exercise_names.append(name) days_data.append({ 'date_obj': current_date, # Pass the date object for easier template logic 'day': current_date.day, 'is_today': current_date == today, # Correct comparison: date object == date object 'is_in_current_month': current_date.month == selected_date.month, 'has_workouts': len(day_workouts_list) > 0, 'workout_count': len(day_workouts_list), 'total_sets': total_sets, 'has_pr': has_pr, 'has_improvement': has_improvement, 'pr_count': pr_count, 'improvement_count': improvement_count, 'exercise_names': unique_exercise_names[:3], # Limit to first 3 for summary 'workouts': day_workouts_list }) current_date += timedelta(days=1) return days_data def _process_workouts_for_year_view(grouped_workouts, year_date): """Formats grouped workout data for the yearly calendar view.""" months_data = [] today = datetime.today().date() for month_num in range(1, 13): first_day_of_month = year_date.replace(month=month_num, day=1) # Calculate the first day to display on the calendar month grid (previous Sunday) start_date_month = first_day_of_month - timedelta(days=first_day_of_month.weekday() + 1) # Calculate the last day of the grid (start + 6 weeks - 1 day) end_date_month = start_date_month + timedelta(days=41) month_days_data = [] current_day = start_date_month while current_day <= end_date_month: day_workouts_dict = grouped_workouts.get(current_day, {}) day_workouts_list = list(day_workouts_dict.values()) has_workouts = len(day_workouts_list) > 0 # Get first workout ID if workouts exist first_workout_id = day_workouts_list[0]['workout_id'] if has_workouts else None month_days_data.append({ 'date_obj': current_day, 'day': current_day.day, 'is_today': current_day == today, # Correct comparison: date object == date object 'is_in_current_month': current_day.month == month_num, 'has_workouts': has_workouts, 'workouts': day_workouts_list, # Pass full workout details 'first_workout_id': first_workout_id # Keep if template uses this }) current_day += timedelta(days=1) months_data.append({ 'name': first_day_of_month.strftime('%B'), 'first_day_of_month': first_day_of_month, # Pass the actual date object 'days': month_days_data }) return months_data # --- Route --- from extensions import db # Import db locally within the route's scope @calendar_bp.route("/person//calendar") def get_calendar(person_id): """Displays the workout calendar for a given person.""" selected_date_str = request.args.get('date') # Get as string first # Use today's date if 'date' param is missing or invalid selected_date = convert_str_to_date(selected_date_str, '%Y-%m-%d') or date.today() selected_view = request.args.get('view', 'month') # Default to month view # Redirect for non-calendar views if selected_view == 'overview': return redirect(url_for('person_overview', person_id=person_id)) if selected_view == 'notes': return redirect(url_for('notes.get_person_notes', person_id=person_id)) try: start_date, end_date, prev_date, next_date = _get_date_range_and_links(selected_date, selected_view) except ValueError as e: # Handle invalid view type gracefully (e.g., flash message and redirect, or error page) # For now, returning a simple error response return f"Error: Invalid view type '{selected_view}'.", 400 # Fetch and process data (only fetch details if in month view) include_details = (selected_view == 'month') raw_workouts = _fetch_workout_data(db.execute, person_id, start_date, end_date, include_details=include_details) grouped_workouts, person_name = _group_workouts_by_date(raw_workouts) # Prepare base context for the template calendar_view_data = { 'person_id': person_id, 'person_name': person_name, 'view': selected_view, 'date': selected_date, # Pass the actual date object for display formatting 'prev_date': prev_date.strftime('%Y-%m-%d') if prev_date else None, # Format dates for URL generation 'next_date': next_date.strftime('%Y-%m-%d') if next_date else None, } # Add view-specific data if selected_view == 'month': calendar_view_data['days'] = _process_workouts_for_month_view(grouped_workouts, start_date, end_date, selected_date) # Calculate summary stats for the selected month total_workouts = 0 total_sets = 0 unique_exercises = set() for workout_date, workouts in grouped_workouts.items(): if workout_date.month == selected_date.month and workout_date.year == selected_date.year: total_workouts += len(workouts) for workout in workouts.values(): total_sets += len(workout.get('sets', [])) for topset in workout.get('sets', []): if topset.get('exercise_name'): unique_exercises.add(topset.get('exercise_name')) calendar_view_data['summary_stats'] = { 'total_workouts': total_workouts, 'total_sets': total_sets, 'total_exercises': len(unique_exercises) } elif selected_view == 'year': calendar_view_data['months'] = _process_workouts_for_year_view(grouped_workouts, selected_date) # Format selected_date for URL generation consistently selected_date_url_str = selected_date.strftime('%Y-%m-%d') # Render response (HTMX or full page) if htmx: # Use current_app imported from Flask return render_block(current_app.jinja_env, 'calendar.html', 'content', **calendar_view_data), 200, {"HX-Push-Url": url_for('.get_calendar', person_id=person_id, view=selected_view, date=selected_date_url_str), "HX-Trigger": "refreshStats"} return render_template('calendar.html', **calendar_view_data), 200, {"HX-Push-Url": url_for('.get_calendar', person_id=person_id, view=selected_view, date=selected_date_url_str), "HX-Trigger": "refreshStats"}