#!/usr/bin/env python # $Id: hbc,v 1.9 2012/03/29 02:08:36 andreas Exp $ import sys import time import socket import os import signal import getopt import string import select import errno import traceback import urllib2 import md5 import shutil import zlib import subprocess try: import lockfile import daemon import daemon.pidfile except: print """ require on Linux python-filelock python-daemon vs 1.61 or > run sudo easy_install-2.7 lockfile python-daemon on *bsd py27-lockfile py27-daemon run sudo pkg install -y py27-lockfile py27-daemon """ sys.exit(1) # N.B. daemon tries to close resource.RLIMIT_NOFILE file descriptors # which on FreeBSD in close to a million # hack: replace the function in daemon with ths one: def get_maximum_file_descriptors(): return 2048 daemon.get_maximum_file_descriptors = get_maximum_file_descriptors import syslog PORT = 50003 INTERVAL = 10 PIDFILE = '/tmp/hbc.pid' VER = 1 MAXRECV = 32767 running = True dorestart = False warned1 = False class NullDevice: def write(self, s): pass class Conn: def __init__(self, conId, addr, port, af): self.conId = conId self.addr = addr self.port = port self.af = af self.ackcount = 0 # num of accks received self.lastack = 0 # time() last ACK was received self.send = 0 self.lastsend = 0 # time() last msg was sent self.rtts = [0] self.sock=socket.socket(af, socket.SOCK_DGRAM) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, \ self.sock.getsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR) | 1) def sendto(self, msg, ID = 'HTB'): # default ID is HearTBeat global warned1 msg['name'] = shortname(iam) msg['id'] = self.conId msg['ver'] = VER msg['time'] = time.time() m = dicttos(ID, msg) mz = zlib.compress(m,9) if verbose: print "conn.send('%s', (%s:%s) %s>%s)" % (msg, self.addr, self.port, len(m), len(mz)) try: self.sock.sendto(m, (self.addr, self.port)) except socket.error as e: if not warned1: print "socket error: %s %s:%s" % (e, self.addr, self.port) warned1 = True return self.send += 1 self.lastsend = time.time() def ack(self, msgDict): self.lastack = time.time() self.lastacksent = float(msgDict.get('time','0')) if verbose: print "ack RTT: %0.1f ms" % ((self.lastack - self.lastsend) * 1000.0) self.rtts.append((self.lastack - self.lastsend) * 1000.0) if len(self.rtts) > 10: del self.rtts[0] self.ackcount += 1 def close(self): if self.sock: self.sock.close() self.sock = None def shortname(name): r = string.split(name, '.') return r[0] def dicttos(ID, d, compress=False): s = [] for k in d: if type(d[k]) == type(1.2): s.append("%s=%0.5f" % (k, d[k])) else: s.append("%s=%s" % (k, d[k])) pk = ";".join(s) if compress: zpk = zlib.compress(pk, 6) ID = "!"+ID else: zpk = pk return ID + ":" + zpk def stodict(msg): d = {} r0 = msg.split(':',1) if len(r0) == 1: return None if r0[0][0] == '!': # compressed pk = zlib.decompress(msg[len(r0[0])+1:]) d['ID'] = r0[0][1:] else: pk = r0[1] d['ID'] = r0[0] r = pk.split(';') for v in r: vr = v.split('=', 1) k = vr[0].strip() if len(vr) == 1: d[k] = None else: v = vr[1].strip() try: if v[0].isdigit(): v = eval(v) except: pass d[k] = v return d def syslogtrace(note): logm = '%s hbc died: \n%s' % (note, traceback.format_exc()) for l in logm.split('\n'): syslog.syslog(syslog.LOG_ERR, ' tb: %s' % l) if verbose: print logm conId = 1 def createConnections(hosts): global conId for host in hosts: if verbose: print "createConnections for %s" % host try: rs=socket.getaddrinfo(host, hb_port, 0, 0, socket.SOL_UDP) except socket.gaierror: logm = '%s hbc died: \n%s' % ('createConnections', traceback.format_exc()) if verbose: print logm return None for r in rs: if verbose: print "address %s" % str(r) if r[0] in [10, 24, 28, 30]: # for Linux, NetBSD, FreeBSD af=socket.AF_INET6 elif r[0] == 2: af=socket.AF_INET else: print "dont know this net type: %s" % r[0][0] sys.exit(1) addr = r[4][0] conns[conId] = Conn(conId, addr, hb_port, af) conId += 1 def doexec(conn, data): try: ro = subprocess.check_output(data, stderr=subprocess.STDOUT, shell=True) fail = "OK" except subprocess.CalledProcessError as e: ro = str(e) fail = "CalledProcessError" except Exception as e: syslogtrace('System') ro = None fail = "cmd failed: %s" % e msg={'service': 'command', 'msg': fail+" "+ro} conns[conn].sendto(msg) def doupdate(conn, msgDict): fail = None try: code = msgDict['code'].decode('base64') csum = msgDict['csum'] except: fail = "csum/code missing" if not fail: fail = doupdateone(code, csum) msg={'service': 'update', 'msg': fail if fail else "OK"} conns[conn].sendto(msg) if not fail: syslog.syslog(syslog.LOG_ERR, 'hc updates, fs = %s' % (len(code))) return fail def doupdateone(code, csum): m = md5.new() m.update(code) icsum = m.hexdigest() if icsum != csum: return "checksum error" fn = sys.argv[0] ofn = "%.sav" % fn try: shutil.copy2(fn, ofn) except Exception as e: return "cannot make backup copy: %s" % e try: fh=open(fn, "w") fh.write(code) fh.close() except Exception as e: return "cannot write new code: %s" % e return None def restart(): if verbose: print "restart: execv %s %s" % (sys.argv[0], [sys.argv[0]]+cmdargs) syslog.syslog(syslog.LOG_ERR, 'restart %s' % (sys.argv[0])) e = "fallthrough" try: os.execv(sys.argv[0], [sys.argv[0]]+cmdargs) except Exception as e: pass print "should not be here:", str(e) syslog.syslog(syslog.LOG_ERR, 'restart failed: %s' % e) def process(): global running, dorestart ifiles = {} conIds = {} for conn in conns: ifiles[conns[conn].sock.fileno()] = conns[conn].sock conIds[conns[conn].sock.fileno()] = conn nextReport = time.time() while running: while time.time() < nextReport: sleep=nextReport - time.time() if verbose: print "process: sleep %s" % sleep if sleep <= 0: break try: r=select.select(ifiles.keys(),[],[],sleep) except KeyboardInterrupt: running = False break except SystemExit: syslog.syslog(syslog.LOG_ERR, 'daemon exit, running=: %s' % running) if running: running = False break except: if running: syslogtrace('select') running = False break if verbose: print "process: r is %s" % str(r) for rfh in r[0]: conn = conIds[rfh] data, addr = ifiles[rfh].recvfrom(MAXRECV) if verbose: print "sock.recvfrom: %s (%s) %s" % (addr, len(data), data[:4]) msgDict = stodict(data) if verbose: print "sock.recvfrom: %s (%s) %s" % (addr, len(data), str(msgDict)[:80]) if msgDict['ID'] == "ACK": conns[conn].ack(msgDict) elif msgDict['ID'] == "UPD": if doupdate(conn, msgDict) == None: if verbose: print "process: restart after update" dorestart = True break elif msgDict['ID'] == "CMD": doexec(conn, msgDict['cmd']) else: doexec(conn, data) # deprecated until no more VER - hbc if dorestart: running = False break if not running: break for conn in conns: # msg={'interval': interval, 'acks': conns[conn].ackcount, 'rtt': conns[conn].rtts[-1]} msg={'acks': conns[conn].ackcount, 'rtt': conns[conn].rtts[-1]} conns[conn].sendto(msg) nextReport = time.time() + interval if verbose: print "process: done running" def cleanup(): global running if verbose: syslog.syslog(syslog.LOG_ERR, 'cleanup') running = False for conn in conns: msg={'shutdown': 1, 'acks': conns[conn].ackcount} conns[conn].sendto(msg) conns[conn].close() time.sleep(1) closeall() def closeall(): if verbose: syslog.syslog(syslog.LOG_ERR, 'closecall') for conn in conns: conns[conn].close() msgonly=False helpflag=False verbose=False fdaemon=False optlist=[] args=[] msgboot={} home=os.environ['HOME'] configfile="%s/.hbrc" % home cmdargs = [] try: optlist, args = getopt.getopt(sys.argv[1:], 'bc:dhm:v') except: helpflag=True for o,a in optlist: if o == '-b': msgboot['boot']=1 elif o == '-c': configfile=a cmdargs += [o, a] elif o == '-d': fdaemon=True cmdargs += [o] elif o == '-h': helpflag=True elif o == '-m': msgboot['service'] = "service" msgboot['msg'] = a msgonly=True elif o == '-v': verbose=True cmdargs += [o] cmdargs += args if verbose: print "cmdargs for restart are %s" % cmdargs if helpflag: print "hbc HeartBeatClient" print "usage: hbc [-bdhv] [-c configfile] [-m msg][host1 [..]]" print print " -b indicate machine boot" print " -c configfile" print " -d daemonize" print " -h this help" print " -m send a message" print " -v verbose" print print """ config file can contain hb_hosts=('host1', 'host2', ..._ hb_port=50003 interval=20 logfile=... logfmt={|test|msg} grace=SECONDS reportstrict={True|False} """ sys.exit(1) # # set defaults hb_port=PORT interval=INTERVAL hb_hosts=[] iam=socket.gethostname() try: f=open(configfile,"r") if verbose: print "notice: using config file %s" % configfile except: if verbose: print "warning: running without config file: %s" % configfile f=None if f: while 1: l=f.readline() if len(l) == 0: break r=l[:-1].split('=') if r[0] == 'hb_hosts': hb_hosts=eval(r[1]) if verbose: print "notice: cfg hb_hosts: %s" % hb_hosts elif r[0] == 'interval': interval=eval(r[1]) elif r[0] == 'hb_port': hb_port=eval(r[1]) elif r[0] == 'name': iam=eval(r[1]) if verbose: print "name set to %s" % iam f.close() if len(args) != 0: hb_hosts=args if len(hb_hosts) == 0: print "no hb server specified" sys.exit(1) # if verbose: print "notice: hb_hosts: %s" % str(hb_hosts) print "notice: hb_port: %s" % hb_port print "notice: interval: %s" % interval print "notice: iam: %s" % iam print "notice: msgonly: %s" % msgonly print "notice: msgboot: %s" % msgboot if not msgonly: msgboot['interval'] = interval conns = {} createConnections(hb_hosts) if verbose: print "%s connections created" % (len(conns)) if len(msgboot) > 0: if verbose: print "on boot" msgboot['acks'] = 0 for conn in conns: conns[conn].sendto(msgboot) if msgonly: if verbose: print "msgboot done msgonly=%s" % msgonly closeall() sys.exit(0) # syslog.openlog('hbc', syslog.LOG_PID, syslog.LOG_DAEMON) if fdaemon: pidfile = daemon.pidfile.TimeoutPIDLockFile(PIDFILE, acquire_timeout=-1) try: opid = pidfile.read_pid() except: opid = None if verbose: print "opid %s" % opid if opid: try: os.kill(opid, 0) is_running = True except: is_running = False if verbose: print "is_running %s" % is_running if is_running: print "process still alive %s" % opid sys.exit(1) print "warning: stale pid file removed" os.unlink(PIDFILE) print "daemoinizing... %s" % os.getpid() context = daemon.DaemonContext( working_directory='/tmp', umask=0o022, pidfile=pidfile, detach_process=True, # initgroups=False, ) context.signal_map = { # signal.SIGHUP: cleanup, signal.SIGTERM: 'terminate', # signal.SIGUSR1: reload_program_config, } context.files_preserve = [] for conn in conns: context.files_preserve += [conns[conn].sock, conns[conn].sock.fileno()] with context: syslog.syslog(syslog.LOG_ERR, 'starting heartbeat to %s' % ','.join(hb_hosts)) running = True try: process() except: syslogtrace('process') else: running = True try: if verbose: print "starting loop process" process() except Exception as e: if verbose: print "err: process exit: %s" % e syslogtrace('process') if verbose: print "main: cleanup" cleanup() if dorestart: restart()