# File viewer header: 446 lines, 17 KiB, Python
from collections import defaultdict
|
|
import csv
|
|
from io import StringIO
|
|
import io
|
|
from flask import Blueprint, Response, make_response, render_template, redirect, request, send_file, url_for, flash
|
|
import humanize
|
|
from pytz import timezone, utc
|
|
from sqlalchemy import func
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from werkzeug.http import http_date
|
|
from app.models import Profile, Reading, db, User
|
|
from app.forms import DeleteForm, LoginForm, ProfileForm, ReadingForm, SignupForm
|
|
from flask_login import login_user, login_required, current_user, logout_user
|
|
import base64
|
|
from datetime import date, datetime, timedelta
|
|
from PIL import Image
|
|
|
|
# Blueprints that group this module's routes: general pages (`main`),
# sign-up/login/logout (`auth`) and profile endpoints (`user`).
main = Blueprint(name='main', import_name=__name__)
auth = Blueprint(name='auth', import_name=__name__)
user = Blueprint(name='user', import_name=__name__)
|
|
|
|
|
|
@auth.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the signup page and create an account on a valid submission.

    When the form validates, a ``User`` is stored with a hashed password, a
    success message is flashed and the client is redirected to the login
    page. In every other case the signup template is rendered with the form.
    """
    form = SignupForm()
    if not form.validate_on_submit():
        return render_template('signup.html', form=form)

    # Only the hash is persisted, never the submitted password itself.
    account = User(
        username=form.username.data,
        password_hash=generate_password_hash(form.password.data),
    )
    db.session.add(account)
    db.session.commit()

    flash("Account created successfully. Please log in.", "success")
    return redirect(url_for('auth.login'))
|
|
|
|
@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login page and authenticate on a valid submission.

    A matching username/password pair logs the user in and redirects to the
    dashboard. A failed match flashes an error and re-renders the form.
    """
    form = LoginForm()
    if form.validate_on_submit():
        # Named `account` rather than `user` so the local does not shadow the
        # module-level `user` blueprint.
        account = User.query.filter_by(username=form.username.data).first()
        if not account or not check_password_hash(account.password_hash, form.password.data):
            flash("Invalid username or password.", "danger")
        else:
            login_user(account)
            flash("Logged in successfully.", "success")
            return redirect(url_for('main.dashboard'))

    return render_template('login.html', form=form)
|
|
|
|
@auth.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    flash('You have been logged out.', 'success')
    login_url = url_for('auth.login')
    return redirect(login_url)
|
|
|
|
@main.route('/', methods=['GET'])
def landing():
    """Show the landing page; authenticated users go straight to the dashboard."""
    if not current_user.is_authenticated:
        return render_template('landing.html')
    return redirect(url_for('main.dashboard'))
|
|
|
|
# Reading-count thresholds for the milestone badge, in ascending order.
_READING_MILESTONES = (10, 50, 100, 500, 1000, 5000, 10000)


def _get_reading_date_range(user_id):
    """Return the ``(first, last)`` reading timestamps stored for *user_id*.

    The values come from SQL ``MIN``/``MAX`` aggregates, so both are ``None``
    when the user has no readings.
    """
    result = (
        db.session.query(
            func.min(Reading.timestamp).label('first'),
            func.max(Reading.timestamp).label('last'),
        )
        .filter(Reading.user_id == user_id)
        .first()
    )
    return result.first, result.last


def _calculate_weekly_summary(readings):
    """Return average ``(systolic, diastolic, heart_rate)`` for the last 7 days.

    Averages are rounded to one decimal place; all three are ``0`` when no
    reading falls inside the window.
    """
    # The dashboard treats stored timestamps as naive UTC, so the cut-off has
    # to be computed in UTC too (local time would shift the window).
    one_week_ago = datetime.utcnow() - timedelta(days=7)
    weekly_readings = [r for r in readings if r.timestamp >= one_week_ago]
    if not weekly_readings:
        return 0, 0, 0

    count = len(weekly_readings)
    return (
        round(sum(r.systolic for r in weekly_readings) / count, 1),
        round(sum(r.diastolic for r in weekly_readings) / count, 1),
        round(sum(r.heart_rate for r in weekly_readings) / count, 1),
    )


def _calculate_progress_badges(readings):
    """Calculate badge labels based on user reading activity.

    Badges produced:
      * the highest reading-count milestone reached;
      * the streak of consecutive calendar days ending at the most recent
        reading (a gap of more than one day restarts the count);
      * a weekly badge at 7+ streak days and a monthly badge at 30+ streak
        days when the latest reading is dated today (UTC);
      * one badge per ``YYYY-MM`` month holding at least 30 readings;
      * morning / night badges when the 7 most recent readings all fall in
        the respective hour range.

    Returns a list of strings in the order above.
    """
    badges = []
    today = datetime.utcnow().date()
    sorted_readings = sorted(readings, key=lambda r: r.timestamp)

    # Milestones are scanned from the largest down so only the best one shows.
    total_readings = len(sorted_readings)
    for milestone in reversed(_READING_MILESTONES):
        if total_readings >= milestone:
            badges.append(f"{milestone} Readings Logged")
            break

    if not sorted_readings:
        return badges

    streak_count = 1
    monthly_tracker = defaultdict(int)
    previous_date = None
    for reading in sorted_readings:
        current_date = reading.timestamp.date()

        # Every reading counts towards its month, including the first one.
        monthly_tracker[current_date.strftime('%Y-%m')] += 1

        if previous_date is not None:
            gap_days = (current_date - previous_date).days
            if gap_days == 1:
                streak_count += 1
            elif gap_days > 1:
                # A missed day ends the old streak; start counting afresh.
                streak_count = 1
            # gap_days == 0: another reading on the same day, streak unchanged.
        previous_date = current_date

    badges.append(f"Current Streak: {streak_count} Days")
    if streak_count >= 7:
        badges.append("Logged Every Day for a Week")
    if streak_count >= 30 and previous_date == today:
        badges.append("Monthly Streak")

    for month, count in monthly_tracker.items():
        if count >= 30:
            badges.append(f"Full Month of Logging: {month}")

    # Time-of-day badges need a full set of seven readings; `all()` over a
    # shorter (or empty) slice would award them vacuously.
    # NOTE(review): hours are taken from the stored timestamp, which the
    # dashboard treats as UTC, not from the user's local time - confirm intent.
    recent = sorted_readings[-7:]
    if len(recent) == 7:
        if all(5 <= r.timestamp.hour < 12 for r in recent):
            badges.append("Morning Riser: Logged Readings Every Morning for a Week")
        if all(18 <= r.timestamp.hour <= 23 for r in recent):
            badges.append("Night Owl: Logged Readings Every Night for a Week")

    return badges


@main.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
    """Render the dashboard for the logged-in user.

    GET shows every reading. POST optionally narrows the list to the
    ``start_date``/``end_date`` form fields (``YYYY-MM-DD``, both inclusive);
    a missing field falls back to the date of the first/last stored reading.
    A malformed date flashes an error and the unfiltered list is shown.

    The template receives the readings (newest first, each annotated with
    ``relative_timestamp`` and ``local_timestamp``), weekly averages, badges,
    chart series, readings grouped by date, and the ``date``/``timedelta``
    helpers.
    """
    first_reading_timestamp, last_reading_timestamp = _get_reading_date_range(current_user.id)

    # Default range: the full span of stored readings (None when there are none).
    default_start = first_reading_timestamp.strftime('%Y-%m-%d') if first_reading_timestamp else None
    default_end = last_reading_timestamp.strftime('%Y-%m-%d') if last_reading_timestamp else None
    start_date, end_date = default_start, default_end

    readings_query = Reading.query.filter_by(user_id=current_user.id)
    if request.method == 'POST':
        start_date = request.form.get('start_date') or default_start
        end_date = request.form.get('end_date') or default_end
        if start_date and end_date:
            try:
                range_start = datetime.strptime(start_date, '%Y-%m-%d')
                # Exclusive upper bound one day later keeps the whole end day
                # in range; `<= midnight` would drop every reading made on it.
                range_end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
            except ValueError:
                flash("Invalid date range. Please use the YYYY-MM-DD format.", "danger")
                start_date, end_date = default_start, default_end
            else:
                readings_query = readings_query.filter(
                    Reading.timestamp >= range_start,
                    Reading.timestamp < range_end,
                )

    readings = readings_query.order_by(Reading.timestamp.desc()).all()

    # Fetch the user's timezone (default to 'UTC' if none is set)
    user_timezone = current_user.profile.timezone if current_user.profile and current_user.profile.timezone else 'UTC'
    local_tz = timezone(user_timezone)

    # Add relative & local timestamps to readings
    now = datetime.utcnow()
    for reading in readings:
        reading.relative_timestamp = humanize.naturaltime(now - reading.timestamp)
        reading.local_timestamp = utc.localize(reading.timestamp).astimezone(local_tz)

    systolic_avg, diastolic_avg, heart_rate_avg = _calculate_weekly_summary(readings)
    badges = _calculate_progress_badges(readings)

    # Chart series, in the same (newest first) order as `readings`.
    timestamps = [reading.timestamp.strftime('%b %d') for reading in readings]
    systolic = [reading.systolic for reading in readings]
    diastolic = [reading.diastolic for reading in readings]
    heart_rate = [reading.heart_rate for reading in readings]

    # Group readings by the calendar date of their stored timestamp. A plain
    # dict is kept so template lookups of absent dates behave as before.
    readings_by_date = {}
    for reading in readings:
        readings_by_date.setdefault(reading.timestamp.date(), []).append(reading)

    return render_template(
        'dashboard.html',
        readings=readings,
        profile=current_user.profile,
        badges=badges,
        systolic_avg=systolic_avg,
        diastolic_avg=diastolic_avg,
        heart_rate_avg=heart_rate_avg,
        delete_form=DeleteForm(),
        timestamps=timestamps,
        systolic=systolic,
        diastolic=diastolic,
        heart_rate=heart_rate,
        start_date=start_date,
        end_date=end_date,
        readings_by_date=readings_by_date,
        date=date,
        timedelta=timedelta,
    )
|
|
|
|
@main.route('/add-reading', methods=['GET', 'POST'])
@login_required
def add_reading():
    """Render the add-reading form and store a new reading on submission.

    The submitted timestamp is interpreted in the user's profile timezone
    ('UTC' when none is set) and stored as naive UTC. This matches
    ``edit_reading``, which converts local time to UTC, and the dashboard,
    which reads stored timestamps as UTC.
    """
    form = ReadingForm()
    if form.validate_on_submit():
        timestamp = form.timestamp.data
        if timestamp is not None:
            if timestamp.tzinfo is None:
                # NOTE(review): assumes the form carries local wall-clock
                # time, as edit_reading does - confirm against the template.
                user_timezone = (
                    current_user.profile.timezone
                    if current_user.profile and current_user.profile.timezone
                    else 'UTC'
                )
                timestamp = timezone(user_timezone).localize(timestamp)
            # Stored values are naive UTC (the dashboard calls utc.localize on them).
            timestamp = timestamp.astimezone(utc).replace(tzinfo=None)

        new_reading = Reading(
            user_id=current_user.id,
            timestamp=timestamp,
            systolic=form.systolic.data,
            diastolic=form.diastolic.data,
            heart_rate=form.heart_rate.data,
        )
        db.session.add(new_reading)
        db.session.commit()
        flash("Reading added successfully.", "success")
        return redirect(url_for('main.dashboard'))

    return render_template('add_reading.html', form=form)
|
|
|
|
@main.route('/reading/<int:reading_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_reading(reading_id):
    """Render the edit form for a reading and save changes on submission.

    Responds 404 when the reading does not exist, and redirects to the
    dashboard with an error flash when it belongs to another user. The
    timestamp is shown in the user's profile timezone ('UTC' when none is
    set) and converted back to naive UTC before saving.
    """
    reading = Reading.query.get_or_404(reading_id)

    # Ensure the reading belongs to the logged-in user
    if reading.user_id != current_user.id:
        flash('You are not authorized to edit this reading.', 'danger')
        return redirect(url_for('main.dashboard'))

    # Fetch the user's timezone (default to 'UTC' if none is set)
    user_timezone = current_user.profile.timezone if current_user.profile and current_user.profile.timezone else 'UTC'
    local_tz = timezone(user_timezone)

    reading.local_timestamp = utc.localize(reading.timestamp).astimezone(local_tz)

    form = ReadingForm(obj=reading)  # Populate form with existing reading data
    if not form.is_submitted():
        # Pre-fill with local time only when displaying the form; doing this
        # on POST would overwrite the timestamp the user just submitted.
        form.timestamp.data = reading.local_timestamp

    if form.validate_on_submit():
        local_timestamp = form.timestamp.data
        if local_timestamp is not None:
            # Localize with the tz object (the timezone *name* is a plain str
            # and has no .localize); skip it if the value is already aware.
            if local_timestamp.tzinfo is None:
                local_timestamp = local_tz.localize(local_timestamp)
            # Store naive UTC so later utc.localize() calls keep working.
            reading.timestamp = local_timestamp.astimezone(utc).replace(tzinfo=None)

        reading.systolic = form.systolic.data
        reading.diastolic = form.diastolic.data
        reading.heart_rate = form.heart_rate.data
        db.session.commit()
        flash('Reading updated successfully!', 'success')
        return redirect(url_for('main.dashboard'))

    return render_template('edit_reading.html', form=form, reading=reading)
|
|
|
|
@main.route('/confirm_delete/<int:reading_id>', methods=['GET', 'POST'])
@login_required
def confirm_delete(reading_id):
    """Show a delete confirmation page (GET) or delete the reading (POST).

    The lookup is restricted to the logged-in user's readings, so a missing
    reading and one owned by someone else both result in a 404.
    """
    owned_readings = Reading.query.filter_by(id=reading_id, user_id=current_user.id)
    reading = owned_readings.first_or_404()

    if request.method != 'POST':
        return render_template('confirm_delete.html', reading=reading)

    db.session.delete(reading)
    db.session.commit()
    flash('Reading deleted successfully!', 'success')
    return redirect(url_for('main.dashboard'))
|
|
|
|
|
|
@main.route('/reading/<int:reading_id>/delete', methods=['POST'])
@login_required
def delete_reading(reading_id):
    """Delete one of the logged-in user's readings and return to the dashboard.

    Responds 404 when the reading does not exist. A reading owned by another
    user is left untouched and an error is flashed instead.
    """
    reading = Reading.query.get_or_404(reading_id)

    if reading.user_id == current_user.id:
        db.session.delete(reading)
        db.session.commit()
        flash('Reading deleted successfully!', 'success')
    else:
        flash('You are not authorized to delete this reading.', 'danger')

    # Both outcomes end on the dashboard.
    return redirect(url_for('main.dashboard'))
|
|
|
|
|
|
@user.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Display and update the logged-in user's profile.

    A valid submission copies the form fields onto the profile (creating one
    if the user has none), optionally replaces the profile picture with a
    resized JPEG stored as base64 text, commits, and redirects back to this
    page. Otherwise the profile template is rendered.
    """
    user_profile = current_user.profile or Profile(user_id=current_user.id)
    form = ProfileForm(obj=user_profile)

    if not form.validate_on_submit():
        return render_template('profile.html', form=form, profile=user_profile)

    user_profile.name = form.name.data
    user_profile.email = form.email.data
    # An empty threshold field keeps the previously stored value.
    user_profile.systolic_threshold = form.systolic_threshold.data or user_profile.systolic_threshold
    user_profile.diastolic_threshold = form.diastolic_threshold.data or user_profile.diastolic_threshold
    user_profile.dark_mode = form.dark_mode.data
    user_profile.timezone = form.timezone.data

    if form.profile_pic.data:
        raw_bytes = form.profile_pic.data.read()
        try:
            # Normalise to RGB and shrink to fit within 300x300 pixels.
            picture = Image.open(io.BytesIO(raw_bytes)).convert("RGB")
            picture.thumbnail((300, 300))

            # Re-encode as JPEG (quality 80) and keep it as base64 text.
            jpeg_buffer = io.BytesIO()
            picture.save(jpeg_buffer, format="JPEG", quality=80)
            user_profile.profile_pic = base64.b64encode(jpeg_buffer.getvalue()).decode('utf-8')
        except Exception as e:
            # Best effort: a bad image is reported but the other fields are
            # still saved below.
            flash(f"Error processing profile picture: {e}", 'danger')

    db.session.add(user_profile)
    db.session.commit()
    flash('Profile updated successfully!', 'success')
    return redirect(url_for('user.profile'))
|
|
|
|
@user.route('/profile/image/<int:user_id>')
@login_required
def profile_image(user_id):
    """Serve the logged-in user's profile picture, or the default SVG.

    Only the owner may fetch the image; a request for another user's id is
    redirected to the dashboard with an error flash. A stored picture is
    returned as JPEG with a content-derived ETag and a one-day private cache
    lifetime. When no picture is stored, the default SVG is returned.
    """
    # Function-scope import: hashlib is not among the module's imports.
    import hashlib

    # Ensure the image belongs to the logged-in user. @login_required
    # guarantees current_user has an id here.
    if user_id != current_user.id:
        flash('You are not authorized to view this profile picture.', 'danger')
        return redirect(url_for('main.dashboard'))

    profile = Profile.query.filter_by(user_id=user_id).first()
    if profile and profile.profile_pic:
        image_data = base64.b64decode(profile.profile_pic)
        response = make_response(image_data)
        response.headers.set('Content-Type', 'image/jpeg')
        # `private`: the image is owner-only, so shared caches must not keep it.
        response.headers.set('Cache-Control', 'private, max-age=86400')  # Cache for 1 day
        # A content digest is stable across processes and restarts, unlike the
        # built-in hash() of a str, which is salted per interpreter run.
        # set_etag() also adds the quotes the header format requires.
        response.set_etag(hashlib.sha256(image_data).hexdigest())
        # No Last-Modified header: the real modification time is not known
        # here, and stamping "now" on every response would misreport it.
        return response

    # Serve the default SVG if no profile picture is found.
    # NOTE(review): the path is relative to the process working directory.
    with open('app/static/images/default-profile.svg', 'r', encoding='utf-8') as f:
        default_image = f.read()

    response = make_response(default_image)
    response.headers.set('Content-Type', 'image/svg+xml')
    return response
|
|
|
|
@main.route('/data', methods=['GET', 'POST'])
@login_required
def manage_data():
    """Render the data page (GET) or import readings from a CSV upload (POST).

    The upload must have a ``.csv`` extension (case-insensitive). The first
    row is treated as a header and skipped. Every other non-empty row must
    hold ``timestamp, systolic, diastolic, heart_rate``, with the timestamp
    formatted as ``YYYY-MM-DD HH:MM:SS``. The import is all-or-nothing: any
    error rolls the session back and is reported through a flash message.
    """
    if request.method != 'POST':
        return render_template('data.html')

    upload = request.files.get('file')
    if not upload or not upload.filename.lower().endswith('.csv'):
        flash('Please upload a valid CSV file.', 'danger')
        return redirect(url_for('main.manage_data'))

    try:
        # 'utf-8-sig' also accepts files that start with a byte-order mark.
        rows = csv.reader(StringIO(upload.read().decode('utf-8-sig')))
        # Skip the header row; the default keeps an empty file from raising
        # StopIteration.
        next(rows, None)
        for row in rows:
            if not row:
                # Blank lines (e.g. a trailing newline) carry no data.
                continue
            timestamp, systolic, diastolic, heart_rate = row
            db.session.add(Reading(
                user_id=current_user.id,
                timestamp=datetime.strptime(timestamp.strip(), '%Y-%m-%d %H:%M:%S'),
                systolic=int(systolic),
                diastolic=int(diastolic),
                heart_rate=int(heart_rate),
            ))
        db.session.commit()
        flash('Data imported successfully!', 'success')
    except Exception as e:
        # Request boundary: discard the partially staged rows and tell the
        # user what went wrong instead of failing the request.
        db.session.rollback()
        flash(f'Error importing data: {str(e)}', 'danger')

    return redirect(url_for('main.manage_data'))
|
|
|
|
@main.route('/data/export', methods=['GET'])
@login_required
def export_data():
    """Download all of the logged-in user's readings as ``readings.csv``.

    The file has a header row followed by one row per reading, oldest first,
    with timestamps formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    # An explicit ordering keeps the export deterministic between downloads.
    readings = (
        Reading.query
        .filter_by(user_id=current_user.id)
        .order_by(Reading.timestamp.asc())
        .all()
    )

    # Uses the module-level `io` import; the context manager closes the text
    # buffer once its contents have been copied out.
    with io.StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(['Timestamp', 'Systolic', 'Diastolic', 'Heart Rate'])
        writer.writerows(
            [
                reading.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                reading.systolic,
                reading.diastolic,
                reading.heart_rate,
            ]
            for reading in readings
        )
        # `send_file` needs a binary stream, so encode the text once here.
        payload = io.BytesIO(output.getvalue().encode('utf-8'))

    return send_file(
        payload,
        mimetype='text/csv',
        as_attachment=True,
        download_name='readings.csv',
    )
|
|
|
|
@main.route('/health')
def health():
    """Health-check endpoint: always responds ``OK`` with HTTP status 200."""
    status_code = 200
    return "OK", status_code
|