get rtt time differently
This commit is contained in:
+23
-8
@@ -173,14 +173,20 @@ async def _run_async(config, config_path=None):
|
||||
f"Warning: Could not reset IPV6_V6ONLY not supported or dual-stack is unavailable. Error: {e}"
|
||||
)
|
||||
|
||||
# 3. Bind to all interfaces (::) on a specific port
|
||||
|
||||
# UDP server endpoint (handler wired to handle_datagram with context)
|
||||
bind_addr = ("::", config.get("hb_port", 50003))
|
||||
sock.bind(bind_addr)
|
||||
logger.info("Starting UDP server on %s:%s", *bind_addr)
|
||||
|
||||
def udp_handler(msg, addr, transport):
|
||||
# Try to enable kernel receive timestamps (Linux SO_TIMESTAMPNS).
|
||||
# If supported, read datagrams via recvmsg() so RTT uses the kernel
|
||||
# timestamp rather than the time.time() call after asyncio scheduling.
|
||||
use_kernel_ts = udp.enable_kernel_timestamps(sock)
|
||||
if use_kernel_ts:
|
||||
logger.info("SO_TIMESTAMPNS enabled: using kernel receive timestamps for RTT")
|
||||
else:
|
||||
logger.info("SO_TIMESTAMPNS not available: using time.time() for RTT")
|
||||
|
||||
def udp_handler(msg, addr, transport, recv_ts=None):
|
||||
ctx = dict(
|
||||
config=config,
|
||||
hbdclass=hbdclass,
|
||||
@@ -190,13 +196,22 @@ async def _run_async(config, config_path=None):
|
||||
threshold_checker=threshold_checker,
|
||||
DEBUG=config.get("debug", 0),
|
||||
verbose=config.get("verbose", False),
|
||||
recv_ts=recv_ts,
|
||||
)
|
||||
udp.handle_datagram(msg, addr, transport, ctx)
|
||||
|
||||
transport, protocol = await loop.create_datagram_endpoint(
|
||||
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
|
||||
sock=sock,
|
||||
)
|
||||
if use_kernel_ts:
|
||||
# recvmsg path: manage the socket ourselves with loop.add_reader()
|
||||
sock.setblocking(False)
|
||||
transport = udp.RecvmsgTransport(loop, sock)
|
||||
reader = udp.make_recvmsg_reader(sock, udp_handler, transport)
|
||||
loop.add_reader(sock.fileno(), reader)
|
||||
protocol = None
|
||||
else:
|
||||
transport, protocol = await loop.create_datagram_endpoint(
|
||||
lambda: udp.EchoServerProtocol(config=config, handler=udp_handler),
|
||||
sock=sock,
|
||||
)
|
||||
|
||||
# Restore connection timers for hosts loaded from pickle
|
||||
restore_ctx = dict(
|
||||
|
||||
+97
-4
@@ -1,6 +1,9 @@
|
||||
"""UDP listener and datagram processing."""
|
||||
|
||||
import asyncio
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
import zlib
|
||||
import logging
|
||||
|
||||
@@ -11,6 +14,98 @@ from . import notify as notify_mod
|
||||
logger = logging.getLogger(__name__)
|
||||
eventlog = notify_mod.eventlog
|
||||
|
||||
# SO_TIMESTAMPNS: kernel attaches a struct timespec to each received datagram.
|
||||
# The constant is not exposed by Python's socket module on all platforms,
|
||||
# so fall back to the Linux value (35) when absent.
|
||||
_SO_TIMESTAMPNS = getattr(socket, 'SO_TIMESTAMPNS', 35)
|
||||
# struct timespec uses two native C longs: tv_sec and tv_nsec
|
||||
_TIMESPEC = struct.Struct('@ll')
|
||||
|
||||
|
||||
def enable_kernel_timestamps(sock) -> bool:
|
||||
"""Try to enable SO_TIMESTAMPNS on *sock*.
|
||||
|
||||
Returns True if the kernel will supply receive timestamps, False otherwise
|
||||
(non-Linux, older kernel, or insufficient permissions).
|
||||
"""
|
||||
try:
|
||||
sock.setsockopt(socket.SOL_SOCKET, _SO_TIMESTAMPNS, 1)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def _extract_kernel_ts(ancdata) -> float | None:
|
||||
"""Parse recvmsg ancillary data and return the kernel receive time.
|
||||
|
||||
Returns seconds as a float, or None if no SO_TIMESTAMPNS cmsg is present.
|
||||
"""
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMPNS:
|
||||
if len(cmsg_data) >= _TIMESPEC.size:
|
||||
sec, nsec = _TIMESPEC.unpack_from(cmsg_data)
|
||||
return sec + nsec * 1e-9
|
||||
return None
|
||||
|
||||
|
||||
class RecvmsgTransport:
|
||||
"""Thin wrapper used when SO_TIMESTAMPNS is active (add_reader path).
|
||||
|
||||
Exposes the same sendto() / close() interface as asyncio's DatagramTransport
|
||||
so the rest of the code does not need to know which path is in use.
|
||||
"""
|
||||
def __init__(self, loop, sock):
|
||||
self._loop = loop
|
||||
self._sock = sock
|
||||
|
||||
def sendto(self, data, addr):
|
||||
try:
|
||||
self._sock.sendto(data, addr)
|
||||
except Exception as e:
|
||||
logger.debug("sendto failed: %s", e)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self._loop.remove_reader(self._sock.fileno())
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def make_recvmsg_reader(sock, handler, transport):
|
||||
"""Return a callback suitable for loop.add_reader().
|
||||
|
||||
Reads one datagram per call using recvmsg() so that kernel timestamps in
|
||||
the ancillary data are accessible. Falls back to time.time() gracefully
|
||||
if the cmsg is missing.
|
||||
|
||||
handler(msg, addr, transport, kernel_ts) – same signature as udp_handler
|
||||
in main.py with the optional kernel_ts argument.
|
||||
"""
|
||||
BUFSIZE = 65536
|
||||
ANCBUFSIZE = 128 # enough for one struct timespec cmsg
|
||||
|
||||
def _read():
|
||||
try:
|
||||
data, ancdata, _, addr = sock.recvmsg(BUFSIZE, ANCBUFSIZE)
|
||||
except BlockingIOError:
|
||||
return
|
||||
except OSError as e:
|
||||
logger.warning("recvmsg error: %s", e)
|
||||
return
|
||||
try:
|
||||
kernel_ts = _extract_kernel_ts(ancdata)
|
||||
msg = parse_message(data)
|
||||
if msg:
|
||||
handler(msg, addr, transport, kernel_ts)
|
||||
except Exception:
|
||||
logger.exception("Error processing datagram from %s", addr)
|
||||
|
||||
return _read
|
||||
|
||||
|
||||
class EchoServerProtocol(asyncio.DatagramProtocol):
|
||||
def __init__(self, config=None, handler=None):
|
||||
@@ -79,7 +174,6 @@ def _make_timer_callbacks(uname, host, watchhosts, ctx):
|
||||
msg_to_websockets("host", host.stateinfo())
|
||||
|
||||
async def on_overdue(connection):
|
||||
import time
|
||||
if connection.getstate() != connection.__class__.UP:
|
||||
return
|
||||
now = time.time()
|
||||
@@ -109,7 +203,6 @@ def restore_connection_timers(hbdclass, ctx):
|
||||
lastbeat so that clients that vanished during hbd's downtime are detected.
|
||||
For OVERDUE connections, the UNKNOWN drop timer is restored.
|
||||
"""
|
||||
import time
|
||||
now = time.time()
|
||||
cfg = ctx.get("config", {})
|
||||
grace = cfg.get("grace", 2)
|
||||
@@ -170,7 +263,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
"""
|
||||
if not msg:
|
||||
return
|
||||
now = __import__("time").time()
|
||||
now = ctx.get("recv_ts") or time.time()
|
||||
|
||||
# Log message to journal
|
||||
msg_journal = ctx.get("msg_journal")
|
||||
@@ -222,7 +315,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
if msg.get("ID") == "HTB":
|
||||
host.doesack = msg.get("acks", -1)
|
||||
# send ACK back
|
||||
rmsg = {"time": __import__("time").time()}
|
||||
rmsg = {"time": time.time()}
|
||||
opkt = dicttos("ACK", rmsg)
|
||||
try:
|
||||
transport.sendto(opkt, addr)
|
||||
|
||||
Reference in New Issue
Block a user