diff --git a/app.py b/app.py index 17cd312..9805e20 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,12 @@ import os import json import subprocess -from datetime import datetime -from flask import Flask, jsonify, render_template, request, session, redirect, url_for -from functools import wraps import re +import ipaddress +import urllib.request +from datetime import datetime +from functools import wraps, lru_cache +from flask import Flask, jsonify, render_template, request, session, redirect, url_for, abort app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "change-this-in-production-please") @@ -13,7 +15,9 @@ POLL_SECONDS = int(os.getenv("POLL_SECONDS", "10")) APP_DOMAIN = os.getenv("APP_DOMAIN", "peterstockings.com") DOCKER = os.getenv("DOCKER_BIN", "/usr/bin/docker") SHOW_INFRA = os.getenv("SHOW_INFRA", "1") == "1" -LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123") # Change via environment variable +LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123") +IP_WHITELIST = os.getenv("IP_WHITELIST", "") # Comma separated CIDRs +ALLOWED_COUNTRIES = os.getenv("ALLOWED_COUNTRIES", "AU") # Comma separated ISO codes _UNIT = { "b": 1, @@ -58,7 +62,6 @@ def docker_stats() -> dict: } return stats -import re def docker_info() -> dict: # docker info --format "{{json .}}" gives us structured host-level info @@ -460,6 +463,57 @@ def pct_str_to_float(p: str) -> float: def clamp(n: float, lo: float = 0.0, hi: float = 100.0) -> float: return max(lo, min(hi, n)) + +@lru_cache(max_size=1024) +def get_country_from_ip(ip): + """ + Fetch country code for an IP using a public API. + Cached to minimize external requests. + """ + try: + # Using ip-api.com (free for non-commercial, no key for low volume) + with urllib.request.urlopen(f"http://ip-api.com/json/{ip}?fields=status,countryCode", timeout=3) as response: + data = json.loads(response.read().decode()) + if data.get("status") == "success": + return data.get("countryCode") + except Exception as e: + print(f"GeoIP Error: {e}") + return None + +def get_client_ip(): + """ + Extract the real client IP, respecting Dokku/Nginx proxy headers. + """ + if request.headers.get('X-Forwarded-For'): + # Take the first IP in the list (the actual client) + return request.headers.get('X-Forwarded-For').split(',')[0].strip() + return request.remote_addr + +def is_ip_allowed(ip): + """ + Verifies if the IP is in the whitelist or allowed country. + """ + # 1. Check Specific Whitelist (CIDR) + if IP_WHITELIST: + try: + client_addr = ipaddress.ip_address(ip) + for entry in IP_WHITELIST.split(','): + entry = entry.strip() + if not entry: continue + if client_addr in ipaddress.ip_network(entry, strict=False): + return True + except ValueError: + pass + + # 2. Check GeoIP (Default: Australia) + if ALLOWED_COUNTRIES: + country = get_country_from_ip(ip) + if country in [c.strip() for c in ALLOWED_COUNTRIES.split(',')]: + return True + + # Default to deny if no rules matched + return False + @app.get("/") def index(): return render_template("index.html", poll_seconds=POLL_SECONDS) @@ -477,8 +531,14 @@ def api_status(): def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): - if not session.get('logged_in'): - return redirect(url_for('login')) + # 1. IP Check (GeoIP/Whitelist) + client_ip = get_client_ip() + if not is_ip_allowed(client_ip): + abort(403, description=f"Access denied from {client_ip}. Restricted to {ALLOWED_COUNTRIES}.") + + # 2. Session Check (Password) + if not session.get("logged_in"): + return redirect(url_for("login")) return f(*args, **kwargs) return decorated_function