"""Flask application that stores users, bikes and workouts.

Workouts are uploaded as JSON lists of cadence and heart-rate readings,
summarised into a ``Workout`` row, and rendered as HTML fragments, calendar
views and PNG graphs.
"""
import io
import os
from datetime import datetime, timedelta
from operator import attrgetter

import matplotlib

# Select the non-interactive backend before pyplot is imported: graphs are
# only ever rendered to an in-memory PNG inside request handlers.
matplotlib.use('Agg')

import humanize  # noqa: E402
import jinja_partials  # noqa: E402
import matplotlib.dates as mdates  # noqa: E402
import matplotlib.pyplot as plt  # noqa: E402
import sparklines  # noqa: E402
from dateutil.parser import isoparse  # noqa: E402
from dateutil.relativedelta import relativedelta  # noqa: E402
from flask import Flask, jsonify, make_response, render_template, request  # noqa: E402
from flask_htmx import HTMX  # noqa: E402
from flask_sqlalchemy import SQLAlchemy  # noqa: E402
from sqlalchemy import func  # noqa: E402,F401

app = Flask(__name__)

# Read the secret key from the environment; the literal is only a fallback
# so existing deployments without SECRET_KEY keep starting.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'secret!')
app.config["SQLALCHEMY_ECHO"] = False  # True for debugging

uri = os.getenv("DATABASE_URL")
# SQLAlchemy only accepts the "postgresql://" scheme.
if uri and uri.startswith("postgres://"):
    uri = uri.replace("postgres://", "postgresql://", 1)
app.config['SQLALCHEMY_DATABASE_URI'] = uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

jinja_partials.register_extensions(app)
htmx = HTMX(app)
db = SQLAlchemy(app)


class User(db.Model):
    """A person who owns workouts and is assigned one bike."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    bike_id = db.Column(db.Integer, db.ForeignKey(
        'bikes.id', ondelete='CASCADE'), nullable=False)

    # Without a delete cascade the ORM tries to set workouts.user_id to NULL
    # when a user is deleted, which the NOT NULL column rejects.
    workouts = db.relationship(
        'Workout', backref='user', lazy=True, cascade='all, delete-orphan')
    bike = db.relationship('Bike', backref='user', lazy=True)


class Bike(db.Model):
    """A bike that users and workouts refer to."""

    __tablename__ = 'bikes'

    id = db.Column(db.Integer, primary_key=True)
    display_name = db.Column(db.String(255), nullable=False)
    code_name = db.Column(db.String(255), nullable=False)


class Workout(db.Model):
    """Summary row for one workout plus its raw readings."""

    __tablename__ = 'workouts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey(
        'users.id', ondelete='CASCADE'), nullable=False)
    created_at = db.Column(db.DateTime, nullable=False, default=db.func.now())
    bike_id = db.Column(db.Integer, db.ForeignKey(
        'bikes.id', ondelete='CASCADE'), nullable=False)
    started_at = db.Column(db.DateTime, nullable=False)
    duration = db.Column(db.Numeric, nullable=False)
    average_rpm = db.Column(db.Numeric, nullable=False)
    min_rpm = db.Column(db.Integer, nullable=False)
    max_rpm = db.Column(db.Integer, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)
    average_bpm = db.Column(db.Numeric, nullable=False)
    min_bpm = db.Column(db.Integer, nullable=False)
    max_bpm = db.Column(db.Integer, nullable=False)
    is_heart_rate_available = db.Column(
        db.Boolean, nullable=False, default=False)
    is_cadence_available = db.Column(db.Boolean, nullable=False, default=False)

    # Readings are deleted together with their workout (see User.workouts).
    cadence_readings = db.relationship(
        'CadenceReading', backref='workout', lazy=True,
        cascade='all, delete-orphan')
    heart_rate_readings = db.relationship(
        'HeartRateReading', backref='workout', lazy=True,
        cascade='all, delete-orphan')
    bike = db.relationship('Bike', backref='workouts', lazy=True)


class CadenceReading(db.Model):
    """One cadence sample belonging to a workout."""

    __tablename__ = 'cadence_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(db.Integer, db.ForeignKey(
        'workouts.id', ondelete='CASCADE'), nullable=False)
    created_at = db.Column(db.DateTime, nullable=False)
    rpm = db.Column(db.Integer, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)
    speed = db.Column(db.Numeric, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    power = db.Column(db.Integer, nullable=False)


class HeartRateReading(db.Model):
    """One heart-rate sample belonging to a workout."""

    __tablename__ = 'heartrate_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(db.Integer, db.ForeignKey(
        'workouts.id', ondelete='CASCADE'), nullable=False)
    created_at = db.Column(db.DateTime, nullable=False)
    bpm = db.Column(db.Integer, nullable=False)


# Graph types served from cadence readings:
# graph_type -> (CadenceReading attribute, y-axis label).
_CADENCE_GRAPHS = {
    'cadence': ('rpm', 'Cadence (RPM)'),
    'speed': ('speed', 'Speed (KPH)'),
    'distance': ('distance', 'Distance (KM)'),
    'calories': ('calories', 'Calories (KCAL)'),
    'power': ('power', 'Power (WATTS)'),
}


def _not_found(message):
    """Return the JSON 404 response used when a user/bike/workout is missing."""
    return jsonify({'error': message}), 404


def _render_workouts_list(user):
    """Render the 'workouts_list.html' template for *user*'s workouts."""
    workouts_data = get_workouts_for_user(user, user.workouts)
    return render_template('workouts_list.html', workouts=workouts_data)


def _summarize_readings(cadence_readings, heart_rate_readings):
    """Build the Workout column values from raw reading dicts.

    Every summary column is NOT NULL on the model, so statistics that cannot
    be computed default to 0 (the same value the views substitute for a
    missing statistic).

    Parameters:
    - cadence_readings: list of dicts with at least 'timestamp', 'rpm',
      'calories' and 'distance' keys.
    - heart_rate_readings: list of dicts with at least 'bpm' (and
      'timestamp' when there are no cadence readings).

    Returns:
    - dict: keyword arguments for the ``Workout`` constructor.

    Raises:
    - KeyError / TypeError / ValueError: if a reading is malformed.
    """
    summary = {
        'started_at': None,
        'duration': 0,
        'average_rpm': 0,
        'min_rpm': 0,
        'max_rpm': 0,
        'calories': 0,
        'distance': 0,
        'average_bpm': 0,
        'min_bpm': 0,
        'max_bpm': 0,
        'is_cadence_available': False,
        'is_heart_rate_available': False,
    }

    if heart_rate_readings:
        bpm = [h['bpm'] for h in heart_rate_readings]
        summary.update(
            is_heart_rate_available=True,
            average_bpm=sum(bpm) / len(bpm),
            min_bpm=min(bpm),
            max_bpm=max(bpm),
        )

    if cadence_readings:
        timestamps = [isoparse(c['timestamp']) for c in cadence_readings]
        rpm = [c['rpm'] for c in cadence_readings]
        start_time = min(timestamps)
        summary.update(
            is_cadence_available=True,
            started_at=start_time,
            duration=(max(timestamps) - start_time).total_seconds(),
            average_rpm=sum(rpm) / len(rpm),
            min_rpm=min(rpm),
            max_rpm=max(rpm),
            # Calories and distance are taken from the last reading sent.
            calories=cadence_readings[-1]['calories'],
            distance=cadence_readings[-1]['distance'],
        )
    elif heart_rate_readings:
        # Heart-rate-only workout: take the time span from those readings so
        # started_at is still populated.
        timestamps = [isoparse(h['timestamp']) for h in heart_rate_readings]
        start_time = min(timestamps)
        summary.update(
            started_at=start_time,
            duration=(max(timestamps) - start_time).total_seconds(),
        )

    if summary['started_at'] is None:
        summary['started_at'] = datetime.now()
    return summary


@app.route('/', methods=['GET'])
def get_workouts():
    """Render the page listing every user with their workouts."""
    return render_users_and_workouts()


@app.route("/user/<int:user_id>/new_workout")
def new_workout(user_id):
    """Render the 'new workout' page for one user."""
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')
    return render_template('new_workout.html', user=user)


@app.route('/users', methods=['GET', 'POST'])
def users():
    """Create a user from form data (POST) and render the user list.

    A GET request only renders the list; the form fields are not read.
    """
    if request.method == 'POST':
        data = request.form
        # Missing fields raise werkzeug's BadRequestKeyError (HTTP 400).
        new_user = User(name=data['name'], bike_id=data['bike_id'])
        db.session.add(new_user)
        db.session.commit()
    return render_users_and_workouts()


@app.route('/user/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a user (and, via the ORM cascade, their workouts)."""
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')
    db.session.delete(user)
    db.session.commit()
    return render_users_and_workouts()


@app.route('/user/<int:user_id>/workouts', methods=['GET', 'POST'])
def workouts(user_id):
    """List a user's workouts (GET) or create one from JSON readings (POST).

    The POST body is a JSON object with optional 'cadence_readings' and
    'heart_rate_readings' lists. The workout summary and all readings are
    written in a single commit.
    """
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')

    if request.method == 'GET':
        return _render_workouts_list(user)

    app.logger.info(f'Creating workout for user {user.name} ({user.id})')

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object.'}), 400
    cadence_readings = payload.get('cadence_readings') or []
    heart_rate_readings = payload.get('heart_rate_readings') or []

    # Validate and convert the whole payload before touching the session so
    # a malformed reading cannot leave a half-written workout behind.
    try:
        summary = _summarize_readings(cadence_readings, heart_rate_readings)
        cadence_rows = [
            CadenceReading(
                created_at=c['timestamp'],
                rpm=c['rpm'],
                distance=c['distance'],
                speed=c['speed'],
                calories=c['calories'],
                power=c['power'],
            )
            for c in cadence_readings
        ]
        heart_rate_rows = [
            HeartRateReading(created_at=h['timestamp'], bpm=h['bpm'])
            for h in heart_rate_readings
        ]
    except (KeyError, TypeError, ValueError) as exc:
        app.logger.warning(
            f'Rejected workout payload for user {user.id}: {exc!r}')
        return jsonify({'error': 'Invalid workout payload.'}), 400

    new_row = Workout(
        user_id=user_id,
        bike_id=user.bike_id,
        cadence_readings=cadence_rows,
        heart_rate_readings=heart_rate_rows,
        **summary,
    )
    db.session.add(new_row)
    db.session.commit()

    app.logger.info(
        f'Workout({new_row.id}) created for user {user.name} ({user.id}) with {len(cadence_readings)} cadence readings and {len(heart_rate_readings)} heart rate readings')
    return jsonify({'message': 'Workout created successfully.'}), 201


@app.route('/user/<int:user_id>/workout/<int:workout_id>/<graph_type>', methods=['GET', 'DELETE'])
def workout(user_id, workout_id, graph_type):
    """Return a PNG graph of one metric of a workout.

    'cadence', 'speed', 'distance', 'calories' and 'power' are drawn from the
    cadence readings. Any other graph type falls back to the heart-rate graph
    when heart-rate data is available. Responds 404 if the workout does not
    exist for the user and 409 if no graph can be produced.
    """
    # No joins here: inner-joining the reading tables would hide workouts
    # that lack either kind of reading.
    found = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if not found:
        return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404

    by_time = attrgetter('created_at')

    if found.is_cadence_available and graph_type in _CADENCE_GRAPHS:
        attribute, y_label = _CADENCE_GRAPHS[graph_type]
        readings = sorted(found.cadence_readings, key=by_time)
        return create_graph(
            x_values=[reading.created_at for reading in readings],
            y_values=[getattr(reading, attribute) for reading in readings],
            y_label=y_label,
            filename=graph_type), 200

    if found.is_heart_rate_available:
        readings = sorted(found.heart_rate_readings, key=by_time)
        return create_graph(
            x_values=[reading.created_at for reading in readings],
            y_values=[reading.bpm for reading in readings],
            y_label='Heart Rate (BPM)',
            filename='heart_rate'), 200

    return jsonify({'message': 'Unable to generate {} for workout {}.'.format(graph_type, workout_id)}), 409


@app.route('/user/<int:user_id>/workout/<int:workout_id>/view', methods=['GET'])
def view_workout(user_id, workout_id):
    """Render one workout with the graph types given in the query string."""
    found = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if found is None:
        return _not_found('Workout not found.')
    graph_types = request.args.getlist('graph_types')
    return render_template('workout_view.html', workout=found, graph_types=graph_types)


@app.route('/user/<int:user_id>/workout/<int:workout_id>/delete', methods=['DELETE'])
def delete_workout(user_id, workout_id):
    """Delete a workout and its readings, then re-render the user list."""
    # Check ownership first so a mismatched user_id cannot wipe the readings
    # of somebody else's workout.
    found = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if found is None:
        return _not_found('Workout not found.')

    # Bulk-delete the readings instead of loading every row into the session.
    CadenceReading.query.filter_by(workout_id=workout_id).delete()
    HeartRateReading.query.filter_by(workout_id=workout_id).delete()
    db.session.delete(found)
    db.session.commit()
    return render_users_and_workouts()


@app.route('/user/<int:user_id>/workouts', methods=['GET'])
def workouts_for_user(user_id):
    """Render the workout list for one user.

    Shares its URL rule with ``workouts``; kept so the endpoint name stays
    available.
    """
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')
    return _render_workouts_list(user)


@app.route('/user/<int:user_id>/bike', methods=['GET'])
def update_users_bike(user_id):
    """Assign the bike given by the 'bike_id' query parameter to a user."""
    bike_id = request.args.get('bike_id')
    user = User.query.get(user_id)
    bike = Bike.query.get(bike_id)
    if not (user and bike):
        return _not_found('User or bike not found.')
    user.bike_id = bike_id
    db.session.commit()
    return render_users_and_workouts()


@app.route('/user/<int:user_id>/calendar', methods=['GET'])
def calendar_view(user_id):
    """Render the monthly calendar fragment for a user.

    The month is chosen by the optional 'date' query parameter (YYYY-MM-DD);
    a missing or unparsable value falls back to today.
    """
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')
    workouts = get_workouts_for_user(user, user.workouts)
    date = request.args.get('date', default=datetime.now().date(), type=toDate)
    calendar_month = generate_calendar_monthly_view(workouts, date)
    return render_template('partials/calendar.html', calendar_month=calendar_month, user_id=user_id)


@app.route("/user/<int:user_id>/workout/list", methods=['GET'])
def workout_list(user_id):
    """Render the full workout list fragment for a user."""
    user = User.query.get(user_id)
    if user is None:
        return _not_found('User not found.')
    workouts = get_workouts_for_user(user, user.workouts)
    return render_template('partials/workouts_list_fragment.html', workouts=workouts, user_id=user_id, workouts_all_loaded=True)


@app.route("/user/<int:user_id>/workout/<int:workout_id>/calendar_view", methods=['GET'])
def calendar_workout_view(user_id, workout_id):
    """Render the 'selected workout' fragment shown from the calendar."""
    found = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    user = User.query.get(user_id)
    if found is None or user is None:
        return _not_found('Workout not found.')
    workout_view_data = get_workout_view_data(found, user)
    return render_template('partials/selected_workout_view.html', workout=workout_view_data, user_id=user_id)


# Utilities


def generate_user_data(user, workouts=None):
    """
    Generate data for a single user.

    Parameters:
    - user: The user object.
    - workouts: List of workout view dicts for the user (defaults to empty).

    Returns:
    - dict: A dictionary containing user data and related information.
    """
    # None instead of a shared mutable [] default.
    workouts = [] if workouts is None else workouts
    return {
        'id': user.id,
        'name': user.name,
        'bike_id': user.bike_id,
        'workouts_count': len(workouts),
        'workouts': workouts,
        'daily_duration_sparkline': generate_daily_duration_sparkline(workouts),
        'calendar_month': generate_calendar_monthly_view(workouts, datetime.now().date())
    }


def render_users_and_workouts():
    """
    Render users and their associated workouts.

    The wrapper template is used when ``htmx`` evaluates truthy, the full
    page template otherwise.
    """
    users = User.query.all()
    users_data = [
        generate_user_data(user, get_workouts_for_user(user, user.workouts))
        for user in users
    ]
    template_name = 'users_and_workouts_wrapper.html' if htmx else 'users_and_workouts.html'
    return render_template(template_name, users=users_data, bikes=Bike.query.all())


def get_workouts_for_user(user, workouts):
    """
    Retrieve view data for all workouts of a user.

    Parameters:
    - user: The user object.
    - workouts: A list of all user workouts.

    Returns:
    - list: A list of view data for valid workouts (workouts for which
      ``get_workout_view_data`` returns None are skipped).
    """
    # Build each view dict once, then drop the workouts that produced None.
    view_data = (get_workout_view_data(workout, user) for workout in workouts)
    return [data for data in view_data if data]


def get_workout_view_data(workout, user):
    """
    Convert a Workout row into the dict consumed by the templates.

    Missing statistics are shown as 0.

    Returns:
    - dict, or None when the workout has neither cadence nor heart-rate data
      or has no start time.
    """
    is_heart_rate_available = workout.is_heart_rate_available
    is_cadence_available = workout.is_cadence_available
    start_time = workout.started_at

    if not (is_cadence_available or is_heart_rate_available):
        return None
    if start_time is None:
        # Nothing below can be formatted without a start time.
        return None

    duration = timedelta(seconds=int(workout.duration or 0))

    selected_graph_types = ['speed']
    if is_heart_rate_available:
        selected_graph_types.append('heart_rate')

    return {
        'id': workout.id,
        'user_id': user.id,
        'user_name': user.name,
        'start_time': format_date_with_ordinal(start_time, '{hour}:%M %B {day}{th} %Y'),
        'start_time_date': start_time,
        'start_time_ago': humanize.naturaltime(start_time),
        'duration': humanize.naturaldelta(duration),
        'duration_minutes': duration.total_seconds() / 60,
        'average_rpm': int(workout.average_rpm or 0),
        'min_rpm': int(workout.min_rpm or 0),
        'max_rpm': int(workout.max_rpm or 0),
        'calories': int(workout.calories or 0),
        'distance': int(workout.distance or 0),
        'bike_display_name': workout.bike.display_name,
        'selected_graph_types': selected_graph_types,
        'is_heart_rate_available': is_heart_rate_available,
        'is_cadence_available': is_cadence_available,
        'average_bpm': int(workout.average_bpm or 0),
        'min_bpm': int(workout.min_bpm or 0),
        'max_bpm': int(workout.max_bpm or 0),
    }


def create_graph(x_values, y_values, y_label, filename, x_label='Time'):
    """
    Plot y_values against x_values and return the graph as a PNG response.

    Parameters:
    - x_values: x-axis values (formatted as HH:MM dates).
    - y_values: y-axis values.
    - y_label: Label of the y-axis.
    - filename: Base name (without extension) used in Content-Disposition.
    - x_label: Label of the x-axis.

    Returns:
    - flask.Response: An 'image/png' response containing the graph.
    """
    fig, ax = plt.subplots()
    try:
        ax.plot(x_values, y_values)
        ax.set_xlabel(x_label)
        ax.set_ylabel(y_label)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
        # set the y-axis limits to start at 0
        ax.set_ylim(bottom=0)

        # Save the graph to a bytes buffer
        buffer = io.BytesIO()
        fig.savefig(buffer, format='png', transparent=True,
                    bbox_inches='tight')
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)

    # Create a response object with the graph image
    response = make_response(buffer.getvalue())
    response.headers['Content-Type'] = 'image/png'
    response.headers['Content-Disposition'] = f'attachment; filename={filename}.png'
    return response


def format_date_with_ordinal(d, format_string):
    """
    Format a date with strftime plus a few extra placeholders.

    Placeholders replaced after strftime has run:
    - '{th}': ordinal suffix of the day ('st', 'nd', 'rd' or 'th').
    - '{day}': day of the month without zero padding.
    - '{hour}': 24-hour clock hour without zero padding (requires d.hour).
    """
    day = d.day
    if 11 <= day % 100 <= 13:
        # 11th, 12th and 13th do not follow the last-digit rule.
        ordinal = 'th'
    else:
        ordinal = {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')

    formatted = d.strftime(format_string)
    formatted = formatted.replace('{th}', ordinal).replace('{day}', str(day))
    if '{hour}' in formatted:
        formatted = formatted.replace('{hour}', str(d.hour))
    return formatted


def format_duration(duration):
    """Format a timedelta as 'Hh Mm', or 'Mm' when shorter than one hour."""
    # total_seconds() rather than .seconds, which drops whole days.
    hours, remainder = divmod(int(duration.total_seconds()), 3600)
    minutes, _ = divmod(remainder, 60)
    if duration >= timedelta(hours=1):
        return f"{hours}h {minutes}m"
    return f"{minutes}m"


def date_range(start_date, end_date):
    """
    Generator for dates between two dates (inclusive).
    """
    current_date = start_date
    while current_date <= end_date:
        yield current_date
        current_date += timedelta(days=1)


def get_month_bounds(dt):
    """
    Determine the bounds of a month for a given date.
    This considers the starting and ending weekdays to fit a calendar view:
    the range is widened to start on a Sunday and end on a Saturday.
    """
    first_day_of_month = dt.replace(day=1)
    next_month = first_day_of_month + relativedelta(months=1)
    last_day_of_month = next_month - timedelta(days=1)

    # weekday() is 0 for Monday .. 6 for Sunday.
    days_since_sunday = (first_day_of_month.weekday() + 1) % 7
    days_until_saturday = (5 - last_day_of_month.weekday()) % 7

    start_date = first_day_of_month - timedelta(days=days_since_sunday)
    end_date = last_day_of_month + timedelta(days=days_until_saturday)
    return start_date, end_date


def generate_calendar_monthly_view(workouts, selected_date):
    """
    Generate a monthly calendar view of the workouts.

    Parameters:
    - workouts (list of dict): Each dict should contain 'start_time_date'.
    - selected_date: Any date inside the month to display.

    Returns:
    - dict: 'month_year', 'days_of_month', 'next_month', 'previous_month'.
    """
    start_date, end_date = get_month_bounds(selected_date)

    # Build a lookup dictionary for faster access. When several workouts
    # start on the same day the last one in the list is kept.
    workout_lookup = {
        w['start_time_date'].date(): w
        for w in workouts
        if start_date <= w['start_time_date'].date() <= end_date
    }

    current_date = datetime.now().date()
    days_of_month = [
        {
            'date': day,
            'day_of_month': day.day,
            'is_workout': day in workout_lookup,
            'workout': workout_lookup.get(day),
            'is_current_date': day == current_date,
            'is_current_month': day.month == selected_date.month and day.year == selected_date.year
        }
        for day in date_range(start_date, end_date)
    ]

    return {
        'month_year': selected_date.strftime('%B, %Y'),
        'days_of_month': days_of_month,
        'next_month': selected_date + relativedelta(months=1),
        'previous_month': selected_date - relativedelta(months=1),
    }


def generate_daily_duration_sparkline(workouts):
    """
    Generate a sparkline string representation of daily workout durations.

    Parameters:
    - workouts (list of dict): Each dict should contain 'start_time_date' and 'duration_minutes' keys.

    Returns:
    - str: Sparkline representation of daily durations ('' for no workouts).
    """
    if not workouts:
        return ''

    # Sum the minutes per calendar day so several workouts on one day all
    # count towards that day's bar.
    minutes_by_date = {}
    for w in workouts:
        day = w['start_time_date'].date()
        minutes_by_date[day] = minutes_by_date.get(day, 0) + int(w['duration_minutes'])

    # Take the range from the data itself; the input list is not guaranteed
    # to be sorted.
    start_date = min(minutes_by_date)
    end_date = max(minutes_by_date)
    daily_durations = [
        minutes_by_date.get(date, 0)
        for date in date_range(start_date, end_date)
    ]

    # NOTE(review): the original comment here said the reversal makes the
    # most recent day appear on the right, but reversing a chronological
    # list puts the most recent day first. Kept to preserve the current
    # output -- confirm the intended orientation.
    daily_durations.reverse()

    return sparklines.sparklines(daily_durations)[0]


def toDate(dateString):
    """Parse a 'YYYY-MM-DD' string into a date (raises ValueError if invalid)."""
    return datetime.strptime(dateString, "%Y-%m-%d").date()


if __name__ == '__main__':
    # Use PORT if defined, otherwise default to 5000.
    port = int(os.environ.get('PORT', 5000))
    app.run(host='127.0.0.1', port=port)