#!/usr/bin/env python3
"""hbc_mini — single-file HeartBeat Client with no external dependencies.

Drop this file on any host with Python 3.8+ and run it directly:

    python3 hbc_mini.py <hbd-host> [<hbd-host> ...]

Config: ~/.hbc.json  (same keys as ~/.hbc.yaml but JSON format)

Built-in plugins (always available):
  os_info        — OS name, kernel, distro, Python version
  ping_monitor   — ICMP ping RTT to configured hosts
  nagios_runner  — run Nagios-compatible check scripts

Linux-only plugins (via /proc, no extra tools needed):
  cpu_monitor     — CPU %, load average, core count
  memory_monitor  — RAM and swap usage
  network_monitor — per-interface TX/RX byte counters and deltas

Cross-platform (via df subprocess):
  disk_monitor   — per-mount usage and capacity
"""

import argparse
import asyncio
import json
import logging
import os
import platform
import re
import shutil
import signal
import socket
import subprocess
import sys
import time
import zlib
from abc import ABC, abstractmethod
from collections import defaultdict
from logging.handlers import SysLogHandler
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# updated by scripts/bumpminor.sh
__version__ = "5.1.14"


# ---------------------------------------------------------------------------
# Subprocess helper
# ---------------------------------------------------------------------------

async def _kill_and_reap(proc: "asyncio.subprocess.Process", grace: float = 5.0) -> None:
    """Kill *proc* if it is still running and wait (bounded) for it to exit.

    Used after a timeout so that child processes are not left running.
    The wait is capped at *grace* seconds so a stuck child cannot stall
    the caller indefinitely.
    """
    if proc.returncode is not None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        # The process went away between the returncode check and kill().
        return
    try:
        await asyncio.wait_for(proc.wait(), timeout=grace)
    except Exception:
        # Best effort: there is nothing more we can do for this child.
        pass


# ---------------------------------------------------------------------------
# Protocol (mirrors hbd/common/proto.py)
# ---------------------------------------------------------------------------

def _encode_value(v: Any) -> str:
    """Serialise one message value to its wire representation.

    Floats are written with five decimals, lists/dicts as ``@`` followed by
    JSON, booleans as ``0``/``1`` and everything else via ``str()``.
    """
    if isinstance(v, float):
        return f"{v:0.5f}"
    if isinstance(v, (list, dict)):
        return "@" + json.dumps(v)
    if isinstance(v, bool):
        return str(int(v))
    return str(v)


def _decode_value(val: str) -> Any:
    """Inverse of :func:`_encode_value` for a single wire value.

    ``@``-prefixed values are parsed as JSON (falling back to the raw text
    after the ``@``); values that look numeric become ``int`` or ``float``;
    anything else is returned unchanged.
    """
    if not val:
        return val
    if val.startswith("@"):
        try:
            return json.loads(val[1:])
        except Exception:
            return val[1:]
    if val[0].isdigit() or (val[0] == "-" and len(val) > 1 and val[1].isdigit()):
        try:
            return int(val)
        except ValueError:
            pass
        try:
            return float(val)
        except ValueError:
            pass
    return val


def _dicttos(msg_id: str, d: Dict[str, Any]) -> bytes:
    """Encode *d* as a compressed datagram: ``!<msg_id>:`` + zlib(k=v;k=v...)."""
    payload = ";".join(f"{k}={_encode_value(v)}" for k, v in d.items()).encode()
    return ("!" + msg_id + ":").encode() + zlib.compress(payload, 6)


def _stodict(data: bytes) -> Dict[str, Any]:
    """Decode a datagram into a dict; the message id is stored under ``"ID"``.

    Two framings are accepted: ``!XXX:`` followed by a zlib-compressed
    payload (the id is read from bytes 1..3), or plain ``<id>:<payload>``.
    Returns an empty dict for empty or undecodable input.
    """
    result: Dict[str, Any] = {}
    if not data:
        return result
    if data[:1] == b"!":
        try:
            payload = zlib.decompress(data[5:]).decode()
            result["ID"] = data[1:4].decode()
        except Exception:
            return {}
    else:
        try:
            head, raw = data.split(b":", 1)
            payload = raw.decode()
            result["ID"] = head.decode()
        except Exception:
            return {}
    for item in payload.split(";"):
        if not item:
            continue
        kv = item.split("=", 1)
        result[kv[0].strip()] = _decode_value(kv[1].strip()) if len(kv) > 1 else None
    return result


# ---------------------------------------------------------------------------
# Config (JSON, default ~/.hbc.json)
# ---------------------------------------------------------------------------

_DEFAULTS: Dict[str, Any] = {
    "hb_port": 50003,
    "interval": 10,
    "plugins": {},
}


def _load_config(path: Optional[str] = None) -> Dict[str, Any]:
    """Return the defaults overlaid with the JSON config at *path*.

    *path* defaults to ``~/.hbc.json``.  A missing file is silently ignored;
    an unreadable or invalid one is logged as a warning and the defaults
    are returned.
    """
    cfg = dict(_DEFAULTS)
    if not path:
        path = os.path.join(os.path.expanduser("~"), ".hbc.json")
    if os.path.exists(path):
        log = logging.getLogger("hbc.config")
        try:
            with open(path, encoding="utf-8") as fh:
                cfg.update(json.load(fh))
            log.info("loaded config from %s", path)
        except Exception as e:
            log.warning("cannot read %s: %s", path, e)
    return cfg


# ---------------------------------------------------------------------------
# Plugin base classes
# ---------------------------------------------------------------------------

class Plugin(ABC):
    """Common base for all plugins.

    Subclasses set ``name`` and implement :meth:`initialize` (return False
    to be skipped, optionally setting ``skip_reason``) and :meth:`collect`.
    """

    name: str = ""
    version: str = "1.0.0"
    description: str = ""
    interval: int = 0
    enabled: bool = True

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.logger = logging.getLogger(f"plugin.{self.name}")
        self.skip_reason: Optional[str] = None

    @abstractmethod
    async def initialize(self) -> bool:
        ...

    @abstractmethod
    async def collect(self) -> Dict[str, Any]:
        ...

    async def cleanup(self) -> None:
        pass


class InfoPlugin(Plugin):
    """Collected once at startup, result is cached."""

    interval = 0

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._cache: Optional[Dict[str, Any]] = None

    async def collect(self) -> Dict[str, Any]:
        if self._cache is None:
            self._cache = await self._collect_info()
        return self._cache

    @abstractmethod
    async def _collect_info(self) -> Dict[str, Any]:
        ...


class MonitorPlugin(Plugin):
    """Collected repeatedly on a fixed interval."""

    interval = 30

    async def collect(self) -> Dict[str, Any]:
        """Return the subclass metrics, stamped with ``_timestamp`` if non-empty."""
        data = await self._collect_metrics()
        if data:
            data["_timestamp"] = time.time()
        return data

    @abstractmethod
    async def _collect_metrics(self) -> Dict[str, Any]:
        ...


# ---------------------------------------------------------------------------
# Plugin: os_info
# ---------------------------------------------------------------------------

def _linux_distro() -> Dict[str, str]:
    """Read distro identification from the first usable release file.

    Parses ``KEY=value`` lines of ``/etc/os-release`` (then
    ``/etc/lsb-release``) and stops at the first file that yields any of
    the recognised keys.
    """
    info: Dict[str, str] = {}
    for path in ("/etc/os-release", "/etc/lsb-release"):
        if not os.path.exists(path):
            continue
        try:
            with open(path) as fh:
                for line in fh:
                    line = line.strip()
                    if "=" not in line or line.startswith("#"):
                        continue
                    k, v = line.split("=", 1)
                    v = v.strip('"').strip("'")
                    if k == "PRETTY_NAME":
                        info["distro_pretty_name"] = v
                    elif k == "NAME":
                        info.setdefault("distro_name", v)
                    elif k == "VERSION_ID":
                        info["distro_version_id"] = v
                    elif k == "ID":
                        info["distro_id"] = v
        except Exception:
            # Unreadable release file: fall through to the next candidate.
            pass
        if info:
            break
    return info


class OSInfoPlugin(InfoPlugin):
    """Reports static OS / platform facts once at startup."""

    name = "os_info"
    description = "OS and platform information"

    async def initialize(self) -> bool:
        return True

    async def _collect_info(self) -> Dict[str, Any]:
        system = platform.system()
        data: Dict[str, Any] = {
            "system": system,
            "node": platform.node(),
            "release": platform.release(),
            "machine": platform.machine(),
            "architecture": platform.architecture()[0],
            "python_version": platform.python_version(),
            "hbc_version": __version__,
            "hbc_type": "mini",
        }
        if system == "Linux":
            data.update(_linux_distro())
        elif system == "Darwin":
            data["macos_version"] = platform.mac_ver()[0]
        return data


# ---------------------------------------------------------------------------
# Plugin: ping_monitor
# ---------------------------------------------------------------------------

class PingMonitorPlugin(MonitorPlugin):
    """Pings each configured host with the system ``ping`` binary."""

    name = "ping_monitor"
    description = "ICMP ping latency monitoring"
    interval = 60

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 60)
        self.count = int(cfg.get("count", 3))
        self.timeout = int(cfg.get("timeout", 5))
        raw = cfg.get("hosts", {})
        # "hosts" may be a plain list of names or a mapping name -> options.
        self.hosts: Dict[str, Any] = (
            {h: {} for h in raw} if isinstance(raw, list) else dict(raw)
        )

    async def initialize(self) -> bool:
        if not self.hosts:
            self.skip_reason = "no hosts configured"
            return False
        return True

    async def _ping(self, host: str) -> Dict[str, float]:
        """Ping *host* and return rtt_min/rtt_avg/rtt_max (ms) and loss (%).

        Any failure (spawn error, timeout, unparsable output) yields
        infinite RTTs and 100% loss.
        """
        _inf = float("inf")
        _fail: Dict[str, float] = {
            "rtt_min": _inf, "rtt_avg": _inf, "rtt_max": _inf, "loss": 100.0,
        }
        if sys.platform == "win32":
            cmd = ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host]
        else:
            cmd = ["ping", "-c", str(self.count), "-W", str(self.timeout), host]

        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            out_b, _ = await asyncio.wait_for(
                proc.communicate(), timeout=self.timeout * self.count + 2
            )
            out = out_b.decode(errors="replace")
        except Exception:
            # Do not leave a hung ping behind after a timeout.
            if proc is not None:
                await _kill_and_reap(proc)
            return _fail

        loss = 100.0
        m = re.search(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss", out)
        if m:
            loss = float(m.group(1))

        m = re.search(
            r"(?:rtt|round-trip)\s+min/avg/max/\S+\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)", out
        )
        if m:
            return {
                "rtt_min": float(m.group(1)),
                "rtt_avg": float(m.group(2)),
                "rtt_max": float(m.group(3)),
                "loss": loss,
            }
        return _fail

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Ping all hosts concurrently; keys are ``<sanitised host>_<metric>``."""
        tasks = {h: asyncio.create_task(self._ping(h)) for h in self.hosts}
        data: Dict[str, Any] = {}
        for host, task in tasks.items():
            try:
                result = await task
            except Exception:
                result = {
                    "rtt_min": float("inf"), "rtt_avg": float("inf"),
                    "rtt_max": float("inf"), "loss": 100.0,
                }
            key = re.sub(r"[^a-zA-Z0-9_]", "_", host)
            for metric, value in result.items():
                data[f"{key}_{metric}"] = value
        return data


# ---------------------------------------------------------------------------
# Plugin: nagios_runner
# ---------------------------------------------------------------------------

_NAGIOS_STATUS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"}


def _parse_nagios_perfdata(output: str) -> Dict[str, Any]:
    """Extract ``label=value[uom];warn;crit;min;max`` items after the ``|``.

    Returns a flat dict: ``label`` -> value, plus ``label_uom`` and
    ``label_warn/_crit/_min/_max`` when present and numeric.  Values and
    thresholds may be negative.
    """
    if "|" not in output:
        return {}
    section = output.split("|", 1)[1].strip()
    perfdata: Dict[str, Any] = {}
    for m in re.finditer(
        r"'?([^'=\s]+)'?=(-?[\d.]+)([a-zA-Z%]*)"
        r";?(-?[\d.]*);?(-?[\d.]*);?(-?[\d.]*);?(-?[\d.]*)",
        section,
    ):
        label = m.group(1).strip()
        try:
            perfdata[label] = float(m.group(2))
        except ValueError:
            continue
        if m.group(3):
            perfdata[f"{label}_uom"] = m.group(3)
        for idx, suffix in zip(range(4, 8), ("_warn", "_crit", "_min", "_max")):
            if m.group(idx):
                try:
                    perfdata[f"{label}{suffix}"] = float(m.group(idx))
                except ValueError:
                    pass
    return perfdata


class NagiosRunnerPlugin(MonitorPlugin):
    """Runs configured Nagios-style check commands and reports their status."""

    name = "nagios_runner"
    description = "Nagios-compatible plugin runner"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.commands: List[Dict[str, str]] = cfg.get("commands", [])
        self.timeout: int = cfg.get("timeout", 30)
        self.interval = cfg.get("interval", 300)

    async def initialize(self) -> bool:
        if not self.commands:
            self.skip_reason = "no commands configured"
            return False
        return True

    @staticmethod
    def _normalize_rc(returncode: Optional[int]) -> int:
        """Map a process exit status to a Nagios code (0..3).

        Anything outside 0..3 — including negative values, which mean the
        check was killed by a signal — is reported as UNKNOWN (3) rather
        than being clamped to OK.
        """
        if returncode is None or not 0 <= returncode <= 3:
            return 3
        return returncode

    async def _run(self, command: str) -> Tuple[int, str, Dict[str, Any]]:
        """Run *command* through the shell; return (code, message, perfdata)."""
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, err = await asyncio.wait_for(proc.communicate(), timeout=self.timeout)
            except asyncio.TimeoutError:
                await _kill_and_reap(proc)
                return 3, f"timed out after {self.timeout}s", {}
            rc = self._normalize_rc(proc.returncode)
            stdout = out.decode(errors="replace").strip()
            stderr = err.decode(errors="replace").strip()
            perf = _parse_nagios_perfdata(stdout)
            # Human-readable part is everything before the perfdata separator.
            msg = stdout.split("|")[0].strip() or stderr
            return rc, msg, perf
        except Exception as e:
            return 3, str(e), {}

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run every configured command sequentially and aggregate the worst status."""
        results: Dict[str, Any] = {}
        worst = 0
        for cmd_cfg in self.commands:
            name = cmd_cfg.get("name")
            command = cmd_cfg.get("command")
            if not name or not command:
                continue
            rc, msg, perf = await self._run(command)
            results[f"{name}_status"] = _NAGIOS_STATUS.get(rc, "UNKNOWN")
            results[f"{name}_status_code"] = rc
            results[f"{name}_output"] = msg
            results.update({f"{name}_{k}": v for k, v in perf.items()})
            worst = max(worst, rc)
        results["overall_status"] = _NAGIOS_STATUS.get(worst, "UNKNOWN")
        results["overall_status_code"] = worst
        results["plugin_count"] = len(self.commands)
        return results


# ---------------------------------------------------------------------------
# Plugin: cpu_monitor (Linux /proc/stat)
# ---------------------------------------------------------------------------

def _read_proc_stat() -> Optional[Tuple[int, ...]]:
    """Return CPU jiffies tuple (user, nice, system, idle, iowait, ...)."""
    try:
        with open("/proc/stat") as fh:
            line = fh.readline()
        parts = line.split()
        if parts[0] != "cpu":
            return None
        return tuple(int(x) for x in parts[1:])
    except Exception:
        return None


class CPUMonitorPlugin(MonitorPlugin):
    """CPU utilisation derived from deltas of the aggregate /proc/stat line."""

    name = "cpu_monitor"
    description = "CPU usage via /proc/stat (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)
        self._prev: Optional[Tuple[int, ...]] = None

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/stat not available)"
            return False
        if _read_proc_stat() is None:
            self.skip_reason = "/proc/stat not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        # First call: take a 1-second window for an immediate reading.
        # Subsequent calls: use the full interval between collections.
        if self._prev is None:
            t1 = _read_proc_stat()
            await asyncio.sleep(1)
        else:
            t1 = self._prev
        t2 = _read_proc_stat()
        if t1 is None or t2 is None:
            return {}
        self._prev = t2

        # Fields: user(0) nice(1) system(2) idle(3) iowait(4) irq(5) softirq(6) steal(7)
        have_iowait = len(t2) > 4
        idle1 = t1[3] + (t1[4] if have_iowait else 0)
        idle2 = t2[3] + (t2[4] if have_iowait else 0)
        total_delta = sum(t2) - sum(t1)
        if total_delta == 0:
            return {}

        data: Dict[str, Any] = {
            "cpu_percent": round(100.0 * (1.0 - (idle2 - idle1) / total_delta), 1),
        }
        if have_iowait:
            data["cpu_user"] = round(100.0 * (t2[0] - t1[0]) / total_delta, 1)
            data["cpu_system"] = round(100.0 * (t2[2] - t1[2]) / total_delta, 1)
            data["cpu_idle"] = round(100.0 * (idle2 - idle1) / total_delta, 1)
            data["cpu_iowait"] = round(100.0 * (t2[4] - t1[4]) / total_delta, 1)

        try:
            la = os.getloadavg()
            data["load_1min"] = round(la[0], 2)
            data["load_5min"] = round(la[1], 2)
            data["load_15min"] = round(la[2], 2)
        except (AttributeError, OSError):
            pass

        try:
            with open("/proc/cpuinfo") as fh:
                data["cpu_core_count"] = sum(
                    1 for line in fh if line.startswith("processor")
                )
        except Exception:
            pass
        return data


# ---------------------------------------------------------------------------
# Plugin: memory_monitor (Linux /proc/meminfo)
# ---------------------------------------------------------------------------

def _read_meminfo() -> Dict[str, int]:
    """Return ``/proc/meminfo`` as {field: integer value}; empty on failure."""
    result: Dict[str, int] = {}
    try:
        with open("/proc/meminfo") as fh:
            for line in fh:
                parts = line.split()
                if len(parts) >= 2:
                    try:
                        result[parts[0].rstrip(":")] = int(parts[1])
                    except ValueError:
                        pass
    except Exception:
        pass
    return result


class MemoryMonitorPlugin(MonitorPlugin):
    """RAM and swap usage; meminfo values are multiplied by 1024 before reporting."""

    name = "memory_monitor"
    description = "Memory usage via /proc/meminfo (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/meminfo not available)"
            return False
        if not _read_meminfo():
            self.skip_reason = "/proc/meminfo not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        mi = _read_meminfo()
        if not mi:
            return {}
        total = mi.get("MemTotal", 0)
        # Fall back to MemFree when the kernel does not report MemAvailable.
        avail = mi.get("MemAvailable", mi.get("MemFree", 0))
        free = mi.get("MemFree", 0)
        used = total - avail
        data: Dict[str, Any] = {
            "memory_total": total * 1024,
            "memory_used": used * 1024,
            "memory_available": avail * 1024,
            "memory_free": free * 1024,
            "memory_percent": round(100.0 * used / total, 1) if total else 0.0,
        }
        for field, key in (
            ("Buffers", "memory_buffers"),
            ("Cached", "memory_cached"),
            ("Active", "memory_active"),
            ("Inactive", "memory_inactive"),
        ):
            if field in mi:
                data[key] = mi[field] * 1024

        stotal = mi.get("SwapTotal", 0)
        if stotal:
            sfree = mi.get("SwapFree", 0)
            sused = stotal - sfree
            data["swap_total"] = stotal * 1024
            data["swap_used"] = sused * 1024
            data["swap_free"] = sfree * 1024
            data["swap_percent"] = round(100.0 * sused / stotal, 1)
        return data


# ---------------------------------------------------------------------------
# Plugin: disk_monitor (df -P; Linux, macOS, BSD)
# ---------------------------------------------------------------------------

class DiskMonitorPlugin(MonitorPlugin):
    """Per-mount disk usage parsed from ``df -P`` output."""

    name = "disk_monitor"
    description = "Disk usage via df"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        # Optional allow-list of mount points; empty means "all".
        self.mounts: List[str] = cfg.get("mounts", [])

    async def initialize(self) -> bool:
        if sys.platform == "win32":
            self.skip_reason = "Windows not supported"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "-P",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        except Exception as e:
            self.logger.warning("df failed: %s", e)
            # df can hang on stale network mounts; do not leak the child.
            if proc is not None:
                await _kill_and_reap(proc)
            return {}

        partitions: Dict[str, Any] = {}
        for line in out.decode(errors="replace").splitlines()[1:]:
            parts = line.split()
            if len(parts) < 6:
                continue
            mount = parts[5]
            if self.mounts and mount not in self.mounts:
                continue
            try:
                total_kb = int(parts[1])
                used_kb = int(parts[2])
                avail_kb = int(parts[3])
                pct = int(parts[4].rstrip("%"))
            except (ValueError, IndexError):
                continue
            partitions[mount] = {
                "total": total_kb * 1024,
                "used": used_kb * 1024,
                "free": avail_kb * 1024,
                "percent": pct,
            }
        return {"partitions": partitions} if partitions else {}


# ---------------------------------------------------------------------------
# Plugin: network_monitor (Linux /proc/net/dev)
# ---------------------------------------------------------------------------

def _read_net_dev() -> Dict[str, Tuple[int, int]]:
    """Return {iface: (rx_bytes, tx_bytes)}."""
    result: Dict[str, Tuple[int, int]] = {}
    try:
        with open("/proc/net/dev") as fh:
            for line in fh.readlines()[2:]:  # skip 2 header lines
                parts = line.split()
                if len(parts) < 10:
                    continue
                result[parts[0].rstrip(":")] = (int(parts[1]), int(parts[9]))
    except Exception:
        pass
    return result


class NetworkMonitorPlugin(MonitorPlugin):
    """Per-interface byte counters and their delta since the previous sample.

    NOTE(review): the description advertises rates, but only raw deltas are
    emitted; the elapsed time is used solely to reject non-positive windows.
    """

    name = "network_monitor"
    description = "Network I/O rates via /proc/net/dev (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.skip_ifaces: set = set(cfg.get("skip_interfaces", ["lo"]))
        self._prev: Optional[Tuple[float, Dict[str, Tuple[int, int]]]] = None

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/net/dev not available)"
            return False
        dev = _read_net_dev()
        if not dev:
            self.skip_reason = "/proc/net/dev not readable"
            return False
        self._prev = (time.time(), dev)  # prime first delta
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        now = time.time()
        curr = _read_net_dev()
        if not curr or self._prev is None:
            self._prev = (now, curr)
            return {}
        prev_ts, prev = self._prev
        dt = now - prev_ts
        self._prev = (now, curr)
        if dt <= 0:
            return {}

        interfaces: Dict[str, Any] = {}
        for iface, (rx, tx) in curr.items():
            if iface in self.skip_ifaces or iface not in prev:
                continue
            prx, ptx = prev[iface]
            interfaces[iface] = {
                "bytes_recv": rx,
                "bytes_sent": tx,
                "bytes_recv_delta": rx - prx,
                "bytes_sent_delta": tx - ptx,
            }
        return {"interfaces": interfaces} if interfaces else {}


# ---------------------------------------------------------------------------
# Plugin registry — all built-in classes, initialized at startup
# ---------------------------------------------------------------------------

_ALL_PLUGIN_CLASSES: List[type] = [
    OSInfoPlugin,
    PingMonitorPlugin,
    NagiosRunnerPlugin,
    CPUMonitorPlugin,
    MemoryMonitorPlugin,
    DiskMonitorPlugin,
    NetworkMonitorPlugin,
]


async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
    """Instantiate and initialise every built-in plugin; return the usable ones.

    Per-plugin config is looked up under ``cfg["plugins"][name]`` first and
    then under the top-level ``cfg[name]``.
    """
    log = logging.getLogger("hbc.plugins")
    plugins_cfg: Dict[str, Any] = cfg.get("plugins", {})
    loaded: List[Plugin] = []
    for cls in _ALL_PLUGIN_CLASSES:
        plugin_cfg = plugins_cfg.get(cls.name) or cfg.get(cls.name, {})
        plugin: Plugin = cls(config=plugin_cfg)
        try:
            ok = await plugin.initialize()
        except Exception as e:
            log.error("init %s: %s", cls.name, e)
            ok = False
        if ok:
            loaded.append(plugin)
            log.info("loaded %s (interval=%ds)", plugin.name, plugin.interval)
        else:
            log.info("skip %s: %s", plugin.name, plugin.skip_reason or "init failed")
    return loaded


# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------

_running = True
_dorestart = False
_shutdown_event: Optional[asyncio.Event] = None
_active_tasks: List[asyncio.Task] = []

PORT = 50003
INTERVAL = 10


def _shortname(name: str) -> str:
    """Return the part of *name* before the first dot."""
    return name.split(".")[0]


def _stop():
    """Request shutdown: clear the run flag, wake sleepers, cancel main tasks."""
    global _running
    _running = False
    if _shutdown_event:
        _shutdown_event.set()
    for t in _active_tasks:
        if not t.done():
            t.cancel()


async def _sleep(seconds: float):
    """Sleep for up to `seconds`, waking early on shutdown."""
    try:
        if _shutdown_event:
            await asyncio.wait_for(_shutdown_event.wait(), timeout=seconds)
        else:
            await asyncio.sleep(seconds)
    except asyncio.TimeoutError:
        pass


# ---------------------------------------------------------------------------
# UDP protocol handler + connection
# ---------------------------------------------------------------------------

class _HeartbeatProtocol(asyncio.DatagramProtocol):
    """Dispatches datagrams received from the server for one connection."""

    def __init__(self, conn: "AsyncConnection"):
        self._conn = conn
        self._log = logging.getLogger("hbc.proto")
        # Strong references to in-flight handler tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks: Set[asyncio.Task] = set()

    def _spawn(self, coro) -> None:
        """Run *coro* as a background task, retaining it until it completes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def datagram_received(self, data: bytes, addr):
        try:
            msg = _stodict(data)
            if not msg:
                return
            msg_id = msg.get("ID")
            now = time.time()
            if msg_id == "ACK":
                self._conn._handle_ack(now)
            elif msg_id == "CMD":
                self._spawn(_handle_command(self._conn, msg))
            elif msg_id == "UPD":
                self._spawn(_handle_update(self._conn))
            else:
                self._log.debug("unknown msg type: %s", msg_id)
        except Exception as e:
            self._log.error("datagram error: %s", e)

    def error_received(self, exc):
        self._log.warning(
            "protocol error on %s: %s — dropping connection", self._conn.addr, exc
        )
        self._conn._dead = True
        self._conn.close()


class AsyncConnection:
    """One UDP endpoint towards a single server address.

    Tracks the ACK count and the last ten round-trip times (milliseconds).
    """

    def __init__(self, conn_id: int, addr: str, port: int, af: int, name: str):
        self.conn_id = conn_id
        self.addr = addr
        self.port = port
        self.af = af
        self.name = name
        self.ackcount = 0
        self.lastsend = 0.0
        self.rtts: List[float] = [0.0]
        self._transport: Optional[asyncio.DatagramTransport] = None
        self._dead = False
        self._log = logging.getLogger(f"hbc.conn.{addr}")

    async def open(self) -> bool:
        """Create the datagram endpoint; return False (and log) on failure."""
        try:
            loop = asyncio.get_running_loop()
            self._transport, _ = await loop.create_datagram_endpoint(
                lambda: _HeartbeatProtocol(self), family=self.af
            )
            return True
        except Exception as e:
            self._log.error("open: %s", e)
            return False

    def close(self):
        if self._transport:
            self._transport.close()
            self._transport = None

    def _handle_ack(self, now: float):
        """Record the RTT of an ACK received at *now* (keeps the last 10)."""
        rtt = (now - self.lastsend) * 1000.0
        self.rtts.append(rtt)
        if len(self.rtts) > 10:
            self.rtts.pop(0)
        self.ackcount += 1

    async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
        """Send *msg* (plus name/id/time fields) unless the connection is dead.

        Re-opens the transport on demand; silently returns if that fails.
        """
        if self._dead:
            return
        if not self._transport:
            await self.open()
        if not self._transport:
            return
        out = dict(msg)
        out["name"] = _shortname(self.name)
        out["id"] = self.conn_id
        out["time"] = time.time()
        self._transport.sendto(_dicttos(msg_id, out), (self.addr, self.port))
        self.lastsend = time.time()


# ---------------------------------------------------------------------------
# Server command handlers
# ---------------------------------------------------------------------------

async def _handle_command(conn: AsyncConnection, msg: Dict[str, Any]):
    """Execute the shell command in ``msg["cmd"]`` and report the outcome.

    The command runs as an asyncio subprocess so a slow command does not
    block the event loop (and with it every heartbeat) for up to 30 s.

    SECURITY NOTE(review): the command text arrives in a UDP datagram and is
    passed to a shell; nothing visible in this file authenticates the
    sender — confirm this is acceptable for the deployment.
    """
    cmd = msg.get("cmd", "")
    if not cmd:
        return
    log = logging.getLogger("hbc.cmd")
    log.info("exec: %s", cmd)
    try:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            raw, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
        except asyncio.TimeoutError:
            await _kill_and_reap(proc)
            out, status = "timed out", "Timeout"
        else:
            if proc.returncode:
                # Same wording the previous check_output() based code produced.
                out = str(subprocess.CalledProcessError(proc.returncode, cmd))
                status = "Error"
            else:
                out, status = raw.decode(errors="replace"), "OK"
    except Exception as e:
        out, status = str(e), "Error"
    await conn.sendto({"service": "command", "msg": f"{status} {out}"})


async def _handle_update(conn: AsyncConnection):
    """Run ``hb_install.sh mini`` and, on success, schedule a restart.

    The installer is looked up on PATH, then next to the running script.
    Every failure is logged and reported back over *conn*.
    """
    global _dorestart
    log = logging.getLogger("hbc.update")

    installer = shutil.which("hb_install.sh")
    if installer is None:
        candidate = Path(sys.argv[0]).parent / "hb_install.sh"
        if candidate.exists():
            installer = str(candidate)
    if installer is None:
        err = "hb_install.sh not found in PATH or alongside hbc_mini.py"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return

    log.info("running installer: %s", installer)
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            installer, "mini",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
    except asyncio.TimeoutError:
        if proc is not None:
            await _kill_and_reap(proc)
        err = "installer timed out"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return
    except Exception as e:
        err = f"installer failed: {e}"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return

    if proc.returncode != 0:
        err = f"installer exited {proc.returncode}: {out.decode(errors='replace').strip()}"
        log.error(err)
        await conn.sendto({"service": "update", "msg": err})
        return

    log.info("update successful, restarting")
    await conn.sendto({"service": "update", "msg": "OK"})
    _dorestart = True
    _stop()


# ---------------------------------------------------------------------------
# Heartbeat sender
# ---------------------------------------------------------------------------

async def _heartbeat_sender(conn: AsyncConnection, interval: int):
    """Send a heartbeat on *conn* every *interval* seconds until shutdown."""
    log = logging.getLogger("hbc.hb")
    while _running:
        try:
            await conn.sendto({
                "acks": conn.ackcount,
                "rtt": conn.rtts[-1],
                "interval": interval,
            })
        except Exception as e:
            log.error("send: %s", e)
        await _sleep(interval)


# ---------------------------------------------------------------------------
# Plugin collection loops
# ---------------------------------------------------------------------------

async def _run_info_plugins(conn: AsyncConnection, plugins: List[Plugin]):
    """Collect each info plugin once and send its data as a PLG message."""
    log = logging.getLogger("hbc.plugins")
    for plugin in plugins:
        try:
            data = await plugin.collect()
            if data:
                await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                log.info("sent %s", plugin.name)
        except Exception as e:
            log.error("%s collect: %s", plugin.name, e)


async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], interval: int):
    """Collect and send all *plugins* sharing *interval*, repeating until shutdown."""
    log = logging.getLogger(f"hbc.plugins.{interval}s")
    while _running:
        for plugin in plugins:
            try:
                data = await plugin.collect()
                if data:
                    await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                    log.debug("sent %s", plugin.name)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("%s: %s", plugin.name, e)
        await _sleep(interval)


async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
    """Send info plugins once, then run one loop per distinct monitor interval."""
    info = [p for p in plugins if isinstance(p, InfoPlugin)]
    monitor = [p for p in plugins if isinstance(p, MonitorPlugin)]

    await _run_info_plugins(conn, info)

    by_interval: Dict[int, List[Plugin]] = defaultdict(list)
    for p in monitor:
        by_interval[p.interval].append(p)

    if by_interval:
        await asyncio.gather(
            *[
                asyncio.create_task(_run_monitor_group(conn, grp, iv))
                for iv, grp in by_interval.items()
            ],
            return_exceptions=True,
        )


# ---------------------------------------------------------------------------
# Daemonize + syslog
# ---------------------------------------------------------------------------

def _daemonize():
    """Detach from the terminal with a double fork and redirect stdio."""
    for fork_n in (1, 2):
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            sys.stderr.write(f"fork #{fork_n} failed: {e}\n")
            os._exit(1)
        if fork_n == 1:
            os.chdir("/")
            os.setsid()
            os.umask(0)
    for fd, path, mode in (
        (0, "/dev/zero", "r"),
        (1, "/dev/null", "a+"),
        (2, "/dev/null", "a+"),
    ):
        try:
            # dup2 duplicates the descriptor, so the temporary file object
            # can be closed right away.
            with open(path, mode) as fh:
                os.dup2(fh.fileno(), fd)
        except Exception:
            pass


def _reconfigure_syslog(level: int):
    """Replace all root log handlers with a syslog (daemon facility) handler."""
    root = logging.getLogger()
    for h in root.handlers[:]:
        root.removeHandler(h)
        h.close()
    addr = "/dev/log" if os.path.exists("/dev/log") else ("localhost", 514)
    sh = SysLogHandler(address=addr, facility=SysLogHandler.LOG_DAEMON)
    sh.setFormatter(logging.Formatter("hbc[%(process)d]: %(levelname)s %(message)s"))
    root.addHandler(sh)
    root.setLevel(level)


# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------

async def _async_main(args, cfg: Dict[str, Any]) -> int:
    """Open connections, run heartbeat/plugin tasks until stopped; return exit code."""
    global _running, _shutdown_event, _active_tasks
    _running = True
    _shutdown_event = asyncio.Event()
    _active_tasks = []

    log = logging.getLogger("hbc.main")
    iam = args.name or socket.gethostname()
    port = cfg.get("hb_port", PORT)
    interval = cfg.get("interval", INTERVAL)
    log.info("starting: %s -> %s port=%d interval=%ds", iam, args.hosts, port, interval)

    connections: List[AsyncConnection] = []
    conn_id = 1
    for host in args.hosts:
        try:
            addrs = socket.getaddrinfo(host, port, 0, 0, socket.SOL_UDP)
        except socket.gaierror as e:
            log.error("cannot resolve %s: %s", host, e)
            continue
        for ai in addrs:
            conn = AsyncConnection(conn_id, ai[4][0], port, ai[0], iam)
            if await conn.open():
                connections.append(conn)
                conn_id += 1

    if not connections:
        log.error("no connections established")
        return 1

    # Boot / one-shot message
    if args.boot or args.message:
        bmsg: Dict[str, Any] = {"acks": 0}
        if args.boot:
            bmsg["boot"] = 1
        if args.message:
            bmsg["service"] = "service"
            bmsg["msg"] = args.message
        for c in connections:
            await c.sendto(bmsg)
        if args.message and not args.daemon:
            await asyncio.sleep(0.3)
            for c in connections:
                c.close()
            return 0

    plugins = await _load_plugins(cfg)

    loop = asyncio.get_running_loop()

    def _sighup():
        global _dorestart
        _dorestart = True
        _stop()

    handlers = [(signal.SIGTERM, _stop), (signal.SIGINT, _stop)]
    sighup = getattr(signal, "SIGHUP", None)  # not defined on every platform
    if sighup is not None:
        handlers.append((sighup, _sighup))
    for sig, handler in handlers:
        try:
            loop.add_signal_handler(sig, handler)
        except NotImplementedError:
            # Event loops without signal support: Ctrl+C still surfaces as
            # KeyboardInterrupt, which main() handles.
            log.debug("signal handler for %s not supported on this platform", sig)

    for conn in connections:
        _active_tasks.append(asyncio.create_task(_heartbeat_sender(conn, interval)))
    if plugins and connections:
        _active_tasks.append(
            asyncio.create_task(_plugin_collector(connections[0], plugins))
        )

    try:
        await asyncio.gather(*_active_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass

    log.info("shutting down")
    for conn in connections:
        try:
            await conn.sendto({"shutdown": 1, "acks": conn.ackcount})
        except Exception:
            pass
    # Give the datagrams a moment to leave before tearing the sockets down.
    await asyncio.sleep(0.3)
    for conn in connections:
        conn.close()
    for plugin in plugins:
        await plugin.cleanup()
    return 0


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main(argv=None):
    """Parse the command line, run the client, and re-exec on restart request."""
    global _dorestart
    parser = argparse.ArgumentParser(
        prog="hbc_mini",
        description="HeartBeat Client — single file, no external dependencies",
    )
    parser.add_argument("-b", "--boot", action="store_true", help="Send boot message")
    parser.add_argument("-c", "--config", dest="configfile", help="Config file (JSON)")
    parser.add_argument("-m", "--message", dest="message", help="Send a one-shot message")
    parser.add_argument("-n", "--name", dest="name", help="Override hostname")
    parser.add_argument("-d", "--daemon", action="store_true", help="Run as daemon")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-x", "--debug", action="count", default=0, help="Debug level")
    parser.add_argument("hosts", nargs="+", help="HBD server(s)")
    args = parser.parse_args(argv)

    level = logging.WARNING
    if args.verbose:
        level = logging.INFO
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    cfg = _load_config(args.configfile)

    # Resolve the script path now: _daemonize() changes the working
    # directory to "/", which would break a relative sys.argv[0] on restart.
    script = os.path.abspath(sys.argv[0])

    if args.daemon:
        _daemonize()
        _reconfigure_syslog(level)

    try:
        rc = asyncio.run(_async_main(args, cfg))
    except KeyboardInterrupt:
        rc = 0
    except Exception as e:
        logging.error("fatal: %s", e, exc_info=True)
        rc = 1

    if _dorestart:
        logging.info("restarting...")
        if sys.executable:
            # Re-run through the interpreter so the script itself need not
            # be executable (the documented "python3 hbc_mini.py" usage).
            os.execv(sys.executable, [sys.executable, script] + sys.argv[1:])
        os.execv(script, [script] + sys.argv[1:])
    sys.exit(rc)


if __name__ == "__main__":
    main()