from sqlalchemy import func from datetime import datetime, timedelta import io from flask_sqlalchemy import SQLAlchemy from flask import Flask, make_response, render_template, request, jsonify import jinja_partials from flask_htmx import HTMX import matplotlib.pyplot as plt import matplotlib.dates as mdates import os import sparklines app = Flask(__name__) # TODO CHANGE SECRET KEY TO ENVIRONMENT VARIABLE app.config['SECRET_KEY'] = 'secret!' app.config["SQLALCHEMY_ECHO"] = False # True for debugging uri = os.getenv("DATABASE_URL") if uri and uri.startswith("postgres://"): uri = uri.replace("postgres://", "postgresql://", 1) app.config['SQLALCHEMY_DATABASE_URI'] = uri app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False jinja_partials.register_extensions(app) htmx = HTMX(app) db = SQLAlchemy(app) class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), nullable=False) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) workouts = db.relationship('Workout', backref='user', lazy=True) bike = db.relationship('Bike', backref='user', lazy=True) class Bike(db.Model): __tablename__ = 'bikes' id = db.Column(db.Integer, primary_key=True) display_name = db.Column(db.String(255), nullable=False) code_name = db.Column(db.String(255), nullable=False) class Workout(db.Model): __tablename__ = 'workouts' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey( 'users.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False, default=db.func.now()) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) started_at = db.Column(db.DateTime, nullable=False) duration = db.Column(db.Numeric, nullable=False) average_rpm = db.Column(db.Numeric, nullable=False) min_rpm = db.Column(db.Integer, nullable=False) max_rpm = db.Column(db.Integer, nullable=False) calories = db.Column(db.Numeric, nullable=False) distance = db.Column(db.Numeric, nullable=False) average_bpm = db.Column(db.Numeric, nullable=False) min_bpm = db.Column(db.Integer, nullable=False) max_bpm = db.Column(db.Integer, nullable=False) is_heart_rate_available = db.Column( db.Boolean, nullable=False, default=False) is_cadence_available = db.Column(db.Boolean, nullable=False, default=False) cadence_readings = db.relationship( 'CadenceReading', backref='workout', lazy=True) heart_rate_readings = db.relationship( 'HeartRateReading', backref='workout', lazy=True) bike = db.relationship('Bike', backref='workouts', lazy=True) class CadenceReading(db.Model): __tablename__ = 'cadence_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) rpm = db.Column(db.Integer, nullable=False) distance = db.Column(db.Numeric, nullable=False) speed = db.Column(db.Numeric, nullable=False) calories = db.Column(db.Numeric, nullable=False) power = db.Column(db.Integer, nullable=False) class HeartRateReading(db.Model): __tablename__ = 'heartrate_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) bpm = db.Column(db.Integer, nullable=False) @app.route('/', methods=['GET']) def get_workouts(): return render_users_and_workouts() @app.route("/user//new_workout") def new_workout(user_id): user = User.query.get(user_id) return render_template('new_workout.html', user=user) @app.route('/users', methods=['GET', 'POST']) def users(): # create a new user data = request.form name = data['name'] bike_id = data['bike_id'] # create a new user and add it to the database new_user = User(name=name, bike_id=bike_id) db.session.add(new_user) db.session.commit() return render_users_and_workouts() @app.route('/user/', methods=['DELETE']) def delete_user(user_id): user = User.query.get(user_id) if user: db.session.delete(user) db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User not found.'}), 404 @app.route('/user//workouts', methods=['GET', 'POST']) def workouts(user_id): if request.method == 'GET': workouts_data = get_workouts_for_user(user_id) return render_template('workouts_list.html', workouts=workouts_data) elif request.method == 'POST': user = User.query.get(user_id) app.logger.info(f'Creating workout for user {user.name} ({user.id})') data = request.json cadence_readings = data['cadence_readings'] or [] heart_rate_readings = data['heart_rate_readings'] or [] # create a new workout workout = Workout(user_id=user_id, bike_id=user.bike_id) db.session.add(workout) db.session.commit() app.logger.info( f'Workout({workout.id}) created for user {user.name} ({user.id}) with {len(cadence_readings)} cadence readings and {len(heart_rate_readings)} heart rate readings') # add cadence readings to the workout for c in cadence_readings: cadence_reading = CadenceReading( workout_id=workout.id, created_at=c['timestamp'], rpm=c['rpm'], distance=c['distance'], speed=c['speed'], calories=c['calories'], power=c['power']) db.session.add(cadence_reading) for h in heart_rate_readings: heart_rate_reading = HeartRateReading( workout_id=workout.id, created_at=h['timestamp'], bpm=h['bpm']) db.session.add(heart_rate_reading) if cadence_readings: timestamps = [datetime_converter( c['timestamp']) for c in cadence_readings] start_time = min(timestamps) end_time = max(timestamps) duration = end_time - start_time duration = duration.total_seconds() average_rpm = sum( c['rpm'] for c in cadence_readings) / len(cadence_readings) min_rpm = min( c['rpm'] for c in cadence_readings) max_rpm = max( c['rpm'] for c in cadence_readings) calories = cadence_readings[-1]['calories'] distance = cadence_readings[-1]['distance'] workout.is_cadence_available = True workout.started_at = start_time workout.duration = duration workout.average_rpm = average_rpm workout.min_rpm = min_rpm workout.max_rpm = max_rpm workout.calories = calories workout.distance = distance if heart_rate_readings: average_bpm = sum(h['bpm'] for h in heart_rate_readings) / \ len(heart_rate_readings) min_bpm = min( h['bpm'] for h in heart_rate_readings) max_bpm = max( h['bpm'] for h in heart_rate_readings) workout.is_heart_rate_available = True workout.average_bpm = average_bpm workout.min_bpm = min_bpm workout.max_bpm = max_bpm db.session.commit() return jsonify({'message': 'Workout created successfully.'}), 201 @app.route('/user//workout//', methods=['GET', 'DELETE']) def workout(user_id, workout_id, graph_type): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).join( Workout.cadence_readings).join(Workout.heart_rate_readings).first() if not workout: return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404 if workout.is_cadence_available: x_values = [reading.created_at for reading in workout.cadence_readings] if graph_type == 'cadence': y_values = [reading.rpm for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Cadence (RPM)', filename='cadence'), 200 elif graph_type == 'speed': y_values = [reading.speed for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Speed (KPH)', filename='speed'), 200 elif graph_type == 'distance': y_values = [ reading.distance for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Distance (KM)', filename='distance'), 200 elif graph_type == 'calories': y_values = [ reading.calories for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Calories (KCAL)', filename='calories'), 200 elif graph_type == 'power': y_values = [reading.power for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Power (WATTS)', filename='power'), 200 if workout.is_heart_rate_available: x_values = [ reading.created_at for reading in workout.heart_rate_readings] y_values = [reading.bpm for reading in workout.heart_rate_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Heart Rate (BPM)', filename='heart_rate'), 200 return jsonify({'message': 'Unable to generate {} for workout {}.'.format(graph_type, workout_id)}), 409 @app.route('/user//workout//view', methods=['GET']) def view_workout(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() graph_types = request.args.getlist('graph_types') return render_template('workout_view.html', workout=workout, graph_types=graph_types) @app.route('/user//workout//delete', methods=['DELETE']) def delete_workout(user_id, workout_id): # Delete the workout and its associated cadence readings CadenceReading.query.filter_by(workout_id=workout_id).delete() HeartRateReading.query.filter_by(workout_id=workout_id).delete() Workout.query.filter_by(user_id=user_id, id=workout_id).delete() db.session.commit() return render_users_and_workouts() @app.route('/user//workouts', methods=['GET']) def workouts_for_user(user_id): workouts_data = get_workouts_for_user(user_id) return render_template('workouts_list.html', workouts=workouts_data) @app.route('/user//bike', methods=['GET']) def update_users_bike(user_id): bike_id = request.args.get('bike_id') user = User.query.get(user_id) bike = Bike.query.get(bike_id) if user and bike: user.bike_id = bike_id db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User or bike not found.'}), 404 def render_users_and_workouts(): users = User.query.all() users_data = [] for user in users: workouts = get_workouts_for_user(user.id) if not workouts: user_data = { 'id': user.id, 'name': user.name, 'bike_id': user.bike_id, 'workouts_count': 0, 'workouts': [], 'workout_counts_by_week': 0, 'duration_by_week': 0 } users_data.append(user_data) continue workout_counts_by_week = [] duration_by_week = [] # get start date from last workout start_date = workouts[-1]['start_time_date'].date() # get end date from first workout end_date = workouts[0]['start_time_date'].date() # get difference in days between start and end date num_days = (end_date - start_date).days + 1 # calculate number of weeks between start and end date num_weeks = (end_date - start_date).days // 7 + 1 for i in range(num_weeks): # Calculate the start and end of the current week start = start_date + timedelta(days=7*i) end = start + timedelta(days=6) # Get the workouts for the current week weekly_workouts = [w for w in workouts if start <= w['start_time_date'].date() <= end] # Calculate the workout counts and duration for the current week workout_counts_by_week.append(len(weekly_workouts)) duration_by_week.append( int(sum([int(w['duration_minutes']) for w in weekly_workouts]))/len(weekly_workouts)) workout_counts_sparkline = sparklines.sparklines( workout_counts_by_week) duration_sparkline = sparklines.sparklines( [int(w['duration_minutes']) for w in workouts])[0] user_data = { 'id': user.id, 'name': user.name, 'bike_id': user.bike_id, 'workouts_count': len(workouts), 'workouts': workouts, 'workout_counts_by_week': workout_counts_by_week, 'duration_by_week': duration_by_week, 'num_days': num_days, 'workout_counts_sparkline': workout_counts_sparkline, 'duration_sparkline': duration_sparkline } users_data.append(user_data) if htmx: return render_template('users_and_workouts_wrapper.html', users=users_data, bikes=Bike.query.all()) return render_template('users_and_workouts.html', users=users_data, bikes=Bike.query.all()) def get_workouts_for_user(user_id): user = User.query.get(user_id) workouts_data = [] workouts = Workout.query.filter_by(user_id=user_id).order_by( Workout.created_at.desc()).all() for workout in workouts: duration = timedelta( seconds=int(workout.duration)) if workout.duration else timedelta(seconds=0) average_rpm = workout.average_rpm if workout.average_rpm else 0 min_rpm = workout.min_rpm if workout.min_rpm else 0 max_rpm = workout.max_rpm if workout.max_rpm else 0 calories = workout.calories if workout.calories else 0 distance = workout.distance if workout.distance else 0 average_bpm = workout.average_bpm if workout.average_bpm else 0 min_bpm = workout.min_bpm if workout.min_bpm else 0 max_bpm = workout.max_bpm if workout.max_bpm else 0 is_heart_rate_available = workout.is_heart_rate_available is_cadence_available = workout.is_cadence_available start_time = workout.started_at selected_graph_types = ['speed'] if is_heart_rate_available: selected_graph_types.append('heart_rate') if is_cadence_available or is_heart_rate_available: workouts_data.append({ 'id': workout.id, 'user_id': user_id, 'user_name': user.name, 'start_time': format_date_with_ordinal(start_time, '%#H:%M %B %dth %Y'), 'start_time_date': start_time, 'duration': format_duration(duration), 'duration_minutes': duration.total_seconds() / 60, 'average_rpm': int(average_rpm), 'min_rpm': int(min_rpm), 'max_rpm': int(max_rpm), 'calories': int(calories), 'distance': int(distance), 'bike_display_name': workout.bike.display_name, 'selected_graph_types': selected_graph_types, 'is_heart_rate_available': is_heart_rate_available, 'is_cadence_available': is_cadence_available, 'average_bpm': int(average_bpm), 'min_bpm': int(min_bpm), 'max_bpm': int(max_bpm), }) return workouts_data def create_graph(x_values, y_values, y_label, filename, x_label='Time'): fig, ax = plt.subplots() ax.plot(x_values, y_values) ax.set_xlabel(x_label) ax.set_ylabel(y_label) # ax.set_title( # 'Cadence Readings for Workout {}'.format(workout_id)) ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) # set the y-axis limits to start at 0 ax.set_ylim(bottom=0) # Save the graph to a bytes buffer buffer = io.BytesIO() plt.savefig(buffer, format='png', transparent=True, bbox_inches='tight') buffer.seek(0) # Create a response object with the graph image response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = 'attachment; filename={filename}.png' return response def format_date_with_ordinal(d, format_string): ordinal = {'1': 'st', '2': 'nd', '3': 'rd'}.get(str(d.day)[-1:], 'th') return d.strftime(format_string).replace('{th}', ordinal) def format_duration(duration): hours, remainder = divmod(duration.seconds, 3600) minutes, _ = divmod(remainder, 60) if duration >= timedelta(hours=1): return f"{hours}h {minutes}m" else: return f"{minutes}m" def datetime_converter(datetime_str: str) -> datetime: return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S%z") if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)