from collections import defaultdict from flask_basicauth import BasicAuth import matplotlib.dates as mdates import matplotlib.pyplot as plt from dateutil.relativedelta import relativedelta import humanize from dateutil.parser import isoparse import sparklines import os from datetime import datetime, timedelta import io from flask_sqlalchemy import SQLAlchemy from flask import Flask, make_response, render_template, request, jsonify import jinja_partials from flask_htmx import HTMX import matplotlib matplotlib.use('Agg') app = Flask(__name__) # TODO CHANGE SECRET KEY TO ENVIRONMENT VARIABLE app.config['SECRET_KEY'] = 'secret!' app.config["SQLALCHEMY_ECHO"] = False # True for debugging uri = os.getenv("DATABASE_URL") if uri and uri.startswith("postgres://"): uri = uri.replace("postgres://", "postgresql://", 1) app.config['SQLALCHEMY_DATABASE_URI'] = uri app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False jinja_partials.register_extensions(app) htmx = HTMX(app) db = SQLAlchemy(app) app.config['BASIC_AUTH_USERNAME'] = os.getenv("ADMIN_USERNAME") or 'admin' app.config['BASIC_AUTH_PASSWORD'] = os.getenv("ADMIN_PASSWORD") or 'admin' basic_auth = BasicAuth(app) class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), nullable=False) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) workouts = db.relationship( 'Workout', backref='user', lazy=True, order_by='desc(Workout.created_at)') bike = db.relationship('Bike', backref='user', lazy=True) class Bike(db.Model): __tablename__ = 'bikes' id = db.Column(db.Integer, primary_key=True) display_name = db.Column(db.String(255), nullable=False) code_name = db.Column(db.String(255), nullable=False) class Workout(db.Model): __tablename__ = 'workouts' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey( 'users.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False, default=db.func.now()) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) started_at = db.Column(db.DateTime, nullable=False) duration = db.Column(db.Numeric, nullable=False) average_rpm = db.Column(db.Numeric, nullable=False) min_rpm = db.Column(db.Integer, nullable=False) max_rpm = db.Column(db.Integer, nullable=False) calories = db.Column(db.Numeric, nullable=False) distance = db.Column(db.Numeric, nullable=False) average_bpm = db.Column(db.Numeric, nullable=False) min_bpm = db.Column(db.Integer, nullable=False) max_bpm = db.Column(db.Integer, nullable=False) is_heart_rate_available = db.Column( db.Boolean, nullable=False, default=False) is_cadence_available = db.Column(db.Boolean, nullable=False, default=False) cadence_readings = db.relationship( 'CadenceReading', backref='workout', lazy=True) heart_rate_readings = db.relationship( 'HeartRateReading', backref='workout', lazy=True) bike = db.relationship('Bike', backref='workouts', lazy=True) class CadenceReading(db.Model): __tablename__ = 'cadence_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) rpm = db.Column(db.Integer, nullable=False) distance = db.Column(db.Numeric, nullable=False) speed = db.Column(db.Numeric, nullable=False) calories = db.Column(db.Numeric, nullable=False) power = db.Column(db.Integer, nullable=False) class HeartRateReading(db.Model): __tablename__ = 'heartrate_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) bpm = db.Column(db.Integer, nullable=False) class UserGraphs(db.Model): __tablename__ = 'user_graphs' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey( 'users.id', ondelete='CASCADE'), nullable=False) start_date = db.Column(db.DateTime, nullable=False) end_date = db.Column(db.DateTime, nullable=False) period = db.Column(db.String(255), nullable=False) attributes = db.Column(db.ARRAY(db.String(255))) @app.route('/', methods=['GET']) def overview(): return render_users_and_workouts() @app.route("/user//new_workout") def new_workout(user_id): user = User.query.get(user_id) return render_template('new_workout.html', user=user) @app.route('/users', methods=['POST']) def create_user(): data = request.form name = data.get('name') bike_id = data.get('bike_id') # Ensure name and bike_id are provided if not name or not bike_id: return jsonify({'message': 'Name and Bike ID are required'}), 400 # Create and commit the new user to the database new_user = User(name=name, bike_id=bike_id) db.session.add(new_user) db.session.commit() return render_users_and_workouts() @app.route('/user/', methods=['DELETE']) @basic_auth.required def delete_user(user_id): user = User.query.get(user_id) if user: db.session.delete(user) db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User not found.'}), 404 @app.route('/user//workouts', methods=['GET']) def get_workouts(user_id): user = User.query.get(user_id) workouts_data = get_workouts_for_user_view_data(user) return render_template('workouts_list.html', workouts=workouts_data) @app.route('/user//workouts', methods=['POST']) def create_workout(user_id): user = User.query.get(user_id) app.logger.info(f'Creating workout for user {user.name} ({user.id})') data = request.json cadence_readings = data.get('cadence_readings', []) heart_rate_readings = data.get('heart_rate_readings', []) # Create a new workout workout = Workout(user_id=user_id, bike_id=user.bike_id) db.session.add(workout) db.session.flush() # To get the workout.id before committing # Add cadence readings to the workout cadences = [{ 'workout_id': workout.id, 'created_at': c['timestamp'], 'rpm': c['rpm'], 'distance': c['distance'], 'speed': c['speed'], 'calories': c['calories'], 'power': c['power'] } for c in cadence_readings] heart_rates = [{ 'workout_id': workout.id, 'created_at': h['timestamp'], 'bpm': h['bpm'] } for h in heart_rate_readings] db.session.bulk_insert_mappings(CadenceReading, cadences) db.session.bulk_insert_mappings(HeartRateReading, heart_rates) if cadence_readings: timestamps = [isoparse(c['timestamp']) for c in cadence_readings] rpms = [c['rpm'] for c in cadence_readings] workout.is_cadence_available = True workout.started_at = min(timestamps) workout.duration = ( max(timestamps) - workout.started_at).total_seconds() workout.average_rpm = sum(rpms) / len(rpms) workout.min_rpm = min(rpms) workout.max_rpm = max(rpms) workout.calories = cadence_readings[-1]['calories'] workout.distance = cadence_readings[-1]['distance'] if heart_rate_readings: bpms = [h['bpm'] for h in heart_rate_readings] workout.is_heart_rate_available = True workout.average_bpm = sum(bpms) / len(bpms) workout.min_bpm = min(bpms) workout.max_bpm = max(bpms) db.session.commit() return f'Added {humanize.naturaldelta(workout.duration)} session.', 201 @app.route('/user//workouts/graph', methods=['GET']) def graph_user_workouts(user_id): attributes = request.args.getlist( 'attributes', type=str) if not attributes: return "" if htmx: return f""" No image """ user = User.query.get(user_id) # workouts = user.workouts workouts = get_workouts_for_user_view_data(user) earliest_workout_date = min([workout['start_time_date'] for workout in workouts]) most_recent_workout_date = max( [workout['start_time_date'] for workout in workouts]) start_date = request.args.get( 'start_date', default=earliest_workout_date, type=toDate) end_date = request.args.get( 'end_date', default=most_recent_workout_date, type=toDate) period = request.args.get('period', default='day', type=str) # Add record of user graph insert_usergraph_if_not_exists( user_id, start_date, end_date, period, attributes) return plot_averaged_attributes(workouts, user, start_date, end_date, period, attributes) @app.route('/user//workouts/graph/delete', methods=['DELETE']) def delete_user_graph(user_id): attributes = request.args.getlist( 'attributes', type=str) start_date = request.args.get( 'start_date', default=datetime.now().date(), type=toDate) end_date = request.args.get( 'end_date', default=datetime.now().date(), type=toDate) period = request.args.get('period', default='day', type=str) remove_usergraph(user_id, start_date, end_date, period, attributes) return "" def daterange(start_date, end_date, delta=timedelta(days=1)): """Helper generator to iterate over date ranges.""" curr_date = start_date while curr_date < end_date: yield curr_date curr_date += delta def average_workout_attributes_per_period(workouts, start_date, end_date, period, attributes): """ Returns a dictionary of averaged attributes for workouts within each given period between start_date and end_date. Parameters: - workouts (list): List of Workout objects. - start_date, end_date: Date range to consider. - period (str): 'day', 'week', or 'month'. - attributes (list): List of attribute names to average. Returns: - Dictionary: A nested dictionary where keys are dates representing the start of each period, and the values are dictionaries with averaged attributes for that period. """ if period == "day": delta = timedelta(days=1) elif period == "week": delta = timedelta(weeks=1) elif period == "month": # approximating month as 4 weeks for simplicity delta = timedelta(weeks=4) else: raise ValueError(f"Invalid period: {period}") results = defaultdict(lambda: defaultdict(float)) for start_period in daterange(start_date, end_date, delta): end_period = start_period + delta filtered_workouts = [ w for w in workouts if start_period <= w['start_time_date'] < end_period] for attribute in attributes: valid_values = [w.get(attribute) for w in filtered_workouts if w.get( attribute) is not None] if valid_values: average = sum(valid_values) / len(valid_values) results[start_period][attribute] = average else: results[start_period][attribute] = 0 # None results[start_period]['workout_count'] = len( filtered_workouts) return dict(results) def create_user_graph(x_values, y_data, filename, x_label='Time', title=None): """Create a graph for given x-values and y-values. Parameters: - x_values: A list of x-values (common for all graphs). - y_data: A dictionary where key is y_label and value is list of y-values. - filename: Name for the generated file. - x_label: Label for x-axis. - title: Title for the plot. Returns: - Flask Response object containing the image of the graph. """ fig, ax = plt.subplots() # Set the title if provided if title: ax.set_title(title) # Plotting multiple lines for y_label, y_values in y_data.items(): ax.plot(x_values, y_values, label=y_label) ax.set_xlabel(x_label) ax.legend() # Show legend to differentiate between multiple attributes ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m")) ax.set_ylim(bottom=0) # Save the graph to a bytes buffer buffer = io.BytesIO() plt.savefig(buffer, format='png', transparent=True, bbox_inches='tight') buffer.seek(0) # Create a response object with the graph image response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = f'attachment; filename={filename}.png' return response def plot_averaged_attributes(workouts_list, user, start_date, end_date, period, attributes): """Creates a graph for averaged attributes over a period. Parameters: - start_date, end_date: Date range to consider. - period (str): 'day', 'week', or 'month'. - attributes (list): A list of attribute names to plot. Returns: - Flask Response object containing the image of the graph. """ user_data = generate_user_data(user) title = f'{format_key_values(user_data["attributes"], attributes)} over {get_value_from_key(user_data["periods"], period)} ({start_date.strftime("%d/%m/%y")} - {end_date.strftime("%d/%m/%y")})' # Fetching the data averaged_attributes = average_workout_attributes_per_period( workouts_list, start_date, end_date, period, attributes) # Extracting x_values and y_values x_values = list(averaged_attributes.keys()) y_data = {} for attribute in attributes: y_data[get_value_from_key(user_data["attributes"], attribute)] = [averaged_attributes[date][attribute] for date in x_values] # Creating the graph return create_user_graph(x_values, y_data, filename=f"average_attributes_over_{period}", title=title) @ app.route('/user//workout//', methods=['GET']) def workout(user_id, workout_id, graph_type): workout = Workout.query.filter_by(user_id=user_id, id=workout_id) \ .join(Workout.cadence_readings) \ .join(Workout.heart_rate_readings) \ .first() if not workout: return jsonify({'message': f'Workout {workout_id} not found for user {user_id}.'}), 404 graph_mappings = { 'cadence': ('cadence_readings', 'rpm', 'Cadence (RPM)', 'cadence'), 'speed': ('cadence_readings', 'speed', 'Speed (KPH)', 'speed'), 'distance': ('cadence_readings', 'distance', 'Distance (KM)', 'distance'), 'calories': ('cadence_readings', 'calories', 'Calories (KCAL)', 'calories'), 'power': ('cadence_readings', 'power', 'Power (WATTS)', 'power'), 'heart_rate': ('heart_rate_readings', 'bpm', 'Heart Rate (BPM)', 'heart_rate') } readings_attr, y_attr, y_label, filename = graph_mappings.get( graph_type, (None, None, None, None)) readings = getattr(workout, readings_attr, []) if readings: x_values = [reading.created_at for reading in readings] y_values = [getattr(reading, y_attr) for reading in readings] return create_graph(x_values=x_values, y_values=y_values, y_label=y_label, filename=filename), 200 return jsonify({'message': f'Unable to generate {graph_type} for workout {workout_id}.'}), 409 @ app.route('/user//workout//view', methods=['GET']) def view_workout(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() graph_types = request.args.getlist('graph_types') return render_template('workout_view.html', workout=workout, graph_types=graph_types) @ app.route('/user//workout//delete', methods=['DELETE']) @ basic_auth.required def delete_workout(user_id, workout_id): # Delete the workout and its associated cadence readings CadenceReading.query.filter_by(workout_id=workout_id).delete() HeartRateReading.query.filter_by(workout_id=workout_id).delete() Workout.query.filter_by(user_id=user_id, id=workout_id).delete() db.session.commit() return render_users_and_workouts() @ app.route('/user//bike', methods=['GET']) def update_users_bike(user_id): bike_id = request.args.get('bike_id') user = User.query.get(user_id) bike = Bike.query.get(bike_id) if user and bike: user.bike_id = bike_id db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User or bike not found.'}), 404 @ app.route('/user//calendar', methods=['GET']) def calendar_view(user_id): user = User.query.get(user_id) workouts = get_workouts_for_user_view_data(user) date = request.args.get( 'date', default=datetime.now().date(), type=toDate) calendar_month = generate_calendar_monthly_view(workouts, date) return render_template('partials/calendar.html', calendar_month=calendar_month, user_id=user_id) @ app.route("/user//workout/list", methods=['GET']) def workout_list(user_id): user = User.query.get(user_id) workouts = get_workouts_for_user_view_data(user) return render_template('partials/workouts_list_fragment.html', workouts=workouts, user_id=user_id, workouts_all_loaded=True) @ app.route("/user//workout//calendar_view", methods=['GET']) def calendar_workout_view(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() user = User.query.get(user_id) workout_view_data = get_workout_view_data(workout, user) return render_template('partials/selected_workout_view.html', workout=workout_view_data, user_id=user_id) # Utilities def generate_user_data(user, workouts=[]): """ Generate data for a single user. Parameters: - user: The user object. - workouts: List of workouts for the user. Returns: - dict: A dictionary containing user data and related information. """ return { 'id': user.id, 'name': user.name, 'bike_id': user.bike_id, 'workouts_count': len(workouts), 'workouts': workouts, 'daily_duration_sparkline': generate_daily_duration_sparkline(workouts), 'calendar_month': generate_calendar_monthly_view(workouts, datetime.now().date()), 'attributes': [('workout_count', 'Workout count'), ('duration_seconds', 'Duration (sec)'), ('duration_minutes', 'Duration (min)'), ('average_rpm', 'Average RPM'), ('max_rpm', 'Max RPM'), ('average_bpm', 'Average BPM'), ('max_bpm', 'Max BPM'), ('distance', 'Distance'), ('calories', 'Calories')], 'periods': [('day', 'Day'), ('week', 'Week'), ('month', 'Month')], # (period: str, attributes: [str]) # 'graphs': [('month', ['duration_minutes']), ('week', ['average_rpm', 'average_bpm']), ('week', ['workout_count'])], 'graphs': get_user_graphs(user.id), 'first_workout_date': workouts[-1]['start_time_date'] if workouts else None, 'last_workout_date': workouts[0]['start_time_date'] if workouts else None, } def render_users_and_workouts(): """ Render users and their associated workouts. """ users = User.query.all() users_data = [generate_user_data(user, get_workouts_for_user_view_data( user)) for user in users] template_name = 'users_and_workouts_list.html' if htmx else 'overview.html' return render_template(template_name, users=users_data, bikes=Bike.query.all()) def get_workouts_for_user_view_data(user): """ Retrieve view data for all workouts of a user. Parameters: - user: The user object. Returns: - list: A list of view data for valid workouts. """ return [get_workout_view_data(workout, user) for workout in user.workouts if get_workout_view_data(workout, user)] def format_workout_data(workout, user): """ Formats the workout data for view. Parameters: - workout: The workout object. - user: The user associated with the workout. Returns: - dict: A dictionary with formatted workout data. """ duration = timedelta(seconds=int(workout.duration or 0)) return { 'id': workout.id, 'user_id': user.id, 'user_name': user.name, 'start_time': format_date_with_ordinal(workout.started_at, '%#H:%M %B %dth %Y'), 'start_time_date': workout.started_at.date(), 'start_time_ago': humanize.naturaltime(workout.started_at), 'duration': humanize.naturaldelta(duration), 'duration_seconds': duration.total_seconds(), 'duration_minutes': duration.total_seconds() / 60, 'average_rpm': int(workout.average_rpm or 0), 'min_rpm': int(workout.min_rpm or 0), 'max_rpm': int(workout.max_rpm or 0), 'calories': int(workout.calories or 0), 'distance': int(workout.distance or 0), 'bike_display_name': workout.bike.display_name, 'selected_graph_types': ['speed'] + (['heart_rate'] if workout.is_heart_rate_available else []), 'is_heart_rate_available': workout.is_heart_rate_available, 'is_cadence_available': workout.is_cadence_available, 'average_bpm': int(workout.average_bpm or 0), 'min_bpm': int(workout.min_bpm or 0), 'max_bpm': int(workout.max_bpm or 0), } def get_workout_view_data(workout, user): """ Retrieve view data for a single workout. Parameters: - workout: The workout object. - user: The user associated with the workout. Returns: - dict or None: A dictionary containing view data if cadence or heart rate is available, otherwise None. """ if workout.is_cadence_available or workout.is_heart_rate_available: return format_workout_data(workout, user) return None def create_graph(x_values, y_values, y_label, filename, x_label='Time'): # Plotting fig, ax = plt.subplots() ax.plot(x_values, y_values) ax.set_xlabel(x_label) ax.set_ylabel(y_label) ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) ax.set_ylim(bottom=0) # Save the graph to a bytes buffer buffer = io.BytesIO() plt.savefig(buffer, format='png', transparent=True, bbox_inches='tight') buffer.seek(0) # Create a response object with the graph image response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = 'attachment; filename={filename}.png' return response def format_date_with_ordinal(d, format_string): ordinal = {'1': 'st', '2': 'nd', '3': 'rd'}.get(str(d.day)[-1:], 'th') return d.strftime(format_string).replace('{th}', ordinal) def date_range(start_date, end_date): """ Generator for dates between two dates (inclusive). """ current_date = start_date while current_date <= end_date: yield current_date current_date += timedelta(days=1) def get_month_bounds(dt): """ Determine the bounds of a month for a given date. This considers the starting and ending weekdays to fit a calendar view. """ first_day_of_month = dt.replace(day=1) next_month = first_day_of_month + relativedelta(months=1) last_day_of_month = next_month - timedelta(days=1) # Define weekday mappings to determine start and end bounds weekday_to_start_offset = {6: 0, 0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6} weekday_to_end_offset = {6: 6, 0: 5, 1: 4, 2: 3, 3: 2, 4: 1, 5: 0} start_date = first_day_of_month - \ timedelta(days=weekday_to_start_offset[first_day_of_month.weekday()]) end_date = last_day_of_month + \ timedelta(days=weekday_to_end_offset[last_day_of_month.weekday()]) return start_date, end_date def generate_calendar_monthly_view(workouts, selected_date): """ Generate a monthly calendar view of the workouts. """ start_date, end_date = get_month_bounds(selected_date) # Build a lookup dictionary for faster access workout_lookup = {w['start_time_date'] : w for w in workouts if start_date <= w['start_time_date'] <= end_date} current_date = datetime.now().date() days_of_month = [ { 'date': day, 'day_of_month': day.day, 'is_workout': day in workout_lookup, 'workout': workout_lookup.get(day), 'is_current_date': day == current_date, 'is_current_month': day.month == selected_date.month and day.year == selected_date.year } for day in date_range(start_date, end_date) ] next_month_date = selected_date + relativedelta(months=1) previous_month_date = selected_date - relativedelta(months=1) calendar_month = { 'month_year': selected_date.strftime('%B, %Y'), 'days_of_month': days_of_month, 'next_month': next_month_date, 'previous_month': previous_month_date, } return calendar_month def generate_daily_duration_sparkline(workouts): """ Generate a sparkline string representation of daily workout durations. Parameters: - workouts (list of dict): Each dict should contain 'start_time_date' and 'duration_minutes' keys. Returns: - str: Sparkline representation of daily durations. """ if not workouts: return '' # Determine date range based on workouts data start_date = workouts[-1]['start_time_date'] end_date = datetime.now().date() # workouts[0]['start_time_date'] # Build a mapping of dates to their respective durations for easier lookup workouts_by_date = {w['start_time_date']: int( w['duration_minutes']) for w in workouts} daily_durations = [workouts_by_date.get( date, 0) for date in date_range(start_date, end_date)] # Reverse the list to make the most recent day appear on the right daily_durations.reverse() return sparklines.sparklines(daily_durations)[0] def get_user_graphs(user_id): """Retrieve a list of UserGraphs entries for the given user_id.""" user_graphs = UserGraphs.query.filter_by( user_id=user_id).order_by(UserGraphs.id.desc()).all() # change start_date, end_date from datetime to dates for user_graph in user_graphs: user_graph.start_date = user_graph.start_date.date() user_graph.end_date = user_graph.end_date.date() return user_graphs def insert_usergraph_if_not_exists(user_id, start_date, end_date, period, attributes): """Insert a UserGraphs entry if it doesn't already exist based on specified attributes and return its ID.""" existing_graph = UserGraphs.query.filter_by( user_id=user_id, start_date=start_date, end_date=end_date, period=period, attributes=attributes ).first() if not existing_graph: new_graph = UserGraphs( user_id=user_id, start_date=start_date, end_date=end_date, period=period, attributes=attributes ) db.session.add(new_graph) db.session.commit() return new_graph.id # Return the ID of the newly added object return None # Return None if the record already exists def remove_usergraph(user_id, start_date, end_date, period, attributes): """Remove a UserGraphs entry based on specified attributes.""" existing_graph = UserGraphs.query.filter_by( user_id=user_id, start_date=start_date, end_date=end_date, period=period, attributes=attributes ).first() if existing_graph: db.session.delete(existing_graph) db.session.commit() def toDate(dateString): return datetime.strptime(dateString, "%Y-%m-%d").date() def get_value_from_key(tuples_list, key): for k, v in tuples_list: if k == key: return v return None def format_key_values(tuples_list, keys_list): values = [get_value_from_key(tuples_list, key) for key in keys_list] if len(values) == 1: return values[0] elif len(values) == 2: return f"{values[0]} & {values[1]}" else: return ', '.join(values[:-1]) + f" & {values[-1]}" if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)