From 02bc42fbf0ed8399bfaea47bc65706aca20246d6 Mon Sep 17 00:00:00 2001 From: Andreas Wrede Date: Tue, 7 Apr 2026 10:40:12 -0400 Subject: [PATCH] get rtt time differently --- hbd/server/main.py | 31 ++++++++++---- hbd/server/udp.py | 101 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 12 deletions(-) diff --git a/hbd/server/main.py b/hbd/server/main.py index 9527b10..e18bb48 100644 --- a/hbd/server/main.py +++ b/hbd/server/main.py @@ -173,14 +173,20 @@ async def _run_async(config, config_path=None): f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}" ) - # 3. Bind to all interfaces (::) on a specific port - - # UDP server endpoint (handler wired to handle_datagram with context) bind_addr = ("::", config.get("hb_port", 50003)) sock.bind(bind_addr) logger.info("Starting UDP server on %s:%s", *bind_addr) - def udp_handler(msg, addr, transport): + # Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS). + # If supported, read datagrams via recvmsg() so RTT uses the kernel + # timestamp rather than the time.time() call after asyncio scheduling. 
+ use_kernel_ts = udp.enable_kernel_timestamps(sock) + if use_kernel_ts: + logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT") + else: + logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT") + + def udp_handler(msg, addr, transport, recv_ts=None): ctx = dict( config=config, hbdclass=hbdclass, @@ -190,13 +196,22 @@ async def _run_async(config, config_path=None): threshold_checker=threshold_checker, DEBUG=config.get("debug", 0), verbose=config.get("verbose", False), + recv_ts=recv_ts, ) udp.handle_datagram(msg, addr, transport, ctx) - transport, protocol = await loop.create_datagram_endpoint( - lambda: udp.EchoServerProtocol(config=config, handler=udp_handler), - sock=sock, - ) + if use_kernel_ts: + # recvmsg path: manage the socket ourselves with loop.add_reader(); EchoServerProtocol is bypassed here, so protocol is None + sock.setblocking(False) + transport = udp.RecvmsgTransport(loop, sock) + reader = udp.make_recvmsg_reader(sock, udp_handler, transport) + loop.add_reader(sock.fileno(), reader) + protocol = None + else: + transport, protocol = await loop.create_datagram_endpoint( + lambda: udp.EchoServerProtocol(config=config, handler=udp_handler), + sock=sock, + ) # Restore connection timers for hosts loaded from pickle restore_ctx = dict( diff --git a/hbd/server/udp.py b/hbd/server/udp.py index 3511898..20cebef 100644 --- a/hbd/server/udp.py +++ b/hbd/server/udp.py @@ -1,6 +1,9 @@ """UDP listener and datagram processing.""" import asyncio +import socket +import struct +import time import zlib import logging @@ -11,6 +14,98 @@ from . import notify as notify_mod logger = logging.getLogger(__name__) eventlog = notify_mod.eventlog +# SO_TIMESTAMPNS: kernel attaches a struct timespec to each received datagram. +# The constant is not exposed by Python's socket module on all platforms, +# so fall back to the Linux value (35) when absent. NOTE(review): the fallback is applied on every platform, not only Linux; confirm that is intended. 
+_SO_TIMESTAMPNS = getattr(socket, 'SO_TIMESTAMPNS', 35) +# Parsed as two native C longs (tv_sec, tv_nsec); assumes the kernel's timespec has that layout (TODO confirm on 32-bit targets) +_TIMESPEC = struct.Struct('@ll') + + +def enable_kernel_timestamps(sock) -> bool: + """Try to enable SO_TIMESTAMPNS on *sock*. + + Returns True if setsockopt() accepted the option, False if it raised OSError + (presumably non-Linux, older kernel, or insufficient permissions). + """ + try: + sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMPNS, 1) + return True + except OSError: + return False + + +def _extract_kernel_ts(ancdata) -> float | None: + """Parse recvmsg ancillary data and return the kernel receive time. + + Returns seconds as a float, or None if no SO_TIMESTAMPNS cmsg of at least timespec size is present. + """ + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMPNS: + if len(cmsg_data) >= _TIMESPEC.size: + sec, nsec = _TIMESPEC.unpack_from(cmsg_data) + return sec + nsec * 1e-9 + return None + + +class RecvmsgTransport: + """Thin wrapper used when SO_TIMESTAMPNS is active (add_reader path). + + Provides only sendto() and close(), named like asyncio's DatagramTransport methods, + so callers that use just those two do not need to know which path is in use; both methods catch their own errors. + """ + def __init__(self, loop, sock): + self._loop = loop + self._sock = sock + + def sendto(self, data, addr): + try: + self._sock.sendto(data, addr) + except Exception as e: + logger.debug("sendto failed: %s", e) + + def close(self): + try: + self._loop.remove_reader(self._sock.fileno()) + except Exception: + pass + try: + self._sock.close() + except Exception: + pass + + +def make_recvmsg_reader(sock, handler, transport): + """Return a callback suitable for loop.add_reader(). + + Reads one datagram per call using recvmsg() so that kernel timestamps in + the ancillary data are accessible. If the cmsg is missing, None is passed on + and handle_datagram() falls back to time.time(). 
+ + handler(msg, addr, transport, kernel_ts) – same signature as udp_handler + in main.py, which receives the timestamp as its optional recv_ts argument. + """ + BUFSIZE = 65536 + ANCBUFSIZE = 128 # enough for one struct timespec cmsg + + def _read(): + try: + data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE) + except BlockingIOError: + return + except OSError as e: + logger.warning("recvmsg error: %s", e) + return + try: + kernel_ts = _extract_kernel_ts(ancdata) + msg = parse_message(data) + if msg: + handler(msg, addr, transport, kernel_ts) + except Exception: + logger.exception("Error processing datagram from %s", addr) + + return _read + + class EchoServerProtocol(asyncio.DatagramProtocol): def __init__(self, config=None, handler=None): @@ -79,7 +174,6 @@ def _make_timer_callbacks(uname, host, watchhosts, ctx): msg_to_websockets("host", host.stateinfo()) async def on_overdue(connection): - import time if connection.getstate() != connection.__class__.UP: return now = time.time() @@ -109,7 +203,6 @@ def restore_connection_timers(hbdclass, ctx): lastbeat so that clients that vanished during hbd's downtime are detected. For OVERDUE connections, the UNKNOWN drop timer is restored. """ - import time now = time.time() cfg = ctx.get("config", {}) grace = cfg.get("grace", 2) @@ -170,7 +263,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): """ if not msg: return - now = __import__("time").time() + now = ctx.get("recv_ts") or time.time() # Log message to journal msg_journal = ctx.get("msg_journal") @@ -222,7 +315,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if msg.get("ID") == "HTB": host.doesack = msg.get("acks", -1) # send ACK back - rmsg = {"time": __import__("time").time()} + rmsg = {"time": time.time()} opkt = dicttos("ACK", rmsg) try: transport.sendto(opkt, addr)