diff --git a/hbd/client/main.py b/hbd/client/main.py index ce05c62..2418910 100644 --- a/hbd/client/main.py +++ b/hbd/client/main.py @@ -56,23 +56,26 @@ class AsyncConnection: self.transport: Optional[asyncio.DatagramTransport] = None self.protocol: Optional[asyncio.DatagramProtocol] = None self._dead = False + self._ever_opened = False + self._open_fail_count = 0 # consecutive failures before first success self.logger = logging.getLogger(f"hbc.conn.{addr}") - + async def open(self) -> bool: """Open the UDP connection. - + Returns: True if successful, False otherwise """ try: loop = asyncio.get_event_loop() - + # Create datagram endpoint self.transport, self.protocol = await loop.create_datagram_endpoint( lambda: HeartbeatProtocol(self), family=self.af ) + self._ever_opened = True self.logger.debug(f"Opened connection to {self.addr}:{self.port}") return True except Exception as e: @@ -262,15 +265,51 @@ async def handle_update(conn: AsyncConnection, _msg: dict): # pyright: ignore[r async def heartbeat_sender(conn: AsyncConnection, interval: int): - """Send periodic heartbeats. - + """Send periodic heartbeats, retrying the connection if it is not open. + + IPv6 connections that fail to open before their first successful send are + dropped after IPV6_EARLY_FAIL_LIMIT attempts so that a network without IPv6 + does not keep a dead sender alive. IPv4 connections are retried indefinitely. + Args: conn: Connection to send on interval: Heartbeat interval in seconds """ logger = logging.getLogger("hbc.heartbeat") - - while running: + IPV6_EARLY_FAIL_LIMIT = 3 + + while running and not conn._dead: + # Ensure transport is open before attempting to send. + if not conn.transport: + opened = await conn.open() + if opened: + conn._open_fail_count = 0 + else: + conn._open_fail_count += 1 + # Drop an IPv6 connection that has never come up within the + # first few attempts — it is likely unavailable on this network. + if (not conn._ever_opened + and conn.af == socket.AF_INET6 + and conn._open_fail_count >= IPV6_EARLY_FAIL_LIMIT): + logger.warning( + f"IPv6 connection to {conn.addr} unreachable after " + f"{conn._open_fail_count} attempts, disabling" + ) + conn._dead = True + break + # Retry after the normal interval; IPv4 retries forever. + try: + if shutdown_event: + await asyncio.wait_for(shutdown_event.wait(), timeout=interval) + break + else: + await asyncio.sleep(interval) + except asyncio.TimeoutError: + pass + except asyncio.CancelledError: + raise + continue + try: msg = { "acks": conn.ackcount, @@ -278,20 +317,17 @@ async def heartbeat_sender(conn: AsyncConnection, interval: int): "interval": interval } await conn.sendto(msg, "HTB") - - except Exception as e: - logger.error(f"Error sending heartbeat: {e}", exc_info=True) + except asyncio.CancelledError: logger.debug("Heartbeat sender cancelled") raise - + except Exception as e: + logger.error(f"Error sending heartbeat: {e}", exc_info=True) + # Wait for next interval or shutdown event try: if shutdown_event: - await asyncio.wait_for( - shutdown_event.wait(), - timeout=interval - ) + await asyncio.wait_for(shutdown_event.wait(), timeout=interval) break else: await asyncio.sleep(interval) @@ -479,14 +515,15 @@ async def async_main(args, config): for addr_info in addrs: af = addr_info[0] addr = addr_info[4][0] - + conn = AsyncConnection(conn_id, addr, hb_port, af, iam) - if await conn.open(): - connections.append(conn) - conn_id += 1 - + if not await conn.open(): + logger.warning(f"Initial open to {addr} failed, heartbeat sender will retry") + connections.append(conn) + conn_id += 1 + if not connections: - logger.error("No connections established") + logger.error("No connections established (DNS resolution failed for all hosts)") return 1 logger.info(f"Created {len(connections)} connections")