Files
heartbeat/hbd
T
2021-06-20 13:40:49 -04:00

1221 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
# $Id: hbd,v 1.38 2013/07/14 02:25:05 andreas Exp $
# Wait for heartbeat messages and act on them (or their absence)
#
# Daemon version; shown in the HTTP "Server" header, on the status page and
# in the start-up log line.
VER = 4.2
import time
import os
import string
import sys
import socket
import ssl
import pathlib
import atexit
import select
import socketserver
import http.server
import getopt
import signal
import pickle
import smtplib
import traceback
import urllib.request, urllib.parse, urllib.error
import urllib.parse
import http.client
import threading
import subprocess
from hashlib import md5
import json
import zlib
import codecs
import asyncio
import websockets
from subprocess import Popen, STDOUT, PIPE
#from hbdclass import *
import hbdclass
# Certificate chain and private key loaded for the TLS websocket listener.
CERT_PATH="/usr/local/etc/letsencrypt/live/w02.wrede.ca/"
WSS_PEM = CERT_PATH + "fullchain.pem"
WSS_KEY = CERT_PATH + "privkey.pem"
NSUPDATE_BIN = "/usr/local/bin/nsupdate" # override in .hbrc possible
# Master switches checked at the top of email() and pushover().
SEND_EMAIL=False
SEND_PUSHOVER=True
# Debug level; raised by each -x on the command line.  hbdclass.DEBUG is
# copied here once, so later -x increments do not propagate to hbdclass.
DEBUG = 0
hbdclass.DEBUG = DEBUG
# Maximum datagram size passed to recvfrom().
MAXRECV = 32767
# Defaults for the log file and the pickled state file (config: logfile, pickfile).
LOGFILE = "/home/andreas/public_html/messages/andreas"
PICKFILE = "/var/tmp/hbd.pick"
# Recipients, subject tag and SMTP host used by email().
# NOTE(review): NAME reads "heatbeat" -- presumably a typo for "heartbeat".
AEMAIL = ["andreas@wrede.ca"]
NAME = "heatbeat"
SMTPSERVER = "localhost"
# In-memory message history; appended to by log() and pickled with the hosts.
msgs = []
#AEW upcount = 0
# Default ports: PORT = UDP heartbeat socket, TPORT/THOST = HTTP server,
# WSPORT/WSSPORT = plain and TLS websocket listeners.
PORT = 50003
TPORT = 50004
THOST = ""
WSPORT = 50005
WSSPORT = 50006
verbose = False
# Defaults for the config keys "interval" and "grace"; grace is added to a
# host's interval when deciding whether a connection is overdue.
INTERVAL = 10
GRACE = 2
# Seconds after which an overdue connection is moved to the "unknown" state.
DROPOVERDUE = 7*24*3600
os.environ['TZ'] = 'EST5EDT'
# strftime formats for hour / day-of-month / week-of-year and the last value
# seen for each; the main loop snapshots per-host counters when one changes.
tsfm=["%H","%d","%U"]
lastfm=["","",""]
# Extra <head> content (sortable-table script and table CSS) for the status page.
tcss = """<script src="https://home.wrede.ca/pr/sorttable.js"></script>
<style>
#ntable {
border-collapse: collapse;
}
#wide-ntable {
border-collapse: collapse;
width: 100%;
}
#ntable td, #ntable th {
border: 1px solid #ddd;
text-align: left;
padding: 1px;
}
#ntable tr:nth-child(even){background-color: #f2f2f2}
#ntable tr:hover {background-color: #ddd;}
#ntable th {
padding-top: 12px;
padding-bottom: 12px;
background-color: #9d9d9d;
color: white;
}
</style> """
def handler(signum, frame):
    """Signal handler: record the signal number for the main loop.

    The number is stored in the global ``sig``.  If the daemon is no longer
    running, the process exits with status 2 instead.
    """
    global running, sig
    sig = signum
    if running:
        if verbose:
            sys.stderr.write("signal: %s running: %s frame: %s" % (sig, running, frame))
        return
    if verbose:
        sys.stderr.write("NOT runing signal: %s running: %d" % (sig, running))
    sys.exit(2)
def shortname(name):
    """Return the part of *name* before its first dot (the bare host name)."""
    head, _dot, _rest = name.partition('.')
    return head
class NullDevice:
    """Write-only sink that discards everything.

    Installed as sys.stdout / sys.stderr once the daemon has detached.
    """

    def write(self, s):
        """Discard *s*."""
        pass

    def flush(self):
        """No-op; present because stream users expect a flush() method."""
        pass
class LogDevice:
    """Stream replacement that appends everything written to /tmp/log1.

    Installed as sys.stdout / sys.stderr when the daemon detaches with
    debugging enabled.
    """

    def __init__(self):
        self.fh = open("/tmp/log1","a")

    def write(self, s):
        """Append *s* to the debug file and flush immediately."""
        self.fh.write(s)
        self.fh.flush()

    def flush(self):
        """Flush the underlying file; stream users expect this method."""
        self.fh.flush()
def dicttos(ID, d, compress=False):
    """Serialise dict *d* into a wire message tagged with *ID*.

    Fields are rendered as ``key=value`` joined by ``;``; float values use
    five decimals.  Uncompressed, the result is the str ``ID:payload``.
    With *compress* true it is bytes: ``!ID:`` followed by the
    zlib-compressed (level 6) payload.
    """
    fields = []
    for key, value in d.items():
        template = "%s=%0.5f" if type(value) == type(1.2) else "%s=%s"
        fields.append(template % (key, value))
    payload = ";".join(fields)
    if not compress:
        return ID + ":" + payload
    header = ("!" + ID + ":").encode()
    return header + zlib.compress(payload.encode(), 6)
def _parse_number(text):
    """Convert *text* to int or float if it is a plain numeric literal.

    Anything that is not a number is returned unchanged as a string.  This
    replaces eval(), which must never see data read from the network.
    """
    for convert in (int, lambda t: int(t, 0), float):
        try:
            return convert(text)
        except ValueError:
            continue
    return text


def stodict(msg):
    """Parse a wire message (the inverse of dicttos) into a dict.

    *msg* may be bytes (as received from the socket) or str.  Two forms are
    understood:

    * ``!ID:<zlib data>`` -- compressed payload, bytes only
    * ``ID:key=value;key=value;flag`` -- plain payload

    The result maps each key to its value; the message tag is stored under
    ``'ID'``.  A field without ``=`` maps to None.  Values starting with a
    digit become int/float when they are plain numeric literals and stay
    strings otherwise.

    Raises IndexError when a plain message has no ``ID:`` prefix; malformed
    compressed data raises ValueError or zlib.error.
    """
    d = {}
    is_binary = isinstance(msg, (bytes, bytearray))
    if is_binary and msg[:1] == b"!":
        # The ID is whatever sits between "!" and the first ":", so IDs need
        # not be exactly three characters long.
        sep = msg.index(b":")
        pk = zlib.decompress(bytes(msg[sep + 1:])).decode()
        d['ID'] = bytes(msg[1:sep]).decode()
    else:
        text = msg.decode() if is_binary else msg
        ident, sep, pk = text.partition(':')
        if not sep:
            raise IndexError("message has no 'ID:' prefix")
        d['ID'] = ident
    for field in pk.split(';'):
        key, sep, value = field.partition('=')
        key = key.strip()
        if not sep:
            d[key] = None
            continue
        value = value.strip()
        # value[:1] is safe for an empty value ("key=")
        if value[:1].isdigit():
            value = _parse_number(value)
        d[key] = value
    return d
def oldmtodict(msg):
    """Parse a message from an old client that sends no ``ID:`` prefix.

    The payload is treated as an ``HTB`` message.  The prefix is added in
    the same type as *msg*, because datagrams arrive as bytes and
    ``'HTB:' + bytes`` raises TypeError.
    """
    prefix = b'HTB:' if isinstance(msg, (bytes, bytearray)) else 'HTB:'
    return stodict(prefix + msg)
def email(s, msg):
    """Mail *msg* with subject suffix *s* to the addresses in AEMAIL.

    Returns None when mailing is disabled (SEND_EMAIL false), "OK" on
    success and "Fail" when the recipients were refused.  Any other SMTP
    problem is printed and triggers saveandrestart().
    """
    if not SEND_EMAIL:
        return
    status = "OK"
    recipients = AEMAIL
    sender = "aew.heartbeat@wrede.ca"
    subject = "Info from %s: %s" % (NAME, s)
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime())
    text = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % (
        recipients[0], sender, subject, stamp, msg)
    try:
        server = smtplib.SMTP(SMTPSERVER)
        if DEBUG > 0:
            server.set_debuglevel(1)
        server.sendmail(sender, recipients, text)
    except smtplib.SMTPRecipientsRefused as errs:
        log(None, "cannot send email: %s\n" % (errs))
        status = "Fail"
    except BaseException:
        print(("smtp error: "+traceback.format_exc()))
        saveandrestart()
    # server may be unbound if SMTP() itself failed; that is swallowed too
    try:
        server.quit()
    except BaseException:
        pass
    return status
def pushmsg(msg):
    """Send *msg* through the push service(s) selected by ``pushsrv``.

    ``pushsrv`` is one service name ("pushover", "mattermost", "signal") or
    "all", which uses every service and also prints the message.
    """
    everything = pushsrv in ["all"]
    if everything or pushsrv in ["pushover"]:
        pushover(msg)
    if everything or pushsrv in ["mattermost"]:
        pushmattermost(msg)
    if everything or pushsrv in ["signal"]:
        pushsignal(msg)
    if everything:
        print("notice:", msg)
def pushover(msg):
    """Send *msg* as a Pushover notification (best effort).

    Does nothing when SEND_PUSHOVER is false.  Failures never propagate to
    the caller; they are reported only in verbose mode.
    """
    if not SEND_PUSHOVER:
        return
    # NOTE(review): API token and user key are hard-coded; consider moving
    # them to the config file.
    # The timeout keeps an unreachable API from stalling the caller.
    conn = http.client.HTTPSConnection("api.pushover.net:443", timeout=10)
    try:
        conn.request("POST", "/1/messages.json",
            urllib.parse.urlencode({
                "token": "ac7NLX2rPjXFareeDgLpXNoDf4iFmf",
                "user": "uDhH33UjQQDYtNzJb1ThRiWb9ingGK",
                "message": msg, }), { "Content-type": "application/x-www-form-urlencoded" })
        conn.getresponse()
    except Exception as e:
        # Exception (not a bare except) so SystemExit raised by the signal
        # handler is not swallowed.
        if verbose: print("pushover failed: %s" % e)
    finally:
        conn.close()
# Mattermost incoming-webhook settings used by pushmattermost().
CHANNEL = "Monitoring"
TOKEN = "rxz6b3886iygxnhbzpmgbsrocy"
HOST = "192.168.10.101"
ICON = "https://in-transit.ca/HeartBeat.png"
USERNAME = "admin"
def pushmattermost(msg):
    """Post *msg* to the Mattermost channel through the webhook TOKEN.

    ``Driver`` is imported at start-up only when the mattermost service is
    selected.  An exception from the call is converted to its text.
    """
    driver = Driver({
        'url': HOST,
        'scheme':'http',
        'basepath': '/api/v4',
        'port':8065,
    })
    payload = dict(text=msg, channel=CHANNEL, username=USERNAME, icon_url=ICON)
    try:
        rc = driver.webhooks.call_webhook(TOKEN, payload)
    except Exception as e:
        rc = str(e)
    # NOTE(review): only a falsy result is printed, so the error text set
    # above is never shown -- confirm whether "if rc:" was intended.
    if not rc:
        print(rc)
# Signal account used for sending, and the default recipient.
USER = "+16472472447"
RECIPIENT = "+14168226179"
def pushsignal(msg, title="hbd", recipient=RECIPIENT):
    """Send ``"<title>: <msg>"`` as a Signal message via signal-cli on w02.

    The command runs through ssh, so the remote shell re-parses the
    arguments.  *msg* can contain text received from the network and is
    therefore shell-quoted; plain double quotes would allow command
    injection on the remote host.

    Returns True when signal-cli exited with status 0, else False.
    """
    import shlex  # function-scope import: only needed here

    message = shlex.quote(f"{title}: {msg}")
    CLI = [
        "/usr/bin/ssh", "andreas@w02",
        "/usr/local/bin/signal-cli", "-u", shlex.quote(USER),
        "send", "-m", message,
        # "-g", GROUP,
        shlex.quote(recipient),
    ]
    if verbose: print(f"DBG cli: {CLI}")
    try:
        res = subprocess.run(CLI, shell=False, capture_output=True)
    except OSError as e:
        # e.g. ssh binary missing -- a notification must not crash the caller
        print(f"signalcli failed: {e}")
        return False
    rc = res.returncode == 0
    out = res.stdout.decode(errors="replace")
    print(out)
    if not rc:
        print(f"signalcli failed: {res.stderr.decode(errors='replace')}")
    else:
        if verbose: print(f"signalcli msg sent, res {out}")
    return rc
def nsupdate(hostname, newip, dyndomain):
    """Set the DNS address record for ``<hostname>.dy.<dyndomain>``.

    Feeds an update script to the external nsupdate binary: the A (or, for
    an address containing ":", the AAAA) record and a TXT timestamp record
    are deleted and re-added with a TTL of 5.

    Returns None on success (output contains "status: NOERROR"), otherwise
    a string with the error text.
    """
    D = {}
    D['domain'] = dyndomain
    D['fqdn'] = "%s.dy.%s" % (hostname, dyndomain)
    D['dnsttl'] = '5'
    D['newip'] = newip
    D['ts'] = time.strftime('%Y-%m-%d.%H:%M:%S', time.gmtime())
    # "in" rather than find() > 0, so addresses starting with ":" (e.g. "::1")
    # are recognised as IPv6 too.
    D['rtype'] = "AAAA" if ":" in newip else "A"
    nsup = """update delete %(fqdn)s %(rtype)s
update add %(fqdn)s %(dnsttl)s %(rtype)s %(newip)s
update delete %(fqdn)s TXT
update add %(fqdn)s %(dnsttl)s TXT "Created: %(ts)s"
send
answer
""" % D
    if DEBUG > 0: log(None, "DBG: nsup %s" % nsup)
    cmd = [nsupdate_bin, "-k", "/etc/dhcpc/Kdy.%(domain)s.+157+00000." % D, "-v"]
    if DEBUG > 0: log(None, "DBG: cmd %s" % cmd)
    try:
        # no bufsize=1: line buffering is not available for binary pipes
        p = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    except OSError as e:
        return "nsupdate: execution failed: %s" % e
    except Exception:
        return "nsupdate: some error occured"
    # stderr is merged into stdout, so the second element is always None
    output, _unused = p.communicate(nsup.encode())
    text = output.decode(errors="replace")
    if 'status: NOERROR' in text:
        return None
    return text
#
def dur(sec):
    """Format a duration in seconds as ``H:MM:SS``, ``M:SS`` or ``0:SS``."""
    total = int(sec)
    hours = int(total / 3600)
    remainder = total - hours * 3600
    minutes = int(remainder / 60)
    seconds = int(remainder % 60)
    if hours > 0:
        return "%d:%02d:%02d" % (hours, minutes, seconds)
    if minutes > 0:
        return "%d:%02d" % (minutes, seconds)
    return "0:%02d" % seconds
def fixsort():
    """Renumber all hosts: ``num`` becomes the host's rank in name order."""
    hosts = hbdclass.Host.hosts
    for position, hostname in enumerate(sorted(hosts.keys())):
        hosts[hostname].num = position
#
def on_exit():
    """atexit hook: close the log file (ignoring any error) and print "exit"."""
    if DEBUG > 0:
        sys.stderr.write("on_exit\n")
    try:
        logf.close()
    except BaseException:
        # the log may be unset or already closed; nothing useful to do
        pass
    print("exit")
def initlog(logfile):
    """Open *logfile* for appending and return the file object.

    If append mode fails, plain write mode is tried; if that fails as well
    a message is printed and sys.stderr is returned instead.
    """
    try:
        handle = open(logfile, "a+")
    except BaseException:
        handle = None
    if handle is not None:
        return handle
    try:
        return open(logfile, "w")
    except Exception as e:
        print("cannot open loffile %s, using STDERR: %s" % (logfile, e))
        return sys.stderr
#
#
def checkoverdue():
    """Mark connections whose heartbeat is late and report them.

    An "up" connection silent for longer than the host's interval plus
    ``grace`` becomes overdue.  An overdue connection silent for longer
    than DROPOVERDUE becomes unknown.  Newly overdue connections are logged
    and pushed to the websocket clients; watched hosts also trigger
    email and push notifications.
    """
    now = time.time()
    for h in list(hbdclass.Host.hosts.keys()):
        host = hbdclass.Host.hosts[h]
        late = []
        for c in host.connections:
            conn = host.connections[c]
            if conn.state == hbdclass.Connection.down:
                continue
            timeout = host.interval + grace
            silent = now - conn.lastbeat
            if conn.state == hbdclass.Connection.up and silent > timeout:
                conn.newstate(hbdclass.Connection.overdue, now, grace)
                late.append(conn.afam)
            if conn.state == hbdclass.Connection.overdue and silent > DROPOVERDUE:
                conn.newstate(hbdclass.Connection.unknown, conn.lastbeat)
        if not late:
            continue
        families = " and ".join(late)
        if h in watchhosts:
            email("overdue", "%s overdue" % families)
            pushmsg("%s %s overdue" % (h, families))
        log(h, "%s overdue" % families)
        msg_to_websockets('host', hbdclass.Host.hosts[h].stateinfo())
def log(host, m, service=None):
    """Record message *m*, optionally attributed to *host* and *service*.

    The formatted line is appended to ``msgs``, sent to the websocket
    clients and written to the log file.  With logfmt "msg" the file line
    is ``epoch|host|text``.  The state is pickled after every message.
    """
    if DEBUG > 0:
        print("Log: %s %s" % (host, m))
    now = time.time()
    stamp = time.strftime("%b %d %H:%M:%S", time.localtime(now))
    srv = ("service %s: " % service) if service else ""
    hst = ("%s " % host) if host else ""
    line = "%s: %s%s%s" % (stamp, hst, srv, m)
    # msgs is only ever appended to in this file; it is not trimmed here
    msgs.append(line+'\n')
    msg_to_websockets('message', line)
    record = ("%d|%s|%s\n" % (now, hst, m)) if logfmt == "msg" else line+'\n'
    logf.write(record)
    logf.flush()
    pickleit()
def dnsupdatethread():
    """Worker thread: apply queued address changes to dynamic DNS forever.

    Takes ``(name, address)`` pairs from hbdclass.Host.dnsQ, runs nsupdate
    for every domain in ``dyndomains`` and logs the combined result.
    A failed update is also mailed.
    """
    queue = hbdclass.Host.dnsQ
    while True:
        name, addr = queue.get()
        summary = "changed address to %s" % (addr)
        for dyndomain in dyndomains:
            err = nsupdate(name, addr, dyndomain)
            if not err:
                summary += ", DNS updated."
                continue
            summary += ", DNS update failed: %s" % err
            email("error: nsupdate failed", "%s.dy.%s: %s" % (name, dyndomain, summary))
        queue.task_done()
        log(name, summary)
#
#
#
#
def _wire_bytes(pkt):
    """Return *pkt* as bytes; socket.sendto() does not accept str."""
    return pkt.encode() if isinstance(pkt, str) else pkt


def readsock(sock):
    """Receive one datagram from *sock*, update the host and answer it.

    For an ``HTB`` message the sending host is created or looked up and its
    connection state updated.  Address changes, boots, messages, recoveries
    and shutdowns are logged, with notifications for watched hosts.  An ACK
    is sent back, followed by any commands queued for the host.
    Unparseable datagrams are ignored.
    """
    global now
    ss = None  # result of the latest sendto(); only used by the debug line
    if DEBUG > 3: sys.stderr.write("readsock recfrom start")
    now = time.time()
    data, addrp = sock.recvfrom(MAXRECV)
    if DEBUG > 3: sys.stderr.write("readsock = %s, %s\n" % (data,addrp))
    try:
        msg = stodict(data)
    except Exception:
        return
    if DEBUG > 3: sys.stderr.write("msg is %s" % str(msg))
    if not msg: # Old hbc client
        if verbose: print(("old hbc:", data))
        msg = oldmtodict(data)
    if DEBUG > 2: print(("readsock = %s, %s" % (msg,addrp)))
    addr = addrp[0:2]
    name = shortname(msg.get('name', "unknown"))
    if not name in hbdclass.Host.hosts:
        host = hbdclass.Host(name)
        host.dyn = name in dyndnshosts
        if verbose: print(("XX: New host, num now %s" % (len(hbdclass.Host.hosts))))
        newh=True
    else:
        host = hbdclass.Host.hosts[name]
        newh=False
    cid = msg.get('id', 0)
    try:
        rtt = float(msg.get('rtt',None))
    except (TypeError, ValueError):
        rtt = None
    if msg['ID'] == 'HTB':
        host.doesack = msg.get('acks', -1)
        host.setcver(msg.get('ver', 0))
        try:
            interval = int(msg.get('interval', 0))
        except (TypeError, ValueError):
            # the value comes off the network; junk must not stop the daemon
            interval = 0
        shutdown = msg.get('shutdown', 0)
        service = msg.get('service', "unknown")
        message = msg.get('msg', None)
        boot = msg.get('boot', 0)
        conn, res = host.conndata(cid, addr[0], rtt, now)
        if res:
            log(name, res)
            if name in watchhosts:
                email("address change", "%s %s" % (host.name, res))
                pushmsg("%s %s" % (host.name, res))
        if boot:
            log(name, "booted")
            if name in watchhosts:
                m = "%s booted" % (host.name)
                email("booted", m)
                pushmsg(m)
        if message:
            log(name, "msg: %s" % message, service=service)
            if name in watchhosts:
                email("msg", message)
                pushmsg(message)
        if conn.getstate() != hbdclass.Connection.up: # XXX and interval > 0:
            lasts = conn.state
            d = conn.newstate(hbdclass.Connection.up, now)
            m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
            log(name, m)
            if name in watchhosts:
                email("%s back" % conn.afam, name)
                pushmsg("%s %s is back" % (name, conn.afam))
        if boot or newh:
            host.upcount = host.doesack
        else:
            host.upcount += 1
        if shutdown:
            log(name, "%s shutdown" % conn.afam)
            if name in watchhosts:
                email("shutdown", "%s %s shutdown" % (name, conn.afam))
                pushmsg("%s %s shutdown" % (name, conn.afam))
            conn.newstate(hbdclass.Connection.down, now)
        if interval > 0:
            host.interval = interval
        rmsg = {'time': time.time()}
        op = 'ACK'
        if host.cver < 1:
            opkt = 'ACK'
            rmsg = 'ACK'
        else:
            opkt = dicttos('ACK', rmsg, host.cver > 1) # clients w/ ver 2+ can cope
        # dicttos() returns str when not compressing; convert before sending
        opkt = _wire_bytes(opkt)
        try:
            ss=sock.sendto(opkt, addr)
        except Exception:
            pass # XXX return pkg failes
        if DEBUG > 2: print(("sendto1: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50])))
    # send any commands we have queued
    while len(host.cmds):
        op, rmsg = host.cmds[0]
        if op == 'CMD':
            email("%s cmd exec" % name, "command '%s' sent" % rmsg)
            del host.cmds[0]
            log(name, "command sent")
            if host.cver < 1:
                rmsg = rmsg['cmd']
        elif op == 'UPD':
            del host.cmds[0]
            log(name, "update initiated")
            if host.cver < 1:
                log(name," ver 0 does not support UPD")
                continue
        if host.cver < 1:
            opkt = rmsg
            op = ""
        else:
            opkt = dicttos(op, rmsg, True)
        opkt = _wire_bytes(opkt)
        try:
            ss=sock.sendto(opkt, addr)
        except Exception as e:
            print(("opkt len is %s" % len(opkt)))
            print(("cannot send: %s" % e))
        if verbose: print(("sendto2: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50])))
    if DEBUG > 2: print(("msg from %s,%s, sent %s bytes back" % (addr[0], addr[1], ss)))
    msg_to_websockets('host', host.stateinfo())
def updatecode(ucode, uname):
    """Queue an ``UPD`` command that sends the file *ucode* to host *uname*.

    The command carries the MD5 hex digest of the code and the code itself
    base64-encoded.  Returns None on success, or an error string when the
    file cannot be read (nothing is queued in that case).
    """
    try:
        # "with" closes the file even if read() fails
        with open(ucode, "r") as fh:
            new_code = fh.read()
    except Exception as e:
        return "cannot read new code: %s" % e
    encoded = new_code.encode()
    rmsg = {'csum': md5(encoded).hexdigest(), 'code': codecs.encode(encoded, 'base64') }
    hbdclass.Host.hosts[uname].cmds.append(('UPD',rmsg))
    return None
#
# Web Server
#
class HttpServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""

    # lets the daemon re-bind its port right after a restart
    allow_reuse_address = True

    def threaded(self):
        """Placeholder; does nothing."""
        return None
#
#
class HttpHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the status page and the control/API URLs.

    GET paths: ``/`` status page, ``/c`` queue a command, ``/d`` drop a
    host, ``/n`` register a DNS name, ``/u`` push a code update,
    ``/api/0/hosts`` and ``/api/0/messages`` JSON views, ``/r`` restart.

    NOTE(review): no authentication is visible here, although /c, /u, /d
    and /r change state or run commands on monitored hosts.
    """
    server_version = "HeartbeatHTTP/%s" % VER

    def version_string(self):
        """Return the Server header value (without the Python version)."""
        return self.server_version

    def handle(self):
        """Handle a request; a failing connection is logged, not raised."""
        try:
            return http.server.BaseHTTPRequestHandler.handle(self)
        except Exception as e:
            self.log_error("Request went away: %r", e)
            self.close_connection = 1
            return

    def do_HEAD(self):
        """Answer HEAD with status 200 and the standard headers."""
        self.setheaders(200)

    def setheaders(self, code, headerdict=None):
        """Send the status line, Last-Modified and the headers in *headerdict*."""
        self.send_response(code)
        self.send_header("Last-Modified", time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)))
        for h, value in (headerdict or {}).items():
            self.send_header(h, value)
        self.end_headers()

    def buildhead(self, title="Heartbeat", refresh=None, extras=None):
        """Return the opening HTML lines up to and including <body>."""
        res=[]
        res.append('<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">')
        res.append("<html>")
        res.append("<head>")
        res.append('<title>%s</title>' % (title))
        if refresh:
            res.append("<meta http-equiv = Refresh content = %d>\n" % refresh)
        if extras:
            res.append(extras)
        res.append("</head>")
        res.append('<body BGCOLOR = "#FFFFFF" LINK = "#008000" VLINK = "#008000">')
        return res

    def buildpage(self):
        """Return the status page (host table, message table, time stamp)."""
        res=self.buildhead(refresh=60, extras=tcss)
        res.append("<H2>Heartbeat status %s</h2>" % VER)
        # NOTE(review): "ubHost" is not referenced anywhere else in this
        # file -- confirm it exists in hbdclass.
        res += hbdclass.ubHost.buildhosttable()
        res += hbdclass.ubHost.buildmsgtable(msgs)
        res.append('<p> %s (%s)</p>' % (time.strftime("%H:%M:%S", time.localtime(now)), os.environ.get('TZ', 'CET-1CDT')))
        res.append("</body></html>")
        return res

    def builderror(self, code, cause, lcause):
        """Return ``(code, lines)`` for an HTML error page."""
        res=[]
        res.append('<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">')
        res.append('<html><head>')
        res.append('<title>%s %s</title>' % (code, cause))
        res.append('</head><body>')
        res.append('<h1>%s</h1>' % (cause))
        res.append('<p>%s</p>' % lcause)
        res.append('<hr>')
        res.append('<address>hbd (Unix) Server at %s:%s</address>' % (hbd_host, hbd_port))
        res.append('</body></html>')
        return code, res

    def do_GET(self):
        """Dispatch a GET request by path and send the response.

        The body is always sent as UTF-8 bytes, deflate-compressed when the
        client accepts it.  A restart request sets the global ``sig`` only
        after the response has been written.
        """
        global sig
        import html  # function-scope import: escapes query text echoed in HTML

        code = 200
        xsig = 0
        rqAcceptEncoding = self.headers.get('Accept-encoding',{})
        # the body is encoded as UTF-8 below, so declare that charset
        headerdict = {"Content-Type": "text/html; charset=utf-8" }
        if DEBUG > 2: sys.stderr.write("handle\n")
        qr = urllib.parse.urlparse(self.path)
        qa = urllib.parse.parse_qs(qr.query)
        if DEBUG > 2: sys.stderr.write("handle = %s\n" % (qr.geturl()))
        if qr.path == "/":
            res=self.buildpage()
        elif qr.path == "/c": # command on host /c?h=melschserver&c=sudo%20ls
            uname=qa.get('h',[None])[0]
            ucmd=qa.get('c', [None])[0]
            if not ucmd or not uname:
                code, res=self.builderror(400, 'Argument error', "need h= and c= arguments")
            elif uname not in hbdclass.Host.hosts:
                code, res=self.builderror(400, 'Data error', "h=%s not found" % html.escape(uname))
            else:
                hbdclass.Host.hosts[uname].cmds.append(('CMD', {'cmd': urllib.parse.unquote(ucmd)}))
                res=self.buildhead()
                res.append("cmd %s queued for host %s" % (html.escape(ucmd), html.escape(uname)))
        elif qr.path == "/d": # drop host /d?h=melschserver
            uname=qa.get('h',[None])[0]
            if not uname:
                code, res=self.builderror(400, 'Argument error', "need h= argument")
            elif not uname in hbdclass.Host.hosts:
                code, res=self.builderror(400, 'Data error', "h=%s not found" % html.escape(uname))
            else:
                log(uname, "dropped")
                # TODO: send message to websocket about dropped host
                del hbdclass.Host.hosts[uname]
                res=self.buildhead()
                res.append("Done")
        elif qr.path == "/n": # register name
            uname=qa.get('h',[None])[0]
            if not uname:
                code, res=self.builderror(400, 'Argument error', "need h= argument")
            elif not uname in hbdclass.Host.hosts:
                code, res=self.builderror(400, 'Data error', "h=%s not found" % html.escape(uname))
            else:
                ll = hbdclass.Host.hosts[uname].registerDns()
                res=self.buildhead()
                res.append(str(ll))
                log(uname, ll)
        elif qr.path == "/u": # update
            uname=qa.get('h',[None])[0]
            if uname is not None:
                uname=urllib.parse.unquote(uname)
            ucode=qa.get('c', [None])[0]
            if not ucode or not uname:
                code, res=self.builderror(400, 'Argument error', "need h= and c= arguments")
            elif uname != 'All' and uname not in hbdclass.Host.hosts:
                code, res=self.builderror(400, 'Data error', "h=%s not found" % html.escape(uname))
            else:
                res=self.buildhead()
                if uname != 'All':
                    names = [uname]
                else:
                    # earliest client version that supports update is 2
                    names = [n for n in hbdclass.Host.hosts
                             if hbdclass.Host.hosts[n].cver >= 2]
                for n in names:
                    err = updatecode(ucode, n)
                    res.append("update started for %s: %s<br>" % (html.escape(n), html.escape(err) if err else "OK"))
                res.append("Done")
        elif qr.path == "/api/0/hosts": # api access to host table
            headerdict = {"Content-Type": "application/json; charset=utf-8" }
            l=[hbdclass.Host.hosts[h].jsons() for h in hbdclass.Host.hosts]
            res=["["+",".join(l)+"]"]
        elif qr.path == "/api/0/messages": # api access to the latest messages
            headerdict = {"Content-Type": "application/json; charset=utf-8" }
            l=msgs[-30:]
            res=[json.dumps(l)]
        elif qr.path == "/r": # restart
            res=self.buildhead()
            res.append("restart request")
            xsig=signal.SIGHUP
            log(None, "restart request")
        else:
            code, res=self.builderror(404, "Not Found", "requested URL was not found on this server.")
        # wfile takes bytes only; Content-Length must count bytes as sent
        towrite = "\n".join(res).encode()
        if 'deflate' in rqAcceptEncoding:
            headerdict['Content-Encoding'] = "deflate"
            towrite = zlib.compress(towrite, 6)
        headerdict['Content-Length'] = str(len(towrite))
        headerdict['Cache-Control'] = 'private, must-revalidate, max-age=0'
        headerdict['Expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
        self.setheaders(code, headerdict)
        self.wfile.write(towrite)
        if xsig:
            sig = xsig
def setrunning(new):
    """Set the global ``running`` flag that controls the main loop."""
    global running
    if DEBUG > 0:
        sys.stderr.write("running is now = %s\n" % (new))
    running = new
def closeup():
    """Stop serving: close sockets, stop the HTTP server, close the log.

    Every step is best effort; a failure in one does not prevent the rest.
    Finally the SIGHUP handler is reset to the default action.
    """
    def _quietly(action):
        # run one clean-up step, ignoring whatever goes wrong
        try:
            action()
        except BaseException:
            pass

    setrunning(False)
    _quietly(lambda: sock.close())
    _quietly(lambda: sock6.close())
    if DEBUG > 0:
        sys.stderr.write("asking http server to stop\n")
    try:
        serv.shutdown()
        if DEBUG > 0:
            sys.stderr.write("http server stopped\n")
    except Exception as e:
        if DEBUG > 0:
            sys.stderr.write("http server did NOT stop: %s\n" % str(e))
    _quietly(lambda: serv.server_close())
    log(None, "restarting")
    _quietly(lambda: logf.close())
    # signal.signal(signal.SIGTERM, 0)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)
def restart():
    """Replace this process with a fresh hbd using the saved options."""
    program = sys.argv[0]
    argv = [program] + cmdargs
    if verbose:
        print(("execv %s %s" % (program, argv)))
    os.execv(program, argv)
    print("should not be here")
def saveandrestart():
    """Shut everything down cleanly, then re-execute the daemon."""
    for step in (closeup, restart):
        step()
def pickleit():
    """Save hosts, message history and period markers to ``pickfile``.

    The three objects are dumped in the order the start-up code loads them
    again.  The file is closed even when a dump raises.
    """
    with open(pickfile, 'wb') as pickf:
        pick = pickle.Pickler(pickf)
        pick.dump(hbdclass.Host.hosts)
        pick.dump(msgs)
        pick.dump(lastfm)
# Open websocket connections, mapped to the path they connected with.
ws_connections = {}
async def ws_serve(websocket, path):
    """Websocket handler: register the client and answer its requests.

    Every message received from the client is answered with the state of
    all hosts (sorted by name) followed by the latest 20 log messages,
    oldest first.  The handler ends when the connection is closed.
    """
    ws_connections[websocket] = path
    remote_address = websocket.remote_address
    if verbose:
        print(f"DBG ws_serve: {remote_address}: {path}")
    closed_errors = (
        websockets.exceptions.ConnectionClosedOK,
        websockets.exceptions.ConnectionClosedError,
    )
    while True:
        try:
            name = await websocket.recv()
            if verbose:
                print(f"DBG ws_serve: receive {name}")
        except closed_errors as e:
            if verbose:
                print(f'ws closed: {e}')
            break
        if verbose:
            print(f"initial {name} at {path}")
        # send initial set of hosts and messages; hosts in sorted order
        for h in sorted(hbdclass.Host.hosts):
            state = hbdclass.Host.hosts[h].stateinfo()
            await websocket.send(json.dumps({'type': 'host', 'data': state }))
        for m in msgs[-20:]:
            await websocket.send(json.dumps({'type': 'message', 'data': m }))
    if verbose:
        print(f"DBG ws_serve: close {remote_address}")
    await websocket.wait_closed()
def websocketupdater():
    """Thread target: run the asyncio event loop of the websocket servers."""
    event_loop = loop
    event_loop.run_forever()
def msg_to_websockets(typ: str, msg: str):
    """Queue ``{"type": typ, "data": msg}`` for every open websocket client.

    Sends are scheduled on the websocket event loop rather than awaited
    here.  Connections that are closed, or whose send could not be
    scheduled, are removed from ``ws_connections``.
    """
    jmsg = json.dumps({'type': typ, 'data': msg})
    to_close = []
    # Iterate over a snapshot: ws_serve() adds entries from the event-loop
    # thread, and changing a dict during iteration raises RuntimeError.
    for ws in list(ws_connections):
        if ws.closed:
            to_close.append(ws)
            continue
        try:
            asyncio.run_coroutine_threadsafe(ws.send(jmsg), loop)
        except Exception:
            to_close.append(ws)
            print("ws.send exception: closed")
    for ws in to_close:
        asyncio.run_coroutine_threadsafe(ws.wait_closed(), loop)
        ws_connections.pop(ws, None)
#
# Main
#
# Command-line handling.  Options that must survive a restart are collected
# in cmdargs, which restart() passes to the re-executed daemon.
# "signal" is listed because pushmsg() supports it.
PUSHSRVS = ["all", "pushover", "mattermost", "signal" ]
helpflag = False
forground = False
pushsrv = "pushover" # mattermost
dyndomains = ["wrede.org"]
optlist = []
args = []
# expanduser() also works when HOME is not set in the environment
home = os.path.expanduser("~")
cmdargs = []
configfile = "%s/.hbrc" % home
try:
    # -h takes no argument; -d is accepted but has no effect
    optlist, args = getopt.getopt(sys.argv[1:], 'c:dfhp:vx')
except getopt.GetoptError:
    helpflag = True
for o, a in optlist:
    if o == '-c':
        configfile = a
        cmdargs += [o, a]
    elif o == '-f':
        forground = True
        cmdargs += [o]
    elif o == '-h':
        helpflag = True
    elif o == '-v':
        verbose = True
        cmdargs += [o]
    elif o == '-p':
        if a in PUSHSRVS:
            pushsrv = a
            cmdargs += [o, a]
        else:
            print("invalid push service, use of of %s" % PUSHSRVS)
            helpflag = True
    elif o == '-x':
        DEBUG += 1
        cmdargs += [o]
if helpflag:
    print("hbc HeartBeatDaemon")
    print("usage: hbd [-dfhvx] [-c configfile] [-p pushservice]")
    print()
    print(" -c configfile")
    print(" -d display")
    print(" -f run in foreground")
    print(" -h this help")
    print(" -p push service, one of %s" % PUSHSRVS)
    print(" -v verbose")
    print(" -x increase debug lvl")
    print()
    print(""" config file can contain
logfile = /var/log/heartbeat.log
logfmt = [text|msg]
hb_port = 50003
interval = 20
hbd_port = 50004
hbd_host = www.domain.com
grace = 2
""")
    sys.exit(1)
#
# set defaults
hb_port = PORT
hbd_host = THOST
hbd_port = TPORT
pickfile = PICKFILE
logfile = LOGFILE
logfmt = "text"
interval = INTERVAL
grace = GRACE
watchhosts = []
dyndnshosts = []
drophosts = []
nsupdate_bin = NSUPDATE_BIN
# Settings the config file may override; other keys are silently ignored.
_CONFIG_KEYS = (
    'interval', 'grace', 'hbd_port', 'hbd_host', 'pickfile', 'hb_port',
    'logfile', 'logfmt', 'watchhosts', 'dyndnshosts', 'drophosts',
    'nsupdate_bin', 'pushsrv', 'dyndomains',
)
try:
    f = open(configfile, "r")
except OSError:
    print(("warning: running without config file: %s" % configfile))
    f = None
else:
    if verbose:
        print(("notice: using config file %s" % configfile))
if f:
    # "with" closes the file on the error exit below as well
    with f:
        for ls in f:
            # strip() rather than ls[:-1], which cut a real character off a
            # last line that has no trailing newline
            l = ls.strip()
            if len(l) == 0 or l[0] == "#":
                continue
            if verbose:
                print((" %s" % l))
            # split once only, so a value may itself contain "="
            r = l.split('=', 1)
            o = r[0].strip()
            try:
                # NOTE(review): each value is evaluated as a Python
                # expression; the config file must be trusted.
                a = eval(r[1].strip())
            except Exception as e:
                print(("error: %s" % str(r)))
                sys.exit(1)
            if o in _CONFIG_KEYS:
                globals()[o] = a
if len(args) != 0:
    print("error: args")
    sys.exit(1)
# The Mattermost driver is an optional third-party module; without it the
# daemon falls back to pushover only.
if pushsrv in ["all", "mattermost"]:
    try:
        from mattermostdriver import Driver
    except Exception:
        print("warning: mattermostdriver python module missing, reverting to pushover")
        pushsrv = "pushover"
if verbose:
    print("notice: logging to %s" % logfile)
    print("notice: push service is %s" % pushsrv)
logf = initlog(logfile)
# Restore hosts, message history and period markers saved by pickleit().
if os.path.exists(pickfile):
    if verbose: print(("opening pickls %s" % pickfile))
    # NOTE(review): unpickling executes code from the file; the default
    # location is under /var/tmp -- make sure only the daemon can write it.
    with open(pickfile, 'rb') as pickf:
        pick = pickle.Unpickler(pickf)
        try:
            hbdclass.Host.hosts = pick.load()
            msgs = pick.load()
            try:
                lastfm = pick.load()
            except Exception:
                # the file has no third object to load
                lastfm = ["","",""]
        except Exception as e:
            print(("load pickled failed: %s" % e))
            os.unlink(pickfile)
    hbdclass.Connection.htab = {}
    for h in list(hbdclass.Host.hosts.keys()):
        hbdclass.Host.hosts[h].dyn = h in dyndnshosts
        hbdclass.Host.hosts[h].watched = h in watchhosts
        hbdclass.Host.hosts[h].fixup()
    for h in drophosts:
        if h in hbdclass.Host.hosts:
            del hbdclass.Host.hosts[h]
    if verbose: print(("%s pickled hosts loaded" % len(hbdclass.Host.hosts)))
else:
    if verbose: print("no pickled data")
now = time.time()
startsec = int(now) % interval
log(None, "Starting %s" % VER)
atexit.register(on_exit)
# UDP sockets on which heartbeats arrive (IPv4 and IPv6, same port)
ilist = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", hb_port))
ilist.append(sock)
sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock6.bind(("", hb_port))
ilist.append(sock6)
#ilist.append(serv.fileno())
if not forground:
    # detach: the parent exits, the child carries on as the daemon
    pid = os.fork()
    if pid > 0:
        if verbose:
            print(("daemoinizing... pid = %d" % pid))
        sys.exit(0)
    verbose = False
    # Point fds 0-2 at /dev/null instead of just closing them.  Closed
    # descriptors would be reused by the next sockets or files opened, and
    # anything written to "stdout"/"stderr" would land in those.
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in (0, 1, 2):
        os.dup2(devnull, fd)
    if devnull > 2:
        os.close(devnull)
    sys.stdin.close()
    if DEBUG > 0:
        sys.stdout = LogDevice()
        sys.stderr = LogDevice()
    else:
        sys.stdout = NullDevice()
        sys.stderr = NullDevice()
    os.chdir("/tmp")
    os.setsid()
    os.umask(0)
try:
    serv = HttpServer((hbd_host, hbd_port), HttpHandler)
except Exception:
    print(("failed to start server on %s:%s" % (hbd_host, hbd_port)))
    sys.exit(1)
#
# Websocket servers (TLS on WSSPORT, plain on WSPORT) share one event loop,
# which the websocketupdater thread runs later.
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
wss_pem = pathlib.Path(WSS_PEM)
wss_key = pathlib.Path(WSS_KEY)
ssl_context.load_cert_chain(wss_pem, keyfile=wss_key)
# subprotocols must be a list of names; a bare str is not a valid value
wss_start_server = websockets.serve(ws_serve, hbd_host, WSSPORT,
    ssl=ssl_context, loop=loop, subprotocols=["hbd"])
loop.run_until_complete(wss_start_server)
ws_start_server = websockets.serve(ws_serve, hbd_host, WSPORT,
    loop = loop, subprotocols=["hbd"])
loop.run_until_complete(ws_start_server)
# Background daemon threads: HTTP server, DNS updater, websocket loop.
servthread = threading.Thread(target=serv.serve_forever)
servthread.daemon = True
servthread.start()
dnsT = threading.Thread(target=dnsupdatethread)
dnsT.daemon = True
dnsT.start()
wsT = threading.Thread(target=websocketupdater)
wsT.daemon = True
wsT.start()
running = True
sig = 0
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)
rnext = int(now)+15 # 15 seconds time to settle after (re-)start
sleep = 1
firstcheck = int(now) + 15
# Main loop: wait for heartbeats, snapshot counters when the hour, day or
# week changes, check for overdue hosts once a second, and act on signals.
while running:
    sr = None
    if DEBUG > 3:
        sys.stderr.write("about to sleep = %s\n" % (sleep))
    try:
        sr = select.select(ilist, [], [], sleep)
        now = time.time()
    except KeyboardInterrupt:
        sys.stderr.write("Keyboard Interrupt!\n")
        running = False
        closeup()
        continue
    except OSError as value:
        if value.errno != 4: # interrupted system call
            sys.stderr.write("select err %s %s" % (select.error, value))
            #raise os.error, value
        continue
    except Exception as e:
        if DEBUG > 2:
            sys.stderr.write("select exception %s\n" % (str(e)))
        sys.exit(1)
    if DEBUG > 3:
        sys.stderr.write("woke from sleep = %s (%s)\n" % (str(sr), str(ilist)))
    for fh in sr[0]:
        if fh not in [sock, sock6]:
            sys.stderr.write("what happend just now?\n")
            continue
        readsock(fh)
    if DEBUG > 3:
        sys.stderr.write("done handling, running is %s, sig is %s\n" % (running, sig))
    # check hour/day/week
    for v, fm in enumerate(tsfm):
        ts = time.strftime(fm, time.localtime(now))
        if ts == lastfm[v]:
            continue
        lastfm[v] = ts
        for h in list(hbdclass.Host.hosts.keys()):
            entry = hbdclass.Host.hosts[h]
            entry.hdwcounts[v] = [entry.doesack, entry.upcount]
    if now >= rnext and now >= firstcheck:
        rnext = now+1
        checkoverdue()
    sleep = rnext-now
    if sleep < 0:
        sys.stderr.write("sleep is negative! %s next = %s\n" % (sleep, rnext))
        sleep = 0
    if DEBUG > 3:
        sys.stderr.write("sleep = %s next = %s\n" % (sleep, rnext))
    if sig != 0:
        setrunning(False)
        if sig == signal.SIGHUP:
            if DEBUG > 0:
                sys.stderr.write("signal 1 saveandrestart\n")
            saveandrestart()