#!/usr/bin/env python
# $Id: hbd,v 1.38 2013/07/14 02:25:05 andreas Exp $
# Wait for heartbeat messages and act on them (or their absence)
#
# Overview
#   * UDP sockets (IPv4 + IPv6) receive heartbeat packets from clients.
#   * A threaded HTTP server shows status and accepts a few admin requests.
#   * A worker thread performs dynamic DNS updates via nsupdate.
#   * State (hosts, messages) is pickled on every log() call.
#
# NOTE: this is Python 2 code (SocketServer, cPickle, httplib, md5, ...).
# Host, Connection and ubHost are expected to come from hbdclass.
#
VER = 3.00

import time
import os
import string
import sys
import socket
import atexit
import select
import SocketServer
import BaseHTTPServer
import getopt
import signal
import cPickle
import smtplib
import traceback
import urllib
import urlparse
import httplib
import threading
import Queue
import md5
import json
import zlib
import ast
import cgi
from subprocess import Popen, STDOUT, PIPE
from hbdclass import *

SEND_EMAIL = False
SEND_PUSHOVER = True

DEBUG = 0
MAXRECV = 32767
LOGFILE = "/home/andreas/public_html/messages/andreas"
PICKFILE = "/var/tmp/hbd.pick"
AEMAIL = ["andreas@wrede.ca"]
NAME = "heatbeat"
SMTPSERVER = "localhost"
msgs = []
#AEW upcount = 0
PORT = 50003
TPORT = 50004
THOST = ""
verbose = False
INTERVAL = 10
GRACE = 2
DROPOVERDUE = 7*24*3600

os.environ['TZ'] = 'EST5EDT'

# strftime formats whose change marks a new hour / day / week
tsfm = ["%H", "%d", "%U"]
lastfm = ["", "", ""]

# Extra markup inserted into the <head> of the status page.
tcss = """ """


def _esc(text):
    """Return text HTML-escaped (including quotes) for safe embedding."""
    return cgi.escape(str(text), True)


def handler(signum, frame):
    """Signal handler: record the signal in the global 'sig'.

    The main loop acts on 'sig'.  If the daemon is already shutting down
    (running is false) the process exits immediately with status 2.
    """
    global running, sig
    sig = signum
    if not running:
        if verbose:
            sys.stderr.write("NOT runing signal: %s running: %d" % (sig, running))
        sys.exit(2)
    if verbose:
        sys.stderr.write("signal: %s running: %s frame: %s" % (sig, running, frame))


def shortname(name):
    """Return the part of name before the first '.'."""
    return name.split('.')[0]


class NullDevice:
    """File-like sink that discards everything written to it."""

    def write(self, s):
        pass


class LogDevice:
    """File-like object that appends every write to /tmp/log1 and flushes."""

    def __init__(self):
        self.fh = open("/tmp/log1", "a")

    def write(self, s):
        self.fh.write(s)
        self.fh.flush()


def dicttos(ID, d, compress=False):
    """Serialize dict d into the wire format 'ID:k=v;k=v...'.

    Floats are written with 5 decimals.  With compress=True the payload is
    zlib-compressed and the ID is prefixed with '!'.
    """
    fields = []
    for k in d:
        if isinstance(d[k], float):
            fields.append("%s=%0.5f" % (k, d[k]))
        else:
            fields.append("%s=%s" % (k, d[k]))
    pk = ";".join(fields)
    if compress:
        return "!" + ID + ":" + zlib.compress(pk, 6)
    return ID + ":" + pk


def stodict(msg):
    """Parse a wire message ('ID:k=v;...' or '!ID:<zlib data>') into a dict.

    Returns None when msg contains no ':' (callers treat that as an old
    client).  The message ID is stored under key 'ID'.  A field without '='
    maps to None.  Values starting with a digit are converted to Python
    literals (numbers); anything that does not parse stays a string.
    """
    parts = msg.split(':', 1)
    if len(parts) == 1:
        return None
    head, payload = parts
    d = {}
    if head.startswith('!'):  # compressed
        payload = zlib.decompress(payload)
        d['ID'] = head[1:]
    else:
        d['ID'] = head
    for field in payload.split(';'):
        kv = field.split('=', 1)
        k = kv[0].strip()
        if len(kv) == 1:
            d[k] = None
            continue
        v = kv[1].strip()
        if v and v[0].isdigit():
            # SECURITY: this used to be eval(v) on data read straight from
            # the network.  literal_eval yields the same result for numeric
            # literals but cannot execute code; unparsable values (e.g. an
            # IP address) are kept as strings instead of raising.
            try:
                v = ast.literal_eval(v)
            except (ValueError, SyntaxError):
                pass
        d[k] = v
    return d


def oldmtodict(msg):
    """Parse a message from an old client that sends no 'ID:' prefix."""
    return stodict('HTB:' + msg)


def email(s, msg):
    """Send a notification mail with subject suffix s and body msg.

    Returns None when mail is disabled, "OK" on success and "Fail" when the
    recipients were refused.  Any other SMTP failure triggers
    saveandrestart(), as before.
    """
    if not SEND_EMAIL:
        return
    ret = "OK"
    toaddrs = AEMAIL
    fromemail = "aew.heartbeat@wrede.ca"
    subj = "Info from %s: %s" % (NAME, s)
    date = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime())
    body = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % (
        toaddrs[0], fromemail, subj, date, msg)
    server = None
    try:
        server = smtplib.SMTP(SMTPSERVER)
        if DEBUG > 0:
            server.set_debuglevel(1)
        server.sendmail(fromemail, toaddrs, body)
    except smtplib.SMTPRecipientsRefused as errs:
        log(None, "cannot send email: %s\n" % (errs))
        ret = "Fail"
    except Exception:
        print("smtp error: " + traceback.format_exc())
        saveandrestart()
    if server is not None:
        try:
            server.quit()
        except Exception:
            pass  # best effort; the connection may already be gone
    return ret


def pushover(msg):
    """Send msg as a Pushover notification (best effort, errors ignored)."""
    if not SEND_PUSHOVER:
        return
    # NOTE(review): credentials are hard-coded here; consider moving them
    # to the config file.
    conn = httplib.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request("POST", "/1/messages.json",
                     urllib.urlencode({
                         "token": "ac7NLX2rPjXFareeDgLpXNoDf4iFmf",
                         "user": "uDhH33UjQQDYtNzJb1ThRiWb9ingGK",
                         "message": msg,
                     }),
                     {"Content-type": "application/x-www-form-urlencoded"})
        conn.getresponse()
    except Exception:
        pass  # notifications are deliberately best effort
    finally:
        conn.close()  # was leaked before


# nsupdate: set the DNS A record for a fqdn
# return: None if ok, else error text
def nsupdate(hostname, newip):
    """Set the A (or AAAA for IPv6) and TXT record of hostname via nsupdate.

    Returns None on success, otherwise a non-empty error text.
    """
    D = {}
    D['domain'] = 'dy.wapanafa.org'
    D['fqdn'] = '%s.dy.wapanafa.org' % hostname
    D['dnsttl'] = '5'
    D['newip'] = newip
    D['ts'] = time.strftime('%Y-%m-%d.%H:%M:%S', time.gmtime())
    # an address containing ':' is IPv6 (also when it starts with ':')
    D['rtype'] = "AAAA" if ":" in newip else "A"
    # nsupdate reads one command per line
    nsup = """update delete %(fqdn)s %(rtype)s
update add %(fqdn)s %(dnsttl)s %(rtype)s %(newip)s
update delete %(fqdn)s TXT
update add %(fqdn)s %(dnsttl)s TXT "Created: %(ts)s"
send
answer
""" % D
    if DEBUG > 0:
        log(None, "DBG: nsup %s" % nsup)
    cmd = ["/usr/local/bin/nsupdate", "-k",
           "/etc/dhcpc/K%(domain)s.+157+00000." % D, "-v"]
    if DEBUG > 0:
        log(None, "DBG: cmd %s" % cmd)
    try:
        p = Popen(cmd, shell=False, bufsize=1,
                  stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    except OSError as e:
        return "nsupdate: execution failed: %s" % e
    except Exception:
        return "nsupdate: some error occured"
    (output, err) = p.communicate(nsup)
    if output.find('status: NOERROR') >= 0:
        return None
    # never return an empty string: callers treat a false value as success
    return output or "nsupdate: no output (exit status %s)" % p.returncode


#
def dur(sec):
    """Format a duration in seconds as 'h:mm:ss', 'm:ss' or '0:ss'."""
    sec = int(sec)
    h = sec // 3600
    m = (sec - h * 3600) // 60
    s = (sec - h * 3600) % 60
    if h > 0:
        return "%d:%02d:%02d" % (h, m, s)
    if m > 0:
        return "%d:%02d" % (m, s)
    return "0:%02d" % s


def fixsort():
    """Assign each host's 'num' attribute its position in name order."""
    for x, n in enumerate(sorted(Host.hosts.keys())):
        Host.hosts[n].num = x


#
def on_exit():
    """atexit hook: close the log file and print 'exit'."""
    if DEBUG > 0:
        sys.stderr.write("on_exit\n")
    try:
        logf.close()
    except Exception:
        pass
    print("exit")


def initlog(logfile):
    """Open logfile for appending; fall back to opening it for writing."""
    try:
        return open(logfile, "a+")
    except IOError:
        pass
    return open(logfile, "w")


#
#
def checkoverdue():
    """Mark connections without a recent heartbeat as overdue.

    A connection that is 'up' becomes 'overdue' once its last beat is older
    than the host interval plus the grace period; an 'overdue' connection
    older than DROPOVERDUE becomes 'unknown'.  Newly overdue connections
    are logged and, for watched hosts, reported by email and pushover.
    """
    now = time.time()
    for h in Host.hosts.keys():
        pmsg = []
        for c in Host.hosts[h].connections:
            conn = Host.hosts[h].connections[c]
            if conn.state == Connection.down:
                continue
            timeout = Host.hosts[h].interval + grace
            if conn.state == Connection.up and (now - conn.lastbeat) > timeout:
                conn.newstate(Connection.overdue, now, grace)
                pmsg.append(conn.afam)
            if (conn.state == Connection.overdue
                    and (now - conn.lastbeat) > DROPOVERDUE):
                conn.newstate(Connection.unknown, conn.lastbeat)
        if pmsg:
            what = " and ".join(pmsg)
            if h in watchhosts:
                email("overdue", "%s overdue" % what)
                pushover("%s %s overdue" % (h, what))
            log(h, "%s overdue" % what)


def log(host, m, service=None):
    """Record message m: append to msgs, write to the log file, pickle state.

    host and service, when given, are prefixed to the message.  With
    logfmt == "msg" the file line is 'epoch|host|message' using the global
    'now'; otherwise it is the timestamped text line.
    """
    if DEBUG > 0:
        print("Log: %s" % m)
    ts = time.strftime("%b %d %H:%M:%S", time.localtime(time.time()))
    srv = "service %s: " % service if service else ""
    hst = "%s " % host if host else ""
    msg = "%s: %s%s%s\n" % (ts, hst, srv, m)
    msgs.append(msg)
    if logfmt == "msg":
        m2 = "%d|%s|%s\n" % (now, hst, m)
    else:
        m2 = msg
    logf.write(m2)
    logf.flush()
    pickleit()


def dnsupdatethread():
    """Worker thread: take (name, addr) pairs from Host.dnsQ and update DNS."""
    while True:
        name, addr = Host.dnsQ.get()
        m = "changed address to %s" % (addr)
        err = nsupdate(name, addr)
        if err:
            m += ", DNS update failed: %s" % err
            email("error: nsupdate failed", "%s: %s" % (name, m))
        else:
            m += ", DNS updated."
        Host.dnsQ.task_done()
        log(name, m)


#
#
#
#
def readsock(sock):
    """Read one heartbeat datagram from sock, update state and reply.

    Updates the global 'now'.  For 'HTB' messages the host/connection state
    is updated, events (boot, message, address change, back, shutdown) are
    logged and reported for watched hosts, an ACK is sent back and any
    commands queued for the host are delivered.
    """
    global now
    if DEBUG > 3:
        sys.stderr.write("readsock recfrom start")
    data, addrp = sock.recvfrom(MAXRECV)
    now = time.time()
    if DEBUG > 2:
        sys.stderr.write("readsock = %s, %s\n" % (data, addrp))
    msg = stodict(data)
    if not msg:  # Old hbc client
        if verbose:
            print("old hbc: %s" % (data,))
        oldclient = True
        msg = oldmtodict(data)
    else:
        oldclient = False
    if DEBUG > 2:
        print("readsock = %s, %s" % (msg, addrp))
    addr = addrp[0:2]
    # str(): a purely numeric name is parsed into a number by stodict
    name = shortname(str(msg.get('name', "unknown")))
    if name not in Host.hosts:
        host = Host(name)
        if verbose:
            print("XX: New host, num now %s" % (len(Host.hosts)))
        newh = True
    else:
        host = Host.hosts[name]
        newh = False
    cid = msg.get('id', 0)
    rtt = msg.get('rtt', None)
    if msg['ID'] != 'HTB':
        return

    host.doesack = msg.get('acks', -1)
    host.setcver(msg.get('ver', 0))
    interval = int(msg.get('interval', 0))
    shutdown = msg.get('shutdown', 0)
    service = msg.get('service', "unknown")
    message = msg.get('msg', None)
    boot = msg.get('boot', 0)
    if boot:
        log(name, "booted")
        if name in watchhosts:
            # 'm' was used here before it was ever assigned
            m = "%s booted" % name
            email("booted", m)
            pushover(m)
    if message:
        log(name, "msg: %s" % message, service=service)
        if name in watchhosts:
            email("msg", message)
            pushover(message)
    conn, res = host.conndata(cid, addr[0], rtt, now)
    if res:
        log(name, res)
        if name in watchhosts:
            email("address change", "%s %s" % (host.name, res))
            pushover("%s %s" % (host.name, res))
    if conn.getstate() != Connection.up:  # XXX and interval > 0:
        lasts = conn.state
        d = conn.newstate(Connection.up, now)
        m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
        log(name, m)
        if name in watchhosts:
            email("%s back" % conn.afam, name)
            pushover("%s %s is back" % (name, conn.afam))
    if boot or newh:
        host.upcount = host.doesack
    else:
        host.upcount += 1
    if shutdown:
        log(name, "%s shutdown" % conn.afam)
        if name in watchhosts:
            email("shutdown", "%s %s shutdown" % (name, conn.afam))
            pushover("%s %s shutdown" % (name, conn.afam))
        conn.newstate(Connection.down, now)
    if interval > 0:
        host.interval = interval

    rmsg = {'time': time.time()}
    op = 'ACK'
    if host.cver < 1:
        opkt = 'ACK'
        rmsg = 'ACK'
    else:
        # clients w/ ver 2+ can cope with compressed packets
        opkt = dicttos('ACK', rmsg, host.cver > 1)
    ss = sock.sendto(opkt, addr)
    if DEBUG > 2:
        print("sendto1: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50]))

    # send any commands we have queued
    while len(host.cmds):
        # always dequeue first so an unexpected op cannot loop forever
        op, rmsg = host.cmds.pop(0)
        if op == 'CMD':
            email("%s cmd exec" % name, "command '%s' sent" % rmsg)
            log(name, "command sent")
            if host.cver < 1:
                rmsg = rmsg['cmd']
        elif op == 'UPD':
            log(name, "update initiated")
            if host.cver < 1:
                log(name, " ver 0 does not support UPD")
                continue
        else:
            log(name, "unknown queued op %s dropped" % op)
            continue
        if host.cver < 1:
            opkt = rmsg
            op = ""
        else:
            opkt = dicttos(op, rmsg, True)
        try:
            ss = sock.sendto(opkt, addr)
        except Exception as e:
            print("opkt len is %s" % len(opkt))
            print("cannot send: %s" % e)
        if verbose:
            print("sendto2: %s (%s) %s %s" % (addr, len(opkt), op, str(rmsg)[:50]))
    if DEBUG > 2:
        print("msg from %s,%s, sent %s bytes back" % (addr[0], addr[1], ss))


def updatecode(ucode, uname):
    """Queue an 'UPD' command for host uname carrying the file ucode.

    The file content is sent base64-encoded together with its md5 checksum.
    Returns None on success or an error text when the file cannot be read.
    """
    # NOTE(review): ucode comes from the HTTP query string, so any file
    # readable by the daemon can be pushed to a client.
    try:
        with open(ucode, "r") as fh:
            new_code = fh.read()
    except Exception as e:
        return "cannot read new code: %s" % e
    m = md5.new()
    m.update(new_code)
    icsum = m.hexdigest()
    rmsg = {'csum': icsum, 'code': new_code.encode('base64', 'strict')}
    Host.hosts[uname].cmds.append(('UPD', rmsg))
    return None


#
# Web Server
#
class HttpServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """HTTP server handling each request in its own thread."""
    allow_reuse_address = True

    def threaded():
        # unused placeholder kept from the original source
        pass


#
#
class HttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler for the status page, the JSON API and admin URLs.

    NOTE(review): the HTML tags in buildhead/buildpage/builderror were
    missing from the source text and have been reconstructed.
    """
    server_version = "HeartbeatHTTP/%s" % VER

    def version_string(self):
        return self.server_version

    def handle(self):
        """Handle a request; a failing connection is logged, not raised."""
        try:
            return BaseHTTPServer.BaseHTTPRequestHandler.handle(self)
        except Exception as e:
            self.log_error("Request went away: %r", e)
            self.close_connection = 1
            return

    def do_HEAD(self):
        self.setheaders(200)

    def setheaders(self, code, headerdict=None):
        """Send the status line, Last-Modified and the given headers."""
        self.send_response(code)
        self.send_header("Last-Modified",
                         time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                       time.gmtime(now)))
        # self.send_header("Accept-Ranges","bytes")
        # self.send_header("Connection","close")
        for h in (headerdict or {}):
            self.send_header(h, headerdict[h])
        self.end_headers()

    def buildhead(self, title="Heartbeat", refresh=None, extras=None):
        """Return the opening lines of an HTML page as a list of strings.

        refresh, when set, adds a meta refresh with that many seconds;
        extras is inserted verbatim into the <head>.
        """
        res = []
        res.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN">')
        res.append("<html>")
        res.append("<head>")
        res.append('<title>%s</title>' % (title))
        if refresh:
            res.append('<meta http-equiv="refresh" content="%s">\n' % refresh)
        if extras:
            res.append(extras)
        res.append("</head>")
        res.append('<body>')
        return res

    def buildpage(self):
        """Return the full status page as a list of strings."""
        res = self.buildhead(refresh=60, extras=tcss)
        res.append("<h1>Heartbeat status %s</h1>" % VER)
        # ubHost is presumably provided by hbdclass -- TODO confirm
        res += ubHost.buildhosttable()
        res += ubHost.buildmsgtable(msgs)
        res.append('<p>%s (%s)</p>' % (
            time.strftime("%H:%M:%S", time.localtime(now)),
            os.environ.get('TZ', 'CET-1CDT')))
        res.append("</body></html>")
        return res

    def builderror(self, code, cause, lcause):
        """Return (code, lines) for an error page; the texts are escaped."""
        res = []
        res.append('<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">')
        res.append('<html><head>')
        res.append('<title>%s %s</title>' % (code, _esc(cause)))
        res.append('</head><body>')
        res.append('<h1>%s</h1>' % _esc(cause))
        res.append('<p>%s</p>' % _esc(lcause))
        res.append('<hr>')
        res.append('<address>hbd (Unix) Server at %s:%s</address>'
                   % (hbd_host, hbd_port))
        res.append('</body></html>')
        return code, res

    def do_GET(self):
        """Serve the status page, the JSON API and the admin requests.

        /                 status page
        /c?h=host&c=cmd   queue a command for a host
        /d?h=host         drop a host
        /n?h=host         register the host name in DNS
        /u?h=host&c=file  push a code update (h=All: every capable host)
        /api/0/hosts      host table as JSON
        /api/0/messages   last 30 log messages as JSON
        /r                restart the daemon

        NOTE(review): none of these requests is authenticated.
        """
        global sig
        code = 200
        xsig = 0
        rqAcceptEncoding = self.headers.getheader('Accept-encoding', "")
        headerdict = {"Content-Type": "text/html; charset = ISO-8859-1"}
        if DEBUG > 2:
            sys.stderr.write("handle\n")
        qr = urlparse.urlparse(self.path)
        qa = urlparse.parse_qs(qr.query)
        if DEBUG > 2:
            sys.stderr.write("handle = %s\n" % (qr.geturl()))

        if qr.path == "/":
            res = self.buildpage()
        elif qr.path == "/c":
            # command on host /c?h=melschserver&c=sudo%20ls
            uname = qa.get('h', [None])[0]
            ucmd = qa.get('c', [None])[0]
            if not ucmd or not uname:
                code, res = self.builderror(400, 'Argument error',
                                            "need h= and c= arguments")
            elif uname not in Host.hosts:
                code, res = self.builderror(400, 'Data error',
                                            "h=%s not found" % uname)
            else:
                Host.hosts[uname].cmds.append(
                    ('CMD', {'cmd': urllib.unquote(ucmd)}))
                res = self.buildhead()
                # arguments were swapped in the original message
                res.append("cmd %s queued for host %s"
                           % (_esc(ucmd), _esc(uname)))
        elif qr.path == "/d":
            # drop host /d?h=melschserver
            uname = qa.get('h', [None])[0]
            if not uname:
                code, res = self.builderror(400, 'Argument error',
                                            "need h= argument")
            elif uname not in Host.hosts:
                code, res = self.builderror(400, 'Data error',
                                            "h=%s not found" % uname)
            else:
                log(uname, "dropped")
                del Host.hosts[uname]
                res = self.buildhead()
                res.append("Done")
        elif qr.path == "/n":
            # register name
            uname = qa.get('h', [None])[0]
            if not uname:
                code, res = self.builderror(400, 'Argument error',
                                            "need h= argument")
            elif uname not in Host.hosts:
                code, res = self.builderror(400, 'Data error',
                                            "h=%s not found" % uname)
            else:
                ll = Host.hosts[uname].registerDns()
                res = self.buildhead()  # 'res' was never created here
                res.append(ll)
                log(uname, ll)
        elif qr.path == "/u":
            # update
            uname = qa.get('h', [None])[0]
            if uname:
                uname = urllib.unquote(uname)
            ucode = qa.get('c', [None])[0]
            if not ucode or not uname:
                code, res = self.builderror(400, 'Argument error',
                                            "need h= and c= arguments")
            elif uname != 'All' and uname not in Host.hosts:
                code, res = self.builderror(400, 'Data error',
                                            "h=%s not found" % uname)
            else:
                res = self.buildhead()
                if uname != 'All':
                    names = [uname]
                else:
                    # cver 2 is the earliest version that supports update
                    names = [n for n in Host.hosts
                             if Host.hosts[n].cver >= 2]
                for n in names:
                    err = updatecode(ucode, n)
                    res.append("update started for %s: %s<br>\n"
                               % (_esc(n), _esc(err) if err else "OK"))
                res.append("Done")
        elif qr.path == "/api/0/hosts":
            # api access to host table
            headerdict = {"Content-Type": "application/json; charset=utf-8"}
            l = [Host.hosts[h].jsons() for h in Host.hosts]
            res = ["[" + ",".join(l) + "]"]
        elif qr.path == "/api/0/messages":
            # api access to the most recent messages
            headerdict = {"Content-Type": "application/json; charset=utf-8"}
            res = [json.dumps(msgs[-30:])]
        elif qr.path == "/r":
            # restart
            res = self.buildhead()
            res.append("restart request")
            xsig = signal.SIGHUP
            log(None, "restart request")
        else:
            code, res = self.builderror(
                404, "Not Found",
                "requested URL was not found on this server.")

        towrite = "\n".join(res)
        if 'deflate' in rqAcceptEncoding:
            headerdict['Content-Encoding'] = "deflate"
            towrite = zlib.compress(towrite, 6)
        headerdict['Content-Length'] = len(towrite)
        headerdict['Cache-Control'] = 'private, must-revalidate, max-age=0'
        headerdict['Expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
        self.setheaders(code, headerdict)
        self.wfile.write(towrite)
        if xsig:
            # picked up by the main loop after the reply has been written
            sig = xsig


def setrunning(new):
    """Set the global 'running' flag that controls the main loop."""
    global running
    if DEBUG > 0:
        sys.stderr.write("running is now = %s\n" % (new))
    running = new


def closeup():
    """Stop the daemon: close sockets, stop the HTTP server, close the log.

    Every step is best effort so that a partial start-up can be torn down.
    """
    setrunning(False)
    try:
        sock.close()
    except Exception:
        pass
    try:
        sock6.close()
    except Exception:
        pass
    if DEBUG > 0:
        sys.stderr.write("asking http server to stop\n")
    try:
        serv.shutdown()
        if DEBUG > 0:
            sys.stderr.write("http server stopped\n")
    except Exception as e:
        if DEBUG > 0:
            sys.stderr.write("http server did NOT stop: %s\n" % str(e))
    try:
        serv.server_close()
    except Exception:
        pass
    log(None, "restarting")
    try:
        logf.close()
    except Exception:
        pass
    #
    # back to the default dispositions (SIG_DFL == 0)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)


def restart():
    """Replace this process with a fresh hbd using the saved arguments."""
    if verbose:
        print("execv %s %s" % (sys.argv[0], [sys.argv[0]] + cmdargs))
    os.execv(sys.argv[0], [sys.argv[0]] + cmdargs)
    print("should not be here")


def saveandrestart():
    """Shut down cleanly (state is pickled by the final log) and re-exec."""
    closeup()
    restart()


def pickleit():
    """Write hosts, messages and the period markers to the pickle file."""
    with open(pickfile, 'w') as pickf:
        pick = cPickle.Pickler(pickf)
        pick.dump(Host.hosts)
        pick.dump(msgs)
        pick.dump(lastfm)


#
# Main
#
helpflag = False
forground = False
optlist = []
args = []
home = os.environ.get('HOME', '')  # HOME may be unset when started by init
cmdargs = []  # options handed to the new process on restart
configfile = "%s/.hbrc" % home

try:
    # '-h' takes no argument (the spec used to say 'h:')
    optlist, args = getopt.getopt(sys.argv[1:], 'c:dfhvx')
except getopt.GetoptError:
    helpflag = True

for o, a in optlist:
    if o == '-c':
        configfile = a
        cmdargs += [o, a]
    elif o == '-f':
        forground = True
        cmdargs += [o]
    elif o == '-h':
        helpflag = True
    elif o == '-v':
        verbose = True
        cmdargs += [o]
    elif o == '-x':
        DEBUG += 1
        cmdargs += [o]

if helpflag:
    print("hbc HeartBeatDaemon")
    print("usage: hbd [-dfhvx] [-c configfile]")
    print("")
    print(" -c configfile")
    print(" -d display")
    print(" -f run in foreground")
    print(" -h this help")
    print(" -v verbose")
    print(" -x increase debug lvl")
    print("")
    # values are Python expressions, so strings must be quoted
    print("""
config file can contain (values are Python literals, quote strings)
 logfile = "/var/log/heartbeat.log"
 logfmt = "text"
 hb_port = 50003
 interval = 20
 hbd_port = 50004
 hbd_host = "www.domain.com"
 grace = 2
""")
    sys.exit(1)

#
# set defaults
hb_port = PORT
hbd_host = THOST
hbd_port = TPORT
pickfile = PICKFILE
logfile = LOGFILE
logfmt = "text"
interval = INTERVAL
grace = GRACE
watchhosts = []
dyndnshosts = []
drophosts = []

try:
    f = open(configfile, "r")
    if verbose:
        print("notice: using config file %s" % configfile)
except IOError:
    print("warning: running without config file: %s" % configfile)
    f = None

if f:
    for l in f:
        l = l.rstrip("\n")
        if verbose:
            print(" %s" % l)
        entry = l.strip()
        if not entry or entry.startswith('#'):
            continue  # blank and comment lines used to crash the parser
        if '=' not in entry:
            print("warning: ignoring config line: %s" % entry)
            continue
        o, rawvalue = entry.split('=', 1)
        o = o.strip()
        # SECURITY: eval() of the config value -- the config file must be
        # writable by trusted users only.
        a = eval(rawvalue.strip())
        if o == 'interval':
            interval = a
        elif o == 'grace':
            grace = a
        elif o == 'hbd_port':
            hbd_port = a
        elif o == 'hbd_host':
            hbd_host = a
        elif o == 'pickfile':
            pickfile = a
        elif o == 'hb_port':
            hb_port = a
        elif o == 'logfile':
            logfile = a
        elif o == 'logfmt':
            logfmt = a
        elif o == 'watchhosts':
            watchhosts = a
        elif o == 'dyndnshosts':
            dyndnshosts = a
        elif o == 'drophosts':
            drophosts = a
    f.close()

if len(args) != 0:
    print("error: args")
    sys.exit(1)

if verbose:
    print("notice: logging to %s" % logfile)
logf = initlog(logfile)

if os.path.exists(pickfile):
    if verbose:
        print("opening pickls %s" % pickfile)
    # NOTE: unpickling executes code from the file; pickfile must not be
    # writable by untrusted users.
    pickf = open(pickfile, 'r')
    try:
        pick = cPickle.Unpickler(pickf)
        try:
            Host.hosts = pick.load()
            msgs = pick.load()
            try:
                lastfm = pick.load()
            except Exception:
                # older pickle files carry no period markers
                lastfm = ["", "", ""]
        except Exception:
            # unreadable state: discard it and start fresh
            os.unlink(pickfile)
    finally:
        pickf.close()
    Connection.htab = {}
    for h in Host.hosts.keys():
        Host.hosts[h].dyn = h in dyndnshosts
        Host.hosts[h].fixup()
    for h in drophosts:
        if h in Host.hosts:
            del Host.hosts[h]
    if verbose:
        print("%s pickled hosts loaded" % len(Host.hosts))
else:
    if verbose:
        print("no pickled data")

now = time.time()
startsec = int(now) % interval
log(None, "Starting %s" % VER)
atexit.register(on_exit)

ilist = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# NOTE(review): SO_REUSEADDR is set after bind() here (unlike sock6), so it
# has no effect on this bind; order kept as in the original.
sock.bind(("", hb_port))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ilist.append(sock)

sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock6.bind(("", hb_port))
ilist.append(sock6)
#ilist.append(serv.fileno())

if not forground:
    pid = os.fork()
    if pid > 0:
        if verbose:
            print("daemoinizing... pid = %d" % pid)
        sys.exit(0)
    verbose = False
    os.close(0)
    os.close(1)
    os.close(2)
    sys.stdin.close()
    if DEBUG > 0:
        sys.stdout = LogDevice()
        sys.stderr = LogDevice()
    else:
        sys.stdout = NullDevice()
        sys.stderr = NullDevice()
    os.chdir("/tmp")
    os.setsid()
    os.umask(0)

try:
    serv = HttpServer((hbd_host, hbd_port), HttpHandler)
except Exception:
    print("failed to start server on %s:%s" % (hbd_host, hbd_port))
    sys.exit(1)

servthread = threading.Thread(target=serv.serve_forever)
servthread.daemon = True
servthread.start()

Host.dnsQ = Queue.Queue()
dnsT = threading.Thread(target=dnsupdatethread)
dnsT.daemon = True
dnsT.start()

running = True
sig = 0
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGHUP, handler)

nextcheck = int(now) + 15  # 15 seconds time to settle after (re-)start
sleeptime = 1
firstcheck = int(now) + 15

while running:
    sr = None
    if DEBUG > 2:
        sys.stderr.write("about to sleep = %s\n" % (sleeptime))
    try:
        sr = select.select(ilist, [], [], sleeptime)
        now = time.time()
    except KeyboardInterrupt:
        sys.stderr.write("Keyboard Interrupt!\n")
        running = False
        closeup()
        continue
    except select.error as value:
        if value[0] != 4:  # 4 = interrupted system call
            sys.stderr.write("select err %s %s" % (select.error, value))
            #raise os.error, value
        continue
    except Exception as e:
        if DEBUG > 2:
            sys.stderr.write("select exception %s\n" % (str(e)))
        sys.exit(1)
    if DEBUG > 2:
        sys.stderr.write("woke from sleep = %s (%s)\n" % (str(sr), str(ilist)))

    for fh in sr[0]:
        if fh in [sock, sock6]:
            try:
                readsock(fh)
            except Exception:
                # a malformed datagram must not take the daemon down
                sys.stderr.write("readsock failed: %s\n"
                                 % traceback.format_exc())
        # elif fh == serv.fileno():
        #     serv.handle_request()
        else:
            sys.stderr.write("what happend just now?\n")
    if DEBUG > 2:
        sys.stderr.write("done handling, running is %s, sig is %s\n"
                         % (running, sig))

    # check hour/day/week: on a change, snapshot the counters per host
    for v in xrange(3):
        ts = time.strftime(tsfm[v], time.localtime(now))
        if ts != lastfm[v]:
            lastfm[v] = ts
            for h in Host.hosts.keys():
                Host.hosts[h].hdwcounts[v] = [Host.hosts[h].doesack,
                                              Host.hosts[h].upcount]

    if now >= nextcheck and now >= firstcheck:
        nextcheck = now + 1
        checkoverdue()
    sleeptime = nextcheck - now
    if sleeptime < 0:
        sys.stderr.write("sleep is negative! %s next = %s\n"
                         % (sleeptime, nextcheck))
        sleeptime = 0
    if DEBUG > 2:
        sys.stderr.write("sleep = %s next = %s\n" % (sleeptime, nextcheck))

    if sig != 0:
        setrunning(False)
        if sig == signal.SIGHUP:
            if DEBUG > 0:
                sys.stderr.write("signal 1 saveandrestart\n")
            saveandrestart()