Files
cardio/app.py

810 lines
28 KiB
Python

from collections import defaultdict
from flask_basicauth import BasicAuth
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
from dateutil.relativedelta import relativedelta
import humanize
from dateutil.parser import isoparse
import sparklines
import os
from datetime import datetime, timedelta
import io
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, make_response, render_template, request, jsonify
import jinja_partials
from flask_htmx import HTMX
import matplotlib
matplotlib.use('Agg')
# Flask application object shared by every route and extension below.
app = Flask(__name__)
# Prefer a secret supplied through the SECRET_KEY environment variable; the
# literal is kept only as a fallback so existing set-ups keep starting.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY") or 'secret!'
app.config["SQLALCHEMY_ECHO"] = False  # True for debugging
# The database location comes from DATABASE_URL; a "postgres://" scheme is
# rewritten to "postgresql://" before it is handed to SQLAlchemy.
uri = os.getenv("DATABASE_URL")
if uri and uri.startswith("postgres://"):
    uri = uri.replace("postgres://", "postgresql://", 1)
app.config['SQLALCHEMY_DATABASE_URI'] = uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
jinja_partials.register_extensions(app)
htmx = HTMX(app)
db = SQLAlchemy(app)
# Credentials used by the routes decorated with @basic_auth.required; both
# fall back to 'admin' when the environment variable is unset or empty.
app.config['BASIC_AUTH_USERNAME'] = os.getenv("ADMIN_USERNAME") or 'admin'
app.config['BASIC_AUTH_PASSWORD'] = os.getenv("ADMIN_PASSWORD") or 'admin'
basic_auth = BasicAuth(app)
class User(db.Model):
    """A person who records workouts and is assigned one bike."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    bike_id = db.Column(
        db.Integer,
        db.ForeignKey('bikes.id', ondelete='CASCADE'),
        nullable=False,
    )

    # Workouts are loaded newest-created first; Workout gains a `.user` backref.
    workouts = db.relationship(
        'Workout',
        backref='user',
        lazy=True,
        order_by='desc(Workout.created_at)',
    )
    bike = db.relationship('Bike', backref='user', lazy=True)
class Bike(db.Model):
    """A bike that users and workouts can reference."""

    __tablename__ = 'bikes'

    id = db.Column(db.Integer, primary_key=True)
    # Name shown to people (see `bike_display_name` in the workout view data).
    display_name = db.Column(db.String(255), nullable=False)
    code_name = db.Column(db.String(255), nullable=False)
class Workout(db.Model):
    """One recorded session: summary statistics plus links to raw readings."""

    __tablename__ = 'workouts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('users.id', ondelete='CASCADE'),
        nullable=False,
    )
    # Row creation time, filled in by the database.
    created_at = db.Column(db.DateTime, nullable=False, default=db.func.now())
    bike_id = db.Column(
        db.Integer,
        db.ForeignKey('bikes.id', ondelete='CASCADE'),
        nullable=False,
    )

    # Timing; `duration` is stored in seconds by create_workout.
    started_at = db.Column(db.DateTime, nullable=False)
    duration = db.Column(db.Numeric, nullable=False)

    # Cadence-derived summary values.
    average_rpm = db.Column(db.Numeric, nullable=False)
    min_rpm = db.Column(db.Integer, nullable=False)
    max_rpm = db.Column(db.Integer, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)

    # Heart-rate-derived summary values.
    average_bpm = db.Column(db.Numeric, nullable=False)
    min_bpm = db.Column(db.Integer, nullable=False)
    max_bpm = db.Column(db.Integer, nullable=False)

    # Flags recording which kinds of readings were supplied.
    is_heart_rate_available = db.Column(
        db.Boolean, nullable=False, default=False)
    is_cadence_available = db.Column(
        db.Boolean, nullable=False, default=False)

    cadence_readings = db.relationship(
        'CadenceReading', backref='workout', lazy=True)
    heart_rate_readings = db.relationship(
        'HeartRateReading', backref='workout', lazy=True)
    bike = db.relationship('Bike', backref='workouts', lazy=True)
class CadenceReading(db.Model):
    """A single cadence sample belonging to a workout."""

    __tablename__ = 'cadence_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(
        db.Integer,
        db.ForeignKey('workouts.id', ondelete='CASCADE'),
        nullable=False,
    )
    # Timestamp of the sample (create_workout fills it from 'timestamp').
    created_at = db.Column(db.DateTime, nullable=False)
    rpm = db.Column(db.Integer, nullable=False)
    distance = db.Column(db.Numeric, nullable=False)
    speed = db.Column(db.Numeric, nullable=False)
    calories = db.Column(db.Numeric, nullable=False)
    power = db.Column(db.Integer, nullable=False)
class HeartRateReading(db.Model):
    """A single heart-rate sample belonging to a workout."""

    __tablename__ = 'heartrate_readings'

    id = db.Column(db.Integer, primary_key=True)
    workout_id = db.Column(
        db.Integer,
        db.ForeignKey('workouts.id', ondelete='CASCADE'),
        nullable=False,
    )
    # Timestamp of the sample (create_workout fills it from 'timestamp').
    created_at = db.Column(db.DateTime, nullable=False)
    bpm = db.Column(db.Integer, nullable=False)
class UserGraphs(db.Model):
    """A graph configuration (date range, period, attributes) saved per user."""

    __tablename__ = 'user_graphs'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('users.id', ondelete='CASCADE'),
        nullable=False,
    )
    start_date = db.Column(db.DateTime, nullable=False)
    end_date = db.Column(db.DateTime, nullable=False)
    # Averaging period name, e.g. 'day', 'week' or 'month'.
    period = db.Column(db.String(255), nullable=False)
    # Names of the workout attributes drawn on the graph.
    attributes = db.Column(db.ARRAY(db.String(255)))
@app.route('/', methods=['GET'])
def overview():
    """Serve the landing page: every user together with their workouts."""
    page = render_users_and_workouts()
    return page
@app.route("/user/<int:user_id>/new_workout")
def new_workout(user_id):
    """Render the "new workout" page for a user.

    Returns a 404 JSON error when no user has the given id, instead of
    rendering the page for a user that does not exist.
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    return render_template('new_workout.html', user=user)
@app.route('/users', methods=['POST'])
def create_user():
    """Create a user from the posted form fields `name` and `bike_id`.

    Responds with a 400 JSON message when either field is missing or empty;
    otherwise stores the user and re-renders the users/workouts view.
    """
    form = request.form
    name, bike_id = form.get('name'), form.get('bike_id')

    # Both fields are mandatory.
    if not (name and bike_id):
        return jsonify({'message': 'Name and Bike ID are required'}), 400

    db.session.add(User(name=name, bike_id=bike_id))
    db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>', methods=['DELETE'])
@basic_auth.required
def delete_user(user_id):
    """Delete a user (basic-auth protected) and re-render the overview.

    Responds with a 404 JSON error when the user does not exist.
    """
    doomed = User.query.get(user_id)
    if not doomed:
        return jsonify({'error': 'User not found.'}), 404

    db.session.delete(doomed)
    db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>/workouts', methods=['GET'])
def get_workouts(user_id):
    """Render the list of a user's workouts.

    Returns a 404 JSON error when the user does not exist (previously this
    failed with an AttributeError while reading the workouts of `None`).
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    workouts_data = get_workouts_for_user_view_data(user)
    return render_template('workouts_list.html', workouts=workouts_data)
@app.route('/user/<int:user_id>/workouts', methods=['POST'])
def create_workout(user_id):
    """Store a workout and its readings from a JSON payload.

    The JSON body may contain `cadence_readings` and `heart_rate_readings`
    lists. Each cadence reading provides `timestamp`, `rpm`, `distance`,
    `speed`, `calories` and `power`; each heart-rate reading provides
    `timestamp` and `bpm`.

    Summary statistics are computed before the workout row is flushed, so
    the INSERT already carries every value that can be derived from the
    payload. Start time and duration come from the cadence timestamps, or
    from the heart-rate timestamps when no cadence readings were sent.

    Returns:
    - (str, 201) with a human-readable confirmation, or
    - a 404 JSON error when the user does not exist.
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    app.logger.info(f'Creating workout for user {user.name} ({user.id})')

    # A JSON body of `null` is treated like an empty payload.
    data = request.json or {}
    cadence_readings = data.get('cadence_readings', [])
    heart_rate_readings = data.get('heart_rate_readings', [])

    workout = Workout(user_id=user_id, bike_id=user.bike_id)

    if cadence_readings:
        timestamps = [isoparse(c['timestamp']) for c in cadence_readings]
        rpms = [c['rpm'] for c in cadence_readings]
        started_at = min(timestamps)
        workout.is_cadence_available = True
        workout.started_at = started_at
        workout.duration = (max(timestamps) - started_at).total_seconds()
        workout.average_rpm = sum(rpms) / len(rpms)
        workout.min_rpm = min(rpms)
        workout.max_rpm = max(rpms)
        # Calories and distance are taken from the last reading in the list.
        workout.calories = cadence_readings[-1]['calories']
        workout.distance = cadence_readings[-1]['distance']

    if heart_rate_readings:
        bpms = [h['bpm'] for h in heart_rate_readings]
        workout.is_heart_rate_available = True
        workout.average_bpm = sum(bpms) / len(bpms)
        workout.min_bpm = min(bpms)
        workout.max_bpm = max(bpms)
        if not cadence_readings:
            # Heart-rate-only workout: the view code reads `started_at`, so
            # derive the timing from the heart-rate samples instead.
            beat_times = [isoparse(h['timestamp'])
                          for h in heart_rate_readings]
            started_at = min(beat_times)
            workout.started_at = started_at
            workout.duration = (max(beat_times) - started_at).total_seconds()

    # NOTE(review): with neither kind of reading the timing columns stay
    # unset, exactly as before this change.
    db.session.add(workout)
    db.session.flush()  # To get the workout.id before committing

    cadences = [{
        'workout_id': workout.id,
        'created_at': c['timestamp'],
        'rpm': c['rpm'],
        'distance': c['distance'],
        'speed': c['speed'],
        'calories': c['calories'],
        'power': c['power']
    } for c in cadence_readings]
    heart_rates = [{
        'workout_id': workout.id,
        'created_at': h['timestamp'],
        'bpm': h['bpm']
    } for h in heart_rate_readings]
    db.session.bulk_insert_mappings(CadenceReading, cadences)
    db.session.bulk_insert_mappings(HeartRateReading, heart_rates)

    db.session.commit()
    return f'Added {humanize.naturaldelta(workout.duration)} session.', 201
@app.route('/user/<int:user_id>/workouts/graph', methods=['GET'])
def graph_user_workouts(user_id):
    """Return a PNG graph of averaged workout attributes for a user.

    Query parameters:
    - attributes (repeatable): attribute names to plot; none -> empty body.
    - start_date, end_date (YYYY-MM-DD): default to the user's earliest and
      most recent workout dates.
    - period: 'day' (default), 'week' or 'month'.

    When the `htmx` object is truthy for this request, an <img> tag pointing
    back at the same URL is returned instead of the image itself.

    Returns 404 when the user does not exist or has no workouts to graph,
    and 400 for an unknown period (checked before the graph is recorded, so
    an unusable configuration is never saved).
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    attributes = request.args.getlist(
        'attributes', type=str)
    if not attributes:
        return ""
    if htmx:
        # The URL is echoed into an HTML attribute, so escape it.
        image_url = escape(request.full_path, quote=True)
        return f"""
<img src="{image_url}"
loading="lazy" alt="No image" class="mx-auto" _="on click remove me">
"""

    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404

    workouts = get_workouts_for_user_view_data(user)
    if not workouts:
        # min()/max() below would raise on an empty sequence.
        return jsonify({'error': 'No workouts to graph.'}), 404

    workout_dates = [workout['start_time_date'] for workout in workouts]
    start_date = request.args.get(
        'start_date', default=min(workout_dates), type=toDate)
    end_date = request.args.get(
        'end_date', default=max(workout_dates), type=toDate)
    period = request.args.get('period', default='day', type=str)
    if period not in ('day', 'week', 'month'):
        return jsonify({'error': f'Invalid period: {period}'}), 400

    # Add record of user graph
    insert_usergraph_if_not_exists(
        user_id, start_date, end_date, period, attributes)
    return plot_averaged_attributes(
        workouts, user, start_date, end_date, period, attributes)
@app.route('/user/<int:user_id>/workouts/graph/delete', methods=['DELETE'])
def delete_user_graph(user_id):
    """Forget a saved graph configuration for a user.

    The graph is identified by the `attributes`, `start_date`, `end_date`
    and `period` query parameters; missing dates default to today and a
    missing period to 'day'. Always responds with an empty body.
    """
    query = request.args
    attributes = query.getlist('attributes', type=str)
    start_date = query.get(
        'start_date', default=datetime.now().date(), type=toDate)
    end_date = query.get(
        'end_date', default=datetime.now().date(), type=toDate)
    period = query.get('period', default='day', type=str)

    remove_usergraph(user_id, start_date, end_date, period, attributes)
    return ""
def daterange(start_date, end_date, delta=timedelta(days=1)):
    """Yield start_date, start_date + delta, ... while the value is < end_date.

    The end of the range is exclusive.
    """
    cursor = start_date
    while True:
        if not cursor < end_date:
            return
        yield cursor
        cursor = cursor + delta
def average_workout_attributes_per_period(workouts, start_date, end_date, period, attributes):
    """
    Average workout attributes per period between start_date and end_date.

    Both ends of the range are inclusive: a bucket is started for every
    period whose first day is <= end_date, and only workouts dated within
    [start_date, end_date] are counted. (Previously end_date was exclusive,
    so workouts on the final day were dropped and a single-day range
    produced no buckets at all.)

    Parameters:
    - workouts (list): Workout view dictionaries; each needs a
      'start_time_date' entry and may hold the requested attributes.
    - start_date, end_date: Date range to consider.
    - period (str): 'day', 'week', or 'month'.
    - attributes (list): List of attribute names to average.
    Returns:
    - Dictionary: keys are the first date of each period; values map each
      attribute to its average over that period (0 when no workout has a
      value) plus 'workout_count', the number of workouts in the period.
    Raises:
    - ValueError: if period is not one of the supported names.
    """
    period_lengths = {
        "day": timedelta(days=1),
        "week": timedelta(weeks=1),
        # approximating month as 4 weeks for simplicity
        "month": timedelta(weeks=4),
    }
    if period not in period_lengths:
        raise ValueError(f"Invalid period: {period}")
    delta = period_lengths[period]

    results = {}
    start_period = start_date
    while start_period <= end_date:
        end_period = start_period + delta
        filtered_workouts = [
            w for w in workouts
            if start_period <= w['start_time_date'] < end_period
            and w['start_time_date'] <= end_date
        ]
        averages = defaultdict(float)
        for attribute in attributes:
            valid_values = [
                value
                for value in (w.get(attribute) for w in filtered_workouts)
                if value is not None
            ]
            if valid_values:
                averages[attribute] = sum(valid_values) / len(valid_values)
            else:
                averages[attribute] = 0
        # Set last so it wins even when 'workout_count' was itself requested.
        averages['workout_count'] = len(filtered_workouts)
        results[start_period] = averages
        start_period = end_period
    return results
def create_user_graph(x_values, y_data, filename, x_label='Time', title=None):
    """Create a multi-line graph and return it as a PNG response.

    Parameters:
    - x_values: A list of x-values (common for all lines).
    - y_data: A dictionary where key is the line label and value is the
      list of y-values for that line.
    - filename: Base name (without extension) used in Content-Disposition.
    - x_label: Label for x-axis.
    - title: Optional title for the plot.
    Returns:
    - Flask Response object containing the image of the graph.
    """
    fig, ax = plt.subplots()
    try:
        if title:
            ax.set_title(title)
        # One line per attribute, labelled for the legend.
        for y_label, y_values in y_data.items():
            ax.plot(x_values, y_values, label=y_label)
        ax.set_xlabel(x_label)
        ax.legend()
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m"))
        ax.set_ylim(bottom=0)

        # Save through the figure itself rather than pyplot's implicit
        # "current figure", so the right figure is always the one rendered.
        buffer = io.BytesIO()
        fig.savefig(buffer, format='png',
                    transparent=True, bbox_inches='tight')
    finally:
        # pyplot keeps every figure it created until it is closed; without
        # this each request would leave a figure behind.
        plt.close(fig)

    response = make_response(buffer.getvalue())
    response.headers['Content-Type'] = 'image/png'
    response.headers['Content-Disposition'] = f'attachment; filename={filename}.png'
    return response
def plot_averaged_attributes(workouts_list, user, start_date, end_date, period, attributes):
    """Build the graph of per-period attribute averages for one user.

    Parameters:
    - workouts_list: Workout view dictionaries to average.
    - user: User whose label tables (attributes, periods) are used.
    - start_date, end_date: Date range to consider.
    - period (str): 'day', 'week', or 'month'.
    - attributes (list): Attribute names to plot.
    Returns:
    - Flask Response object containing the image of the graph.
    """
    user_data = generate_user_data(user)
    attribute_labels = user_data["attributes"]

    # Title: "<attribute labels> over <period label> (dd/mm/yy - dd/mm/yy)".
    heading = format_key_values(attribute_labels, attributes)
    period_label = get_value_from_key(user_data["periods"], period)
    first_day = start_date.strftime("%d/%m/%y")
    last_day = end_date.strftime("%d/%m/%y")
    title = f'{heading} over {period_label} ({first_day} - {last_day})'

    averaged = average_workout_attributes_per_period(
        workouts_list, start_date, end_date, period, attributes)

    # Period start dates form the x axis; one series per attribute label.
    x_values = list(averaged.keys())
    y_data = {
        get_value_from_key(attribute_labels, attribute): [
            averaged[day][attribute] for day in x_values
        ]
        for attribute in attributes
    }

    return create_user_graph(
        x_values, y_data,
        filename=f"average_attributes_over_{period}", title=title)
@app.route('/user/<int:user_id>/workout/<int:workout_id>/<string:graph_type>', methods=['GET'])
def workout(user_id, workout_id, graph_type):
    """Return a PNG graph of one reading series of a workout.

    `graph_type` selects the series: 'cadence', 'speed', 'distance',
    'calories', 'power' (from cadence readings) or 'heart_rate'.

    Returns:
    - the image response with status 200,
    - 404 when the workout does not exist for this user,
    - 409 when the graph type is unknown or the workout has no readings of
      the required kind.
    """
    # No joins here: inner-joining both reading tables made any workout that
    # lacks cadence *or* heart-rate readings look as if it did not exist.
    found = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if not found:
        return jsonify({'message': f'Workout {workout_id} not found for user {user_id}.'}), 404

    # graph type -> (relationship name, reading attribute, y label, filename)
    graph_mappings = {
        'cadence': ('cadence_readings', 'rpm', 'Cadence (RPM)', 'cadence'),
        'speed': ('cadence_readings', 'speed', 'Speed (KPH)', 'speed'),
        'distance': ('cadence_readings', 'distance', 'Distance (KM)', 'distance'),
        'calories': ('cadence_readings', 'calories', 'Calories (KCAL)', 'calories'),
        'power': ('cadence_readings', 'power', 'Power (WATTS)', 'power'),
        'heart_rate': ('heart_rate_readings', 'bpm', 'Heart Rate (BPM)', 'heart_rate')
    }
    unable = jsonify(
        {'message': f'Unable to generate {graph_type} for workout {workout_id}.'})

    mapping = graph_mappings.get(graph_type)
    if mapping is None:
        # Unknown type: previously getattr(workout, None) raised TypeError.
        return unable, 409
    readings_attr, y_attr, y_label, filename = mapping

    # Plot in time order; the relationship itself defines no ordering.
    readings = sorted(getattr(found, readings_attr, []),
                      key=lambda reading: reading.created_at)
    if not readings:
        return unable, 409

    x_values = [reading.created_at for reading in readings]
    y_values = [getattr(reading, y_attr) for reading in readings]
    return create_graph(x_values=x_values, y_values=y_values,
                        y_label=y_label, filename=filename), 200
@app.route('/user/<int:user_id>/workout/<int:workout_id>/view', methods=['GET'])
def view_workout(user_id, workout_id):
    """Render the workout page with the graph types named in the query string."""
    selected = Workout.query.filter_by(
        user_id=user_id, id=workout_id).first()
    requested_graphs = request.args.getlist('graph_types')
    return render_template(
        'workout_view.html', workout=selected, graph_types=requested_graphs)
@app.route('/user/<int:user_id>/workout/<int:workout_id>/delete', methods=['DELETE'])
@basic_auth.required
def delete_workout(user_id, workout_id):
    """Delete a workout of a user together with its readings.

    The workout is looked up by user *and* id first; previously the readings
    were removed by workout id alone, so a mismatched user id in the URL
    wiped another user's readings while leaving the workout row in place.

    Returns the re-rendered overview, or a 404 JSON message when the workout
    does not exist for this user.
    """
    owned = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if owned is None:
        return jsonify({'message': f'Workout {workout_id} not found for user {user_id}.'}), 404

    # Readings first, then the workout row itself.
    CadenceReading.query.filter_by(workout_id=workout_id).delete()
    HeartRateReading.query.filter_by(workout_id=workout_id).delete()
    Workout.query.filter_by(user_id=user_id, id=workout_id).delete()
    db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>/bike', methods=['GET'])
def update_users_bike(user_id):
    """Assign the bike given by the `bike_id` query parameter to a user.

    Responds with a 404 JSON error when either the user or the bike cannot
    be found; otherwise saves the change and re-renders the overview.
    """
    bike_id = request.args.get('bike_id')
    user = User.query.get(user_id)
    bike = Bike.query.get(bike_id)

    if not (user and bike):
        return jsonify({'error': 'User or bike not found.'}), 404

    user.bike_id = bike_id
    db.session.commit()
    return render_users_and_workouts()
@app.route('/user/<int:user_id>/calendar', methods=['GET'])
def calendar_view(user_id):
    """Render the monthly calendar partial for a user.

    The month shown is the one containing the `date` query parameter
    (YYYY-MM-DD), defaulting to today. Returns a 404 JSON error when the
    user does not exist instead of failing while reading their workouts.
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    workouts = get_workouts_for_user_view_data(user)
    date = request.args.get(
        'date', default=datetime.now().date(), type=toDate)
    calendar_month = generate_calendar_monthly_view(workouts, date)
    return render_template('partials/calendar.html', calendar_month=calendar_month, user_id=user_id)
@app.route("/user/<int:user_id>/workout/list", methods=['GET'])
def workout_list(user_id):
    """Render the full workout list fragment for a user.

    Returns a 404 JSON error when the user does not exist instead of failing
    while reading their workouts.
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    workouts = get_workouts_for_user_view_data(user)
    return render_template('partials/workouts_list_fragment.html', workouts=workouts, user_id=user_id, workouts_all_loaded=True)
@app.route("/user/<int:user_id>/workout/<int:workout_id>/calendar_view", methods=['GET'])
def calendar_workout_view(user_id, workout_id):
    """Render the "selected workout" partial used by the calendar.

    Returns a 404 JSON response when the user or the workout cannot be
    found; previously a missing workout raised an AttributeError while its
    view data was being built.
    """
    user = User.query.get(user_id)
    if user is None:
        return jsonify({'error': 'User not found.'}), 404
    workout = Workout.query.filter_by(user_id=user_id, id=workout_id).first()
    if workout is None:
        return jsonify({'message': f'Workout {workout_id} not found for user {user_id}.'}), 404
    workout_view_data = get_workout_view_data(workout, user)
    return render_template('partials/selected_workout_view.html', workout=workout_view_data, user_id=user_id)
# Utilities
def generate_user_data(user, workouts=None):
    """
    Generate the view data for a single user.

    Parameters:
    - user: The user object.
    - workouts: List of workout view dictionaries for the user. The
      first/last workout dates are read from the last/first list entries.
      Defaults to an empty list.
    Returns:
    - dict: A dictionary containing user data and related information.
    """
    # A fresh list per call: the previous `workouts=[]` default was a single
    # list shared by every call and handed out in the returned dictionary.
    if workouts is None:
        workouts = []
    return {
        'id': user.id,
        'name': user.name,
        'bike_id': user.bike_id,
        'workouts_count': len(workouts),
        'workouts': workouts,
        'daily_duration_sparkline': generate_daily_duration_sparkline(workouts),
        'calendar_month': generate_calendar_monthly_view(workouts, datetime.now().date()),
        # (key, label) pairs for the attributes that can be graphed.
        'attributes': [('workout_count', 'Workout count'), ('duration_seconds', 'Duration (sec)'), ('duration_minutes', 'Duration (min)'), ('average_rpm', 'Average RPM'), ('max_rpm', 'Max RPM'), ('average_bpm', 'Average BPM'), ('max_bpm', 'Max BPM'), ('distance', 'Distance'), ('calories', 'Calories')],
        # (key, label) pairs for the supported averaging periods.
        'periods': [('day', 'Day'), ('week', 'Week'), ('month', 'Month')],
        'graphs': get_user_graphs(user.id),
        'first_workout_date': workouts[-1]['start_time_date'] if workouts else None,
        'last_workout_date': workouts[0]['start_time_date'] if workouts else None,
    }
def render_users_and_workouts():
    """
    Render every user with their workouts.

    Uses the list template when the `htmx` object is truthy for this
    request and the full overview page otherwise.
    """
    users_data = []
    for user in User.query.all():
        user_workouts = get_workouts_for_user_view_data(user)
        users_data.append(generate_user_data(user, user_workouts))

    if htmx:
        template_name = 'users_and_workouts_list.html'
    else:
        template_name = 'overview.html'
    return render_template(
        template_name, users=users_data, bikes=Bike.query.all())
def get_workouts_for_user_view_data(user):
    """
    Retrieve view data for all workouts of a user.

    Each workout is formatted exactly once (the previous comprehension built
    the view data twice per workout: once to filter, once to keep).

    Parameters:
    - user: The user object.
    Returns:
    - list: View data for the workouts that have any, in the order of
      `user.workouts`.
    """
    candidates = (get_workout_view_data(workout, user)
                  for workout in user.workouts)
    return [view_data for view_data in candidates if view_data]
def format_workout_data(workout, user):
    """
    Format a workout for display.

    Missing numeric values are shown as 0 and numeric summary values are
    truncated to integers.

    Parameters:
    - workout: The workout object.
    - user: The user associated with the workout.
    Returns:
    - dict: A dictionary with formatted workout data.
    """
    duration = timedelta(seconds=int(workout.duration or 0))
    started_at = workout.started_at
    return {
        'id': workout.id,
        'user_id': user.id,
        'user_name': user.name,
        # '{th}' is the placeholder format_date_with_ordinal replaces with
        # the day's ordinal suffix; the old format hard-coded "th", so the
        # helper never substituted anything.
        # NOTE(review): '%#H' is a platform-specific strftime flag - confirm
        # it behaves as intended on the deployment platform.
        'start_time': format_date_with_ordinal(started_at, '%#H:%M %B %d{th} %Y'),
        'start_time_date': started_at.date(),
        'start_time_ago': humanize.naturaltime(started_at),
        'duration': humanize.naturaldelta(duration),
        'duration_seconds': duration.total_seconds(),
        'duration_minutes': duration.total_seconds() / 60,
        'average_rpm': int(workout.average_rpm or 0),
        'min_rpm': int(workout.min_rpm or 0),
        'max_rpm': int(workout.max_rpm or 0),
        'calories': int(workout.calories or 0),
        'distance': int(workout.distance or 0),
        'bike_display_name': workout.bike.display_name,
        # Speed is always pre-selected; heart rate only when it was recorded.
        'selected_graph_types': ['speed'] + (['heart_rate'] if workout.is_heart_rate_available else []),
        'is_heart_rate_available': workout.is_heart_rate_available,
        'is_cadence_available': workout.is_cadence_available,
        'average_bpm': int(workout.average_bpm or 0),
        'min_bpm': int(workout.min_bpm or 0),
        'max_bpm': int(workout.max_bpm or 0),
    }
def get_workout_view_data(workout, user):
    """
    Build the view data for one workout, if it has anything to show.

    Parameters:
    - workout: The workout object.
    - user: The user associated with the workout.
    Returns:
    - dict or None: The formatted workout when cadence or heart-rate data
      is available, otherwise None.
    """
    has_readings = (workout.is_cadence_available
                    or workout.is_heart_rate_available)
    return format_workout_data(workout, user) if has_readings else None
def create_graph(x_values, y_values, y_label, filename, x_label='Time'):
    """Plot a single series against time and return it as a PNG response.

    Parameters:
    - x_values: x-values; the axis is formatted as hours:minutes.
    - y_values: y-values, one per x-value.
    - y_label: Label for the y-axis.
    - filename: Base name (without extension) used in Content-Disposition.
    - x_label: Label for the x-axis.
    Returns:
    - Flask Response object containing the image of the graph.
    """
    fig, ax = plt.subplots()
    try:
        ax.plot(x_values, y_values)
        ax.set_xlabel(x_label)
        ax.set_ylabel(y_label)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
        ax.set_ylim(bottom=0)

        # Save through the figure itself rather than pyplot's implicit
        # "current figure".
        buffer = io.BytesIO()
        fig.savefig(buffer, format='png',
                    transparent=True, bbox_inches='tight')
    finally:
        # pyplot keeps every figure it created until it is closed.
        plt.close(fig)

    response = make_response(buffer.getvalue())
    response.headers['Content-Type'] = 'image/png'
    # The f prefix was missing, so the header carried a literal "{filename}".
    response.headers['Content-Disposition'] = f'attachment; filename={filename}.png'
    return response
def format_date_with_ordinal(d, format_string):
    """Format `d` with strftime, replacing '{th}' by the day's ordinal suffix.

    Parameters:
    - d: A date or datetime.
    - format_string: strftime format; every '{th}' in the formatted result
      becomes 'st', 'nd', 'rd' or 'th' according to `d.day`.
    Returns:
    - str: The formatted date.
    """
    day = d.day
    if 11 <= day % 100 <= 13:
        # 11th, 12th and 13th are irregular: they never take st/nd/rd.
        ordinal = 'th'
    else:
        ordinal = {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')
    return d.strftime(format_string).replace('{th}', ordinal)
def date_range(start_date, end_date):
    """
    Yield each day from start_date through end_date, both ends included.
    """
    one_day = timedelta(days=1)
    day = start_date
    while True:
        if day > end_date:
            return
        yield day
        day = day + one_day
def get_month_bounds(dt):
    """
    Return the (first, last) dates of the calendar grid for dt's month.

    The month is padded backwards to the preceding Sunday and forwards to
    the following Saturday, so the grid always consists of whole weeks.
    """
    month_start = dt.replace(day=1)
    month_end = month_start + relativedelta(months=1) - timedelta(days=1)

    # weekday(): Monday == 0 ... Sunday == 6.
    # Days back to Sunday:     Sun -> 0, Mon -> 1, ..., Sat -> 6.
    days_before = (month_start.weekday() + 1) % 7
    # Days forward to Saturday: Sun -> 6, Mon -> 5, ..., Sat -> 0.
    days_after = (5 - month_end.weekday()) % 7

    grid_start = month_start - timedelta(days=days_before)
    grid_end = month_end + timedelta(days=days_after)
    return grid_start, grid_end
def generate_calendar_monthly_view(workouts, selected_date):
    """
    Build the data for a one-month calendar of workouts.

    Returns a dict with the month title ('month_year'), one entry per day of
    the calendar grid ('days_of_month') and the dates one month after and
    before selected_date ('next_month', 'previous_month').
    """
    grid_start, grid_end = get_month_bounds(selected_date)

    # Index the workouts inside the grid by date; when several share a date
    # the one appearing last in `workouts` is kept.
    workouts_by_day = {}
    for entry in workouts:
        workout_day = entry['start_time_date']
        if grid_start <= workout_day <= grid_end:
            workouts_by_day[workout_day] = entry

    today = datetime.now().date()
    selected_month = (selected_date.year, selected_date.month)

    grid_days = []
    for day in date_range(grid_start, grid_end):
        grid_days.append({
            'date': day,
            'day_of_month': day.day,
            'is_workout': day in workouts_by_day,
            'workout': workouts_by_day.get(day),
            'is_current_date': day == today,
            'is_current_month': (day.year, day.month) == selected_month,
        })

    return {
        'month_year': selected_date.strftime('%B, %Y'),
        'days_of_month': grid_days,
        'next_month': selected_date + relativedelta(months=1),
        'previous_month': selected_date - relativedelta(months=1),
    }
def generate_daily_duration_sparkline(workouts):
    """
    Generate a sparkline string of total workout minutes per day.

    The range runs from the earliest workout date up to today; days without
    a workout count as 0 and several workouts on one day are added together
    (previously one silently replaced the other).

    Parameters:
    - workouts (list of dict): Each dict should contain 'start_time_date'
      and 'duration_minutes' keys.
    Returns:
    - str: Sparkline representation of daily durations, '' when there is
      nothing to show.
    """
    if not workouts:
        return ''

    minutes_by_date = defaultdict(float)
    for entry in workouts:
        minutes_by_date[entry['start_time_date']] += entry['duration_minutes']

    # Use the true earliest date rather than relying on the list ordering.
    start_date = min(minutes_by_date)
    end_date = datetime.now().date()

    daily_durations = [int(minutes_by_date.get(day, 0))
                       for day in date_range(start_date, end_date)]
    if not daily_durations:
        # Every workout is dated after today: nothing to draw.
        return ''

    # NOTE(review): date_range yields oldest-first, so this reversal puts
    # the most recent day first in the list; kept as in the original -
    # confirm the intended direction against the template.
    daily_durations.reverse()
    return sparklines.sparklines(daily_durations)[0]
def get_user_graphs(user_id):
    """Return the user's saved graphs, highest id first, with plain dates.

    The `start_date` and `end_date` of each returned object are replaced by
    their `.date()` value; the assignment happens on the loaded model
    instances themselves.
    """
    newest_first = UserGraphs.id.desc()
    saved_graphs = (
        UserGraphs.query
        .filter_by(user_id=user_id)
        .order_by(newest_first)
        .all()
    )
    for graph in saved_graphs:
        graph.start_date = graph.start_date.date()
        graph.end_date = graph.end_date.date()
    return saved_graphs
def insert_usergraph_if_not_exists(user_id, start_date, end_date, period, attributes):
    """Save a graph configuration unless an identical one already exists.

    Returns the id of the newly stored UserGraphs row, or None when a row
    with the same user, dates, period and attributes was already present.
    """
    criteria = {
        'user_id': user_id,
        'start_date': start_date,
        'end_date': end_date,
        'period': period,
        'attributes': attributes,
    }
    if UserGraphs.query.filter_by(**criteria).first():
        return None  # already recorded

    new_graph = UserGraphs(**criteria)
    db.session.add(new_graph)
    db.session.commit()
    return new_graph.id
def remove_usergraph(user_id, start_date, end_date, period, attributes):
    """Delete the first saved graph matching all the given values, if any."""
    criteria = {
        'user_id': user_id,
        'start_date': start_date,
        'end_date': end_date,
        'period': period,
        'attributes': attributes,
    }
    match = UserGraphs.query.filter_by(**criteria).first()
    if not match:
        return
    db.session.delete(match)
    db.session.commit()
def toDate(dateString):
    """Parse a 'YYYY-MM-DD' string into a date; raises ValueError otherwise."""
    parsed = datetime.strptime(dateString, "%Y-%m-%d")
    return parsed.date()
def get_value_from_key(tuples_list, key):
    """Return the value paired with the first matching key, or None."""
    matches = (value for candidate, value in tuples_list if candidate == key)
    return next(matches, None)
def format_key_values(tuples_list, keys_list):
    """Join the labels of the given keys as "A", "A & B" or "A, B & C".

    Labels are looked up in `tuples_list`, a list of (key, label) pairs.
    """
    labels = [get_value_from_key(tuples_list, key) for key in keys_list]
    count = len(labels)
    if count == 1:
        return labels[0]
    if count == 2:
        first, second = labels
        return f"{first} & {second}"
    leading = ', '.join(labels[:-1])
    return leading + f" & {labels[-1]}"
if __name__ == '__main__':
    # Listen on the port named by the PORT environment variable, or 5000.
    port = int(os.environ.get('PORT', 5000))
    app.run(
        host='127.0.0.1',
        port=port,
    )