"""HTTP server implementation using aiohttp and jinja2.""" import asyncio import datetime import html as _html import json import platform import socket import sys import time import urllib.parse import os import logging from aiohttp import web import jinja2 from . import data from . import notify as notify_mod from . import settings as settings_mod from . import users as users_mod from . import oauth as oauth_mod from . import ws as ws_mod from . import configio as configio_mod logger = logging.getLogger(__name__) eventlog = notify_mod.eventlog def _build_threshold_configs_from_form(form_data: dict) -> dict: """Convert form-submitted flat threshold data to nested threshold_configs YAML structure. Input: {config_name: {metric_path: {warning, critical, operator, hysteresis, enabled, count, display}}} Output: {config_name: {thresholds: {plugin: {metric: {warning, critical, ...}}}}} """ result = {} for config_name, metrics in form_data.items(): if not isinstance(metrics, dict): continue thresholds = {} for metric_path, values in metrics.items(): _insert_threshold_metric(thresholds, metric_path, values) result[config_name] = {"thresholds": thresholds} return result def _insert_threshold_metric(thresholds: dict, metric_path: str, values: dict) -> None: """Insert a single metric into the nested threshold YAML structure.""" if not isinstance(values, dict): return cfg = {} op = values.get("operator", ">") if op and op != ">": cfg["operator"] = op for key, cast in (("warning", float), ("critical", float), ("hysteresis", float)): v = values.get(key) if v is not None: try: cfg[key] = cast(v) except (TypeError, ValueError): pass grace = values.get("grace") if grace is not None: try: cfg["grace"] = float(grace) except (TypeError, ValueError): pass count = values.get("count") if count is not None: try: cfg["count"] = int(count) except (TypeError, ValueError): pass display = values.get("display", "") if display: cfg["display"] = display if not values.get("enabled", True): cfg["enabled"] = False parts = metric_path.split(".", 2) if len(parts) == 1: # e.g. "rtt" thresholds[metric_path] = cfg elif len(parts) == 2: plugin, metric = parts thresholds.setdefault(plugin, {})[metric] = cfg else: plugin, intermediate, leaf = parts thresholds.setdefault(plugin, {}) if plugin == "disk_monitor": thresholds[plugin].setdefault("partitions", {}).setdefault(intermediate, {})[leaf] = cfg elif plugin == "zfs_monitor": thresholds[plugin].setdefault("pools", {}).setdefault(intermediate, {})[leaf] = cfg else: thresholds[plugin].setdefault(intermediate, {})[leaf] = cfg def _render_template(html_str: str, **context) -> str: tmpl = jinja2.Template(html_str) return tmpl.render(**context) # --------------------------------------------------------------------------- # Auth helpers # --------------------------------------------------------------------------- SESSION_COOKIE = "hbd_session" def _get_token(request) -> str: """Extract session token from Bearer header, X-Auth-Token header, or cookie.""" auth = request.headers.get("Authorization", "") if auth.lower().startswith("bearer "): return auth[7:].strip() header_token = request.headers.get("X-Auth-Token", "").strip() if header_token: return header_token return request.cookies.get(SESSION_COOKIE, "") def _current_user(request): """Return the authenticated User, or None when auth is not enabled.""" if not users_mod.users_enabled(): return None # unauthenticated mode — all access allowed return users_mod.get_session_user(_get_token(request)) def _require_auth(request): """Return (user, None) or (None, error Response).""" if not users_mod.users_enabled(): return None, None user = users_mod.get_session_user(_get_token(request)) if user is None: return None, web.json_response({"error": "Unauthorized"}, status=401) return user, None def _require_auth_redirect(request): """Like _require_auth but returns a redirect to /login for browser requests.""" if not users_mod.users_enabled(): return None, None user = users_mod.get_session_user(_get_token(request)) if user is None: raise web.HTTPFound("/login") return user, None def _can_view_host(user, host) -> bool: """Return True if *user* may see *host* (monitor or higher, or no auth).""" if user is None: return True if user.admin: return True return host.is_monitor(user.username) def _can_operate_host(user, host) -> bool: """Manager-level: queue commands, DNS, upgrade.""" if user is None: return True if user.admin: return True return host.is_manager(user.username) def _can_own_host(user, host) -> bool: """Owner-level: drop host, transfer ownership.""" if user is None: return True if user.admin: return True return host.is_owner(user.username) def _mask_config_for_api(config) -> dict: """Return a JSON-serializable config dict with secrets masked.""" result = {} result["server"] = {k: config.get(k) for k in configio_mod._SERVER_KEYS} users = {} for username, attrs in (config.get("users") or {}).items(): u = dict(attrs) if "password" in u: u["password"] = "•••" users[username] = u result["users"] = users oauth = {} for name, attrs in (config.get("oauth") or {}).items(): o = dict(attrs) if "client_secret" in o: o["client_secret"] = "•••" oauth[name] = o result["oauth"] = oauth return result def _build_host_info(host, threshold_checker=None) -> dict: """Assemble the info payload for GET /api/0/hosts/{hostname}/info.""" hbc_version = None hbc_type = None latest_os = host.get_latest_plugin_data("os_info") if latest_os: _, os_data = latest_os hbc_version = os_data.get("hbc_version") hbc_type = os_data.get("hbc_type") last_packet = None if host.connections: last_packet = max(conn.lastbeat for conn in host.connections.values()) thresholds = None if threshold_checker is not None: raw = threshold_checker.get_thresholds_for_host(host.name) # Build reverse coverage: which metric paths suffix-match to each threshold. # Mirrors the logic in ThresholdChecker._find_threshold. coverage: dict = {} for plugin_name, samples in host.plugin_data.items(): if not samples: continue _, pdata = samples[-1] for field_name in pdata: full_path = f"{plugin_name}.{field_name}" if full_path in raw: continue # exact match — the threshold IS this metric parts = field_name.split("_") for i in range(1, len(parts)): candidate = f"{plugin_name}." + "_".join(parts[i:]) if candidate in raw: coverage.setdefault(candidate, []).append(full_path) break thresholds = sorted( [ { "metric": tc.metric_path, "warning": tc.warning, "critical": tc.critical, "operator": tc.operator.value, "covers": sorted(coverage.get(tc.metric_path, [])), } for tc in raw.values() ], key=lambda x: x["metric"], ) return { "owner": getattr(host, "owner", None), "managers": list(getattr(host, "managers", [])), "hbc_version": hbc_version, "hbc_type": hbc_type, "last_packet": last_packet, "thresholds": thresholds, } async def start( host: str, port: int, config, hbdclass, tcss=None, verbose=False, get_now=None, VER="", threshold_checker=None, reload_callback=None, ): """Start an aiohttp web server and block until cancelled. This function is intended to be awaited inside the main asyncio event loop. """ get_now = get_now or (lambda: time.time()) _start_epoch = time.time() async def old_index(request): _require_auth_redirect(request) res = [] res.append('') res.append("") res.append("") res.append("Heartbeat") if tcss: res.append(tcss) res.append("") res.append('') res.append(f"

Heartbeat status {VER}

") res += hbdclass.ubHost.buildhosttable() res += hbdclass.ubHost.buildmsgtable(data.msgs) res.append( "

%s (%s)

" % ( time.strftime("%H:%M:%S", time.localtime(get_now())), config.get("tz", "CET-1CDT"), ) ) res.append("") body = "\n".join(res) return web.Response(text=body, content_type="text/html") async def api_hosts(request): user, err = _require_auth(request) if err: return err hosts = [ hbdclass.Host.hosts[h] for h in hbdclass.Host.hosts if _can_view_host(user, hbdclass.Host.hosts[h]) ] lst = [h.jsons() for h in hosts] return web.json_response(json.loads("[" + ",".join(lst) + "]")) async def api_alert_summary(request): """GET /api/0/alert_summary — counts of ok/warning/critical hosts visible to caller.""" user, err = _require_auth(request) if err: return err from .threshold import AlertLevel critical = warning = ok = 0 for host in hbdclass.Host.hosts.values(): if not host.watched: continue if not _can_operate_host(user, host): continue levels = {s.level for s in host.alert_states.values()} if AlertLevel.CRITICAL in levels: critical += 1 elif AlertLevel.WARNING in levels: warning += 1 else: ok += 1 return web.json_response({"critical": critical, "warning": warning, "ok": ok}) async def api_messages(request): lst = data.msgs[-30:] return web.json_response(lst) async def cmd(request): user, err = _require_auth(request) if err: return err qa = request.rel_url.query uname = qa.get("h") ucmd = qa.get("c") if not ucmd or not uname: return web.Response(status=400, text="need h= and c= arguments") if uname not in hbdclass.Host.hosts: return web.Response(status=400, text=f"h={uname} not found") host = hbdclass.Host.hosts[uname] if not _can_operate_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) host.cmds.append(("CMD", {"cmd": urllib.parse.unquote(ucmd)})) return web.Response(text=f"cmd {uname} queued") async def drop(request): user, err = _require_auth(request) if err: return err qa = request.rel_url.query uname = qa.get("h") if not uname: return web.Response(status=400, text="need h= argument") if uname not in hbdclass.Host.hosts: return web.Response(status=400, text=f"h={uname} not found") host = hbdclass.Host.hosts[uname] if not _can_own_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) eventlog(uname, "INFO", "dropped") del hbdclass.Host.hosts[uname] return web.Response(text="Done") async def register(request): user, err = _require_auth(request) if err: return err qa = request.rel_url.query uname = qa.get("h") if not uname: return web.Response(status=400, text="need h= argument") if uname not in hbdclass.Host.hosts: return web.Response(status=400, text=f"h={uname} not found") host = hbdclass.Host.hosts[uname] if not _can_operate_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) ll = host.registerDns() eventlog(uname, "INFO", ll) return web.Response(text=str(ll)) async def update(request): user, err = _require_auth(request) if err: return err qa = request.rel_url.query uname = urllib.parse.unquote(qa.get("h", "")) if not uname: return web.Response(status=400, text="need h= argument") if uname != "All" and uname not in hbdclass.Host.hosts: return web.Response(status=400, text=f"h={uname} not found") names = [uname] if uname != "All" else list(hbdclass.Host.hosts) out = [] for n in names: host = hbdclass.Host.hosts[n] if not _can_operate_host(user, host): out.append(f"update skipped for {n}: Forbidden") continue op_err = None try: host.cmds.append(("UPD", {})) except Exception as e: op_err = str(e) out.append(f"update started for {n}: {op_err if op_err else 'OK'}") return web.Response(text="\n".join(out)) async def live(request): current_user, _ = _require_auth_redirect(request) # render template from hbd/templates/live.html using Jinja2 # Resolve templates directory relative to the hbd package pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) host = config.get("hb_host", "localhost") extra_scripts = config.get("http_extra_scripts", "") host = request.host # includes port if non-standard forwarded_proto = request.headers.get("X-Forwarded-Proto", "") is_secure = request.secure or forwarded_proto.lower() == "https" scheme = "wss" if is_secure else "ws" heartbeat_ws_url = f"{scheme}://{host}/ws" from hbd import __version__ as hbd_version tmpl = env.get_template("live.html") body = tmpl.render( title="Heartbeat", header="Heartbeat", request=request, heartbeat_ws_url=heartbeat_ws_url, extra_scripts=extra_scripts, hbd_version=hbd_version, hosts=[ hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts) if _can_operate_host(current_user, hbdclass.Host.hosts[h]) ], messages=data.msgs[-30:], current_user=current_user.to_dict() if current_user else None, active_page="live", ) return web.Response(text=body, content_type="text/html") async def static(request): """Serve files from the package static directory. URL form: /static/ """ p = request.match_info.get("path", "") logger.debug("static file requested: %s", p) base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static")) # normalize and prevent directory traversal target = os.path.abspath(os.path.normpath(os.path.join(base, p))) if not target.startswith(base + os.sep) and target != base: return web.Response(status=403, text="Forbidden") if not os.path.exists(target) or not os.path.isfile(target): return web.Response(status=404, text="Not Found") logger.info("serving static file: %s", target) return web.FileResponse(path=target) async def favicon(request): """Serve favicon.ico from the package static directory.""" base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static/images")) target = os.path.join(base, "favicon.ico") if not os.path.exists(target) or not os.path.isfile(target): return web.Response(status=404, text="Not Found") return web.FileResponse(path=target) # ------------------------------------------------------------------------- # Plugin Data API Endpoints # ------------------------------------------------------------------------- async def api_host_plugins(request): """Get all plugin data for a specific host.""" user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) # Get plugin data with most recent sample for each plugin plugins_summary = {} for plugin_name, samples in host.plugin_data.items(): if samples: # Get most recent sample timestamp, data = samples[-1] plugins_summary[plugin_name] = { "timestamp": timestamp, "data": data, "sample_count": len(samples), } return web.json_response({ "hostname": hostname, "plugins": plugins_summary, }) async def api_host_plugin_detail(request): """Get detailed data for a specific plugin on a host.""" user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") plugin_name = request.match_info.get("plugin_name") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) # Get limit from query parameter limit = request.rel_url.query.get("limit", "10") try: limit = int(limit) except ValueError: limit = 10 # Get plugin data samples = host.get_plugin_data(plugin_name, limit=limit) if not samples: return web.json_response( {"error": f"No data for plugin '{plugin_name}' on host '{hostname}'"}, status=404 ) # Format samples formatted_samples = [ { "timestamp": ts, "data": data, } for ts, data in samples ] return web.json_response({ "hostname": hostname, "plugin": plugin_name, "samples": formatted_samples, "sample_count": len(formatted_samples), }) async def api_host_alerts(request): """Get alert states for a specific host.""" user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) # Get alert states alerts = [] for metric_path, alert_state in host.alert_states.items(): alerts.append(alert_state.to_dict()) # Get summary if threshold_checker available summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0} if threshold_checker: summary = threshold_checker.get_alert_summary(host.alert_states) return web.json_response({ "hostname": hostname, "alerts": alerts, "summary": summary, }) async def api_all_alerts(request): """Get all active alerts across all hosts.""" user, err = _require_auth(request) if err: return err all_alerts = [] for hostname, host in hbdclass.Host.hosts.items(): if not _can_view_host(user, host): continue if threshold_checker: active_alerts = threshold_checker.get_active_alerts(host.alert_states) else: # Fallback if no threshold checker from hbd.server.threshold import AlertLevel active_alerts = [ state for state in host.alert_states.values() if state.level != AlertLevel.OK ] for alert in active_alerts: alert_dict = alert.to_dict() alert_dict["hostname"] = hostname all_alerts.append(alert_dict) # Sort by level (critical first) then by hostname level_order = {"CRITICAL": 0, "WARNING": 1, "UNKNOWN": 2, "OK": 3} all_alerts.sort( key=lambda a: (level_order.get(a["level"], 99), a["hostname"], a["metric_path"]) ) # Get summary counts summary = {"critical": 0, "warning": 0, "unknown": 0, "total": len(all_alerts)} for alert in all_alerts: level = alert["level"].lower() if level in summary: summary[level] += 1 return web.json_response({ "alerts": all_alerts, "summary": summary, "host_count": len(hbdclass.Host.hosts), }) async def api_acknowledge_alert(request): """Acknowledge an alert to stop reminder notifications.""" user, err = _require_auth(request) if err: return err try: data = await request.json() except Exception: return web.json_response( {"error": "Invalid JSON in request body"}, status=400 ) hostname = data.get("hostname") metric_path = data.get("metric_path") if not hostname or not metric_path: return web.json_response( {"error": "Missing required fields: hostname and metric_path"}, status=400 ) if hostname not in hbdclass.Host.hosts: return web.json_response( {"error": f"Host '{hostname}' not found"}, status=404 ) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) if metric_path not in host.alert_states: return web.json_response( {"error": f"Alert '{metric_path}' not found for host '{hostname}'"}, status=404 ) alert_state = host.alert_states[metric_path] alert_state.acknowledge() return web.json_response({ "success": True, "hostname": hostname, "metric_path": metric_path, "acknowledged_at": alert_state.acknowledged_at, }) # ------------------------------------------------------------------------- # UI Pages # ------------------------------------------------------------------------- async def plugins_page(request): """Render the plugin metrics visualization page.""" current_user, _ = _require_auth_redirect(request) pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) # Collect all hosts with plugin data (filtered by visibility) hosts_with_plugins = [] for hostname in sorted(hbdclass.Host.hosts.keys()): host = hbdclass.Host.hosts[hostname] if not _can_operate_host(current_user, host): continue if host.plugin_data: hosts_with_plugins.append({ "name": hostname, "plugins": list(host.plugin_data.keys()), "is_owner": _can_own_host(current_user, host), "owner": host.owner, }) tmpl = env.get_template("plugins.html") body = tmpl.render( title="Host Overview - Heartbeat", header="Host Overview", hosts=hosts_with_plugins, current_user=current_user.to_dict() if current_user else None, active_page="plugins", ) return web.Response(text=body, content_type="text/html") async def alerts_page(request): """Render the alerts dashboard page.""" current_user, _ = _require_auth_redirect(request) pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) tmpl = env.get_template("alerts.html") body = tmpl.render( title="Alerts Dashboard - Heartbeat", header="Alerts Dashboard", current_user=current_user.to_dict() if current_user else None, active_page="alerts", ) return web.Response(text=body, content_type="text/html") # ------------------------------------------------------------------------- # Auth endpoints # ------------------------------------------------------------------------- async def api_login(request): """POST /api/0/auth/login {username, password} -> {token} Also sets an hbd_session cookie for browser clients. """ if not users_mod.users_enabled(): return web.json_response({"error": "Auth not configured"}, status=404) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) username = body.get("username", "") password = body.get("password", "") user = users_mod.authenticate(username, password) if user is None: return web.json_response({"error": "Invalid credentials"}, status=401) token = users_mod.create_session(username) eventlog("hbd", "INFO", f"Login: {username} via api") resp = web.json_response({"token": token, "username": username}) resp.set_cookie( SESSION_COOKIE, token, max_age=users_mod.SESSION_TTL, httponly=True, samesite="Lax", ) return resp async def login_page(request): """GET /login — show login form; POST /login — process and redirect.""" if not users_mod.users_enabled(): raise web.HTTPFound("/") error = "" if request.method == "POST": form = await request.post() username = form.get("username", "") password = form.get("password", "") user = users_mod.authenticate(username, password) if user: token = users_mod.create_session(username) eventlog("hbd", "INFO", f"Login: {username} via password") redirect_to = request.rel_url.query.get("next", "/") if not redirect_to.startswith("/"): redirect_to = "/" resp = web.HTTPFound(redirect_to) resp.set_cookie( SESSION_COOKIE, token, max_age=users_mod.SESSION_TTL, httponly=True, samesite="Lax", ) raise resp error = "Invalid username or password." elif request.rel_url.query.get("error"): error = "Sign-in failed. Please try again." oauth_buttons = "" _providers = oauth_mod.get_providers(config) if _providers: buttons_html = "" for _p in _providers: _logo = f'' if _p.logo else "" buttons_html += f""" {_logo}{_html.escape(_p.label)} """ oauth_buttons = f"""
or
{buttons_html}""" html = f""" Heartbeat — Login

Heartbeat

{'

' + error + '

' if error else ''}
{oauth_buttons}
""" return web.Response(text=html, content_type="text/html") async def web_logout(request): """GET /logout — clear session cookie and redirect to /login.""" token = request.cookies.get(SESSION_COOKIE, "") _user = users_mod.get_session_user(token) users_mod.delete_session(token) if _user: eventlog("hbd", "INFO", f"Logout: {_user.username}") resp = web.HTTPFound("/login") resp.del_cookie(SESSION_COOKIE) raise resp async def api_logout(request): """POST /api/0/auth/logout""" token = _get_token(request) _user = users_mod.get_session_user(token) users_mod.delete_session(token) if _user: eventlog("hbd", "INFO", f"Logout: {_user.username}") resp = web.json_response({"success": True}) resp.del_cookie(SESSION_COOKIE) return resp # ------------------------------------------------------------------------- # User endpoints # ------------------------------------------------------------------------- async def api_user_avatar(request): """GET /api/0/users/{username}/avatar — serve a local avatar file. Only reachable when the user's avatar config value starts with '/'. Falls back to 404 for external URLs (the browser fetches those directly). """ user, err = _require_auth(request) if err: return err username = request.match_info.get("username") target_user = users_mod.get_user(username) if target_user is None: return web.Response(status=404, text="User not found") if not target_user.avatar_is_local(): return web.Response(status=404, text="No local avatar configured") path = target_user.avatar avatar_dir = config.get("avatar_dir") or ( os.path.dirname(os.path.realpath(_config_path)) if _config_path else None ) if not avatar_dir: return web.Response(status=403, text="Local avatars not configured") if not os.path.realpath(path).startswith(os.path.realpath(avatar_dir) + os.sep): return web.Response(status=403, text="Forbidden") if not os.path.isfile(path): return web.Response(status=404, text="Avatar file not found") # Infer content-type from extension ext = os.path.splitext(path)[1].lower() mime = { ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".gif": "image/gif", ".webp": "image/webp", ".svg": "image/svg+xml", }.get(ext, "application/octet-stream") return web.FileResponse(path=path, headers={"Content-Type": mime}) async def api_users(request): """GET /api/0/users — admin only.""" user, err = _require_auth(request) if err: return err if users_mod.users_enabled() and (user is None or not user.admin): return web.json_response({"error": "Forbidden"}, status=403) return web.json_response([u.to_dict() for u in users_mod.users.values()]) async def api_user_self(request): """GET /api/0/users/me — own profile.""" user, err = _require_auth(request) if err: return err if user is None: return web.json_response({"error": "Auth not configured"}, status=404) return web.json_response(user.to_dict()) # ------------------------------------------------------------------------- # Host access endpoints # ------------------------------------------------------------------------- async def api_host_access_get(request): """GET /api/0/hosts/{hostname}/access""" user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) return web.json_response(host.access_dict()) async def api_host_access_put(request): """PUT /api/0/hosts/{hostname}/access — owner or admin only. Body: {owner?: str, managers?: [str], monitors?: [str]} """ user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_own_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) if "owner" in body: host.owner = body["owner"] or None if "managers" in body: host.managers = list(body["managers"]) if "monitors" in body: host.monitors = list(body["monitors"]) return web.json_response(host.access_dict()) # ------------------------------------------------------------------------- # Host info endpoint # ------------------------------------------------------------------------- async def api_host_info(request): """GET /api/0/hosts/{hostname}/info""" user, err = _require_auth(request) if err: return err hostname = request.match_info.get("hostname") if hostname not in hbdclass.Host.hosts: return web.json_response({"error": f"Host '{hostname}' not found"}, status=404) host = hbdclass.Host.hosts[hostname] if not _can_view_host(user, host): return web.json_response({"error": "Forbidden"}, status=403) return web.json_response(_build_host_info(host, threshold_checker=threshold_checker)) # ------------------------------------------------------------------------- # User profile page # ------------------------------------------------------------------------- async def profile_page(request): """GET /profile — current user's settings and host access summary.""" current_user, _ = _require_auth_redirect(request) pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) # Build host access summary for this user. # Merge live hosts with config-only hosts (not yet seen) so the profile # reflects the config file immediately after a reload. from . import config as config_mod owned, managed, monitored = [], [], [] if current_user: # Collect all known hostnames: live + configured cfg_hostnames = set(config.get("hosts", {}).keys()) live_hostnames = set(hbdclass.Host.hosts.keys()) all_hostnames = sorted(cfg_hostnames | live_hostnames) for hostname in all_hostnames: live_host = hbdclass.Host.hosts.get(hostname) if live_host is not None: # Use live object — it has apply_access already called is_own = live_host.is_owner(current_user.username) is_mgr = not is_own and live_host.is_manager(current_user.username) is_mon = not is_own and not is_mgr and live_host.is_monitor(current_user.username) else: # Config-only host — read access directly from config access = config_mod.get_host_access(config, hostname) is_own = access["owner"] == current_user.username is_mgr = current_user.username in access["managers"] is_mon = current_user.username in access["monitors"] if is_own: owned.append(hostname) elif is_mgr: managed.append(hostname) elif is_mon: monitored.append(hostname) # Resolve notification channel configs for display notif_channels = [] if current_user: for ch_name in (current_user.notification_channels or []): ch_cfg = config.get("notification_channels", {}).get(ch_name, {}) notif_channels.append({"name": ch_name, "type": ch_cfg.get("type", "")}) # Build visible channels list for chip picker and My Channels management. visible_channels = _visible_channels_for_user(current_user) if current_user else {} all_channels = sorted( [ { "name": name, "type": cfg.get("type", ""), "owner": cfg.get("owner"), "private": bool(cfg.get("private", False)), } for name, cfg in visible_channels.items() if isinstance(cfg, dict) ], key=lambda c: c["name"], ) # Keep all_channel_names for backwards-compat with any template references. all_channel_names = [c["name"] for c in all_channels] tmpl = env.get_template("profile.html") body = tmpl.render( title="Profile - Heartbeat", header="My Profile", current_user=current_user.to_dict() if current_user else None, owned_hosts=owned, managed_hosts=managed, monitored_hosts=monitored, notification_channels=notif_channels, all_channels=all_channels, all_channel_names=all_channel_names, active_page="profile", ) return web.Response(text=body, content_type="text/html") # ------------------------------------------------------------------------- # About page # ------------------------------------------------------------------------- async def about_page(request): """GET /about — version, runtime, and project information.""" current_user, _ = _require_auth_redirect(request) pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) from hbd import __version__ as hbd_version uptime_secs = int(time.time() - _start_epoch) days, rem = divmod(uptime_secs, 86400) hours, rem = divmod(rem, 3600) mins, secs = divmod(rem, 60) if days: uptime_str = f"{days}d {hours}h {mins}m" elif hours: uptime_str = f"{hours}h {mins}m {secs}s" else: uptime_str = f"{mins}m {secs}s" start_dt = datetime.datetime.fromtimestamp(_start_epoch) start_time_str = start_dt.strftime("%Y-%m-%d %H:%M:%S") tmpl = env.get_template("about.html") body = tmpl.render( title="About - Heartbeat", header="About", hbd_version=hbd_version, python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro} ({platform.python_implementation()})", server_hostname=socket.gethostname(), start_epoch=int(_start_epoch), start_time_str=start_time_str, uptime_str=uptime_str, host_count=len(hbdclass.Host.hosts), current_user=current_user.to_dict() if current_user else None, active_page="about", ) return web.Response(text=body, content_type="text/html") # ------------------------------------------------------------------------- # Settings page (admin only) # ------------------------------------------------------------------------- async def settings_page(request): """GET /settings — read-only view of the current server configuration.""" current_user, _ = _require_auth_redirect(request) if current_user and not current_user.admin: raise web.HTTPForbidden(reason="Admin access required") pkg_dir = os.path.dirname(__file__) templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates")) env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir), autoescape=True) tmpl = env.get_template("settings.html") settings_data = settings_mod.get_settings_data(config, threshold_checker=threshold_checker) body = tmpl.render( title="Settings - Heartbeat", sections=settings_data["sections"], all_channel_names=settings_data["all_channel_names"], all_usernames=settings_data["all_usernames"], all_threshold_configs=settings_data["all_threshold_configs"], current_user=current_user.to_dict() if current_user else None, active_page="settings", ) return web.Response(text=body, content_type="text/html") def _oauth_redirect_uri(request, provider_name: str) -> str: base = config.get("base_url", "").rstrip("/") or str(request.url.origin()) return f"{base}/login/oauth/{provider_name}/callback" def _get_oauth_provider(name: str): """Return the ResolvedProvider for *name*, or None if not found.""" return next( (p for p in oauth_mod.get_providers(config) if p.name == name), None, ) async def oauth_redirect(request): """GET /login/oauth/{name} — kick off the OAuth2 flow for the named provider.""" name = request.match_info["name"] provider = _get_oauth_provider(name) if provider is None: return web.Response(status=404, text="OAuth provider not found") state = oauth_mod.make_state() raise web.HTTPFound( oauth_mod.build_auth_url(provider, state, _oauth_redirect_uri(request, name)) ) async def oauth_callback(request): """GET /login/oauth/{name}/callback — handle the provider's redirect back.""" name = request.match_info["name"] provider = _get_oauth_provider(name) if provider is None: return web.Response(status=404, text="OAuth provider not found") code = request.rel_url.query.get("code", "") state = request.rel_url.query.get("state", "") if not code or not state: return web.Response(status=400, text="Missing code or state") if not oauth_mod.validate_state(state): logger.warning("OAuth: invalid or expired state token from %s", request.remote) raise web.HTTPFound("/login?error=1") try: token = await oauth_mod.exchange_code(provider, code, _oauth_redirect_uri(request, name)) profile = await oauth_mod.fetch_user(provider, token) except oauth_mod.OAuthError as exc: logger.warning("OAuth error: %s", exc) raise web.HTTPFound("/login?error=1") user = users_mod.provision_oauth_user( profile["login"], profile["full_name"], profile["avatar_url"], ) # Persist new OAuth users to the config file so they survive restarts. # Only write when the user isn't already in the config's users section. if _config_path and not (config.get("users") or {}).get(user.username): try: disk_data = configio_mod.read_roundtrip(_config_path) if not disk_data.get("users"): disk_data["users"] = {} disk_data["users"][user.username] = { k: v for k, v in [ ("full_name", user.full_name), ("avatar", user.avatar), ] if v } configio_mod.write_config(_config_path, disk_data) logger.info("Persisted OAuth user %r to config", user.username) except Exception as exc: logger.warning("Failed to persist OAuth user %r to config: %s", user.username, exc) session_token = users_mod.create_session(user.username) eventlog("hbd", "INFO", f"Login: {user.username} via {provider.type}") resp = web.HTTPFound("/") resp.set_cookie( SESSION_COOKIE, session_token, max_age=users_mod.SESSION_TTL, httponly=True, samesite="Lax", ) raise resp # ------------------------------------------------------------------------- # Config API (admin only) # ------------------------------------------------------------------------- _config_path = getattr(config, "_config_path", "") or "" async def api_config_get(request): """GET /api/0/config — full config as JSON, secrets masked. Admin only.""" user, err = _require_auth(request) if err: return err if user and not user.admin: return web.json_response({"error": "Forbidden"}, status=403) return web.json_response(_mask_config_for_api(config)) _YAML_EXTRACTORS = { "notification_channels": lambda d: d.get("notification_channels") or {}, "thresholds": lambda d: d.get("threshold_configs") or {}, "hosts": lambda d: d.get("hosts") or {}, "dns": lambda d: {k: d[k] for k in configio_mod._DNS_KEYS if k in d}, } async def api_config_section_get(request): """GET /api/0/config/section/{name} — raw YAML text for a YAML-editor section.""" user, err = _require_auth(request) if err: return err if user and not user.admin: return web.json_response({"error": "Forbidden"}, status=403) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) name = request.match_info["name"] if name not in _YAML_EXTRACTORS: return web.json_response({"error": "Unknown section"}, status=404) import io as _io from ruamel.yaml import YAML as _YAML try: data = configio_mod.read_roundtrip(_config_path) section_data = _YAML_EXTRACTORS[name](data) _sy = _YAML() _sy.preserve_quotes = True buf = _io.StringIO() _sy.dump(section_data, buf) except Exception as exc: logger.error("Config section read failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) return web.json_response({"yaml": buf.getvalue()}) async def api_config_backups_get(request): """GET /api/0/config/backups — list of backup paths, newest first.""" user, err = _require_auth(request) if err: return err if user and not user.admin: return web.json_response({"error": "Forbidden"}, status=403) if not _config_path: return web.json_response({"backups": []}) backups = configio_mod.list_backups(_config_path) return web.json_response({"backups": backups}) async def api_config_post(request): """POST /api/0/config — publish staged changes to .hb.yaml. Admin only.""" user, err = _require_auth(request) if err: return err if user and not user.admin: return web.json_response({"error": "Forbidden"}, status=403) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) try: payload = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) if not isinstance(payload, dict): return web.json_response({"error": "Invalid JSON"}, status=400) try: data = configio_mod.read_roundtrip(_config_path) if "server" in payload: configio_mod.apply_structured_section(data, "server", payload["server"]) if "users" in payload: # Hash any plaintext passwords; preserve existing hashes when omitted or "•••" existing_users = data.get("users") or {} users_payload = payload["users"] for username, attrs in users_payload.items(): pw = attrs.get("password", "") if pw and pw != "•••" and not pw.startswith("pbkdf2:"): attrs["password"] = users_mod.hash_password(pw) elif not pw or pw == "•••": existing_hash = (existing_users.get(username) or {}).get("password", "") if existing_hash: attrs["password"] = existing_hash else: attrs.pop("password", None) configio_mod.apply_structured_section(data, "users", users_payload) if "oauth" in payload: existing_oauth = data.get("oauth") or {} new_oauth = payload["oauth"] for name, attrs in new_oauth.items(): cs = attrs.get("client_secret", "") if not cs or cs == "•••": existing_cs = (existing_oauth.get(name) or {}).get("client_secret", "") if existing_cs: attrs["client_secret"] = existing_cs else: attrs.pop("client_secret", None) data["oauth"] = new_oauth if "notification_channels" in payload: configio_mod.apply_yaml_section(data, "notification_channels", payload["notification_channels"]) if "dns" in payload: dns_payload = payload["dns"] if isinstance(dns_payload, str): configio_mod.apply_yaml_section(data, "dns", dns_payload) else: configio_mod.apply_structured_section(data, "dns", dns_payload) if "thresholds" in payload: tc = payload["thresholds"] if isinstance(tc, str): configio_mod.apply_yaml_section(data, "thresholds", tc) elif isinstance(tc, dict): data["threshold_configs"] = _build_threshold_configs_from_form(tc) if "hosts" in payload: h = payload["hosts"] if isinstance(h, dict): configio_mod.apply_structured_section(data, "hosts", h) else: configio_mod.apply_yaml_section(data, "hosts", h) configio_mod.write_config(_config_path, data) except Exception as exc: logger.error("Config write failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() users_mod.load_users(config) return web.json_response({"ok": True}) async def api_config_rollback(request): """POST /api/0/config/rollback — restore a backup. Admin only.""" user, err = _require_auth(request) if err: return err if user and not user.admin: return web.json_response({"error": "Forbidden"}, status=403) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) backup = body.get("backup", "") if not backup or backup not in configio_mod.list_backups(_config_path): return web.json_response({"error": "Invalid or missing backup"}, status=400) try: backup_data = configio_mod.read_roundtrip(backup) configio_mod.write_config(_config_path, backup_data) except Exception as exc: logger.error("Rollback failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() users_mod.load_users(config) return web.json_response({"ok": True}) # ------------------------------------------------------------------------- # Notification channel helpers # ------------------------------------------------------------------------- def _visible_channels_for_user(user): """Return {name: cfg} of channels visible to user (public + own private).""" all_channels = config.get("notification_channels") or {} if user is None: return {} if user.admin: return dict(all_channels) visible = {} for name, cfg in all_channels.items(): if not isinstance(cfg, dict): continue if not cfg.get("private") or cfg.get("owner") == user.username: visible[name] = cfg return visible def _build_channel_response(ch_name, ch_cfg): """Serialize a channel config dict for the API response.""" ch_type = ch_cfg.get("type", "") schema_fields = settings_mod.CHANNEL_TYPE_SCHEMAS.get(ch_type, {}).get("fields", []) fields = [] for sf in schema_fields: k = sf["key"] v = ch_cfg.get(k, "") sensitive = sf["type"] == "secret" fields.append({ "key": k, "label": sf["label"], "value": "•••" if (sensitive and v) else ( ", ".join(v) if isinstance(v, list) else str(v or "") ), "sensitive": sensitive, }) return { "name": ch_name, "type": ch_type, "type_label": settings_mod._CHANNEL_TYPE_LABELS.get(ch_type, ch_type.title()), "owner": ch_cfg.get("owner"), "private": bool(ch_cfg.get("private", False)), "min_level": ch_cfg.get("min_level", "WARNING"), "fields": fields, } # ------------------------------------------------------------------------- # Notification channel API (any authenticated user) # ------------------------------------------------------------------------- async def api_notification_channel_types(request): """GET /api/0/notification_channel_types — channel type schemas.""" user, err = _require_auth(request) if err: return err return web.json_response(settings_mod.CHANNEL_TYPE_SCHEMAS) async def api_notification_channels_get(request): """GET /api/0/notification_channels — list channels visible to current user.""" user, err = _require_auth(request) if err: return err visible = _visible_channels_for_user(user) result = [ _build_channel_response(name, cfg) for name, cfg in visible.items() if isinstance(cfg, dict) ] return web.json_response(result) async def api_notification_channels_post(request): """POST /api/0/notification_channels — create a new channel.""" user, err = _require_auth(request) if err: return err if user is None: return web.json_response({"error": "Authentication required"}, status=401) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) name = (body.get("name") or "").strip() if not name: return web.json_response({"error": "Channel name is required"}, status=400) ch_type = (body.get("type") or "").strip() if ch_type not in settings_mod.CHANNEL_TYPE_SCHEMAS: return web.json_response({"error": f"Unknown channel type: {ch_type!r}"}, status=400) if name in (config.get("notification_channels") or {}): return web.json_response({"error": f"Channel {name!r} already exists"}, status=409) schema = settings_mod.CHANNEL_TYPE_SCHEMAS[ch_type] channel_cfg = {"type": ch_type} for sf in schema["fields"]: k = sf["key"] v = body.get(k, "") if v: channel_cfg[k] = v elif sf["required"]: return web.json_response({"error": f"Field {k!r} is required"}, status=400) if body.get("min_level"): channel_cfg["min_level"] = body["min_level"] channel_cfg["owner"] = user.username if body.get("private"): channel_cfg["private"] = True try: disk_data = configio_mod.read_roundtrip(_config_path) configio_mod.apply_channel(disk_data, name, channel_cfg) configio_mod.write_config(_config_path, disk_data) except Exception as exc: logger.error("Channel create failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() return web.json_response({"ok": True, "name": name}) async def api_notification_channel_put(request): """PUT /api/0/notification_channels/{name} — update a channel.""" user, err = _require_auth(request) if err: return err if user is None: return web.json_response({"error": "Authentication required"}, status=401) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) ch_name = request.match_info["name"] existing_channels = config.get("notification_channels") or {} if ch_name not in existing_channels: return web.json_response({"error": f"Channel {ch_name!r} not found"}, status=404) existing_cfg = existing_channels[ch_name] if not isinstance(existing_cfg, dict): return web.json_response({"error": "Invalid channel config"}, status=500) owner = existing_cfg.get("owner") if not user.admin and owner != user.username: return web.json_response({"error": "Forbidden"}, status=403) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) ch_type = existing_cfg.get("type", "") schema_fields = settings_mod.CHANNEL_TYPE_SCHEMAS.get(ch_type, {}).get("fields", []) secret_keys = {sf["key"] for sf in schema_fields if sf["type"] == "secret"} try: disk_data = configio_mod.read_roundtrip(_config_path) existing_on_disk = (disk_data.get("notification_channels") or {}).get(ch_name, {}) channel_cfg = {"type": ch_type} for sf in schema_fields: k = sf["key"] v = body.get(k, "") if k in secret_keys and (not v or v == "•••"): existing_val = existing_on_disk.get(k, "") if existing_val: channel_cfg[k] = existing_val elif v: channel_cfg[k] = v if body.get("min_level"): channel_cfg["min_level"] = body["min_level"] if owner is not None: channel_cfg["owner"] = owner if "private" in body: channel_cfg["private"] = bool(body["private"]) elif existing_on_disk.get("private"): channel_cfg["private"] = True configio_mod.apply_channel(disk_data, ch_name, channel_cfg) configio_mod.write_config(_config_path, disk_data) except Exception as exc: logger.error("Channel update failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() return web.json_response({"ok": True}) async def api_notification_channel_delete(request): """DELETE /api/0/notification_channels/{name} — delete a channel.""" user, err = _require_auth(request) if err: return err if user is None: return web.json_response({"error": "Authentication required"}, status=401) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) ch_name = request.match_info["name"] existing_channels = config.get("notification_channels") or {} if ch_name not in existing_channels: return web.json_response({"error": f"Channel {ch_name!r} not found"}, status=404) existing_cfg = existing_channels[ch_name] owner = existing_cfg.get("owner") if isinstance(existing_cfg, dict) else None if not user.admin and owner != user.username: return web.json_response({"error": "Forbidden"}, status=403) try: disk_data = configio_mod.read_roundtrip(_config_path) configio_mod.delete_channel(disk_data, ch_name) configio_mod.write_config(_config_path, disk_data) except Exception as exc: logger.error("Channel delete failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() return web.json_response({"ok": True}) async def api_user_self_put(request): """PUT /api/0/users/me — update own full_name, avatar, notification_channels, password.""" user, err = _require_auth(request) if err: return err if user is None: return web.json_response({"error": "Authentication required"}, status=401) if not _config_path: return web.json_response({"error": "Config path not available"}, status=503) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) if not isinstance(body, dict): return web.json_response({"error": "Invalid JSON"}, status=400) username = user.username password_change = body.get("password") if password_change: if not isinstance(password_change, dict): return web.json_response({"error": "Invalid JSON"}, status=400) current_pw = password_change.get("current", "") new_pw = password_change.get("new", "") if not new_pw: return web.json_response({"error": "New password cannot be empty"}, status=400) if not users_mod.authenticate(username, current_pw): return web.json_response({"error": "Current password incorrect"}, status=403) try: data = configio_mod.read_roundtrip(_config_path) if "users" not in data or data["users"] is None: data["users"] = {} user_entry = dict(data["users"].get(username) or {}) if "full_name" in body: user_entry["full_name"] = str(body["full_name"]) if "avatar" in body: avatar_val = str(body["avatar"]) if avatar_val.startswith("/"): avatar_dir = config.get("avatar_dir") or ( os.path.dirname(os.path.realpath(_config_path)) if _config_path else None ) if not avatar_dir: return web.json_response({"error": "Local avatars not configured"}, status=400) if not os.path.realpath(avatar_val).startswith(os.path.realpath(avatar_dir) + os.sep): return web.json_response({"error": "Avatar path outside allowed directory"}, status=400) user_entry["avatar"] = avatar_val if "notification_channels" in body: visible = _visible_channels_for_user(user) user_entry["notification_channels"] = [ str(ch) for ch in body["notification_channels"] if ch in visible ] if password_change: user_entry["password"] = users_mod.hash_password(password_change["new"]) data["users"][username] = user_entry configio_mod.write_config(_config_path, data) except Exception as exc: logger.error("User self-update failed: %s", exc) return web.json_response({"error": str(exc)}, status=500) if reload_callback: await reload_callback() elif hasattr(config, "reload"): await config.reload() users_mod.load_users(config) return web.json_response({"ok": True}) app = web.Application() app.add_routes( [ web.get("/", live), web.get("/old", old_index), # Auth web.get("/login", login_page), web.post("/login", login_page), web.get("/logout", web_logout), web.post("/api/0/auth/login", api_login), web.post("/api/0/auth/logout", api_logout), web.get("/login/oauth/{name}", oauth_redirect), web.get("/login/oauth/{name}/callback", oauth_callback), # Users web.get("/api/0/users", api_users), web.get("/api/0/users/me", api_user_self), web.put("/api/0/users/me", api_user_self_put), web.get("/api/0/users/{username}/avatar", api_user_avatar), # Config API (admin) web.get("/api/0/config", api_config_get), web.get("/api/0/config/section/{name}", api_config_section_get), web.get("/api/0/config/backups", api_config_backups_get), web.post("/api/0/config", api_config_post), web.post("/api/0/config/rollback", api_config_rollback), # Notification channel API (any authenticated user) web.get("/api/0/notification_channel_types", api_notification_channel_types), web.get("/api/0/notification_channels", api_notification_channels_get), web.post("/api/0/notification_channels", api_notification_channels_post), web.put("/api/0/notification_channels/{name}", api_notification_channel_put), web.delete("/api/0/notification_channels/{name}", api_notification_channel_delete), # Hosts web.get("/api/0/hosts", api_hosts), web.get("/api/0/alert_summary", api_alert_summary), web.get("/api/0/messages", api_messages), web.get("/api/0/hosts/{hostname}/plugins", api_host_plugins), web.get("/api/0/hosts/{hostname}/plugins/{plugin_name}", api_host_plugin_detail), web.get("/api/0/hosts/{hostname}/alerts", api_host_alerts), web.get("/api/0/hosts/{hostname}/access", api_host_access_get), web.put("/api/0/hosts/{hostname}/access", api_host_access_put), web.get("/api/0/hosts/{hostname}/info", api_host_info), web.get("/api/0/alerts", api_all_alerts), web.post("/api/0/alerts/acknowledge", api_acknowledge_alert), web.get("/c", cmd), web.get("/d", drop), web.get("/n", register), web.get("/u", update), web.get("/live", live), web.get("/plugins", plugins_page), web.get("/alerts", alerts_page), web.get("/about", about_page), web.get("/profile", profile_page), web.get("/settings", settings_page), web.get("/static/{path:.*}", static), web.get("/favicon.ico", favicon), web.get("/ws", ws_mod.handler), ] ) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, host, port) await site.start() logger.info(f"HTTP server started on {host}:{port}") try: await asyncio.Future() finally: await runner.cleanup()