Compare commits

..

7 Commits

Author SHA1 Message Date
Andreas Wrede 5edbaacf81 version 5.0.9
Release / release (push) Successful in 15s
2026-04-07 11:02:19 -04:00
Andreas Wrede 8421f472f2 there is only one __version__ 2026-04-07 11:00:22 -04:00
Andreas Wrede 51f9bdc2b5 use SO_TIMESTAMP, works on Linux, FreeBSD and macOS 2026-04-07 10:46:54 -04:00
andreas 02bc42fbf0 get rtt time differently 2026-04-07 10:40:12 -04:00
andreas 832a8b0bda save state to pickle file, restart timers on restart 2026-04-06 17:24:59 -04:00
Andreas Wrede 57c4b86430 version 5.0.8
Release / release (push) Successful in 6s
2026-04-04 15:18:12 -04:00
Andreas Wrede 43fad7beed fix release.yml for freebsd runner 2026-04-04 15:11:56 -04:00
12 changed files with 328 additions and 109 deletions
+10 -4
View File
@@ -11,15 +11,21 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4
# - name: Set up Python
# uses: actions/setup-python@v5
# with:
# python-version: '3.11'
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5 # Use a generic run step for FreeBSD if actions/setup-python
with: # fails in restricted environments.
python-version: '3.11' run: |
python3 --version
python3 -m ensurepip --upgrade
- name: Install build tools - name: Install build tools
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install build twine # pip install build twine
- name: Build package - name: Build package
run: python -m build run: python -m build
+3 -2
View File
@@ -7,6 +7,7 @@ logfile: "/home/andreas/logs/heartbeat/heartbeat.log"
logfmt: "msg" logfmt: "msg"
grace: 40 grace: 40
interval: 10 interval: 10
autosave_interval: 300 # Autosave interval in seconds (default: 5 minutes)
# Notification Channels - Define notification providers centrally # Notification Channels - Define notification providers centrally
# Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration # Each channel has a type (pushover, email, signal, mattermost) and type-specific configuration
@@ -159,7 +160,7 @@ threshold_configs:
warning: 85.0 warning: 85.0
critical: 90.0 critical: 90.0
rtt: rtt:
warning: 50 warning: 200
critical: 250.0 critical: 250.0
@@ -204,7 +205,7 @@ threshold_configs:
critical: 2 critical: 2
operator: ">=" operator: ">="
rtt: rtt:
warning: 50 warning: 200
critical: 250.0 critical: 250.0
truenas_server: truenas_server:
+1 -1
View File
@@ -14,4 +14,4 @@ Install options:
""" """
__all__ = ["__version__"] __all__ = ["__version__"]
__version__ = "5.0.7" __version__ = "5.0.9"
+1 -1
View File
@@ -1,3 +1,3 @@
"""HeartBeat Client (hbc) - System monitoring client.""" """HeartBeat Client (hbc) - System monitoring client."""
__version__ = "5.0.5" from hbd import __version__
+1 -1
View File
@@ -1,3 +1,3 @@
"""Common utilities shared between hbc and hbd.""" """Common utilities shared between hbc and hbd."""
__version__ = "5.0.5" from hbd import __version__
+1 -1
View File
@@ -1,3 +1,3 @@
"""HeartBeat Daemon (hbd) - Server/daemon component.""" """HeartBeat Daemon (hbd) - Server/daemon component."""
__version__ = "5.0.5" from hbd import __version__
+1 -1
View File
@@ -189,7 +189,7 @@ class Connection:
except Exception: except Exception:
pass pass
self.addr = addr self.addr = addr
Connection.htab[addr] = self.host.nameconnection_count Connection.htab[addr] = self.host.name
if self.host.isDynDns(): if self.host.isDynDns():
Host.dnsQ.put((self.host.name, self.addr)) Host.dnsQ.put((self.host.name, self.addr))
return r return r
+68 -17
View File
@@ -22,12 +22,12 @@ eventlog = notify_mod.eventlog
# shared runtime collections and helpers # shared runtime collections and helpers
def cleanup_function(config, hbdclass): def save_state(config, hbdclass):
"""This function will be executed upon program exit.""" """Save current state to pickle file. Safe to call at any time."""
logger.info("Running cleanup function...")
import pickle import pickle
import os
# Ensure all timer references are cleared before pickling # Clear timer references before pickling (they can't be serialized)
for hostname, host in list(hbdclass.Host.hosts.items()): for hostname, host in list(hbdclass.Host.hosts.items()):
for conn_type, conn in host.connections.items(): for conn_type, conn in host.connections.items():
if hasattr(conn, 'cancel_overdue_timer'): if hasattr(conn, 'cancel_overdue_timer'):
@@ -40,13 +40,26 @@ def cleanup_function(config, hbdclass):
conn.timeout_duration = None conn.timeout_duration = None
pickfile = config.get("pickfile", "hbd.pickle") pickfile = config.get("pickfile", "hbd.pickle")
tmpfile = pickfile + ".tmp"
pickf = open(pickfile, "wb") try:
pick = pickle.Pickler(pickf) with open(tmpfile, "wb") as pickf:
pick.dump(hbdclass.Host.hosts) pick = pickle.Pickler(pickf)
pick.dump(data.msgs) pick.dump(hbdclass.Host.hosts)
pickf.close() pick.dump(data.msgs)
os.replace(tmpfile, pickfile)
except Exception as e:
logger.error("Failed to save state: %s", e)
try:
os.unlink(tmpfile)
except Exception:
pass
def cleanup_function(config, hbdclass):
"""This function will be executed upon program exit."""
logger.info("Running cleanup function...")
save_state(config, hbdclass)
logger.info("Cleanup complete.") logger.info("Cleanup complete.")
@@ -160,14 +173,20 @@ async def _run_async(config, config_path=None):
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}" f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
) )
# 3. Bind to all interfaces (::) on a specific port
# UDP server endpoint (handler wired to handle_datagram with context)
bind_addr = ("::", config.get("hb_port", 50003)) bind_addr = ("::", config.get("hb_port", 50003))
sock.bind(bind_addr) sock.bind(bind_addr)
logger.info("Starting UDP server on %s:%s", *bind_addr) logger.info("Starting UDP server on %s:%s", *bind_addr)
def udp_handler(msg, addr, transport): # Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS).
# If supported, read datagrams via recvmsg() so RTT uses the kernel
# timestamp rather than the time.time() call after asyncio scheduling.
use_kernel_ts = udp.enable_kernel_timestamps(sock)
if use_kernel_ts:
logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT")
else:
logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT")
def udp_handler(msg, addr, transport, recv_ts=None):
ctx = dict( ctx = dict(
config=config, config=config,
hbdclass=hbdclass, hbdclass=hbdclass,
@@ -177,13 +196,32 @@ async def _run_async(config, config_path=None):
threshold_checker=threshold_checker, threshold_checker=threshold_checker,
DEBUG=config.get("debug", 0), DEBUG=config.get("debug", 0),
verbose=config.get("verbose", False), verbose=config.get("verbose", False),
recv_ts=recv_ts,
) )
udp.handle_datagram(msg, addr, transport, ctx) udp.handle_datagram(msg, addr, transport, ctx)
transport, protocol = await loop.create_datagram_endpoint( if use_kernel_ts:
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler), # recvmsg path: manage the socket ourselves with loop.add_reader()
sock=sock, sock.setblocking(False)
transport = udp.RecvmsgTransport(loop, sock)
reader = udp.make_recvmsg_reader(sock, udp_handler, transport)
loop.add_reader(sock.fileno(), reader)
protocol = None
else:
transport, protocol = await loop.create_datagram_endpoint(
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
sock=sock,
)
# Restore connection timers for hosts loaded from pickle
restore_ctx = dict(
config=config,
hbdclass=hbdclass,
log=eventlog,
msg_to_websockets=msg_to_websockets,
threshold_checker=threshold_checker,
) )
udp.restore_connection_timers(hbdclass, restore_ctx)
# HTTP server (asyncio-based via aiohttp) # HTTP server (asyncio-based via aiohttp)
try: try:
@@ -257,6 +295,19 @@ async def _run_async(config, config_path=None):
except Exception as e: except Exception as e:
logger.exception("websocket server failed to start: %s", e) logger.exception("websocket server failed to start: %s", e)
# Periodic autosave task
autosave_interval = config.get("autosave_interval", 300) # default: 5 minutes
async def autosave_task():
while True:
await asyncio.sleep(autosave_interval)
logger.debug("Autosaving state...")
save_state(config, hbdclass)
logger.debug("Autosave complete (%d hosts)", len(hbdclass.Host.hosts))
autosave = asyncio.create_task(autosave_task())
logger.info("Autosave task started (interval: %ds)", autosave_interval)
# Main event loop - monitor shutdown and reload events # Main event loop - monitor shutdown and reload events
try: try:
while True: while True:
@@ -304,7 +355,7 @@ async def _run_async(config, config_path=None):
except Exception as e: except Exception as e:
logger.warning("Error closing UDP transport: %s", e) logger.warning("Error closing UDP transport: %s", e)
tasks_to_cancel = [http_task, ws_task] tasks_to_cancel = [http_task, ws_task, autosave]
for task in tasks_to_cancel: for task in tasks_to_cancel:
if task: if task:
try: try:
+15 -5
View File
@@ -459,23 +459,27 @@
} }
} }
// Protocol metadata fields injected by the client never plugin metrics
const SKIP_FIELDS = new Set(['id', 'name']);
function renderPluginData(data, timestamp) { function renderPluginData(data, timestamp) {
// Check if this should be rendered as a simple table // Check if this should be rendered as a simple table
const pluginName = getCurrentPluginName(); const pluginName = getCurrentPluginName();
const simplePlugins = ['os_info', 'cpu_monitor', 'memory_monitor', 'nagios_runner']; const simplePlugins = ['os_info', 'cpu_monitor', 'memory_monitor', 'nagios_runner'];
if (simplePlugins.includes(pluginName) && isSimpleKeyValueData(data)) { if (simplePlugins.includes(pluginName) && isSimpleKeyValueData(data)) {
return renderSimpleDataTable(data, timestamp); return renderSimpleDataTable(data, timestamp);
} }
let html = '<div class="metric-grid">'; let html = '<div class="metric-grid">';
for (const [key, value] of Object.entries(data)) { for (const [key, value] of Object.entries(data)) {
if (SKIP_FIELDS.has(key)) continue;
// Skip nested objects for now, handle them separately // Skip nested objects for now, handle them separately
if (typeof value === 'object' && value !== null) { if (typeof value === 'object' && value !== null) {
continue; continue;
} }
html += renderMetric(key, value); html += renderMetric(key, value);
} }
@@ -572,10 +576,11 @@
// Table body // Table body
html += '<tbody>'; html += '<tbody>';
for (const [key, value] of Object.entries(data)) { for (const [key, value] of Object.entries(data)) {
if (SKIP_FIELDS.has(key)) continue;
const label = formatLabel(key); const label = formatLabel(key);
const formattedValue = formatValue(key, value); const formattedValue = formatValue(key, value);
const unit = getUnit(key); const unit = getUnit(key);
html += '<tr>'; html += '<tr>';
html += `<td class="name">${label}</td>`; html += `<td class="name">${label}</td>`;
html += `<td class="value">${formattedValue}${unit ? ' ' + unit : ''}</td>`; html += `<td class="value">${formattedValue}${unit ? ' ' + unit : ''}</td>`;
@@ -1012,12 +1017,17 @@
} }
function formatLabel(key) { function formatLabel(key) {
if (key === 'time') return 'Collected At';
return key return key
.replace(/_/g, ' ') .replace(/_/g, ' ')
.replace(/\b\w/g, l => l.toUpperCase()); .replace(/\b\w/g, l => l.toUpperCase());
} }
function formatValue(key, value) { function formatValue(key, value) {
// Epoch timestamp field sent by the client alongside plugin data
if (key === 'time' && typeof value === 'number') {
return new Date(value * 1000).toLocaleString();
}
if (typeof value === 'number') { if (typeof value === 'number') {
// Format percentages // Format percentages
if (key.includes('percent') || key.includes('usage')) { if (key.includes('percent') || key.includes('usage')) {
+197 -49
View File
@@ -1,6 +1,9 @@
"""UDP listener and datagram processing.""" """UDP listener and datagram processing."""
import asyncio import asyncio
import socket
import struct
import time
import zlib import zlib
import logging import logging
@@ -11,6 +14,99 @@ from . import notify as notify_mod
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog eventlog = notify_mod.eventlog
# SO_TIMESTAMP: kernel attaches a struct timeval to each received datagram.
# Supported on Linux, FreeBSD, and macOS. The constant is not exposed by
# Python's socket module on all platforms, so fall back to the Linux value (29)
# when absent.
_SO_TIMESTAMP = getattr(socket, 'SO_TIMESTAMP', 29)
# struct timeval uses two native C longs: tv_sec and tv_usec
_TIMEVAL = struct.Struct('@ll')
def enable_kernel_timestamps(sock) -> bool:
"""Try to enable SO_TIMESTAMP on *sock*.
Returns True if the kernel will supply receive timestamps, False otherwise
(unsupported platform, older kernel, or insufficient permissions).
"""
try:
sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMP, 1)
return True
except OSError:
return False
def _extract_kernel_ts(ancdata) -> float | None:
"""Parse recvmsg ancillary data and return the kernel receive time.
Returns seconds as a float, or None if no SO_TIMESTAMP cmsg is present.
"""
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMP:
if len(cmsg_data) >= _TIMEVAL.size:
sec, usec = _TIMEVAL.unpack_from(cmsg_data)
return sec + usec * 1e-6
return None
class RecvmsgTransport:
"""Thin wrapper used when SO_TIMESTAMP is active (add_reader path).
Exposes the same sendto() / close() interface as asyncio's DatagramTransport
so the rest of the code does not need to know which path is in use.
"""
def __init__(self, loop, sock):
self._loop = loop
self._sock = sock
def sendto(self, data, addr):
try:
self._sock.sendto(data, addr)
except Exception as e:
logger.debug("sendto failed: %s", e)
def close(self):
try:
self._loop.remove_reader(self._sock.fileno())
except Exception:
pass
try:
self._sock.close()
except Exception:
pass
def make_recvmsg_reader(sock, handler, transport):
"""Return a callback suitable for loop.add_reader().
Reads one datagram per call using recvmsg() so that kernel timestamps in
the ancillary data are accessible. Falls back to time.time() if the
cmsg is missing.
handler(msg, addr, transport, kernel_ts) same signature as udp_handler
in main.py with the optional kernel_ts argument.
"""
BUFSIZE = 65536
ANCBUFSIZE = 128 # enough for one struct timespec cmsg
def _read():
try:
data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE)
except BlockingIOError:
return
except OSError as e:
logger.warning("recvmsg error: %s", e)
return
try:
kernel_ts = _extract_kernel_ts(ancdata)
msg = parse_message(data)
if msg:
handler(msg, addr, transport, kernel_ts)
except Exception:
logger.exception("Error processing datagram from %s", addr)
return _read
class EchoServerProtocol(asyncio.DatagramProtocol): class EchoServerProtocol(asyncio.DatagramProtocol):
def __init__(self, config=None, handler=None): def __init__(self, config=None, handler=None):
@@ -61,6 +157,100 @@ def dicttos(ID, d):
return opk return opk
DROPOVERDUE = 7 * 24 * 3600 # seconds before an overdue host becomes UNKNOWN
def _make_timer_callbacks(uname, host, watchhosts, ctx):
"""Return (on_overdue, on_unknown) async callbacks for connection timer logic.
Captured values are bound at call time so callbacks are safe to use in loops.
"""
msg_to_websockets = ctx.get("msg_to_websockets")
threshold_checker = ctx.get("threshold_checker")
cfg = ctx.get("config", {})
async def on_unknown(connection):
connection.newstate(connection.__class__.UNKNOWN, connection.lastbeat)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
async def on_overdue(connection):
if connection.getstate() != connection.__class__.UP:
return
now = time.time()
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
msg = f"{connection.afam} overdue"
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
if threshold_checker:
threshold_checker.check_value(
host_name=uname,
metric_path="rtt",
value=float("inf"),
alert_states=host.alert_states,
)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
return on_overdue, on_unknown
def restore_connection_timers(hbdclass, ctx):
"""Restore overdue timers for all loaded connections after a pickle restore.
For UP connections, the remaining time until overdue is calculated from
lastbeat so that clients that vanished during hbd's downtime are detected.
For OVERDUE connections, the UNKNOWN drop timer is restored.
"""
now = time.time()
cfg = ctx.get("config", {})
grace = cfg.get("grace", 2)
from . import config as config_mod
watchhosts = config_mod.get_watchhosts(cfg)
restored = 0
for uname, host in list(hbdclass.Host.hosts.items()):
interval = host.interval
for afam, conn in list(host.connections.items()):
state = conn.getstate()
if state == hbdclass.Connection.DOWN:
continue
on_overdue, on_unknown = _make_timer_callbacks(uname, host, watchhosts, ctx)
if state == hbdclass.Connection.UP and interval > 0:
elapsed = now - conn.lastbeat
remaining = max(1.0, (interval + grace) - elapsed)
conn.reset_overdue_timer(remaining, on_overdue)
logger.debug(
"Restored UP timer %s/%s: %.0fs remaining (elapsed %.0fs)",
uname, afam, remaining, elapsed,
)
restored += 1
elif state == hbdclass.Connection.OVERDUE:
elapsed_overdue = now - conn.statetime
remaining = DROPOVERDUE - elapsed_overdue
if remaining <= 1:
# Already past the drop window — mark UNKNOWN immediately
conn.newstate(hbdclass.Connection.UNKNOWN, conn.lastbeat)
logger.info(
"Marking %s/%s UNKNOWN (overdue %.1f days)",
uname, afam, elapsed_overdue / 86400,
)
else:
conn.reset_overdue_timer(remaining, on_unknown)
logger.debug(
"Restored OVERDUE timer %s/%s: %.0fs remaining",
uname, afam, remaining,
)
restored += 1
logger.info("Restored timers for %d connection(s)", restored)
def handle_datagram(msg: dict, addr, transport, ctx: dict): def handle_datagram(msg: dict, addr, transport, ctx: dict):
"""Handle a parsed datagram message. """Handle a parsed datagram message.
@@ -74,7 +264,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
""" """
if not msg: if not msg:
return return
now = __import__("time").time() now = ctx.get("recv_ts") or time.time()
# Log message to journal # Log message to journal
msg_journal = ctx.get("msg_journal") msg_journal = ctx.get("msg_journal")
@@ -126,7 +316,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if msg.get("ID") == "HTB": if msg.get("ID") == "HTB":
host.doesack = msg.get("acks", -1) host.doesack = msg.get("acks", -1)
# send ACK back # send ACK back
rmsg = {"time": __import__("time").time()} rmsg = {"time": time.time()}
opkt = dicttos("ACK", rmsg) opkt = dicttos("ACK", rmsg)
try: try:
transport.sendto(opkt, addr) transport.sendto(opkt, addr)
@@ -138,8 +328,9 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
# Handle plugin data message # Handle plugin data message
plugin_name = msg.get("plugin") plugin_name = msg.get("plugin")
if plugin_name: if plugin_name:
# Extract all fields except ID and plugin name # Extract plugin fields, dropping protocol metadata fields
plugin_data = {k: v for k, v in msg.items() if k not in ["ID", "plugin"]} plugin_data = {k: v for k, v in msg.items()
if k not in ("ID", "plugin", "id", "name")}
# Store plugin data with timestamp # Store plugin data with timestamp
host.add_plugin_data(plugin_name, plugin_data, timestamp=now) host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
if DEBUG > 1: if DEBUG > 1:
@@ -229,51 +420,8 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
# Reset overdue timer on every heartbeat # Reset overdue timer on every heartbeat
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN: if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
grace = cfg.get("grace", 2) grace = cfg.get("grace", 2)
timeout_seconds = (interval + grace) if interval > 0 else 30 timeout_seconds = interval + grace
on_overdue, _ = _make_timer_callbacks(uname, host, watchhosts, ctx)
# Create callback for timer expiration
async def on_overdue(connection):
"""Called when connection timer expires (no heartbeat received)."""
import time
now = time.time()
# Only mark as overdue if still in UP state (not already marked)
if connection.getstate() == hbdcls.Connection.UP:
connection.newstate(hbdcls.Connection.OVERDUE, now, cfg.get("grace", 2))
msg = f"{connection.afam} overdue"
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
# Check RTT thresholds with infinite RTT for overdue hosts
threshold_checker = ctx.get("threshold_checker")
if threshold_checker:
metric_path = "rtt"
threshold_checker.check_value(
host_name=uname,
metric_path=metric_path,
value=float('inf'),
alert_states=host.alert_states
)
# Notify websockets
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
# Set a longer timer for marking as UNKNOWN (7 days)
DROPOVERDUE = 7 * 24 * 3600
async def on_unknown(connection):
"""Mark connection as unknown after extended absence."""
connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat)
if msg_to_websockets:
msg_to_websockets("host", host.stateinfo())
connection.reset_overdue_timer(DROPOVERDUE, on_unknown)
# Reset the timer
conn.reset_overdue_timer(timeout_seconds, on_overdue) conn.reset_overdue_timer(timeout_seconds, on_overdue)
# Check RTT thresholds using the threshold checker # Check RTT thresholds using the threshold checker
+29 -26
View File
@@ -65,11 +65,7 @@ async def _handler(websocket, path=None):
logger.exception("WebSocket handler exception from %s: %s", remote_address, e) logger.exception("WebSocket handler exception from %s: %s", remote_address, e)
finally: finally:
logger.debug("Removing WebSocket connection from %s", remote_address) logger.debug("Removing WebSocket connection from %s", remote_address)
try: _connections.discard(websocket)
_connections.remove(websocket)
except KeyError:
pass
await websocket.wait_closed()
async def start( async def start(
@@ -93,33 +89,40 @@ async def start(
_verbose = config.get("verbose", False), _verbose = config.get("verbose", False),
_debug = config.get("debug", 0), _debug = config.get("debug", 0),
servers = [] # Start servers and keep the server objects for clean shutdown
# plain WebSocket running_servers = []
websockets_logger = logging.getLogger("websockets.server") ws_server = await websockets.serve(_handler, host, ws_port)
#if _debug > 2: running_servers.append(ws_server)
# websockets_logger.setLevel(logging.DEBUG)
#else:
# websockets_logger.setLevel(logging.INFO)
# regular WebSocket
ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"])
servers.append(ws_server)
# secure WebSocket (optional)
if wss_port and ssl_context: if wss_port and ssl_context:
wss_server = websockets.serve( wss_server = await websockets.serve(_handler, host, wss_port, ssl=ssl_context)
_handler, host, wss_port, ssl=ssl_context running_servers.append(wss_server)
) # , subprotocols=["hbd"])
servers.append(wss_server)
# await starting of all servers
for srv in servers:
await srv
logger.info( logger.info(
"WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port "WebSocket server(s) started on port %s (wss %s)", ws_port, wss_port
) )
# block forever (until loop is stopped or cancelled) try:
await asyncio.Future() # Block until cancelled
await asyncio.Future()
except asyncio.CancelledError:
pass
finally:
# Close all active browser connections so their handler coroutines exit
active = list(_connections)
if active:
logger.info("Closing %d active WebSocket connection(s)...", len(active))
await asyncio.gather(
*[ws.close() for ws in active],
return_exceptions=True,
)
# Stop the listening servers and wait for all handlers to finish
for srv in running_servers:
srv.close()
await asyncio.gather(
*[srv.wait_closed() for srv in running_servers],
return_exceptions=True,
)
logger.info("WebSocket server(s) stopped")
def broadcast(typ: str, data) -> bool: def broadcast(typ: str, data) -> bool:
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "hbd" name = "hbd"
version = "5.0.7" version = "5.0.9"
description = "Heartbeat monitoring system — client (hbc) and server (hbd)" description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"