from collections import defaultdict import csv from io import StringIO import io from flask import Blueprint, Response, make_response, render_template, redirect, request, send_file, url_for, flash import humanize from pytz import timezone, utc from sqlalchemy import func from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.http import http_date from app.models import Profile, Reading, db, User from app.forms import DeleteForm, LoginForm, ProfileForm, ReadingForm, SignupForm from flask_login import login_user, login_required, current_user, logout_user import base64 from datetime import date, datetime, timedelta from PIL import Image main = Blueprint('main', __name__) auth = Blueprint('auth', __name__) user = Blueprint('user', __name__) @auth.route('/signup', methods=['GET', 'POST']) def signup(): form = SignupForm() if form.validate_on_submit(): hashed_password = generate_password_hash(form.password.data) new_user = User(username=form.username.data, password_hash=hashed_password) db.session.add(new_user) db.session.commit() flash("Account created successfully. Please log in.", "success") return redirect(url_for('auth.login')) return render_template('signup.html', form=form) @auth.route('/login', methods=['GET', 'POST']) def login(): form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user and check_password_hash(user.password_hash, form.password.data): login_user(user) flash("Logged in successfully.", "success") return redirect(url_for('main.dashboard')) flash("Invalid username or password.", "danger") return render_template('login.html', form=form) @auth.route('/logout') @login_required def logout(): logout_user() # Logs out the current user flash('You have been logged out.', 'success') return redirect(url_for('auth.login')) # Redirect to login page or home page @main.route('/', methods=['GET']) def landing(): if current_user.is_authenticated: return redirect(url_for('main.dashboard')) return render_template('landing.html') @main.route('/dashboard', methods=['GET', 'POST']) @login_required def dashboard(): # Helper function to get first and last reading timestamps def get_reading_date_range(user_id): result = ( db.session.query( func.min(Reading.timestamp).label('first'), func.max(Reading.timestamp).label('last') ) .filter(Reading.user_id == user_id) .first() ) return result.first, result.last # Helper function to calculate weekly summary averages def calculate_weekly_summary(readings): one_week_ago = datetime.now() - timedelta(days=7) weekly_readings = [r for r in readings if r.timestamp >= one_week_ago] if weekly_readings: systolic_avg = round(sum(r.systolic for r in weekly_readings) / len(weekly_readings), 1) diastolic_avg = round(sum(r.diastolic for r in weekly_readings) / len(weekly_readings), 1) heart_rate_avg = round(sum(r.heart_rate for r in weekly_readings) / len(weekly_readings), 1) else: systolic_avg = diastolic_avg = heart_rate_avg = 0 return systolic_avg, diastolic_avg, heart_rate_avg # Helper function to calculate progress badges def calculate_progress_badges(readings): """Calculate badges based on user reading activity.""" badges = [] now = datetime.utcnow().date() # Prepare sorted readings by timestamp sorted_readings = sorted(readings, key=lambda r: r.timestamp) # Incremental milestone badge def highest_milestone(total_readings, milestones): """Determine the highest milestone achieved.""" for milestone in reversed(milestones): if total_readings >= milestone: return f"{milestone} Readings Logged" return None highest_milestone_badge = highest_milestone(len(readings), [10, 50, 100, 500, 1000, 5000, 10000]) if highest_milestone_badge: badges.append(highest_milestone_badge) # Streaks and calendar month badges if sorted_readings: streak_count = 1 daily_streak = True monthly_tracker = defaultdict(int) # Start with the first reading previous_date = sorted_readings[0].timestamp.date() for reading in sorted_readings[1:]: current_date = reading.timestamp.date() # Check for consecutive daily streaks if (current_date - previous_date).days == 1: streak_count += 1 elif (current_date - previous_date).days > 1: daily_streak = False # Track monthly activity monthly_tracker[current_date.strftime('%Y-%m')] += 1 previous_date = current_date # Add streak badges if daily_streak and streak_count >= 1: badges.append(f"Current Streak: {streak_count} Days") if daily_streak and streak_count >= 7: badges.append("Logged Every Day for a Week") if daily_streak and streak_count >= 30 and previous_date == now: badges.append("Monthly Streak") # Add calendar month streak badges for month, count in monthly_tracker.items(): if count >= 30: badges.append(f"Full Month of Logging: {month}") # Time-specific badges (morning/night logging) def is_morning(reading_time): return 5 <= reading_time.hour < 12 def is_night(reading_time): return 18 <= reading_time.hour <= 23 if all(is_morning(r.timestamp) for r in sorted_readings[-7:]): badges.append("Morning Riser: Logged Readings Every Morning for a Week") if all(is_night(r.timestamp) for r in sorted_readings[-7:]): badges.append("Night Owl: Logged Readings Every Night for a Week") return badges def generate_monthly_calendar(readings, selected_date, local_timezone): # Convert selected date to user's timezone and extract the start/end dates today = datetime.now(local_timezone).date() date = selected_date.astimezone(local_timezone).date() first_day_of_month = date.replace(day=1) days_to_subtract = (first_day_of_month.weekday() + 1) % 7 start_date = first_day_of_month - timedelta(days=days_to_subtract) end_date = start_date + timedelta(days=6 * 7 - 1) # Group readings by day readings_by_day = {} for reading in readings: local_date = reading.timestamp.astimezone(local_timezone).date() readings_by_day.setdefault(local_date, []).append(reading) # Build calendar days calendar = [] current_date = start_date while current_date <= end_date: calendar.append({ 'day': current_date.day, 'is_today': current_date == today, 'is_in_current_month': current_date.month == date.month, 'readings': readings_by_day.get(current_date, []), }) current_date += timedelta(days=1) return calendar # Get the first and last reading timestamps first_reading_timestamp, last_reading_timestamp = get_reading_date_range(current_user.id) # Set default start and end dates start_date = first_reading_timestamp.strftime('%Y-%m-%d') if first_reading_timestamp else None end_date = last_reading_timestamp.strftime('%Y-%m-%d') if last_reading_timestamp else None # Handle filtering for POST request readings_query = Reading.query.filter_by(user_id=current_user.id) if request.method == 'POST': start_date = request.form.get('start_date') or start_date end_date = request.form.get('end_date') or end_date if start_date and end_date: readings_query = readings_query.filter( Reading.timestamp >= datetime.strptime(start_date, '%Y-%m-%d'), Reading.timestamp <= datetime.strptime(end_date, '%Y-%m-%d') ) # Fetch readings readings = readings_query.order_by(Reading.timestamp.desc()).all() # Fetch the user's timezone (default to 'UTC' if none is set) user_timezone = current_user.profile.timezone if current_user.profile and current_user.profile.timezone else 'UTC' local_tz = timezone(user_timezone) # Add relative & local timestamps to readings now = datetime.utcnow() for reading in readings: reading.relative_timestamp = humanize.naturaltime(now - reading.timestamp) reading.local_timestamp = utc.localize(reading.timestamp).astimezone(local_tz) month_view = generate_monthly_calendar(readings, now, local_tz) # Calculate weekly summary and progress badges systolic_avg, diastolic_avg, heart_rate_avg = calculate_weekly_summary(readings) badges = calculate_progress_badges(readings) # Prepare graph data timestamps = [reading.timestamp.strftime('%b %d') for reading in readings] systolic = [reading.systolic for reading in readings] diastolic = [reading.diastolic for reading in readings] heart_rate = [reading.heart_rate for reading in readings] # Group readings by date readings_by_date = {} for reading in readings: date_key = reading.timestamp.date() if date_key not in readings_by_date: readings_by_date[date_key] = [] readings_by_date[date_key].append(reading) # Render template return render_template( 'dashboard.html', readings=readings, profile=current_user.profile, badges=badges, systolic_avg=systolic_avg, diastolic_avg=diastolic_avg, heart_rate_avg=heart_rate_avg, delete_form=DeleteForm(), timestamps=timestamps, systolic=systolic, diastolic=diastolic, heart_rate=heart_rate, start_date=start_date, end_date=end_date, readings_by_date=readings_by_date, month = month_view, date=date, timedelta=timedelta ) @main.route('/add-reading', methods=['GET', 'POST']) @login_required def add_reading(): form = ReadingForm() if form.validate_on_submit(): new_reading = Reading( user_id=current_user.id, timestamp=form.timestamp.data, systolic=form.systolic.data, diastolic=form.diastolic.data, heart_rate=form.heart_rate.data ) db.session.add(new_reading) db.session.commit() flash("Reading added successfully.", "success") return redirect(url_for('main.dashboard')) # Fetch the user's timezone (default to 'UTC' if none is set) user_timezone = current_user.profile.timezone if current_user.profile and current_user.profile.timezone else 'UTC' local_tz = timezone(user_timezone) form.timestamp.data = utc.localize(datetime.utcnow()).astimezone(local_tz) return render_template('add_reading.html', form=form) @main.route('/reading//edit', methods=['GET', 'POST']) @login_required def edit_reading(reading_id): reading = Reading.query.get_or_404(reading_id) # Ensure the reading belongs to the logged-in user if reading.user_id != current_user.id: flash('You are not authorized to edit this reading.', 'danger') return redirect(url_for('main.dashboard')) # Fetch the user's timezone (default to 'UTC' if none is set) user_timezone = current_user.profile.timezone if current_user.profile and current_user.profile.timezone else 'UTC' local_tz = timezone(user_timezone) reading.local_timestamp = utc.localize(reading.timestamp).astimezone(local_tz) form = ReadingForm(obj=reading) # Populate form with existing reading data form.timestamp.data = reading.local_timestamp if form.validate_on_submit(): # Convert the local timestamp back to UTC for saving local_timestamp = form.timestamp.data # Ensure the local timestamp is naive before localizing if local_timestamp.tzinfo is not None: local_timestamp = local_timestamp.replace(tzinfo=None) reading.timestamp = local_tz.localize(local_timestamp).astimezone(utc) reading.systolic = form.systolic.data reading.diastolic = form.diastolic.data reading.heart_rate = form.heart_rate.data db.session.commit() flash('Reading updated successfully!', 'success') return redirect(url_for('main.dashboard')) return render_template('edit_reading.html', form=form, reading=reading) @main.route('/confirm_delete/', methods=['GET', 'POST']) @login_required def confirm_delete(reading_id): # Fetch the reading to confirm deletion reading = Reading.query.filter_by(id=reading_id, user_id=current_user.id).first_or_404() if request.method == 'POST': # Handle deletion db.session.delete(reading) db.session.commit() flash('Reading deleted successfully!', 'success') return redirect(url_for('main.dashboard')) return render_template('confirm_delete.html', reading=reading) @main.route('/reading//delete', methods=['POST']) @login_required def delete_reading(reading_id): reading = Reading.query.get_or_404(reading_id) # Ensure the reading belongs to the logged-in user if reading.user_id != current_user.id: flash('You are not authorized to delete this reading.', 'danger') return redirect(url_for('main.dashboard')) db.session.delete(reading) db.session.commit() flash('Reading deleted successfully!', 'success') return redirect(url_for('main.dashboard')) @user.route('/profile', methods=['GET', 'POST']) @login_required def profile(): profile = current_user.profile or Profile(user_id=current_user.id) form = ProfileForm(obj=profile) if form.validate_on_submit(): # Update profile fields profile.name = form.name.data profile.email = form.email.data profile.systolic_threshold = form.systolic_threshold.data or profile.systolic_threshold profile.diastolic_threshold = form.diastolic_threshold.data or profile.diastolic_threshold profile.dark_mode = form.dark_mode.data profile.timezone = form.timezone.data # Handle profile picture upload if form.profile_pic.data: file_data = form.profile_pic.data.read() # Resize and compress the image try: image = Image.open(io.BytesIO(file_data)) image = image.convert("RGB") # Ensure it's in RGB format image.thumbnail((300, 300)) # Resize to a maximum of 300x300 pixels # Save the resized image to a buffer buffer = io.BytesIO() image.save(buffer, format="JPEG", quality=80) # Compress with quality=80 buffer.seek(0) # Encode the compressed image as base64 profile.profile_pic = base64.b64encode(buffer.read()).decode('utf-8') except Exception as e: flash(f"Error processing profile picture: {e}", 'danger') db.session.add(profile) db.session.commit() flash('Profile updated successfully!', 'success') return redirect(url_for('user.profile')) return render_template('profile.html', form=form, profile=profile) @user.route('/profile/image/') def profile_image(user_id): # Ensure the reading belongs to the logged-in user if user_id != current_user.id: flash('You are not authorized to delete this reading.', 'danger') return redirect(url_for('main.dashboard')) profile = Profile.query.filter_by(user_id=user_id).first() if profile and profile.profile_pic: image_data = base64.b64decode(profile.profile_pic) response = make_response(image_data) response.headers.set('Content-Type', 'image/jpeg') response.headers.set('Cache-Control', 'public, max-age=86400') # Cache for 1 day response.headers.set('ETag', str(hash(profile.profile_pic))) # Unique ETag for the image response.headers.set('Last-Modified', http_date(datetime.utcnow().timestamp())) return response else: # Serve the default SVG if no profile picture is found with open('app/static/images/default-profile.svg', 'r') as f: default_image = f.read() response = make_response(default_image) response.headers.set('Content-Type', 'image/svg+xml') @main.route('/data', methods=['GET', 'POST']) @login_required def manage_data(): if request.method == 'POST': # Handle CSV file upload file = request.files.get('file') if file and file.filename.endswith('.csv'): try: csv_data = csv.reader(StringIO(file.read().decode('utf-8'))) next(csv_data) # Skip the header row for row in csv_data: timestamp, systolic, diastolic, heart_rate = row reading = Reading( user_id=current_user.id, timestamp=datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'), systolic=int(systolic), diastolic=int(diastolic), heart_rate=int(heart_rate), ) db.session.add(reading) db.session.commit() flash('Data imported successfully!', 'success') except Exception as e: flash(f'Error importing data: {str(e)}', 'danger') else: flash('Please upload a valid CSV file.', 'danger') return redirect(url_for('main.manage_data')) return render_template('data.html') @main.route('/data/export', methods=['GET']) @login_required def export_data(): import io output = io.StringIO() writer = csv.writer(output) # Write CSV header writer.writerow(['Timestamp', 'Systolic', 'Diastolic', 'Heart Rate']) # Write user readings to the CSV readings = Reading.query.filter_by(user_id=current_user.id).all() for reading in readings: writer.writerow([ reading.timestamp.strftime('%Y-%m-%d %H:%M:%S'), reading.systolic, reading.diastolic, reading.heart_rate, ]) # Convert text to bytes for `send_file` output.seek(0) response = io.BytesIO(output.getvalue().encode('utf-8')) output.close() return send_file( response, mimetype='text/csv', as_attachment=True, download_name='readings.csv' ) @main.route('/health') def health(): return "OK", 200