Files
DokkuStatus/app.py
2025-12-24 10:50:37 +11:00

529 lines
18 KiB
Python

import os
import json
import subprocess
from datetime import datetime
from flask import Flask, jsonify, render_template, request, session, redirect, url_for
from functools import wraps
import re
# Flask application object; the route decorators further down register on it.
app = Flask(__name__)
# Key Flask uses to sign the `session` data that the login flow relies on.
# NOTE(review): the fallback is a fixed string visible in the source, so
# SECRET_KEY should always be set in the environment for a real deployment.
app.secret_key = os.getenv("SECRET_KEY", "change-this-in-production-please")
# Refresh interval passed to the templates and echoed in the status payload.
# int() raises ValueError at import time if POLL_SECONDS is not an integer.
POLL_SECONDS = int(os.getenv("POLL_SECONDS", "10"))
# Base domain used to build default app URLs (https://<app>.<APP_DOMAIN>).
APP_DOMAIN = os.getenv("APP_DOMAIN", "peterstockings.com")
# Path of the docker CLI binary used as argv[0] for every docker command.
DOCKER = os.getenv("DOCKER_BIN", "/usr/bin/docker")
# Infra containers are reported only when SHOW_INFRA is exactly "1".
SHOW_INFRA = os.getenv("SHOW_INFRA", "1") == "1"
# Password checked by the /login route.
# NOTE(review): the fallback is hard-coded in the source; set LOGS_PASSWORD
# in the environment before exposing the admin pages.
LOGS_PASSWORD = os.getenv("LOGS_PASSWORD", "dokkustatus123") # Change via environment variable
# Byte multipliers keyed by lower-cased unit suffix, as consumed by
# parse_human_bytes(). Both decimal (kB, MB, ...) and binary (KiB, MiB, ...)
# suffixes are covered.
_UNIT = {
    "b": 1,
    # Decimal (powers of 1000)
    "kb": 10**3,
    "mb": 10**6,
    "gb": 10**9,
    "tb": 10**12,
    # Binary (powers of 1024)
    "kib": 2**10,
    "mib": 2**20,
    "gib": 2**30,
    "tib": 2**40,
}
def parse_app_url_overrides(raw: str) -> dict:
    """Parse the APP_URL_OVERRIDES environment value into a dict.

    Args:
        raw: JSON text expected to encode an object mapping app name to URL.
            Empty or missing text is treated as "{}".

    Returns:
        The decoded mapping, or an empty dict when the text is not valid JSON
        or decodes to something other than a JSON object (list, string, ...).
    """
    try:
        parsed = json.loads(raw or "{}")
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        return {}
    # infer_url() does `name in overrides` followed by `overrides[name]`,
    # which only behaves sensibly for a mapping.
    return parsed if isinstance(parsed, dict) else {}


# Optional JSON map: {"gitea":"https://gitea.peterstockings.com", "bloodpressure":"https://bp.peterstockings.com"}
APP_URL_OVERRIDES = parse_app_url_overrides(os.getenv("APP_URL_OVERRIDES", "{}"))
def sh(cmd: list[str]) -> str:
    """Run *cmd* and return its output as stripped text.

    stderr is merged into stdout, so the returned string contains both.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        check=True,
    )
    return completed.stdout.strip()
def docker_ps_all() -> list[dict]:
    """List containers reported by `docker ps` as dicts.

    Note that, despite the name, no `-a` flag is passed, so the result is
    whatever a plain `docker ps` lists.

    Returns:
        One dict per container with the keys "name", "image", "status" and
        "ports" (all strings; "ports" is "" when nothing is published).
    """
    # Name + Image + Status + Ports
    fmt = "{{.Names}}\t{{.Image}}\t{{.Status}}\t{{.Ports}}"
    out = sh([DOCKER, "ps", "--format", fmt])
    rows = []
    for line in out.splitlines():
        if not line.strip():
            continue
        fields = line.split("\t")
        # sh() strips the whole output, so an empty trailing Ports column on
        # the last line loses its tab and yields only three fields. Pad
        # instead of letting the unpack raise ValueError.
        fields += [""] * (4 - len(fields))
        name, image, status, ports = fields[:4]
        rows.append({"name": name, "image": image, "status": status, "ports": ports})
    return rows
def docker_stats() -> dict:
    """Take one `docker stats` sample and index it by container name.

    Returns:
        Mapping of container name to a dict with the string fields "cpu",
        "mem_used", "mem_limit" and "mem_pct", exactly as docker printed them.
    """
    # Name + CPU + MemUsage + MemPerc
    fmt = "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"
    raw = sh([DOCKER, "stats", "--no-stream", "--format", fmt])
    per_container = {}
    for record in raw.splitlines():
        container, cpu_pct, usage, usage_pct = record.split("\t")
        # usage looks like "58.84MiB / 384MiB": used on the left, limit on the right
        used, limit = (piece.strip() for piece in usage.split("/", 1))
        per_container[container] = {
            "cpu": cpu_pct,
            "mem_used": used,
            "mem_limit": limit,
            "mem_pct": usage_pct,
        }
    return per_container
import re
def docker_info() -> dict:
    """Return the decoded JSON output of `docker info`."""
    # The "{{json .}}" template makes docker emit the whole info structure as JSON.
    command = [DOCKER, "info", "--format", "{{json .}}"]
    return json.loads(sh(command))
def docker_system_df() -> dict:
    """Parse the text table printed by `docker system df`.

    Returns:
        Mapping of the TYPE column (e.g. "Images", "Local Volumes") to a dict
        with the string fields "total", "active", "size" and "reclaimable".
        Lines with fewer than five columns are ignored.
    """
    # Sample output rows:
    # Images 175 18 15.15GB 13.93GB (91%)
    # Containers 27 26 145.1MB 16.57kB (0%)
    # Local Volumes 47 1 817.7MB 817.7MB (100%)
    # Build Cache 889 0 423B 423B
    field_names = ("total", "active", "size", "reclaimable")
    report = {}
    for raw_line in sh([DOCKER, "system", "df"]).splitlines():
        entry = raw_line.strip()
        if entry == "" or entry.startswith("TYPE"):
            # blank line or the header row
            continue
        # Columns are separated by runs of 2+ spaces; single spaces occur
        # inside values such as "Local Volumes" or "13.93GB (91%)".
        columns = re.split(r"\s{2,}", entry)
        if len(columns) < 5:
            continue
        report[columns[0]] = dict(zip(field_names, columns[1:5]))
    return report
def system_summary() -> dict:
    """Combine selected `docker info` fields with the `docker system df` table.

    Returns:
        Dict of host-level fields (each "" when docker info lacks the key)
        plus "system_df" holding the result of docker_system_df().
    """
    info = docker_info()
    df = docker_system_df()
    # (output key, docker info key) pairs, in output order
    wanted = (
        ("name", "Name"),
        ("server_version", "ServerVersion"),
        ("operating_system", "OperatingSystem"),
        ("os_type", "OSType"),
        ("architecture", "Architecture"),
        ("kernel_version", "KernelVersion"),
        ("cpus", "NCPU"),
        ("mem_total", "MemTotal"),  # bytes
        ("containers", "Containers"),
        ("containers_running", "ContainersRunning"),
        ("containers_stopped", "ContainersStopped"),
        ("images", "Images"),
        ("docker_root_dir", "DockerRootDir"),
    )
    summary = {out_key: info.get(info_key, "") for out_key, info_key in wanted}
    summary["system_df"] = df
    return summary
def format_bytes(n: int) -> str:
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 per step and labelled B/KB/MB/GB/TB, with one
    decimal place (e.g. 1536 -> "1.5KB"). Values beyond the TB range stay in TB.

    Args:
        n: Number of bytes. Negative values are scaled by magnitude.

    Returns:
        The formatted string, e.g. "0.0B", "1.0GB".
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = float(n)
    for unit in units[:-1]:
        # Compare on magnitude so negative inputs are scaled too.
        if abs(value) < 1024:
            return f"{value:.1f}{unit}"
        value /= 1024
    # Largest unit: no further scaling.
    return f"{value:.1f}{units[-1]}"
def docker_inspect_restart_count(container_name: str) -> int:
    """Return the container's RestartCount from `docker inspect`.

    Best-effort: any failure (command error, unparsable output) yields 0.
    """
    # A climbing RestartCount is a hint that the container is flapping / OOMing.
    try:
        raw = sh([DOCKER, "inspect", "-f", "{{.RestartCount}}", container_name])
        count = int(raw.strip())
    except Exception:
        count = 0
    return count
def get_container_logs(container_name: str, lines: int = 50) -> list[dict]:
    """
    Fetch the tail of a container's logs and tag each line with a severity.

    Each returned dict has 'text' (the line with ANSI colour codes removed)
    and 'level' ('error', 'warn' or 'info', chosen by keyword match).
    Returns an empty list if anything goes wrong.
    """
    error_markers = ['error', 'exception', 'fatal', 'critical']
    warn_markers = ['warn', 'warning']

    def level_of(text: str) -> str:
        # Case-insensitive substring match; error keywords win over warnings.
        folded = text.lower()
        if any(marker in folded for marker in error_markers):
            return 'error'
        if any(marker in folded for marker in warn_markers):
            return 'warn'
        return 'info'

    try:
        output = sh([DOCKER, "logs", "--tail", str(lines), container_name])
        entries = []
        for raw in output.splitlines():
            # Drop ANSI colour sequences before classifying / returning
            text = re.sub(r'\x1b\[[0-9;]*m', '', raw)
            entries.append({
                'text': text,
                'level': level_of(text)
            })
        return entries
    except Exception:
        return []
def get_container_detail(container_name: str) -> dict:
    """
    Get detailed container information using docker inspect.

    Args:
        container_name: Name (or id) handed to `docker inspect`.

    Returns:
        A dict of selected metadata (name, short id, image, state, env, cmd,
        ports, networks, mounts, restart policy); {} when inspect returns an
        empty list; {"error": <message>} when anything fails.

    Note: the result includes the container's environment variables, which
    may hold secrets, so callers should only expose it to trusted users.
    """
    try:
        out = sh([DOCKER, "inspect", container_name])
        inspect_data = json.loads(out)
        if not inspect_data:
            return {}
        container = inspect_data[0]
        # Use `or {}` / `or []` rather than a .get() default: a key that is
        # present with a JSON null value would bypass the default and break
        # the attribute access / iteration below.
        config = container.get("Config") or {}
        state = container.get("State") or {}
        network_settings = container.get("NetworkSettings") or {}
        mounts = container.get("Mounts") or []
        host_config = container.get("HostConfig") or {}
        return {
            "name": (container.get("Name") or "").lstrip("/"),
            "id": (container.get("Id") or "")[:12],
            "image": config.get("Image", ""),
            "created": container.get("Created", ""),
            "state": {
                "status": state.get("Status", ""),
                "running": state.get("Running", False),
                "paused": state.get("Paused", False),
                "restarting": state.get("Restarting", False),
                "started_at": state.get("StartedAt", ""),
                "finished_at": state.get("FinishedAt", ""),
            },
            "env": config.get("Env", []),
            "cmd": config.get("Cmd", []),
            "entrypoint": config.get("Entrypoint", []),
            "working_dir": config.get("WorkingDir", ""),
            "exposed_ports": list(config.get("ExposedPorts") or {}),
            "ports": network_settings.get("Ports", {}),
            "networks": list(network_settings.get("Networks") or {}),
            "ip_address": network_settings.get("IPAddress", ""),
            "mounts": [
                {
                    "type": m.get("Type", ""),
                    "source": m.get("Source", ""),
                    "destination": m.get("Destination", ""),
                    "mode": m.get("Mode", ""),
                    "rw": m.get("RW", False),
                }
                for m in mounts
            ],
            "restart_policy": host_config.get("RestartPolicy", {}),
        }
    except Exception as e:
        # Best-effort: report the failure to the caller instead of raising.
        return {"error": str(e)}
def run_sql_query(container_name: str, query: str) -> dict:
    """
    Execute a SQL query inside a database container using docker exec.

    Supports Postgres (psql as user "postgres") and MySQL/MariaDB (mysql as
    user "root"); the type is chosen by substring match on the container name.
    The database name is inferred from a Dokku-style name:
    "dokku.postgres.my-service" -> "my_service".

    Args:
        container_name: Target container. Must look like a Docker container
            name (alphanumeric first character, then letters, digits, "_",
            "." or "-"); anything else is rejected before docker is invoked.
        query: SQL text, passed verbatim to the client's -c / -e option.

    Returns:
        {"columns": [...], "rows": [...], "count": n} on success,
        {"columns": [], "rows": [], "message": ...} when there is no output,
        or {"error": <message>} on invalid input or failure.
    """
    # The name ends up as an argv element of `docker exec`; refusing anything
    # that starts with "-" or contains unexpected characters prevents it from
    # being interpreted as a docker option.
    if not isinstance(container_name, str) or not re.fullmatch(
        r"[A-Za-z0-9][A-Za-z0-9_.-]*", container_name
    ):
        return {"error": "Invalid container name"}

    is_postgres = "postgres" in container_name
    is_mysql = "mysql" in container_name or "mariadb" in container_name
    if not (is_postgres or is_mysql):
        return {"error": "Unsupported database type"}

    try:
        # Infer DB name: dokku.postgres.my-service -> my_service
        # (hyphens become underscores in the database name).
        parts = container_name.split(".", 2)
        service_name = parts[2] if len(parts) >= 3 else None
        db_name = service_name.replace("-", "_") if service_name else None

        if is_postgres:
            # Unaligned output with "," as field separator.
            # NOTE(review): this is not quoted CSV, so values containing
            # commas split into extra columns; psql's --csv mode would avoid
            # that — confirm the minimum psql version before switching.
            db_arg = ["-d", db_name] if db_name else []
            cmd = [DOCKER, "exec", container_name, "psql", "-U", "postgres"] + db_arg + ["-c", query, "-A", "-F", ","]
        else:
            # Batch mode (-B) prints tab-separated rows.
            db_arg = [db_name] if db_name else []
            cmd = [DOCKER, "exec", container_name, "mysql", "-u", "root", "-e", query, "-B"] + db_arg

        out = sh(cmd)
        lines = out.splitlines()
        if not lines:
            return {"columns": [], "rows": [], "message": "Query executed successfully (no results)"}

        if is_postgres:
            import csv
            from io import StringIO
            rows = list(csv.reader(StringIO(out)))
            # psql -A can include a footer like "(1 row)", filter that out
            if rows and rows[-1] and rows[-1][0].startswith("(") and rows[-1][0].endswith(")"):
                rows = rows[:-1]
        else:
            rows = [line.split("\t") for line in lines]

        if not rows:
            # Nothing left once the footer is removed; avoid indexing rows[0].
            return {"columns": [], "rows": []}

        columns, data = rows[0], rows[1:]
        return {
            "columns": columns,
            "rows": data,
            "count": len(data)
        }
    except subprocess.CalledProcessError as e:
        # sh() merges stderr into the output, so this carries the client's
        # own error message.
        return {"error": e.output or str(e)}
    except Exception as e:
        return {"error": str(e)}
def is_app_web_container(name: str) -> bool:
    """Return True for "<app>.web.1" containers that are not "dokku."-prefixed."""
    if name.startswith("dokku."):
        return False
    return name.endswith(".web.1")
def infer_app_name(container_name: str) -> str:
    """Return the text before the last ".web.1" in *container_name*.

    "<app>.web.1" -> "<app>"; a name without ".web.1" is returned unchanged.
    """
    head, separator, _tail = container_name.rpartition(".web.1")
    if not separator:
        return container_name
    return head
def infer_url(app_name: str) -> str:
    """Return the URL for *app_name*.

    An entry in APP_URL_OVERRIDES wins; otherwise the URL defaults to
    https://<app_name>.<APP_DOMAIN>.
    """
    if app_name not in APP_URL_OVERRIDES:
        # default
        return f"https://{app_name}.{APP_DOMAIN}"
    return APP_URL_OVERRIDES[app_name]
def classify_infra(container_name: str) -> bool:
    """Return True when the container is one of the known infrastructure services."""
    # Service families, matched by name prefix
    service_prefixes = (
        "dokku.postgres.",
        "dokku.redis.",
        "dokku.mysql.",
        "dokku.mongodb.",
    )
    # Individual containers, matched by exact name
    exact_names = ("dokku.minio.storage", "logspout")
    return container_name.startswith(service_prefixes) or container_name in exact_names
def collect():
    """Gather the full status payload for the dashboard.

    Combines `docker ps`, `docker stats`, per-container restart counts and
    the host summary into one dict.

    Returns:
        Dict with the keys "generated_at" (UTC ISO timestamp with "Z"),
        "poll_seconds", "domain", "system", "gauges", "apps", "infra" and
        "warnings".
    """
    from datetime import timezone  # local import: only needed for the timestamp

    ps_rows = docker_ps_all()
    stats = docker_stats()
    apps = []
    infra = []
    for r in ps_rows:
        name = r["name"]
        s = stats.get(name, {})
        row = {
            "container": name,
            "image": r["image"],
            "status": r["status"],
            "ports": r["ports"],
            "cpu": s.get("cpu", ""),
            "mem_used": s.get("mem_used", ""),
            "mem_limit": s.get("mem_limit", ""),
            "mem_pct": s.get("mem_pct", ""),
            # One `docker inspect` call per container.
            "restarts": docker_inspect_restart_count(name),
        }
        if is_app_web_container(name):
            app_name = infer_app_name(name)
            row["app"] = app_name
            row["url"] = infer_url(app_name)
            apps.append(row)
        elif SHOW_INFRA and classify_infra(name):
            infra.append(row)

    # Sort stable
    apps.sort(key=lambda x: x["app"])
    infra.sort(key=lambda x: x["container"])

    # Simple top-line summary
    warnings = []
    for a in apps:
        # mem_pct is like "15.32%"; empty or unparsable values count as 0.
        pct = pct_str_to_float(a["mem_pct"])
        if pct >= 85:
            warnings.append(f"{a['app']} RAM high ({a['mem_pct']})")
        if a["restarts"] >= 3:
            warnings.append(f"{a['app']} restarting (restarts={a['restarts']})")

    # Fetch the host summary exactly once: a second call would replace the
    # dict and silently drop the derived "mem_total_h" field (and would run
    # the docker commands twice).
    sysinfo = system_summary()

    # Host total RAM (bytes) comes from docker info
    try:
        host_mem_total = int(sysinfo["mem_total"])
        mem_total_h = format_bytes(host_mem_total)
    except (KeyError, TypeError, ValueError):
        host_mem_total = 0
        mem_total_h = ""
    sysinfo["mem_total_h"] = mem_total_h
    sysinfo["ram_total_h"] = format_bytes(host_mem_total)

    # --- Gauges (live-ish) ---
    total_cpu_pct = 0.0
    total_mem_used_bytes = 0
    for s in stats.values():
        total_cpu_pct += pct_str_to_float(s.get("cpu", "0%"))
        total_mem_used_bytes += parse_human_bytes(s.get("mem_used", "0B"))
    ram_pct = (total_mem_used_bytes / host_mem_total * 100.0) if host_mem_total else 0.0

    # Docker disk: images "Size" and "Reclaimable"
    df_images = sysinfo.get("system_df", {}).get("Images", {})
    images_size_bytes = parse_human_bytes(df_images.get("size", "0B"))
    # Reclaimable looks like "13.93GB (91%)" so grab the first token
    reclaimable_raw = (df_images.get("reclaimable") or "").split(" ", 1)[0]
    images_reclaimable_bytes = parse_human_bytes(reclaimable_raw) if reclaimable_raw else 0
    images_used_bytes = max(0, images_size_bytes - images_reclaimable_bytes)
    disk_pct = (images_used_bytes / images_size_bytes * 100.0) if images_size_bytes else 0.0

    gauges = {
        "cpu_total_pct": clamp(total_cpu_pct), # sum of container CPU%, can exceed 100 if multi-core; we clamp for display
        "ram_used_bytes": total_mem_used_bytes,
        "ram_total_bytes": host_mem_total,
        "ram_used_h": format_bytes(total_mem_used_bytes),
        "ram_total_h": format_bytes(host_mem_total),
        "ram_pct": clamp(ram_pct),
        "docker_images_size_bytes": images_size_bytes,
        "docker_images_used_bytes": images_used_bytes,
        "docker_images_pct": clamp(disk_pct),
    }

    # Same "<naive ISO>Z" text the deprecated datetime.utcnow() produced.
    generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return {
        "generated_at": generated_at,
        "poll_seconds": POLL_SECONDS,
        "domain": APP_DOMAIN,
        "system": sysinfo,
        "gauges": gauges,
        "apps": apps,
        "infra": infra,
        "warnings": warnings,
    }
def collect_admin_data():
    """
    Build the data for the admin dashboard.

    Returns a dict with "apps" (one entry per app web container, carrying its
    last 50 log lines and inspect detail) and "databases" (Postgres/MySQL
    infra containers offered to the SQL interface), both sorted by name.
    """
    app_entries = []
    db_entries = []
    for listed in docker_ps_all():
        container = listed["name"]
        if is_app_web_container(container):
            app_entries.append({
                "app": infer_app_name(container),
                "container": container,
                "logs": get_container_logs(container, lines=50),
                "detail": get_container_detail(container)
            })
            continue
        looks_like_db = "postgres" in container or "mysql" in container
        # Containers whose name ends in ".ambassador" are left out of the list.
        if classify_infra(container) and looks_like_db and not container.endswith(".ambassador"):
            db_entries.append({
                "name": container,
                "type": "postgres" if "postgres" in container else "mysql"
            })
    # Sort
    app_entries.sort(key=lambda entry: entry["app"])
    db_entries.sort(key=lambda entry: entry["name"])
    return {
        "apps": app_entries,
        "databases": db_entries,
    }
def parse_human_bytes(s: str) -> int:
    """Convert a size such as "58.84MiB", "145.1MB" or "423B" to bytes.

    Returns 0 when the text is not "<number><unit>" or the unit is not in _UNIT.
    """
    match = re.match(r"^([0-9]*\.?[0-9]+)\s*([A-Za-z]+)$", s.strip())
    if match is None:
        return 0
    number, suffix = match.groups()
    # Unit lookup is case-insensitive; unknown units multiply by 0.
    multiplier = _UNIT.get(suffix.lower(), 0)
    return int(float(number) * multiplier)
def pct_str_to_float(p: str) -> float:
    """Convert a percentage string like "15.32%" to a float; 0.0 on any failure."""
    try:
        cleaned = p.strip().replace("%", "")
        value = float(cleaned)
    except Exception:
        value = 0.0
    return value
def clamp(n: float, lo: float = 0.0, hi: float = 100.0) -> float:
    """Limit *n* to the inclusive range [lo, hi]."""
    capped = min(hi, n)
    return max(lo, capped)
@app.get("/")
def index():
    """Render the dashboard page, passing the poll interval to the template."""
    context = {"poll_seconds": POLL_SECONDS}
    return render_template("index.html", **context)
@app.get("/partial/apps")
def partial_apps():
    """Render the apps table fragment from a fresh collect() snapshot."""
    return render_template("apps_table.html", data=collect())
@app.get("/api/status")
def api_status():
    """Return a fresh collect() snapshot as JSON."""
    snapshot = collect()
    return jsonify(snapshot)
# Authentication decorator
def login_required(f):
    """Wrap a view so that visitors without a logged-in session are redirected to the login page."""
    @wraps(f)
    def guarded(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('login'))
    return guarded
# Login routes
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the submitted password (POST).

    On a correct password the session is marked as logged in and the user is
    redirected to the admin page; otherwise the form is re-rendered with an
    error message.
    """
    import hmac  # local import: only this view needs it

    if request.method != "POST":
        return render_template("login.html", error=None)

    password = request.form.get("password", "")
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Compare bytes because compare_digest() rejects
    # non-ASCII str arguments.
    if hmac.compare_digest(password.encode("utf-8"), LOGS_PASSWORD.encode("utf-8")):
        session['logged_in'] = True
        return redirect(url_for('admin'))
    return render_template("login.html", error="Invalid password")
@app.get("/logout")
def logout():
    """Clear the logged-in flag from the session and return to the dashboard."""
    session.pop('logged_in', None)
    home = url_for('index')
    return redirect(home)
# Protected admin page (logs + container details)
@app.get("/admin")
@login_required
def admin():
    """Render the admin page from collect_admin_data(); requires a logged-in session."""
    return render_template(
        "admin.html",
        data=collect_admin_data(),
        poll_seconds=POLL_SECONDS,
    )
# API endpoint for container details (used by admin panel)
@app.get("/api/container/<container_name>")
@login_required
def api_container_detail(container_name):
    """Return get_container_detail() for the named container as JSON."""
    return jsonify(get_container_detail(container_name))
# API endpoint for SQL queries
@app.post("/api/sql/query")
@login_required
def api_sql_query():
    """Run a SQL query in a database container and return the result as JSON.

    Expects a JSON object body with string fields "container" and "query".
    Responds with HTTP 400 and an "error" field when the body is missing,
    not a JSON object, or either field is absent, empty or not a string;
    otherwise returns whatever run_sql_query() produces.
    """
    # silent=True yields None for a missing, malformed or non-JSON body
    # instead of aborting, so every bad request gets the same JSON error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Missing container or query"}), 400
    container = data.get("container")
    query = data.get("query")
    if not container or not query:
        return jsonify({"error": "Missing container or query"}), 400
    if not isinstance(container, str) or not isinstance(query, str):
        return jsonify({"error": "container and query must be strings"}), 400
    result = run_sql_query(container, query)
    return jsonify(result)