"""HTTP server implementation using aiohttp and jinja2."""
import asyncio
import datetime
import html as _html
import json
import platform
import socket
import sys
import time
import urllib.parse
import os
import logging
from aiohttp import web
import jinja2
from . import data
from . import notify as notify_mod
from . import settings as settings_mod
from . import users as users_mod
from . import oauth as oauth_mod
from . import ws as ws_mod
from . import configio as configio_mod
logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog
def _render_template(html_str: str, **context) -> str:
tmpl = jinja2.Template(html_str)
return tmpl.render(**context)
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
SESSION_COOKIE = "hbd_session"
def _get_token(request) -> str:
"""Extract session token from Bearer header, X-Auth-Token header, or cookie."""
auth = request.headers.get("Authorization", "")
if auth.lower().startswith("bearer "):
return auth[7:].strip()
header_token = request.headers.get("X-Auth-Token", "").strip()
if header_token:
return header_token
return request.cookies.get(SESSION_COOKIE, "")
def _current_user(request):
"""Return the authenticated User, or None when auth is not enabled."""
if not users_mod.users_enabled():
return None # unauthenticated mode — all access allowed
return users_mod.get_session_user(_get_token(request))
def _require_auth(request):
"""Return (user, None) or (None, error Response)."""
if not users_mod.users_enabled():
return None, None
user = users_mod.get_session_user(_get_token(request))
if user is None:
return None, web.json_response({"error": "Unauthorized"}, status=401)
return user, None
def _require_auth_redirect(request):
"""Like _require_auth but returns a redirect to /login for browser requests."""
if not users_mod.users_enabled():
return None, None
user = users_mod.get_session_user(_get_token(request))
if user is None:
raise web.HTTPFound("/login")
return user, None
def _can_view_host(user, host) -> bool:
"""Return True if *user* may see *host* (monitor or higher, or no auth)."""
if user is None:
return True
if user.admin:
return True
return host.is_monitor(user.username)
def _can_operate_host(user, host) -> bool:
"""Manager-level: queue commands, DNS, upgrade."""
if user is None:
return True
if user.admin:
return True
return host.is_manager(user.username)
def _can_own_host(user, host) -> bool:
"""Owner-level: drop host, transfer ownership."""
if user is None:
return True
if user.admin:
return True
return host.is_owner(user.username)
def _mask_config_for_api(config) -> dict:
"""Return a JSON-serializable config dict with secrets masked."""
result = {}
result["server"] = {k: config.get(k) for k in configio_mod._SERVER_KEYS}
users = {}
for username, attrs in (config.get("users") or {}).items():
u = dict(attrs)
if "password" in u:
u["password"] = "•••"
users[username] = u
result["users"] = users
oauth = {}
for name, attrs in (config.get("oauth") or {}).items():
o = dict(attrs)
if "client_secret" in o:
o["client_secret"] = "•••"
oauth[name] = o
result["oauth"] = oauth
return result
async def start(
host: str,
port: int,
config,
hbdclass,
tcss=None,
verbose=False,
get_now=None,
VER="",
threshold_checker=None,
):
"""Start an aiohttp web server and block until cancelled.
This function is intended to be awaited inside the main asyncio event loop.
"""
get_now = get_now or (lambda: time.time())
_start_epoch = time.time()
async def old_index(request):
_require_auth_redirect(request)
res = []
res.append('')
res.append("")
res.append("
")
res.append("Heartbeat")
if tcss:
res.append(tcss)
res.append("")
res.append('')
res.append(f"Heartbeat status {VER}
")
res += hbdclass.ubHost.buildhosttable()
res += hbdclass.ubHost.buildmsgtable(data.msgs)
res.append(
" %s (%s)
"
% (
time.strftime("%H:%M:%S", time.localtime(get_now())),
config.get("tz", "CET-1CDT"),
)
)
res.append("")
body = "\n".join(res)
return web.Response(text=body, content_type="text/html")
async def api_hosts(request):
user, err = _require_auth(request)
if err:
return err
hosts = [
hbdclass.Host.hosts[h]
for h in hbdclass.Host.hosts
if _can_view_host(user, hbdclass.Host.hosts[h])
]
lst = [h.jsons() for h in hosts]
return web.json_response(json.loads("[" + ",".join(lst) + "]"))
async def api_alert_summary(request):
"""GET /api/0/alert_summary — counts of ok/warning/critical hosts visible to caller."""
user, err = _require_auth(request)
if err:
return err
from .threshold import AlertLevel
critical = warning = ok = 0
for host in hbdclass.Host.hosts.values():
if not _can_operate_host(user, host):
continue
levels = {s.level for s in host.alert_states.values()}
if AlertLevel.CRITICAL in levels:
critical += 1
elif AlertLevel.WARNING in levels:
warning += 1
else:
ok += 1
return web.json_response({"critical": critical, "warning": warning, "ok": ok})
async def api_messages(request):
lst = data.msgs[-30:]
return web.json_response(lst)
async def cmd(request):
user, err = _require_auth(request)
if err:
return err
qa = request.rel_url.query
uname = qa.get("h")
ucmd = qa.get("c")
if not ucmd or not uname:
return web.Response(status=400, text="need h= and c= arguments")
if uname not in hbdclass.Host.hosts:
return web.Response(status=400, text=f"h={uname} not found")
host = hbdclass.Host.hosts[uname]
if not _can_operate_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
host.cmds.append(("CMD", {"cmd": urllib.parse.unquote(ucmd)}))
return web.Response(text=f"cmd {uname} queued")
async def drop(request):
user, err = _require_auth(request)
if err:
return err
qa = request.rel_url.query
uname = qa.get("h")
if not uname:
return web.Response(status=400, text="need h= argument")
if uname not in hbdclass.Host.hosts:
return web.Response(status=400, text=f"h={uname} not found")
host = hbdclass.Host.hosts[uname]
if not _can_own_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
eventlog(uname, "INFO", "dropped")
del hbdclass.Host.hosts[uname]
return web.Response(text="Done")
async def register(request):
user, err = _require_auth(request)
if err:
return err
qa = request.rel_url.query
uname = qa.get("h")
if not uname:
return web.Response(status=400, text="need h= argument")
if uname not in hbdclass.Host.hosts:
return web.Response(status=400, text=f"h={uname} not found")
host = hbdclass.Host.hosts[uname]
if not _can_operate_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
ll = host.registerDns()
eventlog(uname, "INFO", ll)
return web.Response(text=str(ll))
async def update(request):
user, err = _require_auth(request)
if err:
return err
qa = request.rel_url.query
uname = urllib.parse.unquote(qa.get("h", ""))
if not uname:
return web.Response(status=400, text="need h= argument")
if uname != "All" and uname not in hbdclass.Host.hosts:
return web.Response(status=400, text=f"h={uname} not found")
names = [uname] if uname != "All" else list(hbdclass.Host.hosts)
out = []
for n in names:
host = hbdclass.Host.hosts[n]
if not _can_operate_host(user, host):
out.append(f"update skipped for {n}: Forbidden")
continue
op_err = None
try:
host.cmds.append(("UPD", {}))
except Exception as e:
op_err = str(e)
out.append(f"update started for {n}: {op_err if op_err else 'OK'}")
return web.Response(text="\n".join(out))
async def live(request):
current_user, _ = _require_auth_redirect(request)
# render template from hbd/templates/live.html using Jinja2
# Resolve templates directory relative to the hbd package
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
host = config.get("hb_host", "localhost")
extra_scripts = config.get("http_extra_scripts", "")
host = request.host # includes port if non-standard
forwarded_proto = request.headers.get("X-Forwarded-Proto", "")
is_secure = request.secure or forwarded_proto.lower() == "https"
scheme = "wss" if is_secure else "ws"
heartbeat_ws_url = f"{scheme}://{host}/ws"
from hbd import __version__ as hbd_version
tmpl = env.get_template("live.html")
body = tmpl.render(
title="Heartbeat",
header="Heartbeat",
request=request,
heartbeat_ws_url=heartbeat_ws_url,
extra_scripts=extra_scripts,
hbd_version=hbd_version,
hosts=[
hbdclass.Host.hosts[h].stateinfo()
for h in sorted(hbdclass.Host.hosts)
if _can_operate_host(current_user, hbdclass.Host.hosts[h])
],
messages=data.msgs[-30:],
current_user=current_user.to_dict() if current_user else None,
active_page="live",
)
return web.Response(text=body, content_type="text/html")
async def static(request):
"""Serve files from the package static directory.
URL form: /static/
"""
p = request.match_info.get("path", "")
logger.debug("static file requested: %s", p)
base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static"))
# normalize and prevent directory traversal
target = os.path.abspath(os.path.normpath(os.path.join(base, p)))
if not target.startswith(base + os.sep) and target != base:
return web.Response(status=403, text="Forbidden")
if not os.path.exists(target) or not os.path.isfile(target):
return web.Response(status=404, text="Not Found")
logger.info("serving static file: %s", target)
return web.FileResponse(path=target)
async def favicon(request):
"""Serve favicon.ico from the package static directory."""
base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static/images"))
target = os.path.join(base, "favicon.ico")
if not os.path.exists(target) or not os.path.isfile(target):
return web.Response(status=404, text="Not Found")
return web.FileResponse(path=target)
# -------------------------------------------------------------------------
# Plugin Data API Endpoints
# -------------------------------------------------------------------------
async def api_host_plugins(request):
"""Get all plugin data for a specific host."""
user, err = _require_auth(request)
if err:
return err
hostname = request.match_info.get("hostname")
if hostname not in hbdclass.Host.hosts:
return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
host = hbdclass.Host.hosts[hostname]
if not _can_view_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
# Get plugin data with most recent sample for each plugin
plugins_summary = {}
for plugin_name, samples in host.plugin_data.items():
if samples:
# Get most recent sample
timestamp, data = samples[-1]
plugins_summary[plugin_name] = {
"timestamp": timestamp,
"data": data,
"sample_count": len(samples),
}
return web.json_response({
"hostname": hostname,
"plugins": plugins_summary,
})
async def api_host_plugin_detail(request):
"""Get detailed data for a specific plugin on a host."""
user, err = _require_auth(request)
if err:
return err
hostname = request.match_info.get("hostname")
plugin_name = request.match_info.get("plugin_name")
if hostname not in hbdclass.Host.hosts:
return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
host = hbdclass.Host.hosts[hostname]
if not _can_view_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
# Get limit from query parameter
limit = request.rel_url.query.get("limit", "10")
try:
limit = int(limit)
except ValueError:
limit = 10
# Get plugin data
samples = host.get_plugin_data(plugin_name, limit=limit)
if not samples:
return web.json_response(
{"error": f"No data for plugin '{plugin_name}' on host '{hostname}'"},
status=404
)
# Format samples
formatted_samples = [
{
"timestamp": ts,
"data": data,
}
for ts, data in samples
]
return web.json_response({
"hostname": hostname,
"plugin": plugin_name,
"samples": formatted_samples,
"sample_count": len(formatted_samples),
})
async def api_host_alerts(request):
"""Get alert states for a specific host."""
user, err = _require_auth(request)
if err:
return err
hostname = request.match_info.get("hostname")
if hostname not in hbdclass.Host.hosts:
return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
host = hbdclass.Host.hosts[hostname]
if not _can_view_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
# Get alert states
alerts = []
for metric_path, alert_state in host.alert_states.items():
alerts.append(alert_state.to_dict())
# Get summary if threshold_checker available
summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}
if threshold_checker:
summary = threshold_checker.get_alert_summary(host.alert_states)
return web.json_response({
"hostname": hostname,
"alerts": alerts,
"summary": summary,
})
async def api_all_alerts(request):
"""Get all active alerts across all hosts."""
user, err = _require_auth(request)
if err:
return err
all_alerts = []
for hostname, host in hbdclass.Host.hosts.items():
if not _can_view_host(user, host):
continue
if threshold_checker:
active_alerts = threshold_checker.get_active_alerts(host.alert_states)
else:
# Fallback if no threshold checker
from hbd.server.threshold import AlertLevel
active_alerts = [
state for state in host.alert_states.values()
if state.level != AlertLevel.OK
]
for alert in active_alerts:
alert_dict = alert.to_dict()
alert_dict["hostname"] = hostname
all_alerts.append(alert_dict)
# Sort by level (critical first) then by hostname
level_order = {"CRITICAL": 0, "WARNING": 1, "UNKNOWN": 2, "OK": 3}
all_alerts.sort(
key=lambda a: (level_order.get(a["level"], 99), a["hostname"], a["metric_path"])
)
# Get summary counts
summary = {"critical": 0, "warning": 0, "unknown": 0, "total": len(all_alerts)}
for alert in all_alerts:
level = alert["level"].lower()
if level in summary:
summary[level] += 1
return web.json_response({
"alerts": all_alerts,
"summary": summary,
"host_count": len(hbdclass.Host.hosts),
})
async def api_acknowledge_alert(request):
"""Acknowledge an alert to stop reminder notifications."""
user, err = _require_auth(request)
if err:
return err
try:
data = await request.json()
except Exception:
return web.json_response(
{"error": "Invalid JSON in request body"},
status=400
)
hostname = data.get("hostname")
metric_path = data.get("metric_path")
if not hostname or not metric_path:
return web.json_response(
{"error": "Missing required fields: hostname and metric_path"},
status=400
)
if hostname not in hbdclass.Host.hosts:
return web.json_response(
{"error": f"Host '{hostname}' not found"},
status=404
)
host = hbdclass.Host.hosts[hostname]
if not _can_view_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
if metric_path not in host.alert_states:
return web.json_response(
{"error": f"Alert '{metric_path}' not found for host '{hostname}'"},
status=404
)
alert_state = host.alert_states[metric_path]
alert_state.acknowledge()
return web.json_response({
"success": True,
"hostname": hostname,
"metric_path": metric_path,
"acknowledged_at": alert_state.acknowledged_at,
})
# -------------------------------------------------------------------------
# UI Pages
# -------------------------------------------------------------------------
async def plugins_page(request):
"""Render the plugin metrics visualization page."""
current_user, _ = _require_auth_redirect(request)
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
# Collect all hosts with plugin data (filtered by visibility)
hosts_with_plugins = []
for hostname in sorted(hbdclass.Host.hosts.keys()):
host = hbdclass.Host.hosts[hostname]
if not _can_operate_host(current_user, host):
continue
if host.plugin_data:
hosts_with_plugins.append({
"name": hostname,
"plugins": list(host.plugin_data.keys()),
"is_owner": _can_own_host(current_user, host),
"owner": host.owner,
})
tmpl = env.get_template("plugins.html")
body = tmpl.render(
title="Host Overview - Heartbeat",
header="Host Overview",
hosts=hosts_with_plugins,
current_user=current_user.to_dict() if current_user else None,
active_page="plugins",
)
return web.Response(text=body, content_type="text/html")
async def alerts_page(request):
"""Render the alerts dashboard page."""
current_user, _ = _require_auth_redirect(request)
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
tmpl = env.get_template("alerts.html")
body = tmpl.render(
title="Alerts Dashboard - Heartbeat",
header="Alerts Dashboard",
current_user=current_user.to_dict() if current_user else None,
active_page="alerts",
)
return web.Response(text=body, content_type="text/html")
# -------------------------------------------------------------------------
# Auth endpoints
# -------------------------------------------------------------------------
async def api_login(request):
"""POST /api/0/auth/login {username, password} -> {token}
Also sets an hbd_session cookie for browser clients.
"""
if not users_mod.users_enabled():
return web.json_response({"error": "Auth not configured"}, status=404)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
username = body.get("username", "")
password = body.get("password", "")
user = users_mod.authenticate(username, password)
if user is None:
return web.json_response({"error": "Invalid credentials"}, status=401)
token = users_mod.create_session(username)
eventlog("hbd", "INFO", f"Login: {username} via api")
resp = web.json_response({"token": token, "username": username})
resp.set_cookie(
SESSION_COOKIE,
token,
max_age=users_mod.SESSION_TTL,
httponly=True,
samesite="Lax",
)
return resp
async def login_page(request):
"""GET /login — show login form; POST /login — process and redirect."""
if not users_mod.users_enabled():
raise web.HTTPFound("/")
error = ""
if request.method == "POST":
form = await request.post()
username = form.get("username", "")
password = form.get("password", "")
user = users_mod.authenticate(username, password)
if user:
token = users_mod.create_session(username)
eventlog("hbd", "INFO", f"Login: {username} via password")
redirect_to = request.rel_url.query.get("next", "/")
resp = web.HTTPFound(redirect_to)
resp.set_cookie(
SESSION_COOKIE,
token,
max_age=users_mod.SESSION_TTL,
httponly=True,
samesite="Lax",
)
raise resp
error = "Invalid username or password."
elif request.rel_url.query.get("error"):
error = "Sign-in failed. Please try again."
oauth_buttons = ""
_providers = oauth_mod.get_providers(config)
if _providers:
buttons_html = ""
for _p in _providers:
_logo = f'
' if _p.logo else ""
buttons_html += f"""
{_logo}{_html.escape(_p.label)}
"""
oauth_buttons = f"""
or
{buttons_html}"""
html = f"""
Heartbeat — Login
Heartbeat
{'
' + error + '
' if error else ''}
{oauth_buttons}
"""
return web.Response(text=html, content_type="text/html")
async def web_logout(request):
"""GET /logout — clear session cookie and redirect to /login."""
token = request.cookies.get(SESSION_COOKIE, "")
_user = users_mod.get_session_user(token)
users_mod.delete_session(token)
if _user:
eventlog("hbd", "INFO", f"Logout: {_user.username}")
resp = web.HTTPFound("/login")
resp.del_cookie(SESSION_COOKIE)
raise resp
async def api_logout(request):
"""POST /api/0/auth/logout"""
token = _get_token(request)
_user = users_mod.get_session_user(token)
users_mod.delete_session(token)
if _user:
eventlog("hbd", "INFO", f"Logout: {_user.username}")
resp = web.json_response({"success": True})
resp.del_cookie(SESSION_COOKIE)
return resp
# -------------------------------------------------------------------------
# User endpoints
# -------------------------------------------------------------------------
async def api_user_avatar(request):
"""GET /api/0/users/{username}/avatar — serve a local avatar file.
Only reachable when the user's avatar config value starts with '/'.
Falls back to 404 for external URLs (the browser fetches those directly).
"""
user, err = _require_auth(request)
if err:
return err
username = request.match_info.get("username")
target_user = users_mod.get_user(username)
if target_user is None:
return web.Response(status=404, text="User not found")
if not target_user.avatar_is_local():
return web.Response(status=404, text="No local avatar configured")
path = target_user.avatar
if not os.path.isfile(path):
return web.Response(status=404, text="Avatar file not found")
# Infer content-type from extension
ext = os.path.splitext(path)[1].lower()
mime = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
".svg": "image/svg+xml",
}.get(ext, "application/octet-stream")
return web.FileResponse(path=path, headers={"Content-Type": mime})
async def api_users(request):
"""GET /api/0/users — admin only."""
user, err = _require_auth(request)
if err:
return err
if users_mod.users_enabled() and (user is None or not user.admin):
return web.json_response({"error": "Forbidden"}, status=403)
return web.json_response([u.to_dict() for u in users_mod.users.values()])
async def api_user_self(request):
"""GET /api/0/users/me — own profile."""
user, err = _require_auth(request)
if err:
return err
if user is None:
return web.json_response({"error": "Auth not configured"}, status=404)
return web.json_response(user.to_dict())
# -------------------------------------------------------------------------
# Host access endpoints
# -------------------------------------------------------------------------
async def api_host_access_get(request):
"""GET /api/0/hosts/{hostname}/access"""
user, err = _require_auth(request)
if err:
return err
hostname = request.match_info.get("hostname")
if hostname not in hbdclass.Host.hosts:
return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
host = hbdclass.Host.hosts[hostname]
if not _can_view_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
return web.json_response(host.access_dict())
async def api_host_access_put(request):
"""PUT /api/0/hosts/{hostname}/access — owner or admin only.
Body: {owner?: str, managers?: [str], monitors?: [str]}
"""
user, err = _require_auth(request)
if err:
return err
hostname = request.match_info.get("hostname")
if hostname not in hbdclass.Host.hosts:
return web.json_response({"error": f"Host '{hostname}' not found"}, status=404)
host = hbdclass.Host.hosts[hostname]
if not _can_own_host(user, host):
return web.json_response({"error": "Forbidden"}, status=403)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
if "owner" in body:
host.owner = body["owner"] or None
if "managers" in body:
host.managers = list(body["managers"])
if "monitors" in body:
host.monitors = list(body["monitors"])
return web.json_response(host.access_dict())
# -------------------------------------------------------------------------
# User profile page
# -------------------------------------------------------------------------
async def profile_page(request):
"""GET /profile — current user's settings and host access summary."""
current_user, _ = _require_auth_redirect(request)
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
# Build host access summary for this user.
# Merge live hosts with config-only hosts (not yet seen) so the profile
# reflects the config file immediately after a reload.
from . import config as config_mod
owned, managed, monitored = [], [], []
if current_user:
# Collect all known hostnames: live + configured
cfg_hostnames = set(config.get("hosts", {}).keys())
live_hostnames = set(hbdclass.Host.hosts.keys())
all_hostnames = sorted(cfg_hostnames | live_hostnames)
for hostname in all_hostnames:
live_host = hbdclass.Host.hosts.get(hostname)
if live_host is not None:
# Use live object — it has apply_access already called
is_own = live_host.is_owner(current_user.username)
is_mgr = not is_own and live_host.is_manager(current_user.username)
is_mon = not is_own and not is_mgr and live_host.is_monitor(current_user.username)
else:
# Config-only host — read access directly from config
access = config_mod.get_host_access(config, hostname)
is_own = access["owner"] == current_user.username
is_mgr = current_user.username in access["managers"]
is_mon = current_user.username in access["monitors"]
if is_own:
owned.append(hostname)
elif is_mgr:
managed.append(hostname)
elif is_mon:
monitored.append(hostname)
# Resolve notification channel configs for display
notif_channels = []
if current_user:
for ch_name in (current_user.notification_channels or []):
ch_cfg = config.get("notification_channels", {}).get(ch_name, {})
notif_channels.append({"name": ch_name, "type": ch_cfg.get("type", "")})
tmpl = env.get_template("profile.html")
body = tmpl.render(
title="Profile - Heartbeat",
header="My Profile",
current_user=current_user.to_dict() if current_user else None,
owned_hosts=owned,
managed_hosts=managed,
monitored_hosts=monitored,
notification_channels=notif_channels,
active_page="profile",
)
return web.Response(text=body, content_type="text/html")
# -------------------------------------------------------------------------
# About page
# -------------------------------------------------------------------------
async def about_page(request):
"""GET /about — version, runtime, and project information."""
current_user, _ = _require_auth_redirect(request)
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
from hbd import __version__ as hbd_version
uptime_secs = int(time.time() - _start_epoch)
days, rem = divmod(uptime_secs, 86400)
hours, rem = divmod(rem, 3600)
mins, secs = divmod(rem, 60)
if days:
uptime_str = f"{days}d {hours}h {mins}m"
elif hours:
uptime_str = f"{hours}h {mins}m {secs}s"
else:
uptime_str = f"{mins}m {secs}s"
start_dt = datetime.datetime.fromtimestamp(_start_epoch)
start_time_str = start_dt.strftime("%Y-%m-%d %H:%M:%S")
tmpl = env.get_template("about.html")
body = tmpl.render(
title="About - Heartbeat",
header="About",
hbd_version=hbd_version,
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro} ({platform.python_implementation()})",
server_hostname=socket.gethostname(),
start_epoch=int(_start_epoch),
start_time_str=start_time_str,
uptime_str=uptime_str,
host_count=len(hbdclass.Host.hosts),
current_user=current_user.to_dict() if current_user else None,
active_page="about",
)
return web.Response(text=body, content_type="text/html")
# -------------------------------------------------------------------------
# Settings page (admin only)
# -------------------------------------------------------------------------
async def settings_page(request):
"""GET /settings — read-only view of the current server configuration."""
current_user, _ = _require_auth_redirect(request)
if current_user and not current_user.admin:
raise web.HTTPForbidden(reason="Admin access required")
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
tmpl = env.get_template("settings.html")
body = tmpl.render(
title="Settings - Heartbeat",
sections=settings_mod.get_settings_sections(config, threshold_checker=threshold_checker),
current_user=current_user.to_dict() if current_user else None,
active_page="settings",
)
return web.Response(text=body, content_type="text/html")
def _oauth_redirect_uri(request, provider_name: str) -> str:
base = config.get("base_url", "").rstrip("/") or str(request.url.origin())
return f"{base}/login/oauth/{provider_name}/callback"
def _get_oauth_provider(name: str):
"""Return the ResolvedProvider for *name*, or None if not found."""
return next(
(p for p in oauth_mod.get_providers(config) if p.name == name),
None,
)
async def oauth_redirect(request):
"""GET /login/oauth/{name} — kick off the OAuth2 flow for the named provider."""
name = request.match_info["name"]
provider = _get_oauth_provider(name)
if provider is None:
return web.Response(status=404, text="OAuth provider not found")
state = oauth_mod.make_state()
raise web.HTTPFound(
oauth_mod.build_auth_url(provider, state, _oauth_redirect_uri(request, name))
)
async def oauth_callback(request):
"""GET /login/oauth/{name}/callback — handle the provider's redirect back."""
name = request.match_info["name"]
provider = _get_oauth_provider(name)
if provider is None:
return web.Response(status=404, text="OAuth provider not found")
code = request.rel_url.query.get("code", "")
state = request.rel_url.query.get("state", "")
if not code or not state:
return web.Response(status=400, text="Missing code or state")
if not oauth_mod.validate_state(state):
logger.warning("OAuth: invalid or expired state token from %s", request.remote)
raise web.HTTPFound("/login?error=1")
try:
token = await oauth_mod.exchange_code(provider, code, _oauth_redirect_uri(request, name))
profile = await oauth_mod.fetch_user(provider, token)
except oauth_mod.OAuthError as exc:
logger.warning("OAuth error: %s", exc)
raise web.HTTPFound("/login?error=1")
user = users_mod.provision_oauth_user(
profile["login"],
profile["full_name"],
profile["avatar_url"],
)
session_token = users_mod.create_session(user.username)
eventlog("hbd", "INFO", f"Login: {user.username} via {provider.type}")
resp = web.HTTPFound("/")
resp.set_cookie(
SESSION_COOKIE,
session_token,
max_age=users_mod.SESSION_TTL,
httponly=True,
samesite="Lax",
)
raise resp
# -------------------------------------------------------------------------
# Config API (admin only)
# -------------------------------------------------------------------------
_config_path = getattr(config, "_config_path", "") or ""
async def api_config_get(request):
"""GET /api/0/config — full config as JSON, secrets masked. Admin only."""
user, err = _require_auth(request)
if err:
return err
if user and not user.admin:
return web.json_response({"error": "Forbidden"}, status=403)
return web.json_response(_mask_config_for_api(config))
_YAML_EXTRACTORS = {
"notification_channels": lambda d: d.get("notification_channels") or {},
"thresholds": lambda d: d.get("threshold_configs") or {},
"hosts": lambda d: d.get("hosts") or {},
"dns": lambda d: {k: d[k] for k in configio_mod._DNS_KEYS if k in d},
}
async def api_config_section_get(request):
"""GET /api/0/config/section/{name} — raw YAML text for a YAML-editor section."""
user, err = _require_auth(request)
if err:
return err
if user and not user.admin:
return web.json_response({"error": "Forbidden"}, status=403)
if not _config_path:
return web.json_response({"error": "Config path not available"}, status=503)
name = request.match_info["name"]
if name not in _YAML_EXTRACTORS:
return web.json_response({"error": "Unknown section"}, status=404)
import io as _io
from ruamel.yaml import YAML as _YAML
try:
data = configio_mod.read_roundtrip(_config_path)
section_data = _YAML_EXTRACTORS[name](data)
_sy = _YAML()
_sy.preserve_quotes = True
buf = _io.StringIO()
_sy.dump(section_data, buf)
except Exception as exc:
logger.error("Config section read failed: %s", exc)
return web.json_response({"error": str(exc)}, status=500)
return web.json_response({"yaml": buf.getvalue()})
async def api_config_backups_get(request):
"""GET /api/0/config/backups — list of backup paths, newest first."""
user, err = _require_auth(request)
if err:
return err
if user and not user.admin:
return web.json_response({"error": "Forbidden"}, status=403)
if not _config_path:
return web.json_response({"backups": []})
backups = configio_mod.list_backups(_config_path)
return web.json_response({"backups": backups})
async def api_config_post(request):
"""POST /api/0/config — publish staged changes to .hb.yaml. Admin only."""
user, err = _require_auth(request)
if err:
return err
if user and not user.admin:
return web.json_response({"error": "Forbidden"}, status=403)
if not _config_path:
return web.json_response({"error": "Config path not available"}, status=503)
try:
payload = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
if not isinstance(payload, dict):
return web.json_response({"error": "Invalid JSON"}, status=400)
try:
data = configio_mod.read_roundtrip(_config_path)
if "server" in payload:
configio_mod.apply_structured_section(data, "server", payload["server"])
if "users" in payload:
# Hash any plaintext passwords; preserve existing hashes when omitted or "•••"
existing_users = data.get("users") or {}
users_payload = payload["users"]
for username, attrs in users_payload.items():
pw = attrs.get("password", "")
if pw and pw != "•••" and not pw.startswith("pbkdf2:"):
attrs["password"] = users_mod.hash_password(pw)
elif not pw or pw == "•••":
existing_hash = (existing_users.get(username) or {}).get("password", "")
if existing_hash:
attrs["password"] = existing_hash
else:
attrs.pop("password", None)
configio_mod.apply_structured_section(data, "users", users_payload)
if "oauth" in payload:
existing_oauth = data.get("oauth") or {}
new_oauth = payload["oauth"]
for name, attrs in new_oauth.items():
cs = attrs.get("client_secret", "")
if not cs or cs == "•••":
existing_cs = (existing_oauth.get(name) or {}).get("client_secret", "")
if existing_cs:
attrs["client_secret"] = existing_cs
else:
attrs.pop("client_secret", None)
data["oauth"] = new_oauth
for section in ("notification_channels", "thresholds", "hosts", "dns"):
if section in payload:
configio_mod.apply_yaml_section(data, section, payload[section])
configio_mod.write_config(_config_path, data)
except Exception as exc:
logger.error("Config write failed: %s", exc)
return web.json_response({"error": str(exc)}, status=500)
if hasattr(config, "reload"):
await config.reload()
users_mod.load_users(config)
return web.json_response({"ok": True})
async def api_config_rollback(request):
"""POST /api/0/config/rollback — restore a backup. Admin only."""
user, err = _require_auth(request)
if err:
return err
if user and not user.admin:
return web.json_response({"error": "Forbidden"}, status=403)
if not _config_path:
return web.json_response({"error": "Config path not available"}, status=503)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
backup = body.get("backup", "")
if not backup or backup not in configio_mod.list_backups(_config_path):
return web.json_response({"error": "Invalid or missing backup"}, status=400)
try:
backup_data = configio_mod.read_roundtrip(backup)
configio_mod.write_config(_config_path, backup_data)
except Exception as exc:
logger.error("Rollback failed: %s", exc)
return web.json_response({"error": str(exc)}, status=500)
if hasattr(config, "reload"):
await config.reload()
users_mod.load_users(config)
return web.json_response({"ok": True})
app = web.Application()
app.add_routes(
[
web.get("/", live),
web.get("/old", old_index),
# Auth
web.get("/login", login_page),
web.post("/login", login_page),
web.get("/logout", web_logout),
web.post("/api/0/auth/login", api_login),
web.post("/api/0/auth/logout", api_logout),
web.get("/login/oauth/{name}", oauth_redirect),
web.get("/login/oauth/{name}/callback", oauth_callback),
# Users
web.get("/api/0/users", api_users),
web.get("/api/0/users/me", api_user_self),
web.get("/api/0/users/{username}/avatar", api_user_avatar),
# Config API (admin)
web.get("/api/0/config", api_config_get),
web.get("/api/0/config/section/{name}", api_config_section_get),
web.get("/api/0/config/backups", api_config_backups_get),
web.post("/api/0/config", api_config_post),
web.post("/api/0/config/rollback", api_config_rollback),
# Hosts
web.get("/api/0/hosts", api_hosts),
web.get("/api/0/alert_summary", api_alert_summary),
web.get("/api/0/messages", api_messages),
web.get("/api/0/hosts/{hostname}/plugins", api_host_plugins),
web.get("/api/0/hosts/{hostname}/plugins/{plugin_name}", api_host_plugin_detail),
web.get("/api/0/hosts/{hostname}/alerts", api_host_alerts),
web.get("/api/0/hosts/{hostname}/access", api_host_access_get),
web.put("/api/0/hosts/{hostname}/access", api_host_access_put),
web.get("/api/0/alerts", api_all_alerts),
web.post("/api/0/alerts/acknowledge", api_acknowledge_alert),
web.get("/c", cmd),
web.get("/d", drop),
web.get("/n", register),
web.get("/u", update),
web.get("/live", live),
web.get("/plugins", plugins_page),
web.get("/alerts", alerts_page),
web.get("/about", about_page),
web.get("/profile", profile_page),
web.get("/settings", settings_page),
web.get("/static/{path:.*}", static),
web.get("/favicon.ico", favicon),
web.get("/ws", ws_mod.handler),
]
)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
logger.info(f"HTTP server started on {host}:{port}")
try:
await asyncio.Future()
finally:
await runner.cleanup()