3301dbfe34
Host Overview (plugins.html): show Update and Delete buttons in the host-right zone when the logged-in user is the host owner (or admin / unauthenticated mode). Buttons link to /u?h=<host> and /d?h=<host> with stopPropagation so they don't toggle the accordion; Delete prompts for confirmation first. ThresholdChecker.purge_stale_alerts(): removes alert states whose metric_path has no matching threshold in the current config. Called after startup pickle restore and after every SIGHUP config reload so alerts orphaned by upgrades or config changes do not persist indefinitely. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
952 lines
36 KiB
Python
952 lines
36 KiB
Python
"""HTTP server implementation using aiohttp and jinja2."""
|
|
|
|
import asyncio
|
|
import datetime
|
|
import json
|
|
import platform
|
|
import socket
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
import os
|
|
import logging
|
|
from aiohttp import web
|
|
import jinja2
|
|
from . import data
|
|
from . import notify as notify_mod
|
|
from . import settings as settings_mod
|
|
from . import users as users_mod
|
|
from . import ws as ws_mod
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
eventlog = notify_mod.eventlog
|
|
|
|
def _render_template(html_str: str, **context) -> str:
    """Render *html_str* as a Jinja2 template.

    A new ``jinja2.Template`` is compiled from the string on every call and
    rendered with the keyword arguments in *context* as template variables.
    """
    return jinja2.Template(html_str).render(**context)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SESSION_COOKIE = "hbd_session"
|
|
|
|
|
|
def _get_token(request) -> str:
    """Return the session token supplied with *request*, or "" if none.

    Sources are consulted in priority order:

    1. ``Authorization: Bearer <token>`` (scheme matched case-insensitively),
    2. the ``X-Auth-Token`` header,
    3. the cookie named by ``SESSION_COOKIE``.
    """
    headers = request.headers
    authorization = headers.get("Authorization", "")
    if authorization.lower().startswith("bearer "):
        # A Bearer header wins even when the token after the scheme is empty.
        return authorization[len("bearer "):].strip()
    explicit_token = headers.get("X-Auth-Token", "").strip()
    return explicit_token if explicit_token else request.cookies.get(SESSION_COOKIE, "")
|
|
|
|
|
|
def _current_user(request):
    """Look up the user behind *request*'s session token.

    Returns ``None`` when user authentication is disabled (every caller is
    then allowed); otherwise returns whatever
    ``users_mod.get_session_user`` yields for the request's token.
    """
    if users_mod.users_enabled():
        return users_mod.get_session_user(_get_token(request))
    # Unauthenticated mode — all access allowed.
    return None
|
|
|
|
|
|
def _require_auth(request):
    """Authenticate an API request.

    Returns a ``(user, error)`` pair:

    * ``(None, None)`` when user authentication is disabled,
    * ``(user, None)`` when the request carries a valid session token,
    * ``(None, response)`` with a 401 JSON response otherwise.
    """
    if not users_mod.users_enabled():
        return None, None
    session_user = users_mod.get_session_user(_get_token(request))
    if session_user is not None:
        return session_user, None
    return None, web.json_response({"error": "Unauthorized"}, status=401)
|
|
|
|
|
|
def _require_auth_redirect(request):
    """Authenticate a browser request, redirecting to the login page.

    Behaves like ``_require_auth`` except that a missing or invalid session
    does not produce an error value: ``web.HTTPFound("/login")`` is raised
    instead.  On success the result is ``(user, None)``; when user
    authentication is disabled it is ``(None, None)``.
    """
    if not users_mod.users_enabled():
        return None, None
    session_user = users_mod.get_session_user(_get_token(request))
    if session_user is not None:
        return session_user, None
    raise web.HTTPFound("/login")
|
|
|
|
|
|
def _can_view_host(user, host) -> bool:
    """Decide whether *user* may see *host*.

    ``None`` (authentication disabled) and admins always may; anyone else
    needs ``host.is_monitor(username)`` to hold.
    """
    if user is None or user.admin:
        return True
    return host.is_monitor(user.username)
|
|
|
|
|
|
def _can_operate_host(user, host) -> bool:
    """Decide whether *user* has manager-level rights on *host*.

    Manager level covers queueing commands, DNS registration and upgrades.
    ``None`` (authentication disabled) and admins always qualify; anyone
    else needs ``host.is_manager(username)`` to hold.
    """
    if user is None or user.admin:
        return True
    return host.is_manager(user.username)
|
|
|
|
|
|
def _can_own_host(user, host) -> bool:
    """Decide whether *user* has owner-level rights on *host*.

    Owner level covers dropping the host and transferring ownership.
    ``None`` (authentication disabled) and admins always qualify; anyone
    else needs ``host.is_owner(username)`` to hold.
    """
    if user is None or user.admin:
        return True
    return host.is_owner(user.username)
|
|
|
|
|
|
async def start(
    host: str,
    port: int,
    config,
    hbdclass,
    tcss=None,
    verbose=False,
    get_now=None,
    VER="",
    threshold_checker=None,
):
    """Start an aiohttp web server and block until cancelled.

    This function is intended to be awaited inside the main asyncio event loop.

    All request handlers are defined as closures inside this function so
    they can share the arguments below.

    Args:
        host: Interface address the TCP site binds to.
        port: TCP port the site listens on.
        config: Configuration mapping; handlers read it with ``.get()``
            (for example ``tz``, ``templates_dir``, ``http_extra_scripts``,
            ``hosts`` and ``notification_channels``).
        hbdclass: Object exposing ``Host.hosts`` (the live host registry)
            and ``ubHost`` (table builders used by the legacy page).
        tcss: Optional markup appended inside ``<head>`` of the legacy page.
        verbose: Not referenced anywhere in this function.
        get_now: Optional callable returning the current epoch time;
            defaults to ``time.time``.
        VER: Version text shown in the legacy page heading.
        threshold_checker: Optional object providing ``get_alert_summary``
            and ``get_active_alerts`` for the alert endpoints.
    """
    # Fall back to wall-clock time when no clock override is supplied.
    get_now = get_now or (lambda: time.time())
    # Recorded once at startup; the About page derives uptime from it.
    _start_epoch = time.time()
|
|
|
|
async def old_index(request):
    """Serve the legacy plain-HTML status page (route ``/old``).

    The page consists of the host table, the message table and a footer
    with the current time and the configured ``tz`` label.
    """
    # Raises a redirect to /login when auth is enabled and the session is invalid.
    _require_auth_redirect(request)
    page = [
        '<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">',
        "<html>",
        "<head>",
        "<title>Heartbeat</title>",
    ]
    if tcss:
        page.append(tcss)
    page.extend(
        [
            "</head>",
            '<body BGCOLOR = "#FFFFFF" LINK = "#008000" VLINK = "#008000">',
            f"<H2>Heartbeat status {VER}</h2>",
        ]
    )
    page.extend(hbdclass.ubHost.buildhosttable())
    page.extend(hbdclass.ubHost.buildmsgtable(data.msgs))
    clock = time.strftime("%H:%M:%S", time.localtime(get_now()))
    tz_label = config.get("tz", "CET-1CDT")
    page.append("<p> %s (%s)</p>" % (clock, tz_label))
    page.append("</body></html>")
    return web.Response(text="\n".join(page), content_type="text/html")
|
|
|
|
async def api_hosts(request):
    """GET /api/0/hosts — JSON array describing every host the caller may view.

    Each visible host contributes the JSON document returned by its
    ``jsons()`` method.  The documents are joined into one array and parsed
    so that ``web.json_response`` can serialize the result.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    documents = [
        host.jsons()
        for host in hbdclass.Host.hosts.values()
        if _can_view_host(user, host)
    ]
    return web.json_response(json.loads("[" + ",".join(documents) + "]"))
|
|
|
|
async def api_alert_summary(request):
    """GET /api/0/alert_summary — counts of ok/warning/critical hosts.

    A host counts as critical if any of its alert states is CRITICAL, else
    as warning if any is WARNING, else as ok.  Only hosts the caller may
    operate are counted.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    from .threshold import AlertLevel

    tally = {"critical": 0, "warning": 0, "ok": 0}
    for host in hbdclass.Host.hosts.values():
        # NOTE(review): this filters at manager level although the endpoint
        # is described as covering hosts "visible" to the caller — confirm
        # whether _can_view_host was intended.
        if not _can_operate_host(user, host):
            continue
        levels = {state.level for state in host.alert_states.values()}
        if AlertLevel.CRITICAL in levels:
            tally["critical"] += 1
        elif AlertLevel.WARNING in levels:
            tally["warning"] += 1
        else:
            tally["ok"] += 1
    return web.json_response(tally)
|
|
|
|
async def api_messages(request):
    """GET /api/0/messages — return the 30 most recent messages as JSON.

    Requires an authenticated session when user authentication is enabled,
    like the other ``/api/0`` data endpoints; previously this handler
    answered without any authentication check.
    """
    _, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    return web.json_response(data.msgs[-30:])
|
|
|
|
async def cmd(request):
    """GET /c?h=<host>&c=<command> — queue a command for a host.

    Requires manager-level access to the host.  Responds 400 when either
    argument is missing or the host is unknown, and 403 when the caller may
    not operate the host.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    query = request.rel_url.query
    uname = query.get("h")
    ucmd = query.get("c")
    if not (ucmd and uname):
        return web.Response(status=400, text="need h= and c= arguments")
    hosts = hbdclass.Host.hosts
    if uname not in hosts:
        return web.Response(status=400, text=f"h={uname} not found")
    host = hosts[uname]
    if not _can_operate_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)
    # NOTE(review): the query value is percent-decoded once more here;
    # presumably clients double-encode the command — confirm.
    host.cmds.append(("CMD", {"cmd": urllib.parse.unquote(ucmd)}))
    return web.Response(text=f"cmd {uname} queued")
|
|
|
|
async def drop(request):
    """GET /d?h=<host> — remove a host from the live registry.

    Requires owner-level access.  Responds 400 when the argument is missing
    or the host is unknown, and 403 when the caller does not own the host.
    The removal is recorded in the event log before the host is deleted.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    uname = request.rel_url.query.get("h")
    if not uname:
        return web.Response(status=400, text="need h= argument")
    hosts = hbdclass.Host.hosts
    if uname not in hosts:
        return web.Response(status=400, text=f"h={uname} not found")
    if not _can_own_host(user, hosts[uname]):
        return web.json_response({"error": "Forbidden"}, status=403)
    eventlog(uname, "INFO", "dropped")
    del hosts[uname]
    return web.Response(text="Done")
|
|
|
|
async def register(request):
    """GET /n?h=<host> — run DNS registration for a host.

    Requires manager-level access.  The value returned by
    ``host.registerDns()`` is written to the event log and returned to the
    caller as text.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    uname = request.rel_url.query.get("h")
    if not uname:
        return web.Response(status=400, text="need h= argument")
    hosts = hbdclass.Host.hosts
    if uname not in hosts:
        return web.Response(status=400, text=f"h={uname} not found")
    host = hosts[uname]
    if not _can_operate_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)
    result = host.registerDns()
    eventlog(uname, "INFO", result)
    return web.Response(text=str(result))
|
|
|
|
async def update(request):
    """GET /u?h=<host|All> — queue an update command for one or all hosts.

    ``h=All`` targets every registered host.  Hosts the caller may not
    operate are skipped and reported as such.  The response is a text
    report with one line per targeted host.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    uname = urllib.parse.unquote(request.rel_url.query.get("h", ""))
    if not uname:
        return web.Response(status=400, text="need h= argument")
    hosts = hbdclass.Host.hosts
    update_all = uname == "All"
    if not update_all and uname not in hosts:
        return web.Response(status=400, text=f"h={uname} not found")
    targets = list(hosts) if update_all else [uname]
    report = []
    for name in targets:
        host = hosts[name]
        if not _can_operate_host(user, host):
            report.append(f"update skipped for {name}: Forbidden")
            continue
        try:
            host.cmds.append(("UPD", {}))
        except Exception as exc:
            # An empty error text is reported as OK, as before.
            outcome = str(exc) or "OK"
        else:
            outcome = "OK"
        report.append(f"update started for {name}: {outcome}")
    return web.Response(text="\n".join(report))
|
|
|
|
async def live(request):
    """Render the live dashboard (``live.html``), served at ``/`` and ``/live``.

    The template receives the WebSocket URL to connect to, state info for
    every host the caller may operate, the 30 most recent messages and the
    current user's profile (``None`` when auth is disabled).
    """
    current_user, _ = _require_auth_redirect(request)
    # Templates live next to this module unless "templates_dir" overrides it.
    pkg_dir = os.path.dirname(__file__)
    templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
    env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
    extra_scripts = config.get("http_extra_scripts", "")

    # Build the WebSocket URL from the address the browser actually used.
    # (The former config "hb_host" lookup was overwritten before any use
    # and has been removed.)
    host = request.host  # includes port if non-standard
    forwarded_proto = request.headers.get("X-Forwarded-Proto", "")
    is_secure = request.secure or forwarded_proto.lower() == "https"
    scheme = "wss" if is_secure else "ws"
    heartbeat_ws_url = f"{scheme}://{host}/ws"

    from hbd import __version__ as hbd_version

    tmpl = env.get_template("live.html")
    body = tmpl.render(
        title="Heartbeat",
        header="Heartbeat",
        request=request,
        heartbeat_ws_url=heartbeat_ws_url,
        extra_scripts=extra_scripts,
        hbd_version=hbd_version,
        hosts=[
            hbdclass.Host.hosts[h].stateinfo()
            for h in sorted(hbdclass.Host.hosts)
            if _can_operate_host(current_user, hbdclass.Host.hosts[h])
        ],
        messages=data.msgs[-30:],
        current_user=current_user.to_dict() if current_user else None,
        active_page="live",
    )
    return web.Response(text=body, content_type="text/html")
|
|
|
|
async def static(request):
    """Serve a file from the package's ``static`` directory.

    URL form: /static/<path>

    Responds 403 when the resolved path lies outside the static directory
    and 404 when it does not name an existing regular file.
    """
    rel_path = request.match_info.get("path", "")
    logger.debug("static file requested: %s", rel_path)
    static_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "static"))
    # Resolve ".." segments before checking containment to block traversal.
    resolved = os.path.abspath(os.path.normpath(os.path.join(static_root, rel_path)))
    inside_root = resolved == static_root or resolved.startswith(static_root + os.sep)
    if not inside_root:
        return web.Response(status=403, text="Forbidden")
    # isfile() is already False for paths that do not exist.
    if not os.path.isfile(resolved):
        return web.Response(status=404, text="Not Found")
    logger.info("serving static file: %s", resolved)
    return web.FileResponse(path=resolved)
|
|
|
|
async def favicon(request):
    """Serve ``favicon.ico`` from the package's ``static/images`` directory.

    Responds 404 when the icon file is missing.
    """
    images_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "static/images"))
    icon_path = os.path.join(images_dir, "favicon.ico")
    # isfile() is already False for paths that do not exist.
    if os.path.isfile(icon_path):
        return web.FileResponse(path=icon_path)
    return web.Response(status=404, text="Not Found")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Plugin Data API Endpoints
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def api_host_plugins(request):
    """GET /api/0/hosts/{hostname}/plugins — latest sample of every plugin.

    Responds 404 for an unknown host and 403 when the caller may not view
    it.  Plugins without any samples are omitted from the result.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    hostname = request.match_info.get("hostname")

    if hostname not in hbdclass.Host.hosts:
        return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)

    host = hbdclass.Host.hosts[hostname]
    if not _can_view_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)

    plugins_summary = {}
    for plugin_name, samples in host.plugin_data.items():
        if not samples:
            continue
        # The last entry is the most recent sample.  The payload is named
        # "sample" so it does not shadow the imported ``data`` module.
        timestamp, sample = samples[-1]
        plugins_summary[plugin_name] = {
            "timestamp": timestamp,
            "data": sample,
            "sample_count": len(samples),
        }

    return web.json_response({
        "hostname": hostname,
        "plugins": plugins_summary,
    })
|
|
|
|
async def api_host_plugin_detail(request):
    """GET /api/0/hosts/{hostname}/plugins/{plugin_name} — recent samples.

    The optional ``limit`` query parameter (default 10; non-integer values
    fall back to 10) is passed to ``host.get_plugin_data``.  Responds 404
    for an unknown host or when no samples are available, and 403 when the
    caller may not view the host.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    hostname = request.match_info.get("hostname")
    plugin_name = request.match_info.get("plugin_name")

    if hostname not in hbdclass.Host.hosts:
        return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)

    host = hbdclass.Host.hosts[hostname]
    if not _can_view_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)

    try:
        limit = int(request.rel_url.query.get("limit", "10"))
    except ValueError:
        limit = 10

    samples = host.get_plugin_data(plugin_name, limit=limit)
    if not samples:
        return web.json_response(
            {"error": f"No data for plugin '{plugin_name}' on host '{hostname}'"},
            status=404
        )

    formatted_samples = [
        {"timestamp": ts, "data": sample}
        for ts, sample in samples
    ]

    return web.json_response({
        "hostname": hostname,
        "plugin": plugin_name,
        "samples": formatted_samples,
        "sample_count": len(formatted_samples),
    })
|
|
|
|
async def api_host_alerts(request):
    """GET /api/0/hosts/{hostname}/alerts — alert states of one host.

    Responds 404 for an unknown host and 403 when the caller may not view
    it.  The summary comes from ``threshold_checker.get_alert_summary`` when
    a checker is configured; otherwise all counters are zero.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    hostname = request.match_info.get("hostname")

    if hostname not in hbdclass.Host.hosts:
        return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)

    host = hbdclass.Host.hosts[hostname]
    if not _can_view_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)

    alerts = [state.to_dict() for state in host.alert_states.values()]

    if threshold_checker:
        summary = threshold_checker.get_alert_summary(host.alert_states)
    else:
        summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}

    return web.json_response({
        "hostname": hostname,
        "alerts": alerts,
        "summary": summary,
    })
|
|
|
|
async def api_all_alerts(request):
    """GET /api/0/alerts — active alerts across all hosts the caller may view.

    Alerts are sorted by level (critical first), then hostname, then metric
    path.  ``host_count`` is the number of registered hosts, not only the
    visible ones.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err

    if not threshold_checker:
        # Fallback filter needs AlertLevel.  Import it once, through the
        # same relative path that api_alert_summary uses, instead of an
        # absolute path inside the loop.
        from .threshold import AlertLevel

    all_alerts = []
    for hostname, host in hbdclass.Host.hosts.items():
        if not _can_view_host(user, host):
            continue
        if threshold_checker:
            active_alerts = threshold_checker.get_active_alerts(host.alert_states)
        else:
            active_alerts = [
                state for state in host.alert_states.values()
                if state.level != AlertLevel.OK
            ]

        for alert in active_alerts:
            alert_dict = alert.to_dict()
            alert_dict["hostname"] = hostname
            all_alerts.append(alert_dict)

    # Unknown level names sort after all known ones.
    level_order = {"CRITICAL": 0, "WARNING": 1, "UNKNOWN": 2, "OK": 3}
    all_alerts.sort(
        key=lambda a: (level_order.get(a["level"], 99), a["hostname"], a["metric_path"])
    )

    summary = {"critical": 0, "warning": 0, "unknown": 0, "total": len(all_alerts)}
    for alert in all_alerts:
        level = alert["level"].lower()
        if level in summary:
            summary[level] += 1

    return web.json_response({
        "alerts": all_alerts,
        "summary": summary,
        "host_count": len(hbdclass.Host.hosts),
    })
|
|
|
|
async def api_acknowledge_alert(request):
    """POST /api/0/alerts/acknowledge — acknowledge an alert to stop reminders.

    Body: ``{"hostname": str, "metric_path": str}``.

    Responds 400 for an unparsable body, a body that is not a JSON object,
    or missing fields; 404 for an unknown host or alert; 403 when the
    caller may not view the host.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    try:
        payload = await request.json()
    except Exception:
        return web.json_response(
            {"error": "Invalid JSON in request body"},
            status=400
        )
    # Valid JSON that is not an object (a list, a number, ...) has no
    # .get(); reject it instead of failing with an AttributeError.
    if not isinstance(payload, dict):
        return web.json_response(
            {"error": "Invalid JSON in request body"},
            status=400
        )

    hostname = payload.get("hostname")
    metric_path = payload.get("metric_path")

    if not hostname or not metric_path:
        return web.json_response(
            {"error": "Missing required fields: hostname and metric_path"},
            status=400
        )

    if hostname not in hbdclass.Host.hosts:
        return web.json_response(
            {"error": f"Host '{hostname}' not found"},
            status=404
        )

    host = hbdclass.Host.hosts[hostname]
    if not _can_view_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)

    if metric_path not in host.alert_states:
        return web.json_response(
            {"error": f"Alert '{metric_path}' not found for host '{hostname}'"},
            status=404
        )

    alert_state = host.alert_states[metric_path]
    alert_state.acknowledge()

    return web.json_response({
        "success": True,
        "hostname": hostname,
        "metric_path": metric_path,
        "acknowledged_at": alert_state.acknowledged_at,
    })
|
|
|
|
# -------------------------------------------------------------------------
|
|
# UI Pages
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def plugins_page(request):
    """Render the Host Overview page (``plugins.html``).

    Lists, in name order, every host that the caller may operate and that
    has plugin data, together with its plugin names and whether the caller
    has owner-level rights on it.
    """
    current_user, _ = _require_auth_redirect(request)
    default_templates = os.path.join(os.path.dirname(__file__), "templates")
    loader = jinja2.FileSystemLoader(config.get("templates_dir", default_templates))
    env = jinja2.Environment(loader=loader)

    registry = hbdclass.Host.hosts
    hosts_with_plugins = []
    for hostname in sorted(registry.keys()):
        host = registry[hostname]
        # Skip hosts the caller may not operate, then hosts without data.
        if not (_can_operate_host(current_user, host) and host.plugin_data):
            continue
        entry = {
            "name": hostname,
            "plugins": list(host.plugin_data.keys()),
            "is_owner": _can_own_host(current_user, host),
        }
        hosts_with_plugins.append(entry)

    page = env.get_template("plugins.html").render(
        title="Host Overview - Heartbeat",
        header="Host Overview",
        hosts=hosts_with_plugins,
        current_user=current_user.to_dict() if current_user else None,
        active_page="plugins",
    )
    return web.Response(text=page, content_type="text/html")
|
|
|
|
async def alerts_page(request):
    """Render the Alerts Dashboard page (``alerts.html``).

    The template receives only page metadata and the current user's
    profile (``None`` when auth is disabled).
    """
    current_user, _ = _require_auth_redirect(request)
    default_templates = os.path.join(os.path.dirname(__file__), "templates")
    loader = jinja2.FileSystemLoader(config.get("templates_dir", default_templates))
    env = jinja2.Environment(loader=loader)

    page = env.get_template("alerts.html").render(
        title="Alerts Dashboard - Heartbeat",
        header="Alerts Dashboard",
        current_user=current_user.to_dict() if current_user else None,
        active_page="alerts",
    )
    return web.Response(text=page, content_type="text/html")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Auth endpoints
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def api_login(request):
    """POST /api/0/auth/login  {username, password} -> {token}

    Also sets an hbd_session cookie for browser clients.

    Responds 404 when user authentication is not configured, 400 when the
    body is not a JSON object, and 401 for invalid credentials.
    """
    if not users_mod.users_enabled():
        return web.json_response({"error": "Auth not configured"}, status=404)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400)
    # Valid JSON that is not an object (a list, a string, ...) has no
    # .get(); reject it instead of failing with an AttributeError.
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400)
    username = body.get("username", "")
    password = body.get("password", "")
    user = users_mod.authenticate(username, password)
    if user is None:
        return web.json_response({"error": "Invalid credentials"}, status=401)
    token = users_mod.create_session(username)
    resp = web.json_response({"token": token, "username": username})
    resp.set_cookie(
        SESSION_COOKIE,
        token,
        max_age=users_mod.SESSION_TTL,
        httponly=True,
        samesite="Lax",
    )
    return resp
|
|
|
|
async def login_page(request):
    """GET /login — show login form; POST /login — process and redirect.

    When user authentication is disabled the caller is redirected to ``/``.
    After a successful POST a session cookie is set and the browser is sent
    to the ``next`` query parameter, but only if that names a path on this
    site; anything else falls back to ``/``.
    """
    if not users_mod.users_enabled():
        raise web.HTTPFound("/")

    error = ""
    if request.method == "POST":
        form = await request.post()
        username = form.get("username", "")
        password = form.get("password", "")
        user = users_mod.authenticate(username, password)
        if user:
            token = users_mod.create_session(username)
            redirect_to = request.rel_url.query.get("next", "/")
            # Open-redirect guard: accept only same-site absolute paths.
            # "//host" is protocol-relative and browsers treat "\" like
            # "/", so both could send the user to another site.
            if (
                not redirect_to.startswith("/")
                or redirect_to.startswith("//")
                or "\\" in redirect_to
            ):
                redirect_to = "/"
            resp = web.HTTPFound(redirect_to)
            resp.set_cookie(
                SESSION_COOKIE,
                token,
                max_age=users_mod.SESSION_TTL,
                httponly=True,
                samesite="Lax",
            )
            raise resp
        error = "Invalid username or password."

    html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Heartbeat — Login</title>
<style>
body {{ font-family: sans-serif; background: #f5f5f5; display: flex;
justify-content: center; align-items: center; height: 100vh; margin: 0; }}
.box {{ background: #fff; padding: 2em 2.5em; border-radius: 8px;
box-shadow: 0 2px 12px rgba(0,0,0,.15); min-width: 300px; }}
h2 {{ margin: 0 0 1.2em; color: #333; font-size: 1.4em; }}
label {{ display: block; margin-bottom: .3em; font-size: .9em; color: #555; }}
input {{ width: 100%; padding: .5em .7em; border: 1px solid #ccc;
border-radius: 4px; font-size: 1em; box-sizing: border-box; }}
button {{ margin-top: 1.2em; width: 100%; padding: .6em; background: #0066cc;
color: #fff; border: none; border-radius: 4px; font-size: 1em; cursor: pointer; }}
button:hover {{ background: #0055aa; }}
.error {{ color: #c00; font-size: .9em; margin-bottom: .8em; }}
.field {{ margin-bottom: .9em; }}
</style>
</head>
<body>
<div class="box">
<h2>Heartbeat</h2>
{'<p class="error">' + error + '</p>' if error else ''}
<form method="post">
<div class="field"><label>Username</label><input name="username" autofocus></div>
<div class="field"><label>Password</label><input name="password" type="password"></div>
<button type="submit">Sign in</button>
</form>
</div>
</body>
</html>"""
    return web.Response(text=html, content_type="text/html")
|
|
|
|
async def web_logout(request):
    """GET /logout — end the cookie session and redirect to /login.

    Only the session cookie is consulted here; header-supplied tokens are
    not.
    """
    users_mod.delete_session(request.cookies.get(SESSION_COOKIE, ""))
    redirect = web.HTTPFound("/login")
    redirect.del_cookie(SESSION_COOKIE)
    raise redirect
|
|
|
|
async def api_logout(request):
    """POST /api/0/auth/logout — end the caller's session.

    The token is taken from any source ``_get_token`` accepts.  The response
    is ``{"success": true}`` and also clears the session cookie.
    """
    users_mod.delete_session(_get_token(request))
    reply = web.json_response({"success": True})
    reply.del_cookie(SESSION_COOKIE)
    return reply
|
|
|
|
# -------------------------------------------------------------------------
|
|
# User endpoints
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def api_user_avatar(request):
    """GET /api/0/users/{username}/avatar — serve a local avatar file.

    Only users for whom ``avatar_is_local()`` holds have a file served from
    here; for everyone else the response is 404 (the browser fetches
    external avatar URLs directly).  Also responds 404 for an unknown user
    or a missing file.  The content type is inferred from the extension.
    """
    _, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    username = request.match_info.get("username")
    target_user = users_mod.get_user(username)
    if target_user is None:
        return web.Response(status=404, text="User not found")
    if not target_user.avatar_is_local():
        return web.Response(status=404, text="No local avatar configured")
    path = target_user.avatar
    if not os.path.isfile(path):
        return web.Response(status=404, text="Avatar file not found")
    content_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".svg": "image/svg+xml",
    }
    extension = os.path.splitext(path)[1].lower()
    mime = content_types.get(extension, "application/octet-stream")
    return web.FileResponse(path=path, headers={"Content-Type": mime})
|
|
|
|
async def api_users(request):
    """GET /api/0/users — list all users; admin only when auth is enabled."""
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    if users_mod.users_enabled() and (user is None or not user.admin):
        return web.json_response({"error": "Forbidden"}, status=403)
    return web.json_response([u.to_dict() for u in users_mod.users.values()])
|
|
|
|
async def api_user_self(request):
    """GET /api/0/users/me — the caller's own profile.

    Responds 404 when user authentication is not configured, since there is
    then no user to describe.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    if user is None:
        return web.json_response({"error": "Auth not configured"}, status=404)
    return web.json_response(user.to_dict())
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Host access endpoints
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def api_host_access_get(request):
    """GET /api/0/hosts/{hostname}/access — the host's access settings.

    Responds 404 for an unknown host and 403 when the caller may not view
    it; otherwise returns ``host.access_dict()``.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    hostname = request.match_info.get("hostname")
    if hostname not in hbdclass.Host.hosts:
        return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
    host = hbdclass.Host.hosts[hostname]
    if not _can_view_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)
    return web.json_response(host.access_dict())
|
|
|
|
async def api_host_access_put(request):
    """PUT /api/0/hosts/{hostname}/access — owner or admin only.

    Body: {owner?: str, managers?: [str], monitors?: [str]}

    The whole body is validated before anything is applied, so a rejected
    request never leaves the host partially updated.  Responds 400 for a
    body that is not a JSON object or has fields of the wrong type, 404 for
    an unknown host, and 403 when the caller does not own the host.
    """
    user, err = _require_auth(request)
    # Compare against None explicitly: aiohttp responses are mapping-like
    # objects, so their truthiness is not a reliable "error occurred" signal.
    if err is not None:
        return err
    hostname = request.match_info.get("hostname")
    if hostname not in hbdclass.Host.hosts:
        return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
    host = hbdclass.Host.hosts[hostname]
    if not _can_own_host(user, host):
        return web.json_response({"error": "Forbidden"}, status=403)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400)

    # Validate first.  Without these checks a string value would be split
    # into single characters by list(), and null would raise a TypeError.
    owner = body.get("owner")
    if owner is not None and not isinstance(owner, str):
        return web.json_response(
            {"error": "'owner' must be a string or null"}, status=400
        )
    for field in ("managers", "monitors"):
        if field not in body:
            continue
        names = body[field]
        if not isinstance(names, list) or not all(isinstance(n, str) for n in names):
            return web.json_response(
                {"error": f"'{field}' must be a list of strings"}, status=400
            )

    # Apply only the fields that were supplied.
    if "owner" in body:
        host.owner = owner or None  # empty string clears the owner
    if "managers" in body:
        host.managers = list(body["managers"])
    if "monitors" in body:
        host.monitors = list(body["monitors"])

    return web.json_response(host.access_dict())
|
|
|
|
# -------------------------------------------------------------------------
|
|
# User profile page
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def profile_page(request):
    """GET /profile — current user's settings and host access summary.

    Each known host is listed under the user's highest role on it (owner,
    then manager, then monitor).  Known hosts are the union of live hosts
    and hosts that so far exist only in the configuration, so the page
    reflects the config file immediately after a reload.
    """
    current_user, _ = _require_auth_redirect(request)
    default_templates = os.path.join(os.path.dirname(__file__), "templates")
    loader = jinja2.FileSystemLoader(config.get("templates_dir", default_templates))
    env = jinja2.Environment(loader=loader)

    from . import config as config_mod

    owned, managed, monitored = [], [], []
    by_role = {"owner": owned, "manager": managed, "monitor": monitored}
    if current_user:
        me = current_user.username
        configured = set(config.get("hosts", {}).keys())
        live = set(hbdclass.Host.hosts.keys())

        for hostname in sorted(configured | live):
            live_host = hbdclass.Host.hosts.get(hostname)
            if live_host is not None:
                # Live object: ask it directly, highest role first.
                if live_host.is_owner(me):
                    role = "owner"
                elif live_host.is_manager(me):
                    role = "manager"
                elif live_host.is_monitor(me):
                    role = "monitor"
                else:
                    role = None
            else:
                # Config-only host: read access straight from the config.
                access = config_mod.get_host_access(config, hostname)
                owns = access["owner"] == me
                manages = me in access["managers"]
                monitors = me in access["monitors"]
                if owns:
                    role = "owner"
                elif manages:
                    role = "manager"
                elif monitors:
                    role = "monitor"
                else:
                    role = None

            if role is not None:
                by_role[role].append(hostname)

    # Pair each of the user's notification channels with its configured type.
    notif_channels = []
    if current_user:
        for channel_name in (current_user.notification_channels or []):
            channel_cfg = config.get("notification_channels", {}).get(channel_name, {})
            notif_channels.append(
                {"name": channel_name, "type": channel_cfg.get("type", "")}
            )

    page = env.get_template("profile.html").render(
        title="Profile - Heartbeat",
        header="My Profile",
        current_user=current_user.to_dict() if current_user else None,
        owned_hosts=owned,
        managed_hosts=managed,
        monitored_hosts=monitored,
        notification_channels=notif_channels,
        active_page="profile",
    )
    return web.Response(text=page, content_type="text/html")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# About page
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def about_page(request):
    """GET /about — version, runtime, and project information.

    Uptime is measured from the moment ``start`` recorded ``_start_epoch``.
    It is shown as days/hours/minutes once a day has passed, as
    hours/minutes/seconds once an hour has passed, and as minutes/seconds
    otherwise.
    """
    current_user, _ = _require_auth_redirect(request)
    default_templates = os.path.join(os.path.dirname(__file__), "templates")
    loader = jinja2.FileSystemLoader(config.get("templates_dir", default_templates))
    env = jinja2.Environment(loader=loader)
    from hbd import __version__ as hbd_version

    # Split elapsed seconds into d/h/m/s from the smallest unit upwards.
    elapsed = int(time.time() - _start_epoch)
    total_minutes, secs = divmod(elapsed, 60)
    total_hours, mins = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)
    if days:
        uptime_str = f"{days}d {hours}h {mins}m"
    elif hours:
        uptime_str = f"{hours}h {mins}m {secs}s"
    else:
        uptime_str = f"{mins}m {secs}s"

    start_time_str = datetime.datetime.fromtimestamp(_start_epoch).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    interpreter = "{}.{}.{}".format(*sys.version_info[:3])
    python_version = f"{interpreter} ({platform.python_implementation()})"

    page = env.get_template("about.html").render(
        title="About - Heartbeat",
        header="About",
        hbd_version=hbd_version,
        python_version=python_version,
        server_hostname=socket.gethostname(),
        start_epoch=int(_start_epoch),
        start_time_str=start_time_str,
        uptime_str=uptime_str,
        host_count=len(hbdclass.Host.hosts),
        current_user=current_user.to_dict() if current_user else None,
        active_page="about",
    )
    return web.Response(text=page, content_type="text/html")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Settings page (admin only)
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def settings_page(request):
    """GET /settings — read-only view of the current server configuration.

    A logged-in user who is not an admin gets ``HTTPForbidden``.  When auth
    is disabled there is no user and the page is served unconditionally.
    """
    current_user, _ = _require_auth_redirect(request)
    if current_user and not current_user.admin:
        raise web.HTTPForbidden(reason="Admin access required")
    default_templates = os.path.join(os.path.dirname(__file__), "templates")
    loader = jinja2.FileSystemLoader(config.get("templates_dir", default_templates))
    env = jinja2.Environment(loader=loader)
    page = env.get_template("settings.html").render(
        title="Settings - Heartbeat",
        sections=settings_mod.get_settings_sections(config),
        current_user=current_user.to_dict() if current_user else None,
        active_page="settings",
    )
    return web.Response(text=page, content_type="text/html")
|
|
|
|
# Build the application and register every handler defined above.
app = web.Application()
app.add_routes(
    [
        # Dashboard: "/" and "/live" serve the same page; "/old" is the
        # legacy plain-HTML view.
        web.get("/", live),
        web.get("/old", old_index),
        # Auth
        web.get("/login", login_page),
        web.post("/login", login_page),
        web.get("/logout", web_logout),
        web.post("/api/0/auth/login", api_login),
        web.post("/api/0/auth/logout", api_logout),
        # Users
        web.get("/api/0/users", api_users),
        web.get("/api/0/users/me", api_user_self),
        web.get("/api/0/users/{username}/avatar", api_user_avatar),
        # Hosts
        web.get("/api/0/hosts", api_hosts),
        web.get("/api/0/alert_summary", api_alert_summary),
        web.get("/api/0/messages", api_messages),
        web.get("/api/0/hosts/{hostname}/plugins", api_host_plugins),
        web.get("/api/0/hosts/{hostname}/plugins/{plugin_name}", api_host_plugin_detail),
        web.get("/api/0/hosts/{hostname}/alerts", api_host_alerts),
        web.get("/api/0/hosts/{hostname}/access", api_host_access_get),
        web.put("/api/0/hosts/{hostname}/access", api_host_access_put),
        web.get("/api/0/alerts", api_all_alerts),
        web.post("/api/0/alerts/acknowledge", api_acknowledge_alert),
        # Short host-action endpoints: command, drop, DNS register, update.
        web.get("/c", cmd),
        web.get("/d", drop),
        web.get("/n", register),
        web.get("/u", update),
        # HTML pages
        web.get("/live", live),
        web.get("/plugins", plugins_page),
        web.get("/alerts", alerts_page),
        web.get("/about", about_page),
        web.get("/profile", profile_page),
        web.get("/settings", settings_page),
        # Static assets and the WebSocket endpoint
        web.get("/static/{path:.*}", static),
        web.get("/favicon.ico", favicon),
        web.get("/ws", ws_mod.handler),
    ]
)

# Bind the application to host:port and start accepting connections.
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()

logger.info(f"HTTP server started on {host}:{port}")

# Wait on a future that nothing completes, so the coroutine keeps running
# until it is cancelled; the finally clause then cleans up the runner.
try:
    await asyncio.Future()
finally:
    await runner.cleanup()
|