from datetime import timedelta import io from flask_sqlalchemy import SQLAlchemy from flask import Flask, make_response, render_template, request, jsonify import jinja_partials from flask_htmx import HTMX import matplotlib.pyplot as plt import matplotlib.dates as mdates import os app = Flask(__name__) # TODO CHANGE SECRET KEY TO ENVIRONMENT VARIABLE app.config['SECRET_KEY'] = 'secret!' uri = os.getenv("DATABASE_URL") if uri and uri.startswith("postgres://"): uri = uri.replace("postgres://", "postgresql://", 1) app.config['SQLALCHEMY_DATABASE_URI'] = uri app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False jinja_partials.register_extensions(app) htmx = HTMX(app) db = SQLAlchemy(app) class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), nullable=False) workouts = db.relationship('Workout', backref='user', lazy=True) class Workout(db.Model): __tablename__ = 'workouts' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey( 'users.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False, default=db.func.now()) cadence_readings = db.relationship( 'CadenceReading', backref='workout', lazy=True) class CadenceReading(db.Model): __tablename__ = 'cadence_readings' id = db.Column(db.Integer, primary_key=True) workout_id = db.Column(db.Integer, db.ForeignKey( 'workouts.id', ondelete='CASCADE'), nullable=False) created_at = db.Column(db.DateTime, nullable=False) rpm = db.Column(db.Integer, nullable=False) @app.route("/") def home(): return render_template('attemptv2.html') @app.route('/users', methods=['GET', 'POST']) def users(): if request.method == 'GET': # get a list of all users in the database users = User.query.all() users_list = [{'id': user.id, 'name': user.name, 'workouts': len(user.workouts)} for user in users] return jsonify(users_list), 200 elif request.method == 'POST': # create a new user data = request.json name = data['name'] # create a new user and add it to the database new_user = User(name=name) db.session.add(new_user) db.session.commit() return jsonify({'message': 'User created successfully.'}), 201 @app.route('/user/', methods=['DELETE']) def delete_user(user_id): user = User.query.get(user_id) if user: db.session.delete(user) db.session.commit() return jsonify({'message': 'User deleted successfully.'}), 200 else: return jsonify({'error': 'User not found.'}), 404 @app.route('/user//workouts', methods=['GET', 'POST']) def create_workout(user_id): if request.method == 'GET': # get a list of all workouts for a user workouts = Workout.query.filter_by(user_id=user_id).all() workouts_data = [] for workout in workouts: cadence_readings = CadenceReading.query.filter_by( workout_id=workout.id).all() if cadence_readings: # get the earliest and latest cadence readings timestamps start_time = min( reading.created_at for reading in cadence_readings) end_time = max( reading.created_at for reading in cadence_readings) duration = end_time - start_time # format the duration as hh:mm:ss or mm:ss or ss if duration >= timedelta(hours=1): duration_str = str(duration) else: duration_str = str(duration).split('.')[0] # calculate average and maximum rpm rpms = [reading.rpm for reading in cadence_readings] avg_rpm = sum(rpms) / len(rpms) max_rpm = max(rpms) workouts_data.append({ 'id': workout.id, 'started_at': start_time.strftime('%a %b %d %Y %H:%M:%S'), 'finished_at': end_time.strftime('%a %b %d %Y %H:%M:%S'), 'duration': duration_str, 'avg_rpm': int(avg_rpm), 'max_rpm': max_rpm }) return jsonify({'workouts': workouts_data}), 200 elif request.method == 'POST': data = request.json rpm_readings = data['workout'] # create a new workout workout = Workout(user_id=user_id) db.session.add(workout) db.session.commit() # add cadence readings to the workout for reading in rpm_readings: cadence_reading = CadenceReading( workout_id=workout.id, created_at=reading['timestamp'], rpm=reading['rpm']) db.session.add(cadence_reading) db.session.commit() return jsonify({'message': 'Workout created successfully.'}), 201 @app.route('/user//workout/', methods=['GET', 'DELETE']) def workout(user_id, workout_id): workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first() if workout: if request.method == 'GET': # Get the cadence readings for the workout cadence_readings = CadenceReading.query.filter_by( workout_id=workout_id).all() if cadence_readings: # Create a graph of cadence readings x_values = [reading.created_at for reading in cadence_readings] y_values = [reading.rpm for reading in cadence_readings] fig, ax = plt.subplots() ax.plot(x_values, y_values) ax.set_xlabel('Time') ax.set_ylabel('Cadence (RPM)') # ax.set_title( # 'Cadence Readings for Workout {}'.format(workout_id)) # Hide X and Y axes label marks # ax.xaxis.set_tick_params(labelbottom=False) # ax.yaxis.set_tick_params(labelleft=False) # Hide X and Y axes tick marks # ax.set_xticks([]) # ax.set_yticks([]) # use formatters to specify major and minor ticks # format date as hh:mm ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) # ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m")) # ax.xaxis.set_minor_formatter(mdates.DateFormatter("%Y-%m")) # set the y-axis limits to start at 0 ax.set_ylim(bottom=0) # Save the graph to a bytes buffer buffer = io.BytesIO() plt.savefig(buffer, format='png', transparent=True, bbox_inches='tight') buffer.seek(0) # Create a response object with the graph image response = make_response(buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = 'attachment; filename=cadence.png' return response, 200 else: return jsonify({'message': 'No cadence readings for workout {}.'.format(workout_id)}), 404 elif request.method == 'DELETE': # Delete the workout and its associated cadence readings db.session.delete(workout) CadenceReading.query.filter_by(workout_id=workout_id).delete() db.session.commit() return jsonify({'message': 'Workout {} deleted successfully.'.format(workout_id)}), 200 else: return jsonify({'message': 'Workout {} not found for user {}.'.format(workout_id, user_id)}), 404 if __name__ == '__main__': app.run(debug=True) # @app.after_request # def response_minify(response): # """ # minify html response to decrease site traffic # """ # if response.content_type == u'text/html; charset=utf-8': # response.set_data( # minify_html.minify(response.get_data( # as_text=True), minify_js=True, remove_processing_instructions=True) # ) # return response # return response # @ app.route("/") # def home(): # return render_template('attemptv2.html') # @ app.route("/devices") # def devices(): # devices = db.get_devices() # return render_template('devices.html', devices=devices) # @ app.route("/device/") # def device(device_id): # device = db.get_device(device_id) # return render_template('device.html', device=device) # @app.route("/overview/") # def overview(device_id): # cadences = db.get_all_cadences(device_id) # last_cadence = cadences[-1]['rpm'] if cadences else 0 # if cadences: # first = cadences[0]['logged_at'] # last = cadences[-1]['logged_at'] # duration = str(timedelta(seconds=(last-first).seconds)) # last_cadence = cadences[-1]['rpm'] # power = round(decimal.Decimal(0.0011)*last_cadence ** 3 + decimal.Decimal( # 0.0026) * last_cadence ** 2 + decimal.Decimal(0.5642)*last_cadence) # graph_data = generate_sparkline_graph( # [c['rpm'] for c in cadences[-100:]]) # return render_template('overview.html', last_cadence=last_cadence, power=power, duration=duration, cadences=cadences[-15:], graph_data=graph_data) # return render_template('overview.html', last_cadence=0, power=0, duration=duration, cadences=[], graph_data='') # @ app.route("/cadence", methods=['POST']) # def cadence(): # data = request.get_json() # print('' + datetime.now().replace(microsecond=0).isoformat() + # ' ' + json.dumps(data)) # db.insert_cadence(data['rpm'], data['id']) # return 'ok' if __name__ == '__main__': # Bind to PORT if defined, otherwise default to 5000. port = int(os.environ.get('PORT', 5000)) app.run(host='127.0.0.1', port=port)