c93dbdc0f4
Settings page: pass threshold_checker to http.start so the Threshold
Configurations section has data. Use threshold_checker's already-parsed
ThresholdConfig objects instead of re-parsing the raw nested YAML.
Named (non-default) configs now display only their explicit overrides
via threshold_raw_configs, not the full merged set with defaults.
hbc/hbc_mini: send boot and shutdown messages on first connection only
to avoid duplicate packets when multiple servers are configured.
Replace print("Daemonizing...") with logging.info so output goes to
syslog in daemon mode.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1158 lines
38 KiB
Python
Executable File
1158 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""hbc_mini — single-file HeartBeat Client with no external dependencies.
|
|
|
|
Drop this file on any host with Python 3.8+ and run it directly:
|
|
python3 hbc_mini.py <hbd-host>
|
|
|
|
Config: ~/.hbc.json (same keys as ~/.hbc.yaml but JSON format)
|
|
|
|
Built-in plugins (always available):
|
|
os_info — OS name, kernel, distro, Python version
|
|
ping_monitor — ICMP ping RTT to configured hosts
|
|
nagios_runner — run Nagios-compatible check scripts
|
|
|
|
Linux-only plugins (via /proc, no extra tools needed):
|
|
cpu_monitor — CPU %, load average, core count
|
|
memory_monitor — RAM and swap usage
|
|
network_monitor — per-interface RX/TX byte counters and per-interval deltas
|
|
|
|
Cross-platform (via df subprocess):
|
|
disk_monitor — per-mount usage and capacity
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import zlib
|
|
from abc import ABC, abstractmethod
|
|
from collections import defaultdict
|
|
from logging.handlers import SysLogHandler
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Client version string; the os_info plugin reports it as "hbc_version".
# updated by scripts/bumpminor.sh
__version__ = "5.1.17"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol (mirrors hbd/common/proto.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _encode_value(v: Any) -> str:
|
|
if isinstance(v, float):
|
|
return f"{v:0.5f}"
|
|
if isinstance(v, (list, dict)):
|
|
return "@" + json.dumps(v)
|
|
if isinstance(v, bool):
|
|
return str(int(v))
|
|
return str(v)
|
|
|
|
|
|
def _decode_value(val: str) -> Any:
|
|
if not val:
|
|
return val
|
|
if val.startswith("@"):
|
|
try:
|
|
return json.loads(val[1:])
|
|
except Exception:
|
|
return val[1:]
|
|
if val[0].isdigit() or (val[0] == "-" and len(val) > 1 and val[1].isdigit()):
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
return float(val)
|
|
except ValueError:
|
|
pass
|
|
return val
|
|
|
|
|
|
def _dicttos(msg_id: str, d: Dict[str, Any]) -> bytes:
    """Serialise *d* into a compressed datagram.

    The result is ``"!" + msg_id + ":"`` followed by the zlib-compressed
    (level 6) ``key=value`` pairs joined with ``;``.
    """
    fields = [f"{key}={_encode_value(value)}" for key, value in d.items()]
    body = zlib.compress(";".join(fields).encode(), 6)
    header = ("!" + msg_id + ":").encode()
    return header + body
|
|
|
|
|
|
def _stodict(data: bytes) -> Dict[str, Any]:
|
|
result: Dict[str, Any] = {}
|
|
if not data:
|
|
return result
|
|
if chr(data[0]) == "!":
|
|
try:
|
|
payload = zlib.decompress(data[5:]).decode()
|
|
except Exception:
|
|
return {}
|
|
result["ID"] = data[1:4].decode()
|
|
else:
|
|
try:
|
|
head, payload = data.split(b":", 1)
|
|
payload = payload.decode()
|
|
result["ID"] = head.decode()
|
|
except Exception:
|
|
return {}
|
|
for item in payload.split(";"):
|
|
if not item:
|
|
continue
|
|
kv = item.split("=", 1)
|
|
result[kv[0].strip()] = _decode_value(kv[1].strip()) if len(kv) > 1 else None
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config (JSON, default ~/.hbc.json)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_DEFAULTS: Dict[str, Any] = {
|
|
"hb_port": 50003,
|
|
"interval": 10,
|
|
"plugins": {},
|
|
}
|
|
|
|
|
|
def _load_config(path: Optional[str] = None) -> Dict[str, Any]:
    """Return the defaults overlaid with the JSON config file, if any.

    Args:
        path: Config file location; when falsy, ``~/.hbc.json`` is used.

    Returns:
        A new dict seeded from ``_DEFAULTS``.  A missing file is ignored
        silently; an unreadable or invalid one is logged as a warning and
        the values merged so far are returned.
    """
    merged = dict(_DEFAULTS)
    cfg_path = path if path else os.path.join(os.path.expanduser("~"), ".hbc.json")
    if not os.path.exists(cfg_path):
        return merged

    try:
        with open(cfg_path) as fh:
            merged.update(json.load(fh))
        logging.getLogger("hbc.config").info("loaded config from %s", cfg_path)
    except Exception as exc:
        logging.getLogger("hbc.config").warning("cannot read %s: %s", cfg_path, exc)
    return merged
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin base classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Plugin(ABC):
    """Common base for every plugin.

    Subclasses set the class attributes below and implement
    :meth:`initialize` and :meth:`collect`.
    """

    name: str = ""
    version: str = "1.0.0"
    description: str = ""
    interval: int = 0
    enabled: bool = True

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        # A falsy config (None or empty) is normalised to a fresh dict.
        self.config = config if config else {}
        self.logger = logging.getLogger(f"plugin.{self.name}")
        # Human-readable reason a plugin declined to load; None until set.
        self.skip_reason: Optional[str] = None

    @abstractmethod
    async def initialize(self) -> bool:
        """Prepare the plugin; the loader keeps it only on a truthy result."""

    @abstractmethod
    async def collect(self) -> Dict[str, Any]:
        """Return the plugin's data as a flat or nested dict."""

    async def cleanup(self) -> None:
        """Release resources; the base implementation does nothing."""
        return None
|
|
|
|
|
|
class InfoPlugin(Plugin):
    """Plugin whose data is gathered on first use and cached afterwards."""

    interval = 0

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        # Filled by the first collect() call; None means "not gathered yet".
        self._cache: Optional[Dict[str, Any]] = None

    async def collect(self) -> Dict[str, Any]:
        """Return the cached result, gathering it first if necessary."""
        if self._cache is not None:
            return self._cache
        self._cache = await self._collect_info()
        return self._cache

    @abstractmethod
    async def _collect_info(self) -> Dict[str, Any]:
        """Gather the data; invoked by collect() while the cache is None."""
|
|
|
|
|
|
class MonitorPlugin(Plugin):
    """Plugin whose metrics are gathered again on every collect() call."""

    interval = 30

    async def collect(self) -> Dict[str, Any]:
        """Return fresh metrics, stamped with ``_timestamp`` when non-empty."""
        metrics = await self._collect_metrics()
        if not metrics:
            return metrics
        metrics["_timestamp"] = time.time()
        return metrics

    @abstractmethod
    async def _collect_metrics(self) -> Dict[str, Any]:
        """Gather one sample of metrics; an empty result is left unstamped."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: os_info
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _linux_distro() -> Dict[str, str]:
|
|
info: Dict[str, str] = {}
|
|
for path in ("/etc/os-release", "/etc/lsb-release"):
|
|
if not os.path.exists(path):
|
|
continue
|
|
try:
|
|
with open(path) as fh:
|
|
for line in fh:
|
|
line = line.strip()
|
|
if "=" not in line or line.startswith("#"):
|
|
continue
|
|
k, v = line.split("=", 1)
|
|
v = v.strip('"').strip("'")
|
|
if k == "PRETTY_NAME":
|
|
info["distro_pretty_name"] = v
|
|
elif k == "NAME":
|
|
info.setdefault("distro_name", v)
|
|
elif k == "VERSION_ID":
|
|
info["distro_version_id"] = v
|
|
elif k == "ID":
|
|
info["distro_id"] = v
|
|
except Exception:
|
|
pass
|
|
if info:
|
|
break
|
|
return info
|
|
|
|
|
|
class OSInfoPlugin(InfoPlugin):
    """Report platform details plus the client version and type."""

    name = "os_info"
    description = "OS and platform information"

    async def initialize(self) -> bool:
        """Always available; nothing to set up."""
        return True

    async def _collect_info(self) -> Dict[str, Any]:
        """Collect ``platform`` module facts and OS-specific extras."""
        system = platform.system()
        data: Dict[str, Any] = {
            "system": system,
            "node": platform.node(),
            "release": platform.release(),
            "machine": platform.machine(),
            "architecture": platform.architecture()[0],
            "python_version": platform.python_version(),
            "hbc_version": __version__,
            "hbc_type": "mini",
        }
        if system == "Linux":
            data.update(_linux_distro())
        elif system == "Darwin":
            data["macos_version"] = platform.mac_ver()[0]
        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: ping_monitor
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PingMonitorPlugin(MonitorPlugin):
    """Measure ICMP round-trip time and packet loss with the system ``ping``.

    Config keys read in ``__init__``: ``interval`` (default 60), ``count``
    (default 3), ``timeout`` in seconds (default 5) and ``hosts`` (a list of
    host names, or a mapping keyed by host name).
    """

    name = "ping_monitor"
    description = "ICMP ping latency monitoring"
    interval = 60

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 60)
        self.count = int(cfg.get("count", 3))
        self.timeout = int(cfg.get("timeout", 5))
        # `"hosts": null` in the JSON config is treated like a missing key
        # instead of crashing in dict(None).
        raw = cfg.get("hosts") or {}
        self.hosts: Dict[str, Any] = {h: {} for h in raw} if isinstance(raw, list) else dict(raw)

    async def initialize(self) -> bool:
        """Load only when at least one host is configured."""
        if not self.hosts:
            self.skip_reason = "no hosts configured"
            return False
        return True

    @staticmethod
    def _failed() -> Dict[str, float]:
        """Return the result reported for an unreachable or unparsable ping."""
        inf = float("inf")
        return {"rtt_min": inf, "rtt_avg": inf, "rtt_max": inf, "loss": 100.0}

    def _command(self, host: str) -> List[str]:
        """Build the ping argv, converting the timeout to the platform's unit."""
        if sys.platform == "win32":
            return ["ping", "-n", str(self.count), "-w", str(self.timeout * 1000), host]
        if sys.platform == "darwin" or sys.platform.startswith("freebsd"):
            # BSD-derived ping takes -W in milliseconds; Linux iputils takes
            # seconds.  Passing raw seconds here would wait only a few ms.
            wait = str(self.timeout * 1000)
        else:
            wait = str(self.timeout)
        return ["ping", "-c", str(self.count), "-W", wait, host]

    async def _ping(self, host: str) -> Dict[str, float]:
        """Ping *host* once and return rtt_min/rtt_avg/rtt_max/loss.

        Any failure (spawn error, timeout, unparsable output) yields the
        all-infinite / 100 % loss result rather than raising.
        """
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *self._command(host),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            out_b, _ = await asyncio.wait_for(
                proc.communicate(), timeout=self.timeout * self.count + 2
            )
            out = out_b.decode(errors="replace")
        except Exception:
            # Do not leave a timed-out ping running and unreaped.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                else:
                    await proc.wait()
            return self._failed()

        loss = 100.0
        m = re.search(r"(\d+(?:\.\d+)?)\s*%\s*packet\s*loss", out)
        if m:
            loss = float(m.group(1))
        m = re.search(
            r"(?:rtt|round-trip)\s+min/avg/max/\S+\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)", out
        )
        if m:
            return {"rtt_min": float(m.group(1)), "rtt_avg": float(m.group(2)),
                    "rtt_max": float(m.group(3)), "loss": loss}
        return self._failed()

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Ping all hosts concurrently and flatten results to ``<host>_<metric>``."""
        tasks = {h: asyncio.create_task(self._ping(h)) for h in self.hosts}
        data: Dict[str, Any] = {}
        for host, task in tasks.items():
            try:
                result = await task
            except Exception:
                result = self._failed()
            # Host names become key prefixes; anything non-alphanumeric -> "_".
            key = re.sub(r"[^a-zA-Z0-9_]", "_", host)
            for metric, value in result.items():
                data[f"{key}_{metric}"] = value
        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: nagios_runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_NAGIOS_STATUS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"}
|
|
|
|
|
|
def _parse_nagios_perfdata(output: str) -> Dict[str, Any]:
|
|
if "|" not in output:
|
|
return {}
|
|
section = output.split("|", 1)[1].strip()
|
|
perfdata: Dict[str, Any] = {}
|
|
for m in re.finditer(
|
|
r"'?([^'=\s]+)'?=([\d.]+)([a-zA-Z%]*);?([\d.]*);?([\d.]*);?([\d.]*);?([\d.]*)",
|
|
section,
|
|
):
|
|
label = m.group(1).strip()
|
|
try:
|
|
perfdata[label] = float(m.group(2))
|
|
except ValueError:
|
|
continue
|
|
if m.group(3):
|
|
perfdata[f"{label}_uom"] = m.group(3)
|
|
for idx, suffix in zip(range(4, 8), ("_warn", "_crit", "_min", "_max")):
|
|
if m.group(idx):
|
|
try:
|
|
perfdata[f"{label}{suffix}"] = float(m.group(idx))
|
|
except ValueError:
|
|
pass
|
|
return perfdata
|
|
|
|
|
|
class NagiosRunnerPlugin(MonitorPlugin):
    """Run configured Nagios-compatible check commands and report results.

    Config keys read in ``__init__``: ``commands`` (list of dicts with
    ``name`` and ``command``), ``timeout`` in seconds (default 30) and
    ``interval`` (default 300).
    """

    name = "nagios_runner"
    description = "Nagios-compatible plugin runner"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.commands: List[Dict[str, str]] = cfg.get("commands") or []
        self.timeout: int = cfg.get("timeout", 30)
        self.interval = cfg.get("interval", 300)

    async def initialize(self) -> bool:
        """Load only when at least one command is configured."""
        if not self.commands:
            self.skip_reason = "no commands configured"
            return False
        return True

    async def _run(self, command: str) -> Tuple[int, str, Dict[str, Any]]:
        """Execute one check through the shell.

        Returns:
            ``(status_code, message, perfdata)``.  The status code is always
            in 0..3; timeouts, spawn errors and any exit status outside
            0..3 are reported as 3 (UNKNOWN).
        """
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, err = await asyncio.wait_for(proc.communicate(), timeout=self.timeout)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.communicate()
                return 3, f"timed out after {self.timeout}s", {}

            rc = proc.returncode
            # A negative return code means the check died from a signal.
            # Clamping that to 0 would report a crashed check as OK, so
            # everything outside 0..3 is treated as UNKNOWN.
            if rc is None or not 0 <= rc <= 3:
                rc = 3
            stdout = out.decode(errors="replace").strip()
            stderr = err.decode(errors="replace").strip()
            perf = _parse_nagios_perfdata(stdout)
            # Human-readable text is whatever precedes the perfdata
            # separator; fall back to stderr when stdout has none.
            msg = stdout.split("|")[0].strip() or stderr
            return rc, msg, perf
        except Exception as e:
            return 3, str(e), {}

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run every configured check in order and aggregate the worst status."""
        results: Dict[str, Any] = {}
        worst = 0
        for cmd_cfg in self.commands:
            name = cmd_cfg.get("name")
            command = cmd_cfg.get("command")
            if not name or not command:
                # Entries missing either field are skipped.
                continue
            rc, msg, perf = await self._run(command)
            results[f"{name}_status"] = _NAGIOS_STATUS.get(rc, "UNKNOWN")
            results[f"{name}_status_code"] = rc
            results[f"{name}_output"] = msg
            results.update({f"{name}_{k}": v for k, v in perf.items()})
            worst = max(worst, rc)
        results["overall_status"] = _NAGIOS_STATUS.get(worst, "UNKNOWN")
        results["overall_status_code"] = worst
        results["plugin_count"] = len(self.commands)
        return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: cpu_monitor (Linux /proc/stat)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_proc_stat() -> Optional[Tuple[int, ...]]:
|
|
"""Return CPU jiffies tuple (user, nice, system, idle, iowait, ...)."""
|
|
try:
|
|
with open("/proc/stat") as fh:
|
|
line = fh.readline()
|
|
parts = line.split()
|
|
if parts[0] != "cpu":
|
|
return None
|
|
return tuple(int(x) for x in parts[1:])
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class CPUMonitorPlugin(MonitorPlugin):
    """CPU utilisation from /proc/stat deltas, plus load average and core count."""

    name = "cpu_monitor"
    description = "CPU usage via /proc/stat (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)
        # Counters from the previous sample; None until the first collection.
        self._prev: Optional[Tuple[int, ...]] = None

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/stat."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/stat not available)"
            return False
        if _read_proc_stat() is None:
            self.skip_reason = "/proc/stat not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return CPU percentages over the window since the previous sample.

        The very first call has no earlier sample, so it takes one and waits
        a second to get an immediate reading; later calls span the full
        time between collections.
        """
        before = self._prev
        if before is None:
            before = _read_proc_stat()
            await asyncio.sleep(1)
        after = _read_proc_stat()
        if before is None or after is None:
            return {}
        self._prev = after

        # Fields: user(0) nice(1) system(2) idle(3) iowait(4) irq(5) softirq(6) steal(7)
        have_iowait = len(after) > 4
        idle_before = before[3] + (before[4] if have_iowait else 0)
        idle_after = after[3] + (after[4] if have_iowait else 0)
        total_delta = sum(after) - sum(before)
        if total_delta == 0:
            return {}

        def share(delta: int) -> float:
            # Portion of the window spent in one state, as a percentage.
            return round(100.0 * delta / total_delta, 1)

        idle_delta = idle_after - idle_before
        data: Dict[str, Any] = {
            "cpu_percent": round(100.0 * (1.0 - idle_delta / total_delta), 1),
        }
        if have_iowait:
            data["cpu_user"] = share(after[0] - before[0])
            data["cpu_system"] = share(after[2] - before[2])
            data["cpu_idle"] = share(idle_delta)
            data["cpu_iowait"] = share(after[4] - before[4])

        try:
            one, five, fifteen = os.getloadavg()
            data["load_1min"] = round(one, 2)
            data["load_5min"] = round(five, 2)
            data["load_15min"] = round(fifteen, 2)
        except (AttributeError, OSError):
            pass

        try:
            with open("/proc/cpuinfo") as fh:
                data["cpu_core_count"] = len(
                    [row for row in fh if row.startswith("processor")]
                )
        except Exception:
            pass

        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: memory_monitor (Linux /proc/meminfo)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_meminfo() -> Dict[str, int]:
|
|
result: Dict[str, int] = {}
|
|
try:
|
|
with open("/proc/meminfo") as fh:
|
|
for line in fh:
|
|
parts = line.split()
|
|
if len(parts) >= 2:
|
|
try:
|
|
result[parts[0].rstrip(":")] = int(parts[1])
|
|
except ValueError:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
class MemoryMonitorPlugin(MonitorPlugin):
    """RAM and swap usage derived from /proc/meminfo."""

    name = "memory_monitor"
    description = "Memory usage via /proc/meminfo (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.interval = (config or {}).get("interval", 300)

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/meminfo."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/meminfo not available)"
            return False
        if not _read_meminfo():
            self.skip_reason = "/proc/meminfo not readable"
            return False
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return memory figures, each meminfo value multiplied by 1024."""
        mi = _read_meminfo()
        if not mi:
            return {}

        scale = 1024
        total = mi.get("MemTotal", 0)
        free = mi.get("MemFree", 0)
        # When MemAvailable is absent, MemFree stands in for it.
        avail = mi.get("MemAvailable", free)
        used = total - avail

        data: Dict[str, Any] = {
            "memory_total": total * scale,
            "memory_used": used * scale,
            "memory_available": avail * scale,
            "memory_free": free * scale,
            "memory_percent": round(100.0 * used / total, 1) if total else 0.0,
        }

        optional_fields = {
            "Buffers": "memory_buffers",
            "Cached": "memory_cached",
            "Active": "memory_active",
            "Inactive": "memory_inactive",
        }
        for field, key in optional_fields.items():
            if field in mi:
                data[key] = mi[field] * scale

        stotal = mi.get("SwapTotal", 0)
        if not stotal:
            return data
        sfree = mi.get("SwapFree", 0)
        sused = stotal - sfree
        data["swap_total"] = stotal * scale
        data["swap_used"] = sused * scale
        data["swap_free"] = sfree * scale
        data["swap_percent"] = round(100.0 * sused / stotal, 1)
        return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: disk_monitor (df -P; Linux, macOS, BSD)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DiskMonitorPlugin(MonitorPlugin):
    """Per-mount disk usage parsed from ``df -P -k`` output.

    Config keys read in ``__init__``: ``interval`` (default 300) and
    ``mounts`` (optional list of mount points to report; empty = all).
    """

    name = "disk_monitor"
    description = "Disk usage via df"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.mounts: List[str] = cfg.get("mounts", [])

    async def initialize(self) -> bool:
        """Load everywhere except Windows."""
        if sys.platform == "win32":
            self.skip_reason = "Windows not supported"
            return False
        return True

    @staticmethod
    def _parse_row(line: str) -> Optional[Tuple[str, Dict[str, int]]]:
        """Parse one df data row into ``(mount, usage)``; None if unusable.

        The capacity column (``NN%``) is used as the anchor: the three
        tokens before it are total/used/available and everything after it is
        the mount point.  This keeps rows intact when the filesystem name or
        the mount point contains spaces.
        """
        parts = line.split()
        for idx in range(4, len(parts) - 1):
            token = parts[idx]
            if token.endswith("%") and token[:-1].isdigit():
                break
        else:
            return None
        try:
            total_kb, used_kb, avail_kb = (int(x) for x in parts[idx - 3:idx])
            pct = int(token[:-1])
        except ValueError:
            return None
        usage = {
            "total": total_kb * 1024,
            "used": used_kb * 1024,
            "free": avail_kb * 1024,
            "percent": pct,
        }
        return " ".join(parts[idx + 1:]), usage

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Run df and return ``{"partitions": {mount: usage}}`` (or ``{}``)."""
        proc = None
        try:
            # -k forces 1024-byte units.  With -P alone, macOS/BSD df (and
            # GNU df under POSIXLY_CORRECT) report 512-byte blocks, which
            # would double every size after the *1024 scaling.
            proc = await asyncio.create_subprocess_exec(
                "df", "-P", "-k",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
        except Exception as e:
            # Do not leave a hung df (e.g. on a stale network mount) behind.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                else:
                    await proc.wait()
            self.logger.warning("df failed: %s", e)
            return {}

        partitions: Dict[str, Any] = {}
        # First line is the column header.
        for line in out.decode(errors="replace").splitlines()[1:]:
            parsed = self._parse_row(line)
            if parsed is None:
                continue
            mount, usage = parsed
            if self.mounts and mount not in self.mounts:
                continue
            partitions[mount] = usage
        return {"partitions": partitions} if partitions else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin: network_monitor (Linux /proc/net/dev)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_net_dev() -> Dict[str, Tuple[int, int]]:
|
|
"""Return {iface: (rx_bytes, tx_bytes)}."""
|
|
result: Dict[str, Tuple[int, int]] = {}
|
|
try:
|
|
with open("/proc/net/dev") as fh:
|
|
for line in fh.readlines()[2:]: # skip 2 header lines
|
|
parts = line.split()
|
|
if len(parts) < 10:
|
|
continue
|
|
result[parts[0].rstrip(":")] = (int(parts[1]), int(parts[9]))
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
class NetworkMonitorPlugin(MonitorPlugin):
    """Per-interface byte counters and their change since the last sample.

    Config keys read in ``__init__``: ``interval`` (default 300) and
    ``skip_interfaces`` (default ``["lo"]``).
    """

    name = "network_monitor"
    description = "Network I/O rates via /proc/net/dev (Linux)"
    interval = 300

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        cfg = config or {}
        self.interval = cfg.get("interval", 300)
        self.skip_ifaces: set = set(cfg.get("skip_interfaces", ["lo"]))
        # (timestamp, counters) of the previous sample.
        self._prev: Optional[Tuple[float, Dict[str, Tuple[int, int]]]] = None

    async def initialize(self) -> bool:
        """Load only on Linux with a readable /proc/net/dev."""
        if platform.system() != "Linux":
            self.skip_reason = "Linux only (/proc/net/dev not available)"
            return False
        snapshot = _read_net_dev()
        if not snapshot:
            self.skip_reason = "/proc/net/dev not readable"
            return False
        self._prev = (time.time(), snapshot)  # prime first delta
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return ``{"interfaces": {...}}`` with totals and deltas per interface.

        Interfaces listed in ``skip_ifaces`` or missing from the previous
        sample are left out.  An empty dict is returned when there is no
        usable pair of samples.
        """
        now = time.time()
        curr = _read_net_dev()
        previous = self._prev
        # The new sample always becomes the baseline for the next call.
        self._prev = (now, curr)
        if not curr or previous is None:
            return {}

        prev_ts, prev = previous
        if now - prev_ts <= 0:
            return {}

        interfaces: Dict[str, Any] = {}
        for iface, (rx, tx) in curr.items():
            if iface in self.skip_ifaces:
                continue
            if iface not in prev:
                continue
            prx, ptx = prev[iface]
            interfaces[iface] = {
                "bytes_recv": rx,
                "bytes_sent": tx,
                "bytes_recv_delta": rx - prx,
                "bytes_sent_delta": tx - ptx,
            }
        return {"interfaces": interfaces} if interfaces else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin registry — all built-in classes, initialized at startup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Every built-in plugin class, in the order _load_plugins() instantiates them.
_ALL_PLUGIN_CLASSES: List[type] = [
    # always available
    OSInfoPlugin, PingMonitorPlugin, NagiosRunnerPlugin,
    # read from /proc
    CPUMonitorPlugin, MemoryMonitorPlugin,
    # df subprocess
    DiskMonitorPlugin,
    # read from /proc
    NetworkMonitorPlugin,
]
|
|
|
|
|
|
async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
    """Instantiate and initialise every built-in plugin.

    A plugin's settings come from ``cfg["plugins"][<name>]`` or, failing
    that, from the top-level ``cfg[<name>]``.

    Args:
        cfg: The merged client configuration.

    Returns:
        The plugins whose ``initialize()`` returned a truthy value, in
        registry order.  A plugin that raises while being constructed or
        initialised is logged and skipped; it does not abort start-up.
    """
    log = logging.getLogger("hbc.plugins")
    # `"plugins": null` in the config is treated like an empty section.
    plugins_cfg: Dict[str, Any] = cfg.get("plugins") or {}
    loaded: List[Plugin] = []
    for cls in _ALL_PLUGIN_CLASSES:
        plugin_cfg = plugins_cfg.get(cls.name) or cfg.get(cls.name, {})
        plugin: Optional[Plugin] = None
        try:
            # Construction is inside the try as well: constructors parse
            # config values and can raise on bad input.
            plugin = cls(config=plugin_cfg)
            ok = await plugin.initialize()
        except Exception as e:
            log.error("init %s: %s", cls.name, e)
            ok = False

        if ok and plugin is not None:
            loaded.append(plugin)
            log.info("loaded %s (interval=%ds)", plugin.name, plugin.interval)
        else:
            reason = plugin.skip_reason if plugin is not None else None
            log.info("skip %s: %s", cls.name, reason or "init failed")
    return loaded
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Global state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_running = True
|
|
_dorestart = False
|
|
_shutdown_event: Optional[asyncio.Event] = None
|
|
_active_tasks: List[asyncio.Task] = []
|
|
|
|
PORT = 50003
|
|
INTERVAL = 10
|
|
|
|
|
|
def _shortname(name: str) -> str:
|
|
return name.split(".")[0]
|
|
|
|
|
|
def _stop():
    """Request shutdown: clear the run flag, wake sleepers, cancel tasks."""
    global _running
    _running = False
    if _shutdown_event:
        _shutdown_event.set()
    unfinished = [task for task in _active_tasks if not task.done()]
    for task in unfinished:
        task.cancel()
|
|
|
|
|
|
async def _sleep(seconds: float):
    """Pause for at most *seconds*, returning early once shutdown is signalled.

    Without a shutdown event this is a plain ``asyncio.sleep``.
    """
    if not _shutdown_event:
        await asyncio.sleep(seconds)
        return
    try:
        await asyncio.wait_for(_shutdown_event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # The full duration elapsed without a shutdown request.
        pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UDP protocol handler + connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _HeartbeatProtocol(asyncio.DatagramProtocol):
    """Datagram handler that dispatches server messages for one connection.

    ``ACK`` updates the connection's RTT statistics; ``CMD`` and ``UPD``
    start the command and update handlers as background tasks.
    """

    def __init__(self, conn: "AsyncConnection"):
        self._conn = conn
        self._log = logging.getLogger("hbc.proto")
        # The event loop keeps only weak references to tasks, so handler
        # tasks are held here until they finish.
        self._tasks: set = set()

    def _spawn(self, coro) -> None:
        """Run *coro* as a background task that is kept alive and logged."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._task_done)

    def _task_done(self, task: "asyncio.Task") -> None:
        """Drop a finished handler task and log any exception it raised."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._log.error("handler failed: %s", exc)

    def datagram_received(self, data: bytes, addr):
        """Decode one datagram and act on its message ID."""
        # NOTE(review): the sender address is not checked, so a CMD/UPD
        # datagram from any host that reaches this socket is acted on.
        try:
            msg = _stodict(data)
            if not msg:
                return
            msg_id = msg.get("ID")
            now = time.time()
            if msg_id == "ACK":
                self._conn._handle_ack(now)
            elif msg_id == "CMD":
                self._spawn(_handle_command(self._conn, msg))
            elif msg_id == "UPD":
                self._spawn(_handle_update(self._conn))
            else:
                self._log.debug("unknown msg type: %s", msg_id)
        except Exception as e:
            self._log.error("datagram error: %s", e)

    def error_received(self, exc):
        """Mark the connection dead and close it on a transport error."""
        self._log.warning("protocol error on %s: %s — dropping connection", self._conn.addr, exc)
        self._conn._dead = True
        self._conn.close()
|
|
|
|
|
|
class AsyncConnection:
    """One UDP endpoint used to talk to a single server address."""

    def __init__(self, conn_id: int, addr: str, port: int, af: int, name: str):
        self.conn_id, self.addr, self.port = conn_id, addr, port
        self.af = af
        self.name = name
        # Acknowledgements seen, time of the last send, and recent RTTs (ms).
        self.ackcount = 0
        self.lastsend = 0.0
        self.rtts: List[float] = [0.0]
        self._transport: Optional[asyncio.DatagramTransport] = None
        # Set by the protocol after a transport error; blocks further sends.
        self._dead = False
        self._log = logging.getLogger(f"hbc.conn.{addr}")

    async def open(self) -> bool:
        """Create the datagram endpoint; return False (and log) on failure."""
        try:
            loop = asyncio.get_event_loop()
            endpoint = await loop.create_datagram_endpoint(
                lambda: _HeartbeatProtocol(self), family=self.af
            )
        except Exception as e:
            self._log.error("open: %s", e)
            return False
        self._transport = endpoint[0]
        return True

    def close(self):
        """Close the transport if one is open."""
        transport = self._transport
        if transport:
            transport.close()
            self._transport = None

    def _handle_ack(self, now: float):
        """Record an ACK received at *now*: RTT in ms, capped at 10 samples."""
        self.rtts.append((now - self.lastsend) * 1000.0)
        if len(self.rtts) > 10:
            del self.rtts[0]
        self.ackcount += 1

    async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
        """Send *msg* with ``name``, ``id`` and ``time`` fields added.

        Nothing is sent on a dead connection or when the endpoint cannot be
        (re)opened.  The caller's dict is not modified.
        """
        if self._dead:
            return
        if not self._transport:
            await self.open()
        if not self._transport:
            return
        out = {
            **msg,
            "name": _shortname(self.name),
            "id": self.conn_id,
            "time": time.time(),
        }
        self._transport.sendto(_dicttos(msg_id, out), (self.addr, self.port))
        self.lastsend = time.time()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Server command handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _handle_command(conn: AsyncConnection, msg: Dict[str, Any]):
    """Execute the shell command carried in a CMD message and report back.

    The reply is sent on *conn* as ``{"service": "command", "msg":
    "<status> <output>"}`` where status is ``OK``, ``Error`` or ``Timeout``.
    A message without a ``cmd`` value is ignored.
    """
    # NOTE(review): this runs a string received over the network through
    # the shell.  Nothing in this function authenticates the sender.
    cmd = msg.get("cmd", "")
    if not cmd:
        return
    log = logging.getLogger("hbc.cmd")
    log.info("exec: %s", cmd)

    def _run_blocking() -> bytes:
        return subprocess.check_output(
            cmd, shell=True, stderr=subprocess.STDOUT, timeout=30
        )

    try:
        # check_output blocks for up to 30 s; running it in the default
        # executor keeps the event loop (and heartbeats) responsive.
        raw = await asyncio.get_running_loop().run_in_executor(None, _run_blocking)
        out = raw.decode(errors="replace")
        status = "OK"
    except subprocess.CalledProcessError as e:
        out, status = str(e), "Error"
    except subprocess.TimeoutExpired:
        out, status = "timed out", "Timeout"
    except Exception as e:
        out, status = str(e), "Error"
    await conn.sendto({"service": "command", "msg": f"{status} {out}"})
|
|
|
|
|
|
async def _handle_update(conn: AsyncConnection):
    """Run ``hb_install.sh mini`` and request a restart on success.

    The installer is looked up in PATH, then next to the running script.
    Every outcome is reported on *conn* as ``{"service": "update", "msg":
    ...}``; on success the message is ``"OK"``, ``_dorestart`` is set and
    shutdown is requested.
    """
    global _dorestart
    log = logging.getLogger("hbc.update")

    async def _report_failure(err: str) -> None:
        # Log the failure and send it to the server.
        log.error("%s", err)
        await conn.sendto({"service": "update", "msg": err})

    installer = shutil.which("hb_install.sh")
    if installer is None:
        candidate = Path(sys.argv[0]).parent / "hb_install.sh"
        if candidate.exists():
            installer = str(candidate)
    if installer is None:
        await _report_failure("hb_install.sh not found in PATH or alongside hbc_mini.py")
        return

    log.info("running installer: %s", installer)
    try:
        proc = await asyncio.create_subprocess_exec(
            installer, "mini",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
    except Exception as e:
        await _report_failure(f"installer failed: {e}")
        return

    try:
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
    except asyncio.TimeoutError:
        # Stop and reap the installer so a run that was reported as failed
        # does not keep going in the background.
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        else:
            await proc.wait()
        await _report_failure("installer timed out")
        return
    except Exception as e:
        await _report_failure(f"installer failed: {e}")
        return

    if proc.returncode != 0:
        # errors="replace": undecodable installer output must not prevent
        # the failure report from being sent.
        text = out.decode(errors="replace").strip()
        await _report_failure(f"installer exited {proc.returncode}: {text}")
        return

    log.info("update successful, restarting")
    await conn.sendto({"service": "update", "msg": "OK"})
    _dorestart = True
    _stop()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Heartbeat sender
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _heartbeat_sender(conn: AsyncConnection, interval: int):
    """Send a heartbeat on *conn* every *interval* seconds until shutdown.

    Each heartbeat carries the ACK count, the latest RTT and the interval.
    Send errors are logged and the loop continues.
    """
    log = logging.getLogger("hbc.hb")
    while _running:
        try:
            beat = {
                "acks": conn.ackcount,
                "rtt": conn.rtts[-1],
                "interval": interval,
            }
            await conn.sendto(beat)
        except Exception as exc:
            log.error("send: %s", exc)
        await _sleep(interval)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin collection loops
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _run_info_plugins(conn: AsyncConnection, plugins: List[Plugin]):
    """Collect each plugin once and send non-empty results as PLG messages.

    A failure in one plugin is logged and does not stop the others.
    """
    log = logging.getLogger("hbc.plugins")
    for plugin in plugins:
        try:
            data = await plugin.collect()
            if not data:
                continue
            await conn.sendto({"plugin": plugin.name, **data}, "PLG")
            log.info("sent %s", plugin.name)
        except Exception as exc:
            log.error("%s collect: %s", plugin.name, exc)
|
|
|
|
|
|
async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], interval: int):
    """Collect and send a group of plugins every *interval* seconds.

    Runs until shutdown.  Cancellation is propagated; any other error from
    a plugin is logged and the remaining plugins still run.
    """
    log = logging.getLogger(f"hbc.plugins.{interval}s")
    while _running:
        for plugin in plugins:
            try:
                data = await plugin.collect()
                if not data:
                    continue
                await conn.sendto({"plugin": plugin.name, **data}, "PLG")
                log.debug("sent %s", plugin.name)
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                log.error("%s: %s", plugin.name, exc)
        await _sleep(interval)
|
|
|
|
|
|
async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
    """Send info plugins once, then run monitor plugins grouped by interval.

    Monitor plugins sharing an interval are served by one loop; the loops
    for different intervals run concurrently.
    """
    info: List[Plugin] = []
    monitor: List[Plugin] = []
    for p in plugins:
        if isinstance(p, InfoPlugin):
            info.append(p)
        if isinstance(p, MonitorPlugin):
            monitor.append(p)

    await _run_info_plugins(conn, info)

    by_interval: Dict[int, List[Plugin]] = {}
    for p in monitor:
        by_interval.setdefault(p.interval, []).append(p)
    if not by_interval:
        return

    group_tasks = [
        asyncio.create_task(_run_monitor_group(conn, grp, iv))
        for iv, grp in by_interval.items()
    ]
    await asyncio.gather(*group_tasks, return_exceptions=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Daemonize + syslog
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _daemonize():
    """Detach from the controlling terminal using the double-fork technique.

    Each parent exits immediately with status 0.  After the first fork the
    child changes to ``/``, starts a new session and clears the umask.
    Finally stdin is redirected from /dev/zero and stdout/stderr to
    /dev/null; a redirect that fails is skipped.
    """
    for fork_n in (1, 2):
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            sys.stderr.write(f"fork #{fork_n} failed: {e}\n")
            os._exit(1)
        if fork_n == 1:
            os.chdir("/")
            os.setsid()
            os.umask(0)

    for fd, path, mode in ((0, "/dev/zero", "r"), (1, "/dev/null", "a+"), (2, "/dev/null", "a+")):
        try:
            # Close the temporary handle explicitly once it has been
            # duplicated onto the standard descriptor.
            with open(path, mode) as stream:
                os.dup2(stream.fileno(), fd)
        except Exception:
            pass
|
|
|
|
|
|
def _reconfigure_syslog(level: int):
    """Replace every root-logger handler with a single syslog handler.

    The handler logs to the ``daemon`` facility.  The first existing local
    syslog socket among ``/dev/log`` (Linux) and ``/var/run/syslog`` (macOS)
    is used; when neither exists, messages go to UDP ``localhost:514``.

    Args:
        level: Logging level to set on the root logger.
    """
    addr = next(
        (path for path in ("/dev/log", "/var/run/syslog") if os.path.exists(path)),
        ("localhost", 514),
    )
    # Build the replacement before touching the current handlers, so that a
    # failure here does not leave the root logger without any handler.
    sh = SysLogHandler(address=addr, facility=SysLogHandler.LOG_DAEMON)
    sh.setFormatter(logging.Formatter("hbc[%(process)d]: %(levelname)s %(message)s"))

    root = logging.getLogger()
    for h in root.handlers[:]:
        root.removeHandler(h)
        h.close()
    root.addHandler(sh)
    root.setLevel(level)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------
|
|
|
|
async def _async_main(args, cfg: Dict[str, Any]) -> int:
    """Connect to the servers, announce, then send heartbeats until stopped.

    Steps, in order:
      1. Reset the module-level run state (``_running``, ``_shutdown_event``,
         ``_active_tasks``).
      2. Resolve every host in ``args.hosts`` and try to open one
         ``AsyncConnection`` per returned address.  Hosts that fail to
         resolve are logged and skipped.
      3. If a boot flag and/or a message was given, send it on the first
         connection only.  A message without daemon mode is one-shot: the
         connections are closed and the function returns.
      4. Load the plugins, install SIGTERM/SIGINT/SIGHUP handlers, start one
         heartbeat task per connection plus one plugin collector bound to
         the first connection, and wait for all tasks to end.
      5. Send a shutdown message on the first connection, close every
         connection and clean up the plugins.

    Args:
        args: Parsed command-line namespace; ``hosts``, ``name``, ``boot``,
            ``message`` and ``daemon`` are read here.
        cfg: Configuration mapping; ``hb_port`` and ``interval`` override the
            module defaults ``PORT`` and ``INTERVAL``.

    Returns:
        ``1`` when no connection could be opened, otherwise ``0``.
    """
    global _running, _shutdown_event, _active_tasks
    _running = True
    _shutdown_event = asyncio.Event()
    _active_tasks = []

    log = logging.getLogger("hbc.main")
    iam = args.name or socket.gethostname()
    port = cfg.get("hb_port", PORT)
    interval = cfg.get("interval", INTERVAL)

    log.info("starting: %s -> %s port=%d interval=%ds", iam, args.hosts, port, interval)

    connections: List[AsyncConnection] = []
    conn_id = 1
    for host in args.hosts:
        try:
            addrs = socket.getaddrinfo(host, port, 0, 0, socket.SOL_UDP)
        except socket.gaierror as e:
            log.error("cannot resolve %s: %s", host, e)
            continue
        for ai in addrs:
            # ai[0] is the address family, ai[4][0] the numeric address.
            conn = AsyncConnection(conn_id, ai[4][0], port, ai[0], iam)
            if await conn.open():
                connections.append(conn)
                # NOTE(review): assumes the id advances only after a
                # successful open — confirm against the original layout.
                conn_id += 1

    if not connections:
        log.error("no connections established")
        return 1

    # Boot / one-shot message
    if args.boot or args.message:
        bmsg: Dict[str, Any] = {"acks": 0}
        if args.boot:
            bmsg["boot"] = 1
        if args.message:
            bmsg["service"] = "service"
            bmsg["msg"] = args.message
        # First connection only, so multiple servers do not cause duplicates.
        await connections[0].sendto(bmsg)
        if args.message and not args.daemon:
            # Short pause before closing, then exit: one-shot mode.
            await asyncio.sleep(0.3)
            for c in connections:
                c.close()
            return 0

    plugins = await _load_plugins(cfg)

    # We are inside a running coroutine, so ask for the running loop directly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _stop)

    def _sighup():
        # SIGHUP stops like the other signals but flags a restart for main().
        global _dorestart
        _dorestart = True
        _stop()

    loop.add_signal_handler(signal.SIGHUP, _sighup)

    for conn in connections:
        _active_tasks.append(asyncio.create_task(_heartbeat_sender(conn, interval)))

    # `connections` is known to be non-empty here (checked above).
    if plugins:
        _active_tasks.append(asyncio.create_task(_plugin_collector(connections[0], plugins)))

    try:
        await asyncio.gather(*_active_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass

    log.info("shutting down")
    # Best-effort shutdown notice, again on the first connection only.
    first = connections[0]
    try:
        await first.sendto({"shutdown": 1, "acks": first.ackcount})
    except Exception as e:
        log.debug("shutdown message not sent: %s", e)
    for conn in connections:
        conn.close()
    await asyncio.sleep(0.3)
    for plugin in plugins:
        # One failing cleanup must not prevent the remaining plugins from
        # being cleaned up.
        try:
            await plugin.cleanup()
        except Exception as e:
            log.error("%s cleanup: %s", plugin.name, e)
    return 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
|
|
|
|
def main(argv=None):
    """Command-line entry point: parse options, run the client, then exit.

    Logging is set to WARNING by default, INFO with ``-v`` and DEBUG with
    ``-x``.  With ``-d`` the process daemonizes and logging is switched to
    syslog before the async main loop starts.  When the module-level
    ``_dorestart`` flag is set after the loop ends, the script re-executes
    itself with the original command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Raises:
        SystemExit: Always, carrying the process exit status (unless the
            restart replaces the process first).
    """
    # Resolve the script path while the starting directory is still current:
    # daemonizing changes directory to "/", which would break a relative path
    # at restart time.
    script = os.path.abspath(sys.argv[0])

    parser = argparse.ArgumentParser(
        prog="hbc_mini",
        description="HeartBeat Client — single file, no external dependencies",
    )
    parser.add_argument("-b", "--boot", action="store_true", help="Send boot message")
    parser.add_argument("-c", "--config", dest="configfile", help="Config file (JSON)")
    parser.add_argument("-m", "--message", dest="message", help="Send a one-shot message")
    parser.add_argument("-n", "--name", dest="name", help="Override hostname")
    parser.add_argument("-d", "--daemon", action="store_true", help="Run as daemon")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-x", "--debug", action="count", default=0, help="Debug level")
    parser.add_argument("hosts", nargs="+", help="HBD server(s)")
    args = parser.parse_args(argv)

    level = logging.WARNING
    if args.verbose:
        level = logging.INFO
    if args.debug:
        # Debug wins over verbose when both are given.
        level = logging.DEBUG
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    cfg = _load_config(args.configfile)

    if args.daemon:
        _daemonize()
        _reconfigure_syslog(level)

    try:
        rc = asyncio.run(_async_main(args, cfg))
    except KeyboardInterrupt:
        rc = 0
    except Exception as e:
        logging.error("fatal: %s", e, exc_info=True)
        rc = 1

    if _dorestart:
        logging.info("restarting...")
        try:
            # On success this call never returns: the process image is replaced.
            os.execv(script, [script, *sys.argv[1:]])
        except OSError as e:
            logging.error("restart failed: %s", e)
            rc = 1

    sys.exit(rc)
|
|
|
|
|
|
# Run the command-line client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|