re-format with black

This commit is contained in:
2021-05-03 17:21:56 -04:00
parent aebd1e8b01
commit 4d7442d70c
+181 -142
View File
@@ -22,7 +22,7 @@ import codecs
PORT = 50003 PORT = 50003
INTERVAL = 10 INTERVAL = 10
REOPENC = 6 REOPENC = 6
PIDFILE = '/tmp/hbc.pid' PIDFILE = "/tmp/hbc.pid"
VER = 6 VER = 6
MAXRECV = 32767 MAXRECV = 32767
@@ -30,16 +30,19 @@ running = True
dorestart = False dorestart = False
warned1 = False warned1 = False
def log(msg): def log(msg):
if fdaemon: if fdaemon:
syslog.syslog(syslog.LOG_ERR, msg) syslog.syslog(syslog.LOG_ERR, msg)
else: else:
print(msg) print(msg)
def handler(signum, frame): def handler(signum, frame):
if signum == signal.SIGTERM: if signum == signal.SIGTERM:
cleanup() cleanup()
class NullDevice: class NullDevice:
def write(self, s): def write(self, s):
pass pass
@@ -59,57 +62,57 @@ class Conn:
self.rtts = [0] self.rtts = [0]
self.sock = None self.sock = None
def __str__(self): def __str__(self):
return "Con(%s, %s %s)" % (self.addr, self.port, self.af) return "Con(%s, %s %s)" % (self.addr, self.port, self.af)
def open(self): def open(self):
self.sock=socket.socket(self.af, socket.SOCK_DGRAM) self.sock = socket.socket(self.af, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, \ self.sock.setsockopt(
self.sock.getsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR) | 1) socket.SOL_SOCKET,
socket.SO_REUSEADDR,
self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1,
)
def sendto(self, msg, ID="HTB"): # default ID is HearTBeat
def sendto(self, msg, ID = 'HTB'): # default ID is HearTBeat
global warned1 global warned1
if self.send % REOPENC == 0: if self.send % REOPENC == 0:
self.close() self.close()
if not self.sock: if not self.sock:
self.open() self.open()
msg['name'] = shortname(iam) msg["name"] = shortname(iam)
msg['id'] = self.conId msg["id"] = self.conId
msg['ver'] = VER msg["ver"] = VER
msg['time'] = time.time() msg["time"] = time.time()
m = dicttos(ID, msg) # always compress m = dicttos(ID, msg) # always compress
if verbose: if verbose:
log("conn.send('%s', (%s:%s) %s)" % (msg, self.addr, self.port, len(m))) log("conn.send('%s', (%s:%s) %s)" % (msg, self.addr, self.port, len(m)))
try: try:
self.sock.sendto(m, (self.addr, self.port)) self.sock.sendto(m, (self.addr, self.port))
except socket.error as e: except socket.error as e:
if not warned1: log("socket error: %s %s:%s" % (e, self.addr, self.port)) if not warned1:
log("socket error: %s %s:%s" % (e, self.addr, self.port))
warned1 = True warned1 = True
self.close() self.close()
return return
self.send += 1 self.send += 1
self.lastsend = time.time() self.lastsend = time.time()
def ack(self, msgDict, now): def ack(self, msgDict, now):
try: try:
self.lastack = msgDict['time'] self.lastack = msgDict["time"]
mul = 2 mul = 2
except: except:
self.lastack = now self.lastack = now
mul = 1 mul = 1
rtt = (self.lastack - self.lastsend) * mul rtt = (self.lastack - self.lastsend) * mul
if verbose: log("ack RTT: %0.1f ms (now %s)" % (rtt * 1000.0, now)) if verbose:
log("ack RTT: %0.1f ms (now %s)" % (rtt * 1000.0, now))
self.rtts.append(rtt * 1000.0) self.rtts.append(rtt * 1000.0)
if len(self.rtts) > 10: if len(self.rtts) > 10:
del self.rtts[0] del self.rtts[0]
self.ackcount += 1 self.ackcount += 1
def close(self): def close(self):
if self.sock: if self.sock:
self.sock.close() self.sock.close()
@@ -117,7 +120,7 @@ class Conn:
def shortname(name): def shortname(name):
r = name.split('.') r = name.split(".")
return r[0] return r[0]
@@ -133,18 +136,19 @@ def dicttos(ID, d):
ID = "!" + ID + ":" ID = "!" + ID + ":"
return ID.encode() + zpk return ID.encode() + zpk
def stodict(msg): def stodict(msg):
d = {} d = {}
if len(msg) > 0 and chr(msg[0]) == "!": if len(msg) > 0 and chr(msg[0]) == "!":
pk = zlib.decompress(msg[5:]).decode() pk = zlib.decompress(msg[5:]).decode()
d['ID'] = msg[1:4].decode() d["ID"] = msg[1:4].decode()
else: else:
r0 = msg.split(':',1) r0 = msg.split(":", 1)
pk = r0[1] pk = r0[1]
d['ID'] = r0[0] d["ID"] = r0[0]
r = pk.split(';') r = pk.split(";")
for v in r: for v in r:
vr = v.split('=', 1) vr = v.split("=", 1)
k = vr[0].strip() k = vr[0].strip()
if len(vr) == 1: if len(vr) == 1:
d[k] = None d[k] = None
@@ -155,24 +159,25 @@ def stodict(msg):
except: except:
pass pass
d[k] = v d[k] = v
if verbose: print("msg is %s" % d) if verbose:
print("msg is %s" % d)
return d return d
def XXstodict(msg): def XXstodict(msg):
d = {} d = {}
r0 = msg.split(':',1) r0 = msg.split(":", 1)
if len(r0) == 1: if len(r0) == 1:
return None return None
if r0[0][0] == '!': # compressed if r0[0][0] == "!": # compressed
pk = zlib.decompress(msg[len(r0[0])+1:]) pk = zlib.decompress(msg[len(r0[0]) + 1 :])
d['ID'] = r0[0][1:] d["ID"] = r0[0][1:]
else: else:
pk = r0[1] pk = r0[1]
d['ID'] = r0[0] d["ID"] = r0[0]
r = pk.split(';') r = pk.split(";")
for v in r: for v in r:
vr = v.split('=', 1) vr = v.split("=", 1)
k = vr[0].strip() k = vr[0].strip()
if len(vr) == 1: if len(vr) == 1:
d[k] = None d[k] = None
@@ -188,31 +193,36 @@ def XXstodict(msg):
def syslogtrace(note): def syslogtrace(note):
logm = '%s hbc died: \n%s' % (note, traceback.format_exc()) logm = "%s hbc died: \n%s" % (note, traceback.format_exc())
log(logm) log(logm)
for l in logm.split('\n'): for l in logm.split("\n"):
syslog.syslog(syslog.LOG_ERR, ' tb: %s' % l) syslog.syslog(syslog.LOG_ERR, " tb: %s" % l)
if verbose: if verbose:
print(logm) print(logm)
conId = 1 conId = 1
def createConnections(hosts): def createConnections(hosts):
global conId global conId
for host in hosts: for host in hosts:
if verbose: log("createConnections for %s" % host) if verbose:
log("createConnections for %s" % host)
try: try:
rs=socket.getaddrinfo(host, hb_port, 0, 0, socket.SOL_UDP) rs = socket.getaddrinfo(host, hb_port, 0, 0, socket.SOL_UDP)
except socket.gaierror: except socket.gaierror:
logm = '%s hbc died: \n%s' % ('createConnections', traceback.format_exc()) logm = "%s hbc died: \n%s" % ("createConnections", traceback.format_exc())
if verbose: log(logm) if verbose:
log(logm)
return None return None
for r in rs: for r in rs:
if verbose: log("address %s" % str(r)) if verbose:
log("address %s" % str(r))
if r[0] in [10, 24, 28, 30]: # for Linux, NetBSD, FreeBSD if r[0] in [10, 24, 28, 30]: # for Linux, NetBSD, FreeBSD
af=socket.AF_INET6 af = socket.AF_INET6
elif r[0] == 2: elif r[0] == 2:
af=socket.AF_INET af = socket.AF_INET
else: else:
print("dont know this net type: %s" % r[0][0]) print("dont know this net type: %s" % r[0][0])
sys.exit(1) sys.exit(1)
@@ -226,33 +236,35 @@ def createConnections(hosts):
def doexec(conn, data): def doexec(conn, data):
try: try:
ro = subprocess.check_output(data, stderr=subprocess.STDOUT, shell=True).decode() ro = subprocess.check_output(
data, stderr=subprocess.STDOUT, shell=True
).decode()
fail = "OK" fail = "OK"
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
ro = str(e) ro = str(e)
fail = "CalledProcessError" fail = "CalledProcessError"
except Exception as e: except Exception as e:
syslogtrace('System') syslogtrace("System")
ro = "N/A" ro = "N/A"
fail = "cmd failed: %s" % e fail = "cmd failed: %s" % e
msg={'service': 'command', 'msg': fail+" "+ro} msg = {"service": "command", "msg": fail + " " + ro}
conns[conn].sendto(msg) conns[conn].sendto(msg)
def doupdate(conn, msgDict): def doupdate(conn, msgDict):
fail = None fail = None
try: try:
code = codecs.decode(msgDict['code'],'base64').decode() code = codecs.decode(msgDict["code"], "base64").decode()
csum = msgDict['csum'] csum = msgDict["csum"]
except Exception as e: except Exception as e:
fail = "csum/code missing: %s" % e fail = "csum/code missing: %s" % e
if not fail: if not fail:
fail = doupdateone(code, csum) fail = doupdateone(code, csum)
msg={'service': 'update', 'msg': fail if fail else "OK"} msg = {"service": "update", "msg": fail if fail else "OK"}
conns[conn].sendto(msg) conns[conn].sendto(msg)
if not fail: if not fail:
log('hc updates, fs = %s' % (len(code))) log("hc updates, fs = %s" % (len(code)))
return fail return fail
@@ -273,7 +285,7 @@ def doupdateone(code, csum):
return "cannot make backup copy: %s" % e return "cannot make backup copy: %s" % e
try: try:
fh=open(fn, "w") fh = open(fn, "w")
fh.write(code) fh.write(code)
fh.close() fh.close()
except Exception as e: except Exception as e:
@@ -284,15 +296,15 @@ def doupdateone(code, csum):
def restart(): def restart():
if verbose: if verbose:
print("restart: execv %s %s" % (sys.argv[0], [sys.argv[0]]+cmdargs)) print("restart: execv %s %s" % (sys.argv[0], [sys.argv[0]] + cmdargs))
syslog.syslog(syslog.LOG_ERR, 'restart %s' % (sys.argv[0])) syslog.syslog(syslog.LOG_ERR, "restart %s" % (sys.argv[0]))
e = "fallthrough" e = "fallthrough"
try: try:
os.execv(sys.argv[0], [sys.argv[0]]+cmdargs) os.execv(sys.argv[0], [sys.argv[0]] + cmdargs)
except Exception as e: except Exception as e:
pass pass
print("should not be here:", str(e)) print("should not be here:", str(e))
log('restart failed: %s' % e) log("restart failed: %s" % e)
def process(): def process():
@@ -309,47 +321,58 @@ def process():
ifiles[conns[conn].sock.fileno()] = conns[conn].sock ifiles[conns[conn].sock.fileno()] = conns[conn].sock
conIds[conns[conn].sock.fileno()] = conn conIds[conns[conn].sock.fileno()] = conn
sleep=nextReport - time.time() sleep = nextReport - time.time()
if sleep <= 0: if sleep <= 0:
break break
try: try:
r=select.select(list(ifiles.keys()),[],[],sleep) r = select.select(list(ifiles.keys()), [], [], sleep)
now = time.time() # nb: delay from actual packet arrival to select is ca. 105ms! now = (
time.time()
) # nb: delay from actual packet arrival to select is ca. 105ms!
except KeyboardInterrupt: except KeyboardInterrupt:
running = False running = False
break break
except SystemExit: except SystemExit:
log('daemon exit, running was %s' % running) log("daemon exit, running was %s" % running)
if running: if running:
running = False running = False
break break
except: except:
if running: if running:
syslogtrace('select') syslogtrace("select")
running = False running = False
break break
for rfh in r[0]: for rfh in r[0]:
conn = conIds[rfh] conn = conIds[rfh]
data, addr = ifiles[rfh].recvfrom(MAXRECV) data, addr = ifiles[rfh].recvfrom(MAXRECV)
if verbose: print("sock.recvfrom: %s (%s) %s" % (addr, len(data), data[:4])) if verbose:
print("sock.recvfrom: %s (%s) %s" % (addr, len(data), data[:4]))
try: try:
msgDict = stodict(data) msgDict = stodict(data)
except Exception as e: except Exception as e:
print("failed to parse incoming data from %s: %s (%s)" % (addr, data, e)) print(
"failed to parse incoming data from %s: %s (%s)"
% (addr, data, e)
)
continue continue
if verbose: print("sock.recvfrom: %s (%s) %s" % (addr, len(data), str(msgDict)[:80])) if verbose:
print(
"sock.recvfrom: %s (%s) %s"
% (addr, len(data), str(msgDict)[:80])
)
if msgDict == None: if msgDict == None:
print("bad backet from %s (%s) %s" % (addr, len(data), data)) print("bad backet from %s (%s) %s" % (addr, len(data), data))
elif msgDict['ID'] == "ACK": elif msgDict["ID"] == "ACK":
conns[conn].ack(msgDict, now) conns[conn].ack(msgDict, now)
elif msgDict['ID'] == "UPD": elif msgDict["ID"] == "UPD":
if doupdate(conn, msgDict) == None: if doupdate(conn, msgDict) == None:
if verbose: print("process: restart after update") if verbose:
print("process: restart after update")
dorestart = True dorestart = True
break break
elif msgDict['ID'] == "CMD": elif msgDict["ID"] == "CMD":
doexec(conn, msgDict['cmd']) doexec(conn, msgDict["cmd"])
else: else:
doexec(conn, data) # deprecated until no more VER - hbc doexec(conn, data) # deprecated until no more VER - hbc
if dorestart: if dorestart:
@@ -358,24 +381,29 @@ def process():
if not running: if not running:
break break
for conn in conns: for conn in conns:
msg={'acks': conns[conn].ackcount, 'rtt': conns[conn].rtts[-1]} msg = {"acks": conns[conn].ackcount, "rtt": conns[conn].rtts[-1]}
conns[conn].sendto(msg) conns[conn].sendto(msg)
time.sleep(0.1) #N.B. Linux (i.e. Rasperry Pi 3 drops the second pkg unless delayed time.sleep(
0.1
) # N.B. Linux (i.e. Rasperry Pi 3 drops the second pkg unless delayed
if nextReport + interval >= time.time(): if nextReport + interval >= time.time():
nextReport += interval nextReport += interval
else: else:
nextReport = time.time() + interval nextReport = time.time() + interval
if verbose: log( "process: done running") if verbose:
log("process: done running")
def cleanup(): def cleanup():
global running global running
if not running: if not running:
return return
if verbose: log('cleanup') if verbose:
log("cleanup")
running = False running = False
for conn in conns: for conn in conns:
msg={'shutdown': 1, 'acks': conns[conn].ackcount} msg = {"shutdown": 1, "acks": conns[conn].ackcount}
conns[conn].sendto(msg) conns[conn].sendto(msg)
conns[conn].close() conns[conn].close()
time.sleep(1) time.sleep(1)
@@ -383,12 +411,15 @@ def cleanup():
def closeall(): def closeall():
if verbose: syslog.syslog(syslog.LOG_ERR, 'closecall') if verbose:
syslog.syslog(syslog.LOG_ERR, "closecall")
for conn in conns: for conn in conns:
conns[conn].close() conns[conn].close()
def daemonize(working_dir="/", stdin='/dev/zero', stdout='/dev/null', stderr='/dev/null'): def daemonize(
working_dir="/", stdin="/dev/zero", stdout="/dev/null", stderr="/dev/null"
):
""" """
Does the UNIX double-fork magic, see Stevens' "Advanced Programming in the Does the UNIX double-fork magic, see Stevens' "Advanced Programming in the
UNIX Environment" for details (ISBN 0201563177) UNIX Environment" for details (ISBN 0201563177)
@@ -422,59 +453,59 @@ def daemonize(working_dir="/", stdin='/dev/zero', stdout='/dev/null', stderr='/d
# redirects standard file descriptors # redirects standard file descriptors
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
si = open(stdin, 'r') si = open(stdin, "r")
so = open(stdout, 'a+') so = open(stdout, "a+")
se = open(stderr, 'a+') se = open(stderr, "a+")
os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno()) os.dup2(se.fileno(), sys.stderr.fileno())
msgonly = False
msgonly=False helpflag = False
helpflag=False verbose = False
verbose=False fdaemon = False
fdaemon=False
daemonized = False daemonized = False
optlist=[] optlist = []
args=[] args = []
msgboot={} msgboot = {}
home=os.environ['HOME'] home = os.environ["HOME"]
configfile="%s/.hbrc" % home configfile = "%s/.hbrc" % home
cmdargs = [] cmdargs = []
iam=socket.gethostname() iam = socket.gethostname()
try: try:
optlist, args = getopt.getopt(sys.argv[1:], 'bc:dhm:n:v') optlist, args = getopt.getopt(sys.argv[1:], "bc:dhm:n:v")
except: except:
helpflag=True helpflag = True
for o,a in optlist: for o, a in optlist:
if o == '-b': if o == "-b":
msgboot['boot']=1 msgboot["boot"] = 1
elif o == '-c': elif o == "-c":
configfile=a configfile = a
cmdargs += [o, a] cmdargs += [o, a]
elif o == '-d': elif o == "-d":
fdaemon=True fdaemon = True
cmdargs += [o] cmdargs += [o]
elif o == '-h': elif o == "-h":
helpflag=True helpflag = True
elif o == '-m': elif o == "-m":
msgboot['service'] = "service" msgboot["service"] = "service"
msgboot['msg'] = a msgboot["msg"] = a
msgonly=True msgonly = True
elif o == '-n': elif o == "-n":
iam=a iam = a
cmdargs += [o, a] cmdargs += [o, a]
elif o == '-v': elif o == "-v":
verbose=True verbose = True
cmdargs += [o] cmdargs += [o]
cmdargs += args cmdargs += args
if verbose: print("cmdargs for restart are %s" % cmdargs) if verbose:
print("cmdargs for restart are %s" % cmdargs)
if helpflag: if helpflag:
print("hbc HeartBeatClient") print("hbc HeartBeatClient")
@@ -487,7 +518,8 @@ if helpflag:
print(" -m send a message") print(" -m send a message")
print(" -v verbose") print(" -v verbose")
print() print()
print(""" config file can contain print(
""" config file can contain
hb_hosts=('host1', 'host2', ..._ hb_hosts=('host1', 'host2', ..._
hb_port=50003 hb_port=50003
interval=20 interval=20
@@ -495,45 +527,49 @@ logfile=...
logfmt={|test|msg} logfmt={|test|msg}
grace=SECONDS grace=SECONDS
reportstrict={True|False} reportstrict={True|False}
""") """
)
sys.exit(1) sys.exit(1)
# #
# set defaults # set defaults
hb_port=PORT hb_port = PORT
interval=INTERVAL interval = INTERVAL
hb_hosts=[] hb_hosts = []
try: try:
f=open(configfile,"r") f = open(configfile, "r")
if verbose: print("notice: using config file %s" % configfile) if verbose:
print("notice: using config file %s" % configfile)
except: except:
if verbose: print("warning: running without config file: %s" % configfile) if verbose:
f=None print("warning: running without config file: %s" % configfile)
f = None
if f: if f:
while 1: while 1:
l=f.readline() l = f.readline()
if len(l) == 0: if len(l) == 0:
break break
r=l[:-1].split('=') r = l[:-1].split("=")
if r[0] == 'hb_hosts': if r[0] == "hb_hosts":
hb_hosts=eval(r[1]) hb_hosts = eval(r[1])
if verbose: if verbose:
print("notice: cfg hb_hosts: %s" % hb_hosts) print("notice: cfg hb_hosts: %s" % hb_hosts)
elif r[0] == 'interval': elif r[0] == "interval":
interval=eval(r[1]) interval = eval(r[1])
elif r[0] == 'hb_port': elif r[0] == "hb_port":
hb_port=eval(r[1]) hb_port = eval(r[1])
elif r[0] == 'name': elif r[0] == "name":
iam=eval(r[1]) iam = eval(r[1])
if verbose: print("name set to %s" % iam) if verbose:
print("name set to %s" % iam)
f.close() f.close()
if len(args) != 0: if len(args) != 0:
hb_hosts=args hb_hosts = args
if len(hb_hosts) == 0: if len(hb_hosts) == 0:
@@ -550,52 +586,55 @@ if verbose:
print("notice: msgboot: %s" % msgboot) print("notice: msgboot: %s" % msgboot)
if not msgonly: if not msgonly:
msgboot['interval'] = interval msgboot["interval"] = interval
conns = {} conns = {}
while True: while True:
if verbose: log("create connections") if verbose:
log("create connections")
createConnections(hb_hosts) createConnections(hb_hosts)
if len(conns) != 0: if len(conns) != 0:
break break
if verbose: log("no connections yet, sleep a bit") if verbose:
log("no connections yet, sleep a bit")
time.sleep(2) time.sleep(2)
if verbose: if verbose:
log("%s connections created" % (len(conns))) log("%s connections created" % (len(conns)))
if len(msgboot) > 0: if len(msgboot) > 0:
if verbose: print("on boot") if verbose:
msgboot['acks'] = 0 print("on boot")
msgboot["acks"] = 0
for conn in conns: for conn in conns:
conns[conn].sendto(msgboot) conns[conn].sendto(msgboot)
if msgonly: if msgonly:
if verbose: print("msgboot done msgonly=%s" % msgonly) if verbose:
print("msgboot done msgonly=%s" % msgonly)
closeall() closeall()
sys.exit(0) sys.exit(0)
# #
syslog.openlog('hbc', syslog.LOG_PID, syslog.LOG_DAEMON) syslog.openlog("hbc", syslog.LOG_PID, syslog.LOG_DAEMON)
if fdaemon: if fdaemon:
print("daemoinizing.") print("daemoinizing.")
daemonize() daemonize()
daemonized = True daemonized = True
syslog.syslog(syslog.LOG_ERR, 'starting heartbeat to %s' % ','.join(hb_hosts)) syslog.syslog(syslog.LOG_ERR, "starting heartbeat to %s" % ",".join(hb_hosts))
signal.signal(signal.SIGTERM, handler) signal.signal(signal.SIGTERM, handler)
running = True running = True
try: try:
process() process()
except Exception as e: except Exception as e:
syslogtrace('process') syslogtrace("process")
if verbose: print("err: process exit: %s" % e) if verbose:
print("err: process exit: %s" % e)
if verbose: log( "main: cleanup") if verbose:
log("main: cleanup")
cleanup() cleanup()
if dorestart: if dorestart:
restart() restart()