Files
bloodpressure/app/routes/main.py
2026-03-13 15:11:11 +11:00

336 lines
14 KiB
Python

from collections import defaultdict
from flask import Blueprint, render_template, redirect, request, url_for
import humanize
from pytz import timezone, utc
from sqlalchemy import func
from app.models import Reading, db
from app.forms import DeleteForm
from flask_login import login_required, current_user
from datetime import date, datetime, timedelta
# Blueprint under which the routes in this module are registered; endpoint
# names are therefore prefixed with 'main.' (e.g. url_for('main.dashboard')).
main = Blueprint('main', __name__)
# Number of readings to show per page in list view
PAGE_SIZE = 25
@main.route('/', methods=['GET'])
def landing():
    """Send signed-in users to the dashboard; show the landing page otherwise."""
    if current_user.is_authenticated:
        return redirect(url_for('main.dashboard'))
    return render_template('landing.html')
@main.route('/health')
def health():
    """Health-check endpoint: always answers with body "OK" and status 200."""
    body, status_code = "OK", 200
    return body, status_code
@main.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
    """Render the dashboard page with the list view marked as active.

    The template receives the user's profile, the averages of the last seven
    days, the progress badges and an empty delete form.
    """
    profile = current_user.profile
    user_tz = timezone(profile.timezone or 'UTC')
    user_id = current_user.id

    # Averages over the last seven days are aggregated in SQL.
    systolic_avg, diastolic_avg, heart_rate_avg = calculate_weekly_summary_sql(user_id)
    badges = calculate_progress_badges(user_id, user_tz)

    context = {
        'profile': profile,
        'badges': badges,
        'systolic_avg': systolic_avg,
        'diastolic_avg': diastolic_avg,
        'heart_rate_avg': heart_rate_avg,
        'delete_form': DeleteForm(),
        'active_view': 'list',
    }
    return render_template('dashboard.html', **context)
@main.route('/dashboard/list', methods=['GET'])
@login_required
def dashboard_list():
    """Render one page of the user's readings, newest first.

    The page number comes from the ``page`` query argument (default 1); no
    date filter is applied to the list view.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    requested_page = request.args.get('page', 1, type=int)

    # Passing None for both dates disables the date-range filter.
    pagination = fetch_readings_paginated(
        current_user.id, None, None, user_tz, requested_page, PAGE_SIZE
    )
    page_items = pagination.items
    annotate_readings(page_items, user_tz)

    return render_template(
        'partials/dashboard_list.html',
        readings=page_items,
        pagination=pagination,
    )
@main.route('/dashboard/weekly', methods=['GET'])
@login_required
def dashboard_weekly():
    """Render the weekly calendar partial for the week chosen by ``week_offset``.

    ``week_offset`` is an integer query argument (default 0) counted in weeks
    relative to the current week in the user's timezone. Weeks run Monday to
    Sunday. Readings are fetched for the UTC window that corresponds to that
    local week and grouped per local day.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    week_offset = request.args.get('week_offset', 0, type=int)

    now = datetime.now(user_tz)
    target_week_date = now + timedelta(weeks=week_offset)

    # Build the week boundaries as naive wall-clock midnights and localize each
    # one on its own. Arithmetic/replace() on a pytz-aware datetime keeps the
    # UTC offset of "now", which shifts the window by an hour whenever the
    # target week lies on the other side of a DST change.
    monday = target_week_date.date() - timedelta(days=target_week_date.weekday())
    week_start_naive = datetime(monday.year, monday.month, monday.day)
    week_end_naive = week_start_naive + timedelta(days=7)
    target_week_start_utc = user_tz.localize(week_start_naive).astimezone(utc)
    target_week_end_utc = user_tz.localize(week_end_naive).astimezone(utc)

    calendar_readings = fetch_readings_for_range(
        current_user.id, target_week_start_utc, target_week_end_utc
    )
    annotate_readings(calendar_readings, user_tz)
    readings_by_day = build_readings_by_day(calendar_readings, user_tz)
    week_view = generate_weekly_calendar(readings_by_day, target_week_date, user_tz)

    return render_template(
        'partials/dashboard_weekly.html',
        week=week_view,
        week_offset=week_offset,
    )
@main.route('/dashboard/monthly', methods=['GET'])
@login_required
def dashboard_monthly():
    """Render the monthly calendar partial for the month chosen by ``month_offset``.

    ``month_offset`` is an integer query argument (default 0) counted in months
    relative to the current month in the user's timezone. The calendar grid
    covers 42 days starting on the Sunday on or before the first of the month;
    readings are fetched for the matching UTC window and grouped per local day.
    """
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    month_offset = request.args.get('month_offset', 0, type=int)

    now = datetime.now(user_tz)
    # Zero-based month index so floor division / modulo roll the year over
    # correctly for both positive and negative offsets.
    month_index = now.month + month_offset - 1
    target_year = now.year + month_index // 12
    target_month = month_index % 12 + 1

    # Work with naive wall-clock values and localize each boundary separately:
    # replace()/arithmetic on a pytz-aware datetime would keep the UTC offset
    # of "now", which is wrong when the target month is on the other side of a
    # DST change.
    first_of_month = datetime(target_year, target_month, 1)
    target_month_date = user_tz.localize(first_of_month)

    grid_start = first_of_month - timedelta(days=(first_of_month.weekday() + 1) % 7)
    grid_end = grid_start + timedelta(days=42)
    start_utc = user_tz.localize(grid_start).astimezone(utc)
    end_utc = user_tz.localize(grid_end).astimezone(utc)

    calendar_readings = fetch_readings_for_range(current_user.id, start_utc, end_utc)
    annotate_readings(calendar_readings, user_tz)
    readings_by_day = build_readings_by_day(calendar_readings, user_tz)
    month_view = generate_monthly_calendar(readings_by_day, target_month_date, user_tz)

    return render_template(
        'partials/dashboard_monthly.html',
        month=month_view,
        target_month_date=target_month_date,
        month_offset=month_offset,
    )
@main.route('/dashboard/graph', methods=['GET'])
@login_required
def dashboard_graph():
    """Render the graph partial for a date range in the user's timezone.

    The range comes from the ``start_date`` / ``end_date`` query arguments
    (``YYYY-MM-DD``). A missing or malformed argument falls back to the date
    of the user's first / last reading. When no complete range can be
    resolved, readings from the start of the current local month are used.
    The resolved date strings are passed to the template together with the
    prepared graph data.
    """
    date_format = '%Y-%m-%d'
    user_tz = timezone(current_user.profile.timezone or 'UTC')
    first_reading, last_reading = get_reading_date_range(current_user.id, user_tz)

    def resolve_day(arg_name, fallback):
        """Return ``(text, naive midnight)`` for a date argument, or ``(None, None)``."""
        fallback_text = fallback.strftime(date_format) if fallback else None
        for candidate in (request.args.get(arg_name), fallback_text):
            if not candidate:
                continue
            try:
                return candidate, datetime.strptime(candidate, date_format)
            except ValueError:
                # Malformed user input must not become a 500; try the fallback.
                continue
        return None, None

    start_date, start_day = resolve_day('start_date', first_reading)
    end_date, end_day = resolve_day('end_date', last_reading)

    if start_day is not None and end_day is not None:
        start_dt = user_tz.localize(start_day).astimezone(utc)
        # End of the local day: localize the *next* local midnight and step
        # back one tick. Adding 24h after converting to UTC is off by an hour
        # on DST-transition days.
        next_midnight = user_tz.localize(end_day + timedelta(days=1)).astimezone(utc)
        end_dt = next_midnight - timedelta(microseconds=1)
        calendar_readings = fetch_readings_for_range(current_user.id, start_dt, end_dt)
    else:
        now = datetime.now(user_tz)
        # Localize the naive month start so it gets that date's own UTC offset.
        month_start = user_tz.localize(datetime(now.year, now.month, 1))
        calendar_readings = fetch_readings_for_range(
            current_user.id, month_start.astimezone(utc)
        )

    annotate_readings(calendar_readings, user_tz)
    graph_data = prepare_graph_data(calendar_readings)
    graph_data['start_date'] = start_date
    graph_data['end_date'] = end_date
    return render_template('partials/dashboard_graph.html', **graph_data)
def get_reading_date_range(user_id, user_tz):
    """Return the user's earliest and latest reading times in ``user_tz``.

    Each element of the returned ``(first, last)`` pair is a timezone-aware
    datetime, or ``None`` when the aggregate query yields no value.
    """
    bounds = (
        db.session.query(
            func.min(Reading.timestamp).label('first'),
            func.max(Reading.timestamp).label('last'),
        )
        .filter(Reading.user_id == user_id)
        .first()
    )

    def to_local(stored):
        # Stored values are localized as UTC before converting to the user's zone.
        if not stored:
            return None
        return utc.localize(stored).astimezone(user_tz)

    return to_local(bounds.first), to_local(bounds.last)
def fetch_readings_paginated(user_id, start_date, end_date, user_tz, page, per_page):
    """Retrieve one page of a user's readings, newest first.

    Args:
        user_id: Owner of the readings.
        start_date: Inclusive first local day as ``YYYY-MM-DD``, or a falsy
            value for no date filter.
        end_date: Inclusive last local day as ``YYYY-MM-DD``, or a falsy value
            for no date filter. The filter is applied only when both dates
            are given.
        user_tz: pytz timezone used to interpret the two dates.
        page: Page number to load.
        per_page: Number of readings per page.

    Returns:
        The pagination object produced by ``query.paginate(...)``.

    Raises:
        ValueError: If a given date string does not match ``YYYY-MM-DD``.
    """
    query = Reading.query.filter_by(user_id=user_id)

    if start_date and end_date:
        day_format = '%Y-%m-%d'
        first_day = datetime.strptime(start_date, day_format)
        day_after_last = datetime.strptime(end_date, day_format) + timedelta(days=1)

        start_dt = user_tz.localize(first_day).astimezone(utc)
        # Exclusive upper bound at the next local midnight. Adding 24h in UTC
        # is wrong on DST-transition days, and an inclusive "minus one second"
        # bound drops readings taken in the final second of the day.
        end_exclusive = user_tz.localize(day_after_last).astimezone(utc)

        query = query.filter(
            Reading.timestamp >= start_dt,
            Reading.timestamp < end_exclusive,
        )

    newest_first = query.order_by(Reading.timestamp.desc())
    return newest_first.paginate(page=page, per_page=per_page, error_out=False)
def fetch_readings_for_range(user_id, start_utc, end_utc=None):
    """Return a user's readings with ``timestamp >= start_utc``, newest first.

    When ``end_utc`` is truthy the result is additionally limited to
    ``timestamp <= end_utc``; otherwise the range is open-ended.
    """
    conditions = [
        Reading.user_id == user_id,
        Reading.timestamp >= start_utc,
    ]
    if end_utc:
        conditions.append(Reading.timestamp <= end_utc)

    newest_first = Reading.query.filter(*conditions).order_by(Reading.timestamp.desc())
    return newest_first.all()
def annotate_readings(readings, user_tz):
    """Attach ``relative_timestamp`` and ``local_timestamp`` to each reading.

    ``relative_timestamp`` is the humanized age of the reading measured from
    the current UTC time; ``local_timestamp`` is the stored timestamp
    localized as UTC and converted to ``user_tz``. Readings are modified in
    place and nothing is returned.
    """
    reference = datetime.utcnow()
    for entry in readings:
        stored = entry.timestamp
        age = reference - stored
        entry.relative_timestamp = humanize.naturaltime(age)
        entry.local_timestamp = utc.localize(stored).astimezone(user_tz)
def build_readings_by_day(readings, user_tz):
    """Group readings by local calendar date, preserving their input order.

    A reading's ``local_timestamp`` attribute is used when present; otherwise
    its ``timestamp`` is localized as UTC and converted to ``user_tz``.
    Returns a ``defaultdict(list)`` keyed by ``date``.
    """
    grouped = defaultdict(list)
    for item in readings:
        if hasattr(item, 'local_timestamp'):
            local_moment = item.local_timestamp
        else:
            local_moment = utc.localize(item.timestamp).astimezone(user_tz)
        grouped[local_moment.date()].append(item)
    return grouped
def calculate_weekly_summary_sql(user_id):
    """Return (systolic, diastolic, heart-rate) averages for the last 7 days.

    The averages are computed and rounded to one decimal by a single SQL
    aggregate query. ``(0, 0, 0)`` is returned when the systolic average is
    NULL or no row comes back.
    """
    window_start = datetime.utcnow() - timedelta(days=7)

    def rounded_avg(column, label):
        return func.round(func.avg(column), 1).label(label)

    row = (
        db.session.query(
            rounded_avg(Reading.systolic, 'sys_avg'),
            rounded_avg(Reading.diastolic, 'dia_avg'),
            rounded_avg(Reading.heart_rate, 'hr_avg'),
        )
        .filter(
            Reading.user_id == user_id,
            Reading.timestamp >= window_start,
        )
        .first()
    )

    if not row or row.sys_avg is None:
        return 0, 0, 0
    return float(row.sys_avg), float(row.dia_avg), float(row.hr_avg)
def generate_monthly_calendar(readings_by_day, selected_date, local_tz):
    """Generate a 6-week (42-day) monthly calendar grid.

    Args:
        readings_by_day: Mapping of ``date`` to the list of readings on that day.
        selected_date: A ``datetime`` inside the month to display.
        local_tz: tzinfo used to decide which cell is "today".

    Returns:
        A list of 42 dicts, starting on the Sunday on or before the first of
        the month, each with the keys ``day``, ``is_today``,
        ``is_in_current_month`` and ``readings``.
    """
    today = datetime.now(local_tz).date()
    first_day = selected_date.replace(day=1)
    # weekday() is Monday=0, so (weekday + 1) % 7 is the distance back to Sunday.
    start_date = first_day - timedelta(days=(first_day.weekday() + 1) % 7)

    calendar = []
    for offset in range(42):
        current = start_date + timedelta(days=offset)
        # Compare date with date: a datetime never equals a plain date, so
        # comparing ``current`` itself would leave ``is_today`` always False.
        current_day = current.date()
        calendar.append({
            'day': current.day,
            'is_today': current_day == today,
            'is_in_current_month': current.month == selected_date.month,
            'readings': readings_by_day.get(current_day, []),
        })
    return calendar
def generate_weekly_calendar(readings_by_day, selected_date, local_tz):
    """Generate a Monday-to-Sunday weekly calendar view.

    Args:
        readings_by_day: Mapping of ``date`` to the list of readings on that day.
        selected_date: A ``datetime`` inside the week to display.
        local_tz: tzinfo used to decide which entry is "today".

    Returns:
        A list of 7 dicts with the keys ``date`` (formatted like
        ``Mon, Feb 12``), ``is_today`` and ``readings``.
    """
    today = datetime.now(local_tz).date()
    start_of_week = selected_date - timedelta(days=selected_date.weekday())

    week = []
    for offset in range(7):
        current = start_of_week + timedelta(days=offset)
        # Compare date with date: a datetime never equals a plain date, so
        # comparing ``current`` itself would leave ``is_today`` always False.
        current_day = current.date()
        week.append({
            'date': current.strftime('%a, %b %d'),
            'is_today': current_day == today,
            'readings': readings_by_day.get(current_day, []),
        })
    return week
def prepare_graph_data(readings):
    """Prepare chart series from readings given newest-first.

    The readings are reversed so the series run oldest to newest (left to
    right on the chart).

    Args:
        readings: Sequence of readings exposing ``timestamp``, ``systolic``,
            ``diastolic`` and ``heart_rate``; ``local_timestamp`` is optional.

    Returns:
        A dict with the keys ``timestamps`` (labels), ``systolic``,
        ``diastolic``, ``heart_rate``, ``time_percentages`` (position of each
        reading in [0, 1] along the covered time span; 0.5 when there is no
        span) and the one-decimal averages ``sys_avg``, ``dia_avg``,
        ``hr_avg`` (0 when there are no readings).
    """
    chronological_readings = list(reversed(readings))
    n = len(chronological_readings)

    systolic_vals = [r.systolic for r in chronological_readings]
    diastolic_vals = [r.diastolic for r in chronological_readings]
    hr_vals = [r.heart_rate for r in chronological_readings]

    def average(values):
        return round(sum(values) / n, 1) if n > 0 else 0

    time_percentages = []
    if n > 0:
        # Subtract datetimes directly instead of calling .timestamp(): on a
        # naive datetime .timestamp() assumes the server's local timezone, so
        # positions would be skewed across a server DST change.
        first_time = chronological_readings[0].timestamp
        time_span = (chronological_readings[-1].timestamp - first_time).total_seconds()
        if time_span == 0:
            # Single reading, or all readings share one instant: centre them.
            time_percentages = [0.5] * n
        else:
            time_percentages = [
                (r.timestamp - first_time).total_seconds() / time_span
                for r in chronological_readings
            ]

    # Label with the user's local time when the reading has been annotated
    # with ``local_timestamp``; otherwise fall back to the stored timestamp.
    labels = [
        (getattr(r, 'local_timestamp', None) or r.timestamp).strftime('%b %d\n%H:%M')
        for r in chronological_readings
    ]

    return {
        'timestamps': labels,
        'systolic': systolic_vals,
        'diastolic': diastolic_vals,
        'heart_rate': hr_vals,
        'time_percentages': time_percentages,
        'sys_avg': average(systolic_vals),
        'dia_avg': average(diastolic_vals),
        'hr_avg': average(hr_vals),
    }
def calculate_progress_badges(user_id, user_tz):
    """Return the list of badge strings earned by a user.

    Users without readings get an empty list. Otherwise only the reading
    timestamps (newest first) are loaded and handed to ``_compute_badges``
    together with the total reading count.
    """
    total_readings = Reading.query.filter_by(user_id=user_id).count()
    if not total_readings:
        return []

    # Only the timestamp column is selected, not full Reading objects.
    timestamp_rows = (
        db.session.query(Reading.timestamp)
        .filter(Reading.user_id == user_id)
        .order_by(Reading.timestamp.desc())
        .all()
    )
    return _compute_badges(total_readings, timestamp_rows, user_tz)
def _compute_badges(total_readings, timestamps, user_tz, now_local=None):
    """Derive badge strings from a reading count and newest-first timestamps.

    ``timestamps`` is a sequence of 1-tuples ``(timestamp,)`` whose values are
    localized as UTC and converted to ``user_tz``. ``now_local`` is the
    current local date; it defaults to today's date in ``user_tz``. Badges
    cover the current daily streak, morning/night logging over the last
    seven readings, and the highest reading-count milestone reached.
    """
    if now_local is None:
        now_local = datetime.now(user_tz).date()
    if total_readings == 0:
        return []

    def to_local(stamp):
        return utc.localize(stamp).astimezone(user_tz)

    # Collapse the newest-first timestamps into distinct local dates.
    reading_days = []
    if timestamps:
        for (stamp,) in timestamps:
            day = to_local(stamp).date()
            if not reading_days or reading_days[-1] != day:
                reading_days.append(day)

    # A streak counts only if the latest reading is from today or yesterday;
    # it then extends back for as long as the days are consecutive.
    streak = 0
    if reading_days and (now_local - reading_days[0]).days <= 1:
        streak = 1
        for newer, older in zip(reading_days, reading_days[1:]):
            if (newer - older).days != 1:
                break
            streak += 1

    earned = []
    if streak >= 1:
        earned.append(f"Current Streak: {streak} Days")
    if streak >= 7:
        earned.append("Logged Every Day for a Week")
    if streak >= 30:
        earned.append("Monthly Streak")

    # Time-of-day badges look at the seven most recent readings.
    recent = timestamps[:7]
    if len(recent) == 7:
        local_hours = [to_local(stamp).hour for (stamp,) in recent]
        if all(5 <= hour < 12 for hour in local_hours):
            earned.append("Morning Riser: Logged Readings Every Morning for a Week")
        if all(18 <= hour <= 23 for hour in local_hours):
            earned.append("Night Owl: Logged Readings Every Night for a Week")

    # Only the highest milestone reached is reported.
    reached = [m for m in (10, 50, 100, 500, 1000, 5000, 10000) if total_readings >= m]
    if reached:
        earned.append(f"{max(reached)} Readings Logged")

    return earned