Compare commits

...

10 Commits

Author SHA1 Message Date
Andreas Wrede 3232239a85 version 5.0.11
Release / release (push) Successful in 5s
2026-04-07 14:19:46 -04:00
Andreas Wrede 014781de5e Merge branch 'master' of git.wrede.ca:andreas/heartbeat 2026-04-07 14:16:12 -04:00
Andreas Wrede 68b1c65384 version 5.0.10 2026-04-07 14:15:46 -04:00
Andreas Wrede e8bb553349 version 5.0.10
Release / release (push) Failing after 4s
2026-04-07 14:11:03 -04:00
Andreas Wrede e4ecb8723f release a pypi package on gitea 2026-04-07 14:10:07 -04:00
Andreas Wrede 5edbaacf81 version 5.0.9
Release / release (push) Successful in 15s
2026-04-07 11:02:19 -04:00
Andreas Wrede 8421f472f2 there is only one __version__ 2026-04-07 11:00:22 -04:00
Andreas Wrede 51f9bdc2b5 use SO_TIMESTAMP, works on Linux, FreeBSD and macOS 2026-04-07 10:46:54 -04:00
andreas 02bc42fbf0 get rtt time differently 2026-04-07 10:40:12 -04:00
andreas 832a8b0bda save state to pickle file, restart timers on restart 2026-04-06 17:24:59 -04:00
13 changed files with 337 additions and 107 deletions
+8 -1
View File
@@ -25,7 +25,7 @@ jobs:
- name: Install build tools
run: |
python -m pip install --upgrade pip
# pip install build twine
pip install build twine
- name: Build package
run: python -m build
@@ -34,6 +34,13 @@ jobs:
id: get_version
run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
- name: Upload to Gitea PyPI registry
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
run: |
python -m twine upload --repository-url https://git.wrede.ca/api/packages/andreas/pypi dist/*
- name: Create release
uses: actions/gitea-release-action@v1
with:
+3 -2
View File
@@ -7,6 +7,7 @@ logfile: "/home/andreas/logs/heartbeat/heartbeat.log"
logfmt: "msg"
grace: 40
interval: 10
autosave_interval: 300 # Autosave interval in seconds (default: 5 minutes)
# Notification Channels - Define notification providers centrally
# Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration
@@ -159,7 +160,7 @@ threshold_configs:
warning: 85.0
critical: 90.0
rtt:
warning: 50
warning: 200
critical: 250.0
@@ -204,7 +205,7 @@ threshold_configs:
critical: 2
operator: ">="
rtt:
warning: 50
warning: 200
critical: 250.0
truenas_server:
+1 -1
View File
@@ -14,4 +14,4 @@ Install options:
"""
__all__ = ["__version__"]
__version__ = "5.0.8"
__version__ = "5.0.11"
+1 -1
View File
@@ -1,3 +1,3 @@
"""HeartBeat Client (hbc) - System monitoring client."""
__version__ = "5.0.5"
from hbd import __version__
+1 -1
View File
@@ -1,3 +1,3 @@
"""Common utilities shared between hbc and hbd."""
__version__ = "5.0.5"
from hbd import __version__
+1 -1
View File
@@ -1,3 +1,3 @@
"""HeartBeat Daemon (hbd) - Server/daemon component."""
__version__ = "5.0.5"
from hbd import __version__
+1 -1
View File
@@ -189,7 +189,7 @@ class Connection:
except Exception:
pass
self.addr = addr
Connection.htab[addr] = self.host.nameconnection_count
Connection.htab[addr] = self.host.name
if self.host.isDynDns():
Host.dnsQ.put((self.host.name, self.addr))
return r
+68 -17
View File
@@ -22,12 +22,12 @@ eventlog = notify_mod.eventlog
# shared runtime collections and helpers
def cleanup_function(config, hbdclass):
"""This function will be executed upon program exit."""
logger.info("Running cleanup function...")
def save_state(config, hbdclass):
"""Save current state to pickle file. Safe to call at any time."""
import pickle
import os
# Ensure all timer references are cleared before pickling
# Clear timer references before pickling (they can't be serialized)
for hostname, host in list(hbdclass.Host.hosts.items()):
for conn_type, conn in host.connections.items():
if hasattr(conn, 'cancel_overdue_timer'):
@@ -40,13 +40,26 @@ def cleanup_function(config, hbdclass):
conn.timeout_duration = None
pickfile = config.get("pickfile", "hbd.pickle")
tmpfile = pickfile + ".tmp"
pickf = open(pickfile, "wb")
pick = pickle.Pickler(pickf)
pick.dump(hbdclass.Host.hosts)
pick.dump(data.msgs)
pickf.close()
try:
with open(tmpfile, "wb") as pickf:
pick = pickle.Pickler(pickf)
pick.dump(hbdclass.Host.hosts)
pick.dump(data.msgs)
os.replace(tmpfile, pickfile)
except Exception as e:
logger.error("Failed to save state: %s", e)
try:
os.unlink(tmpfile)
except Exception:
pass
def cleanup_function(config, hbdclass):
"""This function will be executed upon program exit."""
logger.info("Running cleanup function...")
save_state(config, hbdclass)
logger.info("Cleanup complete.")
@@ -160,14 +173,20 @@ async def _run_async(config, config_path=None):
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
)
# 3. Bind to all interfaces (::) on a specific port
# UDP server endpoint (handler wired to handle_datagram with context)
bind_addr = ("::", config.get("hb_port", 50003))
sock.bind(bind_addr)
logger.info("Starting UDP server on %s:%s", *bind_addr)
def udp_handler(msg, addr, transport):
# Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS).
# If supported, read datagrams via recvmsg() so RTT uses the kernel
# timestamp rather than the time.time() call after asyncio scheduling.
use_kernel_ts = udp.enable_kernel_timestamps(sock)
if use_kernel_ts:
logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT")
else:
logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT")
def udp_handler(msg, addr, transport, recv_ts=None):
ctx = dict(
config=config,
hbdclass=hbdclass,
@@ -177,13 +196,32 @@ async def _run_async(config, config_path=None):
threshold_checker=threshold_checker,
DEBUG=config.get("debug", 0),
verbose=config.get("verbose", False),
recv_ts=recv_ts,
)
udp.handle_datagram(msg, addr, transport, ctx)
transport, protocol = await loop.create_datagram_endpoint(
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
sock=sock,
if use_kernel_ts:
# recvmsg path: manage the socket ourselves with loop.add_reader()
sock.setblocking(False)
transport = udp.RecvmsgTransport(loop, sock)
reader = udp.make_recvmsg_reader(sock, udp_handler, transport)
loop.add_reader(sock.fileno(), reader)
protocol = None
else:
transport, protocol = await loop.create_datagram_endpoint(
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
sock=sock,
)
# Restore connection timers for hosts loaded from pickle
restore_ctx = dict(
config=config,
hbdclass=hbdclass,
log=eventlog,
msg_to_websockets=msg_to_websockets,
threshold_checker=threshold_checker,
)
udp.restore_connection_timers(hbdclass, restore_ctx)
# HTTP server (asyncio-based via aiohttp)
try:
@@ -257,6 +295,19 @@ async def _run_async(config, config_path=None):
except Exception as e:
logger.exception("websocket server failed to start: %s", e)
# Periodic autosave task
autosave_interval = config.get("autosave_interval", 300) # default: 5 minutes
async def autosave_task():
while True:
await asyncio.sleep(autosave_interval)
logger.debug("Autosaving state...")
save_state(config, hbdclass)
logger.debug("Autosave complete (%d hosts)", len(hbdclass.Host.hosts))
autosave = asyncio.create_task(autosave_task())
logger.info("Autosave task started (interval: %ds)", autosave_interval)
# Main event loop - monitor shutdown and reload events
try:
while True:
@@ -304,7 +355,7 @@ async def _run_async(config, config_path=None):
except Exception as e:
logger.warning("Error closing UDP transport: %s", e)
tasks_to_cancel = [http_task, ws_task]
tasks_to_cancel = [http_task, ws_task, autosave]
for task in tasks_to_cancel:
if task:
try:
+15 -5
View File
@@ -459,23 +459,27 @@
}
}
// Protocol metadata fields injected by the client never plugin metrics
const SKIP_FIELDS = new Set(['id', 'name']);
function renderPluginData(data, timestamp) {
// Check if this should be rendered as a simple table
const pluginName = getCurrentPluginName();
const simplePlugins = ['os_info', 'cpu_monitor', 'memory_monitor', 'nagios_runner'];
if (simplePlugins.includes(pluginName) && isSimpleKeyValueData(data)) {
return renderSimpleDataTable(data, timestamp);
}
let html = '<div class="metric-grid">';
for (const [key, value] of Object.entries(data)) {
if (SKIP_FIELDS.has(key)) continue;
// Skip nested objects for now, handle them separately
if (typeof value === 'object' && value !== null) {
continue;
}
html += renderMetric(key, value);
}
@@ -572,10 +576,11 @@
// Table body
html += '<tbody>';
for (const [key, value] of Object.entries(data)) {
if (SKIP_FIELDS.has(key)) continue;
const label = formatLabel(key);
const formattedValue = formatValue(key, value);
const unit = getUnit(key);
html += '<tr>';
html += `<td class="name">${label}</td>`;
html += `<td class="value">${formattedValue}${unit ? ' ' + unit : ''}</td>`;
@@ -1012,12 +1017,17 @@
}
function formatLabel(key) {
if (key === 'time') return 'Collected At';
return key
.replace(/_/g, ' ')
.replace(/\b\w/g, l => l.toUpperCase());
}
function formatValue(key, value) {
// Epoch timestamp field sent by the client alongside plugin data
if (key === 'time' && typeof value === 'number') {
return new Date(value * 1000).toLocaleString();
}
if (typeof value === 'number') {
// Format percentages
if (key.includes('percent') || key.includes('usage')) {
+197 -49
View File
@@ -1,6 +1,9 @@
"""UDP listener and datagram processing."""
import asyncio
import socket
import struct
import time
import zlib
import logging
@@ -11,6 +14,99 @@ from . import notify as notify_mod
logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog
# SO_TIMESTAMP: kernel attaches a struct timeval to each received datagram.
# Supported on Linux, FreeBSD, and macOS. The constant is not exposed by
# Python's socket module on all platforms, so fall back to the Linux value (29)
# when absent.
_SO_TIMESTAMP = getattr(socket, 'SO_TIMESTAMP', 29)
# struct timeval uses two native C longs: tv_sec and tv_usec
_TIMEVAL = struct.Struct('@ll')
def enable_kernel_timestamps(sock) -> bool:
"""Try to enable SO_TIMESTAMP on *sock*.
Returns True if the kernel will supply receive timestamps, False otherwise
(unsupported platform, older kernel, or insufficient permissions).
"""
try:
sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMP, 1)
return True
except OSError:
return False
def _extract_kernel_ts(ancdata) -> float | None:
"""Parse recvmsg ancillary data and return the kernel receive time.
Returns seconds as a float, or None if no SO_TIMESTAMP cmsg is present.
"""
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMP:
if len(cmsg_data) >= _TIMEVAL.size:
sec, usec = _TIMEVAL.unpack_from(cmsg_data)
return sec + usec * 1e-6
return None
class RecvmsgTransport:
"""Thin wrapper used when SO_TIMESTAMP is active (add_reader path).
Exposes the same sendto() / close() interface as asyncio's DatagramTransport
so the rest of the code does not need to know which path is in use.
"""
def __init__(self, loop, sock):
self._loop = loop
self._sock = sock
def sendto(self, data, addr):
try:
self._sock.sendto(data, addr)
except Exception as e:
logger.debug("sendto failed: %s", e)
def close(self):
try:
self._loop.remove_reader(self._sock.fileno())
except Exception:
pass
try:
self._sock.close()
except Exception:
pass
def make_recvmsg_reader(sock, handler, transport):
"""Return a callback suitable for loop.add_reader().
Reads one datagram per call using recvmsg() so that kernel timestamps in
the ancillary data are accessible. Falls back to time.time() if the
cmsg is missing.
handler(msg, addr, transport, kernel_ts) same signature as udp_handler
in main.py with the optional kernel_ts argument.
"""
BUFSIZE = 65536
ANCBUFSIZE = 128 # enough for one struct timespec cmsg
def _read():
try:
data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE)
except BlockingIOError:
return
except OSError as e:
logger.warning("recvmsg error: %s", e)
return
try:
kernel_ts = _extract_kernel_ts(ancdata)
msg = parse_message(data)
if msg:
handler(msg, addr, transport, kernel_ts)
except Exception:
logger.exception("Error processing datagram from %s", addr)
return _read
class EchoServerProtocol(asyncio.DatagramProtocol):
def __init__(self, config=None, handler=None):
@@ -61,6 +157,100 @@ def dicttos(ID, d):
return opk
DROPOVERDUE = 7 * 24 * 3600 # seconds before an overdue host becomes UNKNOWN
def _make_timer_callbacks(uname, host, watchhosts, ctx):
"""Return (on_overdue, on_unknown) async callbacks for connection timer logic.
Captured values are bound at call time so callbacks are safe to use in loops.
"""
msg_to_websockets = ctx.get("msg_to_websockets")
threshold_checker = ctx.get("threshold_checker")
cfg = ctx.get("config", {})
async def on_unknown(connection):
connection.newstate(connection.__class__.UNKNOWN, connection.lastbeat)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
async def on_overdue(connection):
if connection.getstate() != connection.__class__.UP:
return
now = time.time()
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
msg = f"{connection.afam} overdue"
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
if threshold_checker:
threshold_checker.check_value(
host_name=uname,
metric_path="rtt",
value=float("inf"),
alert_states=host.alert_states,
)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
return on_overdue, on_unknown
def restore_connection_timers(hbdclass, ctx):
"""Restore overdue timers for all loaded connections after a pickle restore.
For UP connections, the remaining time until overdue is calculated from
lastbeat so that clients that vanished during hbd's downtime are detected.
For OVERDUE connections, the UNKNOWN drop timer is restored.
"""
now = time.time()
cfg = ctx.get("config", {})
grace = cfg.get("grace", 2)
from . import config as config_mod
watchhosts = config_mod.get_watchhosts(cfg)
restored = 0
for uname, host in list(hbdclass.Host.hosts.items()):
interval = host.interval
for afam, conn in list(host.connections.items()):
state = conn.getstate()
if state == hbdclass.Connection.DOWN:
continue
on_overdue, on_unknown = _make_timer_callbacks(uname, host, watchhosts, ctx)
if state == hbdclass.Connection.UP and interval > 0:
elapsed = now - conn.lastbeat
remaining = max(1.0, (interval + grace) - elapsed)
conn.reset_overdue_timer(remaining, on_overdue)
logger.debug(
"Restored UP timer %s/%s: %.0fs remaining (elapsed %.0fs)",
uname, afam, remaining, elapsed,
)
restored += 1
elif state == hbdclass.Connection.OVERDUE:
elapsed_overdue = now - conn.statetime
remaining = DROPOVERDUE - elapsed_overdue
if remaining <= 1:
# Already past the drop window — mark UNKNOWN immediately
conn.newstate(hbdclass.Connection.UNKNOWN, conn.lastbeat)
logger.info(
"Marking %s/%s UNKNOWN (overdue %.1f days)",
uname, afam, elapsed_overdue / 86400,
)
else:
conn.reset_overdue_timer(remaining, on_unknown)
logger.debug(
"Restored OVERDUE timer %s/%s: %.0fs remaining",
uname, afam, remaining,
)
restored += 1
logger.info("Restored timers for %d connection(s)", restored)
def handle_datagram(msg: dict, addr, transport, ctx: dict):
"""Handle a parsed datagram message.
@@ -74,7 +264,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
"""
if not msg:
return
now = __import__("time").time()
now = ctx.get("recv_ts") or time.time()
# Log message to journal
msg_journal = ctx.get("msg_journal")
@@ -126,7 +316,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if msg.get("ID") == "HTB":
host.doesack = msg.get("acks", -1)
# send ACK back
rmsg = {"time": __import__("time").time()}
rmsg = {"time": time.time()}
opkt = dicttos("ACK", rmsg)
try:
transport.sendto(opkt, addr)
@@ -138,8 +328,9 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
# Handle plugin data message
plugin_name = msg.get("plugin")
if plugin_name:
# Extract all fields except ID and plugin name
plugin_data = {k: v for k, v in msg.items() if k not in ["ID", "plugin"]}
# Extract plugin fields, dropping protocol metadata fields
plugin_data = {k: v for k, v in msg.items()
if k not in ("ID", "plugin", "id", "name")}
# Store plugin data with timestamp
host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
if DEBUG > 1:
@@ -229,51 +420,8 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
# Reset overdue timer on every heartbeat
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
grace = cfg.get("grace", 2)
timeout_seconds = (interval + grace) if interval > 0 else 30
# Create callback for timer expiration
async def on_overdue(connection):
"""Called when connection timer expires (no heartbeat received)."""
import time
now = time.time()
# Only mark as overdue if still in UP state (not already marked)
if connection.getstate() == hbdcls.Connection.UP:
connection.newstate(hbdcls.Connection.OVERDUE, now, cfg.get("grace", 2))
msg = f"{connection.afam} overdue"
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
# Check RTT thresholds with infinite RTT for overdue hosts
threshold_checker = ctx.get("threshold_checker")
if threshold_checker:
metric_path = "rtt"
threshold_checker.check_value(
host_name=uname,
metric_path=metric_path,
value=float('inf'),
alert_states=host.alert_states
)
# Notify websockets
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
# Set a longer timer for marking as UNKNOWN (7 days)
DROPOVERDUE = 7 * 24 * 3600
async def on_unknown(connection):
"""Mark connection as unknown after extended absence."""
connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
# Reset the timer
timeout_seconds = interval + grace
on_overdue, _ = _make_timer_callbacks(uname, host, watchhosts, ctx)
conn.reset_overdue_timer(timeout_seconds, on_overdue)
# Check RTT thresholds using the threshold checker
+29 -26
View File
@@ -65,11 +65,7 @@ async def _handler(websocket, path=None):
logger.exception("WebSocket handler exception from %s: %s", remote_address, e)
finally:
logger.debug("Removing WebSocket connection from %s", remote_address)
try:
_connections.remove(websocket)
except KeyError:
pass
await websocket.wait_closed()
_connections.discard(websocket)
async def start(
@@ -93,33 +89,40 @@ async def start(
_verbose = config.get("verbose", False),
_debug = config.get("debug", 0),
servers = []
# plain WebSocket
websockets_logger = logging.getLogger("websockets.server")
#if _debug > 2:
# websockets_logger.setLevel(logging.DEBUG)
#else:
# websockets_logger.setLevel(logging.INFO)
# regular WebSocket
ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"])
servers.append(ws_server)
# secure WebSocket (optional)
# Start servers and keep the server objects for clean shutdown
running_servers = []
ws_server = await websockets.serve(_handler, host, ws_port)
running_servers.append(ws_server)
if wss_port and ssl_context:
wss_server = websockets.serve(
_handler, host, wss_port, ssl=ssl_context
) # , subprotocols=["hbd"])
servers.append(wss_server)
# await starting of all servers
for srv in servers:
await srv
wss_server = await websockets.serve(_handler, host, wss_port, ssl=ssl_context)
running_servers.append(wss_server)
logger.info(
"WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port
)
# block forever (until loop is stopped or cancelled)
await asyncio.Future()
try:
# Block until cancelled
await asyncio.Future()
except asyncio.CancelledError:
pass
finally:
# Close all active browser connections so their handler coroutines exit
active = list(_connections)
if active:
logger.info("Closing %d active WebSocket connection(s)...", len(active))
await asyncio.gather(
*[ws.close() for ws in active],
return_exceptions=True,
)
# Stop the listening servers and wait for all handlers to finish
for srv in running_servers:
srv.close()
await asyncio.gather(
*[srv.wait_closed() for srv in running_servers],
return_exceptions=True,
)
logger.info("WebSocket server(s) stopped")
def broadcast(typ: str, data) -> bool:
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "hbd"
version = "5.0.8"
version = "5.0.11"
description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
readme = "README.md"
requires-python = ">=3.11"
+11 -1
View File
@@ -1,6 +1,16 @@
#!/bin/sh
# install hbd/hbc from wheel and create symlinks for hbd and hbc in ~/bin
# install the heartbeat tools. By default, this will install the hbc
# client only. The server is installed when the arg 'server' is passed
# to the script. The script will install the heartbeat tools in a python
# virtual environment in ~/venvs/hbd. The hbd and hbc commands will be
# installed from the wheel and symlinked to ~/bin/hbd and ~/bin/hbc,
# respectively. If the virtual environment already exists, it will be
# reused. The script will also remove any existing symlinks for hbd and hbc
# in ~/bin before creating new ones.
# hbd/hbc from wheel and create symlinks for hbd and hbc in ~/bin
set -e
if [ ! -d ~/venvs/hbd ]; then