Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e8bb553349 | |||
| e4ecb8723f | |||
| 5edbaacf81 | |||
| 8421f472f2 | |||
| 51f9bdc2b5 | |||
| 02bc42fbf0 | |||
| 832a8b0bda |
@@ -25,7 +25,7 @@ jobs:
|
||||
- name: Install build tools
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
# pip install build twine
|
||||
pip install build twine
|
||||
|
||||
- name: Build package
|
||||
run: python -m build
|
||||
@@ -34,6 +34,13 @@ jobs:
|
||||
id: get_version
|
||||
run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Upload to Gitea PyPI registry
|
||||
env:
|
||||
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
|
||||
TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
|
||||
run: |
|
||||
twine upload --repository-url https://git.wrede.ca/api/packages/andreas/pypi dist/*
|
||||
|
||||
- name: Create release
|
||||
uses: actions/gitea-release-action@v1
|
||||
with:
|
||||
|
||||
@@ -7,6 +7,7 @@ logfile: "/home/andreas/logs/heartbeat/heartbeat.log"
|
||||
logfmt: "msg"
|
||||
grace: 40
|
||||
interval: 10
|
||||
autosave_interval: 300 # Autosave interval in seconds (default: 5 minutes)
|
||||
|
||||
# Notification Channels - Define notification providers centrally
|
||||
# Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration
|
||||
@@ -159,7 +160,7 @@ threshold_configs:
|
||||
warning: 85.0
|
||||
critical: 90.0
|
||||
rtt:
|
||||
warning: 50
|
||||
warning: 200
|
||||
critical: 250.0
|
||||
|
||||
|
||||
@@ -204,7 +205,7 @@ threshold_configs:
|
||||
critical: 2
|
||||
operator: ">="
|
||||
rtt:
|
||||
warning: 50
|
||||
warning: 200
|
||||
critical: 250.0
|
||||
|
||||
truenas_server:
|
||||
|
||||
+1
-1
@@ -14,4 +14,4 @@ Install options:
|
||||
"""
|
||||
|
||||
__all__ = ["__version__"]
|
||||
__version__ = "5.0.8"
|
||||
__version__ = "5.0.10"
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""HeartBeat Client (hbc) - System monitoring client."""
|
||||
|
||||
__version__ = "5.0.5"
|
||||
from hbd import __version__
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""Common utilities shared between hbc and hbd."""
|
||||
|
||||
__version__ = "5.0.5"
|
||||
from hbd import __version__
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""HeartBeat Daemon (hbd) - Server/daemon component."""
|
||||
|
||||
__version__ = "5.0.5"
|
||||
from hbd import __version__
|
||||
|
||||
@@ -189,7 +189,7 @@ class Connection:
|
||||
except Exception:
|
||||
pass
|
||||
self.addr = addr
|
||||
Connection.htab[addr] = self.host.nameconnection_count
|
||||
Connection.htab[addr] = self.host.name
|
||||
if self.host.isDynDns():
|
||||
Host.dnsQ.put((self.host.name, self.addr))
|
||||
return r
|
||||
|
||||
+62
-11
@@ -22,12 +22,12 @@ eventlog = notify_mod.eventlog
|
||||
|
||||
# shared runtime collections and helpers
|
||||
|
||||
def cleanup_function(config, hbdclass):
|
||||
"""This function will be executed upon program exit."""
|
||||
logger.info("Running cleanup function...")
|
||||
def save_state(config, hbdclass):
|
||||
"""Save current state to pickle file. Safe to call at any time."""
|
||||
import pickle
|
||||
import os
|
||||
|
||||
# Ensure all timer references are cleared before pickling
|
||||
# Clear timer references before pickling (they can't be serialized)
|
||||
for hostname, host in list(hbdclass.Host.hosts.items()):
|
||||
for conn_type, conn in host.connections.items():
|
||||
if hasattr(conn, 'cancel_overdue_timer'):
|
||||
@@ -40,13 +40,26 @@ def cleanup_function(config, hbdclass):
|
||||
conn.timeout_duration = None
|
||||
|
||||
pickfile = config.get("pickfile", "hbd.pickle")
|
||||
tmpfile = pickfile + ".tmp"
|
||||
|
||||
pickf = open(pickfile, "wb")
|
||||
try:
|
||||
with open(tmpfile, "wb") as pickf:
|
||||
pick = pickle.Pickler(pickf)
|
||||
pick.dump(hbdclass.Host.hosts)
|
||||
pick.dump(data.msgs)
|
||||
pickf.close()
|
||||
os.replace(tmpfile, pickfile)
|
||||
except Exception as e:
|
||||
logger.error("Failed to save state: %s", e)
|
||||
try:
|
||||
os.unlink(tmpfile)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def cleanup_function(config, hbdclass):
|
||||
"""This function will be executed upon program exit."""
|
||||
logger.info("Running cleanup function...")
|
||||
save_state(config, hbdclass)
|
||||
logger.info("Cleanup complete.")
|
||||
|
||||
|
||||
@@ -160,14 +173,20 @@ async def _run_async(config, config_path=None):
|
||||
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
|
||||
)
|
||||
|
||||
# 3. Bind to all interfaces (::) on a specific port
|
||||
|
||||
# UDP server endpoint (handler wired to handle_datagram with context)
|
||||
bind_addr = ("::", config.get("hb_port", 50003))
|
||||
sock.bind(bind_addr)
|
||||
logger.info("Starting UDP server on %s:%s", *bind_addr)
|
||||
|
||||
def udp_handler(msg, addr, transport):
|
||||
# Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS).
|
||||
# If supported, read datagrams via recvmsg() so RTT uses the kernel
|
||||
# timestamp rather than the time.time() call after asyncio scheduling.
|
||||
use_kernel_ts = udp.enable_kernel_timestamps(sock)
|
||||
if use_kernel_ts:
|
||||
logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT")
|
||||
else:
|
||||
logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT")
|
||||
|
||||
def udp_handler(msg, addr, transport, recv_ts=None):
|
||||
ctx = dict(
|
||||
config=config,
|
||||
hbdclass=hbdclass,
|
||||
@@ -177,14 +196,33 @@ async def _run_async(config, config_path=None):
|
||||
threshold_checker=threshold_checker,
|
||||
DEBUG=config.get("debug", 0),
|
||||
verbose=config.get("verbose", False),
|
||||
recv_ts=recv_ts,
|
||||
)
|
||||
udp.handle_datagram(msg, addr, transport, ctx)
|
||||
|
||||
if use_kernel_ts:
|
||||
# recvmsg path: manage the socket ourselves with loop.add_reader()
|
||||
sock.setblocking(False)
|
||||
transport = udp.RecvmsgTransport(loop, sock)
|
||||
reader = udp.make_recvmsg_reader(sock, udp_handler, transport)
|
||||
loop.add_reader(sock.fileno(), reader)
|
||||
protocol = None
|
||||
else:
|
||||
transport, protocol = await loop.create_datagram_endpoint(
|
||||
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
|
||||
sock=sock,
|
||||
)
|
||||
|
||||
# Restore connection timers for hosts loaded from pickle
|
||||
restore_ctx = dict(
|
||||
config=config,
|
||||
hbdclass=hbdclass,
|
||||
log=eventlog,
|
||||
msg_to_websockets=msg_to_websockets,
|
||||
threshold_checker=threshold_checker,
|
||||
)
|
||||
udp.restore_connection_timers(hbdclass, restore_ctx)
|
||||
|
||||
# HTTP server (asyncio-based via aiohttp)
|
||||
try:
|
||||
http_task = asyncio.create_task(
|
||||
@@ -257,6 +295,19 @@ async def _run_async(config, config_path=None):
|
||||
except Exception as e:
|
||||
logger.exception("websocket server failed to start: %s", e)
|
||||
|
||||
# Periodic autosave task
|
||||
autosave_interval = config.get("autosave_interval", 300) # default: 5 minutes
|
||||
|
||||
async def autosave_task():
|
||||
while True:
|
||||
await asyncio.sleep(autosave_interval)
|
||||
logger.debug("Autosaving state...")
|
||||
save_state(config, hbdclass)
|
||||
logger.debug("Autosave complete (%d hosts)", len(hbdclass.Host.hosts))
|
||||
|
||||
autosave = asyncio.create_task(autosave_task())
|
||||
logger.info("Autosave task started (interval: %ds)", autosave_interval)
|
||||
|
||||
# Main event loop - monitor shutdown and reload events
|
||||
try:
|
||||
while True:
|
||||
@@ -304,7 +355,7 @@ async def _run_async(config, config_path=None):
|
||||
except Exception as e:
|
||||
logger.warning("Error closing UDP transport: %s", e)
|
||||
|
||||
tasks_to_cancel = [http_task, ws_task]
|
||||
tasks_to_cancel = [http_task, ws_task, autosave]
|
||||
for task in tasks_to_cancel:
|
||||
if task:
|
||||
try:
|
||||
|
||||
@@ -459,6 +459,9 @@
|
||||
}
|
||||
}
|
||||
|
||||
// Protocol metadata fields injected by the client – never plugin metrics
|
||||
const SKIP_FIELDS = new Set(['id', 'name']);
|
||||
|
||||
function renderPluginData(data, timestamp) {
|
||||
// Check if this should be rendered as a simple table
|
||||
const pluginName = getCurrentPluginName();
|
||||
@@ -471,6 +474,7 @@
|
||||
let html = '<div class="metric-grid">';
|
||||
|
||||
for (const [key, value] of Object.entries(data)) {
|
||||
if (SKIP_FIELDS.has(key)) continue;
|
||||
// Skip nested objects for now, handle them separately
|
||||
if (typeof value === 'object' && value !== null) {
|
||||
continue;
|
||||
@@ -572,6 +576,7 @@
|
||||
// Table body
|
||||
html += '<tbody>';
|
||||
for (const [key, value] of Object.entries(data)) {
|
||||
if (SKIP_FIELDS.has(key)) continue;
|
||||
const label = formatLabel(key);
|
||||
const formattedValue = formatValue(key, value);
|
||||
const unit = getUnit(key);
|
||||
@@ -1012,12 +1017,17 @@
|
||||
}
|
||||
|
||||
function formatLabel(key) {
|
||||
if (key === 'time') return 'Collected At';
|
||||
return key
|
||||
.replace(/_/g, ' ')
|
||||
.replace(/\b\w/g, l => l.toUpperCase());
|
||||
}
|
||||
|
||||
function formatValue(key, value) {
|
||||
// Epoch timestamp field sent by the client alongside plugin data
|
||||
if (key === 'time' && typeof value === 'number') {
|
||||
return new Date(value * 1000).toLocaleString();
|
||||
}
|
||||
if (typeof value === 'number') {
|
||||
// Format percentages
|
||||
if (key.includes('percent') || key.includes('usage')) {
|
||||
|
||||
+197
-49
@@ -1,6 +1,9 @@
|
||||
"""UDP listener and datagram processing."""
|
||||
|
||||
import asyncio
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
import zlib
|
||||
import logging
|
||||
|
||||
@@ -11,6 +14,99 @@ from . import notify as notify_mod
|
||||
logger = logging.getLogger(__name__)
|
||||
eventlog = notify_mod.eventlog
|
||||
|
||||
# SO_TIMESTAMP: kernel attaches a struct timeval to each received datagram.
|
||||
# Supported on Linux, FreeBSD, and macOS. The constant is not exposed by
|
||||
# Python's socket module on all platforms, so fall back to the Linux value (29)
|
||||
# when absent.
|
||||
_SO_TIMESTAMP = getattr(socket, 'SO_TIMESTAMP', 29)
|
||||
# struct timeval uses two native C longs: tv_sec and tv_usec
|
||||
_TIMEVAL = struct.Struct('@ll')
|
||||
|
||||
|
||||
def enable_kernel_timestamps(sock) -> bool:
|
||||
"""Try to enable SO_TIMESTAMP on *sock*.
|
||||
|
||||
Returns True if the kernel will supply receive timestamps, False otherwise
|
||||
(unsupported platform, older kernel, or insufficient permissions).
|
||||
"""
|
||||
try:
|
||||
sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMP, 1)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def _extract_kernel_ts(ancdata) -> float | None:
|
||||
"""Parse recvmsg ancillary data and return the kernel receive time.
|
||||
|
||||
Returns seconds as a float, or None if no SO_TIMESTAMP cmsg is present.
|
||||
"""
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMP:
|
||||
if len(cmsg_data) >= _TIMEVAL.size:
|
||||
sec, usec = _TIMEVAL.unpack_from(cmsg_data)
|
||||
return sec + usec * 1e-6
|
||||
return None
|
||||
|
||||
|
||||
class RecvmsgTransport:
|
||||
"""Thin wrapper used when SO_TIMESTAMP is active (add_reader path).
|
||||
|
||||
Exposes the same sendto() / close() interface as asyncio's DatagramTransport
|
||||
so the rest of the code does not need to know which path is in use.
|
||||
"""
|
||||
def __init__(self, loop, sock):
|
||||
self._loop = loop
|
||||
self._sock = sock
|
||||
|
||||
def sendto(self, data, addr):
|
||||
try:
|
||||
self._sock.sendto(data, addr)
|
||||
except Exception as e:
|
||||
logger.debug("sendto failed: %s", e)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self._loop.remove_reader(self._sock.fileno())
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def make_recvmsg_reader(sock, handler, transport):
|
||||
"""Return a callback suitable for loop.add_reader().
|
||||
|
||||
Reads one datagram per call using recvmsg() so that kernel timestamps in
|
||||
the ancillary data are accessible. Falls back to time.time() if the
|
||||
cmsg is missing.
|
||||
|
||||
handler(msg, addr, transport, kernel_ts) – same signature as udp_handler
|
||||
in main.py with the optional kernel_ts argument.
|
||||
"""
|
||||
BUFSIZE = 65536
|
||||
ANCBUFSIZE = 128 # enough for one struct timespec cmsg
|
||||
|
||||
def _read():
|
||||
try:
|
||||
data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE)
|
||||
except BlockingIOError:
|
||||
return
|
||||
except OSError as e:
|
||||
logger.warning("recvmsg error: %s", e)
|
||||
return
|
||||
try:
|
||||
kernel_ts = _extract_kernel_ts(ancdata)
|
||||
msg = parse_message(data)
|
||||
if msg:
|
||||
handler(msg, addr, transport, kernel_ts)
|
||||
except Exception:
|
||||
logger.exception("Error processing datagram from %s", addr)
|
||||
|
||||
return _read
|
||||
|
||||
|
||||
class EchoServerProtocol(asyncio.DatagramProtocol):
|
||||
def __init__(self, config=None, handler=None):
|
||||
@@ -61,6 +157,100 @@ def dicttos(ID, d):
|
||||
return opk
|
||||
|
||||
|
||||
DROPOVERDUE = 7 * 24 * 3600 # seconds before an overdue host becomes UNKNOWN
|
||||
|
||||
|
||||
def _make_timer_callbacks(uname, host, watchhosts, ctx):
|
||||
"""Return (on_overdue, on_unknown) async callbacks for connection timer logic.
|
||||
|
||||
Captured values are bound at call time so callbacks are safe to use in loops.
|
||||
"""
|
||||
msg_to_websockets = ctx.get("msg_to_websockets")
|
||||
threshold_checker = ctx.get("threshold_checker")
|
||||
cfg = ctx.get("config", {})
|
||||
|
||||
async def on_unknown(connection):
|
||||
connection.newstate(connection.__class__.UNKNOWN, connection.lastbeat)
|
||||
if msg_to_websockets:
|
||||
msg_to_websockets("host", host.stateinfo())
|
||||
|
||||
async def on_overdue(connection):
|
||||
if connection.getstate() != connection.__class__.UP:
|
||||
return
|
||||
now = time.time()
|
||||
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
|
||||
msg = f"{connection.afam} overdue"
|
||||
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
|
||||
if uname in watchhosts:
|
||||
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
|
||||
if threshold_checker:
|
||||
threshold_checker.check_value(
|
||||
host_name=uname,
|
||||
metric_path="rtt",
|
||||
value=float("inf"),
|
||||
alert_states=host.alert_states,
|
||||
)
|
||||
if msg_to_websockets:
|
||||
msg_to_websockets("host", host.stateinfo())
|
||||
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
|
||||
|
||||
return on_overdue, on_unknown
|
||||
|
||||
|
||||
def restore_connection_timers(hbdclass, ctx):
|
||||
"""Restore overdue timers for all loaded connections after a pickle restore.
|
||||
|
||||
For UP connections, the remaining time until overdue is calculated from
|
||||
lastbeat so that clients that vanished during hbd's downtime are detected.
|
||||
For OVERDUE connections, the UNKNOWN drop timer is restored.
|
||||
"""
|
||||
now = time.time()
|
||||
cfg = ctx.get("config", {})
|
||||
grace = cfg.get("grace", 2)
|
||||
from . import config as config_mod
|
||||
watchhosts = config_mod.get_watchhosts(cfg)
|
||||
|
||||
restored = 0
|
||||
for uname, host in list(hbdclass.Host.hosts.items()):
|
||||
interval = host.interval
|
||||
for afam, conn in list(host.connections.items()):
|
||||
state = conn.getstate()
|
||||
if state == hbdclass.Connection.DOWN:
|
||||
continue
|
||||
|
||||
on_overdue, on_unknown = _make_timer_callbacks(uname, host, watchhosts, ctx)
|
||||
|
||||
if state == hbdclass.Connection.UP and interval > 0:
|
||||
elapsed = now - conn.lastbeat
|
||||
remaining = max(1.0, (interval + grace) - elapsed)
|
||||
conn.reset_overdue_timer(remaining, on_overdue)
|
||||
logger.debug(
|
||||
"Restored UP timer %s/%s: %.0fs remaining (elapsed %.0fs)",
|
||||
uname, afam, remaining, elapsed,
|
||||
)
|
||||
restored += 1
|
||||
|
||||
elif state == hbdclass.Connection.OVERDUE:
|
||||
elapsed_overdue = now - conn.statetime
|
||||
remaining = DROPOVERDUE - elapsed_overdue
|
||||
if remaining <= 1:
|
||||
# Already past the drop window — mark UNKNOWN immediately
|
||||
conn.newstate(hbdclass.Connection.UNKNOWN, conn.lastbeat)
|
||||
logger.info(
|
||||
"Marking %s/%s UNKNOWN (overdue %.1f days)",
|
||||
uname, afam, elapsed_overdue / 86400,
|
||||
)
|
||||
else:
|
||||
conn.reset_overdue_timer(remaining, on_unknown)
|
||||
logger.debug(
|
||||
"Restored OVERDUE timer %s/%s: %.0fs remaining",
|
||||
uname, afam, remaining,
|
||||
)
|
||||
restored += 1
|
||||
|
||||
logger.info("Restored timers for %d connection(s)", restored)
|
||||
|
||||
|
||||
def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
"""Handle a parsed datagram message.
|
||||
|
||||
@@ -74,7 +264,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
"""
|
||||
if not msg:
|
||||
return
|
||||
now = __import__("time").time()
|
||||
now = ctx.get("recv_ts") or time.time()
|
||||
|
||||
# Log message to journal
|
||||
msg_journal = ctx.get("msg_journal")
|
||||
@@ -126,7 +316,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
if msg.get("ID") == "HTB":
|
||||
host.doesack = msg.get("acks", -1)
|
||||
# send ACK back
|
||||
rmsg = {"time": __import__("time").time()}
|
||||
rmsg = {"time": time.time()}
|
||||
opkt = dicttos("ACK", rmsg)
|
||||
try:
|
||||
transport.sendto(opkt, addr)
|
||||
@@ -138,8 +328,9 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
# Handle plugin data message
|
||||
plugin_name = msg.get("plugin")
|
||||
if plugin_name:
|
||||
# Extract all fields except ID and plugin name
|
||||
plugin_data = {k: v for k, v in msg.items() if k not in ["ID", "plugin"]}
|
||||
# Extract plugin fields, dropping protocol metadata fields
|
||||
plugin_data = {k: v for k, v in msg.items()
|
||||
if k not in ("ID", "plugin", "id", "name")}
|
||||
# Store plugin data with timestamp
|
||||
host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
|
||||
if DEBUG > 1:
|
||||
@@ -229,51 +420,8 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
# Reset overdue timer on every heartbeat
|
||||
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
|
||||
grace = cfg.get("grace", 2)
|
||||
timeout_seconds = (interval + grace) if interval > 0 else 30
|
||||
|
||||
# Create callback for timer expiration
|
||||
async def on_overdue(connection):
|
||||
"""Called when connection timer expires (no heartbeat received)."""
|
||||
import time
|
||||
now = time.time()
|
||||
|
||||
# Only mark as overdue if still in UP state (not already marked)
|
||||
if connection.getstate() == hbdcls.Connection.UP:
|
||||
connection.newstate(hbdcls.Connection.OVERDUE, now, cfg.get("grace", 2))
|
||||
|
||||
msg = f"{connection.afam} overdue"
|
||||
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
|
||||
|
||||
if uname in watchhosts:
|
||||
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
|
||||
|
||||
# Check RTT thresholds with infinite RTT for overdue hosts
|
||||
threshold_checker = ctx.get("threshold_checker")
|
||||
if threshold_checker:
|
||||
metric_path = "rtt"
|
||||
threshold_checker.check_value(
|
||||
host_name=uname,
|
||||
metric_path=metric_path,
|
||||
value=float('inf'),
|
||||
alert_states=host.alert_states
|
||||
)
|
||||
|
||||
# Notify websockets
|
||||
if msg_to_websockets:
|
||||
msg_to_websockets("host", host.stateinfo())
|
||||
|
||||
# Set a longer timer for marking as UNKNOWN (7 days)
|
||||
DROPOVERDUE = 7 * 24 * 3600
|
||||
|
||||
async def on_unknown(connection):
|
||||
"""Mark connection as unknown after extended absence."""
|
||||
connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat)
|
||||
if msg_to_websockets:
|
||||
msg_to_websockets("host", host.stateinfo())
|
||||
|
||||
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
|
||||
|
||||
# Reset the timer
|
||||
timeout_seconds = interval + grace
|
||||
on_overdue, _ = _make_timer_callbacks(uname, host, watchhosts, ctx)
|
||||
conn.reset_overdue_timer(timeout_seconds, on_overdue)
|
||||
|
||||
# Check RTT thresholds using the threshold checker
|
||||
|
||||
+28
-25
@@ -65,11 +65,7 @@ async def _handler(websocket, path=None):
|
||||
logger.exception("WebSocket handler exception from %s: %s", remote_address, e)
|
||||
finally:
|
||||
logger.debug("Removing WebSocket connection from %s", remote_address)
|
||||
try:
|
||||
_connections.remove(websocket)
|
||||
except KeyError:
|
||||
pass
|
||||
await websocket.wait_closed()
|
||||
_connections.discard(websocket)
|
||||
|
||||
|
||||
async def start(
|
||||
@@ -93,33 +89,40 @@ async def start(
|
||||
_verbose = config.get("verbose", False),
|
||||
_debug = config.get("debug", 0),
|
||||
|
||||
servers = []
|
||||
# plain WebSocket
|
||||
websockets_logger = logging.getLogger("websockets.server")
|
||||
#if _debug > 2:
|
||||
# websockets_logger.setLevel(logging.DEBUG)
|
||||
#else:
|
||||
# websockets_logger.setLevel(logging.INFO)
|
||||
# regular WebSocket
|
||||
ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"])
|
||||
servers.append(ws_server)
|
||||
# secure WebSocket (optional)
|
||||
# Start servers and keep the server objects for clean shutdown
|
||||
running_servers = []
|
||||
ws_server = await websockets.serve(_handler, host, ws_port)
|
||||
running_servers.append(ws_server)
|
||||
if wss_port and ssl_context:
|
||||
wss_server = websockets.serve(
|
||||
_handler, host, wss_port, ssl=ssl_context
|
||||
) # , subprotocols=["hbd"])
|
||||
servers.append(wss_server)
|
||||
|
||||
# await starting of all servers
|
||||
for srv in servers:
|
||||
await srv
|
||||
wss_server = await websockets.serve(_handler, host, wss_port, ssl=ssl_context)
|
||||
running_servers.append(wss_server)
|
||||
|
||||
logger.info(
|
||||
"WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port
|
||||
)
|
||||
|
||||
# block forever (until loop is stopped or cancelled)
|
||||
try:
|
||||
# Block until cancelled
|
||||
await asyncio.Future()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
# Close all active browser connections so their handler coroutines exit
|
||||
active = list(_connections)
|
||||
if active:
|
||||
logger.info("Closing %d active WebSocket connection(s)...", len(active))
|
||||
await asyncio.gather(
|
||||
*[ws.close() for ws in active],
|
||||
return_exceptions=True,
|
||||
)
|
||||
# Stop the listening servers and wait for all handlers to finish
|
||||
for srv in running_servers:
|
||||
srv.close()
|
||||
await asyncio.gather(
|
||||
*[srv.wait_closed() for srv in running_servers],
|
||||
return_exceptions=True,
|
||||
)
|
||||
logger.info("WebSocket server(s) stopped")
|
||||
|
||||
|
||||
def broadcast(typ: str, data) -> bool:
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "hbd"
|
||||
version = "5.0.8"
|
||||
version = "5.0.10"
|
||||
description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
Reference in New Issue
Block a user