Files
heartbeat/scripts/hbc_mini.py
T
Andreas Wrede 8a1f412d1d
Release / release (push) Successful in 43s
version 5.3.9
2026-05-31 20:58:58 -04:00

1204 lines
40 KiB
Python
Executable File

#!/usr/bin/env python3
"""hbc_mini — single-file HeartBeat Client with no external dependencies.
Drop this file on any host with Python 3.8+ and run it directly:
python3 hbc_mini.py <hbd-host>
Config: ~/.hbc.json (same keys as ~/.hbc.yaml but JSON format)
Built-in plugins (always available):
os_info — OS name, kernel, distro, Python version
ping_monitor — ICMP ping RTT to configured hosts
nagios_runner — run Nagios-compatible check scripts
Linux-only plugins (via /proc, no extra tools needed):
cpu_monitor — CPU %, load average, core count
memory_monitor — RAM and swap usage
network_monitor — per-interface TX/RX bytes/s
Cross-platform (via df subprocess):
disk_monitor — per-mount usage and capacity
"""
import argparse
import asyncio
import json
import logging
import os
import platform
import re
import shutil
import signal
import socket
import subprocess
import sys
import time
import zlib
from abc import ABC, abstractmethod
from collections import defaultdict
from logging.handlers import SysLogHandler
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Client version; reported to the server as "hbc_version" by the os_info plugin.
# updated by scripts/bumpminor.sh
__version__ = "5.3.9"
# ---------------------------------------------------------------------------
# Protocol (mirrors hbd/common/proto.py)
# ---------------------------------------------------------------------------
def _encode_value(v: Any) -> str:
    """Serialize one field value to its wire representation.

    * bool        -> "1" / "0"
    * float       -> fixed notation with five decimals
    * list / dict -> "@" followed by the JSON encoding
    * anything else -> ``str(v)``
    """
    if isinstance(v, bool):
        return "1" if v else "0"
    if isinstance(v, float):
        return format(v, "0.5f")
    if isinstance(v, (list, dict)):
        return f"@{json.dumps(v)}"
    return str(v)
def _decode_value(val: str) -> Any:
    """Turn a wire-format field value back into a Python object.

    Empty strings are returned unchanged.  A leading "@" marks JSON; if the
    JSON does not parse, the text after the "@" is returned as-is.  Values
    that start with a digit (or "-" followed by a digit) are tried as int,
    then as float.  Everything else stays a string.
    """
    if not val:
        return val

    if val.startswith("@"):
        body = val[1:]
        try:
            return json.loads(body)
        except Exception:
            return body

    first = val[0]
    looks_numeric = first.isdigit() or (
        first == "-" and len(val) > 1 and val[1].isdigit()
    )
    if looks_numeric:
        for convert in (int, float):
            try:
                return convert(val)
            except ValueError:
                continue
    return val
def _dicttos(msg_id: str, d: Dict[str, Any]) -> bytes:
    """Build a compressed frame: ``b"!<msg_id>:"`` followed by zlib data.

    The compressed payload is the ";"-joined list of ``key=value`` pairs,
    each value rendered by ``_encode_value``.
    """
    fields = [f"{key}={_encode_value(value)}" for key, value in d.items()]
    body = zlib.compress(";".join(fields).encode(), 6)
    header = f"!{msg_id}:".encode()
    return header + body
def _stodict(data: bytes) -> Dict[str, Any]:
    """Parse a received frame into a dict; return ``{}`` for anything malformed.

    Two frame shapes are accepted:

    * compressed: ``b"!" + <3-byte ID> + b":" + zlib(payload)``
    * plain:      ``<ID> + b":" + payload``

    The payload is a ";"-separated list of ``key=value`` items.  Items with
    no "=" map to ``None``; other values go through ``_decode_value``.  The
    message ID is stored under the "ID" key.

    Frames arrive from the network, so every decoding step (including the ID
    of a compressed frame) is guarded: a bad frame yields ``{}`` rather than
    an exception.
    """
    result: Dict[str, Any] = {}
    if not data:
        return result

    try:
        if data[:1] == b"!":
            # Fixed 5-byte header: "!" + 3-character ID + ":".
            msg_id = data[1:4].decode()
            payload = zlib.decompress(data[5:]).decode()
        else:
            head, raw = data.split(b":", 1)
            msg_id = head.decode()
            payload = raw.decode()
    except Exception:
        return {}

    result["ID"] = msg_id
    for item in payload.split(";"):
        if not item:
            continue
        key, sep, value = item.partition("=")
        result[key.strip()] = _decode_value(value.strip()) if sep else None
    return result
# ---------------------------------------------------------------------------
# Config (JSON, default ~/.hbc.json)
# ---------------------------------------------------------------------------
# Built-in settings; keys found in the JSON config file override these.
_DEFAULTS: Dict[str, Any] = dict(
    hb_port=50003,
    interval=10,
    owner=None,
    plugins={},
)


def _load_config(path: Optional[str] = None) -> Dict[str, Any]:
    """Return the effective configuration.

    Starts from a copy of ``_DEFAULTS`` and overlays the top-level keys of
    the JSON file at *path* (default: ``~/.hbc.json``).  A missing file is
    not an error; an unreadable or invalid file is logged as a warning and
    whatever was merged so far is returned.
    """
    log = logging.getLogger("hbc.config")
    settings = dict(_DEFAULTS)
    config_path = path or os.path.join(os.path.expanduser("~"), ".hbc.json")

    if not os.path.exists(config_path):
        return settings

    try:
        with open(config_path) as fh:
            settings.update(json.load(fh))
        log.info("loaded config from %s", config_path)
    except Exception as e:
        log.warning("cannot read %s: %s", config_path, e)
    return settings
# ---------------------------------------------------------------------------
# Plugin base classes
# ---------------------------------------------------------------------------
class Plugin(ABC):
    """Abstract base of every hbc plugin.

    Subclasses set the class attributes below and implement ``initialize``
    and ``collect``.  ``skip_reason`` may be set by ``initialize`` to explain
    why the plugin declined to load.
    """

    name: str = ""
    version: str = "1.0.0"
    description: str = ""
    interval: int = 0
    enabled: bool = True

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.skip_reason: Optional[str] = None
        self.config = config if config else {}
        self.logger = logging.getLogger(f"plugin.{self.name}")

    @abstractmethod
    async def initialize(self) -> bool:
        """Return True when the plugin is usable and should be loaded."""

    @abstractmethod
    async def collect(self) -> Dict[str, Any]:
        """Return the plugin's data as a flat or nested dict."""

    async def cleanup(self) -> None:
        """Release resources at shutdown; the default does nothing."""
        return None
class InfoPlugin(Plugin):
    """Plugin whose data is gathered once and then served from a cache.

    Resetting ``_cache`` to ``None`` forces the next ``collect`` call to
    gather the information again.
    """

    interval = 0

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._cache: Optional[Dict[str, Any]] = None

    async def collect(self) -> Dict[str, Any]:
        """Return the cached info, collecting it first if there is none."""
        cached = self._cache
        if cached is None:
            cached = await self._collect_info()
            self._cache = cached
        return cached

    @abstractmethod
    async def _collect_info(self) -> Dict[str, Any]:
        """Gather the (static) information; implemented by subclasses."""
class MonitorPlugin(Plugin):
    """Plugin that is polled repeatedly, every ``interval`` seconds."""

    interval = 30

    async def collect(self) -> Dict[str, Any]:
        """Collect metrics and stamp non-empty results with ``_timestamp``."""
        metrics = await self._collect_metrics()
        if not metrics:
            # Empty results are passed through untouched (no timestamp).
            return metrics
        metrics["_timestamp"] = time.time()
        return metrics

    @abstractmethod
    async def _collect_metrics(self) -> Dict[str, Any]:
        """Gather one sample of metrics; implemented by subclasses."""
# ---------------------------------------------------------------------------
# Plugin: os_info
# ---------------------------------------------------------------------------
def _linux_distro() -> Dict[str, str]:
    """Read distribution details from the first release file that yields any.

    ``/etc/os-release`` is tried first, then ``/etc/lsb-release``.  Only the
    keys PRETTY_NAME, NAME, VERSION_ID and ID are picked up; surrounding
    quotes are stripped from their values.  Read errors are ignored.
    """
    # Keys that simply overwrite their target field.
    direct_keys = {
        "PRETTY_NAME": "distro_pretty_name",
        "VERSION_ID": "distro_version_id",
        "ID": "distro_id",
    }
    found: Dict[str, str] = {}

    for candidate in ("/etc/os-release", "/etc/lsb-release"):
        if not os.path.exists(candidate):
            continue
        try:
            with open(candidate) as fh:
                for raw in fh:
                    entry = raw.strip()
                    if entry.startswith("#") or "=" not in entry:
                        continue
                    key, _, value = entry.partition("=")
                    value = value.strip('"').strip("'")
                    if key == "NAME":
                        # First NAME wins; later ones do not replace it.
                        found.setdefault("distro_name", value)
                    elif key in direct_keys:
                        found[direct_keys[key]] = value
        except Exception:
            pass
        if found:
            break
    return found
class OSInfoPlugin(InfoPlugin):
    """Reports platform facts plus the client version and type."""

    name = "os_info"
    description = "OS and platform information"

    async def initialize(self) -> bool:
        """Always available."""
        return True

    async def _collect_info(self) -> Dict[str, Any]:
        """Build the info dict from ``platform`` and the plugin config."""
        system = platform.system()
        data: Dict[str, Any] = {
            "system": system,
            "node": platform.node(),
            "release": platform.release(),
            "machine": platform.machine(),
            "architecture": platform.architecture()[0],
            "python_version": platform.python_version(),
            "hbc_version": __version__,
            "hbc_type": "mini",
        }

        owner = self.config.get("owner")
        if owner:
            data["owner"] = owner

        if system == "Linux":
            data.update(_linux_distro())
        elif system == "Darwin":
            data["macos_version"] = platform.mac_ver()[0]
        return data
# ---------------------------------------------------------------------------
# Plugin: ping_monitor
# ---------------------------------------------------------------------------
class PingMonitorPlugin(MonitorPlugin):
    """Measures ICMP round-trip time and packet loss with the system ``ping``.

    Config keys: ``hosts`` (list of names, or a dict keyed by name),
    ``count`` (packets per host, default 3), ``timeout`` (seconds to wait per
    reply, default 5) and ``interval``.

    Each host contributes ``<host>_rtt_min``, ``_rtt_avg``, ``_rtt_max`` and
    ``_loss`` (characters outside ``[a-zA-Z0-9_]`` in the host name become
    "_").  An unreachable host, or output that cannot be parsed, reports
    infinite RTTs and 100 % loss.
    """

    name = "ping_monitor"
    description = "ICMP ping latency monitoring"
    interval = 60

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 60)
        self.count = int(cfg.get("count", 3))
        self.timeout = int(cfg.get("timeout", 5))
        raw = cfg.get("hosts", {})
        self.hosts: Dict[str, Any] = {h: {} for h in raw} if isinstance(raw, list) else dict(raw)

    async def initialize(self) -> bool:
        """Load only when at least one host is configured."""
        if not self.hosts:
            self.skip_reason = "no hosts configured"
            return False
        return True

    @staticmethod
    def _failure() -> Dict[str, float]:
        """Result reported for a host that could not be measured."""
        inf = float("inf")
        return {"rtt_min": inf, "rtt_avg": inf, "rtt_max": inf, "loss": 100.0}

    def _command(self, host: str) -> List[str]:
        """Build the platform-specific ping command line for *host*."""
        if sys.platform == "win32":
            return ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host]
        if sys.platform == "darwin" or sys.platform.startswith("freebsd"):
            # BSD-derived ping takes -W in milliseconds, Linux iputils in seconds.
            wait = str(self.timeout * 1000)
        else:
            wait = str(self.timeout)
        return ["ping", "-c", str(self.count), "-W", wait, host]

    async def _ping(self, host: str) -> Dict[str, float]:
        """Ping *host* once (``count`` packets) and parse the summary lines."""
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *self._command(host),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            out_b, _ = await asyncio.wait_for(
                proc.communicate(), timeout=self.timeout * self.count + 2
            )
        except Exception:
            return self._failure()
        finally:
            # On timeout or cancellation the child is still running; do not
            # leave it behind.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except OSError:
                    pass

        out = out_b.decode(errors="replace")
        loss = 100.0
        m = re.search(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss", out)
        if m:
            loss = float(m.group(1))
        m = re.search(
            r"(?:rtt|round-trip)\s+min/avg/max/\S+\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)", out
        )
        if m:
            return {"rtt_min": float(m.group(1)), "rtt_avg": float(m.group(2)),
                    "rtt_max": float(m.group(3)), "loss": loss}
        return self._failure()

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Ping all hosts concurrently and flatten the results into one dict."""
        tasks = {h: asyncio.create_task(self._ping(h)) for h in self.hosts}
        data: Dict[str, Any] = {}
        for host, task in tasks.items():
            try:
                result = await task
            except Exception:
                result = self._failure()
            key = re.sub(r"[^a-zA-Z0-9_]", "_", host)
            for metric, value in result.items():
                data[f"{key}_{metric}"] = value
        return data
# ---------------------------------------------------------------------------
# Plugin: nagios_runner
# ---------------------------------------------------------------------------
# Exit code of a check -> status label.
_NAGIOS_STATUS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"}

# label=value[UOM][;warn[;crit[;min[;max]]]]
# The value may be negative.  A threshold may be negative only when it
# directly follows a ";" (the lookbehind), so "a=1-2" is still read as a=1.
_PERFDATA_RE = re.compile(
    r"'?([^'=\s]+)'?=(-?[\d.]+)([a-zA-Z%]*)" + 4 * r";?((?:(?<=;)-)?[\d.]*)"
)


def _parse_nagios_perfdata(output: str) -> Dict[str, Any]:
    """Extract performance data from the text after the first "|".

    For every ``label=value`` item the result holds ``label`` (float) and,
    when present, ``label_uom`` plus ``label_warn``, ``label_crit``,
    ``label_min`` and ``label_max`` (floats).  Items whose value is not a
    valid number are skipped, as are thresholds that are not plain numbers.
    Returns ``{}`` when *output* has no "|" section.
    """
    if "|" not in output:
        return {}
    section = output.split("|", 1)[1].strip()
    perfdata: Dict[str, Any] = {}
    for m in _PERFDATA_RE.finditer(section):
        label = m.group(1).strip()
        try:
            perfdata[label] = float(m.group(2))
        except ValueError:
            continue
        if m.group(3):
            perfdata[f"{label}_uom"] = m.group(3)
        for idx, suffix in zip(range(4, 8), ("_warn", "_crit", "_min", "_max")):
            if m.group(idx):
                try:
                    perfdata[f"{label}{suffix}"] = float(m.group(idx))
                except ValueError:
                    pass
    return perfdata
class NagiosRunnerPlugin(MonitorPlugin):
    """Runs configured Nagios-compatible check commands through the shell.

    Config keys: ``commands`` (list of ``{"name": ..., "command": ...}``),
    ``timeout`` (seconds per command, default 30) and ``interval``.

    Per command the result holds ``<name>_status``, ``<name>_status_code``,
    ``<name>_output`` and one ``<name>_<label>`` entry per perfdata field.
    """

    name = "nagios_runner"
    description = "Nagios-compatible plugin runner"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.commands: List[Dict[str, str]] = cfg.get("commands", [])
        self.timeout: int = cfg.get("timeout", 30)
        self.interval = cfg.get("interval", 300)

    async def initialize(self) -> bool:
        """Load only when at least one command is configured."""
        if not self.commands:
            self.skip_reason = "no commands configured"
            return False
        return True

    async def _run(self, command: str) -> Tuple[int, str, Dict[str, Any]]:
        """Run *command* and return ``(status_code, message, perfdata)``.

        The status code is always 0..3.  Anything outside that range is
        reported as 3 (UNKNOWN), including the negative codes asyncio uses
        for a process killed by a signal, as are timeouts and failures to
        start the command.
        """
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, err = await asyncio.wait_for(proc.communicate(), timeout=self.timeout)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.communicate()
                return 3, f"timed out after {self.timeout}s", {}

            stdout = out.decode(errors="replace").strip()
            stderr = err.decode(errors="replace").strip()
            perf = _parse_nagios_perfdata(stdout)
            msg = stdout.split("|")[0].strip() or stderr

            code = proc.returncode
            if code is not None and 0 <= code <= 3:
                rc = code
            else:
                # Not a valid plugin status: never let it pass as OK.
                rc = 3
                if not msg:
                    msg = f"exited with status {code}"
            return rc, msg, perf
        except Exception as e:
            return 3, str(e), {}

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run every configured command in order and merge the results."""
        results: Dict[str, Any] = {}
        for cmd_cfg in self.commands:
            if not isinstance(cmd_cfg, dict):
                # A malformed entry must not abort the remaining checks.
                continue
            name = cmd_cfg.get("name")
            command = cmd_cfg.get("command")
            if not name or not command:
                continue
            rc, msg, perf = await self._run(command)
            results[f"{name}_status"] = _NAGIOS_STATUS.get(rc, "UNKNOWN")
            results[f"{name}_status_code"] = rc
            results[f"{name}_output"] = msg
            results.update({f"{name}_{k}": v for k, v in perf.items()})
        return results
# ---------------------------------------------------------------------------
# Plugin: cpu_monitor (Linux /proc/stat)
# ---------------------------------------------------------------------------
def _read_proc_stat() -> Optional[Tuple[int, ...]]:
    """Read the aggregate "cpu" line of /proc/stat.

    Returns the jiffies counters (user, nice, system, idle, iowait, ...) as
    a tuple of ints, or None when the file cannot be read or its first line
    is not the aggregate "cpu" entry.
    """
    try:
        with open("/proc/stat") as fh:
            fields = fh.readline().split()
        if fields[0] != "cpu":
            return None
        return tuple(map(int, fields[1:]))
    except Exception:
        return None
class CPUMonitorPlugin(MonitorPlugin):
    """CPU utilisation from /proc/stat deltas, plus load, cores and uptime."""

    name = "cpu_monitor"
    description = "CPU usage via /proc/stat (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        settings = config or {}
        self.interval = settings.get("interval", 300)
        # Counters of the previous sample; None until the first collection.
        self._prev: Optional[Tuple[int, ...]] = None

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/stat."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/stat not available)"
            return False
        if _read_proc_stat() is None:
            self.skip_reason = "/proc/stat not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Compute percentages over the window since the previous sample.

        The very first call has no previous sample, so it measures a
        one-second window instead.
        """
        if self._prev is None:
            before = _read_proc_stat()
            await asyncio.sleep(1)
        else:
            before = self._prev
        after = _read_proc_stat()
        if before is None or after is None:
            return {}
        self._prev = after

        # Field order: user, nice, system, idle, iowait, irq, softirq, steal
        has_iowait = len(after) > 4
        idle_before = before[3] + (before[4] if has_iowait else 0)
        idle_after = after[3] + (after[4] if has_iowait else 0)
        total = sum(after) - sum(before)
        if total == 0:
            return {}
        idle_delta = idle_after - idle_before

        def share(delta: int) -> float:
            return round(100.0 * delta / total, 1)

        data: Dict[str, Any] = {
            "cpu_percent": round(100.0 * (1.0 - idle_delta / total), 1),
        }
        if has_iowait:
            data["cpu_user"] = share(after[0] - before[0])
            data["cpu_system"] = share(after[2] - before[2])
            data["cpu_idle"] = share(idle_delta)
            data["cpu_iowait"] = share(after[4] - before[4])

        try:
            load1, load5, load15 = os.getloadavg()
        except (AttributeError, OSError):
            pass
        else:
            data["load_1min"] = round(load1, 2)
            data["load_5min"] = round(load5, 2)
            data["load_15min"] = round(load15, 2)

        try:
            with open("/proc/cpuinfo") as fh:
                data["cpu_core_count"] = len(
                    [row for row in fh if row.startswith("processor")]
                )
        except Exception:
            pass

        try:
            with open("/proc/uptime") as fh:
                first_field = fh.read().split()[0]
                data["uptime_seconds"] = int(float(first_field))
        except Exception:
            pass
        return data
# ---------------------------------------------------------------------------
# Plugin: memory_monitor (Linux /proc/meminfo)
# ---------------------------------------------------------------------------
def _read_meminfo() -> Dict[str, int]:
    """Parse /proc/meminfo into ``{field: integer value}``.

    The trailing ":" is removed from field names and the first number on
    each line is taken as the value.  Lines that do not fit are skipped; an
    unreadable file yields whatever was parsed so far (normally ``{}``).
    """
    values: Dict[str, int] = {}
    try:
        with open("/proc/meminfo") as fh:
            for row in fh:
                cols = row.split()
                if len(cols) < 2:
                    continue
                try:
                    values[cols[0].rstrip(":")] = int(cols[1])
                except ValueError:
                    pass
    except Exception:
        pass
    return values
class MemoryMonitorPlugin(MonitorPlugin):
    """RAM and swap usage from /proc/meminfo; values are scaled by 1024."""

    name = "memory_monitor"
    description = "Memory usage via /proc/meminfo (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        settings = config or {}
        self.interval = settings.get("interval", 300)

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/meminfo."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/meminfo not available)"
            return False
        if not _read_meminfo():
            self.skip_reason = "/proc/meminfo not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return memory totals, usage percentage and, if present, swap."""
        meminfo = _read_meminfo()
        if not meminfo:
            return {}

        total_kb = meminfo.get("MemTotal", 0)
        free_kb = meminfo.get("MemFree", 0)
        avail_kb = meminfo.get("MemAvailable", meminfo.get("MemFree", 0))

        # The ZFS ARC can be reclaimed but is not part of MemAvailable, so
        # its size is counted as available (capped at total memory).
        zfs_arc_kb = 0
        try:
            with open("/proc/spl/kstat/zfs/arcstats") as arcstats:
                for row in arcstats:
                    cols = row.split()
                    if len(cols) >= 3 and cols[0] == "size":
                        zfs_arc_kb = int(cols[2]) // 1024
                        break
        except (OSError, ValueError):
            pass
        avail_kb = min(avail_kb + zfs_arc_kb, total_kb)
        used_kb = total_kb - avail_kb

        data: Dict[str, Any] = {
            "memory_total": total_kb * 1024,
            "memory_used": used_kb * 1024,
            "memory_available": avail_kb * 1024,
            "memory_free": free_kb * 1024,
            "memory_percent": round(100.0 * used_kb / total_kb, 1) if total_kb else 0.0,
        }

        optional_fields = {
            "Buffers": "memory_buffers",
            "Cached": "memory_cached",
            "Active": "memory_active",
            "Inactive": "memory_inactive",
        }
        for field, key in optional_fields.items():
            if field in meminfo:
                data[key] = meminfo[field] * 1024

        swap_total_kb = meminfo.get("SwapTotal", 0)
        if swap_total_kb:
            swap_free_kb = meminfo.get("SwapFree", 0)
            swap_used_kb = swap_total_kb - swap_free_kb
            data["swap_total"] = swap_total_kb * 1024
            data["swap_used"] = swap_used_kb * 1024
            data["swap_free"] = swap_free_kb * 1024
            data["swap_percent"] = round(100.0 * swap_used_kb / swap_total_kb, 1)
        return data
# ---------------------------------------------------------------------------
# Plugin: disk_monitor (df -P; Linux, macOS, BSD)
# ---------------------------------------------------------------------------
class DiskMonitorPlugin(MonitorPlugin):
    """Per-mount disk usage parsed from ``df -P`` output.

    Config keys: ``mounts`` (optional list of mount points to report; empty
    means all) and ``interval``.  The size columns of ``df -P`` are
    multiplied by 1024 before being reported.
    """

    name = "disk_monitor"
    description = "Disk usage via df"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.mounts: List[str] = cfg.get("mounts", [])

    async def initialize(self) -> bool:
        """Available everywhere except Windows."""
        if sys.platform == "win32":
            self.skip_reason = "Windows not supported"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run ``df -P`` and return ``{"partitions": {mount: {...}}}``.

        Returns ``{}`` when df fails, times out, or nothing matches.
        """
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "-P",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        except Exception as e:
            # A timeout has an empty str(); fall back to the exception name.
            self.logger.warning("df failed: %s", str(e) or type(e).__name__)
            return {}
        finally:
            # A df stuck on a dead mount would otherwise pile up, one more
            # per collection interval.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except OSError:
                    pass

        partitions: Dict[str, Any] = {}
        for line in out.decode(errors="replace").splitlines()[1:]:
            # Split off only the first five columns so that a mount point
            # containing spaces stays in one piece.
            parts = line.split(None, 5)
            if len(parts) < 6:
                continue
            mount = parts[5].rstrip()
            if self.mounts and mount not in self.mounts:
                continue
            try:
                total_kb = int(parts[1])
                used_kb = int(parts[2])
                avail_kb = int(parts[3])
                pct = int(parts[4].rstrip("%"))
            except (ValueError, IndexError):
                continue
            partitions[mount] = {
                "total": total_kb * 1024,
                "used": used_kb * 1024,
                "free": avail_kb * 1024,
                "percent": pct,
            }
        return {"partitions": partitions} if partitions else {}
# ---------------------------------------------------------------------------
# Plugin: network_monitor (Linux /proc/net/dev)
# ---------------------------------------------------------------------------
def _read_net_dev() -> Dict[str, Tuple[int, int]]:
    """Parse /proc/net/dev into ``{iface: (rx_bytes, tx_bytes)}``.

    The two header lines and any line with fewer than ten fields are
    skipped.  On a read or conversion error the entries parsed so far are
    returned.
    """
    counters: Dict[str, Tuple[int, int]] = {}
    try:
        with open("/proc/net/dev") as fh:
            rows = fh.readlines()
        for row in rows[2:]:  # first two lines are column headers
            cols = row.split()
            if len(cols) < 10:
                continue
            iface = cols[0].rstrip(":")
            counters[iface] = (int(cols[1]), int(cols[9]))
    except Exception:
        pass
    return counters
class NetworkMonitorPlugin(MonitorPlugin):
    """Per-interface byte counters and their change since the last sample.

    Config keys: ``skip_interfaces`` (default ``["lo"]``) and ``interval``.
    """

    name = "network_monitor"
    description = "Network I/O rates via /proc/net/dev (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        settings = config or {}
        self.interval = settings.get("interval", 300)
        self.skip_ifaces: set = set(settings.get("skip_interfaces", ["lo"]))
        # (timestamp, counters) of the previous sample.
        self._prev: Optional[Tuple[float, Dict[str, Tuple[int, int]]]] = None

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/net/dev."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/net/dev not available)"
            return False
        baseline = _read_net_dev()
        if not baseline:
            self.skip_reason = "/proc/net/dev not readable"
            return False
        # Keep a baseline so the first collection already has a delta.
        self._prev = (time.time(), baseline)
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return ``{"interfaces": {...}}`` or ``{}`` when nothing to report."""
        sampled_at = time.time()
        current = _read_net_dev()
        if self._prev is None or not current:
            self._prev = (sampled_at, current)
            return {}

        previous_at, previous = self._prev
        self._prev = (sampled_at, current)
        if sampled_at - previous_at <= 0:
            return {}

        interfaces: Dict[str, Any] = {}
        for iface, (rx, tx) in current.items():
            if iface in self.skip_ifaces:
                continue
            if iface not in previous:
                # Interface appeared since the last sample: no delta yet.
                continue
            old_rx, old_tx = previous[iface]
            interfaces[iface] = {
                "bytes_recv": rx,
                "bytes_sent": tx,
                "bytes_recv_delta": rx - old_rx,
                "bytes_sent_delta": tx - old_tx,
            }
        if not interfaces:
            return {}
        return {"interfaces": interfaces}
# ---------------------------------------------------------------------------
# Plugin registry — all built-in classes, initialized at startup
# ---------------------------------------------------------------------------
# Every built-in plugin class, in the order they are initialized.
_ALL_PLUGIN_CLASSES: List[type] = [
    OSInfoPlugin,
    PingMonitorPlugin,
    NagiosRunnerPlugin,
    CPUMonitorPlugin,
    MemoryMonitorPlugin,
    DiskMonitorPlugin,
    NetworkMonitorPlugin,
]


async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
    """Instantiate and initialize all built-in plugins.

    A plugin's config is taken from ``cfg["plugins"][<name>]`` or, failing
    that, from the top-level ``cfg[<name>]``.  The top-level "owner" is
    copied into it unless the plugin section sets its own.

    Returns the plugins whose ``initialize()`` returned True.  A plugin
    that raises while being constructed or initialized, or whose config
    section is not usable as a mapping, is logged and skipped; bad
    configuration never aborts startup.
    """
    log = logging.getLogger("hbc.plugins")
    plugins_cfg = cfg.get("plugins") or {}
    if not isinstance(plugins_cfg, dict):
        log.warning("ignoring 'plugins' config: expected an object")
        plugins_cfg = {}

    loaded: List[Plugin] = []
    for cls in _ALL_PLUGIN_CLASSES:
        section = plugins_cfg.get(cls.name) or cfg.get(cls.name) or {}
        try:
            plugin_cfg = dict(section)
        except (TypeError, ValueError):
            log.warning("ignoring config for %s: expected an object", cls.name)
            plugin_cfg = {}
        if "owner" in cfg and "owner" not in plugin_cfg:
            plugin_cfg["owner"] = cfg["owner"]

        try:
            # Constructors convert config values and may raise on bad input.
            plugin: Plugin = cls(config=plugin_cfg)
            ok = await plugin.initialize()
        except Exception as e:
            log.error("init %s: %s", cls.name, e)
            continue

        if ok:
            loaded.append(plugin)
            # %s rather than %d: the interval comes straight from the config
            # and need not be an int.
            log.info("loaded %s (interval=%ss)", plugin.name, plugin.interval)
        else:
            log.info("skip %s: %s", plugin.name, plugin.skip_reason or "init failed")
    return loaded
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
# Loop flag checked by the long-running coroutines; cleared by _stop().
_running = True
# Set when the process should re-exec itself after shutting down.
_dorestart = False
# Event set by _stop() so that _sleep() wakes up early.
_shutdown_event: Optional[asyncio.Event] = None
# Top-level tasks that _stop() cancels.
_active_tasks: List[asyncio.Task] = []
# Fallbacks used when the config lacks "hb_port" / "interval".
PORT = 50003
INTERVAL = 10


def _shortname(name: str) -> str:
    """Return the part of *name* before the first dot."""
    return name.partition(".")[0]


def _stop():
    """Request shutdown: clear the run flag, wake sleepers, cancel tasks."""
    global _running
    _running = False
    if _shutdown_event:
        _shutdown_event.set()
    unfinished = [task for task in _active_tasks if not task.done()]
    for task in unfinished:
        task.cancel()


async def _sleep(seconds: float):
    """Wait up to *seconds*, returning early once shutdown is requested."""
    if not _shutdown_event:
        await asyncio.sleep(seconds)
        return
    try:
        await asyncio.wait_for(_shutdown_event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # Normal case: the full interval elapsed without a shutdown.
        pass
# ---------------------------------------------------------------------------
# UDP protocol handler + connection
# ---------------------------------------------------------------------------
class _HeartbeatProtocol(asyncio.DatagramProtocol):
    """Dispatches datagrams received on one connection by their message ID.

    ACK is handled inline; CMD and UPD are run as background tasks.
    """

    def __init__(self, conn: "AsyncConnection"):
        self._conn = conn
        self._log = logging.getLogger("hbc.proto")
        # The event loop only keeps weak references to tasks, so handler
        # tasks are held here until they finish.
        self._tasks: set = set()

    def _spawn(self, coro) -> None:
        """Run *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._task_done)

    def _task_done(self, task) -> None:
        """Forget a finished handler task and log its exception, if any."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._log.error("handler failed: %s", exc)

    def datagram_received(self, data: bytes, addr):
        """Parse one datagram and route it; errors are logged, not raised."""
        # NOTE(review): `addr` is never checked, so a CMD or UPD datagram
        # from any sender is acted on. Confirm that the deployment restricts
        # who can reach this socket.
        try:
            msg = _stodict(data)
            if not msg:
                return
            msg_id = msg.get("ID")
            now = time.time()
            if msg_id == "ACK":
                self._conn._handle_ack(msg, now)
            elif msg_id == "CMD":
                self._spawn(_handle_command(self._conn, msg))
            elif msg_id == "UPD":
                self._spawn(_handle_update(self._conn))
            else:
                self._log.debug("unknown msg type: %s", msg_id)
        except Exception as e:
            self._log.error("datagram error: %s", e)

    def error_received(self, exc):
        """Close the connection's transport; the next send reopens it."""
        self._log.warning("protocol error on %s: %s — will retry", self._conn.addr, exc)
        self._conn.close()
class AsyncConnection:
    """One datagram endpoint towards a single server address.

    Tracks the number of ACKs received and the last ten round-trip times
    (milliseconds), and lets the server ask for an info refresh through the
    ``_request_info`` event.
    """

    def __init__(self, conn_id: int, addr: str, port: int, af: int, name: str):
        # Identity and destination.
        self.conn_id = conn_id
        self.name = name
        self.addr = addr
        self.port = port
        self.af = af
        # Statistics.
        self.ackcount = 0
        self.lastsend = 0.0
        self.rtts: List[float] = [0.0]
        # Runtime state.
        self._transport: Optional[asyncio.DatagramTransport] = None
        self._dead = False
        self._request_info: asyncio.Event = asyncio.Event()
        self._log = logging.getLogger(f"hbc.conn.{addr}")

    async def open(self) -> bool:
        """Create the datagram endpoint; return False (and log) on failure."""
        try:
            loop = asyncio.get_event_loop()
            endpoint = await loop.create_datagram_endpoint(
                lambda: _HeartbeatProtocol(self), family=self.af
            )
        except Exception as e:
            self._log.error("open: %s", e)
            return False
        self._transport = endpoint[0]
        return True

    def close(self):
        """Close the transport if one is open."""
        if not self._transport:
            return
        self._transport.close()
        self._transport = None

    def _handle_ack(self, msg: Dict[str, Any], now: float):
        """Record an ACK: update the RTT history and the ACK counter."""
        elapsed_ms = (now - self.lastsend) * 1000.0
        self.rtts.append(elapsed_ms)
        if len(self.rtts) > 10:
            del self.rtts[0]
        self.ackcount += 1
        if msg.get("request_update"):
            self._request_info.set()

    async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
        """Send *msg* plus the name, id and time fields as one datagram.

        Opens the transport first when necessary; silently does nothing if
        the connection is marked dead or cannot be opened.
        """
        if self._dead:
            return
        if not self._transport:
            await self.open()
        if not self._transport:
            return
        out = {
            **msg,
            "name": _shortname(self.name),
            "id": self.conn_id,
            "time": time.time(),
        }
        self._transport.sendto(_dicttos(msg_id, out), (self.addr, self.port))
        self.lastsend = time.time()
# ---------------------------------------------------------------------------
# Server command handlers
# ---------------------------------------------------------------------------
async def _handle_command(conn: AsyncConnection, msg: Dict[str, Any]):
    """Execute the shell command from a CMD message and report the outcome.

    The reply is sent over *conn* as ``{"service": "command", "msg":
    "<status> <output>"}`` with status "OK", "Error" or "Timeout".  A
    message without a "cmd" field is ignored.

    The command runs in the default executor: ``subprocess.check_output``
    blocks for up to 30 seconds, which on the event loop thread would stall
    heartbeats and plugin collection for that long.
    """
    cmd = msg.get("cmd", "")
    if not cmd:
        return
    log = logging.getLogger("hbc.cmd")
    log.info("exec: %s", cmd)

    # NOTE(review): `cmd` comes from a received datagram and is passed to the
    # shell as-is. That is this handler's purpose, but it is only as safe as
    # the check on who may send CMD messages.
    def _execute() -> bytes:
        return subprocess.check_output(
            cmd, shell=True, stderr=subprocess.STDOUT, timeout=30
        )

    try:
        raw = await asyncio.get_event_loop().run_in_executor(None, _execute)
        # Tolerant decoding: output that is not valid UTF-8 should not turn
        # a successful command into an "Error" reply.
        out = raw.decode(errors="replace")
        status = "OK"
    except subprocess.CalledProcessError as e:
        out, status = str(e), "Error"
    except subprocess.TimeoutExpired:
        out, status = "timed out", "Timeout"
    except Exception as e:
        out, status = str(e), "Error"
    await conn.sendto({"service": "command", "msg": f"{status} {out}"})
async def _handle_update(conn: AsyncConnection):
    """Run ``hb_install.sh mini`` and, on success, restart the client.

    The installer is looked up in PATH first, then next to the running
    script.  Every failure is logged and reported to the server as
    ``{"service": "update", "msg": <error>}``; success is reported as "OK"
    before the restart is requested.
    """
    global _dorestart
    log = logging.getLogger("hbc.update")

    async def report_failure(reason: str) -> None:
        log.error(reason)
        await conn.sendto({"service": "update", "msg": reason})

    installer = shutil.which("hb_install.sh")
    if installer is None:
        sibling = Path(sys.argv[0]).parent / "hb_install.sh"
        if sibling.exists():
            installer = str(sibling)
    if installer is None:
        await report_failure("hb_install.sh not found in PATH or alongside hbc_mini.py")
        return

    log.info("running installer: %s", installer)
    try:
        proc = await asyncio.create_subprocess_exec(
            installer, "mini",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
    except asyncio.TimeoutError:
        await report_failure("installer timed out")
        return
    except Exception as e:
        await report_failure(f"installer failed: {e}")
        return

    if proc.returncode != 0:
        await report_failure(
            f"installer exited {proc.returncode}: {out.decode().strip()}"
        )
        return

    log.info("update successful, restarting")
    await conn.sendto({"service": "update", "msg": "OK"})
    _dorestart = True
    _stop()
# ---------------------------------------------------------------------------
# Heartbeat sender
# ---------------------------------------------------------------------------
async def _heartbeat_sender(conn: AsyncConnection, interval: int):
    """Send a heartbeat over *conn* every *interval* seconds until shutdown.

    Each heartbeat carries the ACK count, the latest RTT and the interval.
    Send errors are logged and the loop carries on.
    """
    log = logging.getLogger("hbc.hb")
    while _running:
        try:
            beat = {
                "acks": conn.ackcount,
                "rtt": conn.rtts[-1],
                "interval": interval,
            }
            await conn.sendto(beat)
        except Exception as e:
            log.error("send: %s", e)
        await _sleep(interval)
# ---------------------------------------------------------------------------
# Plugin collection loops
# ---------------------------------------------------------------------------
async def _run_info_plugins(conn: AsyncConnection, plugins: List[Plugin]):
    """Collect each plugin once and send non-empty results as PLG messages.

    A failure in one plugin is logged and does not affect the others.
    """
    log = logging.getLogger("hbc.plugins")
    for plugin in plugins:
        try:
            data = await plugin.collect()
            if not data:
                continue
            payload = {"plugin": plugin.name}
            payload.update(data)
            await conn.sendto(payload, "PLG")
            log.info("sent %s", plugin.name)
        except Exception as e:
            log.error("%s collect: %s", plugin.name, e)
async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], interval: int):
    """Poll a group of plugins sharing one interval until shutdown.

    On each round every plugin is collected in turn and non-empty results
    are sent as PLG messages.  Cancellation propagates; any other error is
    logged and the round continues with the next plugin.
    """
    log = logging.getLogger(f"hbc.plugins.{interval}s")

    async def collect_and_send(plugin: Plugin) -> None:
        data = await plugin.collect()
        if data:
            payload = {"plugin": plugin.name}
            payload.update(data)
            await conn.sendto(payload, "PLG")
            log.debug("sent %s", plugin.name)

    while _running:
        for plugin in plugins:
            try:
                await collect_and_send(plugin)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("%s: %s", plugin.name, e)
        await _sleep(interval)
async def _info_refresh_loop(conn: AsyncConnection, info: List[Plugin]):
    """Re-send info plugin data whenever the connection's refresh event fires.

    Waits on ``conn._request_info``; each time it is set, the caches of all
    *info* plugins are dropped and the plugins are collected and sent again.
    """
    log = logging.getLogger("hbc.plugins")
    refresh_requested = conn._request_info
    while _running:
        await refresh_requested.wait()
        if not _running:
            return
        refresh_requested.clear()
        log.info("refreshing InfoPlugins on server request")
        for plugin in info:
            plugin._cache = None
        await _run_info_plugins(conn, info)
async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
    """Drive all plugins over *conn*.

    Info plugins are sent once up front and again on request; monitor
    plugins are grouped by interval, one polling task per distinct
    interval.  Returns when all of those tasks have finished.
    """
    info: List[Plugin] = []
    monitor: List[Plugin] = []
    for plugin in plugins:
        if isinstance(plugin, InfoPlugin):
            info.append(plugin)
        if isinstance(plugin, MonitorPlugin):
            monitor.append(plugin)

    await _run_info_plugins(conn, info)

    groups: Dict[int, List[Plugin]] = {}
    for plugin in monitor:
        groups.setdefault(plugin.interval, []).append(plugin)

    tasks = [asyncio.create_task(_info_refresh_loop(conn, info))]
    for every, members in groups.items():
        tasks.append(asyncio.create_task(_run_monitor_group(conn, members, every)))
    await asyncio.gather(*tasks, return_exceptions=True)
# ---------------------------------------------------------------------------
# Daemonize + syslog
# ---------------------------------------------------------------------------
def _daemonize():
    """Detach from the caller by forking twice.

    After each fork the parent exits immediately with ``os._exit(0)``; a
    failed fork writes a message to stderr and exits with status 1.  Between
    the two forks the surviving child changes to "/", starts a new session
    and clears the umask.  Finally fds 0, 1 and 2 are pointed at /dev/zero,
    /dev/null and /dev/null respectively.
    """
    for fork_n in (1, 2):
        try:
            pid = os.fork()
            if pid > 0:
                # Parent side of the fork: leave without running cleanup handlers.
                os._exit(0)
        except OSError as e:
            sys.stderr.write(f"fork #{fork_n} failed: {e}\n")
            os._exit(1)
        if fork_n == 1:
            # Only the first child does this, before forking again.
            # Note: the chdir makes any relative path used later resolve against "/".
            os.chdir("/")
            os.setsid()
            os.umask(0)
    for fd, path, mode in ((0, "/dev/zero", "r"), (1, "/dev/null", "a+"), (2, "/dev/null", "a+")):
        try:
            os.dup2(open(path, mode).fileno(), fd)
        except Exception:
            # Best effort: a descriptor that cannot be redirected is left as it is.
            pass
def _reconfigure_syslog(level: int):
    """Replace all root log handlers with a single syslog handler.

    Logs to ``/dev/log`` when that path exists, otherwise to
    ``localhost:514``, using the daemon facility, and sets the root level
    to *level*.
    """
    root = logging.getLogger()
    for handler in list(root.handlers):
        root.removeHandler(handler)
        handler.close()

    if os.path.exists("/dev/log"):
        address = "/dev/log"
    else:
        address = ("localhost", 514)

    syslog = SysLogHandler(address=address, facility=SysLogHandler.LOG_DAEMON)
    syslog.setFormatter(
        logging.Formatter("hbc[%(process)d]: %(levelname)s %(message)s")
    )
    root.addHandler(syslog)
    root.setLevel(level)
# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------
async def _async_main(args, cfg: Dict[str, Any]) -> int:
    """Run the client until shutdown and return the process exit code.

    Steps, in order:
      1. Reset the module-level run state.
      2. Resolve every host in ``args.hosts`` and open one connection per
         resolved address, retrying with a growing delay (5 s doubling up to
         60 s) until at least one connection is open or shutdown is requested.
      3. Send the optional boot / one-shot message; a one-shot message without
         ``--daemon`` ends the run here.
      4. Load plugins, install signal handlers, start one heartbeat task per
         connection and one plugin collector on the first connection.
      5. After the tasks end, optionally send a shutdown message, close the
         connections and run plugin cleanup.

    Returns 1 when no connection could be opened, otherwise 0.
    """
    # NOTE(review): send_shutdown is declared global although, in the code
    # visible here, it is only assigned and read inside this function.
    global _running, _shutdown_event, _active_tasks, send_shutdown
    _running = True
    _shutdown_event = asyncio.Event()
    _active_tasks = []
    log = logging.getLogger("hbc.main")
    iam = args.name or socket.gethostname()
    port = cfg.get("hb_port", PORT)
    interval = cfg.get("interval", INTERVAL)
    log.info("hbc_mini %s on %s -> %s port=%d interval=%ds",__version__, iam, args.hosts, port, interval)
    # 0 leaves the address family unrestricted for getaddrinfo.
    af_filter = (socket.AF_INET if getattr(args, "ipv4_only", False)
                 else socket.AF_INET6 if getattr(args, "ipv6_only", False)
                 else 0)
    connections: List[AsyncConnection] = []
    conn_id = 1
    _retry_delay = 5
    while _running and not connections:
        for host in args.hosts:
            try:
                addrs = socket.getaddrinfo(host, port, af_filter, 0, socket.SOL_UDP)
            except socket.gaierror as e:
                log.warning("cannot resolve %s: %s — retrying in %ds", host, e, _retry_delay)
                continue
            for ai in addrs:
                # ai[0] is the address family, ai[4][0] the address string.
                conn = AsyncConnection(conn_id, ai[4][0], port, ai[0], iam)
                if await conn.open():
                    connections.append(conn)
                    conn_id += 1
        if not connections:
            await _sleep(_retry_delay)
            _retry_delay = min(_retry_delay * 2, 60)
    if not connections:
        # The loop ended because shutdown was requested before anything opened.
        return 1
    # Boot / one-shot message
    send_shutdown = False
    if args.boot or args.message:
        bmsg: Dict[str, Any] = {"acks": 0}
        if args.boot:
            bmsg["boot"] = 1
            args.boot = False # don't repeat on restart
            # A run that announced a boot also announces its shutdown (see below).
            send_shutdown = True
        if args.message:
            bmsg["service"] = "service"
            bmsg["msg"] = args.message
        # Prefer a connection whose transport is open; fall back to the first.
        target = next((c for c in connections if c._transport), connections[0])
        await target.sendto(bmsg)
    if args.message and not args.daemon:
        # One-shot mode: pause briefly after sending, then exit.
        await asyncio.sleep(0.3)
        for c in connections:
            c.close()
        return 0
    plugins = await _load_plugins(cfg)
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _stop)
    def _sighup():
        # SIGHUP: shut down and have main() re-exec the process.
        global _dorestart
        _dorestart = True
        _stop()
    loop.add_signal_handler(signal.SIGHUP, _sighup)
    for conn in connections:
        _active_tasks.append(asyncio.create_task(_heartbeat_sender(conn, interval)))
    if plugins and connections:
        # Plugin data is sent over the first connection only.
        _active_tasks.append(asyncio.create_task(_plugin_collector(connections[0], plugins)))
    try:
        # Returns once every task has finished or been cancelled by _stop().
        await asyncio.gather(*_active_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass
    log.info("shutting down")
    target = next((c for c in connections if c._transport), connections[0] if connections else None)
    if target and send_shutdown:
        try:
            await target.sendto({"shutdown": 1, "acks": target.ackcount})
        except Exception:
            pass
    for conn in connections:
        conn.close()
    await asyncio.sleep(0.3)
    for plugin in plugins:
        await plugin.cleanup()
    return 0
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main(argv=None):
    """Command-line entry point.

    Parses *argv* (default: ``sys.argv[1:]``), sets up logging, loads the
    config, optionally daemonizes, and runs the client.  Exits through
    ``sys.exit`` with the client's return code, or re-executes the script
    with the same arguments when a restart was requested.
    """
    # Resolve the script path before anything else: _daemonize() changes
    # the working directory to "/", after which a relative sys.argv[0]
    # would no longer point at this file.
    script = os.path.abspath(sys.argv[0])
    restart_args = list(sys.argv[1:] if argv is None else argv)

    parser = argparse.ArgumentParser(
        prog="hbc_mini",
        description="HeartBeat Client — single file, no external dependencies",
    )
    parser.add_argument("-b", "--boot", action="store_true", help="Send boot message")
    parser.add_argument("-c", "--config", dest="configfile", help="Config file (JSON)")
    parser.add_argument("-m", "--message", dest="message", help="Send a one-shot message")
    parser.add_argument("-n", "--name", dest="name", help="Override hostname")
    parser.add_argument("-d", "--daemon", action="store_true", help="Run as daemon")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-x", "--debug", action="count", default=0, help="Debug level")
    af_group = parser.add_mutually_exclusive_group()
    af_group.add_argument("-4", dest="ipv4_only", action="store_true", help="Use IPv4 only")
    af_group.add_argument("-6", dest="ipv6_only", action="store_true", help="Use IPv6 only")
    parser.add_argument("hosts", nargs="+", help="HBD server(s)")
    args = parser.parse_args(argv)

    level = logging.WARNING
    if args.verbose:
        level = logging.INFO
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    cfg = _load_config(args.configfile)
    if args.daemon:
        _daemonize()
        _reconfigure_syslog(level)

    try:
        rc = asyncio.run(_async_main(args, cfg))
    except KeyboardInterrupt:
        rc = 0
    except Exception as e:
        logging.error("fatal: %s", e, exc_info=True)
        rc = 1

    if _dorestart:
        logging.info("restarting...")
        # NOTE(review): the original arguments are passed on unchanged, so a
        # -b/--boot flag sends the boot message again after a restart.
        try:
            if sys.executable:
                # Going through the interpreter also works when the script
                # was started as "python3 hbc_mini.py" and is not executable.
                os.execv(sys.executable, [sys.executable, script] + restart_args)
            else:
                os.execv(script, [script] + restart_args)
        except OSError as e:
            logging.error("restart failed: %s", e)
            rc = 1
    sys.exit(rc)


if __name__ == "__main__":
    main()