Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5edbaacf81 | |||
| 8421f472f2 | |||
| 51f9bdc2b5 | |||
| 02bc42fbf0 | |||
| 832a8b0bda | |||
| 57c4b86430 | |||
| 43fad7beed | |||
| 8dd002d159 | |||
| 2373b55d8b |
@@ -6,20 +6,26 @@ on:
|
|||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
release:
|
release:
|
||||||
runs-on: ubuntu-latest
|
runs-on: FreeBSD
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
# - name: Set up Python
|
||||||
|
# uses: actions/setup-python@v5
|
||||||
|
# with:
|
||||||
|
# python-version: '3.11'
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
# Use a generic run step for FreeBSD if actions/setup-python
|
||||||
with:
|
# fails in restricted environments.
|
||||||
python-version: '3.11'
|
run: |
|
||||||
|
python3 --version
|
||||||
|
python3 -m ensurepip --upgrade
|
||||||
|
|
||||||
- name: Install build tools
|
- name: Install build tools
|
||||||
run: |
|
run: |
|
||||||
python -m pip install --upgrade pip
|
python -m pip install --upgrade pip
|
||||||
pip install build twine
|
# pip install build twine
|
||||||
|
|
||||||
- name: Build package
|
- name: Build package
|
||||||
run: python -m build
|
run: python -m build
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ logfile: "/home/andreas/logs/heartbeat/heartbeat.log"
|
|||||||
logfmt: "msg"
|
logfmt: "msg"
|
||||||
grace: 40
|
grace: 40
|
||||||
interval: 10
|
interval: 10
|
||||||
|
autosave_interval: 300 # Autosave interval in seconds (default: 5 minutes)
|
||||||
|
|
||||||
# Notification Channels - Define notification providers centrally
|
# Notification Channels - Define notification providers centrally
|
||||||
# Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration
|
# Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration
|
||||||
@@ -159,7 +160,7 @@ threshold_configs:
|
|||||||
warning: 85.0
|
warning: 85.0
|
||||||
critical: 90.0
|
critical: 90.0
|
||||||
rtt:
|
rtt:
|
||||||
warning: 50
|
warning: 200
|
||||||
critical: 250.0
|
critical: 250.0
|
||||||
|
|
||||||
|
|
||||||
@@ -204,7 +205,7 @@ threshold_configs:
|
|||||||
critical: 2
|
critical: 2
|
||||||
operator: ">="
|
operator: ">="
|
||||||
rtt:
|
rtt:
|
||||||
warning: 50
|
warning: 200
|
||||||
critical: 250.0
|
critical: 250.0
|
||||||
|
|
||||||
truenas_server:
|
truenas_server:
|
||||||
|
|||||||
+1
-1
@@ -14,4 +14,4 @@ Install options:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
__all__ = ["__version__"]
|
__all__ = ["__version__"]
|
||||||
__version__ = "5.0.6"
|
__version__ = "5.0.9"
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
"""HeartBeat Client (hbc) - System monitoring client."""
|
"""HeartBeat Client (hbc) - System monitoring client."""
|
||||||
|
|
||||||
__version__ = "5.0.5"
|
from hbd import __version__
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
"""Common utilities shared between hbc and hbd."""
|
"""Common utilities shared between hbc and hbd."""
|
||||||
|
|
||||||
__version__ = "5.0.5"
|
from hbd import __version__
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
"""HeartBeat Daemon (hbd) - Server/daemon component."""
|
"""HeartBeat Daemon (hbd) - Server/daemon component."""
|
||||||
|
|
||||||
__version__ = "5.0.5"
|
from hbd import __version__
|
||||||
|
|||||||
@@ -189,7 +189,7 @@ class Connection:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
self.addr = addr
|
self.addr = addr
|
||||||
Connection.htab[addr] = self.host.nameconnection_count
|
Connection.htab[addr] = self.host.name
|
||||||
if self.host.isDynDns():
|
if self.host.isDynDns():
|
||||||
Host.dnsQ.put((self.host.name, self.addr))
|
Host.dnsQ.put((self.host.name, self.addr))
|
||||||
return r
|
return r
|
||||||
|
|||||||
+68
-17
@@ -22,12 +22,12 @@ eventlog = notify_mod.eventlog
|
|||||||
|
|
||||||
# shared runtime collections and helpers
|
# shared runtime collections and helpers
|
||||||
|
|
||||||
def cleanup_function(config, hbdclass):
|
def save_state(config, hbdclass):
|
||||||
"""This function will be executed upon program exit."""
|
"""Save current state to pickle file. Safe to call at any time."""
|
||||||
logger.info("Running cleanup function...")
|
|
||||||
import pickle
|
import pickle
|
||||||
|
import os
|
||||||
|
|
||||||
# Ensure all timer references are cleared before pickling
|
# Clear timer references before pickling (they can't be serialized)
|
||||||
for hostname, host in list(hbdclass.Host.hosts.items()):
|
for hostname, host in list(hbdclass.Host.hosts.items()):
|
||||||
for conn_type, conn in host.connections.items():
|
for conn_type, conn in host.connections.items():
|
||||||
if hasattr(conn, 'cancel_overdue_timer'):
|
if hasattr(conn, 'cancel_overdue_timer'):
|
||||||
@@ -40,13 +40,26 @@ def cleanup_function(config, hbdclass):
|
|||||||
conn.timeout_duration = None
|
conn.timeout_duration = None
|
||||||
|
|
||||||
pickfile = config.get("pickfile", "hbd.pickle")
|
pickfile = config.get("pickfile", "hbd.pickle")
|
||||||
|
tmpfile = pickfile + ".tmp"
|
||||||
|
|
||||||
pickf = open(pickfile, "wb")
|
try:
|
||||||
pick = pickle.Pickler(pickf)
|
with open(tmpfile, "wb") as pickf:
|
||||||
pick.dump(hbdclass.Host.hosts)
|
pick = pickle.Pickler(pickf)
|
||||||
pick.dump(data.msgs)
|
pick.dump(hbdclass.Host.hosts)
|
||||||
pickf.close()
|
pick.dump(data.msgs)
|
||||||
|
os.replace(tmpfile, pickfile)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to save state: %s", e)
|
||||||
|
try:
|
||||||
|
os.unlink(tmpfile)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_function(config, hbdclass):
|
||||||
|
"""This function will be executed upon program exit."""
|
||||||
|
logger.info("Running cleanup function...")
|
||||||
|
save_state(config, hbdclass)
|
||||||
logger.info("Cleanup complete.")
|
logger.info("Cleanup complete.")
|
||||||
|
|
||||||
|
|
||||||
@@ -160,14 +173,20 @@ async def _run_async(config, config_path=None):
|
|||||||
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
|
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# 3. Bind to all interfaces (::) on a specific port
|
|
||||||
|
|
||||||
# UDP server endpoint (handler wired to handle_datagram with context)
|
|
||||||
bind_addr = ("::", config.get("hb_port", 50003))
|
bind_addr = ("::", config.get("hb_port", 50003))
|
||||||
sock.bind(bind_addr)
|
sock.bind(bind_addr)
|
||||||
logger.info("Starting UDP server on %s:%s", *bind_addr)
|
logger.info("Starting UDP server on %s:%s", *bind_addr)
|
||||||
|
|
||||||
def udp_handler(msg, addr, transport):
|
# Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS).
|
||||||
|
# If supported, read datagrams via recvmsg() so RTT uses the kernel
|
||||||
|
# timestamp rather than the time.time() call after asyncio scheduling.
|
||||||
|
use_kernel_ts = udp.enable_kernel_timestamps(sock)
|
||||||
|
if use_kernel_ts:
|
||||||
|
logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT")
|
||||||
|
else:
|
||||||
|
logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT")
|
||||||
|
|
||||||
|
def udp_handler(msg, addr, transport, recv_ts=None):
|
||||||
ctx = dict(
|
ctx = dict(
|
||||||
config=config,
|
config=config,
|
||||||
hbdclass=hbdclass,
|
hbdclass=hbdclass,
|
||||||
@@ -177,13 +196,32 @@ async def _run_async(config, config_path=None):
|
|||||||
threshold_checker=threshold_checker,
|
threshold_checker=threshold_checker,
|
||||||
DEBUG=config.get("debug", 0),
|
DEBUG=config.get("debug", 0),
|
||||||
verbose=config.get("verbose", False),
|
verbose=config.get("verbose", False),
|
||||||
|
recv_ts=recv_ts,
|
||||||
)
|
)
|
||||||
udp.handle_datagram(msg, addr, transport, ctx)
|
udp.handle_datagram(msg, addr, transport, ctx)
|
||||||
|
|
||||||
transport, protocol = await loop.create_datagram_endpoint(
|
if use_kernel_ts:
|
||||||
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
|
# recvmsg path: manage the socket ourselves with loop.add_reader()
|
||||||
sock=sock,
|
sock.setblocking(False)
|
||||||
|
transport = udp.RecvmsgTransport(loop, sock)
|
||||||
|
reader = udp.make_recvmsg_reader(sock, udp_handler, transport)
|
||||||
|
loop.add_reader(sock.fileno(), reader)
|
||||||
|
protocol = None
|
||||||
|
else:
|
||||||
|
transport, protocol = await loop.create_datagram_endpoint(
|
||||||
|
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
|
||||||
|
sock=sock,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Restore connection timers for hosts loaded from pickle
|
||||||
|
restore_ctx = dict(
|
||||||
|
config=config,
|
||||||
|
hbdclass=hbdclass,
|
||||||
|
log=eventlog,
|
||||||
|
msg_to_websockets=msg_to_websockets,
|
||||||
|
threshold_checker=threshold_checker,
|
||||||
)
|
)
|
||||||
|
udp.restore_connection_timers(hbdclass, restore_ctx)
|
||||||
|
|
||||||
# HTTP server (asyncio-based via aiohttp)
|
# HTTP server (asyncio-based via aiohttp)
|
||||||
try:
|
try:
|
||||||
@@ -257,6 +295,19 @@ async def _run_async(config, config_path=None):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception("websocket server failed to start: %s", e)
|
logger.exception("websocket server failed to start: %s", e)
|
||||||
|
|
||||||
|
# Periodic autosave task
|
||||||
|
autosave_interval = config.get("autosave_interval", 300) # default: 5 minutes
|
||||||
|
|
||||||
|
async def autosave_task():
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(autosave_interval)
|
||||||
|
logger.debug("Autosaving state...")
|
||||||
|
save_state(config, hbdclass)
|
||||||
|
logger.debug("Autosave complete (%d hosts)", len(hbdclass.Host.hosts))
|
||||||
|
|
||||||
|
autosave = asyncio.create_task(autosave_task())
|
||||||
|
logger.info("Autosave task started (interval: %ds)", autosave_interval)
|
||||||
|
|
||||||
# Main event loop - monitor shutdown and reload events
|
# Main event loop - monitor shutdown and reload events
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
@@ -304,7 +355,7 @@ async def _run_async(config, config_path=None):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Error closing UDP transport: %s", e)
|
logger.warning("Error closing UDP transport: %s", e)
|
||||||
|
|
||||||
tasks_to_cancel = [http_task, ws_task]
|
tasks_to_cancel = [http_task, ws_task, autosave]
|
||||||
for task in tasks_to_cancel:
|
for task in tasks_to_cancel:
|
||||||
if task:
|
if task:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -459,6 +459,9 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Protocol metadata fields injected by the client – never plugin metrics
|
||||||
|
const SKIP_FIELDS = new Set(['id', 'name']);
|
||||||
|
|
||||||
function renderPluginData(data, timestamp) {
|
function renderPluginData(data, timestamp) {
|
||||||
// Check if this should be rendered as a simple table
|
// Check if this should be rendered as a simple table
|
||||||
const pluginName = getCurrentPluginName();
|
const pluginName = getCurrentPluginName();
|
||||||
@@ -471,6 +474,7 @@
|
|||||||
let html = '<div class="metric-grid">';
|
let html = '<div class="metric-grid">';
|
||||||
|
|
||||||
for (const [key, value] of Object.entries(data)) {
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
if (SKIP_FIELDS.has(key)) continue;
|
||||||
// Skip nested objects for now, handle them separately
|
// Skip nested objects for now, handle them separately
|
||||||
if (typeof value === 'object' && value !== null) {
|
if (typeof value === 'object' && value !== null) {
|
||||||
continue;
|
continue;
|
||||||
@@ -572,6 +576,7 @@
|
|||||||
// Table body
|
// Table body
|
||||||
html += '<tbody>';
|
html += '<tbody>';
|
||||||
for (const [key, value] of Object.entries(data)) {
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
if (SKIP_FIELDS.has(key)) continue;
|
||||||
const label = formatLabel(key);
|
const label = formatLabel(key);
|
||||||
const formattedValue = formatValue(key, value);
|
const formattedValue = formatValue(key, value);
|
||||||
const unit = getUnit(key);
|
const unit = getUnit(key);
|
||||||
@@ -1012,12 +1017,17 @@
|
|||||||
}
|
}
|
||||||
|
|
||||||
function formatLabel(key) {
|
function formatLabel(key) {
|
||||||
|
if (key === 'time') return 'Collected At';
|
||||||
return key
|
return key
|
||||||
.replace(/_/g, ' ')
|
.replace(/_/g, ' ')
|
||||||
.replace(/\b\w/g, l => l.toUpperCase());
|
.replace(/\b\w/g, l => l.toUpperCase());
|
||||||
}
|
}
|
||||||
|
|
||||||
function formatValue(key, value) {
|
function formatValue(key, value) {
|
||||||
|
// Epoch timestamp field sent by the client alongside plugin data
|
||||||
|
if (key === 'time' && typeof value === 'number') {
|
||||||
|
return new Date(value * 1000).toLocaleString();
|
||||||
|
}
|
||||||
if (typeof value === 'number') {
|
if (typeof value === 'number') {
|
||||||
// Format percentages
|
// Format percentages
|
||||||
if (key.includes('percent') || key.includes('usage')) {
|
if (key.includes('percent') || key.includes('usage')) {
|
||||||
|
|||||||
+197
-49
@@ -1,6 +1,9 @@
|
|||||||
"""UDP listener and datagram processing."""
|
"""UDP listener and datagram processing."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
import zlib
|
import zlib
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -11,6 +14,99 @@ from . import notify as notify_mod
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
eventlog = notify_mod.eventlog
|
eventlog = notify_mod.eventlog
|
||||||
|
|
||||||
|
# SO_TIMESTAMP: kernel attaches a struct timeval to each received datagram.
|
||||||
|
# Supported on Linux, FreeBSD, and macOS. The constant is not exposed by
|
||||||
|
# Python's socket module on all platforms, so fall back to the Linux value (29)
|
||||||
|
# when absent.
|
||||||
|
_SO_TIMESTAMP = getattr(socket, 'SO_TIMESTAMP', 29)
|
||||||
|
# struct timeval uses two native C longs: tv_sec and tv_usec
|
||||||
|
_TIMEVAL = struct.Struct('@ll')
|
||||||
|
|
||||||
|
|
||||||
|
def enable_kernel_timestamps(sock) -> bool:
|
||||||
|
"""Try to enable SO_TIMESTAMP on *sock*.
|
||||||
|
|
||||||
|
Returns True if the kernel will supply receive timestamps, False otherwise
|
||||||
|
(unsupported platform, older kernel, or insufficient permissions).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMP, 1)
|
||||||
|
return True
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_kernel_ts(ancdata) -> float | None:
|
||||||
|
"""Parse recvmsg ancillary data and return the kernel receive time.
|
||||||
|
|
||||||
|
Returns seconds as a float, or None if no SO_TIMESTAMP cmsg is present.
|
||||||
|
"""
|
||||||
|
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||||
|
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMP:
|
||||||
|
if len(cmsg_data) >= _TIMEVAL.size:
|
||||||
|
sec, usec = _TIMEVAL.unpack_from(cmsg_data)
|
||||||
|
return sec + usec * 1e-6
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class RecvmsgTransport:
|
||||||
|
"""Thin wrapper used when SO_TIMESTAMP is active (add_reader path).
|
||||||
|
|
||||||
|
Exposes the same sendto() / close() interface as asyncio's DatagramTransport
|
||||||
|
so the rest of the code does not need to know which path is in use.
|
||||||
|
"""
|
||||||
|
def __init__(self, loop, sock):
|
||||||
|
self._loop = loop
|
||||||
|
self._sock = sock
|
||||||
|
|
||||||
|
def sendto(self, data, addr):
|
||||||
|
try:
|
||||||
|
self._sock.sendto(data, addr)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("sendto failed: %s", e)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
try:
|
||||||
|
self._loop.remove_reader(self._sock.fileno())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
self._sock.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def make_recvmsg_reader(sock, handler, transport):
|
||||||
|
"""Return a callback suitable for loop.add_reader().
|
||||||
|
|
||||||
|
Reads one datagram per call using recvmsg() so that kernel timestamps in
|
||||||
|
the ancillary data are accessible. Falls back to time.time() if the
|
||||||
|
cmsg is missing.
|
||||||
|
|
||||||
|
handler(msg, addr, transport, kernel_ts) – same signature as udp_handler
|
||||||
|
in main.py with the optional kernel_ts argument.
|
||||||
|
"""
|
||||||
|
BUFSIZE = 65536
|
||||||
|
ANCBUFSIZE = 128 # enough for one struct timespec cmsg
|
||||||
|
|
||||||
|
def _read():
|
||||||
|
try:
|
||||||
|
data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE)
|
||||||
|
except BlockingIOError:
|
||||||
|
return
|
||||||
|
except OSError as e:
|
||||||
|
logger.warning("recvmsg error: %s", e)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
kernel_ts = _extract_kernel_ts(ancdata)
|
||||||
|
msg = parse_message(data)
|
||||||
|
if msg:
|
||||||
|
handler(msg, addr, transport, kernel_ts)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error processing datagram from %s", addr)
|
||||||
|
|
||||||
|
return _read
|
||||||
|
|
||||||
|
|
||||||
class EchoServerProtocol(asyncio.DatagramProtocol):
|
class EchoServerProtocol(asyncio.DatagramProtocol):
|
||||||
def __init__(self, config=None, handler=None):
|
def __init__(self, config=None, handler=None):
|
||||||
@@ -61,6 +157,100 @@ def dicttos(ID, d):
|
|||||||
return opk
|
return opk
|
||||||
|
|
||||||
|
|
||||||
|
DROPOVERDUE = 7 * 24 * 3600 # seconds before an overdue host becomes UNKNOWN
|
||||||
|
|
||||||
|
|
||||||
|
def _make_timer_callbacks(uname, host, watchhosts, ctx):
|
||||||
|
"""Return (on_overdue, on_unknown) async callbacks for connection timer logic.
|
||||||
|
|
||||||
|
Captured values are bound at call time so callbacks are safe to use in loops.
|
||||||
|
"""
|
||||||
|
msg_to_websockets = ctx.get("msg_to_websockets")
|
||||||
|
threshold_checker = ctx.get("threshold_checker")
|
||||||
|
cfg = ctx.get("config", {})
|
||||||
|
|
||||||
|
async def on_unknown(connection):
|
||||||
|
connection.newstate(connection.__class__.UNKNOWN, connection.lastbeat)
|
||||||
|
if msg_to_websockets:
|
||||||
|
msg_to_websockets("host", host.stateinfo())
|
||||||
|
|
||||||
|
async def on_overdue(connection):
|
||||||
|
if connection.getstate() != connection.__class__.UP:
|
||||||
|
return
|
||||||
|
now = time.time()
|
||||||
|
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
|
||||||
|
msg = f"{connection.afam} overdue"
|
||||||
|
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
|
||||||
|
if uname in watchhosts:
|
||||||
|
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
|
||||||
|
if threshold_checker:
|
||||||
|
threshold_checker.check_value(
|
||||||
|
host_name=uname,
|
||||||
|
metric_path="rtt",
|
||||||
|
value=float("inf"),
|
||||||
|
alert_states=host.alert_states,
|
||||||
|
)
|
||||||
|
if msg_to_websockets:
|
||||||
|
msg_to_websockets("host", host.stateinfo())
|
||||||
|
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
|
||||||
|
|
||||||
|
return on_overdue, on_unknown
|
||||||
|
|
||||||
|
|
||||||
|
def restore_connection_timers(hbdclass, ctx):
|
||||||
|
"""Restore overdue timers for all loaded connections after a pickle restore.
|
||||||
|
|
||||||
|
For UP connections, the remaining time until overdue is calculated from
|
||||||
|
lastbeat so that clients that vanished during hbd's downtime are detected.
|
||||||
|
For OVERDUE connections, the UNKNOWN drop timer is restored.
|
||||||
|
"""
|
||||||
|
now = time.time()
|
||||||
|
cfg = ctx.get("config", {})
|
||||||
|
grace = cfg.get("grace", 2)
|
||||||
|
from . import config as config_mod
|
||||||
|
watchhosts = config_mod.get_watchhosts(cfg)
|
||||||
|
|
||||||
|
restored = 0
|
||||||
|
for uname, host in list(hbdclass.Host.hosts.items()):
|
||||||
|
interval = host.interval
|
||||||
|
for afam, conn in list(host.connections.items()):
|
||||||
|
state = conn.getstate()
|
||||||
|
if state == hbdclass.Connection.DOWN:
|
||||||
|
continue
|
||||||
|
|
||||||
|
on_overdue, on_unknown = _make_timer_callbacks(uname, host, watchhosts, ctx)
|
||||||
|
|
||||||
|
if state == hbdclass.Connection.UP and interval > 0:
|
||||||
|
elapsed = now - conn.lastbeat
|
||||||
|
remaining = max(1.0, (interval + grace) - elapsed)
|
||||||
|
conn.reset_overdue_timer(remaining, on_overdue)
|
||||||
|
logger.debug(
|
||||||
|
"Restored UP timer %s/%s: %.0fs remaining (elapsed %.0fs)",
|
||||||
|
uname, afam, remaining, elapsed,
|
||||||
|
)
|
||||||
|
restored += 1
|
||||||
|
|
||||||
|
elif state == hbdclass.Connection.OVERDUE:
|
||||||
|
elapsed_overdue = now - conn.statetime
|
||||||
|
remaining = DROPOVERDUE - elapsed_overdue
|
||||||
|
if remaining <= 1:
|
||||||
|
# Already past the drop window — mark UNKNOWN immediately
|
||||||
|
conn.newstate(hbdclass.Connection.UNKNOWN, conn.lastbeat)
|
||||||
|
logger.info(
|
||||||
|
"Marking %s/%s UNKNOWN (overdue %.1f days)",
|
||||||
|
uname, afam, elapsed_overdue / 86400,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
conn.reset_overdue_timer(remaining, on_unknown)
|
||||||
|
logger.debug(
|
||||||
|
"Restored OVERDUE timer %s/%s: %.0fs remaining",
|
||||||
|
uname, afam, remaining,
|
||||||
|
)
|
||||||
|
restored += 1
|
||||||
|
|
||||||
|
logger.info("Restored timers for %d connection(s)", restored)
|
||||||
|
|
||||||
|
|
||||||
def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||||
"""Handle a parsed datagram message.
|
"""Handle a parsed datagram message.
|
||||||
|
|
||||||
@@ -74,7 +264,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
|||||||
"""
|
"""
|
||||||
if not msg:
|
if not msg:
|
||||||
return
|
return
|
||||||
now = __import__("time").time()
|
now = ctx.get("recv_ts") or time.time()
|
||||||
|
|
||||||
# Log message to journal
|
# Log message to journal
|
||||||
msg_journal = ctx.get("msg_journal")
|
msg_journal = ctx.get("msg_journal")
|
||||||
@@ -126,7 +316,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
|||||||
if msg.get("ID") == "HTB":
|
if msg.get("ID") == "HTB":
|
||||||
host.doesack = msg.get("acks", -1)
|
host.doesack = msg.get("acks", -1)
|
||||||
# send ACK back
|
# send ACK back
|
||||||
rmsg = {"time": __import__("time").time()}
|
rmsg = {"time": time.time()}
|
||||||
opkt = dicttos("ACK", rmsg)
|
opkt = dicttos("ACK", rmsg)
|
||||||
try:
|
try:
|
||||||
transport.sendto(opkt, addr)
|
transport.sendto(opkt, addr)
|
||||||
@@ -138,8 +328,9 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
|||||||
# Handle plugin data message
|
# Handle plugin data message
|
||||||
plugin_name = msg.get("plugin")
|
plugin_name = msg.get("plugin")
|
||||||
if plugin_name:
|
if plugin_name:
|
||||||
# Extract all fields except ID and plugin name
|
# Extract plugin fields, dropping protocol metadata fields
|
||||||
plugin_data = {k: v for k, v in msg.items() if k not in ["ID", "plugin"]}
|
plugin_data = {k: v for k, v in msg.items()
|
||||||
|
if k not in ("ID", "plugin", "id", "name")}
|
||||||
# Store plugin data with timestamp
|
# Store plugin data with timestamp
|
||||||
host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
|
host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
|
||||||
if DEBUG > 1:
|
if DEBUG > 1:
|
||||||
@@ -229,51 +420,8 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
|||||||
# Reset overdue timer on every heartbeat
|
# Reset overdue timer on every heartbeat
|
||||||
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
|
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
|
||||||
grace = cfg.get("grace", 2)
|
grace = cfg.get("grace", 2)
|
||||||
timeout_seconds = (interval + grace) if interval > 0 else 30
|
timeout_seconds = interval + grace
|
||||||
|
on_overdue, _ = _make_timer_callbacks(uname, host, watchhosts, ctx)
|
||||||
# Create callback for timer expiration
|
|
||||||
async def on_overdue(connection):
|
|
||||||
"""Called when connection timer expires (no heartbeat received)."""
|
|
||||||
import time
|
|
||||||
now = time.time()
|
|
||||||
|
|
||||||
# Only mark as overdue if still in UP state (not already marked)
|
|
||||||
if connection.getstate() == hbdcls.Connection.UP:
|
|
||||||
connection.newstate(hbdcls.Connection.OVERDUE, now, cfg.get("grace", 2))
|
|
||||||
|
|
||||||
msg = f"{connection.afam} overdue"
|
|
||||||
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
|
|
||||||
|
|
||||||
if uname in watchhosts:
|
|
||||||
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
|
|
||||||
|
|
||||||
# Check RTT thresholds with infinite RTT for overdue hosts
|
|
||||||
threshold_checker = ctx.get("threshold_checker")
|
|
||||||
if threshold_checker:
|
|
||||||
metric_path = "rtt"
|
|
||||||
threshold_checker.check_value(
|
|
||||||
host_name=uname,
|
|
||||||
metric_path=metric_path,
|
|
||||||
value=float('inf'),
|
|
||||||
alert_states=host.alert_states
|
|
||||||
)
|
|
||||||
|
|
||||||
# Notify websockets
|
|
||||||
if msg_to_websockets:
|
|
||||||
msg_to_websockets("host", host.stateinfo())
|
|
||||||
|
|
||||||
# Set a longer timer for marking as UNKNOWN (7 days)
|
|
||||||
DROPOVERDUE = 7 * 24 * 3600
|
|
||||||
|
|
||||||
async def on_unknown(connection):
|
|
||||||
"""Mark connection as unknown after extended absence."""
|
|
||||||
connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat)
|
|
||||||
if msg_to_websockets:
|
|
||||||
msg_to_websockets("host", host.stateinfo())
|
|
||||||
|
|
||||||
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
|
|
||||||
|
|
||||||
# Reset the timer
|
|
||||||
conn.reset_overdue_timer(timeout_seconds, on_overdue)
|
conn.reset_overdue_timer(timeout_seconds, on_overdue)
|
||||||
|
|
||||||
# Check RTT thresholds using the threshold checker
|
# Check RTT thresholds using the threshold checker
|
||||||
|
|||||||
+29
-26
@@ -65,11 +65,7 @@ async def _handler(websocket, path=None):
|
|||||||
logger.exception("WebSocket handler exception from %s: %s", remote_address, e)
|
logger.exception("WebSocket handler exception from %s: %s", remote_address, e)
|
||||||
finally:
|
finally:
|
||||||
logger.debug("Removing WebSocket connection from %s", remote_address)
|
logger.debug("Removing WebSocket connection from %s", remote_address)
|
||||||
try:
|
_connections.discard(websocket)
|
||||||
_connections.remove(websocket)
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
await websocket.wait_closed()
|
|
||||||
|
|
||||||
|
|
||||||
async def start(
|
async def start(
|
||||||
@@ -93,33 +89,40 @@ async def start(
|
|||||||
_verbose = config.get("verbose", False),
|
_verbose = config.get("verbose", False),
|
||||||
_debug = config.get("debug", 0),
|
_debug = config.get("debug", 0),
|
||||||
|
|
||||||
servers = []
|
# Start servers and keep the server objects for clean shutdown
|
||||||
# plain WebSocket
|
running_servers = []
|
||||||
websockets_logger = logging.getLogger("websockets.server")
|
ws_server = await websockets.serve(_handler, host, ws_port)
|
||||||
#if _debug > 2:
|
running_servers.append(ws_server)
|
||||||
# websockets_logger.setLevel(logging.DEBUG)
|
|
||||||
#else:
|
|
||||||
# websockets_logger.setLevel(logging.INFO)
|
|
||||||
# regular WebSocket
|
|
||||||
ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"])
|
|
||||||
servers.append(ws_server)
|
|
||||||
# secure WebSocket (optional)
|
|
||||||
if wss_port and ssl_context:
|
if wss_port and ssl_context:
|
||||||
wss_server = websockets.serve(
|
wss_server = await websockets.serve(_handler, host, wss_port, ssl=ssl_context)
|
||||||
_handler, host, wss_port, ssl=ssl_context
|
running_servers.append(wss_server)
|
||||||
) # , subprotocols=["hbd"])
|
|
||||||
servers.append(wss_server)
|
|
||||||
|
|
||||||
# await starting of all servers
|
|
||||||
for srv in servers:
|
|
||||||
await srv
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port
|
"WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port
|
||||||
)
|
)
|
||||||
|
|
||||||
# block forever (until loop is stopped or cancelled)
|
try:
|
||||||
await asyncio.Future()
|
# Block until cancelled
|
||||||
|
await asyncio.Future()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
# Close all active browser connections so their handler coroutines exit
|
||||||
|
active = list(_connections)
|
||||||
|
if active:
|
||||||
|
logger.info("Closing %d active WebSocket connection(s)...", len(active))
|
||||||
|
await asyncio.gather(
|
||||||
|
*[ws.close() for ws in active],
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
# Stop the listening servers and wait for all handlers to finish
|
||||||
|
for srv in running_servers:
|
||||||
|
srv.close()
|
||||||
|
await asyncio.gather(
|
||||||
|
*[srv.wait_closed() for srv in running_servers],
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
logger.info("WebSocket server(s) stopped")
|
||||||
|
|
||||||
|
|
||||||
def broadcast(typ: str, data) -> bool:
|
def broadcast(typ: str, data) -> bool:
|
||||||
|
|||||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "hbd"
|
name = "hbd"
|
||||||
version = "5.0.6"
|
version = "5.0.9"
|
||||||
description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
|
description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
set -e
|
set -e
|
||||||
uv version --bump patch
|
uv version --bump patch
|
||||||
VER=$(uv version --short)
|
VER=$(uv version --short)
|
||||||
sed -i "" "s/__version__ = \"[0-9.]*\"\(.*\)$/__version__ = \"$VER\"\1/" hbd/__init__.py
|
sed -i".bak" "s/__version__ = \"[0-9.]*\"\(.*\)$/__version__ = \"$VER\"\1/" hbd/__init__.py
|
||||||
|
|
||||||
# commit pyproject.toml
|
# commit pyproject.toml
|
||||||
git commit -m "version $VER" pyproject.toml hbd/__init__.py
|
git commit -m "version $VER" pyproject.toml hbd/__init__.py
|
||||||
@@ -11,3 +11,5 @@ git push
|
|||||||
# tag version
|
# tag version
|
||||||
git tag -a v$VER -m "Version $VER"
|
git tag -a v$VER -m "Version $VER"
|
||||||
git push --tags
|
git push --tags
|
||||||
|
|
||||||
|
rm hbd/__init__.py.bak
|
||||||
|
|||||||
Reference in New Issue
Block a user