"""
ZFS pool monitoring plugin for Heartbeat.

Collects per-pool health, capacity, and cumulative I/O statistics via zpool(8).
"""

import asyncio
import logging
import shutil
from typing import Any, Dict, List, Optional

from hbd.client.plugin import MonitorPlugin

logger = logging.getLogger(__name__)


def _int(s: str) -> Optional[int]:
    """Parse a zpool output field as an integer.

    Surrounding whitespace and any trailing run of the characters
    ``KMGTkBkmgt%x`` are removed before conversion. The suffix is dropped,
    not applied as a multiplier, so this is only exact for raw numbers such
    as those the ``-p`` invocations in this module request.

    Returns:
        The parsed integer, or ``None`` when the field is not an integer
        (for example ``-``) or *s* is not a string.
    """
    try:
        return int(s.strip().rstrip("KMGTkBkmgt%x"))
    except (ValueError, AttributeError):
        return None


def _float(s: str) -> Optional[float]:
    """Parse a zpool output field as a float.

    Surrounding whitespace and trailing ``%`` / ``x`` characters are removed
    before conversion.

    Returns:
        The parsed float, or ``None`` when the field is not numeric or *s*
        is not a string.
    """
    try:
        return float(s.strip().rstrip("%x"))
    except (ValueError, AttributeError):
        return None


class ZFSMonitorPlugin(MonitorPlugin):
    """Monitor ZFS pool health, capacity, and I/O statistics.

    Collects per pool:
      - health: ONLINE, DEGRADED, FAULTED, etc.
      - size / alloc / free: total, allocated and free bytes
      - capacity: percentage used (0-100)
      - frag: fragmentation percentage
      - dedup: deduplication ratio
      - read_ops / write_ops: cumulative I/O operations since last boot/clear
      - read_bw / write_bw: cumulative bytes transferred since last boot/clear

    Configuration:
      interval: collection interval in seconds (default: 300)
      pools: list of pool names to monitor (default: all)
    """

    name = "zfs_monitor"
    description = "ZFS pool health, capacity, and I/O statistics"
    interval = 300

    # Seconds a zpool command may run before it is killed.
    _COMMAND_TIMEOUT = 15
    # Seconds to wait for a killed command to be reaped. Bounded so that a
    # child which does not exit promptly cannot stall the collector.
    _REAP_TIMEOUT = 5

    # Numeric status reported for each health string; any health value not
    # listed here is reported as _STATUS_UNKNOWN.
    _STATUS_BY_HEALTH = {
        "ONLINE": 0,
        "DEGRADED": 1,
        "ONLINE with errors": 1,
        "FAULTED": 2,
        "OFFLINE": 2,
        "UNAVAIL": 2,
    }
    _STATUS_UNKNOWN = 3

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read ``interval`` and ``pools`` from the plugin configuration."""
        super().__init__(config)
        self.interval = self.config.get("interval", 300)

        pools_filter = self.config.get("pools", None)
        if isinstance(pools_filter, str):
            # A bare string would turn the `name not in filter` check into a
            # substring test; treat it as a single pool name instead.
            pools_filter = [pools_filter]
        self._pools_filter: Optional[List[str]] = pools_filter

    async def initialize(self) -> bool:
        """Return True when the ``zpool`` binary is available on PATH.

        When it is missing, ``skip_reason`` is set and False is returned.
        """
        if not shutil.which("zpool"):
            self.skip_reason = "zpool not found"
            return False
        logger.info("ZFS monitor initialized (interval: %ds)", self.interval)
        return True

    async def _run(self, *args: str) -> List[str]:
        """Run a command and return its stdout lines, or [] on error.

        The command's stderr is discarded. A command that cannot be started
        or that exceeds the timeout yields an empty list; on timeout the
        child is killed so it does not outlive the call. A non-zero exit
        status is logged, and whatever was written to stdout is still
        returned.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                *args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL,
            )
        except OSError as exc:
            # Covers a missing binary as well as permission and other
            # spawn failures.
            logger.warning("zfs_monitor: %s: %s", args[0], exc)
            return []

        try:
            stdout, _ = await asyncio.wait_for(
                proc.communicate(), timeout=self._COMMAND_TIMEOUT
            )
        except asyncio.TimeoutError:
            logger.warning(
                "zfs_monitor: %s timed out after %ss; killing it",
                args[0],
                self._COMMAND_TIMEOUT,
            )
            await self._kill(proc, args[0])
            return []

        if proc.returncode:
            logger.warning(
                "zfs_monitor: %s exited with status %s",
                " ".join(args),
                proc.returncode,
            )
        return stdout.decode(errors="replace").splitlines()

    async def _kill(self, proc: "asyncio.subprocess.Process", label: str) -> None:
        """Kill *proc* and reap it, waiting at most ``_REAP_TIMEOUT`` seconds."""
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=self._REAP_TIMEOUT)
        except asyncio.TimeoutError:
            logger.warning(
                "zfs_monitor: %s did not exit within %ss of being killed",
                label,
                self._REAP_TIMEOUT,
            )

    async def _zpool_list(self) -> Dict[str, Dict]:
        """Return per-pool health and capacity from `zpool list`.

        The result maps pool name to a dict with the keys ``health``,
        ``status``, ``size``, ``alloc``, ``free``, ``capacity``, ``frag`` and
        ``dedup``. ``status`` is 0 for ONLINE, 1 for DEGRADED, 2 for
        FAULTED/OFFLINE/UNAVAIL and 3 for any other health string. Numeric
        fields that cannot be parsed are ``None``. Pools excluded by the
        configured ``pools`` filter are omitted.
        """
        lines = await self._run(
            "zpool", "list", "-H", "-p",
            "-o", "name,health,size,alloc,free,cap,frag,dedup",
        )
        pools: Dict[str, Dict] = {}
        for line in lines:
            parts = line.split("\t")
            if len(parts) < 8:
                # Not a complete record for the eight requested columns.
                continue
            name = parts[0].strip()
            if self._pools_filter and name not in self._pools_filter:
                continue
            health = parts[1].strip()
            pools[name] = {
                "health": health,
                "status": self._STATUS_BY_HEALTH.get(health, self._STATUS_UNKNOWN),
                "size": _int(parts[2]),
                "alloc": _int(parts[3]),
                "free": _int(parts[4]),
                "capacity": _float(parts[5]),
                "frag": _float(parts[6]),
                "dedup": _float(parts[7]),
            }
        return pools

    async def _zpool_iostat(self) -> Dict[str, Dict]:
        """Return per-pool cumulative I/O counters from `zpool iostat`.

        The result maps pool name to a dict with the keys ``read_ops``,
        ``write_ops``, ``read_bw`` and ``write_bw``, taken from columns 3-6
        of each tab-separated line. The ``pools`` filter is not applied here;
        `_collect_metrics` only merges counters for pools it already knows.
        """
        lines = await self._run("zpool", "iostat", "-H", "-p")
        io: Dict[str, Dict] = {}
        for line in lines:
            parts = line.split("\t")
            if len(parts) < 7:
                continue
            name = parts[0].strip()
            if not name:
                continue
            io[name] = {
                "read_ops": _int(parts[3]),
                "write_ops": _int(parts[4]),
                "read_bw": _int(parts[5]),
                "write_bw": _int(parts[6]),
            }
        return io

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Collect list and iostat data concurrently and merge them per pool.

        Returns:
            ``{"pools": {...}}`` where each pool entry holds the `zpool list`
            fields plus, when available, the `zpool iostat` counters.
        """
        pools, io = await asyncio.gather(self._zpool_list(), self._zpool_iostat())
        for name, stats in io.items():
            if name in pools:
                pools[name].update(stats)
        return {"pools": pools}


plugin = ZFSMonitorPlugin