Files
WeightTracker/app/utils.py
Peter Stockings 56168a182b Refactor codebase
2026-02-24 21:23:14 +11:00

168 lines
5.7 KiB
Python

"""
Shared business-logic helpers.
Keep route handlers thin — calculation logic lives here.
"""
from app.db import query, execute
from app.config import SYDNEY_TZ
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Milestones — single source of truth for keys, thresholds, and labels
# ---------------------------------------------------------------------------
MILESTONES = [
# (key, check_fn_or_description, emoji + display label)
("first_checkin", "count >= 1", "✅ First Check-in"),
("5_checkins", "count >= 5", "🔥 5 Check-ins"),
("10_checkins", "count >= 10", "💪 10 Check-ins"),
("25_checkins", "count >= 25", "🎯 25 Check-ins"),
("lost_1kg", "lost >= 1", "⭐ 1kg Lost"),
("lost_2kg", "lost >= 2", "⭐ 2kg Lost"),
("lost_5kg", "lost >= 5", "🌟 5kg Lost"),
("lost_10kg", "lost >= 10", "💎 10kg Lost"),
("lost_15kg", "lost >= 15", "👑 15kg Lost"),
("lost_20kg", "lost >= 20", "🏆 20kg Lost"),
]
# Quick lookup: milestone_key → display label (used in templates)
MILESTONE_LABELS = {key: label for key, _, label in MILESTONES}
# ---------------------------------------------------------------------------
# Weight calculations
# ---------------------------------------------------------------------------
def calculate_bmi(weight_kg, height_cm):
"""Calculate BMI from weight (kg) and height (cm)."""
if not weight_kg or not height_cm or float(height_cm) == 0:
return None
h_m = float(height_cm) / 100.0
return round(float(weight_kg) / (h_m * h_m), 1)
def calculate_weight_change(start_w, current_w):
"""Return (kg_lost, pct_lost) from a start and current weight.
kg_lost is positive when weight decreased.
pct_lost is the percentage of start weight that was lost.
"""
start_w = float(start_w or 0)
current_w = float(current_w or start_w)
if start_w > 0:
kg_lost = round(start_w - current_w, 1)
pct_lost = round((kg_lost / start_w) * 100, 1)
else:
kg_lost = 0.0
pct_lost = 0.0
return kg_lost, pct_lost
# ---------------------------------------------------------------------------
# Streaks
# ---------------------------------------------------------------------------
def calculate_streak(user_id):
"""Calculate current and best consecutive-day check-in streaks."""
rows = query(
"""SELECT DISTINCT (checked_in_at AT TIME ZONE 'UTC' AT TIME ZONE 'Australia/Sydney')::date AS d
FROM checkins WHERE user_id = %s ORDER BY d DESC""",
(user_id,),
)
if not rows:
return {"current": 0, "best": 0}
days = [r["d"] for r in rows]
today = datetime.now(SYDNEY_TZ).date()
# Current streak: must include today or yesterday to count
current = 0
expected = today
if days[0] == today or days[0] == today - timedelta(days=1):
expected = days[0]
for d in days:
if d == expected:
current += 1
expected -= timedelta(days=1)
else:
break
# Best streak
best = 1
run = 1
for i in range(1, len(days)):
if days[i] == days[i - 1] - timedelta(days=1):
run += 1
best = max(best, run)
else:
run = 1
best = max(best, current)
return {"current": current, "best": best}
# ---------------------------------------------------------------------------
# Milestone checker
# ---------------------------------------------------------------------------
def check_milestones(user_id, user):
"""Check and award any new milestones after a check-in."""
checkins = query(
"SELECT weight_kg, checked_in_at FROM checkins WHERE user_id = %s ORDER BY checked_in_at ASC",
(user_id,),
)
if not checkins:
return
starting = float(user.get("starting_weight_kg") or checkins[0]["weight_kg"])
current = float(checkins[-1]["weight_kg"])
total_lost = starting - current
count = len(checkins)
milestone_checks = [
("first_checkin", count >= 1),
("5_checkins", count >= 5),
("10_checkins", count >= 10),
("25_checkins", count >= 25),
("lost_1kg", total_lost >= 1),
("lost_2kg", total_lost >= 2),
("lost_5kg", total_lost >= 5),
("lost_10kg", total_lost >= 10),
("lost_15kg", total_lost >= 15),
("lost_20kg", total_lost >= 20),
]
for key, achieved in milestone_checks:
if achieved:
try:
execute(
"INSERT INTO milestones (user_id, milestone_key) VALUES (%s, %s) ON CONFLICT DO NOTHING",
(user_id, key),
)
except Exception:
pass
# ---------------------------------------------------------------------------
# Form helpers
# ---------------------------------------------------------------------------
def parse_profile_fields(form):
"""Extract the common profile fields from a request form.
Returns a dict suitable for both signup and profile-update flows.
"""
return {
"display_name": form.get("display_name", "").strip(),
"height_cm": form.get("height_cm") or None,
"age": form.get("age") or None,
"gender": form.get("gender") or None,
"goal_weight_kg": form.get("goal_weight_kg") or None,
"starting_weight_kg": form.get("starting_weight_kg") or None,
"is_private": form.get("is_private") == "on",
}