import os
import json
import subprocess
import re
import ipaddress
import urllib.request
from datetime import datetime
from functools import wraps, lru_cache

from flask import Flask, jsonify, render_template, request, session, redirect, url_for, abort

app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "change-this-in-production-please")

POLL_SECONDS = int(os.getenv("POLL_SECONDS", "10"))
APP_DOMAIN = os.getenv("APP_DOMAIN", "peterstockings.com")
DOCKER = os.getenv("DOCKER_BIN", "/usr/bin/docker")
SHOW_INFRA = os.getenv("SHOW_INFRA", "1") == "1"
LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123")
IP_WHITELIST = os.getenv("IP_WHITELIST", "")  # Comma separated CIDRs
ALLOWED_COUNTRIES = os.getenv("ALLOWED_COUNTRIES", "AU")  # Comma separated ISO codes

_UNIT = {
    "b": 1,
    "kb": 1000, "mb": 1000**2, "gb": 1000**3, "tb": 1000**4,
    "kib": 1024, "mib": 1024**2, "gib": 1024**3, "tib": 1024**4,
}

# Optional JSON map: {"gitea":"https://gitea.peterstockings.com", "bloodpressure":"https://bp.peterstockings.com"}
APP_URL_OVERRIDES = {}
try:
    APP_URL_OVERRIDES = json.loads(os.getenv("APP_URL_OVERRIDES", "{}"))
except Exception:
    APP_URL_OVERRIDES = {}


def sh(cmd: list[str]) -> str:
    return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True).strip()


def docker_ps_all() -> list[dict]:
    # Name + Image + Status + Ports
    fmt = "{{.Names}}\t{{.Image}}\t{{.Status}}\t{{.Ports}}"
    out = sh([DOCKER, "ps", "--format", fmt])
    rows = []
    for line in out.splitlines():
        name, image, status, ports = line.split("\t")
        rows.append({"name": name, "image": image, "status": status, "ports": ports})
    return rows


def docker_stats() -> dict:
    # Name + CPU + MemUsage + MemPerc
    fmt = "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"
    out = sh([DOCKER, "stats", "--no-stream", "--format", fmt])
    stats = {}
    for line in out.splitlines():
        name, cpu, mem_usage, mem_pct = line.split("\t")
        # mem_usage like: "58.84MiB / 384MiB"
        mem_used, mem_limit = [s.strip() for s in mem_usage.split("/", 1)]
        stats[name] = {
            "cpu": cpu,
            "mem_used": mem_used,
            "mem_limit": mem_limit,
            "mem_pct": mem_pct,
        }
    return stats


def docker_info() -> dict:
    # docker info --format "{{json .}}" gives us structured host-level info
    out = sh([DOCKER, "info", "--format", "{{json .}}"])
    return json.loads(out)


def docker_system_df() -> dict:
    # Parse `docker system df` (text). It's stable enough for a dashboard.
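    # Columns in the output are separated by runs of two or more spaces, so
    # multi-word types such as "Local Volumes" and "Build Cache" survive the
    # split below intact.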
    out = sh([DOCKER, "system", "df"])
    # Example lines:
    #   Images          175   18   15.15GB   13.93GB (91%)
    #   Containers      27    26   145.1MB   16.57kB (0%)
    #   Local Volumes   47    1    817.7MB   817.7MB (100%)
    #   Build Cache     889   0    423B      423B
    rows = {}
    for line in out.splitlines():
        line = line.strip()
        if not line or line.startswith("TYPE"):
            continue
        parts = re.split(r"\s{2,}", line)
        if len(parts) >= 5:
            typ, total, active, size, reclaimable = parts[:5]
            rows[typ] = {
                "total": total,
                "active": active,
                "size": size,
                "reclaimable": reclaimable,
            }
    return rows


def system_summary() -> dict:
    info = docker_info()
    df = docker_system_df()
    return {
        "name": info.get("Name", ""),
        "server_version": info.get("ServerVersion", ""),
        "operating_system": info.get("OperatingSystem", ""),
        "os_type": info.get("OSType", ""),
        "architecture": info.get("Architecture", ""),
        "kernel_version": info.get("KernelVersion", ""),
        "cpus": info.get("NCPU", ""),
        "mem_total": info.get("MemTotal", ""),  # bytes
        "containers": info.get("Containers", ""),
        "containers_running": info.get("ContainersRunning", ""),
        "containers_stopped": info.get("ContainersStopped", ""),
        "images": info.get("Images", ""),
        "docker_root_dir": info.get("DockerRootDir", ""),
        "system_df": df,
    }


def format_bytes(n: int) -> str:
    # for mem_total
    units = ["B", "KB", "MB", "GB", "TB"]
    f = float(n)
    for u in units:
        if f < 1024 or u == units[-1]:
            return f"{f:.1f}{u}"
        f /= 1024
    return f"{n}B"


def docker_inspect_restart_count(container_name: str) -> int:
    # RestartCount is useful when stuff is flapping / OOMing
    try:
        out = sh([DOCKER, "inspect", "-f", "{{.RestartCount}}", container_name])
        return int(out.strip())
    except Exception:
        return 0


def get_container_logs(container_name: str, lines: int = 50) -> list[dict]:
    """
    Get last N lines of container logs with error detection.
    Returns list of dicts with 'text' and 'level' keys.
    """
    try:
        out = sh([DOCKER, "logs", "--tail", str(lines), container_name])
        log_lines = []
        for line in out.splitlines():
            # Strip ANSI color codes
            line_clean = re.sub(r'\x1b\[[0-9;]*m', '', line)
            # Detect log level
            line_lower = line_clean.lower()
            if any(x in line_lower for x in ['error', 'exception', 'fatal', 'critical']):
                level = 'error'
            elif any(x in line_lower for x in ['warn', 'warning']):
                level = 'warn'
            else:
                level = 'info'
            log_lines.append({
                'text': line_clean,
                'level': level
            })
        return log_lines
    except Exception:
        return []


def get_container_detail(container_name: str) -> dict:
    """
    Get detailed container information using docker inspect.
    Returns parsed container metadata.
    """
    try:
        out = sh([DOCKER, "inspect", container_name])
        inspect_data = json.loads(out)
        if not inspect_data:
            return {}

        container = inspect_data[0]

        # Extract useful information
        config = container.get("Config", {})
        state = container.get("State", {})
        network_settings = container.get("NetworkSettings", {})
        mounts = container.get("Mounts", [])

        return {
            "name": container.get("Name", "").lstrip("/"),
            "id": container.get("Id", "")[:12],
            "image": config.get("Image", ""),
            "created": container.get("Created", ""),
            "state": {
                "status": state.get("Status", ""),
                "running": state.get("Running", False),
                "paused": state.get("Paused", False),
                "restarting": state.get("Restarting", False),
                "started_at": state.get("StartedAt", ""),
                "finished_at": state.get("FinishedAt", ""),
            },
            "env": config.get("Env", []),
            "cmd": config.get("Cmd", []),
            "entrypoint": config.get("Entrypoint", []),
            "working_dir": config.get("WorkingDir", ""),
            "exposed_ports": list(config.get("ExposedPorts", {}).keys()),
            "ports": network_settings.get("Ports", {}),
            "networks": list(network_settings.get("Networks", {}).keys()),
            "ip_address": network_settings.get("IPAddress", ""),
            "mounts": [
                {
                    "type": m.get("Type", ""),
                    "source": m.get("Source", ""),
                    "destination": m.get("Destination", ""),
                    "mode": m.get("Mode", ""),
                    "rw": m.get("RW", False),
                }
                for m in mounts
            ],
            "restart_policy": container.get("HostConfig", {}).get("RestartPolicy", {}),
        }
    except Exception as e:
        return {"error": str(e)}


def run_sql_query(container_name: str, query: str) -> dict:
    """
    Execute a SQL query inside a database container using docker exec.
    Supports Postgres and MySQL/MariaDB. Infers the database name from the Dokku container name.
    """
    try:
        # Infer DB name: dokku.postgres.my-service -> my-service_db or my_service
        # Dokku typically replaces hyphens with underscores in database names.
        parts = container_name.split(".", 2)
        service_name = parts[2] if len(parts) >= 3 else None
        # Most common Dokku pattern: replace - with _
        db_name = service_name.replace("-", "_") if service_name else None

        if "postgres" in container_name:
            # Postgres: use psql with CSV-like output
            db_arg = ["-d", db_name] if db_name else []
            cmd = [DOCKER, "exec", container_name, "psql", "-U", "postgres"] + db_arg + ["-c", query, "-A", "-F", ","]
        elif "mysql" in container_name or "mariadb" in container_name:
            # MySQL: use mysql with tab-separated output
            db_arg = [db_name] if db_name else []
            cmd = [DOCKER, "exec", container_name, "mysql", "-u", "root", "-e", query, "-B"] + db_arg
        else:
            return {"error": "Unsupported database type"}

        out = sh(cmd)

        # Parse output into rows and columns
        lines = out.splitlines()
        if not lines:
            return {"columns": [], "rows": [], "message": "Query executed successfully (no results)"}

        if "postgres" in container_name:
            import csv
            from io import StringIO
            reader = csv.reader(StringIO(out))
            rows = list(reader)
            if not rows:
                return {"columns": [], "rows": []}
            # psql -A can include a footer like "(1 row)", filter that out
            if rows[-1] and rows[-1][0].startswith("(") and rows[-1][0].endswith(")"):
                rows = rows[:-1]
            columns = rows[0]
            data = rows[1:]
        else:
            # MySQL is tab-separated by default with -B
            rows = [line.split("\t") for line in lines]
            columns = rows[0]
            data = rows[1:]

        return {
            "columns": columns,
            "rows": data,
            "count": len(data)
        }
    except subprocess.CalledProcessError as e:
        return {"error": e.output or str(e)}
    except Exception as e:
        return {"error": str(e)}


def is_app_web_container(name: str) -> bool:
    # Dokku apps typically have containers like "<app>.web.1"
    return name.endswith(".web.1") and not name.startswith("dokku.")


def infer_app_name(container_name: str) -> str:
    # "<app>.web.1" -> "<app>"
    return container_name.rsplit(".web.1", 1)[0]


def infer_url(app_name: str) -> str:
    if app_name in APP_URL_OVERRIDES:
        return APP_URL_OVERRIDES[app_name]
    # default
    return f"https://{app_name}.{APP_DOMAIN}"


def classify_infra(container_name: str) -> bool:
    return (
        container_name.startswith("dokku.postgres.")
        or container_name.startswith("dokku.redis.")
        or container_name.startswith("dokku.mysql.")
        or container_name.startswith("dokku.mongodb.")
        or container_name == "dokku.minio.storage"
        or container_name == "logspout"
    )


def collect():
    ps_rows = docker_ps_all()
    stats = docker_stats()

    apps = []
    infra = []

    for r in ps_rows:
        name = r["name"]
        s = stats.get(name, {})
        row = {
            "container": name,
            "image": r["image"],
            "status": r["status"],
            "ports": r["ports"],
            "cpu": s.get("cpu", ""),
            "mem_used": s.get("mem_used", ""),
            "mem_limit": s.get("mem_limit", ""),
            "mem_pct": s.get("mem_pct", ""),
            "restarts": docker_inspect_restart_count(name),
        }
        if is_app_web_container(name):
            app_name = infer_app_name(name)
            row["app"] = app_name
            row["url"] = infer_url(app_name)
            apps.append(row)
        elif SHOW_INFRA and classify_infra(name):
            infra.append(row)

    # Sort stable
    apps.sort(key=lambda x: x["app"])
    infra.sort(key=lambda x: x["container"])

    # Simple top-line summary
    warnings = []
    for a in apps:
        # mem_pct is like "15.32%"
        try:
            pct = float(a["mem_pct"].replace("%", "")) if a["mem_pct"] else 0.0
        except Exception:
            pct = 0.0
        if pct >= 85:
            warnings.append(f"{a['app']} RAM high ({a['mem_pct']})")
        if a["restarts"] >= 3:
            warnings.append(f"{a['app']} restarting (restarts={a['restarts']})")

    sysinfo = system_summary()
    # format mem bytes nicely
    try:
        mem_total_h = format_bytes(int(sysinfo["mem_total"]))
    except Exception:
        mem_total_h = ""
    sysinfo["mem_total_h"] = mem_total_h
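
    # Gauge math used below, for reference:
    #   ram_pct  = sum(container mem_used) / host MemTotal * 100
    #   disk_pct = (images Size - images Reclaimable) / images Size * 100
    # Both values are clamped to 0-100 for display.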

    # --- Gauges (live-ish) ---
    total_cpu_pct = 0.0
    total_mem_used_bytes = 0
    for name, s in stats.items():
        total_cpu_pct += pct_str_to_float(s.get("cpu", "0%"))
        total_mem_used_bytes += parse_human_bytes(s.get("mem_used", "0B"))

    # Host total RAM (bytes) comes from docker info (already in sysinfo above)
    host_mem_total = int(sysinfo.get("mem_total") or 0)
    ram_pct = (total_mem_used_bytes / host_mem_total * 100.0) if host_mem_total else 0.0
    sysinfo["ram_total_h"] = format_bytes(host_mem_total)

    # Docker disk: images "Size" and "Reclaimable"
    df_images = sysinfo.get("system_df", {}).get("Images", {})
    images_size_bytes = parse_human_bytes(df_images.get("size", "0B"))
    # Reclaimable looks like "13.93GB (91%)" so grab the first token
    reclaimable_raw = (df_images.get("reclaimable") or "").split(" ", 1)[0]
    images_reclaimable_bytes = parse_human_bytes(reclaimable_raw) if reclaimable_raw else 0
    images_used_bytes = max(0, images_size_bytes - images_reclaimable_bytes)
    disk_pct = (images_used_bytes / images_size_bytes * 100.0) if images_size_bytes else 0.0

    gauges = {
        "cpu_total_pct": clamp(total_cpu_pct),  # sum of container CPU%, can exceed 100 if multi-core; we clamp for display
        "ram_used_bytes": total_mem_used_bytes,
        "ram_total_bytes": host_mem_total,
        "ram_used_h": format_bytes(total_mem_used_bytes),
        "ram_total_h": format_bytes(host_mem_total),
        "ram_pct": clamp(ram_pct),
        "docker_images_size_bytes": images_size_bytes,
        "docker_images_used_bytes": images_used_bytes,
        "docker_images_pct": clamp(disk_pct),
    }

    return {
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "poll_seconds": POLL_SECONDS,
        "domain": APP_DOMAIN,
        "system": sysinfo,
        "gauges": gauges,
        "apps": apps,
        "infra": infra,
        "warnings": warnings,
    }


def collect_admin_data():
    """
    Collects logs and detailed container info for the admin dashboard.
    Also identifies database containers for the SQL interface.
    """
    ps_rows = docker_ps_all()

    apps = []
    databases = []

    for r in ps_rows:
        name = r["name"]
        if is_app_web_container(name):
            app_name = infer_app_name(name)
            apps.append({
                "app": app_name,
                "container": name,
                "logs": get_container_logs(name, lines=50),
                "detail": get_container_detail(name)
            })
        elif classify_infra(name) and ("postgres" in name or "mysql" in name) and not name.endswith(".ambassador"):
            databases.append({
                "name": name,
                "type": "postgres" if "postgres" in name else "mysql"
            })

    # Sort
    apps.sort(key=lambda x: x["app"])
    databases.sort(key=lambda x: x["name"])

    return {
        "apps": apps,
        "databases": databases,
    }


def parse_human_bytes(s: str) -> int:
    # Handles "58.84MiB", "145.1MB", "423B"
    s = s.strip()
    m = re.match(r"^([0-9]*\.?[0-9]+)\s*([A-Za-z]+)$", s)
    if not m:
        return 0
    val = float(m.group(1))
    unit = m.group(2).lower()
    return int(val * _UNIT.get(unit, 0))


def pct_str_to_float(p: str) -> float:
    try:
        return float(p.strip().replace("%", ""))
    except Exception:
        return 0.0


def clamp(n: float, lo: float = 0.0, hi: float = 100.0) -> float:
    return max(lo, min(hi, n))


@lru_cache(maxsize=1024)
def get_country_from_ip(ip):
    """
    Fetch country code for an IP using a public API.
    Cached to minimize external requests.
    """
    try:
        # Using ip-api.com (free for non-commercial, no key for low volume)
        with urllib.request.urlopen(f"http://ip-api.com/json/{ip}?fields=status,countryCode", timeout=3) as response:
            data = json.loads(response.read().decode())
            if data.get("status") == "success":
                return data.get("countryCode")
    except Exception as e:
        print(f"GeoIP Error: {e}")
    return None


def get_client_ip():
    """
    Extract the real client IP, respecting Dokku/Nginx proxy headers.
    """
    if request.headers.get('X-Forwarded-For'):
        # Take the first IP in the list (the actual client)
        return request.headers.get('X-Forwarded-For').split(',')[0].strip()
    return request.remote_addr


def is_ip_allowed(ip):
    """
    Verifies if the IP is in the whitelist or allowed country.
    """
    # 1. Check Specific Whitelist (CIDR)
    if IP_WHITELIST:
        try:
            client_addr = ipaddress.ip_address(ip)
            for entry in IP_WHITELIST.split(','):
                entry = entry.strip()
                if not entry:
                    continue
                if client_addr in ipaddress.ip_network(entry, strict=False):
                    return True
        except ValueError:
            pass

    # 2. Check GeoIP (Default: Australia)
    if ALLOWED_COUNTRIES:
        country = get_country_from_ip(ip)
        if country in [c.strip() for c in ALLOWED_COUNTRIES.split(',')]:
            return True

    # Default to deny if no rules matched
    return False


@app.get("/")
def index():
    return render_template("index.html", poll_seconds=POLL_SECONDS)


@app.get("/partial/apps")
def partial_apps():
    data = collect()
    return render_template("apps_table.html", data=data)


@app.get("/api/status")
def api_status():
    return jsonify(collect())


# Authentication decorator
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 1. IP Check (GeoIP/Whitelist)
        client_ip = get_client_ip()
        if not is_ip_allowed(client_ip):
            abort(403, description=f"Access denied from {client_ip}. Restricted to {ALLOWED_COUNTRIES}.")
        # 2. Session Check (Password)
        if not session.get("logged_in"):
            return redirect(url_for("login"))
        return f(*args, **kwargs)
    return decorated_function


# Login routes
@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "POST":
        password = request.form.get("password", "")
        if password == LOGS_PASSWORD:
            session['logged_in'] = True
            return redirect(url_for('admin'))
        else:
            return render_template("login.html", error="Invalid password")
    return render_template("login.html", error=None)


@app.get("/logout")
def logout():
    session.pop('logged_in', None)
    return redirect(url_for('index'))


# Protected admin page (logs + container details)
@app.get("/admin")
@login_required
def admin():
    data = collect_admin_data()
    return render_template("admin.html", data=data, poll_seconds=POLL_SECONDS)


# API endpoint for container details (used by admin panel)
@app.get("/api/container/<container_name>")
@login_required
def api_container_detail(container_name):
    detail = get_container_detail(container_name)
    return jsonify(detail)


# API endpoint for SQL queries
@app.post("/api/sql/query")
@login_required
def api_sql_query():
    data = request.json
    container = data.get("container")
    query = data.get("query")

    if not container or not query:
        return jsonify({"error": "Missing container or query"}), 400

    result = run_sql_query(container, query)
    return jsonify(result)


# API endpoint for shell commands
@app.post("/api/terminal/exec")
@login_required
def api_terminal_exec():
    data = request.json
    command = data.get("command")

    if not command:
        return jsonify({"error": "No command provided"}), 400

    try:
        # HOST ESCAPE: execute command on the host by mounting host / and using chroot
        # We use a tiny alpine container to bridge to the host.
        # This requires the flask container to have docker socket access (which it does).
        # Equivalent shell invocation: docker run --rm -v /:/host alpine chroot /host sh -c "<command>"
        host_cmd = [
            DOCKER, "run", "--rm",
            "-v", "/:/host",
            "alpine", "chroot", "/host", "sh", "-c", command
        ]
        output = sh(host_cmd)
        return jsonify({"output": output, "status": "success"})
    except subprocess.CalledProcessError as e:
        # Cast output to string as it might be bytes
        out = e.output
        if hasattr(out, "decode"):
            out = out.decode("utf-8", errors="replace")
        return jsonify({"output": out or str(e), "error": True, "status": "error"})
    except Exception as e:
        return jsonify({"output": str(e), "error": True, "status": "error"})
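

# Local development entrypoint (a minimal sketch, not how this runs in production:
# under Dokku the app is normally served by a WSGI server such as gunicorn, and the
# PORT default below is an assumption).
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")), debug=False)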