#!/usr/bin/env python3
# $Id: hbd,v 1.38 2013/07/14 02:25:05 andreas Exp $
#
# Wait for heartbeat messages and act on them (or their absence)
#
# Overview (as implemented below):
#   * UDP sockets (IPv4 + IPv6) on hb_port receive heartbeat datagrams,
#     which are parsed by stodict() and handled by readsock().
#   * An HTTP server on hbd_host:hbd_port serves a status page, a small
#     JSON API and control URLs (/c, /d, /n, /u, /r).
#   * Two websocket listeners (WSPORT plain, WSSPORT TLS) push host state
#     and log messages to connected browsers.
#   * State (hosts, messages, period markers) is pickled to `pickfile` on
#     every log() call and reloaded at start.
#
# NOTE(review): this file was reconstructed from a copy whose line
# structure had been collapsed and whose HTML string literals had lost
# their tags.  Block nesting and the restored markup should be confirmed
# against the original.
#
VER = 4.2

import time
import os
import string
import sys
import socket
import ssl
import pathlib
import atexit
import select
import socketserver
import http.server
import getopt
import signal
import pickle
import smtplib
import traceback
import urllib.request
import urllib.parse
import urllib.error
import http.client
import threading
import subprocess
from hashlib import md5
import json
import zlib
import codecs
import asyncio
import ast
import errno
import html
import shlex
import websockets
from subprocess import Popen, STDOUT, PIPE
#from hbdclass import *
import hbdclass

CERT_PATH = "/usr/local/etc/letsencrypt/live/w02.wrede.ca/"
WSS_PEM = CERT_PATH + "fullchain.pem"
WSS_KEY = CERT_PATH + "privkey.pem"
NSUPDATE_BIN = "/usr/local/bin/nsupdate"  # override in .hbrc possible

SEND_EMAIL = False
SEND_PUSHOVER = True
DEBUG = 0
hbdclass.DEBUG = DEBUG
MAXRECV = 32767
LOGFILE = "/home/andreas/public_html/messages/andreas"
PICKFILE = "/var/tmp/hbd.pick"
AEMAIL = ["andreas@wrede.ca"]
NAME = "heatbeat"
SMTPSERVER = "localhost"
msgs = []
#AEW upcount = 0
PORT = 50003
TPORT = 50004
THOST = ""
WSPORT = 50005
WSSPORT = 50006
verbose = False
INTERVAL = 10
GRACE = 2
DROPOVERDUE = 7*24*3600
os.environ['TZ'] = 'EST5EDT'
# strftime formats whose change marks a new hour / day / week
tsfm = ["%H", "%d", "%U"]
lastfm = ["", "", ""]
# Extra markup injected into the <head> of the status page.
# NOTE(review): the content of this literal was empty in the reviewed copy;
# it presumably held a <style> block -- confirm against the original.
tcss = """ """


def handler(signum, frame):
    """Signal handler: record the signal for the main loop to act on.

    If the main loop is no longer running, exit immediately with status 2.
    """
    global running, sig
    sig = signum
    if not running:
        if verbose:
            sys.stderr.write("NOT runing signal: %s running: %d" % (sig, running))
        sys.exit(2)
    if verbose:
        sys.stderr.write("signal: %s running: %s frame: %s" % (sig, running, frame))


def shortname(name):
    """Return the part of *name* before the first dot."""
    return name.split('.')[0]


class NullDevice:
    """File-like sink that discards everything written to it."""

    def write(self, s):
        pass

    def flush(self):
        # present so the object can stand in for sys.stdout/sys.stderr
        pass


class LogDevice:
    """File-like object that appends to /tmp/log1 and flushes every write."""

    def __init__(self):
        self.fh = open("/tmp/log1", "a")

    def write(self, s):
        self.fh.write(s)
        self.fh.flush()

    def flush(self):
        self.fh.flush()


def dicttos(ID, d, compress=False):
    """Serialize dict *d* into a wire message tagged with *ID*.

    Fields are rendered as ``key=value`` joined by ``;``; floats use five
    decimals.  Uncompressed result is the str ``"ID:payload"``; compressed
    result is the bytes ``b"!ID:"`` followed by the zlib-compressed payload.
    """
    parts = []
    for k in d:
        if isinstance(d[k], float):
            parts.append("%s=%0.5f" % (k, d[k]))
        else:
            parts.append("%s=%s" % (k, d[k]))
    pk = ";".join(parts)
    if compress:
        return ("!" + ID + ":").encode() + zlib.compress(pk.encode(), 6)
    return ID + ":" + pk


def _parsevalue(v):
    """Convert a field value that starts with a digit into a number.

    Only int/float literals are accepted; anything else is returned as the
    original string.  This replaces an eval() on data received from the
    network.
    """
    if v and v[0].isdigit():
        try:
            val = ast.literal_eval(v)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return v
        if isinstance(val, (int, float)):
            return val
    return v


def stodict(msg):
    """Parse a wire message (bytes or str) into a dict.

    The inverse of dicttos(): accepts both the compressed form
    (``b"!" + 3-byte ID + b":" + zlib data``) and the plain ``ID:k=v;k=v``
    form.  The message ID is stored under key ``'ID'``.  A key without
    ``=`` maps to None.

    Raises ValueError when a plain message has no ``:`` separator (zlib and
    decoding errors propagate unchanged).
    """
    d = {}
    if isinstance(msg, (bytes, bytearray)) and msg[:1] == b"!":
        pk = zlib.decompress(bytes(msg[5:])).decode()
        d['ID'] = bytes(msg[1:4]).decode()
    else:
        if isinstance(msg, (bytes, bytearray)):
            # datagrams arrive as bytes; the field parsing below works on str
            msg = bytes(msg).decode()
        ident, sep, pk = msg.partition(':')
        if not sep:
            raise ValueError("malformed message: no ':' separator")
        d['ID'] = ident
    for item in pk.split(';'):
        key, sep, raw = item.partition('=')
        key = key.strip()
        d[key] = _parsevalue(raw.strip()) if sep else None
    return d


def oldmtodict(msg):
    """Parse a message from an old client that sends no ``ID:`` prefix."""
    if isinstance(msg, (bytes, bytearray)):
        msg = bytes(msg).decode()
    return stodict('HTB:' + msg)


def email(s, msg):
    """Send a notification mail with subject suffix *s* and body *msg*.

    Does nothing (returns None) unless SEND_EMAIL is set.  Returns "OK", or
    "Fail" when the recipients were refused.  Any other SMTP error triggers
    saveandrestart(), i.e. the daemon re-executes itself.
    """
    if not SEND_EMAIL:
        return
    ret = "OK"
    toaddrs = AEMAIL
    fromemail = "aew.heartbeat@wrede.ca"
    subj = "Info from %s: %s" % (NAME, s)
    date = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime())
    body = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % (
        toaddrs[0], fromemail, subj, date, msg)
    server = None
    try:
        server = smtplib.SMTP(SMTPSERVER)
        if DEBUG > 0:
            server.set_debuglevel(1)
        server.sendmail(fromemail, toaddrs, body)
    except smtplib.SMTPRecipientsRefused as errs:
        log(None, "cannot send email: %s\n" % (errs))
        ret = "Fail"
    except Exception:
        print(("smtp error: "+traceback.format_exc()))
        saveandrestart()
    if server is not None:
        try:
            server.quit()
        except Exception:
            pass  # best effort: the connection may already be gone
    return ret


def pushmsg(msg):
    """Dispatch *msg* to the push service(s) selected by `pushsrv`."""
    if pushsrv in ["all", "pushover"]:
        pushover(msg)
    if pushsrv in ["all", "mattermost"]:
        pushmattermost(msg)
    if pushsrv in ["all", "signal"]:
        pushsignal(msg)
    if pushsrv in ["all"]:
        print("notice:", msg)


def pushover(msg):
    """Post *msg* to the Pushover API (best effort, errors are ignored)."""
    if not SEND_PUSHOVER:
        return
    # NOTE(review): API credentials are hard-coded below; consider moving
    # them to the config file.
    conn = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request("POST", "/1/messages.json",
                     urllib.parse.urlencode({
                         "token": "ac7NLX2rPjXFareeDgLpXNoDf4iFmf",
                         "user": "uDhH33UjQQDYtNzJb1ThRiWb9ingGK",
                         "message": msg,
                     }),
                     {"Content-type": "application/x-www-form-urlencoded"})
        conn.getresponse()
    except Exception:
        pass  # notifications are best effort
    finally:
        conn.close()


CHANNEL = "Monitoring"
TOKEN = "rxz6b3886iygxnhbzpmgbsrocy"
HOST = "192.168.10.101"
ICON = "https://in-transit.ca/HeartBeat.png"
USERNAME = "admin"


def pushmattermost(msg):
    """Post *msg* to the Mattermost incoming webhook identified by TOKEN.

    `Driver` is imported in the main section only when the mattermost
    service is selected.
    """
    ses = {
        'url': HOST,
        'scheme': 'http',
        'basepath': '/api/v4',
        'port': 8065,
    }
    mm = Driver(ses)
    payload = {
        "text": msg,
        "channel": CHANNEL,
        "username": USERNAME,
        "icon_url": ICON,
    }
    try:
        mm.webhooks.call_webhook(TOKEN, payload)
    except Exception as e:
        # the previous code stored the error but never reported it
        print("mattermost push failed: %s" % e)


USER = "+16472472447"
RECIPIENT = "+14168226179"


def pushsignal(msg, title="hbd", recipient=RECIPIENT):
    """Send *msg* via signal-cli on the remote host; return True on success."""
    # ssh hands the joined arguments to a shell on the remote side, so the
    # message (which may contain client-supplied text) must be shell-quoted.
    message = shlex.quote(f"{title}: {msg}")
    CLI = [
        "/usr/bin/ssh", "andreas@w02",
        "/usr/local/bin/signal-cli",
        "-u", USER,
        "send",
        "-m", message,
        # "-g", GROUP,
        shlex.quote(recipient),
    ]
    if verbose:
        print(f"DBG cli: {CLI}")
    res = subprocess.run(CLI, shell=False, capture_output=True)
    rc = res.returncode == 0
    print(res.stdout.decode())
    if not rc:
        print(f"signalcli failed: {res.stderr.decode()}")
    elif verbose:
        print(f"signalcli msg sent, res {res.stdout.decode()}")
    return rc


# nsupdate: set the DNS A record for a fqdn
# return: None if ok, else error text
def nsupdate(hostname, newip, dyndomain):
    """Set the A/AAAA and TXT records of ``hostname.dy.dyndomain``.

    Runs the nsupdate binary (`nsupdate_bin`) and feeds it the update
    script on stdin.  Returns None when the output contains
    ``status: NOERROR``, otherwise the error text / command output.
    """
    D = {}
    D['domain'] = dyndomain
    D['fqdn'] = "%s.dy.%s" % (hostname, dyndomain)
    D['dnsttl'] = '5'
    D['newip'] = newip
    D['ts'] = time.strftime('%Y-%m-%d.%H:%M:%S', time.gmtime())
    # any colon means IPv6 (also a leading one, e.g. "::1")
    D['rtype'] = "AAAA" if ":" in newip else "A"
    nsup = (
        "update delete %(fqdn)s %(rtype)s\n"
        "update add %(fqdn)s %(dnsttl)s %(rtype)s %(newip)s\n"
        "update delete %(fqdn)s TXT\n"
        "update add %(fqdn)s %(dnsttl)s TXT \"Created: %(ts)s\"\n"
        "send\n"
        "answer\n"
    ) % D
    if DEBUG > 0:
        log(None, "DBG: nsup %s" % nsup)
    cmd = [nsupdate_bin, "-k", "/etc/dhcpc/Kdy.%(domain)s.+157+00000." % D, "-v"]
    if DEBUG > 0:
        log(None, "DBG: cmd %s" % cmd)
    try:
        p = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    except OSError as e:
        return "nsupdate: execution failed: %s" % e
    except Exception:
        return "nsupdate: some error occured"
    # stderr is merged into stdout, so only the first element carries data
    output = p.communicate(nsup.encode())[0].decode(errors="replace")
    if 'status: NOERROR' in output:
        return None
    return output


#
def dur(sec):
    """Format a duration in seconds as ``H:MM:SS``, ``M:SS`` or ``0:SS``."""
    m, s = divmod(int(sec), 60)
    h, m = divmod(m, 60)
    if h > 0:
        return "%d:%02d:%02d" % (h, m, s)
    if m > 0:
        return "%d:%02d" % (m, s)
    return "0:%02d" % s


def fixsort():
    """Renumber all hosts (`.num`) in alphabetical order of their names."""
    for x, n in enumerate(sorted(hbdclass.Host.hosts.keys())):
        hbdclass.Host.hosts[n].num = x


#
def on_exit():
    """atexit hook: close the log file."""
    if DEBUG > 0:
        sys.stderr.write("on_exit\n")
    try:
        logf.close()
    except Exception:
        pass
    print("exit")


def initlog(logfile):
    """Open *logfile* for appending; fall back to truncating, then stderr."""
    try:
        return open(logfile, "a+")
    except OSError:
        pass
    try:
        return open(logfile, "w")
    except Exception as e:
        print("cannot open loffile %s, using STDERR: %s" % (logfile, e))
        return sys.stderr


#
#
def checkoverdue():
    """Mark connections overdue when their heartbeat is late.

    A connection in state `up` becomes `overdue` once the last beat is
    older than the host interval plus `grace`; an `overdue` connection
    becomes `unknown` after DROPOVERDUE seconds.  Newly overdue connections
    are logged, pushed to websockets and, for watched hosts, notified.
    """
    now = time.time()
    for h in list(hbdclass.Host.hosts.keys()):
        host = hbdclass.Host.hosts.get(h)
        if host is None:
            # dropped by the HTTP thread (/d) while we were scanning
            continue
        pmsg = []
        for c in list(host.connections):
            conn = host.connections[c]
            if conn.state == hbdclass.Connection.down:
                continue
            timeout = host.interval + grace
            if conn.state == hbdclass.Connection.up and (now - conn.lastbeat) > timeout:
                conn.newstate(hbdclass.Connection.overdue, now, grace)
                pmsg.append(conn.afam)
            if conn.state == hbdclass.Connection.overdue and (now - conn.lastbeat) > DROPOVERDUE:
                conn.newstate(hbdclass.Connection.unknown, conn.lastbeat)
        if pmsg:
            families = " and ".join(pmsg)
            if h in watchhosts:
                email("overdue", "%s overdue" % families)
                pushmsg("%s %s overdue" % (h, families))
            log(h, "%s overdue" % families)
            msg_to_websockets('host', host.stateinfo())


def log(host, m, service=None):
    """Record message *m* (optionally for *host* / *service*).

    The message is appended to the in-memory list, pushed to websocket
    clients, written to the log file and the state is pickled.
    """
    if DEBUG > 0:
        print("Log: %s %s" % (host, m))
    now = time.time()
    ts = time.strftime("%b %d %H:%M:%S", time.localtime(now))
    srv = "service %s: " % service if service else ""
    hst = "%s " % host if host else ""
    msg = "%s: %s%s%s" % (ts, hst, srv, m)
    msgs.append(msg+'\n')
    msg_to_websockets('message', msg)
    if logfmt == "msg":
        m2 = "%d|%s|%s\n" % (now, hst, m)
    else:
        m2 = msg+'\n'
    logf.write(m2)
    logf.flush()
    pickleit()


def dnsupdatethread():
    """Worker thread: apply queued (name, address) changes to dynamic DNS."""
    while True:
        name, addr = hbdclass.Host.dnsQ.get()
        m = "changed address to %s" % (addr)
        for dyndomain in dyndomains:
            err = nsupdate(name, addr, dyndomain)
            if err:
                m += ", DNS update failed: %s" % err
                email("error: nsupdate failed", "%s.dy.%s: %s" % (name, dyndomain, m))
            else:
                m += ", DNS updated."
        hbdclass.Host.dnsQ.task_done()
        log(name, m)


#
#
#
#
def _tobytes(pkt):
    """Return *pkt* as bytes (sockets do not accept str)."""
    return pkt.encode() if isinstance(pkt, str) else pkt


def readsock(sock):
    """Receive and process one datagram from *sock*.

    Heartbeats (ID 'HTB') update the host/connection state, produce log and
    notification messages and are acknowledged; afterwards any commands
    queued for the host are sent to the packet's source address.
    """
    global now
    if DEBUG > 3:
        sys.stderr.write("readsock recfrom start")
    now = time.time()
    data, addrp = sock.recvfrom(MAXRECV)
    if DEBUG > 3:
        sys.stderr.write("readsock = %s, %s\n" % (data, addrp))
    try:
        msg = stodict(data)
    except Exception:
        return  # unparsable datagram: drop it
    if DEBUG > 3:
        sys.stderr.write("msg is %s" % str(msg))
    if not msg:
        # Old hbc client
        if verbose:
            print(("old hbc:", data))
        msg = oldmtodict(data)
    if DEBUG > 2:
        print(("readsock = %s, %s" % (msg, addrp)))
    addr = addrp[0:2]
    name = shortname(msg.get('name', "unknown"))
    if name not in hbdclass.Host.hosts:
        host = hbdclass.Host(name)
        host.dyn = name in dyndnshosts
        if verbose:
            print(("XX: New host, num now %s" % (len(hbdclass.Host.hosts))))
        newh = True
    else:
        host = hbdclass.Host.hosts[name]
        newh = False
    cid = msg.get('id', 0)
    try:
        rtt = float(msg.get('rtt', None))
    except (TypeError, ValueError):
        rtt = None
    ss = None  # bytes sent by the last sendto(), for the debug line below

    if msg['ID'] == 'HTB':
        host.doesack = msg.get('acks', -1)
        host.setcver(msg.get('ver', 0))
        interval = int(msg.get('interval', 0))
        shutdown = msg.get('shutdown', 0)
        service = msg.get('service', "unknown")
        message = msg.get('msg', None)
        boot = msg.get('boot', 0)
        conn, res = host.conndata(cid, addr[0], rtt, now)
        if res:
            log(name, res)
            if name in watchhosts:
                email("address change", "%s %s" % (host.name, res))
                pushmsg("%s %s" % (host.name, res))
        if boot:
            log(name, "booted")
            if name in watchhosts:
                m = "%s booted" % (host.name)
                email("booted", m)
                pushmsg(m)
        if message:
            log(name, "msg: %s" % message, service=service)
            if name in watchhosts:
                email("msg", message)
                pushmsg(message)
        if conn.getstate() != hbdclass.Connection.up:  # XXX and interval > 0:
            lasts = conn.state
            d = conn.newstate(hbdclass.Connection.up, now)
            m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
            log(name, m)
            if name in watchhosts:
                email("%s back" % conn.afam, name)
                pushmsg("%s %s is back" % (name, conn.afam))
        if boot or newh:
            host.upcount = host.doesack
        else:
            host.upcount += 1
        if shutdown:
            log(name, "%s shutdown" % conn.afam)
            if name in watchhosts:
                email("shutdown", "%s %s shutdown" % (name, conn.afam))
                pushmsg("%s %s shutdown" % (name, conn.afam))
            conn.newstate(hbdclass.Connection.down, now)
        if interval > 0:
            host.interval = interval

        rmsg = {'time': time.time()}
        op = 'ACK'
        if host.cver < 1:
            opkt = 'ACK'
            rmsg = 'ACK'
        else:
            opkt = dicttos('ACK', rmsg, host.cver > 1)  # clients w/ ver 2+ can cope
        try:
            ss = sock.sendto(_tobytes(opkt), addr)
        except OSError:
            pass  # XXX return pkg failes
        if DEBUG > 2:
            print(("sendto1: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50])))

    # send any commands we have queued
    while len(host.cmds):
        # always dequeue, so an unexpected op cannot stall the loop
        op, rmsg = host.cmds.pop(0)
        if op == 'CMD':
            email("%s cmd exec" % name, "command '%s' sent" % rmsg)
            log(name, "command sent")
            if host.cver < 1:
                rmsg = rmsg['cmd']
        elif op == 'UPD':
            log(name, "update initiated")
            if host.cver < 1:
                log(name, " ver 0 does not support UPD")
                continue
        if host.cver < 1:
            opkt = rmsg
            op = ""
        else:
            opkt = dicttos(op, rmsg, True)
        try:
            ss = sock.sendto(_tobytes(opkt), addr)
        except Exception as e:
            print(("opkt len is %s" % len(opkt)))
            print(("cannot send: %s" % e))
        if verbose:
            print(("sendto2: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50])))
    if DEBUG > 2:
        print(("msg from %s,%s, sent %s bytes back" % (addr[0], addr[1], ss)))
    msg_to_websockets('host', host.stateinfo())


def updatecode(ucode, uname):
    """Queue an 'UPD' command carrying the file *ucode* for host *uname*.

    The command holds the md5 hex digest and the base64 encoding of the
    file content.  Returns None on success or an error text if the file
    cannot be read.
    """
    # NOTE(review): `ucode` comes straight from the HTTP query string, so
    # any file readable by the daemon can be shipped to clients.
    try:
        with open(ucode, "r") as fh:
            new_code = fh.read()
    except Exception as e:
        return "cannot read new code: %s" % e
    new_codeE = new_code.encode()
    m = md5()
    m.update(new_codeE)
    # NOTE(review): 'code' is a bytes object; dicttos() renders it via %s
    # (i.e. as "b'...'") -- confirm this is what clients expect.
    rmsg = {'csum': m.hexdigest(), 'code': codecs.encode(new_codeE, 'base64')}
    hbdclass.Host.hosts[uname].cmds.append(('UPD', rmsg))
    return None


#
# Web Server
#
class HttpServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Threaded HTTP server for the status page and control URLs."""

    allow_reuse_address = True

    def threaded(self):
        pass


#
#
class HttpHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the status page, JSON API and control URLs.

    NOTE(review): the control URLs (/c, /d, /n, /u, /r) are not
    authenticated in this code.
    """

    server_version = "HeartbeatHTTP/%s" % VER

    def version_string(self):
        return self.server_version

    def handle(self):
        """Handle the request; log instead of raising if the client left."""
        try:
            return http.server.BaseHTTPRequestHandler.handle(self)
        except Exception as e:
            self.log_error("Request went away: %r", e)
            self.close_connection = True
            return

    def do_HEAD(self):
        self.setheaders(200)

    def setheaders(self, code, headerdict=None):
        """Send status *code*, a Last-Modified header and *headerdict*."""
        self.send_response(code)
        self.send_header("Last-Modified",
                         time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)))
        # self.send_header("Accept-Ranges","bytes")
        # self.send_header("hbdclass.Connection","close")
        for h, value in (headerdict or {}).items():
            self.send_header(h, value)
        self.end_headers()

    def buildhead(self, title="Heartbeat", refresh=None, extras=None):
        """Return the opening lines of an HTML page as a list of strings.

        NOTE(review): the markup in these literals was missing from the
        reviewed copy (the refresh line raised TypeError as a result);
        minimal HTML has been restored -- confirm against the original.
        """
        res = []
        res.append('<!DOCTYPE html>')
        res.append("<html>")
        res.append("<head>")
        res.append('<title>%s</title>' % (title))
        if refresh:
            res.append("<meta http-equiv=\"refresh\" content=\"%s\">\n" % refresh)
        if extras:
            res.append(extras)
        res.append("</head>")
        res.append('<body>')
        return res

    def buildpage(self):
        """Return the status page (host table and message table)."""
        res = self.buildhead(refresh=60, extras=tcss)
        # NOTE(review): tags restored, see buildhead()
        res.append("<h1>Heartbeat status %s</h1>" % VER)
        # NOTE(review): `ubHost` is kept exactly as found; it is not
        # referenced anywhere else in this file -- verify against hbdclass.
        res += hbdclass.ubHost.buildhosttable()
        res += hbdclass.ubHost.buildmsgtable(msgs)
        res.append('<p>%s (%s)</p>' % (time.strftime("%H:%M:%S", time.localtime(now)),
                                      os.environ.get('TZ', 'CET-1CDT')))
        res.append("</body></html>")
        return res

    def builderror(self, code, cause, lcause):
        """Return ``(code, lines)`` for an HTML error page.

        *lcause* may contain request data and is HTML-escaped.
        NOTE(review): tags restored, see buildhead().
        """
        res = []
        res.append('<!DOCTYPE html>')
        res.append('<html><head>')
        res.append('<title>%s %s</title>' % (code, cause))
        res.append('</head><body>')
        res.append('<h1>%s</h1>' % (cause))
        res.append('<p>%s</p>' % html.escape(str(lcause)))
        res.append('<hr>')
        res.append('<address>hbd (Unix) Server at %s:%s</address>' % (hbd_host, hbd_port))
        res.append('</body></html>')
        return code, res

    def do_GET(self):
        """Serve the status page, the JSON API and the control URLs.

        /                  status page
        /c?h=HOST&c=CMD    queue a command for a host
        /d?h=HOST          drop a host
        /n?h=HOST          register the host name in DNS
        /u?h=HOST|All&c=F  queue a code update read from file F
        /api/0/hosts       host table as JSON
        /api/0/messages    last 30 messages as JSON
        /r                 restart the daemon
        """
        global sig
        code = 200
        xsig = 0
        charset = "iso-8859-1"  # matches the default Content-Type below
        rqAcceptEncoding = self.headers.get('Accept-encoding', '')
        headerdict = {"Content-Type": "text/html; charset = ISO-8859-1"}
        if DEBUG > 2:
            sys.stderr.write("handle\n")
        qr = urllib.parse.urlparse(self.path)
        qa = urllib.parse.parse_qs(qr.query)
        if DEBUG > 2:
            sys.stderr.write("handle = %s\n" % (qr.geturl()))

        if qr.path == "/":
            res = self.buildpage()
        elif qr.path == "/c":
            # command on host /c?h=melschserver&c=sudo%20ls
            uname = qa.get('h', [None])[0]
            ucmd = qa.get('c', [None])[0]
            if not ucmd or not uname:
                code, res = self.builderror(400, 'Argument error', "need h= and c= arguments")
            elif uname not in hbdclass.Host.hosts:
                code, res = self.builderror(400, 'Data error', "h=%s not found" % uname)
            else:
                hbdclass.Host.hosts[uname].cmds.append(
                    ('CMD', {'cmd': urllib.parse.unquote(ucmd)}))
                res = self.buildhead()
                res.append("cmd %s queued for host %s"
                           % (html.escape(ucmd), html.escape(uname)))
        elif qr.path == "/d":
            # drop host /d?h=melschserver
            uname = qa.get('h', [None])[0]
            if not uname:
                code, res = self.builderror(400, 'Argument error', "need h= argument")
            elif uname not in hbdclass.Host.hosts:
                code, res = self.builderror(400, 'Data error', "h=%s not found" % uname)
            else:
                log(uname, "dropped")
                # TODO: send message to websocket about dropped host
                del hbdclass.Host.hosts[uname]
                res = self.buildhead()
                res.append("Done")
        elif qr.path == "/n":
            # register name
            uname = qa.get('h', [None])[0]
            if not uname:
                code, res = self.builderror(400, 'Argument error', "need h= argument")
            elif uname not in hbdclass.Host.hosts:
                code, res = self.builderror(400, 'Data error', "h=%s not found" % uname)
            else:
                ll = hbdclass.Host.hosts[uname].registerDns()
                res = self.buildhead()
                res.append(str(ll))
                log(uname, ll)
        elif qr.path == "/u":
            # update
            uname = qa.get('h', [None])[0]
            if uname is not None:
                uname = urllib.parse.unquote(uname)
            ucode = qa.get('c', [None])[0]
            if not ucode or not uname:
                code, res = self.builderror(400, 'Argument error', "need h= and c= arguments")
            elif uname != 'All' and uname not in hbdclass.Host.hosts:
                code, res = self.builderror(400, 'Data error', "h=%s not found" % uname)
            else:
                res = self.buildhead()
                if uname != 'All':
                    names = [uname]
                else:
                    # version 2 is the earliest that supports update
                    names = [n for n in list(hbdclass.Host.hosts)
                             if hbdclass.Host.hosts[n].cver >= 2]
                for n in names:
                    err = updatecode(ucode, n)
                    # NOTE(review): "<br>" restored, see buildhead()
                    res.append("update started for %s: %s<br>\n"
                               % (html.escape(n), html.escape(err) if err else "OK"))
                res.append("Done")
        elif qr.path == "/api/0/hosts":
            # api access to host table
            headerdict = {"Content-Type": "application/json; charset=utf-8"}
            charset = "utf-8"
            hostjson = [hbdclass.Host.hosts[h].jsons() for h in list(hbdclass.Host.hosts)]
            res = ["[" + ",".join(hostjson) + "]"]
        elif qr.path == "/api/0/messages":
            # api access to message list
            headerdict = {"Content-Type": "application/json; charset=utf-8"}
            charset = "utf-8"
            res = [json.dumps(msgs[-30:])]
        elif qr.path == "/r":
            # restart
            res = self.buildhead()
            res.append("restart request")
            xsig = signal.SIGHUP
            log(None, "restart request")
        else:
            code, res = self.builderror(404, "Not Found",
                                        "requested URL was not found on this server.")

        # the socket needs bytes; encode in the charset announced above
        towrite = "\n".join(res).encode(charset, errors="replace")
        if 'deflate' in rqAcceptEncoding:
            headerdict['Content-Encoding'] = "deflate"
            towrite = zlib.compress(towrite, 6)
        headerdict['Content-Length'] = len(towrite)
        headerdict['Cache-Control'] = 'private, must-revalidate, max-age=0'
        headerdict['Expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
        self.setheaders(code, headerdict)
        self.wfile.write(towrite)
        if xsig:
            # picked up by the main loop
            sig = xsig


def setrunning(new):
    """Set the main-loop `running` flag."""
    global running
    if DEBUG > 0:
        sys.stderr.write("running is now = %s\n" % (new))
    running = new


def closeup():
    """Stop the main loop and release sockets, HTTP server and log file."""
    setrunning(False)
    try:
        sock.close()
    except Exception:
        pass
    try:
        sock6.close()
    except Exception:
        pass
    if DEBUG > 0:
        sys.stderr.write("asking http server to stop\n")
    try:
        serv.shutdown()
        if DEBUG > 0:
            sys.stderr.write("http server stopped\n")
    except Exception as e:
        if DEBUG > 0:
            sys.stderr.write("http server did NOT stop: %s\n" % str(e))
    try:
        serv.server_close()
    except Exception:
        pass
    log(None, "restarting")
    try:
        logf.close()
    except Exception:
        pass
    #
    try:
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
    except ValueError:
        # signal.signal() only works in the main thread; closeup() can be
        # reached from worker threads via email() -> saveandrestart()
        pass


def restart():
    """Replace the running process with a fresh copy of this program."""
    if verbose:
        print(("execv %s %s" % (sys.argv[0], [sys.argv[0]]+cmdargs)))
    os.execv(sys.argv[0], [sys.argv[0]]+cmdargs)
    print("should not be here")


def saveandrestart():
    """Shut everything down cleanly, then re-execute the program."""
    closeup()
    restart()


def pickleit():
    """Persist hosts, messages and the period markers to `pickfile`."""
    with open(pickfile, 'wb') as pickf:
        pick = pickle.Pickler(pickf)
        pick.dump(hbdclass.Host.hosts)
        pick.dump(msgs)
        pick.dump(lastfm)


# websocket -> request path, for every client that has connected
ws_connections = {}


async def ws_serve(websocket, path):
    """Websocket handler: on every received message send the full state.

    The state consists of one 'host' message per known host and the last
    20 log messages.
    """
    ws_connections[websocket] = path
    remote_address = websocket.remote_address
    if verbose:
        print(f"DBG ws_serve: {remote_address}: {path}")
    while True:
        try:
            name = await websocket.recv()
            if verbose:
                print(f"DBG ws_serve: receive {name}")
        except (websockets.exceptions.ConnectionClosedOK,
                websockets.exceptions.ConnectionClosedError) as e:
            print(f'ws closed: {e}')
            break
        print(f"< {name} at {path}")
        for h in sorted(hbdclass.Host.hosts):
            jmsg = json.dumps({'type': 'host',
                               'data': hbdclass.Host.hosts[h].stateinfo()})
            await websocket.send(jmsg)
        for m in msgs[-20:]:
            jmsg = json.dumps({'type': 'message', 'data': m})
            await websocket.send(jmsg)
    if verbose:
        print(f"DBG ws_serve: close {remote_address}")
    await websocket.wait_closed()
    # try:
    #     del ws_connections[websocket]
    # except Exception as e:
    #     print(f"warning: failed to delete websocket: {e}")


def websocketupdater():
    """Thread target: run the asyncio event loop serving the websockets."""
    loop.run_forever()


def msg_to_websockets(typ: str, msg: str):
    """Send a ``{'type': typ, 'data': msg}`` JSON message to all clients.

    Called from threads other than the event-loop thread, hence
    run_coroutine_threadsafe().  Closed connections are forgotten.
    """
    jmsg = json.dumps({'type': typ, 'data': msg})
    to_close = []
    # iterate over a snapshot: ws_serve() adds entries from the loop thread
    for ws in list(ws_connections):
        if ws.closed:
            to_close.append(ws)
            continue
        try:
            asyncio.run_coroutine_threadsafe(ws.send(jmsg), loop)
        except Exception:
            to_close.append(ws)
            print("ws.send exception: closed")
    for ws in to_close:
        asyncio.run_coroutine_threadsafe(ws.wait_closed(), loop)
        ws_connections.pop(ws, None)


#
# Main
#
PUSHSRVS = ["all", "pushover", "mattermost", "signal"]
helpflag = False
forground = False
pushsrv = "pushover"  # mattermost
dyndomains = ["wrede.org"]
optlist = []
args = []
home = os.environ.get('HOME', os.path.expanduser('~'))
cmdargs = []  # options to pass on when re-executing in restart()
configfile = "%s/.hbrc" % home

try:
    # -h takes no argument (the help text below documents it as a flag)
    optlist, args = getopt.getopt(sys.argv[1:], 'c:dfhp:vx')
except getopt.GetoptError:
    helpflag = True

for o, a in optlist:
    if o == '-c':
        configfile = a
        cmdargs += [o, a]
    if o == '-f':
        forground = True
        cmdargs += [o]
    elif o == '-h':
        helpflag = True
    elif o == '-v':
        verbose = True
        cmdargs += [o]
    elif o == '-p':
        if a in PUSHSRVS:
            pushsrv = a
            cmdargs += [o, a]
        else:
            print("invalid push service, use of of %s" % PUSHSRVS)
            helpflag = True
    elif o == '-x':
        DEBUG += 1
        cmdargs += [o]

if helpflag:
    print("hbc HeartBeatDaemon")
    print("usage: hbd [-dfhvx] [-c configfile]")
    print()
    print(" -c configfile")
    print(" -d display")
    print(" -f run in foreground")
    print(" -h this help")
    print(" -v verbose")
    print(" -x increase debug lvl")
    print()
    print("""
 config file can contain
    logfile = /var/log/heartbeat.log
    logfmt = [text|msg]
    hb_port = 50003
    interval = 20
    hbd_port = 50004
    hbd_host = www.domain.com
    grace = 2
""")
    sys.exit(1)

#
# set defaults
hb_port = PORT
hbd_host = THOST
hbd_port = TPORT
pickfile = PICKFILE
logfile = LOGFILE
logfmt = "text"
interval = INTERVAL
grace = GRACE
watchhosts = []
dyndnshosts = []
drophosts = []
nsupdate_bin = NSUPDATE_BIN

try:
    f = open(configfile, "r")
    if verbose:
        print(("notice: using config file %s" % configfile))
except OSError:
    print(("warning: running without config file: %s" % configfile))
    f = None

if f:
    for ls in f:
        l = ls.strip()
        if len(l) == 0 or l[0] == "#":
            continue
        if verbose:
            print((" %s" % l))
        r = l.split('=', 1)
        o = r[0].strip()
        try:
            # NOTE(review): values are evaluated as Python expressions; the
            # config file must therefore be trusted (owner-writable only).
            a = eval(r[1].strip())
        except Exception as e:
            print(("error: %s" % str(r)))
            sys.exit(1)
        if o == 'interval':
            interval = a
        elif o == 'grace':
            grace = a
        elif o == 'hbd_port':
            hbd_port = a
        elif o == 'hbd_host':
            hbd_host = a
        elif o == 'pickfile':
            pickfile = a
        elif o == 'hb_port':
            hb_port = a
        elif o == 'logfile':
            logfile = a
        elif o == 'logfmt':
            logfmt = a
        elif o == 'watchhosts':
            watchhosts = a
        elif o == 'dyndnshosts':
            dyndnshosts = a
        elif o == 'drophosts':
            drophosts = a
        elif o == 'nsupdate_bin':
            nsupdate_bin = a
        elif o == 'pushsrv':
            pushsrv = a
        elif o == 'dyndomains':
            dyndomains = a
    f.close()

if len(args) != 0:
    print("error: args")
    sys.exit(1)

if pushsrv in ["all", "mattermost"]:
    try:
        from mattermostdriver import Driver
    except ImportError:
        print("warning: mattermostdriver python module missing, reverting to pushover")
        pushsrv = "pushover"

if verbose:
    print("notice: logging to %s" % logfile)
    print("notice: push service is %s" % pushsrv)

logf = initlog(logfile)

if 1 and os.path.exists(pickfile):
    if verbose:
        print(("opening pickls %s" % pickfile))
    # NOTE(review): unpickling executes code embedded in the file; the
    # default location is in /var/tmp -- make sure only the daemon's user
    # can write to `pickfile`.
    pickf = open(pickfile, 'rb')
    pick = pickle.Unpickler(pickf)
    try:
        hbdclass.Host.hosts = pick.load()
        msgs = pick.load()
        try:
            lastfm = pick.load()
        except Exception:
            # older pickle files do not contain the period markers
            lastfm = ["", "", ""]
        pickf.close()
    except Exception as e:
        print(("load pickled failed: %s" % e))
        pickf.close()
        os.unlink(pickfile)
    hbdclass.Connection.htab = {}
    for h in list(hbdclass.Host.hosts.keys()):
        hbdclass.Host.hosts[h].dyn = h in dyndnshosts
        hbdclass.Host.hosts[h].watched = h in watchhosts
        hbdclass.Host.hosts[h].fixup()
    for h in drophosts:
        if h in hbdclass.Host.hosts:
            del hbdclass.Host.hosts[h]
    if verbose:
        print(("%s pickled hosts loaded" % len(hbdclass.Host.hosts)))
else:
    if verbose:
        print("no pickled data")

now = time.time()
startsec = int(now) % interval
log(None, "Starting %s" % VER)

atexit.register(on_exit)

ilist = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", hb_port))
ilist.append(sock)

sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "IPV6_V6ONLY"):
    # the IPv4 socket above already owns the v4 side of the port
    sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
sock6.bind(("", hb_port))
ilist.append(sock6)
#ilist.append(serv.fileno())

if not forground:
    pid = os.fork()
    if pid > 0:
        if verbose:
            print(("daemoinizing... pid = %d" % pid))
        sys.exit(0)
    verbose = False
    os.close(0)
    os.close(1)
    os.close(2)
    sys.stdin.close()
    if DEBUG > 0:
        sys.stdout = LogDevice()
        sys.stderr = LogDevice()
    else:
        sys.stdout = NullDevice()
        sys.stderr = NullDevice()
    os.chdir("/tmp")
    os.setsid()
    os.umask(0)

try:
    serv = HttpServer((hbd_host, hbd_port), HttpHandler)
except Exception:
    print(("failed to start server on %s:%s" % (hbd_host, hbd_port)))
    sys.exit(1)

#
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
wss_pem = pathlib.Path(WSS_PEM)
wss_key = pathlib.Path(WSS_KEY)
ssl_context.load_cert_chain(wss_pem, keyfile=wss_key)
# subprotocols must be a list of names, not a bare string
wss_start_server = websockets.serve(ws_serve, hbd_host, WSSPORT, ssl=ssl_context,
                                    loop=loop, subprotocols=["hbd"])
loop.run_until_complete(wss_start_server)
ws_start_server = websockets.serve(ws_serve, hbd_host, WSPORT,
                                   loop=loop, subprotocols=["hbd"])
loop.run_until_complete(ws_start_server)

servthread = threading.Thread(target=serv.serve_forever)
servthread.daemon = True
servthread.start()

dnsT = threading.Thread(target=dnsupdatethread)
dnsT.daemon = True
dnsT.start()

wsT = threading.Thread(target=websocketupdater)
wsT.daemon = True
wsT.start()

running = True
sig = 0
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)

rnext = int(now)+15  # 15 seconds time to settle after (re-)start
sleep = 1
firstcheck = int(now) + 15

while running:
    sr = None
    if DEBUG > 3:
        sys.stderr.write("about to sleep = %s\n" % (sleep))
    try:
        sr = select.select(ilist, [], [], sleep)
        now = time.time()
    except KeyboardInterrupt:
        sys.stderr.write("Keyboard Interrupt!\n")
        running = False
        closeup()
        continue
    except OSError as value:
        if value.errno != errno.EINTR:  # anything but an interrupted system call
            sys.stderr.write("select err %s %s" % (select.error, value))
        continue
    except Exception as e:
        if DEBUG > 2:
            sys.stderr.write("select exception %s\n" % (str(e)))
        sys.exit(1)
    if DEBUG > 3:
        sys.stderr.write("woke from sleep = %s (%s)\n" % (str(sr), str(ilist)))

    for fh in sr[0]:
        if fh in [sock, sock6]:
            readsock(fh)
        # elif fh == serv.fileno():
        #     serv.handle_request()
        else:
            sys.stderr.write("what happend just now?\n")
    if DEBUG > 3:
        sys.stderr.write("done handling, running is %s, sig is %s\n" % (running, sig))

    # check hour/day/week: when a period rolls over, snapshot the counters
    for v in range(3):
        ts = time.strftime(tsfm[v], time.localtime(now))
        if ts != lastfm[v]:
            lastfm[v] = ts
            for h in list(hbdclass.Host.hosts.keys()):
                hbdclass.Host.hosts[h].hdwcounts[v] = [hbdclass.Host.hosts[h].doesack,
                                                       hbdclass.Host.hosts[h].upcount]

    if now >= rnext and now >= firstcheck:
        rnext = now+1
        checkoverdue()

    sleep = rnext-now
    if sleep < 0:
        sys.stderr.write("sleep is negative! %s next = %s\n" % (sleep, rnext))
        sleep = 0
    if DEBUG > 3:
        sys.stderr.write("sleep = %s next = %s\n" % (sleep, rnext))

    if sig != 0:
        setrunning(False)
        if sig == signal.SIGHUP:
            if DEBUG > 0:
                sys.stderr.write("signal 1 saveandrestart\n")
            saveandrestart()