diff --git a/hbd/client/plugins/zfs_monitor.py b/hbd/client/plugins/zfs_monitor.py new file mode 100644 index 0000000..5a256ef --- /dev/null +++ b/hbd/client/plugins/zfs_monitor.py @@ -0,0 +1,130 @@ +""" +ZFS pool monitoring plugin for Heartbeat. + +Collects per-pool health, capacity, and cumulative I/O statistics via zpool(8). +""" + +import asyncio +import logging +import shutil +from typing import Any, Dict, List, Optional + +from hbd.client.plugin import MonitorPlugin + +logger = logging.getLogger(__name__) + + +def _int(s: str) -> Optional[int]: + try: + return int(s.strip().rstrip("KMGTkBkmgt%x")) + except (ValueError, AttributeError): + return None + + +def _float(s: str) -> Optional[float]: + try: + return float(s.strip().rstrip("%x")) + except (ValueError, AttributeError): + return None + + +class ZFSMonitorPlugin(MonitorPlugin): + """Monitor ZFS pool health, capacity, and I/O statistics. + + Collects per pool: + - health: ONLINE, DEGRADED, FAULTED, etc. + - size / alloc / free: total, allocated and free bytes + - capacity: percentage used (0-100) + - frag: fragmentation percentage + - dedup: deduplication ratio + - read_ops / write_ops: cumulative I/O operations since last boot/clear + - read_bw / write_bw: cumulative bytes transferred since last boot/clear + + Configuration: + interval: collection interval in seconds (default: 300) + pools: list of pool names to monitor (default: all) + """ + + name = "zfs_monitor" + description = "ZFS pool health, capacity, and I/O statistics" + interval = 300 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + self.interval = self.config.get("interval", 300) + self._pools_filter: Optional[List[str]] = self.config.get("pools", None) + + async def initialize(self) -> bool: + if not shutil.which("zpool"): + self.skip_reason = "zpool not found" + return False + logger.info("ZFS monitor initialized (interval: %ds)", self.interval) + return True + + async def _run(self, *args: str) -> List[str]: + """Run a command and return its stdout lines, or [] on error.""" + try: + proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=15) + return stdout.decode(errors="replace").splitlines() + except (FileNotFoundError, asyncio.TimeoutError) as exc: + logger.warning("zfs_monitor: %s: %s", args[0], exc) + return [] + + async def _zpool_list(self) -> Dict[str, Dict]: + """Return per-pool health and capacity from `zpool list`.""" + lines = await self._run( + "zpool", "list", "-H", "-p", + "-o", "name,health,size,alloc,free,cap,frag,dedup", + ) + pools: Dict[str, Dict] = {} + for line in lines: + parts = line.split("\t") + if len(parts) < 8: + continue + name = parts[0].strip() + if self._pools_filter and name not in self._pools_filter: + continue + pools[name] = { + "health": parts[1].strip(), + "size": _int(parts[2]), + "alloc": _int(parts[3]), + "free": _int(parts[4]), + "capacity": _float(parts[5]), + "frag": _float(parts[6]), + "dedup": _float(parts[7]), + } + return pools + + async def _zpool_iostat(self) -> Dict[str, Dict]: + """Return per-pool cumulative I/O counters from `zpool iostat`.""" + lines = await self._run("zpool", "iostat", "-H", "-p") + io: Dict[str, Dict] = {} + for line in lines: + parts = line.split("\t") + if len(parts) < 7: + continue + name = parts[0].strip() + if not name or name.startswith(" "): + continue + io[name] = { + "read_ops": _int(parts[3]), + "write_ops": _int(parts[4]), + "read_bw": _int(parts[5]), + "write_bw": _int(parts[6]), + } + return io + + async def _collect_metrics(self) -> Dict[str, Any]: + pools, io = await asyncio.gather(self._zpool_list(), self._zpool_iostat()) + for name, stats in io.items(): + if name in pools: + pools[name].update(stats) + return {"pools": pools} + + +plugin = ZFSMonitorPlugin diff --git a/hbd/server/config.py b/hbd/server/config.py index 494ac81..58074fb 100644 --- a/hbd/server/config.py +++ b/hbd/server/config.py @@ -225,7 +225,7 @@ def get_watchhosts(config): hosts_config = config.get("hosts", {}) if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): - if isinstance(host_attrs, dict) and host_attrs.get("watch", False): + if isinstance(host_attrs, dict) and host_attrs.get("watch", True): watchhosts.append(host_name) return watchhosts diff --git a/hbd/server/hbdclass.py b/hbd/server/hbdclass.py index f5f0697..29bb410 100644 --- a/hbd/server/hbdclass.py +++ b/hbd/server/hbdclass.py @@ -286,7 +286,7 @@ class Host: Host.hosts[name] = self self.num = num self.dyn = False - self.watched = False + self.watched = True self.upcount = 0 self.interval = 0 self.doesack = -1 @@ -304,6 +304,7 @@ class Host: def statedict(self): d = {} + d["raw_name"] = self.name d["name"] = self.name if self.dyn: d["name"] += "*" diff --git a/hbd/server/http.py b/hbd/server/http.py index 0c2b76d..4bb12e4 100644 --- a/hbd/server/http.py +++ b/hbd/server/http.py @@ -258,7 +258,9 @@ async def start( extra_scripts=extra_scripts, hbd_version=hbd_version, hosts=[ - hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts) + hbdclass.Host.hosts[h].stateinfo() + for h in sorted(hbdclass.Host.hosts) + if _can_operate_host(current_user, hbdclass.Host.hosts[h]) ], messages=data.msgs[-30:], current_user=current_user.to_dict() if current_user else None, @@ -510,7 +512,7 @@ async def start( hosts_with_plugins = [] for hostname in sorted(hbdclass.Host.hosts.keys()): host = hbdclass.Host.hosts[hostname] - if not _can_view_host(current_user, host): + if not _can_operate_host(current_user, host): continue if host.plugin_data: hosts_with_plugins.append({ diff --git a/hbd/server/settings.py b/hbd/server/settings.py index f69a578..768a569 100644 --- a/hbd/server/settings.py +++ b/hbd/server/settings.py @@ -188,7 +188,7 @@ def get_settings_sections(config: dict) -> list: continue hosts_list.append({ "name": hname, - "watch": bool(hcfg.get("watch", False)), + "watch": bool(hcfg.get("watch", True)), "dyndns": bool(hcfg.get("dyndns", False)), "owner": hcfg.get("owner", ""), "managers": hcfg.get("managers", []), diff --git a/hbd/server/threshold.py b/hbd/server/threshold.py index 0da9074..0e11993 100644 --- a/hbd/server/threshold.py +++ b/hbd/server/threshold.py @@ -9,6 +9,7 @@ This module provides a flexible threshold checking system that: - Supports multiple comparison operators """ +import asyncio import logging import time from enum import Enum @@ -1020,6 +1021,11 @@ class ThresholdChecker: value: Any, ): """Send notification and log to journal/eventlog.""" + from . import hbdclass + host = hbdclass.Host.hosts.get(host_name) + if host is not None and not host.watched: + eventlog(host_name, lvl, message, service="threshold") + return asyncio.get_event_loop().create_task(notify_mod.send_notification( host_name, notify_mod.Notification( @@ -1032,7 +1038,6 @@ class ThresholdChecker: # Log to journal if self.journal is not None: try: - import asyncio loop = asyncio.get_event_loop() loop.create_task(self.journal.log_threshold_event( host_name=host_name, @@ -1224,17 +1229,20 @@ class ThresholdChecker: else: message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} (ongoing for {int(now - alert_state.since)}s)" - asyncio.get_event_loop().create_task(notify_mod.send_notification( - host_name, - notify_mod.Notification( - title=f"[REMINDER/{alert_state.level.name}] {host_name}", - body=message, - level=alert_state.level.name, - ), - )) + from . import hbdclass + host = hbdclass.Host.hosts.get(host_name) + if host is None or host.watched: + asyncio.get_event_loop().create_task(notify_mod.send_notification( + host_name, + notify_mod.Notification( + title=f"[REMINDER/{alert_state.level.name}] {host_name}", + body=message, + level=alert_state.level.name, + ), + )) + logger.info("Re-notification sent: %s", message) alert_state.last_notification = now alert_state.notification_count += 1 - logger.info("Re-notification sent: %s", message) def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list: """ diff --git a/hbd/server/udp.py b/hbd/server/udp.py index 1dc8734..733db72 100644 --- a/hbd/server/udp.py +++ b/hbd/server/udp.py @@ -211,10 +211,11 @@ def _make_timer_callbacks(uname, host, ctx): connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2)) msg = f"{connection.afam} overdue" eventlog(uname, "CRITICAL", msg) - asyncio.create_task(notify_mod.send_notification( - uname, - notify_mod.Notification(title=f"[CRITICAL] {uname}", body=msg, level="CRITICAL"), - )) + if host.watched: + asyncio.create_task(notify_mod.send_notification( + uname, + notify_mod.Notification(title=f"[CRITICAL] {uname}", body=msg, level="CRITICAL"), + )) # Track in alert_states so the Alerts Dashboard shows this _set_connectivity_alert(host, connection.afam, "CRITICAL") if threshold_checker: @@ -407,10 +408,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if res: eventlog(uname, "WARNING", res) - asyncio.create_task(notify_mod.send_notification( - uname, - notify_mod.Notification(title=f"[WARNING] {uname}", body=res, level="WARNING"), - )) + if host.watched: + asyncio.create_task(notify_mod.send_notification( + uname, + notify_mod.Notification(title=f"[WARNING] {uname}", body=res, level="WARNING"), + )) interval = int(msg.get("interval", 0) or 0) shutdown = msg.get("shutdown", 0) @@ -420,10 +422,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if boot: eventlog(uname, "INFO", "booted") - asyncio.create_task(notify_mod.send_notification( - uname, - notify_mod.Notification(title=f"[INFO] {uname}", body=f"{host.name} booted", level="INFO"), - )) + if host.watched: + asyncio.create_task(notify_mod.send_notification( + uname, + notify_mod.Notification(title=f"[INFO] {uname}", body=f"{host.name} booted", level="INFO"), + )) if message: eventlog(uname, "INFO", "msg: %s" % message, service=service) @@ -440,10 +443,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): else: m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d)) eventlog(uname, "RECOVER", m) - asyncio.create_task(notify_mod.send_notification( - uname, - notify_mod.Notification(title=f"[RECOVER] {uname}", body=m, level="RECOVER"), - )) + if host.watched: + asyncio.create_task(notify_mod.send_notification( + uname, + notify_mod.Notification(title=f"[RECOVER] {uname}", body=m, level="RECOVER"), + )) if boot or newh: host.upcount = host.doesack @@ -453,10 +457,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if shutdown: m = "%s shutdown" % conn.afam eventlog(uname, "INFO", m) - asyncio.create_task(notify_mod.send_notification( - uname, - notify_mod.Notification(title=f"[INFO] {uname}", body=m, level="INFO"), - )) + if host.watched: + asyncio.create_task(notify_mod.send_notification( + uname, + notify_mod.Notification(title=f"[INFO] {uname}", body=m, level="INFO"), + )) conn.newstate(hbdcls.Connection.DOWN, now) _set_connectivity_alert(host, conn.afam, "CRITICAL") diff --git a/hbd/server/ws.py b/hbd/server/ws.py index cf2ccd4..9117a82 100644 --- a/hbd/server/ws.py +++ b/hbd/server/ws.py @@ -13,7 +13,8 @@ from . import data logger = logging.getLogger(__name__) -_connections: set = set() +# Map of WebSocket → User object (or None when auth is disabled) +_connections: dict = {} _loop: Optional[asyncio.AbstractEventLoop] = None _get_hosts: Optional[Callable[[], Iterable]] = None _verbose: bool = False @@ -34,23 +35,53 @@ def setup( _verbose = verbose +def _user_can_see_host(user, host_name: str) -> bool: + """Return True if *user* may see updates for *host_name* (manager or higher).""" + from . import hbdclass, users as users_mod + if user is None or not users_mod.users_enabled(): + return True + if user.admin: + return True + host = hbdclass.Host.hosts.get(host_name) + if host is None: + return False + return host.is_manager(user.username) + + +def _get_token(request) -> str: + """Extract session token from request (mirrors logic in http.py).""" + auth = request.headers.get("Authorization", "") + if auth.startswith("Bearer "): + return auth[7:].strip() + token = request.headers.get("X-Auth-Token", "") + if token: + return token + return request.cookies.get("hbd_session", "") + + async def handler(request): """aiohttp WebSocket upgrade handler — register as GET /ws.""" from aiohttp import web + from . import users as users_mod ws = web.WebSocketResponse() await ws.prepare(request) - _connections.add(ws) + token = _get_token(request) + user = users_mod.get_session_user(token) if token else None + + _connections[ws] = user remote = request.remote logger.info("WebSocket connected from %s", remote) try: - # Send current host state to the new client + # Send current host state, filtered to hosts this user may see if _get_hosts: try: for h in list(_get_hosts()): - await ws.send_str(json.dumps({"type": "host", "data": h})) + host_name = h.get("raw_name") or h.get("name", "") + if _user_can_see_host(user, host_name): + await ws.send_str(json.dumps({"type": "host", "data": h})) except Exception as e: logger.error("Error sending initial hosts: %s", e) @@ -74,7 +105,7 @@ async def handler(request): except Exception as e: logger.exception("WebSocket handler error from %s: %s", remote, e) finally: - _connections.discard(ws) + _connections.pop(ws, None) logger.info("WebSocket disconnected from %s", remote) return ws @@ -83,25 +114,37 @@ async def handler(request): def broadcast(typ: str, payload) -> bool: """Thread-safe broadcast to all connected WebSocket clients. + For host and plugin updates, only sends to clients whose user has + manager-or-higher access to that host. Other message types are + broadcast to all clients. + Can be called from any thread; schedules sends on the event loop. Returns False if the loop is not running yet. """ if not _loop: return False + + # Determine the host name for access-filtered message types + host_name: Optional[str] = None + if typ in ("host", "plugin"): + host_name = payload.get("raw_name") or payload.get("host") or payload.get("name") + jmsg = json.dumps({"type": typ, "data": payload}) async def _send_all(): dead = set() - for ws in list(_connections): + for ws, user in list(_connections.items()): try: - if not ws.closed: - await ws.send_str(jmsg) - else: + if ws.closed: dead.add(ws) + continue + if host_name is not None and not _user_can_see_host(user, host_name): + continue + await ws.send_str(jmsg) except Exception: dead.add(ws) for ws in dead: - _connections.discard(ws) + _connections.pop(ws, None) asyncio.run_coroutine_threadsafe(_send_all(), _loop) return True