220 lines
6.2 KiB
Python
220 lines
6.2 KiB
Python
"""UDP listener and datagram processing."""
|
|
import asyncio
|
|
import zlib
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
from .proto import stodict, oldmtodict
|
|
from hbd.utils import dur
|
|
|
|
|
|
class EchoServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that parses each packet and hands it to a handler.

    Args:
        config: optional configuration mapping, stored as ``self.config``
            (an empty dict when omitted or falsy).
        handler: optional callable invoked as ``handler(msg, addr, transport)``
            for every received datagram.
    """

    def __init__(self, config=None, handler=None):
        super().__init__()
        self.config = config or {}
        self.handler = handler
        # Defined up front so the attribute exists (as None) even before
        # connection_made() has been called by the event loop.
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport so handlers can send replies through it."""
        self.transport = transport
        logger.info("UDP Server listening...")

    def datagram_received(self, data, addr):
        """Parse one datagram and dispatch it to the configured handler.

        Any exception raised while parsing or handling is logged with its
        traceback and swallowed, so a single bad packet does not propagate
        into the event loop.
        """
        logger.debug("Received from %s", addr)
        try:
            msg = parse_message(data)
            if self.handler:
                # handler can be a callable provided by the application;
                # pass the transport so handlers can send replies
                # (ACKs/commands)
                self.handler(msg, addr, self.transport)
        except Exception:
            logger.exception("Error while processing datagram from %s", addr)
|
|
|
|
|
|
def parse_message(data: bytes):
    """Decode a raw datagram into a message dict.

    The current protocol decoder (``stodict``) is tried first; when it
    yields an empty/falsy result the legacy decoder (``oldmtodict``) is used
    instead, for compatibility with older clients.
    """
    # `or` returns the first decoder's result when it is truthy, otherwise
    # the result of the old-format decoder.
    return stodict(data) or oldmtodict(data)
|
|
|
|
def dicttos(ID, d, compress=False):
    """Serialize a dict into a ``ID:key=value;key=value`` packet.

    Float values are rendered with five decimals; every other value uses its
    ``%s`` formatting.

    Args:
        ID: packet identifier placed in front of the payload.
        d: mapping of field names to values.
        compress: when true, the payload is zlib-compressed (level 6) and the
            packet is prefixed with ``!``.

    Returns:
        ``str`` for an uncompressed packet, ``bytes`` for a compressed one.
    """

    def render(key):
        value = d[key]
        if isinstance(value, float):
            return "%s=%0.5f" % (key, value)
        return "%s=%s" % (key, value)

    payload = ";".join(render(key) for key in d)

    if not compress:
        return ID + ":" + payload

    # Compressed packets are marked with a leading "!" before the identifier.
    header = "!" + ID + ":"
    return header.encode() + zlib.compress(payload.encode(), 6)
|
|
|
|
def handle_datagram(msg: dict, addr, transport, ctx: dict):
    """Handle a parsed datagram message.

    Looks up (or creates) the sending host, records the connection data,
    emits log/push notifications for state changes, replies with an ACK,
    flushes any queued commands and finally publishes the host state to the
    websocket callback.

    Args:
        msg: parsed message dict; an empty/falsy value is ignored.
        addr: sender address, either a list/tuple whose first item is the IP
            or a bare address. It is passed unchanged to ``transport.sendto``.
        transport: object providing ``sendto(data, addr)``.
        ctx: dictionary with runtime dependencies:
            - config: dict of configuration
            - hbdclass: module providing Host/Connection classes
            - log: callable(loghost, message); also called with a
              ``service`` keyword for client-supplied messages
            - pushmsg: callable(message)
            - msg_to_websockets: callable(typ, data)
            - DEBUG, verbose

    Returns:
        None. When ``host.conndata`` raises, processing stops before any
        reply is sent.
    """
    if not msg:
        return

    import time

    from hbd.utils import shortname

    now = time.time()
    cfg = ctx.get("config") or {}
    hbdcls = ctx.get("hbdclass")
    log = ctx.get("log")
    pushmsg = ctx.get("pushmsg")
    msg_to_websockets = ctx.get("msg_to_websockets")
    DEBUG = ctx.get("DEBUG", 0)
    verbose = ctx.get("verbose", False)

    # normalize addr (ip, port)
    ip = addr[0] if isinstance(addr, (list, tuple)) else addr
    name = msg.get("name", "unknown")
    uname = shortname(name)

    # Push notifications are only sent for hosts listed in "watchhosts".
    watched = uname in cfg.get("watchhosts", [])

    def notify(log_text, push_text, **log_kwargs):
        """Write to the host log and, for watched hosts, push a message."""
        if log:
            log(uname, log_text, **log_kwargs)
        if watched and pushmsg:
            pushmsg(push_text)

    if uname not in hbdcls.Host.hosts:
        host = hbdcls.Host(uname)
        host.dyn = uname in cfg.get("dyndnshosts", [])
        if verbose:
            print(("XX: New host, num now %s" % (len(hbdcls.Host.hosts))))
        newh = True
    else:
        host = hbdcls.Host.hosts[uname]
        newh = False

    cid = msg.get("id", 0)
    try:
        rtt = float(msg.get("rtt"))
    except (TypeError, ValueError, OverflowError):
        # missing or unparsable round-trip time
        rtt = None

    if msg.get("ID") == "HTB":
        host.doesack = msg.get("acks", -1)
        host.setcver(msg.get("ver", 0))

    try:
        conn, res = host.conndata(cid, ip, rtt, now)
    except Exception as e:
        if DEBUG > 0:
            print("conndata failed: %s" % e)
        return

    if res:
        notify(res, "%s %s" % (host.name, res))

    interval = int(msg.get("interval", 0) or 0)
    shutdown = msg.get("shutdown", 0)
    service = msg.get("service", "unknown")
    message = msg.get("msg", None)
    boot = msg.get("boot", 0)

    if boot:
        notify("booted", "%s booted" % (host.name))
    if message:
        notify("msg: %s" % message, message, service=service)

    if conn.getstate() != hbdcls.Connection.UP:
        lasts = conn.state
        d = conn.newstate(hbdcls.Connection.UP, now)
        notify(
            "%s back after being %s for %s" % (conn.afam, lasts, dur(d)),
            "%s %s is back" % (uname, conn.afam),
        )

    if boot or newh:
        host.upcount = host.doesack
    else:
        host.upcount += 1

    if shutdown:
        notify(
            "%s shutdown" % conn.afam,
            "%s %s shutdown" % (uname, conn.afam),
        )
        conn.newstate(hbdcls.Connection.DOWN, now)

    if interval > 0:
        host.interval = interval

    # send ACK back
    rmsg = {"time": time.time()}
    if host.cver < 1:
        opkt = b"ACK"
    else:
        # dicttos() returns str when not compressing (cver == 1);
        # _send_packet() encodes it so the transport receives bytes.
        opkt = dicttos("ACK", rmsg, host.cver > 1)
    _send_packet(transport, opkt, addr, DEBUG, "ack")

    # send any commands we have queued
    while host.cmds:
        op, rmsg = host.cmds[0]
        # Always dequeue the entry: leaving an unrecognised op at the head
        # of the queue would make this loop spin forever re-sending it.
        del host.cmds[0]
        if op == "CMD":
            if log:
                log(uname, "command sent")
            if host.cver < 1:
                rmsg = rmsg["cmd"]
        elif op == "UPD":
            if log:
                log(uname, "update initiated")
            if host.cver < 1:
                if log:
                    log(uname, " ver 0 does not support UPD")
                continue
        if host.cver < 1:
            opkt = rmsg if isinstance(rmsg, (bytes, str)) else str(rmsg)
        else:
            opkt = dicttos(op, rmsg, True)
        _send_packet(transport, opkt, addr, DEBUG, "cmd/update")

    if msg_to_websockets:
        try:
            msg_to_websockets("host", host.stateinfo())
        except Exception as e:
            # Best-effort notification: never let a websocket problem break
            # datagram handling, but make the failure visible when debugging.
            if DEBUG > 0:
                print("cannot push host state to websockets: %s" % e)


def _send_packet(transport, packet, addr, debug, what):
    """Send *packet* to *addr*; text is encoded to bytes first.

    Send failures are reported via print only when *debug* is above zero
    and are otherwise ignored.
    """
    if isinstance(packet, str):
        packet = packet.encode()
    try:
        transport.sendto(packet, addr)
    except Exception as e:
        if debug > 0:
            print(("cannot send %s: %s" % (what, e)))
|
|
|
|
|