# Source: cardio/app.py (Python; 444 lines, 17 KiB in the original listing)

from sqlalchemy import func
from datetime import datetime, timedelta
import io
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, make_response, render_template, request, jsonify
import jinja_partials
from flask_htmx import HTMX
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import os
import sparklines
# --- Application, database and extension setup ---------------------------
app = Flask(__name__)

# Read the secret key from the environment so the real value does not have to
# live in source control. The previous hard-coded value is kept only as a
# fallback so existing development setups keep working.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'secret!')
app.config["SQLALCHEMY_ECHO"] = False  # True for debugging

# Some hosting providers hand out "postgres://" URLs; rewrite the scheme to
# "postgresql://" (presumably because newer SQLAlchemy releases no longer
# accept the short alias - confirm against the deployed version).
uri = os.getenv("DATABASE_URL")
if uri and uri.startswith("postgres://"):
    uri = uri.replace("postgres://", "postgresql://", 1)
app.config['SQLALCHEMY_DATABASE_URI'] = uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Extensions are bound to the app at import time.
jinja_partials.register_extensions(app)
htmx = HTMX(app)
db = SQLAlchemy(app)
class User(db.Model):
    """A person who records workouts; every user is assigned exactly one bike."""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    bike_id = db.Column(
        db.Integer, db.ForeignKey('bikes.id', ondelete='CASCADE'),
        nullable=False)

    # workouts.user_id is NOT NULL and declared ON DELETE CASCADE. Without a
    # delete cascade here, deleting a user makes the ORM try to set that
    # column to NULL on every child row, which the database rejects.
    # passive_deletes=True lets the database cascade remove rows that were
    # never loaded into the session.
    workouts = db.relationship(
        'Workout', backref='user', lazy=True,
        cascade='save-update, merge, delete', passive_deletes=True)
    bike = db.relationship('Bike', backref='user', lazy=True)
class Bike(db.Model):
    """A bike that users can be assigned to and workouts are recorded on."""
    __tablename__ = 'bikes'

    id = db.Column(db.Integer, primary_key=True)
    # Two names are stored per bike: one for display and one code name.
    display_name = db.Column(db.String(255), nullable=False)
    code_name = db.Column(db.String(255), nullable=False)
class Workout(db.Model):
    """One recorded session on a bike: summary statistics plus raw readings."""
    __tablename__ = 'workouts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'),
        nullable=False)
    created_at = db.Column(db.DateTime, nullable=False, default=db.func.now())
    bike_id = db.Column(
        db.Integer, db.ForeignKey('bikes.id', ondelete='CASCADE'),
        nullable=False)

    # Summary values derived from the cadence readings when a workout is
    # posted; duration is stored in seconds.
    started_at = db.Column(db.DateTime, nullable=False)
    duration = db.Column(db.Numeric, nullable=False)
    average_rpm = db.Column(db.Numeric, nullable=False)
    min_rpm = db.Column(db.Integer, nullable=False)
    max_rpm = db.Column(db.Integer, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)

    # Summary values derived from the heart-rate readings.
    average_bpm = db.Column(db.Numeric, nullable=False)
    min_bpm = db.Column(db.Integer, nullable=False)
    max_bpm = db.Column(db.Integer, nullable=False)

    # Flags recording which kinds of readings were supplied.
    is_heart_rate_available = db.Column(
        db.Boolean, nullable=False, default=False)
    is_cadence_available = db.Column(db.Boolean, nullable=False, default=False)

    # The reading tables declare workout_id as NOT NULL with ON DELETE
    # CASCADE. Without a delete cascade on these relationships, deleting a
    # Workout through the ORM would try to NULL those foreign keys and fail.
    # passive_deletes=True leaves unloaded rows to the database cascade.
    cadence_readings = db.relationship(
        'CadenceReading', backref='workout', lazy=True,
        cascade='save-update, merge, delete', passive_deletes=True)
    heart_rate_readings = db.relationship(
        'HeartRateReading', backref='workout', lazy=True,
        cascade='save-update, merge, delete', passive_deletes=True)
    bike = db.relationship('Bike', backref='workouts', lazy=True)
class CadenceReading(db.Model):
    """A single timestamped cadence sample belonging to one workout."""
    __tablename__ = 'cadence_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(
        db.Integer,
        db.ForeignKey('workouts.id', ondelete='CASCADE'),
        nullable=False,
    )
    # Time of the sample; supplied by the client as the reading's "timestamp".
    created_at = db.Column(db.DateTime, nullable=False)
    rpm = db.Column(db.Integer, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)
    speed = db.Column(db.Numeric, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    power = db.Column(db.Integer, nullable=False)
class HeartRateReading(db.Model):
    """A single timestamped heart-rate sample belonging to one workout."""
    __tablename__ = 'heartrate_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(
        db.Integer,
        db.ForeignKey('workouts.id', ondelete='CASCADE'),
        nullable=False,
    )
    # Time of the sample; supplied by the client as the reading's "timestamp".
    created_at = db.Column(db.DateTime, nullable=False)
    bpm = db.Column(db.Integer, nullable=False)
@app.route('/', methods=['GET'])
def get_workouts():
    """Serve the landing page: every user together with their workouts."""
    page = render_users_and_workouts()
    return page
@app.route("/user/<int:user_id>/new_workout")
def new_workout(user_id):
    """Render the "new workout" page for the user with the given id.

    The template receives ``user=None`` when no such user exists.
    """
    return render_template('new_workout.html', user=User.query.get(user_id))
@app.route('/users', methods=['GET', 'POST'])
def users():
    """List users, creating a new one first when the request is a POST.

    POST expects the form fields ``name`` and ``bike_id``; a missing field
    results in Flask's standard 400 response. GET only renders the list.
    Previously GET also tried to read the form fields and always failed
    with a 400.

    Returns:
        The rendered users-and-workouts page.
    """
    if request.method == 'POST':
        form = request.form
        new_user = User(name=form['name'], bike_id=form['bike_id'])
        db.session.add(new_user)
        db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a user and return the refreshed users-and-workouts page.

    Responds with a JSON error and status 404 when the user does not exist.
    """
    target = User.query.get(user_id)
    if not target:
        return jsonify({'error': 'User not found.'}), 404

    db.session.delete(target)
    db.session.commit()
    return render_users_and_workouts()
def _parse_reading_timestamp(value):
    """Convert a reading's ``timestamp`` field into a naive ``datetime``.

    JSON has no datetime type, so the value arrives as a string or a number
    and must be converted before any date arithmetic.

    Raises:
        ValueError: if a string value is not ISO-8601.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, (int, float)):
        # NOTE(review): numeric timestamps are assumed to be epoch seconds -
        # confirm against the client that posts the readings.
        parsed = datetime.fromtimestamp(value)
    else:
        text = str(value)
        # datetime.fromisoformat() only accepts a trailing "Z" from
        # Python 3.11 on, so spell the UTC offset out explicitly.
        if text.endswith('Z'):
            text = text[:-1] + '+00:00'
        parsed = datetime.fromisoformat(text)
    # The DateTime columns are declared without a timezone; dropping tzinfo
    # keeps every value naive so they can be compared and subtracted safely.
    return parsed.replace(tzinfo=None)


@app.route('/user/<int:user_id>/workouts', methods=['GET', 'POST'])
def workouts(user_id):
    """List a user's workouts, or create a workout from posted readings.

    Any method other than POST renders the list. POST expects a JSON object
    with optional ``cadence_readings`` and ``heart_rate_readings`` lists.
    Summary statistics are derived from those lists, and the workout plus
    all readings are saved in a single transaction.

    Returns:
        The rendered list, or a JSON message with status 201 on creation,
        404 for an unknown user, 400 for an unreadable reading timestamp.
    """
    if request.method != 'POST':
        # Also covers HEAD, which Flask adds to GET routes automatically and
        # which previously fell through without returning a response.
        workouts_data = get_workouts_for_user(user_id)
        return render_template('workouts_list.html', workouts=workouts_data)

    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    app.logger.info(f'Creating workout for user {user.name} ({user.id})')

    data = request.json or {}
    cadence_readings = data.get('cadence_readings') or []
    heart_rate_readings = data.get('heart_rate_readings') or []

    try:
        cadence_times = [_parse_reading_timestamp(c['timestamp'])
                         for c in cadence_readings]
        heart_rate_times = [_parse_reading_timestamp(h['timestamp'])
                            for h in heart_rate_readings]
    except (KeyError, TypeError, ValueError):
        return jsonify({'error': 'Invalid reading timestamp.'}), 400

    # Fill in the summary columns before the first INSERT so the row is
    # never written half-populated.
    workout = Workout(user_id=user_id, bike_id=user.bike_id)

    if cadence_readings:
        start_time = min(cadence_times)
        end_time = max(cadence_times)
        rpms = [c['rpm'] for c in cadence_readings]
        workout.is_cadence_available = True
        workout.started_at = start_time
        workout.duration = (end_time - start_time).total_seconds()
        workout.average_rpm = sum(rpms) / len(rpms)
        workout.min_rpm = min(rpms)
        workout.max_rpm = max(rpms)
        # Calories and distance are taken from the last reading posted.
        workout.calories = cadence_readings[-1]['calories']
        workout.distance = cadence_readings[-1]['distance']

    if heart_rate_readings:
        bpms = [h['bpm'] for h in heart_rate_readings]
        workout.is_heart_rate_available = True
        workout.average_bpm = sum(bpms) / len(bpms)
        workout.min_bpm = min(bpms)
        workout.max_bpm = max(bpms)

    db.session.add(workout)
    # flush() assigns workout.id without ending the transaction, so a
    # failure while adding readings rolls the workout back as well.
    db.session.flush()

    for c, created_at in zip(cadence_readings, cadence_times):
        db.session.add(CadenceReading(
            workout_id=workout.id, created_at=created_at, rpm=c['rpm'],
            distance=c['distance'], speed=c['speed'],
            calories=c['calories'], power=c['power']))
    for h, created_at in zip(heart_rate_readings, heart_rate_times):
        db.session.add(HeartRateReading(
            workout_id=workout.id, created_at=created_at, bpm=h['bpm']))

    db.session.commit()
    app.logger.info(
        f'Workout({workout.id}) created for user {user.name} ({user.id}) with {len(cadence_readings)} cadence readings and {len(heart_rate_readings)} heart rate readings')
    return jsonify({'message': 'Workout created successfully.'}), 201
@app.route('/user/<int:user_id>/workout/<int:workout_id>/<string:graph_type>', methods=['GET', 'DELETE'])
def workout(user_id, workout_id, graph_type):
    """Return a PNG graph of one metric of a workout.

    ``graph_type`` may be ``cadence``, ``speed``, ``distance``, ``calories``
    or ``power`` (plotted from the cadence readings). Any other value falls
    back to the heart-rate graph when heart-rate data exists.

    Returns:
        The image response with status 200, a JSON message with 404 when the
        workout does not belong to the user, or 409 when no graph can be
        produced.
    """
    # NOTE(review): the route accepts DELETE but nothing is deleted here -
    # confirm whether that method is still needed.
    #
    # The previous query inner-joined both reading tables, so a workout with
    # only one kind of reading was reported as "not found". The readings are
    # loaded through the relationships below, so no join is needed.
    record = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if not record:
        return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404

    # graph_type -> (CadenceReading attribute, y-axis label)
    cadence_graphs = {
        'cadence': ('rpm', 'Cadence (RPM)'),
        'speed': ('speed', 'Speed (KPH)'),
        'distance': ('distance', 'Distance (KM)'),
        'calories': ('calories', 'Calories (KCAL)'),
        'power': ('power', 'Power (WATTS)'),
    }

    if record.is_cadence_available and graph_type in cadence_graphs:
        attribute, y_label = cadence_graphs[graph_type]
        # The relationship defines no ordering; sort so the line is drawn in
        # chronological order.
        readings = sorted(record.cadence_readings, key=lambda r: r.created_at)
        x_values = [reading.created_at for reading in readings]
        y_values = [getattr(reading, attribute) for reading in readings]
        return create_graph(x_values=x_values, y_values=y_values, y_label=y_label, filename=graph_type), 200

    # NOTE(review): graph_type is not checked here, so unknown graph types
    # also get the heart-rate graph. Kept as-is for existing callers.
    if record.is_heart_rate_available:
        readings = sorted(record.heart_rate_readings,
                          key=lambda r: r.created_at)
        x_values = [reading.created_at for reading in readings]
        y_values = [reading.bpm for reading in readings]
        return create_graph(x_values=x_values, y_values=y_values, y_label='Heart Rate (BPM)', filename='heart_rate'), 200

    return jsonify({'message': 'Unable to generate {} for workout {}.'.format(graph_type, workout_id)}), 409
@app.route('/user/<int:user_id>/workout/<int:workout_id>/view', methods=['GET'])
def view_workout(user_id, workout_id):
    """Render the detail view of one workout.

    The graph types to show come from the repeated ``graph_types`` query
    parameter. The template receives ``workout=None`` when the workout does
    not exist for this user.
    """
    selected = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    return render_template(
        'workout_view.html',
        workout=selected,
        graph_types=request.args.getlist('graph_types'),
    )
@app.route('/user/<int:user_id>/workout/<int:workout_id>/delete', methods=['DELETE'])
def delete_workout(user_id, workout_id):
    """Delete one of a user's workouts together with all of its readings.

    Returns:
        The refreshed users-and-workouts page, or a JSON error with status
        404 when the workout does not exist for this user.
    """
    owned = Workout.query.filter_by(user_id=user_id, id=workout_id)
    # Check ownership first. Readings are removed by workout_id alone, so
    # without this check a mismatched user_id would wipe another user's
    # readings while leaving their workout row in place.
    if owned.first() is None:
        return jsonify({'error': 'Workout not found.'}), 404

    CadenceReading.query.filter_by(workout_id=workout_id).delete()
    HeartRateReading.query.filter_by(workout_id=workout_id).delete()
    owned.delete()
    db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>/workouts', methods=['GET'])
def workouts_for_user(user_id):
    """Render the workout list for one user.

    NOTE(review): ``workouts`` registers the same URL rule for GET earlier in
    this file, so this endpoint may never be matched - confirm.
    """
    return render_template(
        'workouts_list.html', workouts=get_workouts_for_user(user_id))
@app.route('/user/<int:user_id>/bike', methods=['GET'])
def update_users_bike(user_id):
    """Assign the bike named by the ``bike_id`` query parameter to a user.

    Returns the refreshed users-and-workouts page, or a JSON error with
    status 404 when either the user or the bike cannot be found.
    """
    bike_id = request.args.get('bike_id')
    user = User.query.get(user_id)
    bike = Bike.query.get(bike_id)

    if not (user and bike):
        return jsonify({'error': 'User or bike not found.'}), 404

    user.bike_id = bike_id
    db.session.commit()
    return render_users_and_workouts()
def render_users_and_workouts():
    """Render the overview of all users with their workouts and weekly stats.

    For each user that has workouts, the span between the earliest and
    latest workout is split into 7-day windows. The workout count and the
    average duration in minutes are collected per window, and sparklines
    are built from them.

    Returns:
        The rendered ``users_and_workouts_wrapper.html`` template when
        ``htmx`` is truthy (presumably only for htmx-initiated requests),
        otherwise the rendered ``users_and_workouts.html`` page.
    """
    users_data = []
    for user in User.query.all():
        workouts = get_workouts_for_user(user.id)
        if not workouts:
            users_data.append({
                'id': user.id,
                'name': user.name,
                'bike_id': user.bike_id,
                'workouts_count': 0,
                'workouts': [],
                'workout_counts_by_week': 0,
                'duration_by_week': 0
            })
            continue

        # Use min/max rather than the first/last list entries, so the range
        # is correct whatever order the workouts were returned in.
        workout_dates = [w['start_time_date'].date() for w in workouts]
        start_date = min(workout_dates)
        end_date = max(workout_dates)
        span_days = (end_date - start_date).days
        num_days = span_days + 1
        num_weeks = span_days // 7 + 1

        workout_counts_by_week = []
        duration_by_week = []
        for i in range(num_weeks):
            # Inclusive 7-day window [start, end] for week i.
            start = start_date + timedelta(days=7 * i)
            end = start + timedelta(days=6)
            weekly_workouts = [
                w for w in workouts
                if start <= w['start_time_date'].date() <= end]

            workout_counts_by_week.append(len(weekly_workouts))
            if weekly_workouts:
                total_minutes = sum(
                    int(w['duration_minutes']) for w in weekly_workouts)
                duration_by_week.append(total_minutes / len(weekly_workouts))
            else:
                # A week without workouts used to raise ZeroDivisionError.
                duration_by_week.append(0)

        workout_counts_sparkline = sparklines.sparklines(
            workout_counts_by_week)
        duration_sparkline = sparklines.sparklines(
            [int(w['duration_minutes']) for w in workouts])[0]

        users_data.append({
            'id': user.id,
            'name': user.name,
            'bike_id': user.bike_id,
            'workouts_count': len(workouts),
            'workouts': workouts,
            'workout_counts_by_week': workout_counts_by_week,
            'duration_by_week': duration_by_week,
            'num_days': num_days,
            'workout_counts_sparkline': workout_counts_sparkline,
            'duration_sparkline': duration_sparkline
        })

    template = ('users_and_workouts_wrapper.html' if htmx
                else 'users_and_workouts.html')
    return render_template(template, users=users_data, bikes=Bike.query.all())
def get_workouts_for_user(user_id):
    """Return display-ready dictionaries for a user's workouts.

    Workouts are ordered by ``created_at``, newest first. Workouts with
    neither cadence nor heart-rate data are skipped, and missing summary
    values are reported as 0.

    Args:
        user_id: Primary key of the user whose workouts are listed.

    Returns:
        A list of dicts, one per workout, as consumed by the templates.
    """
    user = User.query.get(user_id)
    workouts_data = []
    workouts = Workout.query.filter_by(user_id=user_id).order_by(
        Workout.created_at.desc()).all()

    for workout in workouts:
        is_heart_rate_available = workout.is_heart_rate_available
        is_cadence_available = workout.is_cadence_available
        if not (is_cadence_available or is_heart_rate_available):
            continue

        duration = timedelta(
            seconds=int(workout.duration)) if workout.duration else timedelta(seconds=0)

        # started_at is only filled in from cadence readings, so a
        # heart-rate-only workout may not have one. Fall back to created_at
        # rather than crashing on None when formatting the date.
        start_time = workout.started_at or workout.created_at

        selected_graph_types = ['speed']
        if is_heart_rate_available:
            selected_graph_types.append('heart_rate')

        workouts_data.append({
            'id': workout.id,
            'user_id': user_id,
            'user_name': user.name,
            # NOTE(review): the format has a literal "th", not the "{th}"
            # placeholder that format_date_with_ordinal() replaces, so the
            # suffix is always "th". "%#H" is also platform-dependent.
            'start_time': format_date_with_ordinal(start_time, '%#H:%M %B %dth %Y'),
            'start_time_date': start_time,
            'duration': format_duration(duration),
            'duration_minutes': duration.total_seconds() / 60,
            'average_rpm': int(workout.average_rpm or 0),
            'min_rpm': int(workout.min_rpm or 0),
            'max_rpm': int(workout.max_rpm or 0),
            'calories': int(workout.calories or 0),
            'distance': int(workout.distance or 0),
            'bike_display_name': workout.bike.display_name,
            'selected_graph_types': selected_graph_types,
            'is_heart_rate_available': is_heart_rate_available,
            'is_cadence_available': is_cadence_available,
            'average_bpm': int(workout.average_bpm or 0),
            'min_bpm': int(workout.min_bpm or 0),
            'max_bpm': int(workout.max_bpm or 0),
        })
    return workouts_data
def create_graph(x_values, y_values, y_label, filename, x_label='Time'):
    """Plot ``y_values`` against ``x_values`` and return the PNG as a response.

    Args:
        x_values: Datetimes for the x-axis (tick labels use HH:MM).
        y_values: Values to plot; the y-axis always starts at 0.
        y_label: Label for the y-axis.
        filename: Base name (without extension) reported to the client.
        x_label: Label for the x-axis.

    Returns:
        A Flask response holding the image, with ``Content-Type`` set to
        ``image/png``.
    """
    fig, ax = plt.subplots()
    try:
        ax.plot(x_values, y_values)
        ax.set_xlabel(x_label)
        ax.set_ylabel(y_label)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
        # set the y-axis limits to start at 0
        ax.set_ylim(bottom=0)

        # Save the graph to a bytes buffer
        buffer = io.BytesIO()
        fig.savefig(buffer, format='png',
                    transparent=True, bbox_inches='tight')
    finally:
        # pyplot keeps every figure alive until it is closed; without this
        # each request leaks a figure.
        plt.close(fig)

    # Create a response object with the graph image
    response = make_response(buffer.getvalue())
    response.headers['Content-Type'] = 'image/png'
    # The f-prefix was missing, so clients received the literal text
    # "{filename}.png" instead of the real file name.
    response.headers['Content-Disposition'] = f'attachment; filename={filename}.png'
    return response
def format_date_with_ordinal(d, format_string):
    """Format ``d`` with ``strftime`` and fill in the English ordinal suffix.

    Args:
        d: A ``date`` or ``datetime``.
        format_string: A ``strftime`` format; every literal ``{th}`` in the
            formatted result is replaced by "st", "nd", "rd" or "th"
            according to the day of the month.

    Returns:
        The formatted string.
    """
    # 11, 12 and 13 take "th" even though they end in 1, 2 and 3.
    if 11 <= d.day % 100 <= 13:
        ordinal = 'th'
    else:
        ordinal = {1: 'st', 2: 'nd', 3: 'rd'}.get(d.day % 10, 'th')
    return d.strftime(format_string).replace('{th}', ordinal)
def format_duration(duration):
    """Format a ``timedelta`` as "<H>h <M>m", or "<M>m" when under an hour.

    Seconds are truncated. Negative durations are treated as zero.

    Args:
        duration: The ``timedelta`` to format.

    Returns:
        The formatted string, e.g. "45m" or "1h 5m".
    """
    # timedelta.seconds excludes whole days, so 25 hours used to render as
    # "1h 0m"; total_seconds() covers the full span.
    total_minutes = max(int(duration.total_seconds()), 0) // 60
    hours, minutes = divmod(total_minutes, 60)
    if hours:
        return f"{hours}h {minutes}m"
    return f"{minutes}m"
if __name__ == '__main__':
    # Run the development server on localhost, on the port given by the PORT
    # environment variable when set, otherwise on 5000.
    app.run(host='127.0.0.1', port=int(os.getenv('PORT', 5000)))