Files
WeightTracker/app/utils.py

201 lines
6.7 KiB
Python

"""
Shared business-logic helpers.
Keep route handlers thin — calculation logic lives here.
"""
from app.db import query, execute_many
from app.config import SYDNEY_TZ
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Milestones — single source of truth for keys, thresholds, and labels
# ---------------------------------------------------------------------------
MILESTONES = [
    # Each entry: (milestone_key, condition description, emoji + display label).
    # The middle element is a human-readable string only; nothing in this
    # table evaluates it.
    #
    # Check-in count milestones
    ("first_checkin", "count >= 1", "✅ First Check-in"),
    ("5_checkins", "count >= 5", "🔥 5 Check-ins"),
    ("10_checkins", "count >= 10", "💪 10 Check-ins"),
    ("25_checkins", "count >= 25", "🎯 25 Check-ins"),
    # Weight-loss milestones
    ("lost_1kg", "lost >= 1", "⭐ 1kg Lost"),
    ("lost_2kg", "lost >= 2", "⭐ 2kg Lost"),
    ("lost_5kg", "lost >= 5", "🌟 5kg Lost"),
    ("lost_10kg", "lost >= 10", "💎 10kg Lost"),
    ("lost_15kg", "lost >= 15", "👑 15kg Lost"),
    ("lost_20kg", "lost >= 20", "🏆 20kg Lost"),
]

# Lookup table for templates: milestone_key -> display label
MILESTONE_LABELS = {entry[0]: entry[2] for entry in MILESTONES}
# ---------------------------------------------------------------------------
# Weight calculations
# ---------------------------------------------------------------------------
def calculate_bmi(weight_kg, height_cm):
    """Calculate BMI from weight (kg) and height (cm).

    Args:
        weight_kg: Body weight in kilograms; any value ``float()`` accepts
            (number, numeric string, Decimal) or a falsy value for "unknown".
        height_cm: Height in centimetres, same accepted types as weight.

    Returns:
        The BMI rounded to one decimal place, or ``None`` when either
        measurement is missing, zero, or negative.

    Raises:
        ValueError: If a non-empty value cannot be converted to float.
    """
    if not weight_kg or not height_cm:
        return None

    weight = float(weight_kg)
    height_m = float(height_cm) / 100.0
    # Checked after conversion so that "0" and 0 are treated alike, and so a
    # negative measurement never yields a meaningless BMI.
    if weight <= 0 or height_m <= 0:
        return None

    return round(weight / (height_m * height_m), 1)
def calculate_weight_change(start_w, current_w):
    """Compute how much weight was lost between two measurements.

    A missing (falsy) start weight counts as 0, and a missing current weight
    falls back to the start weight, i.e. "no change".

    Returns:
        ``(kg_lost, pct_lost)``, both rounded to one decimal place. ``kg_lost``
        is positive when weight went down; ``pct_lost`` is that loss expressed
        as a percentage of the start weight. Both are 0.0 when the start
        weight is not positive.
    """
    baseline = float(start_w or 0)
    latest = float(current_w or baseline)

    # Without a positive baseline there is nothing to compare against.
    if not baseline > 0:
        return 0.0, 0.0

    kg_lost = round(baseline - latest, 1)
    # The percentage is derived from the already-rounded kg figure.
    pct_lost = round((kg_lost / baseline) * 100, 1)
    return kg_lost, pct_lost
# ---------------------------------------------------------------------------
# Streaks
# ---------------------------------------------------------------------------
def _compute_streak_from_dates(days, today):
"""Compute current and best streak from a sorted-desc list of dates."""
if not days:
return {"current": 0, "best": 0}
# Current streak: must include today or yesterday to count
current = 0
expected = today
if days[0] == today or days[0] == today - timedelta(days=1):
expected = days[0]
for d in days:
if d == expected:
current += 1
expected -= timedelta(days=1)
else:
break
# Best streak
best = 1
run = 1
for i in range(1, len(days)):
if days[i] == days[i - 1] - timedelta(days=1):
run += 1
best = max(best, run)
else:
run = 1
best = max(best, current)
return {"current": current, "best": best}
def calculate_streak(user_id):
    """Return one user's current and best consecutive-day check-in streaks.

    Check-in timestamps are converted to Australia/Sydney calendar dates in
    SQL, and "today" is taken in ``SYDNEY_TZ``.
    NOTE(review): the double ``AT TIME ZONE`` suggests ``checked_in_at`` is a
    timezone-naive column holding UTC — confirm against the schema.

    Returns:
        ``{"current": int, "best": int}`` as produced by
        ``_compute_streak_from_dates``.
    """
    # One row per distinct local calendar day, newest first.
    sql = """SELECT DISTINCT (checked_in_at AT TIME ZONE 'UTC' AT TIME ZONE 'Australia/Sydney')::date AS d
FROM checkins WHERE user_id = %s ORDER BY d DESC"""
    checkin_days = [row["d"] for row in query(sql, (user_id,))]
    sydney_today = datetime.now(SYDNEY_TZ).date()
    return _compute_streak_from_dates(checkin_days, sydney_today)
def calculate_streaks_bulk(user_ids):
    """Compute streaks for several users with a single database query.

    Args:
        user_ids: Sized collection of user ids to look up.

    Returns:
        ``{user_id: {"current": int, "best": int}}`` with one entry for every
        id in ``user_ids``; users without check-ins get zeros. An empty
        ``user_ids`` returns ``{}`` without touching the database.
    """
    if not user_ids:
        return {}

    # One "%s" per id; the ids themselves travel as bound parameters.
    placeholders = ",".join(["%s"] * len(user_ids))
    rows = query(
        f"""SELECT user_id,
(checked_in_at AT TIME ZONE 'UTC' AT TIME ZONE 'Australia/Sydney')::date AS d
FROM checkins
WHERE user_id IN ({placeholders})
GROUP BY user_id, d
ORDER BY user_id, d DESC""",
        tuple(user_ids),
    )

    # Bucket the per-day rows by user, keeping the newest-first order.
    days_by_user = {}
    for row in rows:
        days_by_user.setdefault(row["user_id"], []).append(row["d"])

    sydney_today = datetime.now(SYDNEY_TZ).date()
    return {
        uid: _compute_streak_from_dates(days_by_user.get(uid, []), sydney_today)
        for uid in user_ids
    }
# ---------------------------------------------------------------------------
# Milestone checker
# ---------------------------------------------------------------------------
def check_milestones(user_id, user):
    """Check and award any new milestones after a check-in.

    Args:
        user_id: Id of the user whose check-ins are evaluated.
        user: Mapping for the user row. Its ``"starting_weight_kg"`` is used
            as the baseline when set; otherwise the first check-in's weight is.

    Returns:
        None. Every milestone currently satisfied is inserted; the
        ``ON CONFLICT DO NOTHING`` clause leaves already-awarded rows alone
        (presumably via a unique constraint on (user_id, milestone_key) —
        verify against the schema).
    """
    checkins = query(
        "SELECT weight_kg, checked_in_at FROM checkins WHERE user_id = %s ORDER BY checked_in_at ASC",
        (user_id,),
    )
    if not checkins:
        return

    starting = float(user.get("starting_weight_kg") or checkins[0]["weight_kg"])
    current = float(checkins[-1]["weight_kg"])
    # Binary floats cannot represent most decimal weights exactly, so
    # 65.1 - 60.1 evaluates to 4.999999999999993 and an exact 5 kg loss would
    # miss its threshold. Rounding to 6 decimals removes that noise without
    # moving any real-world weight across a threshold.
    total_lost = round(starting - current, 6)
    count = len(checkins)

    # (milestone_key, threshold) pairs; keys must match those in MILESTONES.
    checkin_targets = (
        ("first_checkin", 1),
        ("5_checkins", 5),
        ("10_checkins", 10),
        ("25_checkins", 25),
    )
    loss_targets = (
        ("lost_1kg", 1),
        ("lost_2kg", 2),
        ("lost_5kg", 5),
        ("lost_10kg", 10),
        ("lost_15kg", 15),
        ("lost_20kg", 20),
    )

    achieved = [(user_id, key) for key, target in checkin_targets if count >= target]
    achieved += [(user_id, key) for key, target in loss_targets if total_lost >= target]

    if achieved:
        execute_many(
            "INSERT INTO milestones (user_id, milestone_key) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            achieved,
        )
# ---------------------------------------------------------------------------
# Form helpers
# ---------------------------------------------------------------------------
def parse_profile_fields(form):
    """Extract the common profile fields from a request form.

    Args:
        form: Mapping-like object exposing ``.get(name)`` (e.g. a request
            form or a plain dict).

    Returns:
        A dict suitable for both signup and profile-update flows:
        ``display_name`` is a stripped string (``""`` when absent); the
        optional fields are stripped strings, or ``None`` when absent, empty,
        or whitespace-only; ``is_private`` is True only when the submitted
        value is exactly ``"on"``.
    """

    def _optional(name):
        # Treat blank or whitespace-only submissions as "not provided" so
        # callers never receive a string like "  " for a numeric field.
        value = form.get(name)
        if isinstance(value, str):
            value = value.strip()
        return value or None

    return {
        "display_name": (form.get("display_name") or "").strip(),
        "height_cm": _optional("height_cm"),
        "age": _optional("age"),
        "gender": _optional("gender"),
        "goal_weight_kg": _optional("goal_weight_kg"),
        "starting_weight_kg": _optional("starting_weight_kg"),
        "is_private": form.get("is_private") == "on",
    }