1193 lines
39 KiB
Python
Executable File
1193 lines
39 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""hbc_mini — single-file HeartBeat Client with no external dependencies.
|
|
|
|
Drop this file on any host with Python 3.8+ and run it directly:
|
|
python3 hbc_mini.py <hbd-host>
|
|
|
|
Config: ~/.hbc.json (same keys as ~/.hbc.yaml but JSON format)
|
|
|
|
Built-in plugins (all platforms; ping_monitor and nagios_runner load only when configured):
|
|
os_info — OS name, kernel, distro, Python version
|
|
ping_monitor — ICMP ping RTT to configured hosts
|
|
nagios_runner — run Nagios-compatible check scripts
|
|
|
|
Linux-only plugins (via /proc, no extra tools needed):
|
|
cpu_monitor — CPU %, load average, core count
|
|
memory_monitor — RAM and swap usage
|
|
network_monitor — per-interface TX/RX bytes/s
|
|
|
|
Cross-platform except Windows (via a df subprocess):
|
|
disk_monitor — per-mount usage and capacity
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import zlib
|
|
from abc import ABC, abstractmethod
|
|
from collections import defaultdict
|
|
from logging.handlers import SysLogHandler
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Client version: reported to the server as "hbc_version" by OSInfoPlugin
# and logged at start-up.
# updated by scripts/bumpminor.sh
__version__ = "5.2.6"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol (mirrors hbd/common/proto.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _encode_value(v: Any) -> str:
|
|
if isinstance(v, float):
|
|
return f"{v:0.5f}"
|
|
if isinstance(v, (list, dict)):
|
|
return "@" + json.dumps(v)
|
|
if isinstance(v, bool):
|
|
return str(int(v))
|
|
return str(v)
|
|
|
|
|
|
def _decode_value(val: str) -> Any:
|
|
if not val:
|
|
return val
|
|
if val.startswith("@"):
|
|
try:
|
|
return json.loads(val[1:])
|
|
except Exception:
|
|
return val[1:]
|
|
if val[0].isdigit() or (val[0] == "-" and len(val) > 1 and val[1].isdigit()):
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
return float(val)
|
|
except ValueError:
|
|
pass
|
|
return val
|
|
|
|
|
|
def _dicttos(msg_id: str, d: Dict[str, Any]) -> bytes:
|
|
payload = ";".join(f"{k}={_encode_value(v)}" for k, v in d.items()).encode()
|
|
return ("!" + msg_id + ":").encode() + zlib.compress(payload, 6)
|
|
|
|
|
|
def _stodict(data: bytes) -> Dict[str, Any]:
|
|
result: Dict[str, Any] = {}
|
|
if not data:
|
|
return result
|
|
if chr(data[0]) == "!":
|
|
try:
|
|
payload = zlib.decompress(data[5:]).decode()
|
|
except Exception:
|
|
return {}
|
|
result["ID"] = data[1:4].decode()
|
|
else:
|
|
try:
|
|
head, payload = data.split(b":", 1)
|
|
payload = payload.decode()
|
|
result["ID"] = head.decode()
|
|
except Exception:
|
|
return {}
|
|
for item in payload.split(";"):
|
|
if not item:
|
|
continue
|
|
kv = item.split("=", 1)
|
|
result[kv[0].strip()] = _decode_value(kv[1].strip()) if len(kv) > 1 else None
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config (JSON, default ~/.hbc.json)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_DEFAULTS: Dict[str, Any] = {
|
|
"hb_port": 50003,
|
|
"interval": 10,
|
|
"owner": None,
|
|
"plugins": {},
|
|
}
|
|
|
|
|
|
def _load_config(path: Optional[str] = None) -> Dict[str, Any]:
|
|
cfg = dict(_DEFAULTS)
|
|
if not path:
|
|
path = os.path.join(os.path.expanduser("~"), ".hbc.json")
|
|
if os.path.exists(path):
|
|
try:
|
|
with open(path) as fh:
|
|
cfg.update(json.load(fh))
|
|
logging.getLogger("hbc.config").info("loaded config from %s", path)
|
|
except Exception as e:
|
|
logging.getLogger("hbc.config").warning("cannot read %s: %s", path, e)
|
|
return cfg
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin base classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Plugin(ABC):
    """Base class for every collector plugin.

    Subclasses set the class attributes below and implement
    :meth:`initialize` and :meth:`collect`.  ``initialize`` returns
    ``False`` to have the plugin skipped, optionally setting
    :attr:`skip_reason` first.  ``collect`` returning an empty dict means
    there is nothing to send.
    """

    name: str = ""
    version: str = "1.0.0"
    description: str = ""
    interval: int = 0
    enabled: bool = True

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config if config else {}
        self.logger = logging.getLogger("plugin." + self.name)
        self.skip_reason: Optional[str] = None

    @abstractmethod
    async def initialize(self) -> bool:
        """Prepare the plugin; return ``False`` if it cannot run here."""

    @abstractmethod
    async def collect(self) -> Dict[str, Any]:
        """Return one sample of plugin data."""

    async def cleanup(self) -> None:
        """Release resources; the default implementation does nothing."""


class InfoPlugin(Plugin):
    """Plugin whose data is gathered once and then served from a cache.

    Subclasses implement :meth:`_collect_info`.  Setting ``_cache`` back to
    ``None`` makes the next :meth:`collect` gather fresh data.
    """

    interval = 0

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._cache: Optional[Dict[str, Any]] = None

    async def collect(self) -> Dict[str, Any]:
        cached = self._cache
        if cached is None:
            cached = self._cache = await self._collect_info()
        return cached

    @abstractmethod
    async def _collect_info(self) -> Dict[str, Any]:
        """Gather the information this plugin reports."""


class MonitorPlugin(Plugin):
    """Plugin sampled repeatedly, every :attr:`interval` seconds."""

    interval = 30

    async def collect(self) -> Dict[str, Any]:
        sample = await self._collect_metrics()
        if not sample:
            return sample
        # Stamp every non-empty sample with its collection time.
        sample["_timestamp"] = time.time()
        return sample

    @abstractmethod
    async def _collect_metrics(self) -> Dict[str, Any]:
        """Gather one sample of metrics."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: os_info
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _linux_distro() -> Dict[str, str]:
|
|
info: Dict[str, str] = {}
|
|
for path in ("/etc/os-release", "/etc/lsb-release"):
|
|
if not os.path.exists(path):
|
|
continue
|
|
try:
|
|
with open(path) as fh:
|
|
for line in fh:
|
|
line = line.strip()
|
|
if "=" not in line or line.startswith("#"):
|
|
continue
|
|
k, v = line.split("=", 1)
|
|
v = v.strip('"').strip("'")
|
|
if k == "PRETTY_NAME":
|
|
info["distro_pretty_name"] = v
|
|
elif k == "NAME":
|
|
info.setdefault("distro_name", v)
|
|
elif k == "VERSION_ID":
|
|
info["distro_version_id"] = v
|
|
elif k == "ID":
|
|
info["distro_id"] = v
|
|
except Exception:
|
|
pass
|
|
if info:
|
|
break
|
|
return info
|
|
|
|
|
|
class OSInfoPlugin(InfoPlugin):
    """Static host facts: platform details, Python/client version, owner.

    On Linux the distribution fields from :func:`_linux_distro` are added.
    On macOS the product version is added as ``macos_version``.
    """

    name = "os_info"
    description = "OS and platform information"

    async def initialize(self) -> bool:
        # Needs only the stdlib platform module, so it is never skipped.
        return True

    async def _collect_info(self) -> Dict[str, Any]:
        system = platform.system()
        facts: Dict[str, Any] = {
            "system": system,
            "node": platform.node(),
            "release": platform.release(),
            "machine": platform.machine(),
            "architecture": platform.architecture()[0],
            "python_version": platform.python_version(),
            "hbc_version": __version__,
            "hbc_type": "mini",
        }
        owner = self.config.get("owner")
        if owner:
            facts["owner"] = owner
        if system == "Linux":
            facts.update(_linux_distro())
        elif system == "Darwin":
            facts["macos_version"] = platform.mac_ver()[0]
        return facts
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: ping_monitor
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PingMonitorPlugin(MonitorPlugin):
    """ICMP round-trip time and packet loss to the configured hosts.

    Config: ``hosts`` (a list of host names, or a dict keyed by host name),
    ``count`` (echo requests per host, default 3), ``timeout`` (seconds to
    wait per reply, default 5) and ``interval`` (seconds, default 60).

    Per host the sample has ``<key>_rtt_min``, ``<key>_rtt_avg``,
    ``<key>_rtt_max`` (as printed by ping) and ``<key>_loss`` (percent).
    ``<key>`` is the host name with every character outside
    ``[a-zA-Z0-9_]`` replaced by ``_``.  A host that cannot be pinged, or
    whose output cannot be parsed, reports infinite RTTs and 100 % loss.
    """

    name = "ping_monitor"
    description = "ICMP ping latency monitoring"
    interval = 60

    # Summary line of Linux iputils, macOS/BSD and busybox ping.  The 4th
    # field (mdev/stddev) is optional because busybox prints only
    # "round-trip min/avg/max = a/b/c ms".
    _RTT_RE = re.compile(
        r"(?:rtt|round-trip)\s+min/avg/max(?:/\S+)?\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)"
    )
    _LOSS_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss")
    # Windows ping (English locale): "Minimum = 1ms, Maximum = 3ms,
    # Average = 2ms" and "Lost = 0 (0% loss)".
    _WIN_RTT_RE = re.compile(
        r"Minimum\s*=\s*(\d+)ms,\s*Maximum\s*=\s*(\d+)ms,\s*Average\s*=\s*(\d+)ms"
    )
    _WIN_LOSS_RE = re.compile(r"\((\d+)%\s*loss\)")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 60)
        self.count = int(cfg.get("count", 3))
        self.timeout = int(cfg.get("timeout", 5))
        raw = cfg.get("hosts", {})
        self.hosts: Dict[str, Any] = {h: {} for h in raw} if isinstance(raw, list) else dict(raw)

    async def initialize(self) -> bool:
        if not self.hosts:
            self.skip_reason = "no hosts configured"
            return False
        return True

    @staticmethod
    def _failed() -> Dict[str, float]:
        """Result reported for a host that could not be pinged."""
        inf = float("inf")
        return {"rtt_min": inf, "rtt_avg": inf, "rtt_max": inf, "loss": 100.0}

    def _command(self, host: str) -> List[str]:
        """Build the platform-specific ping command line for *host*."""
        if sys.platform == "win32":
            # Windows: -n count, -w per-reply timeout in milliseconds.
            return ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host]
        if sys.platform == "darwin" or sys.platform.startswith("freebsd"):
            # macOS/FreeBSD ping take -W in milliseconds, not seconds.
            return ["ping", "-c", str(self.count), "-W", str(self.timeout * 1000), host]
        # Linux iputils / busybox: -W per-reply timeout in seconds.
        return ["ping", "-c", str(self.count), "-W", str(self.timeout), host]

    @classmethod
    def _parse_output(cls, out: str) -> Optional[Dict[str, float]]:
        """Extract RTT statistics and loss from ping output; ``None`` if absent."""
        m = cls._RTT_RE.search(out)
        if m:
            rtt_min, rtt_avg, rtt_max = (float(g) for g in m.groups())
        else:
            m = cls._WIN_RTT_RE.search(out)
            if not m:
                return None
            # Windows prints minimum, maximum, average (in that order).
            rtt_min, rtt_max, rtt_avg = (float(g) for g in m.groups())
        loss_m = cls._LOSS_RE.search(out) or cls._WIN_LOSS_RE.search(out)
        loss = float(loss_m.group(1)) if loss_m else 100.0
        return {"rtt_min": rtt_min, "rtt_avg": rtt_avg, "rtt_max": rtt_max, "loss": loss}

    async def _ping(self, host: str) -> Dict[str, float]:
        """Ping *host* ``count`` times and return its RTT/loss statistics."""
        try:
            proc = await asyncio.create_subprocess_exec(
                *self._command(host),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except Exception:
            return self._failed()
        try:
            out_b, _ = await asyncio.wait_for(
                proc.communicate(), timeout=self.timeout * self.count + 2
            )
        except asyncio.TimeoutError:
            # Kill and reap the child; otherwise every timed-out ping keeps
            # running in the background.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await proc.communicate()
            return self._failed()
        except Exception:
            return self._failed()
        return self._parse_output(out_b.decode(errors="replace")) or self._failed()

    async def _collect_metrics(self) -> Dict[str, Any]:
        # Ping all hosts concurrently, then collect results in config order.
        tasks = {h: asyncio.create_task(self._ping(h)) for h in self.hosts}
        data: Dict[str, Any] = {}
        for host, task in tasks.items():
            try:
                result = await task
            except Exception:
                result = self._failed()
            key = re.sub(r"[^a-zA-Z0-9_]", "_", host)
            for metric, value in result.items():
                data[f"{key}_{metric}"] = value
        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: nagios_runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Nagios plugin exit status -> state name (0 OK, 1 WARNING, 2 CRITICAL,
# 3 UNKNOWN); NagiosRunnerPlugin reports both the code and this name.
_NAGIOS_STATUS: Dict[int, str] = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"}
|
|
|
|
|
|
def _parse_nagios_perfdata(output: str) -> Dict[str, Any]:
|
|
if "|" not in output:
|
|
return {}
|
|
section = output.split("|", 1)[1].strip()
|
|
perfdata: Dict[str, Any] = {}
|
|
for m in re.finditer(
|
|
r"'?([^'=\s]+)'?=([\d.]+)([a-zA-Z%]*);?([\d.]*);?([\d.]*);?([\d.]*);?([\d.]*)",
|
|
section,
|
|
):
|
|
label = m.group(1).strip()
|
|
try:
|
|
perfdata[label] = float(m.group(2))
|
|
except ValueError:
|
|
continue
|
|
if m.group(3):
|
|
perfdata[f"{label}_uom"] = m.group(3)
|
|
for idx, suffix in zip(range(4, 8), ("_warn", "_crit", "_min", "_max")):
|
|
if m.group(idx):
|
|
try:
|
|
perfdata[f"{label}{suffix}"] = float(m.group(idx))
|
|
except ValueError:
|
|
pass
|
|
return perfdata
|
|
|
|
|
|
class NagiosRunnerPlugin(MonitorPlugin):
    """Run Nagios-compatible check commands and report their results.

    Config:

    * ``commands``: a list of ``{"name": ..., "command": ...}`` entries, run
      one after another through the shell;
    * ``timeout``: seconds per command, default 30;
    * ``interval``: default 300.

    For each command the sample contains:

    * ``<name>_status``: the state name;
    * ``<name>_status_code``: 0-3;
    * ``<name>_output``: the text before the first ``|``, or stderr when
      that is empty;
    * the parsed perfdata, with keys prefixed by ``<name>_``.
    """

    name = "nagios_runner"
    description = "Nagios-compatible plugin runner"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.commands: List[Dict[str, str]] = cfg.get("commands", [])
        self.timeout: int = cfg.get("timeout", 30)
        self.interval = cfg.get("interval", 300)

    async def initialize(self) -> bool:
        if not self.commands:
            self.skip_reason = "no commands configured"
            return False
        return True

    async def _run(self, command: str) -> Tuple[int, str, Dict[str, Any]]:
        """Run *command* and return ``(status_code, message, perfdata)``.

        Any exit code outside 0-3 is reported as 3 (UNKNOWN).  That covers
        126/127 from the shell and negative codes from checks killed by a
        signal.  Timeouts and launch failures are UNKNOWN as well.
        """
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, err = await asyncio.wait_for(proc.communicate(), timeout=self.timeout)
            except asyncio.TimeoutError:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # exited between the timeout and the kill
                await proc.communicate()
                return 3, f"timed out after {self.timeout}s", {}
            rc = proc.returncode
            # Don't clamp into 0..3: a signal death (rc < 0) would become
            # 0 and be reported as OK.
            if rc not in _NAGIOS_STATUS:
                rc = 3
            stdout = out.decode(errors="replace").strip()
            stderr = err.decode(errors="replace").strip()
            perf = _parse_nagios_perfdata(stdout)
            msg = stdout.split("|")[0].strip() or stderr
            return rc, msg, perf
        except Exception as e:
            return 3, str(e), {}

    async def _collect_metrics(self) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        for cmd_cfg in self.commands:
            name = cmd_cfg.get("name")
            command = cmd_cfg.get("command")
            if not name or not command:
                continue  # incomplete entry; nothing to run or report
            rc, msg, perf = await self._run(command)
            results[f"{name}_status"] = _NAGIOS_STATUS.get(rc, "UNKNOWN")
            results[f"{name}_status_code"] = rc
            results[f"{name}_output"] = msg
            results.update({f"{name}_{k}": v for k, v in perf.items()})
        return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: cpu_monitor (Linux /proc/stat)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_proc_stat() -> Optional[Tuple[int, ...]]:
|
|
"""Return CPU jiffies tuple (user, nice, system, idle, iowait, ...)."""
|
|
try:
|
|
with open("/proc/stat") as fh:
|
|
line = fh.readline()
|
|
parts = line.split()
|
|
if parts[0] != "cpu":
|
|
return None
|
|
return tuple(int(x) for x in parts[1:])
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class CPUMonitorPlugin(MonitorPlugin):
    """CPU utilisation, load average, core count and uptime (Linux only).

    Percentages come from the change in /proc/stat jiffies between two
    readings.  The first collection uses a 1-second window; later ones use
    the time since the previous collection.
    """

    name = "cpu_monitor"
    description = "CPU usage via /proc/stat (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)
        self._prev: Optional[Tuple[int, ...]] = None

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/stat not available)"
            return False
        if _read_proc_stat() is None:
            self.skip_reason = "/proc/stat not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        # First call: take a 1-second window for an immediate reading.
        # Subsequent calls: use the full interval between collections.
        if self._prev is None:
            t1 = _read_proc_stat()
            await asyncio.sleep(1)
        else:
            t1 = self._prev

        t2 = _read_proc_stat()
        if t1 is None or t2 is None:
            return {}
        self._prev = t2

        # Fields: user(0) nice(1) system(2) idle(3) iowait(4) irq(5)
        # softirq(6) steal(7) guest(8) guest_nice(9).
        # The kernel already counts guest/guest_nice inside user/nice, so
        # only the first eight fields make up total CPU time.  Summing all
        # of them double-counts time spent running VMs and distorts every
        # percentage on hypervisor hosts.
        have_iowait = len(t2) > 4
        idle1 = t1[3] + (t1[4] if have_iowait else 0)
        idle2 = t2[3] + (t2[4] if have_iowait else 0)
        total_delta = sum(t2[:8]) - sum(t1[:8])
        if total_delta <= 0:
            return {}

        data: Dict[str, Any] = {
            "cpu_percent": round(100.0 * (1.0 - (idle2 - idle1) / total_delta), 1),
        }
        if have_iowait:
            data["cpu_user"] = round(100.0 * (t2[0] - t1[0]) / total_delta, 1)
            data["cpu_system"] = round(100.0 * (t2[2] - t1[2]) / total_delta, 1)
            # cpu_idle includes iowait, matching the cpu_percent formula.
            data["cpu_idle"] = round(100.0 * (idle2 - idle1) / total_delta, 1)
            data["cpu_iowait"] = round(100.0 * (t2[4] - t1[4]) / total_delta, 1)

        try:
            la = os.getloadavg()
            data["load_1min"] = round(la[0], 2)
            data["load_5min"] = round(la[1], 2)
            data["load_15min"] = round(la[2], 2)
        except (AttributeError, OSError):
            pass  # no load average on this platform

        try:
            with open("/proc/cpuinfo") as fh:
                data["cpu_core_count"] = sum(1 for line in fh if line.startswith("processor"))
        except Exception:
            pass  # best effort: omit the field

        try:
            with open("/proc/uptime") as fh:
                data["uptime_seconds"] = int(float(fh.read().split()[0]))
        except Exception:
            pass  # best effort: omit the field

        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: memory_monitor (Linux /proc/meminfo)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_meminfo() -> Dict[str, int]:
|
|
result: Dict[str, int] = {}
|
|
try:
|
|
with open("/proc/meminfo") as fh:
|
|
for line in fh:
|
|
parts = line.split()
|
|
if len(parts) >= 2:
|
|
try:
|
|
result[parts[0].rstrip(":")] = int(parts[1])
|
|
except ValueError:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
class MemoryMonitorPlugin(MonitorPlugin):
    """RAM and swap usage from /proc/meminfo (Linux only).

    Reports:

    * ``memory_total``, ``memory_used``, ``memory_available`` and
      ``memory_free`` in bytes, plus ``memory_percent``;
    * ``memory_buffers``, ``memory_cached``, ``memory_active`` and
      ``memory_inactive`` when the kernel provides them;
    * ``swap_*`` fields when swap is configured.

    The current ZFS ARC size counts as available memory.
    """

    name = "memory_monitor"
    description = "Memory usage via /proc/meminfo (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/meminfo not available)"
            return False
        if not _read_meminfo():
            self.skip_reason = "/proc/meminfo not readable"
            return False
        return True

    @staticmethod
    def _zfs_arc_kb() -> int:
        """Current ZFS ARC size in KiB, or 0 if ZFS stats are absent/unreadable."""
        try:
            with open("/proc/spl/kstat/zfs/arcstats") as stats:
                for row in stats:
                    cols = row.split()
                    if len(cols) >= 3 and cols[0] == "size":
                        return int(cols[2]) // 1024
        except (OSError, ValueError):
            pass
        return 0

    async def _collect_metrics(self) -> Dict[str, Any]:
        info = _read_meminfo()
        if not info:
            return {}
        kib = 1024
        total_kb = info.get("MemTotal", 0)
        free_kb = info.get("MemFree", 0)
        # ZFS ARC is reclaimable but not included in MemAvailable; add it,
        # never reporting more available memory than exists.
        reported_avail = info.get("MemAvailable", info.get("MemFree", 0))
        avail_kb = min(reported_avail + self._zfs_arc_kb(), total_kb)
        used_kb = total_kb - avail_kb

        data: Dict[str, Any] = {
            "memory_total": total_kb * kib,
            "memory_used": used_kb * kib,
            "memory_available": avail_kb * kib,
            "memory_free": free_kb * kib,
            "memory_percent": round(100.0 * used_kb / total_kb, 1) if total_kb else 0.0,
        }
        optional = {
            "Buffers": "memory_buffers",
            "Cached": "memory_cached",
            "Active": "memory_active",
            "Inactive": "memory_inactive",
        }
        data.update({key: info[field] * kib for field, key in optional.items() if field in info})

        swap_total = info.get("SwapTotal", 0)
        if swap_total:
            swap_free = info.get("SwapFree", 0)
            swap_used = swap_total - swap_free
            data.update(
                swap_total=swap_total * kib,
                swap_used=swap_used * kib,
                swap_free=swap_free * kib,
                swap_percent=round(100.0 * swap_used / swap_total, 1),
            )
        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: disk_monitor (df -P; Linux, macOS, BSD)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DiskMonitorPlugin(MonitorPlugin):
    """Per-mount disk usage from ``df -P -k`` (Linux, macOS, BSD; not Windows).

    Config: ``mounts`` (list of mount points to report; empty means all)
    and ``interval`` (default 300).  The sample is
    ``{"partitions": {mount: {"total", "used", "free", "percent"}}}`` with
    sizes in bytes.  An empty result is returned as ``{}``.
    """

    name = "disk_monitor"
    description = "Disk usage via df"
    interval = 300

    # Columns: filesystem, 1K-blocks, used, available, capacity%, mount point.
    # The lazy first group and the trailing catch-all keep filesystem names
    # and mount points that contain spaces intact.
    _DF_LINE_RE = re.compile(r"^(.*?)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)%\s+(.+?)\s*$")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.mounts: List[str] = cfg.get("mounts", [])

    async def initialize(self) -> bool:
        if sys.platform == "win32":
            self.skip_reason = "Windows not supported"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        try:
            # -k forces 1024-byte blocks.  With plain -P, macOS/BSD df (and
            # POSIX df in general) report 512-byte blocks, which would double
            # every size computed below.
            proc = await asyncio.create_subprocess_exec(
                "df", "-P", "-k",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except Exception as e:
            self.logger.warning("df failed: %s", e)
            return {}
        try:
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        except Exception as e:
            # Typically a timeout on a hung (e.g. network) mount.  Kill df so
            # it does not linger.  Don't wait for it: a process blocked in
            # the kernel may not die until the mount responds.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
            self.logger.warning("df failed: %s", e)
            return {}

        partitions: Dict[str, Any] = {}
        for line in out.decode(errors="replace").splitlines()[1:]:  # skip header
            m = self._DF_LINE_RE.match(line)
            if not m:
                continue  # pseudo filesystems with "-" fields, odd lines
            mount = m.group(6)
            if self.mounts and mount not in self.mounts:
                continue
            total_kb, used_kb, avail_kb, pct = (int(m.group(i)) for i in range(2, 6))
            partitions[mount] = {
                "total": total_kb * 1024,
                "used": used_kb * 1024,
                "free": avail_kb * 1024,
                "percent": pct,
            }
        return {"partitions": partitions} if partitions else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: network_monitor (Linux /proc/net/dev)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_net_dev() -> Dict[str, Tuple[int, int]]:
|
|
"""Return {iface: (rx_bytes, tx_bytes)}."""
|
|
result: Dict[str, Tuple[int, int]] = {}
|
|
try:
|
|
with open("/proc/net/dev") as fh:
|
|
for line in fh.readlines()[2:]: # skip 2 header lines
|
|
parts = line.split()
|
|
if len(parts) < 10:
|
|
continue
|
|
result[parts[0].rstrip(":")] = (int(parts[1]), int(parts[9]))
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
class NetworkMonitorPlugin(MonitorPlugin):
    """Per-interface traffic from /proc/net/dev (Linux only).

    For every interface not listed in ``skip_interfaces`` (default
    ``["lo"]``) the sample reports:

    * the cumulative ``bytes_recv`` / ``bytes_sent`` counters;
    * ``bytes_recv_delta`` / ``bytes_sent_delta``: bytes since the
      previous sample;
    * ``bytes_recv_rate`` / ``bytes_sent_rate``: bytes per second over
      that window.

    Interfaces that appeared since the previous sample are reported from
    the next one.
    """

    name = "network_monitor"
    description = "Network I/O rates via /proc/net/dev (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.skip_ifaces: set = set(cfg.get("skip_interfaces", ["lo"]))
        # (time.monotonic() timestamp, counters) of the previous reading.
        self._prev: Optional[Tuple[float, Dict[str, Tuple[int, int]]]] = None

    async def initialize(self) -> bool:
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/net/dev not available)"
            return False
        dev = _read_net_dev()
        if not dev:
            self.skip_reason = "/proc/net/dev not readable"
            return False
        self._prev = (time.monotonic(), dev)  # prime first delta
        return True

    @staticmethod
    def _delta(curr: int, prev: int) -> int:
        """Bytes since the previous reading.

        A smaller counter means it was reset (e.g. the interface was
        re-created), so count from zero instead of going negative.
        """
        return curr - prev if curr >= prev else curr

    async def _collect_metrics(self) -> Dict[str, Any]:
        # Monotonic clock: wall-clock jumps (NTP, manual changes) must not
        # distort the elapsed time the rates are computed over.
        now = time.monotonic()
        curr = _read_net_dev()
        if not curr or self._prev is None:
            self._prev = (now, curr)
            return {}
        prev_ts, prev = self._prev
        dt = now - prev_ts
        self._prev = (now, curr)
        if dt <= 0:
            return {}

        interfaces: Dict[str, Any] = {}
        for iface, (rx, tx) in curr.items():
            if iface in self.skip_ifaces or iface not in prev:
                continue
            prx, ptx = prev[iface]
            rx_delta = self._delta(rx, prx)
            tx_delta = self._delta(tx, ptx)
            interfaces[iface] = {
                "bytes_recv": rx,
                "bytes_sent": tx,
                "bytes_recv_delta": rx_delta,
                "bytes_sent_delta": tx_delta,
                "bytes_recv_rate": round(rx_delta / dt, 1),
                "bytes_sent_rate": round(tx_delta / dt, 1),
            }
        return {"interfaces": interfaces} if interfaces else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin registry — all built-in classes, initialized at startup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Every built-in plugin class.  _load_plugins() instantiates and
# initialises them in this order at start-up and keeps those whose
# initialize() returns True.  That order carries through to logging and to
# the collection order inside each plugin group.
_ALL_PLUGIN_CLASSES: List[type] = [
    OSInfoPlugin,
    PingMonitorPlugin,
    NagiosRunnerPlugin,
    CPUMonitorPlugin,
    MemoryMonitorPlugin,
    DiskMonitorPlugin,
    NetworkMonitorPlugin,
]
|
|
|
|
|
|
async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
    """Instantiate and initialise every built-in plugin class.

    A plugin's settings come from ``cfg["plugins"][name]``, falling back to
    a top-level ``cfg[name]`` section.  The global ``owner`` is copied in
    unless the plugin section sets its own.  A plugin is logged and left
    out if its constructor or ``initialize()`` raises, or if
    ``initialize()`` returns ``False``.

    :returns: the successfully initialised plugins, in registry order.
    """
    log = logging.getLogger("hbc.plugins")
    # "plugins": null (or a non-object) in the config must not abort start-up.
    plugins_cfg = cfg.get("plugins") or {}
    if not isinstance(plugins_cfg, dict):
        log.error("ignoring 'plugins' config: expected an object, got %s",
                  type(plugins_cfg).__name__)
        plugins_cfg = {}
    loaded: List[Plugin] = []
    for cls in _ALL_PLUGIN_CLASSES:
        plugin: Optional[Plugin] = None
        ok = False
        try:
            plugin_cfg = dict(plugins_cfg.get(cls.name) or cfg.get(cls.name) or {})
            if "owner" in cfg and "owner" not in plugin_cfg:
                plugin_cfg["owner"] = cfg["owner"]
            # Construct inside the try: constructors parse user config
            # (int(count), dict(hosts), ...).  A bad value must only disable
            # that plugin, not crash the client.
            plugin = cls(config=plugin_cfg)
            ok = await plugin.initialize()
        except Exception as e:
            log.error("init %s: %s", cls.name, e)
            ok = False
        if ok and plugin is not None:
            loaded.append(plugin)
            log.info("loaded %s (interval=%ds)", plugin.name, plugin.interval)
        else:
            reason = plugin.skip_reason if plugin is not None else None
            log.info("skip %s: %s", cls.name, reason or "init failed")
    return loaded
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Global state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Main-loop flag: the sender/collector loops run `while _running`;
# _stop() clears it and _async_main() sets it again.
_running = True

# Set by _handle_update() after a successful install, right before _stop().
# NOTE(review): presumably read by the entry point to re-exec the client;
# that code is not visible here, so confirm.
_dorestart = False

# Created by _async_main(); _stop() sets it so that _sleep() returns early.
_shutdown_event: Optional[asyncio.Event] = None

# Tasks that _stop() cancels on shutdown; reset to [] by _async_main().
_active_tasks: List[asyncio.Task] = []

# Fallbacks used by _async_main() when the config lacks "hb_port" /
# "interval"; they match the values in _DEFAULTS.
PORT = 50003
INTERVAL = 10
|
|
|
|
|
|
def _shortname(name: str) -> str:
|
|
return name.split(".")[0]
|
|
|
|
|
|
def _stop():
    """Ask the client to shut down.

    Clears ``_running`` and sets ``_shutdown_event`` (if it exists), so
    pending :func:`_sleep` calls return at once.  Then cancels every task
    in ``_active_tasks`` that has not finished yet.
    """
    global _running
    _running = False
    if _shutdown_event:
        _shutdown_event.set()
    pending = (task for task in _active_tasks if not task.done())
    for task in pending:
        task.cancel()
|
|
|
|
|
|
async def _sleep(seconds: float):
    """Wait *seconds*, returning early once ``_shutdown_event`` is set.

    Until ``_async_main`` has created the event, this is a plain
    ``asyncio.sleep``.
    """
    if not _shutdown_event:
        await asyncio.sleep(seconds)
        return
    try:
        await asyncio.wait_for(_shutdown_event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # the full period elapsed without a shutdown request
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UDP protocol handler + connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _HeartbeatProtocol(asyncio.DatagramProtocol):
|
|
def __init__(self, conn: "AsyncConnection"):
|
|
self._conn = conn
|
|
self._log = logging.getLogger("hbc.proto")
|
|
|
|
def datagram_received(self, data: bytes, addr):
|
|
try:
|
|
msg = _stodict(data)
|
|
if not msg:
|
|
return
|
|
msg_id = msg.get("ID")
|
|
now = time.time()
|
|
if msg_id == "ACK":
|
|
self._conn._handle_ack(msg, now)
|
|
elif msg_id == "CMD":
|
|
asyncio.create_task(_handle_command(self._conn, msg))
|
|
elif msg_id == "UPD":
|
|
asyncio.create_task(_handle_update(self._conn))
|
|
else:
|
|
self._log.debug("unknown msg type: %s", msg_id)
|
|
except Exception as e:
|
|
self._log.error("datagram error: %s", e)
|
|
|
|
def error_received(self, exc):
|
|
self._log.warning("protocol error on %s: %s — will retry", self._conn.addr, exc)
|
|
self._conn.close()
|
|
|
|
|
|
class AsyncConnection:
    """One UDP association with a heartbeat server address.

    Tracks the ack count, the time of the last send and the most recent
    RTT samples in milliseconds (at most 10 kept).  The datagram endpoint
    is opened lazily and reopened on demand by :meth:`sendto`.
    """

    def __init__(self, conn_id: int, addr: str, port: int, af: int, name: str):
        self.conn_id = conn_id
        self.addr = addr
        self.port = port
        self.af = af
        self.name = name
        self.ackcount = 0
        self.lastsend = 0.0
        # Seeded with 0.0 so rtts[-1] is always valid for heartbeats.
        self.rtts: List[float] = [0.0]
        self._transport: Optional[asyncio.DatagramTransport] = None
        self._dead = False
        self._request_info: asyncio.Event = asyncio.Event()
        self._log = logging.getLogger(f"hbc.conn.{addr}")

    async def open(self) -> bool:
        """Create the datagram endpoint; log and return False on failure."""
        try:
            transport, _proto = await asyncio.get_running_loop().create_datagram_endpoint(
                lambda: _HeartbeatProtocol(self), family=self.af
            )
        except Exception as e:
            self._log.error("open: %s", e)
            return False
        self._transport = transport
        return True

    def close(self):
        """Close the endpoint, if any; the next sendto() reopens it."""
        transport = self._transport
        if transport:
            transport.close()
            self._transport = None

    def _handle_ack(self, msg: Dict[str, Any], now: float):
        """Record an ACK received at *now*.

        Appends an RTT sample measured from the last send, bumps the ack
        count, and flags an info refresh if the ACK asks for one.
        """
        self.rtts.append((now - self.lastsend) * 1000.0)
        if len(self.rtts) > 10:
            del self.rtts[0]
        self.ackcount += 1
        if msg.get("request_update"):
            self._request_info.set()

    async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
        """Send *msg*, plus ``name``/``id``/``time`` fields, as a *msg_id* datagram.

        Does nothing if the connection is dead or the endpoint cannot be
        (re)opened.
        """
        if self._dead:
            return
        if not self._transport:
            await self.open()
            if not self._transport:
                return
        payload = {
            **msg,
            "name": _shortname(self.name),
            "id": self.conn_id,
            "time": time.time(),
        }
        self._transport.sendto(_dicttos(msg_id, payload), (self.addr, self.port))
        self.lastsend = time.time()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Server command handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _handle_command(conn: AsyncConnection, msg: Dict[str, Any]):
    """Run the shell command in ``msg["cmd"]`` and report the result.

    Replies over *conn* with
    ``{"service": "command", "msg": "<status> <text>"}``, where status is:

    * ``OK``: text is the combined stdout/stderr;
    * ``Error``: non-zero exit or launch failure;
    * ``Timeout``: no result within 30 s; the process is killed.

    Does nothing if ``cmd`` is missing or empty.

    NOTE(security): the command arrives over the network and is executed by
    the shell with this client's privileges.
    """
    cmd = msg.get("cmd", "")
    if not cmd:
        return
    log = logging.getLogger("hbc.cmd")
    log.info("exec: %s", cmd)
    try:
        # An asyncio subprocess keeps the event loop (heartbeats, ACKs,
        # plugin collection) running while the command executes.  A blocking
        # subprocess call here would stall all of it for up to 30 s.
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            raw, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await proc.communicate()
            out, status = "timed out", "Timeout"
        else:
            if proc.returncode == 0:
                # errors="replace": binary / non-UTF-8 output is still reported.
                out, status = raw.decode(errors="replace"), "OK"
            else:
                # Same text that subprocess.check_output() raises with.
                out = str(subprocess.CalledProcessError(proc.returncode, cmd))
                status = "Error"
    except Exception as e:
        out, status = str(e), "Error"
    await conn.sendto({"service": "command", "msg": f"{status} {out}"})
|
|
|
|
|
|
async def _handle_update(conn: AsyncConnection):
    """Self-update on server request by running ``hb_install.sh mini``.

    The installer is looked up on ``PATH``, then next to this script
    (``sys.argv[0]``).  Each failure is logged and reported to the server
    as ``{"service": "update", "msg": <reason>}``:

    * installer not found;
    * launch error;
    * no result within 120 s;
    * non-zero exit.

    On success ``"OK"`` is reported, ``_dorestart`` is set and
    :func:`_stop` is called.
    """
    global _dorestart
    log = logging.getLogger("hbc.update")

    async def fail(reason: str) -> None:
        # Log locally and tell the server why the update did not happen.
        log.error(reason)
        await conn.sendto({"service": "update", "msg": reason})

    installer = shutil.which("hb_install.sh")
    if installer is None:
        candidate = Path(sys.argv[0]).parent / "hb_install.sh"
        if candidate.exists():
            installer = str(candidate)
    if installer is None:
        await fail("hb_install.sh not found in PATH or alongside hbc_mini.py")
        return

    log.info("running installer: %s", installer)
    try:
        proc = await asyncio.create_subprocess_exec(
            installer, "mini",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
    except asyncio.TimeoutError:
        # NOTE(review): the installer is left running after a timeout;
        # killing it part-way could leave a half-installed client. Confirm.
        await fail("installer timed out")
        return
    except Exception as e:
        await fail(f"installer failed: {e}")
        return

    if proc.returncode != 0:
        # errors="replace": non-UTF-8 installer output must not raise here,
        # which would end this task before the failure is reported.
        detail = out.decode(errors="replace").strip()
        await fail(f"installer exited {proc.returncode}: {detail}")
        return

    log.info("update successful, restarting")
    await conn.sendto({"service": "update", "msg": "OK"})
    _dorestart = True
    _stop()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Heartbeat sender
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _heartbeat_sender(conn: AsyncConnection, interval: int):
    """Send an HTB datagram every *interval* seconds until shutdown.

    Each heartbeat carries the ack count, the latest RTT sample and the
    interval.  Errors are logged and the loop carries on.
    """
    log = logging.getLogger("hbc.hb")
    while _running:
        try:
            beat = {
                "acks": conn.ackcount,
                "rtt": conn.rtts[-1],
                "interval": interval,
            }
            await conn.sendto(beat)
        except Exception as e:
            log.error("send: %s", e)
        await _sleep(interval)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin collection loops
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _run_info_plugins(conn: AsyncConnection, plugins: List[Plugin]):
    """Collect each plugin once and send non-empty results as PLG datagrams.

    A failing plugin is logged and does not stop the others.
    """
    log = logging.getLogger("hbc.plugins")
    for plugin in plugins:
        name = plugin.name
        try:
            payload = await plugin.collect()
            if not payload:
                continue
            await conn.sendto({"plugin": name, **payload}, "PLG")
            log.info("sent %s", name)
        except Exception as e:
            log.error("%s collect: %s", name, e)
|
|
|
|
|
|
async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], interval: int):
    """Collect and send every plugin in *plugins* once per *interval* seconds.

    The wait after each round is shortened by the time the round took, so
    rounds keep a fixed cadence.  Otherwise a slow check, or the CPU
    plugin's initial 1-second sample, would push every later round back.
    Per-plugin errors are logged and do not stop the loop.  Runs until
    shutdown.
    """
    log = logging.getLogger(f"hbc.plugins.{interval}s")
    try:
        period = float(interval)
    except (TypeError, ValueError):
        period = 0.0
    if period <= 0:
        # A zero/negative interval would re-collect without pausing, and a
        # non-numeric one would crash the loop; fall back to 1 second.
        log.warning("invalid interval %r, using 1s", interval)
        period = 1.0
    while _running:
        started = time.monotonic()
        for plugin in plugins:
            try:
                data = await plugin.collect()
                if data:
                    await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                    log.debug("sent %s", plugin.name)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("%s: %s", plugin.name, e)
        # Sleep only for what is left of the period (no cumulative drift).
        await _sleep(max(0.0, period - (time.monotonic() - started)))
|
|
|
|
|
|
async def _info_refresh_loop(conn: AsyncConnection, info: List[Plugin]):
    """Re-collect and re-send the info plugins whenever the server asks.

    Waits on ``conn._request_info``, which ``AsyncConnection._handle_ack``
    sets when an ACK carries ``request_update``.  On each request it drops
    every plugin's cache and sends fresh data.  ``_stop()`` does not set
    that event, so at shutdown this coroutine ends only when its task is
    cancelled.
    """
    log = logging.getLogger("hbc.plugins")
    while _running:
        await conn._request_info.wait()
        if _running:
            conn._request_info.clear()
            log.info("refreshing InfoPlugins on server request")
            for plugin in info:
                plugin._cache = None  # force InfoPlugin.collect() to re-gather
            await _run_info_plugins(conn, info)
|
|
|
|
|
|
async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
    """Drive all plugins for *conn* until shutdown or cancellation.

    Info plugins are sent once up front.  After that, one task re-sends
    them on server request, and one task per distinct interval runs the
    monitor plugins that share that interval.
    """
    info: List[Plugin] = []
    monitor: List[Plugin] = []
    for p in plugins:
        if isinstance(p, InfoPlugin):
            info.append(p)
        if isinstance(p, MonitorPlugin):
            monitor.append(p)

    await _run_info_plugins(conn, info)

    groups: Dict[int, List[Plugin]] = defaultdict(list)
    for p in monitor:
        groups[p.interval].append(p)

    tasks = [asyncio.create_task(_info_refresh_loop(conn, info))]
    tasks.extend(
        asyncio.create_task(_run_monitor_group(conn, members, every))
        for every, members in groups.items()
    )
    await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Daemonize + syslog
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _daemonize():
    """Detach from the controlling terminal using a double fork.

    1. First fork: the parent exits via ``os._exit(0)``, skipping cleanup
       handlers.  The child changes to ``/``, starts a new session with
       ``os.setsid()`` and clears the umask.
    2. Second fork: the session leader exits, so the surviving process is
       not a session leader.
    3. stdin is reopened on ``/dev/zero`` and stdout/stderr on
       ``/dev/null``; failures there are ignored.

    A failed fork writes a message to stderr and exits with status 1.
    """
    for fork_n in (1, 2):
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            sys.stderr.write(f"fork #{fork_n} failed: {e}\n")
            os._exit(1)
        if fork_n == 1:
            os.chdir("/")
            os.setsid()
            # NOTE(review): umask 0 means files created later get exactly the
            # mode passed to open()/mkdir() (often world-writable); confirm.
            os.umask(0)
    for fd, path, mode in ((0, "/dev/zero", "r"), (1, "/dev/null", "a+"), (2, "/dev/null", "a+")):
        try:
            # dup2 copies the descriptor onto fd; the temporary file object
            # is not kept.
            os.dup2(open(path, mode).fileno(), fd)
        except Exception:
            pass
|
|
|
|
|
|
def _reconfigure_syslog(level: int):
|
|
root = logging.getLogger()
|
|
for h in root.handlers[:]:
|
|
root.removeHandler(h)
|
|
h.close()
|
|
addr = "/dev/log" if os.path.exists("/dev/log") else ("localhost", 514)
|
|
sh = SysLogHandler(address=addr, facility=SysLogHandler.LOG_DAEMON)
|
|
sh.setFormatter(logging.Formatter("hbc[%(process)d]: %(levelname)s %(message)s"))
|
|
root.addHandler(sh)
|
|
root.setLevel(level)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Async main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _async_main(args, cfg: Dict[str, Any]) -> int:
    """Run the heartbeat client until it is stopped; return the exit code.

    Startup:

    * Resolve every host in ``args.hosts`` and open one AsyncConnection per
      resolved address. Return 1 if none could be opened.
    * If ``--boot`` and/or ``--message`` was given, send one boot/message
      datagram first. A ``--message`` run without ``--daemon`` stops right
      there and returns 0.

    Otherwise:

    * Load the plugins.
    * Install SIGTERM/SIGINT handlers that stop the client, and a SIGHUP
      handler that stops it and flags a restart.
    * Start one heartbeat sender per connection, plus a plugin collector on
      the first connection, and wait for all of them.

    On the way out it sends a shutdown datagram (only when a boot message was
    sent), closes the connections and cleans up every plugin, then returns 0.

    Side effects:

    * Resets the module globals ``_running``, ``_shutdown_event``,
      ``_active_tasks`` and ``send_shutdown``.
    * Sets ``args.boot`` to False once the boot message is sent.
    """
    global _running, _shutdown_event, _active_tasks, send_shutdown
    _running = True
    _shutdown_event = asyncio.Event()
    _active_tasks = []

    log = logging.getLogger("hbc.main")
    iam = args.name or socket.gethostname()
    port = cfg.get("hb_port", PORT)
    interval = cfg.get("interval", INTERVAL)

    log.info("hbc_mini %s on %s -> %s port=%d interval=%ds", __version__, iam, args.hosts, port, interval)

    connections: List[AsyncConnection] = []
    conn_id = 1
    for host in args.hosts:
        try:
            # getaddrinfo's 5th argument is an IPPROTO_* number; SOL_UDP is a
            # setsockopt level that merely shares the value 17.
            addrs = socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_UDP)
        except socket.gaierror as e:
            log.error("cannot resolve %s: %s", host, e)
            continue
        for ai in addrs:
            # ai[0] is the address family, ai[4][0] the numeric address.
            conn = AsyncConnection(conn_id, ai[4][0], port, ai[0], iam)
            if await conn.open():
                connections.append(conn)
                conn_id += 1

    if not connections:
        log.error("no connections established")
        return 1

    # Boot / one-shot message
    send_shutdown = False
    if args.boot or args.message:
        bmsg: Dict[str, Any] = {"acks": 0}
        if args.boot:
            bmsg["boot"] = 1
            # NOTE: this only affects an in-process re-run. A SIGHUP restart
            # re-execs with the original command line, so -b is seen again.
            args.boot = False  # don't repeat on restart
            send_shutdown = True
        if args.message:
            bmsg["service"] = "service"
            bmsg["msg"] = args.message
        target = next((c for c in connections if c._transport), connections[0])
        await target.sendto(bmsg)
        if args.message and not args.daemon:
            # One-shot mode: give the datagram a moment to leave, then exit.
            await asyncio.sleep(0.3)
            for c in connections:
                c.close()
            return 0

    plugins = await _load_plugins(cfg)

    # Inside a coroutine, get_running_loop() is the non-deprecated way to
    # reach the loop that is actually executing us.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _stop)

    def _sighup():
        # Stop like SIGTERM, but flag main() to re-exec the process.
        global _dorestart
        _dorestart = True
        _stop()

    loop.add_signal_handler(signal.SIGHUP, _sighup)

    for conn in connections:
        _active_tasks.append(asyncio.create_task(_heartbeat_sender(conn, interval)))

    if plugins and connections:
        # Plugin data is sent over the first connection only.
        _active_tasks.append(asyncio.create_task(_plugin_collector(connections[0], plugins)))

    try:
        results = await asyncio.gather(*_active_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        results = []
    # return_exceptions=True turns crashes into return values; surface them
    # instead of silently dropping them. Cancellation (a BaseException) is the
    # normal way tasks end and is not reported.
    for res in results:
        if isinstance(res, Exception):
            log.error("task failed: %s", res)

    log.info("shutting down")
    target = next((c for c in connections if c._transport), connections[0] if connections else None)
    if target and send_shutdown:
        try:
            await target.sendto({"shutdown": 1, "acks": target.ackcount})
        except Exception as e:
            # Best effort: a failed shutdown notice must not block cleanup.
            log.debug("shutdown message not sent: %s", e)
    for conn in connections:
        conn.close()
    await asyncio.sleep(0.3)
    for plugin in plugins:
        try:
            await plugin.cleanup()
        except Exception as e:
            # One failing plugin must not skip the others' cleanup or turn a
            # clean shutdown into a fatal exit.
            log.error("%s: cleanup failed: %s", getattr(plugin, "name", plugin), e)
    return 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main(argv=None):
    """Command-line entry point: parse arguments, set up logging, run the client.

    *argv* is the argument list without the program name. ``None`` means
    ``sys.argv[1:]``, as with argparse.

    * With ``-d`` the process daemonizes and logs to syslog.
    * The process exits via ``sys.exit`` with the code returned by
      ``_async_main``, or 1 when that raised an unexpected exception.
    * If a SIGHUP requested a restart, the process re-executes itself with
      the same arguments instead of exiting.
    """
    global _dorestart

    parser = argparse.ArgumentParser(
        prog="hbc_mini",
        description="HeartBeat Client — single file, no external dependencies",
    )
    parser.add_argument("-b", "--boot", action="store_true", help="Send boot message")
    parser.add_argument("-c", "--config", dest="configfile", help="Config file (JSON)")
    parser.add_argument("-m", "--message", dest="message", help="Send a one-shot message")
    parser.add_argument("-n", "--name", dest="name", help="Override hostname")
    parser.add_argument("-d", "--daemon", action="store_true", help="Run as daemon")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-x", "--debug", action="count", default=0, help="Debug level")
    parser.add_argument("hosts", nargs="+", help="HBD server(s)")
    args = parser.parse_args(argv)

    # Capture how to re-exec ourselves *before* _daemonize() chdir()s to "/".
    # After that, a relative script path such as "hbc_mini.py" no longer
    # resolves.
    script = os.path.abspath(sys.argv[0])
    cli_args = list(sys.argv[1:] if argv is None else argv)

    level = logging.WARNING
    if args.verbose:
        level = logging.INFO
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    cfg = _load_config(args.configfile)

    if args.daemon:
        _daemonize()
        _reconfigure_syslog(level)

    try:
        rc = asyncio.run(_async_main(args, cfg))
    except KeyboardInterrupt:
        rc = 0
    except Exception as e:
        logging.error("fatal: %s", e, exc_info=True)
        rc = 1

    if _dorestart:
        logging.info("restarting...")
        # Restart through the interpreter so the script needs neither an exec
        # bit nor a shebang. This matches the documented usage
        # "python3 hbc_mini.py <host>".
        if sys.executable:
            exe, new_argv = sys.executable, [sys.executable, script] + cli_args
        else:
            exe, new_argv = script, [script] + cli_args
        try:
            os.execv(exe, new_argv)
        except OSError as e:
            logging.error("restart failed: %s", e)
            rc = 1

    sys.exit(rc)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: importing this module does not start the client.
    main(argv=None)
|