major rework for ver 3.0
This commit is contained in:
@@ -11,6 +11,11 @@ import string
|
||||
import select
|
||||
import errno
|
||||
import traceback
|
||||
import urllib2
|
||||
import md5
|
||||
import shutil
|
||||
import zlib
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
import lockfile
|
||||
@@ -33,123 +38,269 @@ import syslog
|
||||
PORT = 50003
|
||||
INTERVAL = 10
|
||||
PIDFILE = '/tmp/hbc.pid'
|
||||
DBG = False
|
||||
VER = 1
|
||||
MAXRECV = 32767
|
||||
|
||||
socks = None
|
||||
up = True
|
||||
ackcount = 0
|
||||
running = True
|
||||
dorestart = False
|
||||
|
||||
class NullDevice:
|
||||
def write(self, s):
|
||||
pass
|
||||
|
||||
|
||||
class Conn:
|
||||
def __init__(self, conId, addr, port, af):
|
||||
self.conId = conId
|
||||
self.addr = addr
|
||||
self.port = port
|
||||
self.af = af
|
||||
|
||||
self.ackcount = 0 # num of accks received
|
||||
self.lastack = 0 # time() last ACK was received
|
||||
self.send = 0
|
||||
self.lastsend = 0 # time() last msg was sent
|
||||
self.rtts = []
|
||||
self.sock=socket.socket(af, socket.SOCK_DGRAM)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, \
|
||||
self.sock.getsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR) | 1)
|
||||
|
||||
def sendto(self, msg, ID = 'HTB'): # default ID is HearTBeat
|
||||
msg['name'] = iam
|
||||
msg['id'] = self.conId
|
||||
msg['ver'] = VER
|
||||
msg['time'] = time.time()
|
||||
m = dicttos(ID, msg)
|
||||
mz = zlib.compress(m,9)
|
||||
if verbose: print "conn.send('%s', (%s:%s) %s>%s)" % (msg, self.addr, self.port, len(m), len(mz))
|
||||
self.sock.sendto(m, (self.addr, self.port))
|
||||
self.send += 1
|
||||
self.lastsend = time.time()
|
||||
|
||||
|
||||
def ack(self, msgDict):
|
||||
self.lastack = time.time()
|
||||
self.lastacksent = float(msgDict.get('time','0'))
|
||||
if verbose: print "ack RTT: %0.1f ms" % ((self.lastack - self.lastacksent) * 1000.0)
|
||||
self.ackcount += 1
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
|
||||
|
||||
def dicttos(ID, d, compress=False):
|
||||
s = []
|
||||
for k in d:
|
||||
if type(d[k]) == type(1.2):
|
||||
s.append("%s=%0.5f" % (k, d[k]))
|
||||
else:
|
||||
s.append("%s=%s" % (k, d[k]))
|
||||
pk = ";".join(s)
|
||||
if compress:
|
||||
zpk = zlib.compress(pk, 6)
|
||||
ID = "!"+ID
|
||||
else:
|
||||
zpk = pk
|
||||
return ID + ":" + zpk
|
||||
|
||||
|
||||
def stodict(msg):
|
||||
d = {}
|
||||
r0 = msg.split(':',1)
|
||||
if len(r0) == 1:
|
||||
return None
|
||||
if r0[0][0] == '!': # compressed
|
||||
pk = zlib.decompress(msg[len(r0[0])+1:])
|
||||
d['ID'] = r0[0][1:]
|
||||
else:
|
||||
pk = r0[1]
|
||||
d['ID'] = r0[0]
|
||||
r = pk.split(';')
|
||||
for v in r:
|
||||
vr = v.split('=', 1)
|
||||
k = vr[0].strip()
|
||||
if len(vr) == 1:
|
||||
d[k] = None
|
||||
else:
|
||||
v = vr[1].strip()
|
||||
try:
|
||||
if v[0].isdigit():
|
||||
v = eval(v)
|
||||
except:
|
||||
pass
|
||||
d[k] = v
|
||||
return d
|
||||
|
||||
|
||||
def syslogtrace(note):
|
||||
logm = '%s hbc died: \n%s' % (note, traceback.format_exc())
|
||||
for l in logm.split('\n'):
|
||||
syslog.syslog(syslog.LOG_ERR, ' tb: %s' % l)
|
||||
if verbose:
|
||||
print logm
|
||||
|
||||
|
||||
def getsock(host):
|
||||
try:
|
||||
rs=socket.getaddrinfo(host, 50001, 0, 0, socket.SOL_UDP)
|
||||
except socket.gaierror:
|
||||
logm = '%s hbc died: \n%s' % ('getsock', traceback.format_exc())
|
||||
if DBG: print logm
|
||||
return None
|
||||
socks = []
|
||||
for r in rs:
|
||||
if DBG: print "address %s" % str(r)
|
||||
if r[0] in [10, 28, 30]:
|
||||
af_type=socket.AF_INET6
|
||||
elif r[0] == 2:
|
||||
af_type=socket.AF_INET
|
||||
else:
|
||||
print "dont know this net type: %s" % r[0][0]
|
||||
sys.exit(1)
|
||||
if verbose:
|
||||
syslog.syslog(syslog.LOG_ERR, "socktype: %s" % af_type)
|
||||
sock=socket.socket(af_type, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, \
|
||||
sock.getsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR) | 1)
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "get socket %s" % sock)
|
||||
socks.append(sock)
|
||||
|
||||
conId = 1
|
||||
def createConnections(hosts):
|
||||
global conId
|
||||
for host in hosts:
|
||||
if verbose: print "createConnections for %s" % host
|
||||
try:
|
||||
rs=socket.getaddrinfo(host, hb_port, 0, 0, socket.SOL_UDP)
|
||||
except socket.gaierror:
|
||||
logm = '%s hbc died: \n%s' % ('createConnections', traceback.format_exc())
|
||||
if verbose: print logm
|
||||
return None
|
||||
for r in rs:
|
||||
if verbose: print "address %s" % str(r)
|
||||
if r[0] in [10, 28, 30]:
|
||||
af=socket.AF_INET6
|
||||
elif r[0] == 2:
|
||||
af=socket.AF_INET
|
||||
else:
|
||||
print "dont know this net type: %s" % r[0][0]
|
||||
sys.exit(1)
|
||||
|
||||
return socks
|
||||
addr = r[4][0]
|
||||
conns[conId] = Conn(conId, addr, hb_port, af)
|
||||
conId += 1
|
||||
|
||||
|
||||
def socksend(msg, tohost):
|
||||
global socks
|
||||
def doexec(conn, data):
|
||||
try:
|
||||
ro = subprocess.check_output(data, stderr=subprocess.STDOUT, shell=True)
|
||||
fail = "OK"
|
||||
except subprocess.CalledProcessError as e:
|
||||
ro = str(e)
|
||||
fail = "CalledProcessError"
|
||||
except Exception as e:
|
||||
syslogtrace('System')
|
||||
ro = None
|
||||
fail = "cmd failed: %s" % e
|
||||
msg={'service': 'command', 'msg': fail+" "+ro}
|
||||
conns[conn].sendto(msg)
|
||||
|
||||
if socks == None:
|
||||
socks = getsock(tohost[0])
|
||||
for sock in socks:
|
||||
if DBG: print "socksend: sending msg=%s on socket=%s" % (msg, sock)
|
||||
sock.sendto(msg, tohost)
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "msg %s sent" % msg)
|
||||
|
||||
def doupdate(conn, msgDict):
|
||||
fail = None
|
||||
try:
|
||||
code = msgDict['code'].decode('base64')
|
||||
csum = msgDict['csum']
|
||||
except:
|
||||
fail = "csum/code missing"
|
||||
if not fail:
|
||||
fail = doupdateone(code, csum)
|
||||
|
||||
msg={'service': 'update', 'msg': fail if fail else "OK"}
|
||||
conns[conn].sendto(msg)
|
||||
return fail
|
||||
|
||||
|
||||
def doupdateone(code, csum):
|
||||
|
||||
m = md5.new()
|
||||
m.update(code)
|
||||
icsum = m.hexdigest()
|
||||
if icsum != csum:
|
||||
return "checksum error"
|
||||
|
||||
fn = sys.argv[0]
|
||||
ofn = "%.sav" % fn
|
||||
try:
|
||||
shutil.copy2(fn, ofn)
|
||||
except Exception as e:
|
||||
return "cannot make backup copy: %s" % e
|
||||
|
||||
try:
|
||||
fh=open(fn, "w")
|
||||
fh.write(code)
|
||||
fh.close()
|
||||
except Exception as e:
|
||||
return "cannot write new code: %s" % e
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def restart():
|
||||
if verbose: print "restart: execv %s %s" % (sys.argv[0], [sys.argv[0]]+cmdargs)
|
||||
os.execv(sys.argv[0], [sys.argv[0]]+cmdargs)
|
||||
print "should not be here"
|
||||
|
||||
|
||||
def process():
|
||||
global up, socks, ackcount
|
||||
global running, dorestart
|
||||
|
||||
if socks == None:
|
||||
socks=getsock(tohost[0])
|
||||
ifiles = {}
|
||||
conIds = {}
|
||||
for conn in conns:
|
||||
ifiles[conns[conn].sock.fileno()] = conns[conn].sock
|
||||
conIds[conns[conn].sock.fileno()] = conn
|
||||
nextReport = time.time()
|
||||
|
||||
ackcount=0
|
||||
lastT=time.time()
|
||||
ifiles = []
|
||||
for sock in socks:
|
||||
ifiles.append(sock.fileno())
|
||||
while up:
|
||||
sleep=(lastT+interval) - time.time()
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "sleep %s" % sleep)
|
||||
if sleep > 0:
|
||||
while running:
|
||||
while time.time() < nextReport:
|
||||
sleep=nextReport - time.time()
|
||||
if verbose: print "process: sleep %s" % sleep
|
||||
if sleep <= 0:
|
||||
break
|
||||
try:
|
||||
r=select.select(ifiles,[],[],sleep)
|
||||
r=select.select(ifiles.keys(),[],[],sleep)
|
||||
except KeyboardInterrupt:
|
||||
running = False
|
||||
break
|
||||
except:
|
||||
if up:
|
||||
if running:
|
||||
syslogtrace('select')
|
||||
break
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "r is %s" % str(r))
|
||||
cont = False
|
||||
for sock in socks:
|
||||
if sock.fileno() in r[0]:
|
||||
data, addr = sock.recvfrom(1024)
|
||||
if data == "ACK":
|
||||
ackcount+=1
|
||||
else:
|
||||
try:
|
||||
os.system(data)
|
||||
except:
|
||||
syslogtrace('System')
|
||||
pass
|
||||
cont = True
|
||||
continue
|
||||
if cont:
|
||||
continue
|
||||
lastT=time.time()
|
||||
for hb_host in hb_hosts:
|
||||
try:
|
||||
msg="interval=%s;name=%s;time=%s;acks=%s" % (interval, iam, time.time(), ackcount)
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "sock.send('%s', (%s, %s))" % (msg, hb_host, hb_port))
|
||||
socksend(msg, (hb_host, hb_port))
|
||||
except:
|
||||
logm = '%s hbc died: \n%s' % ('socksend', traceback.format_exc())
|
||||
if DBG: print logm
|
||||
pass
|
||||
|
||||
if verbose: print "process: r is %s" % str(r)
|
||||
for rfh in r[0]:
|
||||
conn = conIds[rfh]
|
||||
data, addr = ifiles[rfh].recvfrom(MAXRECV)
|
||||
if verbose: print "sock.recvfrom: %s (%s) %s" % (addr, len(data), data[:4])
|
||||
msgDict = stodict(data)
|
||||
if verbose: print "sock.recvfrom: %s (%s) %s" % (addr, len(data), str(msgDict)[:80])
|
||||
if msgDict['ID'] == "ACK":
|
||||
conns[conn].ack(msgDict)
|
||||
elif msgDict['ID'] == "UPD":
|
||||
if doupdate(conn, msgDict) == None:
|
||||
if verbose: print "process: restart after update"
|
||||
dorestart = True
|
||||
break
|
||||
elif msgDict['ID'] == "CMD":
|
||||
doexec(conn, msgDict['cmd'])
|
||||
else:
|
||||
doexec(conn, data) # deprecated until no more VER - hbc
|
||||
if dorestart:
|
||||
running = False
|
||||
break
|
||||
if not running:
|
||||
break
|
||||
for conn in conns:
|
||||
msg={'interval': interval, 'acks': conns[conn].ackcount}
|
||||
conns[conn].sendto(msg)
|
||||
nextReport += interval
|
||||
|
||||
def cleanup(a, b):
|
||||
global up, socks, ackcount
|
||||
up = False
|
||||
syslog.syslog(syslog.LOG_ERR, 'exit a=%s b=%s' % (str(a), str(b)))
|
||||
msg="shutdown=1;name=%s;acks=%s" % (iam, ackcount)
|
||||
for hb_host in hb_hosts:
|
||||
if verbose: syslog.syslog(syslog.LOG_ERR, "hbc: sock.send('%s', (%s, %s))" % (msg, hb_host, hb_port))
|
||||
socksend(msg, (hb_host, hb_port))
|
||||
if verbose: print "process: done running"
|
||||
|
||||
def cleanup():
|
||||
global running
|
||||
running = False
|
||||
for conn in conns:
|
||||
msg={'shutdown': 1, 'acks': conns[conn].ackcount}
|
||||
conns[conn].sendto(msg)
|
||||
conns[conn].close()
|
||||
time.sleep(1)
|
||||
for sock in socks:
|
||||
sock.close()
|
||||
closeall()
|
||||
|
||||
|
||||
def closeall():
|
||||
for conn in conns:
|
||||
conns[conn].close()
|
||||
syslog.syslog(syslog.LOG_ERR, 'exit')
|
||||
|
||||
|
||||
msgonly=False
|
||||
@@ -158,9 +309,11 @@ verbose=False
|
||||
fdaemon=False
|
||||
optlist=[]
|
||||
args=[]
|
||||
msgboot=[]
|
||||
msgboot={}
|
||||
home=os.environ['HOME']
|
||||
configfile="%s/.hbrc" % home
|
||||
cmdargs = []
|
||||
|
||||
|
||||
try:
|
||||
optlist, args = getopt.getopt(sys.argv[1:], 'bc:dhm:v')
|
||||
@@ -169,22 +322,28 @@ except:
|
||||
|
||||
for o,a in optlist:
|
||||
if o == '-b':
|
||||
msgboot.append("boot=1")
|
||||
msgboot['boot']=1
|
||||
msgonly=True
|
||||
elif o == '-c':
|
||||
configfile=a
|
||||
cmdargs += [o, a]
|
||||
elif o == '-d':
|
||||
fdaemon=True
|
||||
cmdargs += [o]
|
||||
elif o == '-h':
|
||||
helpflag=True
|
||||
elif o == '-m':
|
||||
msgboot.append("service=%s" % "service")
|
||||
a.replace(';',':')
|
||||
msgboot.append("msg=%s" % a)
|
||||
msgboot['service'] = "service"
|
||||
msgboot['msg'] = a
|
||||
msgonly=True
|
||||
elif o == '-v':
|
||||
verbose=True
|
||||
cmdargs += [o]
|
||||
|
||||
|
||||
cmdargs += args
|
||||
if verbose: print "cmdargs for restart are %s" % cmdargs
|
||||
|
||||
if helpflag:
|
||||
print "hbc HeartBeatClient"
|
||||
print "usage: hbc [-bdhv] [-c configfile] [-m msg][host1 [..]]"
|
||||
@@ -217,7 +376,7 @@ hb_hosts=[]
|
||||
iam=socket.gethostname()
|
||||
|
||||
try:
|
||||
f=open(configfile,"r")
|
||||
f=open(configfile,"r")
|
||||
if verbose: print "notice: using config file %s" % configfile
|
||||
except:
|
||||
if verbose: print "warning: running without config file: %s" % configfile
|
||||
@@ -244,50 +403,43 @@ if f:
|
||||
|
||||
if len(args) != 0:
|
||||
hb_hosts=args
|
||||
|
||||
|
||||
if len(hb_hosts) == 0:
|
||||
print "no hb server specified"
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
#
|
||||
if verbose:
|
||||
print "notice: hb_hosts: %s" % str(hb_hosts)
|
||||
print "notice: hb_port: %s" % hb_port
|
||||
print "notice: interval: %s" % interval
|
||||
print "notice: iam: %s" % iam
|
||||
print "notice: msgonly: %s" % msgonly
|
||||
print "notice: msgboot: %s" % msgboot
|
||||
|
||||
if not msgonly:
|
||||
msgboot.append("interval=%s" % interval)
|
||||
msgboot['interval'] = interval
|
||||
|
||||
|
||||
conns = {}
|
||||
createConnections(hb_hosts)
|
||||
|
||||
if verbose: print "%s connections created" % (len(conns))
|
||||
|
||||
if len(msgboot) > 0:
|
||||
if DBG: print "on boot"
|
||||
msgboot.append("name=%s" % iam)
|
||||
msgboot.append("time=%s" % time.time())
|
||||
msgboot.append("acks=0")
|
||||
msg=";".join(msgboot)
|
||||
while 1:
|
||||
fail=0
|
||||
for hb_host in hb_hosts:
|
||||
try:
|
||||
if DBG: print "sock.send('%s', (%s, %s))" % (msg, hb_host, hb_port)
|
||||
socksend(msg, (hb_host, hb_port))
|
||||
except:
|
||||
logm = '%s hbc died: \n%s' % ('socksend2', traceback.format_exc())
|
||||
if DBG: print logm
|
||||
fail=1
|
||||
if fail:
|
||||
time.sleep(10)
|
||||
else:
|
||||
break
|
||||
|
||||
if verbose: print "msgboot done msgonly=%s" % msgonly
|
||||
if verbose: print "on boot"
|
||||
msgboot['acks'] = 0
|
||||
for conn in conns:
|
||||
conns[conn].sendto(msgboot)
|
||||
|
||||
if msgonly:
|
||||
sys.exit(0)
|
||||
if verbose: print "msgboot done msgonly=%s" % msgonly
|
||||
closeall()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
|
||||
syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_DAEMON)
|
||||
if fdaemon:
|
||||
|
||||
@@ -326,22 +478,34 @@ if fdaemon:
|
||||
}
|
||||
|
||||
context.files_preserve = []
|
||||
for sock in socks:
|
||||
context.files_preserve += [sock, sock.fileno()]
|
||||
for conn in conns:
|
||||
context.files_preserve += [conns[conn].sock, conns[conn].sock.fileno()]
|
||||
with context:
|
||||
syslog.syslog(syslog.LOG_ERR, 'starting heartbeat to %s' % ','.join(hb_hosts))
|
||||
up = True
|
||||
running = True
|
||||
try:
|
||||
process()
|
||||
except:
|
||||
syslogtrace('process')
|
||||
cleanup(0, None)
|
||||
if verbose: print "main: cleanup"
|
||||
cleanup()
|
||||
if dorestart:
|
||||
if verbose: print "main: restart"
|
||||
restart()
|
||||
|
||||
else:
|
||||
up = True
|
||||
running = True
|
||||
try:
|
||||
if verbose: print "starting loop process"
|
||||
process()
|
||||
except:
|
||||
except Exception as e:
|
||||
if verbose: print "err: process exit: %s" % e
|
||||
syslogtrace('process')
|
||||
cleanup(0, None)
|
||||
if verbose: print "main: cleanup"
|
||||
cleanup()
|
||||
if dorestart:
|
||||
if verbose: print "main: restart"
|
||||
restart()
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user