"""Ping Monitor Plugin for Heartbeat. Pings one or more hosts and reports round-trip time. Results are sent as plugin metrics so the server-side threshold system can raise WARNING/CRITICAL alerts using the same RTT threshold configuration format used for heartbeat RTT. Example configuration in ~/.hbc.yaml (or the plugins section of ~/.hb.yaml): ```yaml plugins: ping_monitor: interval: 60 # ping every 60 seconds (default) count: 3 # ICMP packets per ping run (default 3) timeout: 5 # seconds before a host is considered unreachable (default 5) hosts: 8.8.8.8: warning: 20.0 # ms critical: 100.0 # ms 192.168.1.1: warning: 5.0 critical: 20.0 ``` Reported metrics per host (metric key uses the hostname with dots/colons replaced by underscores so it is a valid identifier): ping..rtt_avg – average RTT in ms (float, or inf if unreachable) ping..rtt_min – minimum RTT in ms ping..rtt_max – maximum RTT in ms ping..loss – packet loss percentage (0–100) Server-side threshold config example: ```yaml threshold_configs: default: thresholds: ping_monitor: 8_8_8_8_rtt_avg: warning: 20.0 critical: 100.0 ``` """ import asyncio import re import sys from typing import Any, Dict, Optional from hbd.client.plugin import MonitorPlugin def _host_key(host: str) -> str: """Convert a hostname/IP to a safe metric key (replace . and : with _).""" return re.sub(r"[^a-zA-Z0-9_]", "_", host) class PingMonitorPlugin(MonitorPlugin): """Ping one or more configured hosts and report RTT metrics.""" name = "ping_monitor" version = "1.0.0" description = "ICMP ping latency monitoring" interval = 60 def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) cfg = config or {} self.interval = cfg.get("interval", 60) self.count = int(cfg.get("count", 3)) self.timeout = int(cfg.get("timeout", 5)) # hosts: dict of {hostname: {warning: x, critical: y}} or list of hostnames raw_hosts = cfg.get("hosts", {}) if isinstance(raw_hosts, list): self.hosts = {h: {} for h in raw_hosts} else: self.hosts = dict(raw_hosts) async def initialize(self) -> bool: if not self.hosts: self.logger.warning("ping_monitor: no hosts configured, plugin disabled") return False self.logger.info( "ping_monitor initialized: %d host(s), interval=%ds, count=%d, timeout=%ds", len(self.hosts), self.interval, self.count, self.timeout, ) return True async def _ping(self, host: str) -> Dict[str, float]: """Run a system ping command and return rtt_min/avg/max/loss.""" if sys.platform == "win32": cmd = ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host] else: cmd = ["ping", "-c", str(self.count), "-W", str(self.timeout), host] try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await asyncio.wait_for( proc.communicate(), timeout=self.timeout * self.count + 2, ) output = stdout.decode(errors="replace") except (asyncio.TimeoutError, FileNotFoundError, OSError) as e: self.logger.warning("ping_monitor: ping failed for %s: %s", host, e) return {"rtt_min": float("inf"), "rtt_avg": float("inf"), "rtt_max": float("inf"), "loss": 100.0} # Parse packet loss loss = 100.0 loss_match = re.search(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss", output) if loss_match: loss = float(loss_match.group(1)) # Parse rtt min/avg/max — Linux: "rtt min/avg/max/mdev = x/x/x/x ms" # macOS: "round-trip min/avg/max/stddev = x/x/x/x ms" rtt_match = re.search( r"(?:rtt|round-trip)\s+min/avg/max/\S+\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)", output, ) if rtt_match: return { "rtt_min": float(rtt_match.group(1)), "rtt_avg": float(rtt_match.group(2)), "rtt_max": float(rtt_match.group(3)), "loss": loss, } # Host unreachable or all packets lost return {"rtt_min": float("inf"), "rtt_avg": float("inf"), "rtt_max": float("inf"), "loss": loss} async def _collect_metrics(self) -> Dict[str, Any]: data: Dict[str, Any] = {} tasks = {host: asyncio.create_task(self._ping(host)) for host in self.hosts} for host, task in tasks.items(): try: result = await task except Exception as e: self.logger.error("ping_monitor: error pinging %s: %s", host, e) result = {"rtt_min": float("inf"), "rtt_avg": float("inf"), "rtt_max": float("inf"), "loss": 100.0} key = _host_key(host) for metric, value in result.items(): data[f"{key}_{metric}"] = value status = "unreachable" if result["loss"] == 100.0 else f"{result['rtt_avg']:.1f}ms" self.logger.debug("ping_monitor: %s -> %s", host, status) return data