From 462a445235a4303baddc8b446a05756739fcd867 Mon Sep 17 00:00:00 2001 From: Andreas Wrede Date: Thu, 30 Apr 2026 17:50:19 -0400 Subject: [PATCH] feat: add hbc_mini single-file client; drop dead connections on protocol error - scripts/hbc_mini.py: self-contained hbc with no external deps; uses /proc for CPU/memory/network on Linux, df for disk, JSON config - hbc + hbc_mini: mark connection _dead and stop sending on protocol error - README: document hbc_mini usage, config, and plugin availability - pyproject.toml: include hbc_mini.py in script-files Co-Authored-By: Claude Sonnet 4.6 (1M context) --- README.md | 62 +++ hbd/client/main.py | 12 +- pyproject.toml | 2 +- scripts/hbc_mini.py | 1128 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1200 insertions(+), 4 deletions(-) create mode 100755 scripts/hbc_mini.py diff --git a/README.md b/README.md index fd44e1a..97e0db0 100644 --- a/README.md +++ b/README.md @@ -441,6 +441,68 @@ plugins: All monitoring plugins default to 5-minute (300 second) intervals, but can be customized as needed. +### hbc_mini — single-file client (no external dependencies) + +`scripts/hbc_mini.py` is a self-contained version of the heartbeat client that requires only Python 3.8+ and no external packages. Copy it to any host and run it directly — no virtualenv, no `pip install`. + +```bash +# Basic usage +python3 hbc_mini.py your-server.example.com + +# Run as daemon +python3 hbc_mini.py -d your-server.example.com + +# Send a boot message +python3 hbc_mini.py -b your-server.example.com + +# Send a one-off message +python3 hbc_mini.py -m "maintenance starting" your-server.example.com +``` + +**Config:** `~/.hbc.json` (same keys as `~/.hbc.yaml`, JSON format). 
Example: + +```json +{ + "hb_port": 50003, + "interval": 30, + "plugins": { + "ping_monitor": { + "interval": 60, + "hosts": ["8.8.8.8", "192.168.1.1"] + }, + "nagios_runner": { + "interval": 300, + "commands": [ + {"name": "check_load", "command": "/usr/lib/nagios/plugins/check_load -w 5,4,3 -c 10,8,6"} + ] + } + } +} +``` + +**Plugin availability:** + +| Plugin | Platform | Data source | +|---|---|---| +| `os_info` | all | `platform` stdlib | +| `ping_monitor` | all | `ping` subprocess | +| `nagios_runner` | all (not Windows) | subprocess | +| `cpu_monitor` | Linux | `/proc/stat` | +| `memory_monitor` | Linux | `/proc/meminfo` | +| `disk_monitor` | Linux, macOS, BSD | `df -P` subprocess | +| `network_monitor` | Linux | `/proc/net/dev` | + +**What is not available compared to the full `hbc`:** + +- No YAML config (use JSON instead) +- No `filesystem_info` plugin +- `cpu_monitor` does not report per-core usage or CPU frequency (no psutil) +- Plugins cannot be loaded from external `.py` files — all plugins are compiled in + +Everything else — heartbeat protocol, ACK/CMD/UPD handling, `hb_install.sh`-based self-update, daemonize, syslog — is identical to the full client. + +--- + ## 🐞 Debugging in VS Code This repository includes a ready-to-use `.vscode/launch.json` with configurations to run or attach the VS Code debugger to `hbd`. diff --git a/hbd/client/main.py b/hbd/client/main.py index be3ccc7..dd9ccce 100644 --- a/hbd/client/main.py +++ b/hbd/client/main.py @@ -55,7 +55,8 @@ class AsyncConnection: self.transport: Optional[asyncio.DatagramTransport] = None self.protocol: Optional[asyncio.DatagramProtocol] = None - + self._dead = False + self.logger = logging.getLogger(f"hbc.conn.{addr}") async def open(self) -> bool: @@ -92,9 +93,12 @@ class AsyncConnection: msg: Message dictionary msg_id: Message ID (HTB, PLG, etc.) 
""" + if self._dead: + return + if not self.transport: await self.open() - + if not self.transport: self.logger.error("Cannot send - no transport") return @@ -166,7 +170,9 @@ class HeartbeatProtocol(asyncio.DatagramProtocol): def error_received(self, exc): """Handle protocol errors.""" - self.logger.error(f"Protocol error: {exc}") + self.logger.warning(f"Protocol error on {self.connection.addr}: {exc} — dropping connection") + self.connection._dead = True + self.connection.close() async def handle_command(conn: AsyncConnection, msg: dict): diff --git a/pyproject.toml b/pyproject.toml index 1b2020f..a1bc3e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,7 @@ hbd = "hbd.server.cli:main" hbc = "hbd.client.main:main" [tool.setuptools] -script-files = ["scripts/hb_install.sh"] +script-files = ["scripts/hb_install.sh", "scripts/hbc_mini.py"] [tool.setuptools.packages.find] where = ["."] diff --git a/scripts/hbc_mini.py b/scripts/hbc_mini.py new file mode 100755 index 0000000..1a69afe --- /dev/null +++ b/scripts/hbc_mini.py @@ -0,0 +1,1128 @@ +#!/usr/bin/env python3 +"""hbc_mini — single-file HeartBeat Client with no external dependencies. 
+ +Drop this file on any host with Python 3.8+ and run it directly: + python3 hbc_mini.py + +Config: ~/.hbc.json (same keys as ~/.hbc.yaml but JSON format) + +Built-in plugins (always available): + os_info — OS name, kernel, distro, Python version + ping_monitor — ICMP ping RTT to configured hosts + nagios_runner — run Nagios-compatible check scripts + +Linux-only plugins (via /proc, no extra tools needed): + cpu_monitor — CPU %, load average, core count + memory_monitor — RAM and swap usage + network_monitor — per-interface TX/RX bytes/s + +Cross-platform (via df subprocess): + disk_monitor — per-mount usage and capacity +""" + +import argparse +import asyncio +import json +import logging +import os +import platform +import re +import shutil +import signal +import socket +import subprocess +import sys +import time +import zlib +from abc import ABC, abstractmethod +from collections import defaultdict +from logging.handlers import SysLogHandler +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +# --------------------------------------------------------------------------- +# Protocol (mirrors hbd/common/proto.py) +# --------------------------------------------------------------------------- + +def _encode_value(v: Any) -> str: + if isinstance(v, float): + return f"{v:0.5f}" + if isinstance(v, (list, dict)): + return "@" + json.dumps(v) + if isinstance(v, bool): + return str(int(v)) + return str(v) + + +def _decode_value(val: str) -> Any: + if not val: + return val + if val.startswith("@"): + try: + return json.loads(val[1:]) + except Exception: + return val[1:] + if val[0].isdigit() or (val[0] == "-" and len(val) > 1 and val[1].isdigit()): + try: + return int(val) + except ValueError: + pass + try: + return float(val) + except ValueError: + pass + return val + + +def _dicttos(msg_id: str, d: Dict[str, Any]) -> bytes: + payload = ";".join(f"{k}={_encode_value(v)}" for k, v in d.items()).encode() + return ("!" 
+ msg_id + ":").encode() + zlib.compress(payload, 6) + + +def _stodict(data: bytes) -> Dict[str, Any]: + result: Dict[str, Any] = {} + if not data: + return result + if chr(data[0]) == "!": + try: + payload = zlib.decompress(data[5:]).decode() + except Exception: + return {} + result["ID"] = data[1:4].decode() + else: + try: + head, payload = data.split(b":", 1) + payload = payload.decode() + result["ID"] = head.decode() + except Exception: + return {} + for item in payload.split(";"): + if not item: + continue + kv = item.split("=", 1) + result[kv[0].strip()] = _decode_value(kv[1].strip()) if len(kv) > 1 else None + return result + + +# --------------------------------------------------------------------------- +# Config (JSON, default ~/.hbc.json) +# --------------------------------------------------------------------------- + +_DEFAULTS: Dict[str, Any] = { + "hb_port": 50003, + "interval": 10, + "plugins": {}, +} + + +def _load_config(path: Optional[str] = None) -> Dict[str, Any]: + cfg = dict(_DEFAULTS) + if not path: + path = os.path.join(os.path.expanduser("~"), ".hbc.json") + if os.path.exists(path): + try: + with open(path) as fh: + cfg.update(json.load(fh)) + logging.getLogger("hbc.config").info("loaded config from %s", path) + except Exception as e: + logging.getLogger("hbc.config").warning("cannot read %s: %s", path, e) + return cfg + + +# --------------------------------------------------------------------------- +# Plugin base classes +# --------------------------------------------------------------------------- + +class Plugin(ABC): + name: str = "" + version: str = "1.0.0" + description: str = "" + interval: int = 0 + enabled: bool = True + + def __init__(self, config: Optional[Dict[str, Any]] = None): + self.config = config or {} + self.logger = logging.getLogger(f"plugin.{self.name}") + self.skip_reason: Optional[str] = None + + @abstractmethod + async def initialize(self) -> bool: ... 
+ + @abstractmethod + async def collect(self) -> Dict[str, Any]: ... + + async def cleanup(self) -> None: + pass + + +class InfoPlugin(Plugin): + """Collected once at startup, result is cached.""" + interval = 0 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + self._cache: Optional[Dict[str, Any]] = None + + async def collect(self) -> Dict[str, Any]: + if self._cache is None: + self._cache = await self._collect_info() + return self._cache + + @abstractmethod + async def _collect_info(self) -> Dict[str, Any]: ... + + +class MonitorPlugin(Plugin): + """Collected repeatedly on a fixed interval.""" + interval = 30 + + async def collect(self) -> Dict[str, Any]: + data = await self._collect_metrics() + if data: + data["_timestamp"] = time.time() + return data + + @abstractmethod + async def _collect_metrics(self) -> Dict[str, Any]: ... + + +# --------------------------------------------------------------------------- +# Plugin: os_info +# --------------------------------------------------------------------------- + +def _linux_distro() -> Dict[str, str]: + info: Dict[str, str] = {} + for path in ("/etc/os-release", "/etc/lsb-release"): + if not os.path.exists(path): + continue + try: + with open(path) as fh: + for line in fh: + line = line.strip() + if "=" not in line or line.startswith("#"): + continue + k, v = line.split("=", 1) + v = v.strip('"').strip("'") + if k == "PRETTY_NAME": + info["distro_pretty_name"] = v + elif k == "NAME": + info.setdefault("distro_name", v) + elif k == "VERSION_ID": + info["distro_version_id"] = v + elif k == "ID": + info["distro_id"] = v + except Exception: + pass + if info: + break + return info + + +class OSInfoPlugin(InfoPlugin): + name = "os_info" + description = "OS and platform information" + + async def initialize(self) -> bool: + return True + + async def _collect_info(self) -> Dict[str, Any]: + data: Dict[str, Any] = { + "system": platform.system(), + "node": platform.node(), + 
"release": platform.release(), + "machine": platform.machine(), + "architecture": platform.architecture()[0], + "python_version": platform.python_version(), + } + if platform.system() == "Linux": + data.update(_linux_distro()) + elif platform.system() == "Darwin": + data["macos_version"] = platform.mac_ver()[0] + return data + + +# --------------------------------------------------------------------------- +# Plugin: ping_monitor +# --------------------------------------------------------------------------- + +class PingMonitorPlugin(MonitorPlugin): + name = "ping_monitor" + description = "ICMP ping latency monitoring" + interval = 60 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + cfg = config or {} + self.interval = cfg.get("interval", 60) + self.count = int(cfg.get("count", 3)) + self.timeout = int(cfg.get("timeout", 5)) + raw = cfg.get("hosts", {}) + self.hosts: Dict[str, Any] = {h: {} for h in raw} if isinstance(raw, list) else dict(raw) + + async def initialize(self) -> bool: + if not self.hosts: + self.skip_reason = "no hosts configured" + return False + return True + + async def _ping(self, host: str) -> Dict[str, float]: + _inf = float("inf") + _fail: Dict[str, float] = {"rtt_min": _inf, "rtt_avg": _inf, "rtt_max": _inf, "loss": 100.0} + if sys.platform == "win32": + cmd = ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host] + else: + cmd = ["ping", "-c", str(self.count), "-W", str(self.timeout), host] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + out_b, _ = await asyncio.wait_for( + proc.communicate(), timeout=self.timeout * self.count + 2 + ) + out = out_b.decode(errors="replace") + except Exception: + return _fail + loss = 100.0 + m = re.search(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss", out) + if m: + loss = float(m.group(1)) + m = re.search( + 
r"(?:rtt|round-trip)\s+min/avg/max/\S+\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)", out + ) + if m: + return {"rtt_min": float(m.group(1)), "rtt_avg": float(m.group(2)), + "rtt_max": float(m.group(3)), "loss": loss} + return _fail + + async def _collect_metrics(self) -> Dict[str, Any]: + tasks = {h: asyncio.create_task(self._ping(h)) for h in self.hosts} + data: Dict[str, Any] = {} + for host, task in tasks.items(): + try: + result = await task + except Exception: + result = {"rtt_min": float("inf"), "rtt_avg": float("inf"), + "rtt_max": float("inf"), "loss": 100.0} + key = re.sub(r"[^a-zA-Z0-9_]", "_", host) + for metric, value in result.items(): + data[f"{key}_{metric}"] = value + return data + + +# --------------------------------------------------------------------------- +# Plugin: nagios_runner +# --------------------------------------------------------------------------- + +_NAGIOS_STATUS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"} + + +def _parse_nagios_perfdata(output: str) -> Dict[str, Any]: + if "|" not in output: + return {} + section = output.split("|", 1)[1].strip() + perfdata: Dict[str, Any] = {} + for m in re.finditer( + r"'?([^'=\s]+)'?=([\d.]+)([a-zA-Z%]*);?([\d.]*);?([\d.]*);?([\d.]*);?([\d.]*)", + section, + ): + label = m.group(1).strip() + try: + perfdata[label] = float(m.group(2)) + except ValueError: + continue + if m.group(3): + perfdata[f"{label}_uom"] = m.group(3) + for idx, suffix in zip(range(4, 8), ("_warn", "_crit", "_min", "_max")): + if m.group(idx): + try: + perfdata[f"{label}{suffix}"] = float(m.group(idx)) + except ValueError: + pass + return perfdata + + +class NagiosRunnerPlugin(MonitorPlugin): + name = "nagios_runner" + description = "Nagios-compatible plugin runner" + interval = 300 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + cfg = config or {} + self.commands: List[Dict[str, str]] = cfg.get("commands", []) + self.timeout: int = cfg.get("timeout", 30) + self.interval = 
cfg.get("interval", 300) + + async def initialize(self) -> bool: + if not self.commands: + self.skip_reason = "no commands configured" + return False + return True + + async def _run(self, command: str) -> Tuple[int, str, Dict[str, Any]]: + try: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + out, err = await asyncio.wait_for(proc.communicate(), timeout=self.timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + return 3, f"timed out after {self.timeout}s", {} + rc = max(0, min(3, proc.returncode or 0)) + stdout = out.decode(errors="replace").strip() + stderr = err.decode(errors="replace").strip() + perf = _parse_nagios_perfdata(stdout) + msg = stdout.split("|")[0].strip() or stderr + return rc, msg, perf + except Exception as e: + return 3, str(e), {} + + async def _collect_metrics(self) -> Dict[str, Any]: + results: Dict[str, Any] = {} + worst = 0 + for cmd_cfg in self.commands: + name = cmd_cfg.get("name") + command = cmd_cfg.get("command") + if not name or not command: + continue + rc, msg, perf = await self._run(command) + results[f"{name}_status"] = _NAGIOS_STATUS.get(rc, "UNKNOWN") + results[f"{name}_status_code"] = rc + results[f"{name}_output"] = msg + results.update({f"{name}_{k}": v for k, v in perf.items()}) + worst = max(worst, rc) + results["overall_status"] = _NAGIOS_STATUS.get(worst, "UNKNOWN") + results["overall_status_code"] = worst + results["plugin_count"] = len(self.commands) + return results + + +# --------------------------------------------------------------------------- +# Plugin: cpu_monitor (Linux /proc/stat) +# --------------------------------------------------------------------------- + +def _read_proc_stat() -> Optional[Tuple[int, ...]]: + """Return CPU jiffies tuple (user, nice, system, idle, iowait, ...).""" + try: + with open("/proc/stat") as fh: + line = fh.readline() + parts = line.split() + if parts[0] != "cpu": 
+ return None + return tuple(int(x) for x in parts[1:]) + except Exception: + return None + + +class CPUMonitorPlugin(MonitorPlugin): + name = "cpu_monitor" + description = "CPU usage via /proc/stat (Linux)" + interval = 300 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + self.interval = (config or {}).get("interval", 300) + self._prev: Optional[Tuple[int, ...]] = None + + async def initialize(self) -> bool: + if platform.system() != "Linux": + self.skip_reason = "Linux only (/proc/stat not available)" + return False + if _read_proc_stat() is None: + self.skip_reason = "/proc/stat not readable" + return False + return True + + async def _collect_metrics(self) -> Dict[str, Any]: + # First call: take a 1-second window for an immediate reading. + # Subsequent calls: use the full interval between collections. + if self._prev is None: + t1 = _read_proc_stat() + await asyncio.sleep(1) + else: + t1 = self._prev + + t2 = _read_proc_stat() + if t1 is None or t2 is None: + return {} + self._prev = t2 + + # Fields: user(0) nice(1) system(2) idle(3) iowait(4) irq(5) softirq(6) steal(7) + have_iowait = len(t2) > 4 + idle1 = t1[3] + (t1[4] if have_iowait else 0) + idle2 = t2[3] + (t2[4] if have_iowait else 0) + total_delta = sum(t2) - sum(t1) + if total_delta == 0: + return {} + + data: Dict[str, Any] = { + "cpu_percent": round(100.0 * (1.0 - (idle2 - idle1) / total_delta), 1), + } + if have_iowait: + data["cpu_user"] = round(100.0 * (t2[0] - t1[0]) / total_delta, 1) + data["cpu_system"] = round(100.0 * (t2[2] - t1[2]) / total_delta, 1) + data["cpu_idle"] = round(100.0 * (idle2 - idle1) / total_delta, 1) + data["cpu_iowait"] = round(100.0 * (t2[4] - t1[4]) / total_delta, 1) + + try: + la = os.getloadavg() + data["load_1min"] = round(la[0], 2) + data["load_5min"] = round(la[1], 2) + data["load_15min"] = round(la[2], 2) + except (AttributeError, OSError): + pass + + try: + with open("/proc/cpuinfo") as fh: + data["cpu_core_count"] = 
sum(1 for l in fh if l.startswith("processor")) + except Exception: + pass + + return data + + +# --------------------------------------------------------------------------- +# Plugin: memory_monitor (Linux /proc/meminfo) +# --------------------------------------------------------------------------- + +def _read_meminfo() -> Dict[str, int]: + result: Dict[str, int] = {} + try: + with open("/proc/meminfo") as fh: + for line in fh: + parts = line.split() + if len(parts) >= 2: + try: + result[parts[0].rstrip(":")] = int(parts[1]) + except ValueError: + pass + except Exception: + pass + return result + + +class MemoryMonitorPlugin(MonitorPlugin): + name = "memory_monitor" + description = "Memory usage via /proc/meminfo (Linux)" + interval = 300 + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + self.interval = (config or {}).get("interval", 300) + + async def initialize(self) -> bool: + if platform.system() != "Linux": + self.skip_reason = "Linux only (/proc/meminfo not available)" + return False + if not _read_meminfo(): + self.skip_reason = "/proc/meminfo not readable" + return False + return True + + async def _collect_metrics(self) -> Dict[str, Any]: + mi = _read_meminfo() + if not mi: + return {} + total = mi.get("MemTotal", 0) + avail = mi.get("MemAvailable", mi.get("MemFree", 0)) + used = total - avail + data: Dict[str, Any] = { + "mem_total_kb": total, + "mem_used_kb": used, + "mem_available_kb": avail, + "mem_percent": round(100.0 * used / total, 1) if total else 0.0, + } + stotal = mi.get("SwapTotal", 0) + if stotal: + sfree = mi.get("SwapFree", 0) + data["swap_total_kb"] = stotal + data["swap_used_kb"] = stotal - sfree + data["swap_percent"] = round(100.0 * (stotal - sfree) / stotal, 1) + return data + + +# --------------------------------------------------------------------------- +# Plugin: disk_monitor (df -P; Linux, macOS, BSD) +# --------------------------------------------------------------------------- + 
class DiskMonitorPlugin(MonitorPlugin):
    """Reports per-mount capacity and usage parsed from ``df -P``.

    Config keys: ``interval`` and ``mounts`` (optional list of mount points
    to report; empty means all).
    """
    name = "disk_monitor"
    description = "Disk usage via df"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.mounts: List[str] = cfg.get("mounts", [])

    async def initialize(self) -> bool:
        if sys.platform == "win32":
            self.skip_reason = "Windows not supported"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run ``df -P`` and return ``<mount>_{total,used,avail}_kb`` and ``_percent``."""
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "-P",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
            except asyncio.TimeoutError:
                # df can hang on a stale network mount; do not leave it behind.
                proc.kill()
                await proc.communicate()
                raise
        except Exception as e:
            self.logger.warning("df failed: %s", e)
            return {}
        data: Dict[str, Any] = {}
        for line in out.decode(errors="replace").splitlines()[1:]:
            # Limit the split so a mount point containing spaces stays whole.
            parts = line.split(None, 5)
            if len(parts) < 6:
                continue
            mount = parts[5].rstrip()
            if self.mounts and mount not in self.mounts:
                continue
            try:
                key = re.sub(r"[^a-zA-Z0-9_]", "_", mount).strip("_") or "root"
                total = int(parts[1])
                used = int(parts[2])
                avail = int(parts[3])
                percent = int(parts[4].rstrip("%"))
            except ValueError:
                continue
            data[f"{key}_total_kb"] = total
            data[f"{key}_used_kb"] = used
            data[f"{key}_avail_kb"] = avail
            data[f"{key}_percent"] = percent
        return data


# ---------------------------------------------------------------------------
# Plugin: network_monitor  (Linux /proc/net/dev)
# ---------------------------------------------------------------------------

def _read_net_dev() -> Dict[str, Tuple[int, int]]:
    """Return {iface: (rx_bytes, tx_bytes)}.

    Returns an empty dict when /proc/net/dev cannot be read. Malformed lines
    are skipped individually.
    """
    result: Dict[str, Tuple[int, int]] = {}
    try:
        with open("/proc/net/dev") as fh:
            lines = fh.readlines()[2:]  # skip 2 header lines
    except OSError:
        return result
    for line in lines:
        # Split on the colon rather than whitespace: the first counter may
        # follow the interface name with no space in between.
        iface, sep, rest = line.partition(":")
        if not sep:
            continue
        fields = rest.split()
        if len(fields) < 9:
            continue
        try:
            # Column 0 is receive bytes, column 8 is transmit bytes.
            result[iface.strip()] = (int(fields[0]), int(fields[8]))
        except ValueError:
            continue
    return result


class NetworkMonitorPlugin(MonitorPlugin):
    """Reports per-interface byte counters and rates between collections.

    Config keys: ``interval`` and ``skip_interfaces`` (default ``["lo"]``).
    """
    name = "network_monitor"
    description = "Network I/O rates via /proc/net/dev (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.skip_ifaces: set = set(cfg.get("skip_interfaces", ["lo"]))
        self._prev: Optional[Tuple[float, Dict[str, Tuple[int, int]]]] = None

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/net/dev not available)"
            return False
        dev = _read_net_dev()
        if not dev:
            self.skip_reason = "/proc/net/dev not readable"
            return False
        self._prev = (time.time(), dev)  # prime first delta
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        now = time.time()
        curr = _read_net_dev()
        if not curr or self._prev is None:
            self._prev = (now, curr)
            return {}
        prev_ts, prev = self._prev
        dt = now - prev_ts
        self._prev = (now, curr)
        if dt <= 0:
            return {}
        data: Dict[str, Any] = {}
        for iface, (rx, tx) in curr.items():
            if iface in self.skip_ifaces or iface not in prev:
                continue
            prx, ptx = prev[iface]
            key = re.sub(r"[^a-zA-Z0-9_]", "_", iface)
            # A counter going backwards means it was reset (interface
            # re-created, wrap); skip the rate instead of reporting a
            # negative one.
            if rx >= prx:
                data[f"{key}_rx_bps"] = round((rx - prx) / dt)
            if tx >= ptx:
                data[f"{key}_tx_bps"] = round((tx - ptx) / dt)
            data[f"{key}_rx_bytes"] = rx
            data[f"{key}_tx_bytes"] = tx
        return data


# ---------------------------------------------------------------------------
# Plugin registry — all built-in classes, initialized at startup
# ---------------------------------------------------------------------------

_ALL_PLUGIN_CLASSES: List[type] = [
    OSInfoPlugin,
    PingMonitorPlugin,
    NagiosRunnerPlugin,
    CPUMonitorPlugin,
    MemoryMonitorPlugin,
    DiskMonitorPlugin,
    NetworkMonitorPlugin,
]


async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
    """Instantiate and initialize every built-in plugin; return those that loaded.

    Per-plugin config is taken from ``cfg["plugins"][name]``, falling back to
    a top-level ``cfg[name]``. A plugin whose constructor or ``initialize``
    raises is logged and skipped; it never aborts startup.
    """
    log = logging.getLogger("hbc.plugins")
    plugins_cfg: Dict[str, Any] = cfg.get("plugins") or {}
    loaded: List[Plugin] = []
    for cls in _ALL_PLUGIN_CLASSES:
        plugin_cfg = plugins_cfg.get(cls.name) or cfg.get(cls.name, {})
        try:
            # Construction parses user config, so it can fail too.
            plugin: Plugin = cls(config=plugin_cfg)
            ok = await plugin.initialize()
        except Exception as e:
            log.error("init %s: %s", cls.name, e)
            continue
        if ok:
            loaded.append(plugin)
            log.info("loaded %s (interval=%ds)", plugin.name, plugin.interval)
        else:
            log.info("skip %s: %s", plugin.name, plugin.skip_reason or "init failed")
    return loaded


# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------

_running = True
_dorestart = False
_shutdown_event: Optional[asyncio.Event] = None
_active_tasks: List[asyncio.Task] = []
# Strong references to fire-and-forget tasks; the event loop itself only
# keeps weak ones, so an unreferenced task may be collected mid-run.
_background_tasks: set = set()

PORT = 50003
INTERVAL = 10


def _shortname(name: str) -> str:
    """Return the host part of *name* (everything before the first dot)."""
    return name.split(".")[0]


def _stop():
    """Request shutdown: clear the run flag, wake sleepers, cancel main tasks."""
    global _running
    _running = False
    if _shutdown_event:
        _shutdown_event.set()
    for t in _active_tasks:
        if not t.done():
            t.cancel()


async def _sleep(seconds: float):
    """Sleep for up to `seconds`, waking early on shutdown."""
    try:
        if _shutdown_event:
            await asyncio.wait_for(_shutdown_event.wait(), timeout=seconds)
        else:
            await asyncio.sleep(seconds)
    except asyncio.TimeoutError:
        pass


def _spawn(coro) -> "asyncio.Task":
    """Schedule *coro* as a background task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


# ---------------------------------------------------------------------------
# UDP protocol handler + connection
# ---------------------------------------------------------------------------

class _HeartbeatProtocol(asyncio.DatagramProtocol):
    """Dispatches datagrams from the server (ACK / CMD / UPD) for one connection."""

    def __init__(self, conn: "AsyncConnection"):
        self._conn = conn
        self._log = logging.getLogger("hbc.proto")

    def datagram_received(self, data: bytes, addr):
        try:
            msg = _stodict(data)
            if not msg:
                return
            msg_id = msg.get("ID")
            now = time.time()
            if msg_id == "ACK":
                self._conn._handle_ack(now)
            elif msg_id == "CMD":
                _spawn(_handle_command(self._conn, msg))
            elif msg_id == "UPD":
                _spawn(_handle_update(self._conn))
            else:
                self._log.debug("unknown msg type: %s", msg_id)
        except Exception as e:
            self._log.error("datagram error: %s", e)

    def error_received(self, exc):
        # A protocol error (e.g. ICMP unreachable) marks the connection dead
        # so sendto() stops using it.
        self._log.warning("protocol error on %s: %s — dropping connection", self._conn.addr, exc)
        self._conn._dead = True
        self._conn.close()


class AsyncConnection:
    """One UDP endpoint towards a server address, with ACK/RTT bookkeeping."""

    def __init__(self, conn_id: int, addr: str, port: int, af: int, name: str):
        self.conn_id = conn_id
        self.addr = addr
        self.port = port
        self.af = af
        self.name = name
        self.ackcount = 0
        self.lastsend = 0.0
        self.rtts: List[float] = [0.0]
        self._transport: Optional[asyncio.DatagramTransport] = None
        self._dead = False
        self._log = logging.getLogger(f"hbc.conn.{addr}")

    async def open(self) -> bool:
        """Create the datagram endpoint; return False (and log) on failure."""
        try:
            loop = asyncio.get_running_loop()
            self._transport, _ = await loop.create_datagram_endpoint(
                lambda: _HeartbeatProtocol(self), family=self.af
            )
            return True
        except Exception as e:
            self._log.error("open: %s", e)
            return False

    def close(self):
        """Close the transport if one is open."""
        if self._transport:
            self._transport.close()
            self._transport = None

    def _handle_ack(self, now: float):
        """Record an ACK: append the RTT in ms (last 10 kept) and bump the counter."""
        rtt = (now - self.lastsend) * 1000.0
        self.rtts.append(rtt)
        if len(self.rtts) > 10:
            self.rtts.pop(0)
        self.ackcount += 1

    async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
        """Send *msg* with ``name``, ``id`` and ``time`` added; no-op once dead.

        Reopens the transport on demand; silently returns if that fails.
        """
        if self._dead:
            return
        if not self._transport:
            await self.open()
        if not self._transport:
            return
        out = dict(msg)
        out["name"] = _shortname(self.name)
        out["id"] = self.conn_id
        out["time"] = time.time()
        self._transport.sendto(_dicttos(msg_id, out), (self.addr, self.port))
        self.lastsend = time.time()


# ---------------------------------------------------------------------------
# Server command handlers
# ---------------------------------------------------------------------------

async def _handle_command(conn: AsyncConnection, msg: Dict[str, Any]):
    """Run the shell command from a CMD message and report ``<status> <output>``.

    Status is ``OK``, ``Error`` (non-zero exit or spawn failure) or
    ``Timeout`` (killed after 30 s).
    """
    # NOTE(review): this executes whatever string arrives in a CMD datagram;
    # nothing in this file authenticates the sender — confirm that is
    # acceptable for the deployment.
    cmd = msg.get("cmd", "")
    if not cmd:
        return
    log = logging.getLogger("hbc.cmd")
    log.info("exec: %s", cmd)
    try:
        # Use the asyncio subprocess API: a blocking call here would stall
        # heartbeats and plugin collection for the whole run time.
        proc = await asyncio.create_subprocess_shell(
            str(cmd),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            raw, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.communicate()
            out, status = "timed out", "Timeout"
        else:
            text = raw.decode(errors="replace")
            if proc.returncode == 0:
                out, status = text, "OK"
            else:
                # Keep the command's own output; it usually explains the failure.
                out, status = f"exit status {proc.returncode}: {text}", "Error"
    except Exception as e:
        out, status = str(e), "Error"
    await conn.sendto({"service": "command", "msg": f"{status} {out}"})


async def _handle_update(conn: AsyncConnection):
    """Run ``hb_install.sh client`` and, on success, request a restart.

    The installer is looked up in PATH, then next to this script. Every
    failure is logged and reported back to the server as an ``update`` message.
    """
    global _dorestart
    log = logging.getLogger("hbc.update")
    installer = shutil.which("hb_install.sh")
    if installer is None:
        candidate = Path(sys.argv[0]).parent / "hb_install.sh"
        if candidate.exists():
            installer = str(candidate)
    if installer is None:
        err = "hb_install.sh not found in PATH or alongside hbc_mini.py"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return
    log.info("running installer: %s", installer)
    try:
        proc = await asyncio.create_subprocess_exec(
            installer, "client",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
        except asyncio.TimeoutError:
            # Do not leave a half-finished installer running unattended.
            proc.kill()
            await proc.communicate()
            raise
    except asyncio.TimeoutError:
        err = "installer timed out"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return
    except Exception as e:
        err = f"installer failed: {e}"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return
    if proc.returncode != 0:
        err = f"installer exited {proc.returncode}: {out.decode(errors='replace').strip()}"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return
    log.info("update successful, restarting")
    await conn.sendto({"service": "update", "msg": "OK"})
    _dorestart = True
    _stop()


# ---------------------------------------------------------------------------
# Heartbeat sender
# ---------------------------------------------------------------------------

async def _heartbeat_sender(conn: AsyncConnection, interval: int):
    """Send a heartbeat on *conn* every *interval* seconds until shutdown."""
    log = logging.getLogger("hbc.hb")
    while _running:
        try:
            await conn.sendto({
                "acks": conn.ackcount,
                "rtt": conn.rtts[-1],
                "interval": interval,
            })
        except Exception as e:
            log.error("send: %s", e)
        await _sleep(interval)


# ---------------------------------------------------------------------------
# Plugin collection loops
# ---------------------------------------------------------------------------

async def _run_info_plugins(conn: AsyncConnection, plugins: List[Plugin]):
    """Collect each info plugin once and send its data as a PLG message."""
    log = logging.getLogger("hbc.plugins")
    for plugin in plugins:
        try:
            data = await plugin.collect()
            if data:
                await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                log.info("sent %s", plugin.name)
        except Exception as e:
            log.error("%s collect: %s", plugin.name, e)


async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], interval: int):
    """Collect and send all plugins sharing *interval*, repeating until shutdown."""
    log = logging.getLogger(f"hbc.plugins.{interval}s")
    while _running:
        for plugin in plugins:
            try:
                data = await plugin.collect()
                if data:
                    await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                    log.debug("sent %s", plugin.name)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("%s: %s", plugin.name, e)
        await _sleep(interval)


async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
    """Send info plugins once, then run one collection loop per monitor interval."""
    info = [p for p in plugins if isinstance(p, InfoPlugin)]
    monitor = [p for p in plugins if isinstance(p, MonitorPlugin)]

    await _run_info_plugins(conn, info)

    by_interval: Dict[int, List[Plugin]] = defaultdict(list)
    for p in monitor:
        by_interval[p.interval].append(p)

    if by_interval:
        await asyncio.gather(
            *[asyncio.create_task(_run_monitor_group(conn, grp, iv))
              for iv, grp in by_interval.items()],
            return_exceptions=True,
        )


# ---------------------------------------------------------------------------
# Daemonize + syslog
#
# ---------------------------------------------------------------------------

def _redirect_fd(fd: int, path: str, flags: int) -> None:
    """Point descriptor *fd* at *path* without leaking the temporary descriptor."""
    src = os.open(path, flags)
    if src == fd:
        # *fd* was closed beforehand, so os.open() already landed on it.
        # Make it inheritable, as os.dup2() would have done.
        os.set_inheritable(fd, True)
        return
    try:
        os.dup2(src, fd)
    finally:
        os.close(src)


def _daemonize():
    """Detach from the terminal with a double fork and redirect stdio.

    Each parent leaves through ``os._exit(0)``.  After the first fork the
    child changes directory to ``/``, starts a new session and clears the
    umask.  Finally stdin is pointed at ``/dev/zero`` and stdout/stderr at
    ``/dev/null``; redirection failures are ignored (best effort).
    """
    # os._exit() skips Python-level flushing and the descriptors are about to
    # be replaced, so push out anything still buffered first.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, OSError, ValueError):
            pass

    for fork_n in (1, 2):
        try:
            pid = os.fork()
        except OSError as e:
            sys.stderr.write(f"fork #{fork_n} failed: {e}\n")
            os._exit(1)
        if pid > 0:
            os._exit(0)
        if fork_n == 1:
            os.chdir("/")
            os.setsid()
            os.umask(0)

    sink = os.O_RDWR | os.O_APPEND
    for fd, path, flags in ((0, "/dev/zero", os.O_RDONLY), (1, "/dev/null", sink), (2, "/dev/null", sink)):
        try:
            _redirect_fd(fd, path, flags)
        except OSError:
            pass  # best effort: keep whatever the descriptor pointed at


def _reconfigure_syslog(level: int):
    """Replace all root logging handlers with a single syslog handler.

    Uses the ``/dev/log`` socket when it exists, otherwise ``localhost:514``,
    with the daemon facility, and sets the root logger to *level*.
    """
    root = logging.getLogger()
    for h in root.handlers[:]:
        root.removeHandler(h)
        h.close()
    addr = "/dev/log" if os.path.exists("/dev/log") else ("localhost", 514)
    sh = SysLogHandler(address=addr, facility=SysLogHandler.LOG_DAEMON)
    sh.setFormatter(logging.Formatter("hbc[%(process)d]: %(levelname)s %(message)s"))
    root.addHandler(sh)
    root.setLevel(level)


# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------

async def _async_main(args, cfg: Dict[str, Any]) -> int:
    """Open the connections, run the heartbeat and plugin tasks, shut down.

    Args:
        args: Parsed command-line namespace (``hosts``, ``name``, ``boot``,
            ``message``, ``daemon``).
        cfg: Configuration mapping; ``hb_port`` and ``interval`` are read
            here and the mapping is handed to ``_load_plugins``.

    Returns:
        ``1`` when no connection could be opened, otherwise ``0``.
    """
    global _running, _shutdown_event, _active_tasks
    _running = True
    _shutdown_event = asyncio.Event()
    _active_tasks = []

    log = logging.getLogger("hbc.main")
    iam = args.name or socket.gethostname()
    port = cfg.get("hb_port", PORT)
    interval = cfg.get("interval", INTERVAL)

    log.info("starting: %s -> %s port=%d interval=%ds", iam, args.hosts, port, interval)

    # One connection per resolved address of every host.
    connections: List[AsyncConnection] = []
    conn_id = 1
    for host in args.hosts:
        try:
            addrs = socket.getaddrinfo(host, port, 0, 0, socket.SOL_UDP)
        except socket.gaierror as e:
            log.error("cannot resolve %s: %s", host, e)
            continue
        for ai in addrs:
            conn = AsyncConnection(conn_id, ai[4][0], port, ai[0], iam)
            if await conn.open():
                connections.append(conn)
                conn_id += 1

    if not connections:
        log.error("no connections established")
        return 1

    # Boot / one-shot message
    if args.boot or args.message:
        bmsg: Dict[str, Any] = {"acks": 0}
        if args.boot:
            bmsg["boot"] = 1
        if args.message:
            bmsg["service"] = "service"
            bmsg["msg"] = args.message
        for c in connections:
            await c.sendto(bmsg)
        if args.message and not args.daemon:
            # One-shot mode: give the datagrams a moment to leave, then quit.
            await asyncio.sleep(0.3)
            for c in connections:
                c.close()
            return 0

    plugins = await _load_plugins(cfg)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, _stop)
        except NotImplementedError:
            # Event loops without signal support (e.g. on Windows): Ctrl-C
            # then surfaces as KeyboardInterrupt out of asyncio.run().
            log.debug("loop signal handlers unavailable for %s", sig)

    for conn in connections:
        _active_tasks.append(asyncio.create_task(_heartbeat_sender(conn, interval)))

    if plugins and connections:
        # Plugin data is sent over the first connection only.
        _active_tasks.append(asyncio.create_task(_plugin_collector(connections[0], plugins)))

    try:
        await asyncio.gather(*_active_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass

    log.info("shutting down")
    for conn in connections:
        try:
            await conn.sendto({"shutdown": 1, "acks": conn.ackcount})
        except Exception as e:
            log.debug("shutdown notice failed: %s", e)
        conn.close()
    await asyncio.sleep(0.3)
    for plugin in plugins:
        # One failing cleanup must not prevent the remaining ones.
        try:
            await plugin.cleanup()
        except Exception as e:
            log.error("%s cleanup: %s", plugin.name, e)
    return 0


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def _reexec(script: str) -> None:
    """Replace the current process with a fresh run of *script*.

    The script is started through the current interpreter so that it needs
    neither the executable bit nor a shebang line.  Exits with status 1 if
    the exec call fails.
    """
    if sys.executable:
        target = sys.executable
        new_argv = [sys.executable, script] + sys.argv[1:]
    else:
        target = script
        new_argv = [script] + sys.argv[1:]
    try:
        os.execv(target, new_argv)
    except OSError as e:
        logging.error("restart failed: %s", e)
        sys.exit(1)


def main(argv=None):
    """Parse the command line, run the client and exit with its status.

    Args:
        argv: Argument list to parse; ``None`` makes argparse use
            ``sys.argv[1:]``.

    When ``_dorestart`` is set after the client has stopped, the process
    re-executes itself with the original ``sys.argv`` arguments instead of
    exiting.
    """
    parser = argparse.ArgumentParser(
        prog="hbc_mini",
        description="HeartBeat Client — single file, no external dependencies",
    )
    parser.add_argument("-b", "--boot", action="store_true", help="Send boot message")
    parser.add_argument("-c", "--config", dest="configfile", help="Config file (JSON)")
    parser.add_argument("-m", "--message", dest="message", help="Send a one-shot message")
    parser.add_argument("-n", "--name", dest="name", help="Override hostname")
    parser.add_argument("-d", "--daemon", action="store_true", help="Run as daemon")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-x", "--debug", action="count", default=0, help="Debug level")
    parser.add_argument("hosts", nargs="+", help="HBD server(s)")
    args = parser.parse_args(argv)

    level = logging.WARNING
    if args.verbose:
        level = logging.INFO
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    cfg = _load_config(args.configfile)

    # Resolve the script path now: _daemonize() changes the working directory
    # to "/", after which a relative sys.argv[0] no longer points at the file.
    script = os.path.abspath(sys.argv[0])

    if args.daemon:
        _daemonize()
        _reconfigure_syslog(level)

    try:
        rc = asyncio.run(_async_main(args, cfg))
    except KeyboardInterrupt:
        rc = 0
    except Exception as e:
        logging.error("fatal: %s", e, exc_info=True)
        rc = 1

    if _dorestart:
        logging.info("restarting...")
        _reexec(script)

    sys.exit(rc)


if __name__ == "__main__":
    main()