"""UDP listener and datagram processing."""
import asyncio
import logging
import time
import zlib

from ..common.proto import stodict, oldmtodict
from ..common.utils import dur, shortname
from . import notify as notify_mod

logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog

# Delay, in seconds, before an overdue connection is marked UNKNOWN (7 days).
_DROP_OVERDUE_SECONDS = 7 * 24 * 3600


class EchoServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that parses each packet and hands it to a handler.

    ``handler`` is an optional callable invoked as
    ``handler(msg, addr, transport)`` for every received datagram.
    """

    def __init__(self, config=None, handler=None):
        super().__init__()
        self.config = config or {}
        self.handler = handler
        # Set by connection_made(); None until the endpoint is created.
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport so handlers can send replies."""
        self.transport = transport
        logger.info("UDP Server listening...")

    def datagram_received(self, data, addr):
        """Parse *data* and dispatch it; never let an error escape."""
        logger.debug("Received from %s", addr)
        try:
            msg = parse_message(data)
            if self.handler:
                # handler can be a callable provided by the application
                # pass the transport so handlers can send replies (ACKs/commands)
                self.handler(msg, addr, self.transport)
        except Exception:
            # Boundary of the protocol callback: log and keep serving.
            logger.exception("Error while processing datagram from %s", addr)


def parse_message(data: bytes):
    """Parse a raw datagram into a message dict.

    Uses the protocol decoding helpers and falls back to old format when
    decoding returns an empty dict (compat with older clients).
    """
    msg = stodict(data)
    if not msg:
        # fallback to old format
        msg = oldmtodict(data)
    return msg


def dicttos(ID, d) -> bytes:
    """Serialize mapping *d* into a packet tagged with *ID*.

    Fields are rendered as ``key=value`` joined by ``;`` (floats with five
    decimals), zlib-compressed at level 6 and prefixed with ``!<ID>:``.
    """
    fields = [
        "%s=%0.5f" % (key, value) if isinstance(value, float)
        else "%s=%s" % (key, value)
        for key, value in d.items()
    ]
    payload = zlib.compress(";".join(fields).encode(), 6)
    header = "!" + ID + ":"
    return header.encode() + payload


def _handle_plugin_message(msg, host, uname, now, ctx, msg_to_websockets, debug):
    """Store plugin data from a "PLG" message, check thresholds, notify websockets."""
    plugin_name = msg.get("plugin")
    if not plugin_name:
        return

    # Extract all fields except ID and plugin name
    plugin_data = {k: v for k, v in msg.items() if k not in ("ID", "plugin")}

    # Store plugin data with timestamp
    host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
    if debug > 1:
        print(f"Stored plugin data for {uname}: {plugin_name}")

    # Check thresholds if checker is available
    threshold_checker = ctx.get("threshold_checker")
    if threshold_checker:
        try:
            state_changes = threshold_checker.check_plugin_data(
                host_name=uname,
                plugin_name=plugin_name,
                data=plugin_data,
                alert_states=host.alert_states,
            )
            if debug > 1 and state_changes:
                print(f"Threshold state changes for {uname}: {state_changes}")
        except Exception as e:
            logger.error(
                "Error checking thresholds for %s.%s: %s", uname, plugin_name, e
            )

    # Notify websockets of plugin update
    if msg_to_websockets:
        try:
            msg_to_websockets("plugin", {
                "host": uname,
                "plugin": plugin_name,
                "data": plugin_data,
                "timestamp": now,
            })
        except Exception as e:
            # Best-effort notification: keep processing, but leave a trace.
            logger.debug("cannot send plugin websocket message: %s", e)


def _arm_overdue_timer(conn, host, uname, interval, cfg, hbdcls, pushmsg,
                       msg_to_websockets):
    """(Re)start the overdue timer of *conn* for ``interval + grace`` seconds."""
    grace = cfg.get("grace", 2)
    timeout_seconds = interval + grace

    async def on_unknown(connection):
        """Mark connection as unknown after extended absence."""
        connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat)
        if msg_to_websockets:
            msg_to_websockets("host", host.stateinfo())

    async def on_overdue(connection):
        """Called when connection timer expires (no heartbeat received)."""
        expired_at = time.time()
        # Only mark as overdue if still in UP state (not already marked)
        if connection.getstate() != hbdcls.Connection.UP:
            return
        connection.newstate(
            hbdcls.Connection.OVERDUE, expired_at, cfg.get("grace", 2)
        )
        text = f"{connection.afam} overdue"
        # Watch list is read at expiry time, not when the timer was armed.
        watched = uname in cfg.get("watchhosts", [])
        eventlog(uname, "CRITICAL" if watched else "WARNING", text)
        if watched and pushmsg:
            pushmsg(f"{uname} {text}")
        # Notify websockets
        if msg_to_websockets:
            msg_to_websockets("host", host.stateinfo())
        # Set a longer timer for marking as UNKNOWN (7 days)
        # NOTE(review): nesting reconstructed from whitespace-stripped source;
        # confirm the UNKNOWN timer is only armed after an UP -> OVERDUE change.
        connection.reset_overdue_timer(_DROP_OVERDUE_SECONDS, on_unknown)

    # Reset the timer
    conn.reset_overdue_timer(timeout_seconds, on_overdue)


def _send_queued_commands(host, uname, addr, transport, log, debug):
    """Drain ``host.cmds``, sending each queued command to *addr*."""
    while host.cmds:
        op, rmsg = host.cmds[0]
        # Always dequeue first: an entry with an unrecognised op would
        # otherwise never leave the queue and this loop would never end.
        del host.cmds[0]

        if op == "CMD":
            if log:
                log(uname, "command sent")
            if host.cver < 1:
                # Version-0 clients get the bare command, not a packet dict.
                rmsg = rmsg["cmd"]
        elif op == "UPD":
            if log:
                log(uname, "update initiated")
            if host.cver < 1:
                if log:
                    log(uname, " ver 0 does not support UPD")
                continue

        if host.cver < 1:
            opkt = rmsg if isinstance(rmsg, (bytes, str)) else str(rmsg)
            if isinstance(opkt, str):
                opkt = opkt.encode()
        else:
            opkt = dicttos(op, rmsg)

        try:
            transport.sendto(opkt, addr)
        except Exception as e:
            if debug > 0:
                print(("cannot send cmd/update: %s" % e))


def handle_datagram(msg: dict, addr, transport, ctx: dict):
    """Handle a parsed datagram message.

    ctx is a dictionary with runtime dependencies:
    - config: dict of configuration
    - hbdclass: module providing Host/Connection classes
    - log: callable(loghost, message)
    - pushmsg: callable(message)
    - msg_to_websockets: callable(typ, data)
    - msg_journal: MessageJournal instance for logging all messages
    - threshold_checker: optional object providing check_plugin_data()
      and check_value()
    - DEBUG, verbose

    Does nothing for an empty *msg*. Returns None.
    """
    if not msg:
        return
    now = time.time()

    # Log message to journal
    msg_journal = ctx.get("msg_journal")
    if msg_journal:
        # Create async task to log message (non-blocking). The loop is looked
        # up first so no coroutine is created when no loop is running.
        try:
            loop = asyncio.get_running_loop()
            loop.create_task(msg_journal.log_message(msg, addr, now))
        except Exception as e:
            logger.debug("Failed to log message to journal: %s", e)

    cfg = ctx.get("config") or {}
    hbdcls = ctx.get("hbdclass")
    log = ctx.get("log")
    pushmsg = ctx.get("pushmsg")
    msg_to_websockets = ctx.get("msg_to_websockets")
    DEBUG = ctx.get("DEBUG", 0)
    verbose = ctx.get("verbose", False)

    # normalize addr (ip, port)
    ip = addr[0] if isinstance(addr, (list, tuple)) else addr
    uname = shortname(msg.get("name", "unknown"))
    watched = uname in cfg.get("watchhosts", [])

    def notify_watchers(text):
        """Push *text* only for watched hosts and when a pusher is configured."""
        if watched and pushmsg:
            pushmsg(text)

    newh = uname not in hbdcls.Host.hosts
    if newh:
        host = hbdcls.Host(uname)
        host.dyn = uname in cfg.get("dyndnshosts", [])
        if verbose:
            print(("XX: New host, num now %s" % (len(hbdcls.Host.hosts))))
    else:
        host = hbdcls.Host.hosts[uname]

    cid = msg.get("id", 0)
    try:
        rtt = float(msg.get("rtt"))
    except (TypeError, ValueError):
        # Missing or non-numeric rtt: treat as "no measurement".
        rtt = None

    msg_id = msg.get("ID")
    if msg_id == "HTB":
        host.doesack = msg.get("acks", -1)
        # send ACK back
        if host.cver < 1:
            opkt = b"ACK"
        else:
            opkt = dicttos("ACK", {"time": time.time()})
        try:
            transport.sendto(opkt, addr)
        except Exception as e:
            if DEBUG > 0:
                print(("cannot send ack: %s" % e))
    elif msg_id == "PLG":
        # Handle plugin data message
        _handle_plugin_message(
            msg, host, uname, now, ctx, msg_to_websockets, DEBUG
        )

    host.setcver(msg.get("ver", 0))
    try:
        conn, res = host.conndata(cid, ip, rtt, now)
    except Exception as e:
        if DEBUG > 0:
            print("conndata failed: %s" % e)
        return

    if res:
        eventlog(uname, "WARNING", res)
        notify_watchers("%s %s" % (host.name, res))

    interval = int(msg.get("interval", 0) or 0)
    shutdown = msg.get("shutdown", 0)
    service = msg.get("service", "unknown")
    message = msg.get("msg", None)
    boot = msg.get("boot", 0)

    if boot:
        eventlog(uname, "INFO", "booted")
        notify_watchers("%s booted" % (host.name))

    if message:
        eventlog(uname, "INFO", "msg: %s" % message, service=service)
        notify_watchers(message)

    if conn.getstate() != hbdcls.Connection.UP:
        lasts = conn.state
        d = conn.newstate(hbdcls.Connection.UP, now)
        if d == 0 or lasts == "unknown":
            m = "%s is up" % (conn.afam)
        else:
            m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
        eventlog(uname, "RECOVER", m)
        notify_watchers("%s %s is back" % (uname, conn.afam))
        # NOTE(review): nesting reconstructed from whitespace-stripped source;
        # confirm upcount is only updated when the connection returns to UP.
        if boot or newh:
            host.upcount = host.doesack
        else:
            host.upcount += 1

    if shutdown:
        eventlog(uname, "INFO", "%s shutdown" % conn.afam)
        notify_watchers("%s %s shutdown" % (uname, conn.afam))
        conn.newstate(hbdcls.Connection.DOWN, now)

    if interval > 0:
        host.interval = interval
        # Timer-based reachability monitoring
        # Reset overdue timer on every heartbeat
        if conn.getstate() != hbdcls.Connection.DOWN:
            _arm_overdue_timer(
                conn, host, uname, interval, cfg, hbdcls, pushmsg,
                msg_to_websockets,
            )

    # Check RTT thresholds using the threshold checker
    threshold_checker = ctx.get("threshold_checker")
    if threshold_checker and rtt is not None and rtt > 0:
        try:
            # Check against configured thresholds
            threshold_checker.check_value(
                host_name=uname,
                metric_path=f"rtt.{uname}",
                value=rtt,
                alert_states=host.alert_states,
            )
        except Exception as e:
            # Same policy as the plugin threshold check: a checker failure
            # must not prevent queued commands from being delivered.
            logger.error("Error checking RTT threshold for %s: %s", uname, e)

    # send any commands we have queued
    _send_queued_commands(host, uname, addr, transport, log, DEBUG)

    if msg_to_websockets:
        try:
            msg_to_websockets("host", host.stateinfo())
        except Exception as e:
            if DEBUG > 0:
                print(("cannot send websocket message: %s" % e))