Add optional IP/Country whitelist for admin, configurable by env variables

This commit is contained in:
Peter Stockings
2025-12-24 11:36:49 +11:00
parent e9a29b7300
commit 3748f042e6

74
app.py
View File

@@ -1,10 +1,12 @@
import os import os
import json import json
import subprocess import subprocess
from datetime import datetime
from flask import Flask, jsonify, render_template, request, session, redirect, url_for
from functools import wraps
import re import re
import ipaddress
import urllib.request
from datetime import datetime
from functools import wraps, lru_cache
from flask import Flask, jsonify, render_template, request, session, redirect, url_for, abort
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "change-this-in-production-please") app.secret_key = os.getenv("SECRET_KEY", "change-this-in-production-please")
@@ -13,7 +15,9 @@ POLL_SECONDS = int(os.getenv("POLL_SECONDS", "10"))
APP_DOMAIN = os.getenv("APP_DOMAIN", "peterstockings.com") APP_DOMAIN = os.getenv("APP_DOMAIN", "peterstockings.com")
DOCKER = os.getenv("DOCKER_BIN", "/usr/bin/docker") DOCKER = os.getenv("DOCKER_BIN", "/usr/bin/docker")
SHOW_INFRA = os.getenv("SHOW_INFRA", "1") == "1" SHOW_INFRA = os.getenv("SHOW_INFRA", "1") == "1"
LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123") # Change via environment variable LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123")
IP_WHITELIST = os.getenv("IP_WHITELIST", "") # Comma separated CIDRs
ALLOWED_COUNTRIES = os.getenv("ALLOWED_COUNTRIES", "AU") # Comma separated ISO codes
_UNIT = { _UNIT = {
"b": 1, "b": 1,
@@ -58,7 +62,6 @@ def docker_stats() -> dict:
} }
return stats return stats
import re
def docker_info() -> dict: def docker_info() -> dict:
# docker info --format "{{json .}}" gives us structured host-level info # docker info --format "{{json .}}" gives us structured host-level info
@@ -460,6 +463,57 @@ def pct_str_to_float(p: str) -> float:
def clamp(n: float, lo: float = 0.0, hi: float = 100.0) -> float: def clamp(n: float, lo: float = 0.0, hi: float = 100.0) -> float:
return max(lo, min(hi, n)) return max(lo, min(hi, n))
@lru_cache(max_size=1024)
def get_country_from_ip(ip):
"""
Fetch country code for an IP using a public API.
Cached to minimize external requests.
"""
try:
# Using ip-api.com (free for non-commercial, no key for low volume)
with urllib.request.urlopen(f"http://ip-api.com/json/{ip}?fields=status,countryCode", timeout=3) as response:
data = json.loads(response.read().decode())
if data.get("status") == "success":
return data.get("countryCode")
except Exception as e:
print(f"GeoIP Error: {e}")
return None
def get_client_ip():
"""
Extract the real client IP, respecting Dokku/Nginx proxy headers.
"""
if request.headers.get('X-Forwarded-For'):
# Take the first IP in the list (the actual client)
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
return request.remote_addr
def is_ip_allowed(ip):
"""
Verifies if the IP is in the whitelist or allowed country.
"""
# 1. Check Specific Whitelist (CIDR)
if IP_WHITELIST:
try:
client_addr = ipaddress.ip_address(ip)
for entry in IP_WHITELIST.split(','):
entry = entry.strip()
if not entry: continue
if client_addr in ipaddress.ip_network(entry, strict=False):
return True
except ValueError:
pass
# 2. Check GeoIP (Default: Australia)
if ALLOWED_COUNTRIES:
country = get_country_from_ip(ip)
if country in [c.strip() for c in ALLOWED_COUNTRIES.split(',')]:
return True
# Default to deny if no rules matched
return False
@app.get("/") @app.get("/")
def index(): def index():
return render_template("index.html", poll_seconds=POLL_SECONDS) return render_template("index.html", poll_seconds=POLL_SECONDS)
@@ -477,8 +531,14 @@ def api_status():
def login_required(f): def login_required(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if not session.get('logged_in'): # 1. IP Check (GeoIP/Whitelist)
return redirect(url_for('login')) client_ip = get_client_ip()
if not is_ip_allowed(client_ip):
abort(403, description=f"Access denied from {client_ip}. Restricted to {ALLOWED_COUNTRIES}.")
# 2. Session Check (Password)
if not session.get("logged_in"):
return redirect(url_for("login"))
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function