Files
cardio/app.py
2023-10-20 18:45:21 +11:00

617 lines
22 KiB
Python

from sqlalchemy import desc
from flask_basicauth import BasicAuth
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
from dateutil.relativedelta import relativedelta
import humanize
from dateutil.parser import isoparse
import sparklines
import os
from datetime import datetime, timedelta
import io
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, make_response, render_template, request, jsonify
import jinja_partials
from flask_htmx import HTMX
import matplotlib
matplotlib.use('Agg')
app = Flask(__name__)
# TODO CHANGE SECRET KEY TO ENVIRONMENT VARIABLE
app.config['SECRET_KEY'] = 'secret!'
app.config["SQLALCHEMY_ECHO"] = False # True for debugging
uri = os.getenv("DATABASE_URL")
if uri and uri.startswith("postgres://"):
uri = uri.replace("postgres://", "postgresql://", 1)
app.config['SQLALCHEMY_DATABASE_URI'] = uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
jinja_partials.register_extensions(app)
htmx = HTMX(app)
db = SQLAlchemy(app)
app.config['BASIC_AUTH_USERNAME'] = os.getenv("ADMIN_USERNAME") or 'admin'
app.config['BASIC_AUTH_PASSWORD'] = os.getenv("ADMIN_PASSWORD") or 'admin'
basic_auth = BasicAuth(app)
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), nullable=False)
bike_id = db.Column(db.Integer, db.ForeignKey(
'bikes.id', ondelete='CASCADE'), nullable=False)
workouts = db.relationship(
'Workout', backref='user', lazy=True, order_by='desc(Workout.created_at)') # 'Workout.created_at'
bike = db.relationship('Bike', backref='user', lazy=True)
class Bike(db.Model):
__tablename__ = 'bikes'
id = db.Column(db.Integer, primary_key=True)
display_name = db.Column(db.String(255), nullable=False)
code_name = db.Column(db.String(255), nullable=False)
class Workout(db.Model):
__tablename__ = 'workouts'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey(
'users.id', ondelete='CASCADE'), nullable=False)
created_at = db.Column(db.DateTime, nullable=False, default=db.func.now())
bike_id = db.Column(db.Integer, db.ForeignKey(
'bikes.id', ondelete='CASCADE'), nullable=False)
started_at = db.Column(db.DateTime, nullable=False)
duration = db.Column(db.Numeric, nullable=False)
average_rpm = db.Column(db.Numeric, nullable=False)
min_rpm = db.Column(db.Integer, nullable=False)
max_rpm = db.Column(db.Integer, nullable=False)
calories = db.Column(db.Numeric, nullable=False)
distance = db.Column(db.Numeric, nullable=False)
average_bpm = db.Column(db.Numeric, nullable=False)
min_bpm = db.Column(db.Integer, nullable=False)
max_bpm = db.Column(db.Integer, nullable=False)
is_heart_rate_available = db.Column(
db.Boolean, nullable=False, default=False)
is_cadence_available = db.Column(db.Boolean, nullable=False, default=False)
cadence_readings = db.relationship(
'CadenceReading', backref='workout', lazy=True)
heart_rate_readings = db.relationship(
'HeartRateReading', backref='workout', lazy=True)
bike = db.relationship('Bike', backref='workouts', lazy=True)
class CadenceReading(db.Model):
__tablename__ = 'cadence_readings'
id = db.Column(db.Integer, primary_key=True)
workout_id = db.Column(db.Integer, db.ForeignKey(
'workouts.id', ondelete='CASCADE'), nullable=False)
created_at = db.Column(db.DateTime, nullable=False)
rpm = db.Column(db.Integer, nullable=False)
distance = db.Column(db.Numeric, nullable=False)
speed = db.Column(db.Numeric, nullable=False)
calories = db.Column(db.Numeric, nullable=False)
power = db.Column(db.Integer, nullable=False)
class HeartRateReading(db.Model):
__tablename__ = 'heartrate_readings'
id = db.Column(db.Integer, primary_key=True)
workout_id = db.Column(db.Integer, db.ForeignKey(
'workouts.id', ondelete='CASCADE'), nullable=False)
created_at = db.Column(db.DateTime, nullable=False)
bpm = db.Column(db.Integer, nullable=False)
@app.route('/', methods=['GET'])
def overview():
return render_users_and_workouts()
@app.route("/user/<int:user_id>/new_workout")
def new_workout(user_id):
user = User.query.get(user_id)
return render_template('new_workout.html', user=user)
@app.route('/users', methods=['GET', 'POST'])
def users():
# create a new user
data = request.form
name = data['name']
bike_id = data['bike_id']
# create a new user and add it to the database
new_user = User(name=name, bike_id=bike_id)
db.session.add(new_user)
db.session.commit()
return render_users_and_workouts()
@app.route('/user/<int:user_id>', methods=['DELETE'])
@basic_auth.required
def delete_user(user_id):
user = User.query.get(user_id)
if user:
db.session.delete(user)
db.session.commit()
return render_users_and_workouts()
else:
return jsonify({'error': 'User not found.'}), 404
@app.route('/user/<int:user_id>/workouts', methods=['GET'])
def get_workouts(user_id):
user = User.query.get(user_id)
workouts_data = get_workouts_for_user_view_data(user)
return render_template('workouts_list.html', workouts=workouts_data)
@app.route('/user/<int:user_id>/workouts', methods=['POST'])
def create_workout(user_id):
user = User.query.get(user_id)
app.logger.info(f'Creating workout for user {user.name} ({user.id})')
data = request.json
cadence_readings = data.get('cadence_readings', [])
heart_rate_readings = data.get('heart_rate_readings', [])
# Create a new workout
workout = Workout(user_id=user_id, bike_id=user.bike_id)
db.session.add(workout)
db.session.flush() # To get the workout.id before committing
# Add cadence readings to the workout
cadences = [{
'workout_id': workout.id,
'created_at': c['timestamp'],
'rpm': c['rpm'],
'distance': c['distance'],
'speed': c['speed'],
'calories': c['calories'],
'power': c['power']
} for c in cadence_readings]
heart_rates = [{
'workout_id': workout.id,
'created_at': h['timestamp'],
'bpm': h['bpm']
} for h in heart_rate_readings]
db.session.bulk_insert_mappings(CadenceReading, cadences)
db.session.bulk_insert_mappings(HeartRateReading, heart_rates)
if cadence_readings:
timestamps = [isoparse(c['timestamp']) for c in cadence_readings]
rpms = [c['rpm'] for c in cadence_readings]
workout.is_cadence_available = True
workout.started_at = min(timestamps)
workout.duration = (
max(timestamps) - workout.started_at).total_seconds()
workout.average_rpm = sum(rpms) / len(rpms)
workout.min_rpm = min(rpms)
workout.max_rpm = max(rpms)
workout.calories = cadence_readings[-1]['calories']
workout.distance = cadence_readings[-1]['distance']
if heart_rate_readings:
bpms = [h['bpm'] for h in heart_rate_readings]
workout.is_heart_rate_available = True
workout.average_bpm = sum(bpms) / len(bpms)
workout.min_bpm = min(bpms)
workout.max_bpm = max(bpms)
db.session.commit()
return f'Added {humanize.naturaldelta(workout.duration)} session.', 201
"""
@app.route('/user/<int:user_id>/workouts', methods=['GET', 'POST'])
def workouts(user_id):
user = User.query.get(user_id)
if request.method == 'GET':
workouts_data = get_workouts_for_user_view_data(user)
return render_template('workouts_list.html', workouts=workouts_data)
elif request.method == 'POST':
app.logger.info(f'Creating workout for user {user.name} ({user.id})')
data = request.json
cadence_readings = data['cadence_readings'] or []
heart_rate_readings = data['heart_rate_readings'] or []
# create a new workout
workout = Workout(user_id=user_id, bike_id=user.bike_id)
db.session.add(workout)
db.session.commit()
app.logger.info(
f'Workout({workout.id}) created for user {user.name} ({user.id}) with {len(cadence_readings)} cadence readings and {len(heart_rate_readings)} heart rate readings')
# add cadence readings to the workout
for c in cadence_readings:
cadence_reading = CadenceReading(
workout_id=workout.id, created_at=c['timestamp'], rpm=c['rpm'], distance=c['distance'], speed=c['speed'], calories=c['calories'], power=c['power'])
db.session.add(cadence_reading)
for h in heart_rate_readings:
heart_rate_reading = HeartRateReading(
workout_id=workout.id, created_at=h['timestamp'], bpm=h['bpm'])
db.session.add(heart_rate_reading)
if cadence_readings:
timestamps = [isoparse(
c['timestamp']) for c in cadence_readings]
start_time = min(timestamps)
end_time = max(timestamps)
duration = end_time - start_time
duration = duration.total_seconds()
average_rpm = sum(
c['rpm'] for c in cadence_readings) / len(cadence_readings)
min_rpm = min(
c['rpm'] for c in cadence_readings)
max_rpm = max(
c['rpm'] for c in cadence_readings)
calories = cadence_readings[-1]['calories']
distance = cadence_readings[-1]['distance']
workout.is_cadence_available = True
workout.started_at = start_time
workout.duration = duration
workout.average_rpm = average_rpm
workout.min_rpm = min_rpm
workout.max_rpm = max_rpm
workout.calories = calories
workout.distance = distance
if heart_rate_readings:
bpm = [h['bpm'] for h in heart_rate_readings]
average_bpm = sum(bpm) / len(bpm)
min_bpm = min(bpm)
max_bpm = max(bpm)
workout.is_heart_rate_available = True
workout.average_bpm = average_bpm
workout.min_bpm = min_bpm
workout.max_bpm = max_bpm
db.session.commit()
return jsonify({'message': 'Workout created successfully.'}), 201
"""
@app.route('/user/<int:user_id>/workout/<int:workout_id>/<string:graph_type>', methods=['GET'])
def workout(user_id, workout_id, graph_type):
workout = Workout.query.filter_by(user_id=user_id, id=workout_id).join(
Workout.cadence_readings).join(Workout.heart_rate_readings).first()
if not workout:
return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404
if workout.is_cadence_available:
x_values = [reading.created_at for reading in workout.cadence_readings]
if graph_type == 'cadence':
y_values = [reading.rpm for reading in workout.cadence_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Cadence (RPM)', filename='cadence'), 200
elif graph_type == 'speed':
y_values = [reading.speed for reading in workout.cadence_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Speed (KPH)', filename='speed'), 200
elif graph_type == 'distance':
y_values = [
reading.distance for reading in workout.cadence_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Distance (KM)', filename='distance'), 200
elif graph_type == 'calories':
y_values = [
reading.calories for reading in workout.cadence_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Calories (KCAL)', filename='calories'), 200
elif graph_type == 'power':
y_values = [reading.power for reading in workout.cadence_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Power (WATTS)', filename='power'), 200
if workout.is_heart_rate_available:
x_values = [
reading.created_at for reading in workout.heart_rate_readings]
y_values = [reading.bpm for reading in workout.heart_rate_readings]
return create_graph(x_values=x_values, y_values=y_values, y_label='Heart Rate (BPM)', filename='heart_rate'), 200
return jsonify({'message': 'Unable to generate {} for workout {}.'.format(graph_type, workout_id)}), 409
@app.route('/user/<int:user_id>/workout/<int:workout_id>/view', methods=['GET'])
def view_workout(user_id, workout_id):
workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
graph_types = request.args.getlist('graph_types')
return render_template('workout_view.html', workout=workout, graph_types=graph_types)
@app.route('/user/<int:user_id>/workout/<int:workout_id>/delete', methods=['DELETE'])
@basic_auth.required
def delete_workout(user_id, workout_id):
# Delete the workout and its associated cadence readings
CadenceReading.query.filter_by(workout_id=workout_id).delete()
HeartRateReading.query.filter_by(workout_id=workout_id).delete()
Workout.query.filter_by(user_id=user_id, id=workout_id).delete()
db.session.commit()
return render_users_and_workouts()
@app.route('/user/<int:user_id>/bike', methods=['GET'])
def update_users_bike(user_id):
bike_id = request.args.get('bike_id')
user = User.query.get(user_id)
bike = Bike.query.get(bike_id)
if user and bike:
user.bike_id = bike_id
db.session.commit()
return render_users_and_workouts()
else:
return jsonify({'error': 'User or bike not found.'}), 404
@ app.route('/user/<int:user_id>/calendar', methods=['GET'])
def calendar_view(user_id):
user = User.query.get(user_id)
workouts = get_workouts_for_user_view_data(user)
date = request.args.get('date', default=datetime.now().date(), type=toDate)
calendar_month = generate_calendar_monthly_view(workouts, date)
return render_template('partials/calendar.html', calendar_month=calendar_month, user_id=user_id)
@ app.route("/user/<int:user_id>/workout/list", methods=['GET'])
def workout_list(user_id):
user = User.query.get(user_id)
workouts = get_workouts_for_user_view_data(user)
return render_template('partials/workouts_list_fragment.html', workouts=workouts, user_id=user_id, workouts_all_loaded=True)
@ app.route("/user/<int:user_id>/workout/<int:workout_id>/calendar_view", methods=['GET'])
def calendar_workout_view(user_id, workout_id):
workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
user = User.query.get(user_id)
workout_view_data = get_workout_view_data(workout, user)
return render_template('partials/selected_workout_view.html', workout=workout_view_data, user_id=user_id)
# Utilities
def generate_user_data(user, workouts=[]):
"""
Generate data for a single user.
Parameters:
- user: The user object.
- workouts: List of workouts for the user.
Returns:
- dict: A dictionary containing user data and related information.
"""
return {
'id': user.id,
'name': user.name,
'bike_id': user.bike_id,
'workouts_count': len(workouts),
'workouts': workouts,
'daily_duration_sparkline': generate_daily_duration_sparkline(workouts),
'calendar_month': generate_calendar_monthly_view(workouts, datetime.now().date())
}
def render_users_and_workouts():
"""
Render users and their associated workouts.
"""
users = User.query.all()
users_data = [generate_user_data(user, get_workouts_for_user_view_data(
user)) for user in users]
template_name = 'users_and_workouts_wrapper.html' if htmx else 'users_and_workouts.html'
return render_template(template_name, users=users_data, bikes=Bike.query.all())
def get_workouts_for_user_view_data(user):
"""
Retrieve view data for all workouts of a user.
Parameters:
- user: The user object.
Returns:
- list: A list of view data for valid workouts.
"""
return [get_workout_view_data(workout, user) for workout in user.workouts if get_workout_view_data(workout, user)]
def format_workout_data(workout, user):
"""
Formats the workout data for view.
Parameters:
- workout: The workout object.
- user: The user associated with the workout.
Returns:
- dict: A dictionary with formatted workout data.
"""
duration = timedelta(seconds=int(workout.duration or 0))
return {
'id': workout.id,
'user_id': user.id,
'user_name': user.name,
'start_time': format_date_with_ordinal(workout.started_at, '%#H:%M %B %dth %Y'),
'start_time_date': workout.started_at,
'start_time_ago': humanize.naturaltime(workout.started_at),
'duration': humanize.naturaldelta(duration),
'duration_minutes': duration.total_seconds() / 60,
'average_rpm': int(workout.average_rpm or 0),
'min_rpm': int(workout.min_rpm or 0),
'max_rpm': int(workout.max_rpm or 0),
'calories': int(workout.calories or 0),
'distance': int(workout.distance or 0),
'bike_display_name': workout.bike.display_name,
'selected_graph_types': ['speed'] + (['heart_rate'] if workout.is_heart_rate_available else []),
'is_heart_rate_available': workout.is_heart_rate_available,
'is_cadence_available': workout.is_cadence_available,
'average_bpm': int(workout.average_bpm or 0),
'min_bpm': int(workout.min_bpm or 0),
'max_bpm': int(workout.max_bpm or 0),
}
def get_workout_view_data(workout, user):
"""
Retrieve view data for a single workout.
Parameters:
- workout: The workout object.
- user: The user associated with the workout.
Returns:
- dict or None: A dictionary containing view data if cadence or heart rate is available, otherwise None.
"""
if workout.is_cadence_available or workout.is_heart_rate_available:
return format_workout_data(workout, user)
return None
def create_graph(x_values, y_values, y_label, filename, x_label='Time'):
# Plotting
fig, ax = plt.subplots()
ax.plot(x_values, y_values)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
ax.set_ylim(bottom=0)
# Save the graph to a bytes buffer
buffer = io.BytesIO()
plt.savefig(buffer, format='png',
transparent=True, bbox_inches='tight')
buffer.seek(0)
# Create a response object with the graph image
response = make_response(buffer.getvalue())
response.headers['Content-Type'] = 'image/png'
response.headers['Content-Disposition'] = 'attachment; filename={filename}.png'
return response
def format_date_with_ordinal(d, format_string):
ordinal = {'1': 'st', '2': 'nd', '3': 'rd'}.get(str(d.day)[-1:], 'th')
return d.strftime(format_string).replace('{th}', ordinal)
def date_range(start_date, end_date):
"""
Generator for dates between two dates (inclusive).
"""
current_date = start_date
while current_date <= end_date:
yield current_date
current_date += timedelta(days=1)
def get_month_bounds(dt):
"""
Determine the bounds of a month for a given date.
This considers the starting and ending weekdays to fit a calendar view.
"""
first_day_of_month = dt.replace(day=1)
next_month = first_day_of_month + relativedelta(months=1)
last_day_of_month = next_month - timedelta(days=1)
# Define weekday mappings to determine start and end bounds
weekday_to_start_offset = {6: 0, 0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6}
weekday_to_end_offset = {6: 6, 0: 5, 1: 4, 2: 3, 3: 2, 4: 1, 5: 0}
start_date = first_day_of_month - \
timedelta(days=weekday_to_start_offset[first_day_of_month.weekday()])
end_date = last_day_of_month + \
timedelta(days=weekday_to_end_offset[last_day_of_month.weekday()])
return start_date, end_date
def generate_calendar_monthly_view(workouts, selected_date):
"""
Generate a monthly calendar view of the workouts.
"""
start_date, end_date = get_month_bounds(selected_date)
# Build a lookup dictionary for faster access
workout_lookup = {w['start_time_date'].date(
): w for w in workouts if start_date <= w['start_time_date'].date() <= end_date}
current_date = datetime.now().date()
days_of_month = [
{
'date': day,
'day_of_month': day.day,
'is_workout': day in workout_lookup,
'workout': workout_lookup.get(day),
'is_current_date': day == current_date,
'is_current_month': day.month == selected_date.month and day.year == selected_date.year
}
for day in date_range(start_date, end_date)
]
next_month_date = selected_date + relativedelta(months=1)
previous_month_date = selected_date - relativedelta(months=1)
calendar_month = {
'month_year': selected_date.strftime('%B, %Y'),
'days_of_month': days_of_month,
'next_month': next_month_date,
'previous_month': previous_month_date,
}
return calendar_month
def generate_daily_duration_sparkline(workouts):
"""
Generate a sparkline string representation of daily workout durations.
Parameters:
- workouts (list of dict): Each dict should contain 'start_time_date' and 'duration_minutes' keys.
Returns:
- str: Sparkline representation of daily durations.
"""
if not workouts:
return ''
# Determine date range based on workouts data
start_date = workouts[-1]['start_time_date'].date()
end_date = workouts[0]['start_time_date'].date()
# Build a mapping of dates to their respective durations for easier lookup
workouts_by_date = {w['start_time_date'].date(): int(
w['duration_minutes']) for w in workouts}
daily_durations = [workouts_by_date.get(
date, 0) for date in date_range(start_date, end_date)]
# Reverse the list to make the most recent day appear on the right
daily_durations.reverse()
return sparklines.sparklines(daily_durations)[0]
def toDate(dateString):
return datetime.strptime(dateString, "%Y-%m-%d").date()
if __name__ == '__main__':
# Bind to PORT if defined, otherwise default to 5000.
port = int(os.environ.get('PORT', 5000))
app.run(host='127.0.0.1', port=port)