feat: host-level watch flag suppresses notifications; filter dashboard/overview by owner/manager; add ZFS monitor plugin
- watch: true (default) per host; watch: false suppresses all notifications for that host in udp.py and threshold.py - Live Dashboard and Host Overview now show only hosts where the logged-in user is owner or manager (admins see all); WebSocket broadcasts filtered per-connection by the same rule - Add hbd/client/plugins/zfs_monitor.py: collects per-pool health, capacity, fragmentation, dedup ratio, and cumulative I/O ops/bandwidth via zpool(8) Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
ZFS pool monitoring plugin for Heartbeat.
|
||||
|
||||
Collects per-pool health, capacity, and cumulative I/O statistics via zpool(8).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import shutil
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from hbd.client.plugin import MonitorPlugin
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _int(s: str) -> Optional[int]:
|
||||
try:
|
||||
return int(s.strip().rstrip("KMGTkBkmgt%x"))
|
||||
except (ValueError, AttributeError):
|
||||
return None
|
||||
|
||||
|
||||
def _float(s: str) -> Optional[float]:
|
||||
try:
|
||||
return float(s.strip().rstrip("%x"))
|
||||
except (ValueError, AttributeError):
|
||||
return None
|
||||
|
||||
|
||||
class ZFSMonitorPlugin(MonitorPlugin):
|
||||
"""Monitor ZFS pool health, capacity, and I/O statistics.
|
||||
|
||||
Collects per pool:
|
||||
- health: ONLINE, DEGRADED, FAULTED, etc.
|
||||
- size / alloc / free: total, allocated and free bytes
|
||||
- capacity: percentage used (0-100)
|
||||
- frag: fragmentation percentage
|
||||
- dedup: deduplication ratio
|
||||
- read_ops / write_ops: cumulative I/O operations since last boot/clear
|
||||
- read_bw / write_bw: cumulative bytes transferred since last boot/clear
|
||||
|
||||
Configuration:
|
||||
interval: collection interval in seconds (default: 300)
|
||||
pools: list of pool names to monitor (default: all)
|
||||
"""
|
||||
|
||||
name = "zfs_monitor"
|
||||
description = "ZFS pool health, capacity, and I/O statistics"
|
||||
interval = 300
|
||||
|
||||
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
||||
super().__init__(config)
|
||||
self.interval = self.config.get("interval", 300)
|
||||
self._pools_filter: Optional[List[str]] = self.config.get("pools", None)
|
||||
|
||||
async def initialize(self) -> bool:
|
||||
if not shutil.which("zpool"):
|
||||
self.skip_reason = "zpool not found"
|
||||
return False
|
||||
logger.info("ZFS monitor initialized (interval: %ds)", self.interval)
|
||||
return True
|
||||
|
||||
async def _run(self, *args: str) -> List[str]:
|
||||
"""Run a command and return its stdout lines, or [] on error."""
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*args,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL,
|
||||
)
|
||||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=15)
|
||||
return stdout.decode(errors="replace").splitlines()
|
||||
except (FileNotFoundError, asyncio.TimeoutError) as exc:
|
||||
logger.warning("zfs_monitor: %s: %s", args[0], exc)
|
||||
return []
|
||||
|
||||
async def _zpool_list(self) -> Dict[str, Dict]:
|
||||
"""Return per-pool health and capacity from `zpool list`."""
|
||||
lines = await self._run(
|
||||
"zpool", "list", "-H", "-p",
|
||||
"-o", "name,health,size,alloc,free,cap,frag,dedup",
|
||||
)
|
||||
pools: Dict[str, Dict] = {}
|
||||
for line in lines:
|
||||
parts = line.split("\t")
|
||||
if len(parts) < 8:
|
||||
continue
|
||||
name = parts[0].strip()
|
||||
if self._pools_filter and name not in self._pools_filter:
|
||||
continue
|
||||
pools[name] = {
|
||||
"health": parts[1].strip(),
|
||||
"size": _int(parts[2]),
|
||||
"alloc": _int(parts[3]),
|
||||
"free": _int(parts[4]),
|
||||
"capacity": _float(parts[5]),
|
||||
"frag": _float(parts[6]),
|
||||
"dedup": _float(parts[7]),
|
||||
}
|
||||
return pools
|
||||
|
||||
async def _zpool_iostat(self) -> Dict[str, Dict]:
|
||||
"""Return per-pool cumulative I/O counters from `zpool iostat`."""
|
||||
lines = await self._run("zpool", "iostat", "-H", "-p")
|
||||
io: Dict[str, Dict] = {}
|
||||
for line in lines:
|
||||
parts = line.split("\t")
|
||||
if len(parts) < 7:
|
||||
continue
|
||||
name = parts[0].strip()
|
||||
if not name or name.startswith(" "):
|
||||
continue
|
||||
io[name] = {
|
||||
"read_ops": _int(parts[3]),
|
||||
"write_ops": _int(parts[4]),
|
||||
"read_bw": _int(parts[5]),
|
||||
"write_bw": _int(parts[6]),
|
||||
}
|
||||
return io
|
||||
|
||||
async def _collect_metrics(self) -> Dict[str, Any]:
|
||||
pools, io = await asyncio.gather(self._zpool_list(), self._zpool_iostat())
|
||||
for name, stats in io.items():
|
||||
if name in pools:
|
||||
pools[name].update(stats)
|
||||
return {"pools": pools}
|
||||
|
||||
|
||||
plugin = ZFSMonitorPlugin
|
||||
@@ -225,7 +225,7 @@ def get_watchhosts(config):
|
||||
hosts_config = config.get("hosts", {})
|
||||
if isinstance(hosts_config, dict):
|
||||
for host_name, host_attrs in hosts_config.items():
|
||||
if isinstance(host_attrs, dict) and host_attrs.get("watch", False):
|
||||
if isinstance(host_attrs, dict) and host_attrs.get("watch", True):
|
||||
watchhosts.append(host_name)
|
||||
return watchhosts
|
||||
|
||||
|
||||
@@ -286,7 +286,7 @@ class Host:
|
||||
Host.hosts[name] = self
|
||||
self.num = num
|
||||
self.dyn = False
|
||||
self.watched = False
|
||||
self.watched = True
|
||||
self.upcount = 0
|
||||
self.interval = 0
|
||||
self.doesack = -1
|
||||
@@ -304,6 +304,7 @@ class Host:
|
||||
|
||||
def statedict(self):
|
||||
d = {}
|
||||
d["raw_name"] = self.name
|
||||
d["name"] = self.name
|
||||
if self.dyn:
|
||||
d["name"] += "*"
|
||||
|
||||
+4
-2
@@ -258,7 +258,9 @@ async def start(
|
||||
extra_scripts=extra_scripts,
|
||||
hbd_version=hbd_version,
|
||||
hosts=[
|
||||
hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts)
|
||||
hbdclass.Host.hosts[h].stateinfo()
|
||||
for h in sorted(hbdclass.Host.hosts)
|
||||
if _can_operate_host(current_user, hbdclass.Host.hosts[h])
|
||||
],
|
||||
messages=data.msgs[-30:],
|
||||
current_user=current_user.to_dict() if current_user else None,
|
||||
@@ -510,7 +512,7 @@ async def start(
|
||||
hosts_with_plugins = []
|
||||
for hostname in sorted(hbdclass.Host.hosts.keys()):
|
||||
host = hbdclass.Host.hosts[hostname]
|
||||
if not _can_view_host(current_user, host):
|
||||
if not _can_operate_host(current_user, host):
|
||||
continue
|
||||
if host.plugin_data:
|
||||
hosts_with_plugins.append({
|
||||
|
||||
@@ -188,7 +188,7 @@ def get_settings_sections(config: dict) -> list:
|
||||
continue
|
||||
hosts_list.append({
|
||||
"name": hname,
|
||||
"watch": bool(hcfg.get("watch", False)),
|
||||
"watch": bool(hcfg.get("watch", True)),
|
||||
"dyndns": bool(hcfg.get("dyndns", False)),
|
||||
"owner": hcfg.get("owner", ""),
|
||||
"managers": hcfg.get("managers", []),
|
||||
|
||||
+10
-2
@@ -9,6 +9,7 @@ This module provides a flexible threshold checking system that:
|
||||
- Supports multiple comparison operators
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from enum import Enum
|
||||
@@ -1020,6 +1021,11 @@ class ThresholdChecker:
|
||||
value: Any,
|
||||
):
|
||||
"""Send notification and log to journal/eventlog."""
|
||||
from . import hbdclass
|
||||
host = hbdclass.Host.hosts.get(host_name)
|
||||
if host is not None and not host.watched:
|
||||
eventlog(host_name, lvl, message, service="threshold")
|
||||
return
|
||||
asyncio.get_event_loop().create_task(notify_mod.send_notification(
|
||||
host_name,
|
||||
notify_mod.Notification(
|
||||
@@ -1032,7 +1038,6 @@ class ThresholdChecker:
|
||||
# Log to journal
|
||||
if self.journal is not None:
|
||||
try:
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(self.journal.log_threshold_event(
|
||||
host_name=host_name,
|
||||
@@ -1224,6 +1229,9 @@ class ThresholdChecker:
|
||||
else:
|
||||
message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} (ongoing for {int(now - alert_state.since)}s)"
|
||||
|
||||
from . import hbdclass
|
||||
host = hbdclass.Host.hosts.get(host_name)
|
||||
if host is None or host.watched:
|
||||
asyncio.get_event_loop().create_task(notify_mod.send_notification(
|
||||
host_name,
|
||||
notify_mod.Notification(
|
||||
@@ -1232,9 +1240,9 @@ class ThresholdChecker:
|
||||
level=alert_state.level.name,
|
||||
),
|
||||
))
|
||||
logger.info("Re-notification sent: %s", message)
|
||||
alert_state.last_notification = now
|
||||
alert_state.notification_count += 1
|
||||
logger.info("Re-notification sent: %s", message)
|
||||
|
||||
def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
|
||||
"""
|
||||
|
||||
@@ -211,6 +211,7 @@ def _make_timer_callbacks(uname, host, ctx):
|
||||
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
|
||||
msg = f"{connection.afam} overdue"
|
||||
eventlog(uname, "CRITICAL", msg)
|
||||
if host.watched:
|
||||
asyncio.create_task(notify_mod.send_notification(
|
||||
uname,
|
||||
notify_mod.Notification(title=f"[CRITICAL] {uname}", body=msg, level="CRITICAL"),
|
||||
@@ -407,6 +408,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
|
||||
if res:
|
||||
eventlog(uname, "WARNING", res)
|
||||
if host.watched:
|
||||
asyncio.create_task(notify_mod.send_notification(
|
||||
uname,
|
||||
notify_mod.Notification(title=f"[WARNING] {uname}", body=res, level="WARNING"),
|
||||
@@ -420,6 +422,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
|
||||
if boot:
|
||||
eventlog(uname, "INFO", "booted")
|
||||
if host.watched:
|
||||
asyncio.create_task(notify_mod.send_notification(
|
||||
uname,
|
||||
notify_mod.Notification(title=f"[INFO] {uname}", body=f"{host.name} booted", level="INFO"),
|
||||
@@ -440,6 +443,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
else:
|
||||
m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
|
||||
eventlog(uname, "RECOVER", m)
|
||||
if host.watched:
|
||||
asyncio.create_task(notify_mod.send_notification(
|
||||
uname,
|
||||
notify_mod.Notification(title=f"[RECOVER] {uname}", body=m, level="RECOVER"),
|
||||
@@ -453,6 +457,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
if shutdown:
|
||||
m = "%s shutdown" % conn.afam
|
||||
eventlog(uname, "INFO", m)
|
||||
if host.watched:
|
||||
asyncio.create_task(notify_mod.send_notification(
|
||||
uname,
|
||||
notify_mod.Notification(title=f"[INFO] {uname}", body=m, level="INFO"),
|
||||
|
||||
+52
-9
@@ -13,7 +13,8 @@ from . import data
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_connections: set = set()
|
||||
# Map of WebSocket → User object (or None when auth is disabled)
|
||||
_connections: dict = {}
|
||||
_loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
_get_hosts: Optional[Callable[[], Iterable]] = None
|
||||
_verbose: bool = False
|
||||
@@ -34,22 +35,52 @@ def setup(
|
||||
_verbose = verbose
|
||||
|
||||
|
||||
def _user_can_see_host(user, host_name: str) -> bool:
|
||||
"""Return True if *user* may see updates for *host_name* (manager or higher)."""
|
||||
from . import hbdclass, users as users_mod
|
||||
if user is None or not users_mod.users_enabled():
|
||||
return True
|
||||
if user.admin:
|
||||
return True
|
||||
host = hbdclass.Host.hosts.get(host_name)
|
||||
if host is None:
|
||||
return False
|
||||
return host.is_manager(user.username)
|
||||
|
||||
|
||||
def _get_token(request) -> str:
|
||||
"""Extract session token from request (mirrors logic in http.py)."""
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth.startswith("Bearer "):
|
||||
return auth[7:].strip()
|
||||
token = request.headers.get("X-Auth-Token", "")
|
||||
if token:
|
||||
return token
|
||||
return request.cookies.get("hbd_session", "")
|
||||
|
||||
|
||||
async def handler(request):
|
||||
"""aiohttp WebSocket upgrade handler — register as GET /ws."""
|
||||
from aiohttp import web
|
||||
from . import users as users_mod
|
||||
|
||||
ws = web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
|
||||
_connections.add(ws)
|
||||
token = _get_token(request)
|
||||
user = users_mod.get_session_user(token) if token else None
|
||||
|
||||
_connections[ws] = user
|
||||
remote = request.remote
|
||||
logger.info("WebSocket connected from %s", remote)
|
||||
|
||||
try:
|
||||
# Send current host state to the new client
|
||||
# Send current host state, filtered to hosts this user may see
|
||||
if _get_hosts:
|
||||
try:
|
||||
for h in list(_get_hosts()):
|
||||
host_name = h.get("raw_name") or h.get("name", "")
|
||||
if _user_can_see_host(user, host_name):
|
||||
await ws.send_str(json.dumps({"type": "host", "data": h}))
|
||||
except Exception as e:
|
||||
logger.error("Error sending initial hosts: %s", e)
|
||||
@@ -74,7 +105,7 @@ async def handler(request):
|
||||
except Exception as e:
|
||||
logger.exception("WebSocket handler error from %s: %s", remote, e)
|
||||
finally:
|
||||
_connections.discard(ws)
|
||||
_connections.pop(ws, None)
|
||||
logger.info("WebSocket disconnected from %s", remote)
|
||||
|
||||
return ws
|
||||
@@ -83,25 +114,37 @@ async def handler(request):
|
||||
def broadcast(typ: str, payload) -> bool:
|
||||
"""Thread-safe broadcast to all connected WebSocket clients.
|
||||
|
||||
For host and plugin updates, only sends to clients whose user has
|
||||
manager-or-higher access to that host. Other message types are
|
||||
broadcast to all clients.
|
||||
|
||||
Can be called from any thread; schedules sends on the event loop.
|
||||
Returns False if the loop is not running yet.
|
||||
"""
|
||||
if not _loop:
|
||||
return False
|
||||
|
||||
# Determine the host name for access-filtered message types
|
||||
host_name: Optional[str] = None
|
||||
if typ in ("host", "plugin"):
|
||||
host_name = payload.get("raw_name") or payload.get("host") or payload.get("name")
|
||||
|
||||
jmsg = json.dumps({"type": typ, "data": payload})
|
||||
|
||||
async def _send_all():
|
||||
dead = set()
|
||||
for ws in list(_connections):
|
||||
for ws, user in list(_connections.items()):
|
||||
try:
|
||||
if not ws.closed:
|
||||
await ws.send_str(jmsg)
|
||||
else:
|
||||
if ws.closed:
|
||||
dead.add(ws)
|
||||
continue
|
||||
if host_name is not None and not _user_can_see_host(user, host_name):
|
||||
continue
|
||||
await ws.send_str(jmsg)
|
||||
except Exception:
|
||||
dead.add(ws)
|
||||
for ws in dead:
|
||||
_connections.discard(ws)
|
||||
_connections.pop(ws, None)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(_send_all(), _loop)
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user