Files
bloodpressure/app/routes/main.py
2026-03-10 19:40:19 +11:00

308 lines
13 KiB
Python

from collections import defaultdict
from flask import Blueprint, render_template, redirect, request, url_for
import humanize
from pytz import timezone, utc
from sqlalchemy import func
from app.models import Reading, db
from app.forms import DeleteForm
from flask_login import login_required, current_user
from datetime import date, datetime, timedelta
# Blueprint on which the landing, health-check and dashboard routes in this
# module are registered.
main = Blueprint('main', __name__)
# Number of readings to show per page in list view
PAGE_SIZE = 25
@main.route('/', methods=['GET'])
def landing():
    """Send signed-in users to the dashboard; show the landing page otherwise."""
    if current_user.is_authenticated:
        return redirect(url_for('main.dashboard'))
    return render_template('landing.html')
@main.route('/health')
def health():
    """Health-check endpoint: reply with the body "OK" and HTTP status 200."""
    body = "OK"
    status = 200
    return body, status
@main.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
    """Render the dashboard shell together with the default list view.

    The date filter is taken from the submitted form first, then from the
    query string, and finally falls back to the dates of the user's earliest
    and latest readings (``None`` when the user has no readings).
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')

    # Fallback filter bounds: the span covered by the user's own readings.
    earliest, latest = get_reading_date_range(current_user.id, user_tz)
    fallback_start = earliest.strftime('%Y-%m-%d') if earliest else None
    fallback_end = latest.strftime('%Y-%m-%d') if latest else None

    start_date = (
        request.form.get('start_date')
        or request.args.get('start_date')
        or fallback_start
    )
    end_date = (
        request.form.get('end_date')
        or request.args.get('end_date')
        or fallback_end
    )

    # Summary widgets: averages computed in SQL, plus the progress badges.
    systolic_avg, diastolic_avg, heart_rate_avg = calculate_weekly_summary_sql(current_user.id)
    badges = calculate_progress_badges(current_user.id, user_tz)

    # The list view is what the page shows on initial load.
    page = request.args.get('page', 1, type=int)
    paginated = fetch_readings_paginated(
        current_user.id, start_date, end_date, user_tz, page, PAGE_SIZE
    )
    annotate_readings(paginated.items, user_tz)

    context = {
        'profile': current_user.profile,
        'badges': badges,
        'systolic_avg': systolic_avg,
        'diastolic_avg': diastolic_avg,
        'heart_rate_avg': heart_rate_avg,
        'start_date': start_date,
        'end_date': end_date,
        'delete_form': DeleteForm(),
        'active_view': 'list',
        # Context for the default (list) view.
        'readings': paginated.items,
        'pagination': paginated,
    }
    return render_template('dashboard.html', **context)
@main.route('/dashboard/list', methods=['GET'])
@login_required
def dashboard_list():
    """Render the paginated list partial for the requested date range.

    ``start_date`` / ``end_date`` come from the query string and default to
    the dates of the user's earliest and latest readings; ``page`` defaults
    to 1.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')

    earliest, latest = get_reading_date_range(current_user.id, user_tz)
    fallback_start = earliest.strftime('%Y-%m-%d') if earliest else None
    fallback_end = latest.strftime('%Y-%m-%d') if latest else None
    start_date = request.args.get('start_date') or fallback_start
    end_date = request.args.get('end_date') or fallback_end

    page = request.args.get('page', 1, type=int)
    paginated = fetch_readings_paginated(
        current_user.id, start_date, end_date, user_tz, page, PAGE_SIZE
    )
    annotate_readings(paginated.items, user_tz)

    return render_template(
        'partials/dashboard_list.html',
        readings=paginated.items,
        pagination=paginated,
    )
@main.route('/dashboard/weekly', methods=['GET'])
@login_required
def dashboard_weekly():
    """Render the weekly calendar partial for the requested week.

    Query parameters:
        week_offset: integer number of weeks relative to the current week
            (0 is the current week); defaults to 0.

    The week spans Monday 00:00 up to the following Monday 00:00 in the
    user's timezone.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    week_offset = request.args.get('week_offset', 0, type=int)

    now = datetime.now(user_tz)
    target_week_date = now + timedelta(weeks=week_offset)

    # Build the week boundaries as naive wall-clock values and localize each
    # one separately. Calling .replace() or adding a timedelta on a
    # pytz-aware datetime keeps the UTC offset that was valid "now", which is
    # off by the DST shift when the target week lies in a different DST period.
    monday_midnight = (
        target_week_date - timedelta(days=target_week_date.weekday())
    ).replace(tzinfo=None, hour=0, minute=0, second=0, microsecond=0)
    target_week_start_utc = user_tz.localize(monday_midnight).astimezone(utc)
    target_week_end_utc = user_tz.localize(monday_midnight + timedelta(days=7)).astimezone(utc)

    calendar_readings = fetch_readings_for_range(current_user.id, target_week_start_utc, target_week_end_utc)
    annotate_readings(calendar_readings, user_tz)
    readings_by_day = build_readings_by_day(calendar_readings, user_tz)
    week_view = generate_weekly_calendar(readings_by_day, target_week_date, user_tz)

    return render_template('partials/dashboard_weekly.html', week=week_view, week_offset=week_offset)
@main.route('/dashboard/monthly', methods=['GET'])
@login_required
def dashboard_monthly():
    """Render the monthly calendar partial for the requested month.

    Query parameters:
        month_offset: integer number of months relative to the current month
            (0 is the current month); defaults to 0.

    Readings are fetched for the full six-week grid (42 days) that starts on
    the Sunday on or before the first day of the target month.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    month_offset = request.args.get('month_offset', 0, type=int)

    now = datetime.now(user_tz)
    # Zero-based month index; floor division/modulo handle negative offsets.
    month_index = now.month + month_offset - 1
    target_month_year = now.year + month_index // 12
    target_month_month = month_index % 12 + 1

    # Compute the boundaries as naive wall-clock values and localize each one
    # separately. Reusing `now`'s tzinfo via .replace() or timedelta
    # arithmetic would keep the current UTC offset, which is wrong whenever
    # the target date falls in a different DST period.
    first_day_naive = datetime(target_month_year, target_month_month, 1)
    target_month_date = user_tz.localize(first_day_naive)

    # Step back to the Sunday that opens the calendar grid.
    grid_start_naive = first_day_naive - timedelta(days=(first_day_naive.weekday() + 1) % 7)
    grid_end_naive = grid_start_naive + timedelta(days=42)
    start_utc = user_tz.localize(grid_start_naive).astimezone(utc)
    end_utc = user_tz.localize(grid_end_naive).astimezone(utc)

    calendar_readings = fetch_readings_for_range(current_user.id, start_utc, end_utc)
    annotate_readings(calendar_readings, user_tz)
    readings_by_day = build_readings_by_day(calendar_readings, user_tz)
    month_view = generate_monthly_calendar(readings_by_day, target_month_date, user_tz)

    return render_template(
        'partials/dashboard_monthly.html',
        month=month_view,
        target_month_date=target_month_date,
        month_offset=month_offset,
    )
@main.route('/dashboard/graph', methods=['GET'])
@login_required
def dashboard_graph():
    """Render the graph partial with the readings of the current month.

    The month starts at 00:00 on the 1st in the user's timezone; there is no
    upper bound on the fetched range.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    now = datetime.now(user_tz)

    # Localize the naive start-of-month instead of calling .replace() on the
    # aware `now`: with pytz, .replace() keeps today's UTC offset, which is
    # wrong if a DST change happened since the 1st.
    month_start_naive = now.replace(tzinfo=None, day=1, hour=0, minute=0, second=0, microsecond=0)
    month_start_utc = user_tz.localize(month_start_naive).astimezone(utc)

    calendar_readings = fetch_readings_for_range(current_user.id, month_start_utc)
    annotate_readings(calendar_readings, user_tz)
    graph_data = prepare_graph_data(calendar_readings)

    return render_template('partials/dashboard_graph.html', **graph_data)
def get_reading_date_range(user_id, user_tz):
    """Return the user's earliest and latest reading times in ``user_tz``.

    Stored timestamps are localized as UTC before conversion. Each element of
    the returned ``(first, last)`` pair is ``None`` when the aggregate query
    yields no value for it.
    """
    def to_local(stored):
        # An empty aggregate result stays None.
        if not stored:
            return None
        return utc.localize(stored).astimezone(user_tz)

    bounds = (
        db.session.query(
            func.min(Reading.timestamp).label('first'),
            func.max(Reading.timestamp).label('last'),
        )
        .filter(Reading.user_id == user_id)
        .first()
    )
    return to_local(bounds.first), to_local(bounds.last)
def fetch_readings_paginated(user_id, start_date, end_date, user_tz, page, per_page):
    """Retrieve one page of a user's readings, newest first.

    Args:
        user_id: id of the user whose readings are listed.
        start_date: first local day to include, as ``'YYYY-MM-DD'``.
        end_date: last local day to include, as ``'YYYY-MM-DD'``.
        user_tz: pytz timezone in which the two dates are interpreted.
        page: 1-based page number.
        per_page: number of readings per page.

    The date filter is applied only when both dates are given.

    Returns:
        The pagination object produced by ``query.paginate``.

    Raises:
        ValueError: if a supplied date string does not match ``'%Y-%m-%d'``.
    """
    query = Reading.query.filter_by(user_id=user_id)
    if start_date and end_date:
        first_day = datetime.strptime(start_date, '%Y-%m-%d')
        day_after_last = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
        # Half-open interval [local midnight of start, local midnight after
        # end). Adding the day before localizing keeps the bound correct on
        # 23/25-hour DST days, and the strict `<` keeps readings from the
        # final fraction of a second of the last day.
        start_dt = user_tz.localize(first_day).astimezone(utc)
        end_dt = user_tz.localize(day_after_last).astimezone(utc)
        query = query.filter(
            Reading.timestamp >= start_dt,
            Reading.timestamp < end_dt,
        )
    return query.order_by(Reading.timestamp.desc()).paginate(page=page, per_page=per_page, error_out=False)
def fetch_readings_for_range(user_id, start_utc, end_utc=None):
    """Fetch a user's readings from ``start_utc`` onwards, newest first.

    When ``end_utc`` is given, only readings with a timestamp up to and
    including it are returned. Used by the calendar and graph views.
    """
    criteria = [
        Reading.user_id == user_id,
        Reading.timestamp >= start_utc,
    ]
    if end_utc:
        criteria.append(Reading.timestamp <= end_utc)
    matching = Reading.query.filter(*criteria)
    return matching.order_by(Reading.timestamp.desc()).all()
def annotate_readings(readings, user_tz):
    """Attach display timestamps to each reading in place.

    Sets ``relative_timestamp`` (humanized age relative to the current UTC
    time) and ``local_timestamp`` (the stored timestamp localized as UTC and
    converted to ``user_tz``) on every reading.
    """
    reference = datetime.utcnow()  # one reference instant for the whole batch
    for entry in readings:
        recorded_at = entry.timestamp
        entry.relative_timestamp = humanize.naturaltime(reference - recorded_at)
        entry.local_timestamp = utc.localize(recorded_at).astimezone(user_tz)
def build_readings_by_day(readings, user_tz):
    """Group readings by local calendar date.

    Uses each reading's ``local_timestamp`` when it has one; otherwise the
    stored timestamp is localized as UTC and converted to ``user_tz``.
    Returns a ``defaultdict(list)`` keyed by ``date``; input order is kept
    within each day.
    """
    grouped = defaultdict(list)
    for entry in readings:
        if hasattr(entry, 'local_timestamp'):
            local_moment = entry.local_timestamp
        else:
            local_moment = utc.localize(entry.timestamp).astimezone(user_tz)
        grouped[local_moment.date()].append(entry)
    return grouped
def calculate_weekly_summary_sql(user_id):
    """Return the user's average readings over the last seven days.

    The averages are computed and rounded to one decimal by a single SQL
    aggregate query. Returns ``(systolic, diastolic, heart_rate)`` as floats,
    or ``(0, 0, 0)`` when the systolic average is missing.
    """
    cutoff = datetime.utcnow() - timedelta(days=7)
    labelled_columns = (
        ('sys_avg', Reading.systolic),
        ('dia_avg', Reading.diastolic),
        ('hr_avg', Reading.heart_rate),
    )
    aggregates = [
        func.round(func.avg(column), 1).label(label)
        for label, column in labelled_columns
    ]
    row = (
        db.session.query(*aggregates)
        .filter(Reading.user_id == user_id, Reading.timestamp >= cutoff)
        .first()
    )
    if not row or row.sys_avg is None:
        return 0, 0, 0
    return float(row.sys_avg), float(row.dia_avg), float(row.hr_avg)
def generate_monthly_calendar(readings_by_day, selected_date, local_tz):
    """Generate a monthly calendar view from pre-built readings_by_day.

    Args:
        readings_by_day: mapping of ``date`` to the list of readings for
            that day.
        selected_date: a ``datetime`` inside the month to display.
        local_tz: tzinfo used to decide which day is "today".

    Returns:
        A list of 42 day cells (six weeks), starting on the Sunday on or
        before the first of the month. Each cell is a dict with the keys
        ``day``, ``is_today``, ``is_in_current_month`` and ``readings``.
    """
    today = datetime.now(local_tz).date()
    first_day = selected_date.replace(day=1)
    # weekday() is Monday-based; shift so that the grid opens on a Sunday.
    start_date = first_day - timedelta(days=(first_day.weekday() + 1) % 7)

    cells = []
    for offset in range(42):
        current = start_date + timedelta(days=offset)
        current_day = current.date()
        cells.append({
            'day': current.day,
            # Compare date with date: a datetime never equals a plain date,
            # so comparing `current` itself would always yield False.
            'is_today': current_day == today,
            'is_in_current_month': current.month == selected_date.month,
            'readings': readings_by_day.get(current_day, []),
        })
    return cells
def generate_weekly_calendar(readings_by_day, selected_date, local_tz):
    """Generate a weekly calendar view from pre-built readings_by_day.

    Args:
        readings_by_day: mapping of ``date`` to the list of readings for
            that day.
        selected_date: a ``datetime`` inside the week to display.
        local_tz: tzinfo used to decide which day is "today".

    Returns:
        A list of seven day cells, Monday first. Each cell is a dict with
        the keys ``date`` (formatted like ``'Mon, May 13'``), ``is_today``
        and ``readings``.
    """
    today = datetime.now(local_tz).date()
    start_of_week = selected_date - timedelta(days=selected_date.weekday())

    cells = []
    for offset in range(7):
        current = start_of_week + timedelta(days=offset)
        current_day = current.date()
        cells.append({
            'date': current.strftime('%a, %b %d'),
            # Compare date with date: a datetime never equals a plain date,
            # so comparing `current` itself would always yield False.
            'is_today': current_day == today,
            'readings': readings_by_day.get(current_day, []),
        })
    return cells
def prepare_graph_data(readings):
    """Prepare the parallel series used for graph rendering.

    Returns a dict with the keys ``timestamps`` (labels formatted like
    ``'Apr 01'``), ``systolic``, ``diastolic`` and ``heart_rate``, each a
    list in the same order as ``readings``.
    """
    def day_label(reading):
        # Prefer the localized timestamp when the reading has been annotated,
        # so that the label shows the user's calendar day rather than the UTC
        # day; fall back to the stored timestamp otherwise.
        moment = getattr(reading, 'local_timestamp', None) or reading.timestamp
        return moment.strftime('%b %d')

    return {
        'timestamps': [day_label(r) for r in readings],
        'systolic': [r.systolic for r in readings],
        'diastolic': [r.diastolic for r in readings],
        'heart_rate': [r.heart_rate for r in readings],
    }
def calculate_progress_badges(user_id, user_tz):
    """Build the list of progress badges for a user.

    Returns an empty list when the user has no readings. Otherwise the
    reading count and the reading timestamps (newest first) are handed to
    ``_compute_badges``.
    """
    reading_count = Reading.query.filter_by(user_id=user_id).count()
    if not reading_count:
        return []

    # Only the timestamp column is selected, not full Reading objects.
    timestamp_rows = (
        db.session.query(Reading.timestamp)
        .filter(Reading.user_id == user_id)
        .order_by(Reading.timestamp.desc())
        .all()
    )
    return _compute_badges(reading_count, timestamp_rows, user_tz)
def _compute_badges(total_readings, timestamps, user_tz, now_local=None):
if now_local is None:
now_local = datetime.now(user_tz).date()
badges = []
if total_readings == 0:
return badges
streak_count = 0
if timestamps:
distinct_dates = []
last_date = None
for (ts,) in timestamps:
local_date = utc.localize(ts).astimezone(user_tz).date()
if local_date != last_date:
distinct_dates.append(local_date)
last_date = local_date
if distinct_dates:
most_recent_date = distinct_dates[0]
if (now_local - most_recent_date).days <= 1:
streak_count = 1
current_check_date = most_recent_date
for d in distinct_dates[1:]:
if (current_check_date - d).days == 1:
streak_count += 1
current_check_date = d
else:
break
if streak_count >= 1:
badges.append(f"Current Streak: {streak_count} Days")
if streak_count >= 7:
badges.append("Logged Every Day for a Week")
if streak_count >= 30:
badges.append("Monthly Streak")
last_7_readings = timestamps[:7]
if len(last_7_readings) == 7:
if all(5 <= utc.localize(ts).astimezone(user_tz).hour < 12 for (ts,) in last_7_readings):
badges.append("Morning Riser: Logged Readings Every Morning for a Week")
if all(18 <= utc.localize(ts).astimezone(user_tz).hour <= 23 for (ts,) in last_7_readings):
badges.append("Night Owl: Logged Readings Every Night for a Week")
milestones = [10, 50, 100, 500, 1000, 5000, 10000]
highest_milestone = max((m for m in milestones if total_readings >= m), default=None)
if highest_milestone:
badges.append(f"{highest_milestone} Readings Logged")
return badges