from flask_basicauth import BasicAuth import matplotlib.dates as mdates import matplotlib.pyplot as plt from dateutil.relativedelta import relativedelta import humanize from dateutil.parser import isoparse import sparklines import os import time from sqlalchemy import func from datetime import datetime, timedelta import io from flask_sqlalchemy import SQLAlchemy from flask import Flask, make_response, render_template, request, jsonify import jinja_partials from flask_htmx import HTMX import matplotlib matplotlib.use('Agg') app = Flask(__name__) # TODO CHANGE SECRET KEY TO ENVIRONMENT VARIABLE app.config['SECRET_KEY'] = 'secret!' app.config["SQLALCHEMY_ECHO"] = False # True for debugging uri = os.getenv("DATABASE_URL") if uri and uri.startswith("postgres://"): uri = uri.replace("postgres://", "postgresql://", 1) app.config['SQLALCHEMY_DATABASE_URI'] = uri app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False jinja_partials.register_extensions(app) htmx = HTMX(app) db = SQLAlchemy(app) app.config['BASIC_AUTH_USERNAME'] = os.getenv("ADMIN_USERNAME") or 'admin' app.config['BASIC_AUTH_PASSWORD'] = os.getenv("ADMIN_PASSWORD") or 'admin' basic_auth = BasicAuth(app) class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), nullable=False) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) workouts = db.relationship('Workout', backref='user', lazy=True) bike = db.relationship('Bike', backref='user', lazy=True) class Bike(db.Model): __tablename__ = 'bikes' id = db.Column(db.Integer, primary_key=True) display_name = db.Column(db.String(255), nullable=False) code_name = db.Column(db.String(255), nullable=False) class Workout(db.Model): __tablename__ = 'workouts' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey( 'users.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False, default=db.func.now()) bike_id = db.Column(db.Integer, db.ForeignKey( 'bikes.id', ondelete='CASCADE'), nullable=False) started_at = db.Column(db.DateTime, nullable=False) duration = db.Column(db.Numeric, nullable=False) average_rpm = db.Column(db.Numeric, nullable=False) min_rpm = db.Column(db.Integer, nullable=False) max_rpm = db.Column(db.Integer, nullable=False) calories = db.Column(db.Numeric, nullable=False) distance = db.Column(db.Numeric, nullable=False) average_bpm = db.Column(db.Numeric, nullable=False) min_bpm = db.Column(db.Integer, nullable=False) max_bpm = db.Column(db.Integer, nullable=False) is_heart_rate_available = db.Column( db.Boolean, nullable=False, default=False) is_cadence_available = db.Column(db.Boolean, nullable=False, default=False) cadence_readings = db.relationship( 'CadenceReading', backref='workout', lazy=True) heart_rate_readings = db.relationship( 'HeartRateReading', backref='workout', lazy=True) bike = db.relationship('Bike', backref='workouts', lazy=True) class CadenceReading(db.Model): __tablename__ = 'cadence_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) rpm = db.Column(db.Integer, nullable=False) distance = db.Column(db.Numeric, nullable=False) speed = db.Column(db.Numeric, nullable=False) calories = db.Column(db.Numeric, nullable=False) power = db.Column(db.Integer, nullable=False) class HeartRateReading(db.Model): __tablename__ = 'heartrate_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) bpm = db.Column(db.Integer, nullable=False) @app.route('/', methods=['GET']) def get_workouts(): return render_users_and_workouts() @app.route("/user//new_workout") def new_workout(user_id): user = User.query.get(user_id) return render_template('new_workout.html', user=user) @app.route('/users', methods=['GET', 'POST']) def users(): # create a new user data = request.form name = data['name'] bike_id = data['bike_id'] # create a new user and add it to the database new_user = User(name=name, bike_id=bike_id) db.session.add(new_user) db.session.commit() return render_users_and_workouts() @app.route('/user/', methods=['DELETE']) @basic_auth.required def delete_user(user_id): user = User.query.get(user_id) if user: db.session.delete(user) db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User not found.'}), 404 @app.route('/user//workouts', methods=['GET', 'POST']) def workouts(user_id): user = User.query.get(user_id) if request.method == 'GET': workouts_data = get_workouts_for_user(user, user.workouts) return render_template('workouts_list.html', workouts=workouts_data) elif request.method == 'POST': app.logger.info(f'Creating workout for user {user.name} ({user.id})') data = request.json cadence_readings = data['cadence_readings'] or [] heart_rate_readings = data['heart_rate_readings'] or [] # create a new workout workout = Workout(user_id=user_id, bike_id=user.bike_id) db.session.add(workout) db.session.commit() app.logger.info( f'Workout({workout.id}) created for user {user.name} ({user.id}) with {len(cadence_readings)} cadence readings and {len(heart_rate_readings)} heart rate readings') # add cadence readings to the workout for c in cadence_readings: cadence_reading = CadenceReading( workout_id=workout.id, created_at=c['timestamp'], rpm=c['rpm'], distance=c['distance'], speed=c['speed'], calories=c['calories'], power=c['power']) db.session.add(cadence_reading) for h in heart_rate_readings: heart_rate_reading = HeartRateReading( workout_id=workout.id, created_at=h['timestamp'], bpm=h['bpm']) db.session.add(heart_rate_reading) if cadence_readings: timestamps = [isoparse( c['timestamp']) for c in cadence_readings] start_time = min(timestamps) end_time = max(timestamps) duration = end_time - start_time duration = duration.total_seconds() average_rpm = sum( c['rpm'] for c in cadence_readings) / len(cadence_readings) min_rpm = min( c['rpm'] for c in cadence_readings) max_rpm = max( c['rpm'] for c in cadence_readings) calories = cadence_readings[-1]['calories'] distance = cadence_readings[-1]['distance'] workout.is_cadence_available = True workout.started_at = start_time workout.duration = duration workout.average_rpm = average_rpm workout.min_rpm = min_rpm workout.max_rpm = max_rpm workout.calories = calories workout.distance = distance if heart_rate_readings: bpm = [h['bpm'] for h in heart_rate_readings] average_bpm = sum(bpm) / len(bpm) min_bpm = min(bpm) max_bpm = max(bpm) workout.is_heart_rate_available = True workout.average_bpm = average_bpm workout.min_bpm = min_bpm workout.max_bpm = max_bpm db.session.commit() return jsonify({'message': 'Workout created successfully.'}), 201 @app.route('/user//workout//', methods=['GET']) def workout(user_id, workout_id, graph_type): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).join( Workout.cadence_readings).join(Workout.heart_rate_readings).first() if not workout: return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404 if workout.is_cadence_available: x_values = [reading.created_at for reading in workout.cadence_readings] if graph_type == 'cadence': y_values = [reading.rpm for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Cadence (RPM)', filename='cadence'), 200 elif graph_type == 'speed': y_values = [reading.speed for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Speed (KPH)', filename='speed'), 200 elif graph_type == 'distance': y_values = [ reading.distance for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Distance (KM)', filename='distance'), 200 elif graph_type == 'calories': y_values = [ reading.calories for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Calories (KCAL)', filename='calories'), 200 elif graph_type == 'power': y_values = [reading.power for reading in workout.cadence_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Power (WATTS)', filename='power'), 200 if workout.is_heart_rate_available: x_values = [ reading.created_at for reading in workout.heart_rate_readings] y_values = [reading.bpm for reading in workout.heart_rate_readings] return create_graph(x_values=x_values, y_values=y_values, y_label='Heart Rate (BPM)', filename='heart_rate'), 200 return jsonify({'message': 'Unable to generate {} for workout {}.'.format(graph_type, workout_id)}), 409 @app.route('/user//workout//view', methods=['GET']) def view_workout(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() graph_types = request.args.getlist('graph_types') return render_template('workout_view.html', workout=workout, graph_types=graph_types) @app.route('/user//workout//delete', methods=['DELETE']) @basic_auth.required def delete_workout(user_id, workout_id): # Delete the workout and its associated cadence readings CadenceReading.query.filter_by(workout_id=workout_id).delete() HeartRateReading.query.filter_by(workout_id=workout_id).delete() Workout.query.filter_by(user_id=user_id, id=workout_id).delete() db.session.commit() return render_users_and_workouts() @app.route('/user//workouts', methods=['GET']) def workouts_for_user(user_id): user = User.query.get(user_id) workouts_data = get_workouts_for_user(user, user.workouts) return render_template('workouts_list.html', workouts=workouts_data) @app.route('/user//bike', methods=['GET']) def update_users_bike(user_id): bike_id = request.args.get('bike_id') user = User.query.get(user_id) bike = Bike.query.get(bike_id) if user and bike: user.bike_id = bike_id db.session.commit() return render_users_and_workouts() else: return jsonify({'error': 'User or bike not found.'}), 404 @ app.route('/user//calendar', methods=['GET']) def calendar_view(user_id): user = User.query.get(user_id) workouts = get_workouts_for_user(user, user.workouts) date = request.args.get('date', default=datetime.now().date(), type=toDate) calendar_month = generate_calendar_monthly_view(workouts, date) return render_template('partials/calendar.html', calendar_month=calendar_month, user_id=user_id) @ app.route("/user//workout/list", methods=['GET']) def workout_list(user_id): user = User.query.get(user_id) workouts = get_workouts_for_user(user, user.workouts) return render_template('partials/workouts_list_fragment.html', workouts=workouts, user_id=user_id, workouts_all_loaded=True) @ app.route("/user//workout//calendar_view", methods=['GET']) def calendar_workout_view(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() user = User.query.get(user_id) workout_view_data = get_workout_view_data(workout, user) return render_template('partials/selected_workout_view.html', workout=workout_view_data, user_id=user_id) # Utilities def generate_user_data(user, workouts=[]): """ Generate data for a single user. Parameters: - user: The user object. - workouts: List of workouts for the user. Returns: - dict: A dictionary containing user data and related information. """ return { 'id': user.id, 'name': user.name, 'bike_id': user.bike_id, 'workouts_count': len(workouts), 'workouts': workouts, 'daily_duration_sparkline': generate_daily_duration_sparkline(workouts), 'calendar_month': generate_calendar_monthly_view(workouts, datetime.now().date()) } def render_users_and_workouts(): """ Render users and their associated workouts. """ users = User.query.all() users_data = [generate_user_data(user, get_workouts_for_user( user, user.workouts)) for user in users] template_name = 'users_and_workouts_wrapper.html' if htmx else 'users_and_workouts.html' return render_template(template_name, users=users_data, bikes=Bike.query.all()) def get_workouts_for_user(user, workouts): """ Retrieve view data for all workouts of a user. Parameters: - user: The user object. - workouts: A list of all user workouts. Returns: - list: A list of view data for valid workouts. """ return [get_workout_view_data(workout, user) for workout in workouts if get_workout_view_data(workout, user)] def format_workout_data(workout, user): """ Formats the workout data for view. Parameters: - workout: The workout object. - user: The user associated with the workout. Returns: - dict: A dictionary with formatted workout data. """ duration = timedelta(seconds=int(workout.duration or 0)) return { 'id': workout.id, 'user_id': user.id, 'user_name': user.name, 'start_time': format_date_with_ordinal(workout.started_at, '%#H:%M %B %dth %Y'), 'start_time_date': workout.started_at, 'start_time_ago': humanize.naturaltime(workout.started_at), 'duration': humanize.naturaldelta(duration), 'duration_minutes': duration.total_seconds() / 60, 'average_rpm': int(workout.average_rpm or 0), 'min_rpm': int(workout.min_rpm or 0), 'max_rpm': int(workout.max_rpm or 0), 'calories': int(workout.calories or 0), 'distance': int(workout.distance or 0), 'bike_display_name': workout.bike.display_name, 'selected_graph_types': ['speed'] + (['heart_rate'] if workout.is_heart_rate_available else []), 'is_heart_rate_available': workout.is_heart_rate_available, 'is_cadence_available': workout.is_cadence_available, 'average_bpm': int(workout.average_bpm or 0), 'min_bpm': int(workout.min_bpm or 0), 'max_bpm': int(workout.max_bpm or 0), } def get_workout_view_data(workout, user): """ Retrieve view data for a single workout. Parameters: - workout: The workout object. - user: The user associated with the workout. Returns: - dict or None: A dictionary containing view data if cadence or heart rate is available, otherwise None. """ if workout.is_cadence_available or workout.is_heart_rate_available: return format_workout_data(workout, user) return None def create_graph(x_values, y_values, y_label, filename, x_label='Time'): # Plotting fig, ax = plt.subplots() ax.plot(x_values, y_values) ax.set_xlabel(x_label) ax.set_ylabel(y_label) ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) ax.set_ylim(bottom=0) # Save the graph to a bytes buffer buffer = io.BytesIO() plt.savefig(buffer, format='png', transparent=True, bbox_inches='tight') buffer.seek(0) # Create a response object with the graph image response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = 'attachment; filename={filename}.png' return response def format_date_with_ordinal(d, format_string): ordinal = {'1': 'st', '2': 'nd', '3': 'rd'}.get(str(d.day)[-1:], 'th') return d.strftime(format_string).replace('{th}', ordinal) def format_duration(duration): hours, remainder = divmod(duration.seconds, 3600) minutes, _ = divmod(remainder, 60) if duration >= timedelta(hours=1): return f"{hours}h {minutes}m" else: return f"{minutes}m" def date_range(start_date, end_date): """ Generator for dates between two dates (inclusive). """ current_date = start_date while current_date <= end_date: yield current_date current_date += timedelta(days=1) def get_month_bounds(dt): """ Determine the bounds of a month for a given date. This considers the starting and ending weekdays to fit a calendar view. """ first_day_of_month = dt.replace(day=1) next_month = first_day_of_month + relativedelta(months=1) last_day_of_month = next_month - timedelta(days=1) # Define weekday mappings to determine start and end bounds weekday_to_start_offset = {6: 0, 0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6} weekday_to_end_offset = {6: 6, 0: 5, 1: 4, 2: 3, 3: 2, 4: 1, 5: 0} start_date = first_day_of_month - \ timedelta(days=weekday_to_start_offset[first_day_of_month.weekday()]) end_date = last_day_of_month + \ timedelta(days=weekday_to_end_offset[last_day_of_month.weekday()]) return start_date, end_date def generate_calendar_monthly_view(workouts, selected_date): """ Generate a monthly calendar view of the workouts. """ def get_workout_for_date(workout_lookup, date): return workout_lookup.get(date) start_date, end_date = get_month_bounds(selected_date) # Build a lookup dictionary for faster access workout_lookup = {w['start_time_date'].date( ): w for w in workouts if start_date <= w['start_time_date'].date() <= end_date} current_date = datetime.now().date() days_of_month = [ { 'date': day, 'day_of_month': day.day, 'is_workout': day in workout_lookup, 'workout': get_workout_for_date(workout_lookup, day), 'is_current_date': day == current_date, 'is_current_month': day.month == selected_date.month and day.year == selected_date.year } for day in date_range(start_date, end_date) ] next_month_date = selected_date + relativedelta(months=1) previous_month_date = selected_date - relativedelta(months=1) calendar_month = { 'month_year': selected_date.strftime('%B, %Y'), 'days_of_month': days_of_month, 'next_month': next_month_date, 'previous_month': previous_month_date, } return calendar_month def generate_daily_duration_sparkline(workouts): """ Generate a sparkline string representation of daily workout durations. Parameters: - workouts (list of dict): Each dict should contain 'start_time_date' and 'duration_minutes' keys. Returns: - str: Sparkline representation of daily durations. """ if not workouts: return '' # Determine date range based on workouts data start_date = workouts[-1]['start_time_date'].date() end_date = workouts[0]['start_time_date'].date() # Build a mapping of dates to their respective durations for easier lookup workouts_by_date = {w['start_time_date'].date(): int( w['duration_minutes']) for w in workouts} daily_durations = [workouts_by_date.get( date, 0) for date in date_range(start_date, end_date)] # Reverse the list to make the most recent day appear on the right daily_durations.reverse() return sparklines.sparklines(daily_durations)[0] def toDate(dateString): return datetime.strptime(dateString, "%Y-%m-%d").date() if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)