163 lines
6.5 KiB
Python
163 lines
6.5 KiB
Python
from functools import wraps
|
|
from flask import render_template, url_for, request
|
|
from flask_login import current_user
|
|
|
|
|
|
def get_params(*args):
|
|
"""Helper to get parameters from kwargs, form, or args."""
|
|
res = []
|
|
for arg in args:
|
|
val = request.view_args.get(arg)
|
|
if val is None:
|
|
val = request.form.get(arg, type=int)
|
|
if val is None:
|
|
val = request.args.get(arg, type=int)
|
|
res.append(val)
|
|
return res[0] if len(res) == 1 else tuple(res)
|
|
|
|
|
|
def get_person_id_from_context():
|
|
"""Helper to find person_id from URL/form context."""
|
|
person_id, workout_id, topset_id = get_params('person_id', 'workout_id', 'topset_id')
|
|
|
|
from app import db
|
|
if person_id is not None:
|
|
return person_id
|
|
|
|
if workout_id is not None:
|
|
workout_info = db.execute("SELECT person_id FROM workout WHERE workout_id = %s", [workout_id], one=True)
|
|
if workout_info:
|
|
return workout_info['person_id']
|
|
|
|
if topset_id is not None:
|
|
topset_info = db.execute("SELECT workout_id FROM topset WHERE topset_id = %s", [topset_id], one=True)
|
|
if topset_info:
|
|
w_id = topset_info['workout_id']
|
|
workout_info = db.execute("SELECT person_id FROM workout WHERE workout_id = %s", [w_id], one=True)
|
|
if workout_info:
|
|
return workout_info['person_id']
|
|
|
|
return None
|
|
|
|
|
|
def validate_person(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
person_id = get_params('person_id')
|
|
from app import db
|
|
person = db.is_valid_person(person_id)
|
|
if person is None:
|
|
return render_template('error.html', error='404', message=f'Unable to find Person({person_id})', url='/')
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def validate_workout(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
person_id, workout_id = get_params('person_id', 'workout_id')
|
|
from app import db
|
|
if person_id is None and workout_id is not None:
|
|
person_id = get_person_id_from_context()
|
|
|
|
workout = db.is_valid_workout(person_id, workout_id)
|
|
if workout is None:
|
|
return render_template('error.html', error='404', message=f'Unable to find Workout({workout_id}) completed by Person({person_id})', url=url_for('person_overview', person_id=person_id) if person_id else '/')
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def validate_topset(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
person_id, workout_id, topset_id = get_params('person_id', 'workout_id', 'topset_id')
|
|
from app import db
|
|
if (person_id is None or workout_id is None) and topset_id is not None:
|
|
person_id = get_person_id_from_context()
|
|
# We could also find workout_id, but is_valid_topset handles it if we have at least topset_id
|
|
|
|
topset = db.is_valid_topset(person_id, workout_id, topset_id)
|
|
if topset is None:
|
|
fallback_url = url_for('person_overview', person_id=person_id) if person_id else '/'
|
|
return render_template('error.html', error='404', message=f'Unable to find TopSet({topset_id})', url=fallback_url)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
ACTION_MAP = {
|
|
'workout.create_workout': 'create a workout',
|
|
'workout.delete_workout': 'delete this workout',
|
|
'workout.update_workout_start_date': 'change the date for this workout',
|
|
'workout.create_topset': 'add a set',
|
|
'workout.update_topset': 'update this set',
|
|
'workout.delete_topset': 'delete this set',
|
|
'delete_person': 'delete this person',
|
|
'update_person_name': 'update this person\'s name',
|
|
'tags.add_tag': 'add a tag',
|
|
'tags.delete_tag': 'delete this tag',
|
|
'tags.add_tag_to_workout': 'add a tag to this workout',
|
|
'tags.create_new_tag_for_workout': 'create a new tag for this workout',
|
|
'workout.create_program': 'create a workout program',
|
|
'programs.delete_program': 'delete this workout program',
|
|
'delete_exercise': 'delete an exercise',
|
|
'delete_person': 'delete a user',
|
|
}
|
|
|
|
|
|
def admin_required(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
if not current_user.is_authenticated or not getattr(current_user, 'is_admin', False):
|
|
from flask import flash
|
|
msg = "You must be an admin to perform this action."
|
|
if request.endpoint in ACTION_MAP:
|
|
msg = f"You must be an admin to {ACTION_MAP[request.endpoint]}."
|
|
|
|
flash(msg, "warning")
|
|
if request.headers.get('HX-Request'):
|
|
return '', 200, {'HX-Redirect': url_for('dashboard')}
|
|
return render_template('error.html', error='403', message=msg, url='/')
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def get_auth_message(endpoint, person_id=None, is_authenticated=False):
|
|
"""Generates a friendly authorization message."""
|
|
action = ACTION_MAP.get(endpoint)
|
|
if not action:
|
|
# Fallback: prettify endpoint name if not in map
|
|
# e.g. 'workout.create_topset' -> 'create topset'
|
|
action = endpoint.split('.')[-1].replace('_', ' ')
|
|
|
|
if is_authenticated:
|
|
msg = f"You are not authorized to {action}"
|
|
else:
|
|
msg = f"Please log in to {action}"
|
|
|
|
if person_id:
|
|
from app import db
|
|
person_name = db.get_person_name(person_id)
|
|
if person_name:
|
|
msg += f" for {person_name}"
|
|
return msg
|
|
|
|
|
|
def require_ownership(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
person_id = get_person_id_from_context()
|
|
|
|
# Authorization check: must be logged in and (the owner or an admin)
|
|
is_admin = getattr(current_user, 'is_admin', False)
|
|
if not current_user.is_authenticated or (person_id is not None and int(current_user.get_id()) != person_id and not is_admin):
|
|
from flask import flash
|
|
msg = get_auth_message(request.endpoint, person_id, is_authenticated=current_user.is_authenticated)
|
|
flash(msg, "info")
|
|
|
|
if request.headers.get('HX-Request'):
|
|
return '', 200, {'HX-Redirect': url_for('auth.login') if not current_user.is_authenticated else url_for('dashboard')}
|
|
return render_template('error.html', error='403', message='You are not authorized to modify this resource.', url='/')
|
|
|
|
return func(*args, **kwargs)
|
|
return wrapper
|