"""Flask views for the readings app: authentication, dashboard, readings CRUD,
user profile, and CSV import/export."""
import base64
import csv
import hashlib
import io
from datetime import datetime, timedelta
from io import StringIO

from flask import (
    Blueprint,
    Response,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required, login_user, logout_user
from PIL import Image
from sqlalchemy import func
from werkzeug.http import http_date
from werkzeug.security import check_password_hash, generate_password_hash

from app.models import Profile, Reading, db, User
from app.forms import DeleteForm, LoginForm, ProfileForm, ReadingForm, SignupForm

main = Blueprint('main', __name__)
auth = Blueprint('auth', __name__)
user = Blueprint('user', __name__)

# Date format used by the dashboard's date-range filter inputs.
DATE_FORMAT = '%Y-%m-%d'
# Timestamp format shared by CSV import and export so exports can be re-imported.
CSV_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _average(values):
    """Return the mean of *values* rounded to one decimal, or 0 when empty."""
    return round(sum(values) / len(values), 1) if values else 0


@auth.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the signup form and create a new user on valid submission."""
    form = SignupForm()
    if form.validate_on_submit():
        # Login looks users up by username, so a duplicate must be rejected
        # here instead of surfacing as a database error or an ambiguous login.
        if User.query.filter_by(username=form.username.data).first():
            flash("That username is already taken.", "danger")
            return render_template('signup.html', form=form)

        new_user = User(
            username=form.username.data,
            password_hash=generate_password_hash(form.password.data),
        )
        db.session.add(new_user)
        db.session.commit()
        flash("Account created successfully. Please log in.", "success")
        return redirect(url_for('auth.login'))
    return render_template('signup.html', form=form)


@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and start a session when credentials match."""
    form = LoginForm()
    if form.validate_on_submit():
        # Named `account` so the module-level `user` blueprint is not shadowed.
        account = User.query.filter_by(username=form.username.data).first()
        if account and check_password_hash(account.password_hash, form.password.data):
            login_user(account)
            flash("Logged in successfully.", "success")
            return redirect(url_for('main.dashboard'))
        flash("Invalid username or password.", "danger")
    return render_template('login.html', form=form)


@auth.route('/logout')
@login_required
def logout():
    """Log out the current user and redirect to the login page."""
    logout_user()
    flash('You have been logged out.', 'success')
    return redirect(url_for('auth.login'))


@main.route('/', methods=['GET'])
def landing():
    """Show the landing page; authenticated users go straight to the dashboard."""
    if current_user.is_authenticated:
        return redirect(url_for('main.dashboard'))
    return render_template('landing.html')


@main.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
    """Render the dashboard with readings, weekly averages, badges and graph data.

    On POST, ``start_date`` and ``end_date`` form fields (``YYYY-MM-DD``)
    restrict the readings to that inclusive date range. Missing or malformed
    dates leave the full range in place.
    """
    # Retrieve the first and last timestamps in a single query.
    first_last_readings = (
        db.session.query(
            func.min(Reading.timestamp).label('first'),
            func.max(Reading.timestamp).label('last'),
        )
        .filter(Reading.user_id == current_user.id)
        .first()
    )
    first_reading_timestamp = first_last_readings.first
    last_reading_timestamp = first_last_readings.last

    # Default the displayed range to the span of the user's readings.
    start_date = (
        first_reading_timestamp.strftime(DATE_FORMAT) if first_reading_timestamp else None
    )
    end_date = (
        last_reading_timestamp.strftime(DATE_FORMAT) if last_reading_timestamp else None
    )

    readings_query = Reading.query.filter_by(user_id=current_user.id)

    if request.method == 'POST':
        requested_start = request.form.get('start_date')
        requested_end = request.form.get('end_date')
        if requested_start and requested_end:
            try:
                start_date_obj = datetime.strptime(requested_start, DATE_FORMAT)
                end_date_obj = datetime.strptime(requested_end, DATE_FORMAT)
            except ValueError:
                # Bad input must not turn into a 500; keep the default range.
                flash('Invalid date range. Please use the YYYY-MM-DD format.', 'danger')
            else:
                # The parsed end date is midnight, so compare against the start
                # of the following day to include readings taken on the end date.
                readings_query = readings_query.filter(
                    Reading.timestamp >= start_date_obj,
                    Reading.timestamp < end_date_obj + timedelta(days=1),
                )
                start_date = start_date_obj.strftime(DATE_FORMAT)
                end_date = end_date_obj.strftime(DATE_FORMAT)

    readings = readings_query.order_by(Reading.timestamp.desc()).all()

    # Weekly summary (last 7 days), computed over the readings being displayed.
    one_week_ago = datetime.now() - timedelta(days=7)
    weekly_readings = [r for r in readings if r.timestamp >= one_week_ago]
    systolic_avg = _average([r.systolic for r in weekly_readings])
    diastolic_avg = _average([r.diastolic for r in weekly_readings])
    heart_rate_avg = _average([r.heart_rate for r in weekly_readings])

    # Progress badges.
    badges = []
    if len(readings) >= 10:
        badges.append("10 Readings Logged")
    if len(readings) >= 100:
        badges.append("100 Readings Milestone")
    if len(weekly_readings) >= 7:
        badges.append("Logged Readings for 7 Days")

    # Series for the graphs, in the same (newest-first) order as `readings`.
    timestamps = [reading.timestamp.strftime('%b %d') for reading in readings]
    systolic = [reading.systolic for reading in readings]
    diastolic = [reading.diastolic for reading in readings]
    heart_rate = [reading.heart_rate for reading in readings]

    return render_template(
        'dashboard.html',
        readings=readings,
        profile=current_user.profile,
        badges=badges,
        systolic_avg=systolic_avg,
        diastolic_avg=diastolic_avg,
        heart_rate_avg=heart_rate_avg,
        delete_form=DeleteForm(),
        timestamps=timestamps,
        systolic=systolic,
        diastolic=diastolic,
        heart_rate=heart_rate,
        start_date=start_date,
        end_date=end_date,
    )


@main.route('/add-reading', methods=['GET', 'POST'])
@login_required
def add_reading():
    """Render the reading form and store a new reading for the current user."""
    form = ReadingForm()
    if form.validate_on_submit():
        new_reading = Reading(
            user_id=current_user.id,
            timestamp=form.timestamp.data,
            systolic=form.systolic.data,
            diastolic=form.diastolic.data,
            heart_rate=form.heart_rate.data,
        )
        db.session.add(new_reading)
        db.session.commit()
        flash("Reading added successfully.", "success")
        return redirect(url_for('main.dashboard'))
    return render_template('add_reading.html', form=form)


# The URL must carry the reading id; without the converter the view's
# required `reading_id` argument is never supplied.
@main.route('/reading/<int:reading_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_reading(reading_id):
    """Edit one of the current user's readings; 404 if it does not exist."""
    reading = Reading.query.get_or_404(reading_id)

    # Ensure the reading belongs to the logged-in user.
    if reading.user_id != current_user.id:
        flash('You are not authorized to edit this reading.', 'danger')
        return redirect(url_for('main.dashboard'))

    form = ReadingForm(obj=reading)  # Populate form with existing reading data
    if form.validate_on_submit():
        reading.timestamp = form.timestamp.data
        reading.systolic = form.systolic.data
        reading.diastolic = form.diastolic.data
        reading.heart_rate = form.heart_rate.data
        db.session.commit()
        flash('Reading updated successfully!', 'success')
        return redirect(url_for('main.dashboard'))

    return render_template('edit_reading.html', form=form, reading=reading)


@main.route('/confirm_delete/<int:reading_id>', methods=['GET', 'POST'])
@login_required
def confirm_delete(reading_id):
    """Show a delete confirmation page (GET) or delete the reading (POST).

    Responds with 404 when the reading does not exist or belongs to another user.
    """
    reading = Reading.query.filter_by(
        id=reading_id, user_id=current_user.id
    ).first_or_404()

    if request.method == 'POST':
        db.session.delete(reading)
        db.session.commit()
        flash('Reading deleted successfully!', 'success')
        return redirect(url_for('main.dashboard'))

    return render_template('confirm_delete.html', reading=reading)


@main.route('/reading/<int:reading_id>/delete', methods=['POST'])
@login_required
def delete_reading(reading_id):
    """Delete one of the current user's readings; 404 if it does not exist."""
    reading = Reading.query.get_or_404(reading_id)

    # Ensure the reading belongs to the logged-in user.
    if reading.user_id != current_user.id:
        flash('You are not authorized to delete this reading.', 'danger')
        return redirect(url_for('main.dashboard'))

    db.session.delete(reading)
    db.session.commit()
    flash('Reading deleted successfully!', 'success')
    return redirect(url_for('main.dashboard'))


@user.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Show and update the current user's profile, including the profile picture.

    An uploaded picture is converted to RGB, shrunk to fit 300x300, re-encoded
    as JPEG and stored base64-encoded on the profile.
    """
    user_profile = current_user.profile or Profile(user_id=current_user.id)
    form = ProfileForm(obj=user_profile)

    if form.validate_on_submit():
        user_profile.name = form.name.data
        user_profile.email = form.email.data
        # An empty threshold field keeps the previously stored value.
        user_profile.systolic_threshold = (
            form.systolic_threshold.data or user_profile.systolic_threshold
        )
        user_profile.diastolic_threshold = (
            form.diastolic_threshold.data or user_profile.diastolic_threshold
        )
        user_profile.dark_mode = form.dark_mode.data

        if form.profile_pic.data:
            file_data = form.profile_pic.data.read()
            try:
                image = Image.open(io.BytesIO(file_data))
                image = image.convert("RGB")  # JPEG cannot store alpha/palette modes
                image.thumbnail((300, 300))  # Resize to a maximum of 300x300 pixels

                buffer = io.BytesIO()
                image.save(buffer, format="JPEG", quality=80)
                user_profile.profile_pic = base64.b64encode(
                    buffer.getvalue()
                ).decode('utf-8')
            except Exception as e:
                # Best effort: a bad image must not block the other profile fields.
                flash(f"Error processing profile picture: {e}", 'danger')

        db.session.add(user_profile)
        db.session.commit()
        flash('Profile updated successfully!', 'success')
        return redirect(url_for('user.profile'))

    return render_template('profile.html', form=form, profile=user_profile)


@user.route('/profile/image/<int:user_id>')
@login_required
def profile_image(user_id):
    """Serve the user's stored profile picture, or the default SVG if none is set.

    Only the owner may fetch the image; other users are redirected to the
    dashboard. Anonymous requests are handled by ``login_required`` (an
    anonymous ``current_user`` has no ``id`` to compare against).
    """
    if user_id != current_user.id:
        flash('You are not authorized to view this profile picture.', 'danger')
        return redirect(url_for('main.dashboard'))

    stored_profile = Profile.query.filter_by(user_id=user_id).first()

    if stored_profile and stored_profile.profile_pic:
        image_data = base64.b64decode(stored_profile.profile_pic)
        response = make_response(image_data)
        response.headers.set('Content-Type', 'image/jpeg')
        # The image is owner-only, so shared caches must not store it.
        response.headers.set('Cache-Control', 'private, max-age=86400')  # 1 day
        # Built-in hash() of a str is salted per process, so it is useless as
        # a validator across workers/restarts; use a content digest instead.
        response.set_etag(
            hashlib.sha256(stored_profile.profile_pic.encode('utf-8')).hexdigest()
        )
        # http_date() with no argument formats the current time correctly,
        # unlike a naive utcnow().timestamp(), which is skewed on non-UTC hosts.
        response.headers.set('Last-Modified', http_date())
        return response

    # Serve the default SVG if no profile picture is found.
    with open('app/static/images/default-profile.svg', 'r', encoding='utf-8') as f:
        default_image = f.read()
    response = make_response(default_image)
    response.headers.set('Content-Type', 'image/svg+xml')
    return response


@main.route('/data', methods=['GET', 'POST'])
@login_required
def manage_data():
    """Render the data page (GET) or import readings from an uploaded CSV (POST).

    The CSV must have a header row followed by rows of
    ``timestamp, systolic, diastolic, heart_rate`` with timestamps formatted as
    ``YYYY-MM-DD HH:MM:SS``. The import is all-or-nothing.
    """
    if request.method == 'POST':
        upload = request.files.get('file')
        if upload and upload.filename and upload.filename.lower().endswith('.csv'):
            try:
                # utf-8-sig drops a leading BOM, which would otherwise end up
                # in the first header field.
                rows = csv.reader(StringIO(upload.read().decode('utf-8-sig')))
                next(rows, None)  # Skip the header row; tolerate an empty file
                for row in rows:
                    if not row:
                        continue  # csv.reader yields [] for blank lines
                    timestamp, systolic, diastolic, heart_rate = row
                    db.session.add(
                        Reading(
                            user_id=current_user.id,
                            timestamp=datetime.strptime(timestamp, CSV_TIMESTAMP_FORMAT),
                            systolic=int(systolic),
                            diastolic=int(diastolic),
                            heart_rate=int(heart_rate),
                        )
                    )
                db.session.commit()
                flash('Data imported successfully!', 'success')
            except Exception as e:
                # Discard rows staged before the failure so nothing is half-imported.
                db.session.rollback()
                flash(f'Error importing data: {str(e)}', 'danger')
        else:
            flash('Please upload a valid CSV file.', 'danger')
        return redirect(url_for('main.manage_data'))

    return render_template('data.html')


@main.route('/data/export', methods=['GET'])
@login_required
def export_data():
    """Send all of the current user's readings as a ``readings.csv`` download."""
    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow(['Timestamp', 'Systolic', 'Diastolic', 'Heart Rate'])
    for reading in Reading.query.filter_by(user_id=current_user.id).all():
        writer.writerow([
            reading.timestamp.strftime(CSV_TIMESTAMP_FORMAT),
            reading.systolic,
            reading.diastolic,
            reading.heart_rate,
        ])

    # send_file needs a binary stream, so encode the CSV text to bytes.
    payload = io.BytesIO(output.getvalue().encode('utf-8'))
    output.close()

    return send_file(
        payload,
        mimetype='text/csv',
        as_attachment=True,
        download_name='readings.csv',
    )


@main.route('/health')
def health():
    """Liveness probe: always responds with ``OK`` and HTTP 200."""
    return "OK", 200