feat: retry AsyncConnection.open() indefinitely; drop IPv6 only on early startup failure
IPv4 connections are retried forever in heartbeat_sender if open() fails, so a temporary network outage does not terminate the sender. IPv6 connections that have never opened successfully are dropped after IPV6_EARLY_FAIL_LIMIT (3) consecutive failures so that a network without IPv6 support does not keep a dead sender running. At startup all resolved connections are added to the list regardless of whether the initial open() succeeds; the heartbeat_sender loop handles the first real connection attempt. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+58
-21
@@ -56,23 +56,26 @@ class AsyncConnection:
|
|||||||
self.transport: Optional[asyncio.DatagramTransport] = None
|
self.transport: Optional[asyncio.DatagramTransport] = None
|
||||||
self.protocol: Optional[asyncio.DatagramProtocol] = None
|
self.protocol: Optional[asyncio.DatagramProtocol] = None
|
||||||
self._dead = False
|
self._dead = False
|
||||||
|
self._ever_opened = False
|
||||||
|
self._open_fail_count = 0 # consecutive failures before first success
|
||||||
|
|
||||||
self.logger = logging.getLogger(f"hbc.conn.{addr}")
|
self.logger = logging.getLogger(f"hbc.conn.{addr}")
|
||||||
|
|
||||||
async def open(self) -> bool:
|
async def open(self) -> bool:
|
||||||
"""Open the UDP connection.
|
"""Open the UDP connection.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if successful, False otherwise
|
True if successful, False otherwise
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
# Create datagram endpoint
|
# Create datagram endpoint
|
||||||
self.transport, self.protocol = await loop.create_datagram_endpoint(
|
self.transport, self.protocol = await loop.create_datagram_endpoint(
|
||||||
lambda: HeartbeatProtocol(self),
|
lambda: HeartbeatProtocol(self),
|
||||||
family=self.af
|
family=self.af
|
||||||
)
|
)
|
||||||
|
self._ever_opened = True
|
||||||
self.logger.debug(f"Opened connection to {self.addr}:{self.port}")
|
self.logger.debug(f"Opened connection to {self.addr}:{self.port}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -262,15 +265,51 @@ async def handle_update(conn: AsyncConnection, _msg: dict): # pyright: ignore[r
|
|||||||
|
|
||||||
|
|
||||||
async def heartbeat_sender(conn: AsyncConnection, interval: int):
|
async def heartbeat_sender(conn: AsyncConnection, interval: int):
|
||||||
"""Send periodic heartbeats.
|
"""Send periodic heartbeats, retrying the connection if it is not open.
|
||||||
|
|
||||||
|
IPv6 connections that fail to open before their first successful send are
|
||||||
|
dropped after IPV6_EARLY_FAIL_LIMIT attempts so that a network without IPv6
|
||||||
|
does not keep a dead sender alive. IPv4 connections are retried indefinitely.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
conn: Connection to send on
|
conn: Connection to send on
|
||||||
interval: Heartbeat interval in seconds
|
interval: Heartbeat interval in seconds
|
||||||
"""
|
"""
|
||||||
logger = logging.getLogger("hbc.heartbeat")
|
logger = logging.getLogger("hbc.heartbeat")
|
||||||
|
IPV6_EARLY_FAIL_LIMIT = 3
|
||||||
while running:
|
|
||||||
|
while running and not conn._dead:
|
||||||
|
# Ensure transport is open before attempting to send.
|
||||||
|
if not conn.transport:
|
||||||
|
opened = await conn.open()
|
||||||
|
if opened:
|
||||||
|
conn._open_fail_count = 0
|
||||||
|
else:
|
||||||
|
conn._open_fail_count += 1
|
||||||
|
# Drop an IPv6 connection that has never come up within the
|
||||||
|
# first few attempts — it is likely unavailable on this network.
|
||||||
|
if (not conn._ever_opened
|
||||||
|
and conn.af == socket.AF_INET6
|
||||||
|
and conn._open_fail_count >= IPV6_EARLY_FAIL_LIMIT):
|
||||||
|
logger.warning(
|
||||||
|
f"IPv6 connection to {conn.addr} unreachable after "
|
||||||
|
f"{conn._open_fail_count} attempts, disabling"
|
||||||
|
)
|
||||||
|
conn._dead = True
|
||||||
|
break
|
||||||
|
# Retry after the normal interval; IPv4 retries forever.
|
||||||
|
try:
|
||||||
|
if shutdown_event:
|
||||||
|
await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(interval)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
msg = {
|
msg = {
|
||||||
"acks": conn.ackcount,
|
"acks": conn.ackcount,
|
||||||
@@ -278,20 +317,17 @@ async def heartbeat_sender(conn: AsyncConnection, interval: int):
|
|||||||
"interval": interval
|
"interval": interval
|
||||||
}
|
}
|
||||||
await conn.sendto(msg, "HTB")
|
await conn.sendto(msg, "HTB")
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error sending heartbeat: {e}", exc_info=True)
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.debug("Heartbeat sender cancelled")
|
logger.debug("Heartbeat sender cancelled")
|
||||||
raise
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error sending heartbeat: {e}", exc_info=True)
|
||||||
|
|
||||||
# Wait for next interval or shutdown event
|
# Wait for next interval or shutdown event
|
||||||
try:
|
try:
|
||||||
if shutdown_event:
|
if shutdown_event:
|
||||||
await asyncio.wait_for(
|
await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
|
||||||
shutdown_event.wait(),
|
|
||||||
timeout=interval
|
|
||||||
)
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
await asyncio.sleep(interval)
|
await asyncio.sleep(interval)
|
||||||
@@ -479,14 +515,15 @@ async def async_main(args, config):
|
|||||||
for addr_info in addrs:
|
for addr_info in addrs:
|
||||||
af = addr_info[0]
|
af = addr_info[0]
|
||||||
addr = addr_info[4][0]
|
addr = addr_info[4][0]
|
||||||
|
|
||||||
conn = AsyncConnection(conn_id, addr, hb_port, af, iam)
|
conn = AsyncConnection(conn_id, addr, hb_port, af, iam)
|
||||||
if await conn.open():
|
if not await conn.open():
|
||||||
connections.append(conn)
|
logger.warning(f"Initial open to {addr} failed, heartbeat sender will retry")
|
||||||
conn_id += 1
|
connections.append(conn)
|
||||||
|
conn_id += 1
|
||||||
|
|
||||||
if not connections:
|
if not connections:
|
||||||
logger.error("No connections established")
|
logger.error("No connections established (DNS resolution failed for all hosts)")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
logger.info(f"Created {len(connections)} connections")
|
logger.info(f"Created {len(connections)} connections")
|
||||||
|
|||||||
Reference in New Issue
Block a user