"""UDP listener and datagram processing.""" import asyncio from compression import zlib import logging logger = logging.getLogger(__name__) from .proto import stodict, oldmtodict from hbd.utils import dur class EchoServerProtocol(asyncio.DatagramProtocol): def __init__(self, config=None, handler=None): super().__init__() self.config = config or {} self.handler = handler def connection_made(self, transport): self.transport = transport logger.info("UDP Server listening...") def datagram_received(self, data, addr): logger.debug("Received from %s", addr) try: msg = parse_message(data) if self.handler: # handler can be a callable provided by the application # pass the transport so handlers can send replies (ACKs/commands) self.handler(msg, addr, self.transport) except Exception: logger.exception("Error while processing datagram from %s", addr) def parse_message(data: bytes): """Parse a raw datagram into a message dict. Uses the protocol decoding helpers and falls back to old format when decoding returns an empty dict (compat with older clients). """ msg = stodict(data) if not msg: # fallback to old format msg = oldmtodict(data) return msg def dicttos(ID, d, compress=False): s = [] for k in d: if isinstance(d[k], float): s.append("%s=%0.5f" % (k, d[k])) else: s.append("%s=%s" % (k, d[k])) pk = ";".join(s) if compress: zpk = zlib.compress(pk.encode(), 6) ID = "!" + ID + ":" opk = ID.encode() + zpk else: zpk = pk opk = ID + ":" + zpk return opk def handle_datagram(msg: dict, addr, transport, ctx: dict): """Handle a parsed datagram message. ctx is a dictionary with runtime dependencies: - config: dict of configuration - hbdclass: module providing Host/Connection classes - log: callable(loghost, message) - email: callable(subject, message) - pushmsg: callable(message) - msg_to_websockets: callable(typ, data) - DEBUG, verbose """ if not msg: return now = __import__("time").time() cfg = ctx.get("config", {}) hbdcls = ctx.get("hbdclass") log = ctx.get("log") email = ctx.get("email") pushmsg = ctx.get("pushmsg") msg_to_websockets = ctx.get("msg_to_websockets") DEBUG = ctx.get("DEBUG", 0) verbose = ctx.get("verbose", False) # normalize addr (ip, port) ip = addr[0] if isinstance(addr, (list, tuple)) else addr name = msg.get("name", "unknown") from hbd.utils import shortname uname = shortname(name) if uname not in hbdcls.Host.hosts: host = hbdcls.Host(uname) host.dyn = uname in cfg.get("dyndnshosts", []) if verbose: print(("XX: New host, num now %s" % (len(hbdcls.Host.hosts)))) newh = True else: host = hbdcls.Host.hosts[uname] newh = False cid = msg.get("id", 0) try: rtt = float(msg.get("rtt", None)) except Exception: rtt = None if msg.get("ID") == "HTB": host.doesack = msg.get("acks", -1) host.setcver(msg.get("ver", 0)) try: conn, res = host.conndata(cid, ip, rtt, now) except Exception as e: if DEBUG > 0: print("conndata failed: %s" % e) return if res: if log: log(uname, res) if uname in cfg.get("watchhosts", []): if email: email("address change", "%s %s" % (host.name, res)) if pushmsg: pushmsg("%s %s" % (host.name, res)) interval = int(msg.get("interval", 0) or 0) shutdown = msg.get("shutdown", 0) service = msg.get("service", "unknown") message = msg.get("msg", None) boot = msg.get("boot", 0) if boot: if log: log(uname, "booted") if uname in cfg.get("watchhosts", []): m = "%s booted" % (host.name) if email: email("booted", m) if pushmsg: pushmsg(m) if message: if log: log(uname, "msg: %s" % message, service=service) if uname in cfg.get("watchhosts", []): if email: email("msg", message) if pushmsg: pushmsg(message) if conn.getstate() != hbdcls.Connection.UP: lasts = conn.state d = conn.newstate(hbdcls.Connection.UP, now) m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d)) if log: log(uname, m) if uname in cfg.get("watchhosts", []): if email: email("%s back" % conn.afam, uname) if pushmsg: pushmsg("%s %s is back" % (uname, conn.afam)) if boot or newh: host.upcount = host.doesack else: host.upcount += 1 if shutdown: if log: log(uname, "%s shutdown" % conn.afam) if uname in cfg.get("watchhosts", []): if email: email("shutdown", "%s %s shutdown" % (uname, conn.afam)) if pushmsg: pushmsg("%s %s shutdown" % (uname, conn.afam)) conn.newstate(hbdcls.Connection.DOWN, now) if interval > 0: host.interval = interval # send ACK back rmsg = {"time": __import__("time").time()} if host.cver < 1: opkt = b"ACK" else: opkt = dicttos("ACK", rmsg, host.cver > 1) try: transport.sendto(opkt, addr) except Exception as e: if DEBUG > 0: print(("cannot send ack: %s" % e)) # send any commands we have queued while len(host.cmds): op, rmsg = host.cmds[0] if op == "CMD": if email: email("%s cmd exec" % uname, "command '%s' sent" % rmsg) del host.cmds[0] if log: log(uname, "command sent") if host.cver < 1: rmsg = rmsg["cmd"] elif op == "UPD": del host.cmds[0] if log: log(uname, "update initiated") if host.cver < 1: if log: log(uname, " ver 0 does not support UPD") continue if host.cver < 1: opkt = rmsg if isinstance(rmsg, (bytes, str)) else str(rmsg) if isinstance(opkt, str): opkt = opkt.encode() else: opkt = dicttos(op, rmsg, True) try: transport.sendto(opkt, addr) except Exception as e: if DEBUG > 0: print(("cannot send cmd/update: %s" % e)) if msg_to_websockets: try: msg_to_websockets("host", host.stateinfo()) except Exception: pass