add hbc and prepare for package

This commit is contained in:
2026-02-06 15:19:14 -05:00
parent 4df700e4ef
commit 3ca619e86d
15 changed files with 574 additions and 176 deletions
Executable
+593
View File
@@ -0,0 +1,593 @@
#!/usr/bin/env python3
# $Id: hbc,v 1.9 2012/03/29 02:08:36 andreas Exp $
# NEW
import argparse
import ast
import codecs
import errno
import getopt
import os
import select
import shutil
import signal
import socket
import string
import subprocess
import sys
import syslog
import time
import traceback
import zlib
from hashlib import md5

from .config import load_config
# --- Defaults and protocol constants ---------------------------------------
PORT = 50003  # UDP port used when the config has no "hb_port"
INTERVAL = 10  # seconds between heartbeats when the config has no "interval"
REOPENC = 6  # Conn.sendto() closes/reopens its socket every REOPENC sends
PIDFILE = "/tmp/hbc.pid"
VER = 6  # sent as "ver" in every outgoing message
MAXRECV = 32767  # maximum number of bytes read per datagram

# --- Mutable module state (rebound by main(), process() and cleanup()) ------
running = True  # main loop keeps going while this is True
dorestart = False  # set by process() after a successful update
warned1 = False  # True once the first socket send error has been logged
msgonly = False  # True when only a one-shot message is to be sent
helpflag = False
verbose = False
fdaemon = False  # True when running in daemon mode; log() then uses syslog
daemonized = False
optlist = []
msgboot = {}  # fields of the initial (boot/message) packet

# Fall back to the account's home directory instead of raising KeyError at
# import time when HOME is not exported (e.g. when started from an init
# system). When HOME is set the value is used unchanged.
home = os.environ["HOME"] if "HOME" in os.environ else os.path.expanduser("~")
configfile = "%s/.hbrc" % home
cmdargs = []  # arguments re-used by restart()
iam = socket.gethostname()  # name reported in heartbeats unless overridden
def log(msg):
    """Emit *msg* on stdout, or via syslog (LOG_ERR) when in daemon mode."""
    if not fdaemon:
        print(msg)
        return
    syslog.syslog(syslog.LOG_ERR, msg)
def handler(signum, frame):
    """Signal handler: run cleanup() on SIGTERM, ignore any other signal."""
    if signum != signal.SIGTERM:
        return
    cleanup()
class NullDevice:
    """Minimal write-only sink: whatever is written is discarded."""

    def write(self, s):
        """Accept *s* and do nothing with it."""
        return None
class Conn:
    """One UDP heartbeat channel to a single resolved server address.

    Keeps send/ACK counters and the most recent round-trip estimates
    (milliseconds, newest last) in ``rtts``.
    """

    def __init__(self, conId, addr, port, af):
        """Remember the target; the socket is created lazily by sendto().

        conId -- identifier sent as "id" in every message
        addr  -- destination address string
        port  -- destination UDP port
        af    -- socket address family used to create the socket
        """
        self.conId = conId
        self.addr = addr
        self.port = port
        self.af = af
        self.ackcount = 0  # number of ACKs received
        self.lastack = 0  # timestamp used for the most recent ACK
        self.send = 0  # number of successful sends
        self.lastsend = 0  # time() of the last successful send
        self.rtts = [0]  # recent RTT estimates in ms, at most 10 kept
        self.sock = None

    def __str__(self):
        return "Con(%s, %s %s)" % (self.addr, self.port, self.af)

    def open(self):
        """Create the datagram socket with SO_REUSEADDR enabled."""
        self.sock = socket.socket(self.af, socket.SOCK_DGRAM)
        self.sock.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_REUSEADDR,
            self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1,
        )

    def sendto(self, msg, ID="HTB"):  # default ID is HearTBeat
        """Send *msg* (a dict of fields) as a compressed packet of type *ID*.

        The fields name, id, ver and time are added to a copy of *msg*, so
        the caller's dict is left untouched. The socket is recycled every
        REOPENC successful sends. A send error closes the socket and is
        logged only the first time it happens (module flag ``warned1``).
        """
        global warned1
        if self.send % REOPENC == 0:
            self.close()
        if not self.sock:
            self.open()
        payload = dict(msg)  # do not mutate the caller's dict
        payload["name"] = shortname(iam)
        payload["id"] = self.conId
        payload["ver"] = VER
        payload["time"] = time.time()
        m = dicttos(ID, payload)  # always compress
        if verbose:
            log(
                "conn.send('%s', (%s:%s) %s)"
                % (payload, self.addr, self.port, len(m))
            )
        try:
            self.sock.sendto(m, (self.addr, self.port))
        except OSError as e:  # socket.error is an alias of OSError
            if not warned1:
                log("socket error: %s %s:%s" % (e, self.addr, self.port))
                warned1 = True
            self.close()
            return
        self.send += 1
        self.lastsend = time.time()

    def ack(self, msgDict, now):
        """Record an ACK and append a round-trip estimate to ``rtts``.

        When *msgDict* carries a numeric "time" field, the estimate is twice
        the difference between that stamp and our last send. Otherwise
        (field missing, not a number, or *msgDict* not a mapping) the local
        receive time *now* is used and the difference is taken as-is.
        """
        try:
            stamp = msgDict["time"]
        except (KeyError, TypeError):
            stamp = None
        # A non-numeric stamp previously raised TypeError in the subtraction
        # below; treat it like a missing stamp instead.
        if isinstance(stamp, (int, float)) and not isinstance(stamp, bool):
            self.lastack = stamp
            mul = 2
        else:
            self.lastack = now
            mul = 1
        rtt = (self.lastack - self.lastsend) * mul
        if verbose:
            log("ack RTT: %0.1f ms (now %s)" % (rtt * 1000.0, now))
        self.rtts.append(rtt * 1000.0)
        if len(self.rtts) > 10:
            del self.rtts[0]
        self.ackcount += 1

    def close(self):
        """Close the socket if one is open; safe to call repeatedly."""
        if self.sock:
            self.sock.close()
        self.sock = None
def shortname(name):
    """Return the part of *name* before its first dot (all of it if none)."""
    return name.partition(".")[0]
def dicttos(ID, d):
    """Serialize dict *d* into a compressed wire packet of type *ID*.

    Fields are rendered as ``key=value`` (floats with five decimals), joined
    with ``;``, zlib-compressed at level 6 and prefixed with ``!<ID>:``.
    Returns bytes.
    """
    fields = [
        ("%s=%0.5f" if type(value) is float else "%s=%s") % (key, value)
        for key, value in d.items()
    ]
    body = zlib.compress(";".join(fields).encode(), 6)
    prefix = "!" + ID + ":"
    return prefix.encode() + body
def stodict(msg):
    """Parse a received packet into a dict of fields.

    Accepts both wire formats: ``!<ID>:<zlib data>`` (as produced by
    dicttos()) and the plain ``<ID>:key=value;key=value`` form. *msg* may be
    bytes (as returned by recvfrom) or str.

    Returns a dict with the packet type under "ID" plus one entry per field;
    a field without ``=`` maps to None. Returns None when the packet has no
    ``:`` separator. Decompression and decoding errors propagate to the
    caller.

    SECURITY: packets come from the network, so values are converted with
    ast.literal_eval() (Python literals only) rather than eval(), which
    would execute arbitrary expressions sent by a peer. Anything that is not
    a literal is kept as the stripped string.
    """
    raw = msg.encode() if isinstance(msg, str) else bytes(msg)
    head, sep, body = raw.partition(b":")
    if not sep:
        return None
    if head[:1] == b"!":  # compressed
        ident = head[1:].decode()
        pk = zlib.decompress(body).decode()
    else:
        ident = head.decode()
        pk = body.decode()
    d = {"ID": ident}
    for field in pk.split(";"):
        key, eq, value = field.partition("=")
        key = key.strip()
        if not eq:
            d[key] = None
            continue
        value = value.strip()
        try:
            value = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            pass  # not a literal: keep the raw string
        d[key] = value
    if verbose:
        print("msg is %s" % d)
    return d
def XXstodict(msg):
    """Alternative packet parser; only digit-led values are converted.

    Handles ``!<ID>:<zlib data>`` and plain ``<ID>:key=value;...`` packets
    given as bytes or str. Returns a dict with the packet type under "ID",
    or None when there is no ``:`` separator. Values whose first character
    is a digit are converted with ast.literal_eval(); everything else (and
    anything that fails to convert) stays a stripped string. A field without
    ``=`` maps to None.

    SECURITY: ast.literal_eval() replaces eval() so that data received from
    the network can never be executed as code.
    """
    raw = msg.encode() if isinstance(msg, str) else bytes(msg)
    head, sep, body = raw.partition(b":")
    if not sep:
        return None
    if head[:1] == b"!":  # compressed
        ident = head[1:].decode()
        pk = zlib.decompress(body).decode()
    else:
        ident = head.decode()
        pk = body.decode()
    d = {"ID": ident}
    for field in pk.split(";"):
        key, eq, value = field.partition("=")
        key = key.strip()
        if not eq:
            d[key] = None
            continue
        value = value.strip()
        if value[:1].isdigit():
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                pass  # e.g. an IP address: keep the raw string
        d[key] = value
    return d
def syslogtrace(note):
    """Report the exception currently being handled, prefixed with *note*.

    The formatted traceback goes through log(), then line by line to syslog
    (LOG_ERR), and is additionally printed when verbose is set.
    """
    report = "%s hbc died: \n%s" % (note, traceback.format_exc())
    log(report)
    for line in report.split("\n"):
        syslog.syslog(syslog.LOG_ERR, " tb: %s" % line)
    if verbose:
        print(report)
conId = 1  # next connection id handed out by createConnections()


def createConnections(hosts):
    """Resolve every name in *hosts* and register one Conn per address.

    Each address returned by getaddrinfo() gets its own entry in the module
    dict ``conns``, keyed by a running id (module counter ``conId``). A host
    that cannot be resolved is skipped so that the remaining hosts are still
    tried; the failure is logged in verbose mode only. An address family
    other than IPv4/IPv6 terminates the program with exit status 1.
    """
    global conId
    for host in hosts:
        if verbose:
            log("createConnections for %s" % host)
        try:
            rs = socket.getaddrinfo(host, hb_port, 0, 0, socket.IPPROTO_UDP)
        except socket.gaierror:
            logm = "%s hbc died: \n%s" % ("createConnections", traceback.format_exc())
            if verbose:
                log(logm)
            continue  # one bad name must not block the other hosts
        for r in rs:
            if verbose:
                log("address %s" % str(r))
            family = r[0]
            # Compare against the platform's own constants instead of a list
            # of per-OS numeric values.
            if family == socket.AF_INET6:
                af = socket.AF_INET6
            elif family == socket.AF_INET:
                af = socket.AF_INET
            else:
                print("dont know this net type: %s" % family)
                sys.exit(1)
            addr = r[4][0]
            conns[conId] = Conn(conId, addr, hb_port, af)
            if verbose:
                print("cons[%s] = %s" % (conId, str(conns[conId])))
            conId += 1
def doexec(conn, data):
    """Run *data* through the shell and send the outcome back on *conn*.

    The reply is a "command" service message whose text is a status word
    followed by the captured output (stdout and stderr combined), or by the
    error description when the command fails.

    NOTE(review): *data* originates from a received packet and is executed
    with shell=True; this relies entirely on the peer being trusted.
    """
    try:
        captured = subprocess.check_output(
            data, stderr=subprocess.STDOUT, shell=True
        )
        ro = captured.decode()
        fail = "OK"
    except subprocess.CalledProcessError as err:
        fail = "CalledProcessError"
        ro = str(err)
    except Exception as err:
        syslogtrace("System")
        fail = "cmd failed: %s" % err
        ro = "N/A"
    reply = {"service": "command", "msg": fail + " " + ro}
    conns[conn].sendto(reply)
def doupdate(conn, msgDict):
    """Apply a self-update carried in *msgDict* and report the result.

    Expects the base64-encoded new program text under "code" and its
    checksum under "csum". An "update" service message with either "OK" or
    the failure text is sent back on *conn*.

    Returns None on success, otherwise the failure text.
    """
    try:
        code = codecs.decode(msgDict["code"], "base64").decode()
        csum = msgDict["csum"]
    except Exception as err:
        fail = "csum/code missing: %s" % err
    else:
        fail = doupdateone(code, csum)
    reply = {"service": "update", "msg": fail or "OK"}
    conns[conn].sendto(reply)
    if not fail:
        log("hc updates, fs = %s" % (len(code)))
    return fail
def doupdateone(code, csum):
    """Replace the running script file with *code* after verifying *csum*.

    *csum* must equal the MD5 hex digest of *code* encoded as UTF-8. The
    current script (sys.argv[0]) is first copied to ``<script>.sav``, then
    overwritten with *code*.

    Returns None on success, otherwise a short error text; no exception is
    raised for checksum, backup or write failures.
    """
    icsum = md5(code.encode()).hexdigest()
    if icsum != csum:
        return "checksum error"
    fn = sys.argv[0]
    ofn = "%s.sav" % fn
    try:
        shutil.copy2(fn, ofn)
    except Exception as e:
        return "cannot make backup copy: %s" % e
    try:
        # The context manager closes the file even when write() fails; UTF-8
        # matches the encoding the checksum was computed over.
        with open(fn, "w", encoding="utf-8") as fh:
            fh.write(code)
    except Exception as e:
        return "cannot write new code: %s" % e
    return None
def restart():
    """Replace the current process with a fresh run of the same script.

    Uses os.execv() with the saved command-line arguments (``cmdargs``).
    execv() only returns by raising, in which case the failure is printed
    and logged and the function returns normally.
    """
    if verbose:
        print("restart: execv %s %s" % (sys.argv[0], [sys.argv[0]] + cmdargs))
    syslog.syslog(syslog.LOG_ERR, "restart %s" % (sys.argv[0]))
    failure = "fallthrough"
    try:
        os.execv(sys.argv[0], [sys.argv[0]] + cmdargs)
    except Exception as exc:
        # Keep the error under a separate name: the name bound by
        # "except ... as" is deleted when the handler ends, so reusing it
        # afterwards would raise UnboundLocalError.
        failure = exc
    print("should not be here:", str(failure))
    log("restart failed: %s" % failure)
def _handle_datagram(conn, data, addr, now):
    """Decode one received datagram and act on it.

    conn -- key into ``conns`` of the connection the data arrived on
    data -- raw bytes, addr -- sender address, now -- local receive time

    Returns True when a successful update requests a restart, else False.
    """
    if verbose:
        print("sock.recvfrom: %s (%s) %s" % (addr, len(data), data[:4]))
    try:
        msgDict = stodict(data)
    except Exception as e:
        print("failed to parse incoming data from %s: %s (%s)" % (addr, data, e))
        return False
    if verbose:
        print("sock.recvfrom: %s (%s) %s" % (addr, len(data), str(msgDict)[:80]))
    if msgDict is None:
        print("bad backet from %s (%s) %s" % (addr, len(data), data))
    elif msgDict["ID"] == "ACK":
        conns[conn].ack(msgDict, now)
    elif msgDict["ID"] == "UPD":
        if doupdate(conn, msgDict) is None:
            if verbose:
                print("process: restart after update")
            return True
    elif msgDict["ID"] == "CMD":
        if "cmd" not in msgDict:
            # A malformed CMD packet used to raise KeyError here, which
            # propagated out of process() and stopped the client.
            print("CMD without cmd from %s (%s) %s" % (addr, len(data), data))
        else:
            doexec(conn, msgDict["cmd"])
    else:
        doexec(conn, data)  # deprecated until no more VER - hbc
    return False


def process():
    """Main loop: answer incoming packets and send periodic heartbeats.

    Until the next report time is reached, waits with select() on all open
    connection sockets and handles whatever arrives. Then sends one
    heartbeat (ACK count and latest RTT) per connection and schedules the
    next report ``interval`` seconds later. The loop ends when ``running``
    becomes False: on KeyboardInterrupt, SystemExit or any other error in
    select(), or after a successful update (``dorestart`` is then True).
    """
    global running, dorestart
    nextReport = time.time()
    while running:
        while time.time() < nextReport:
            ifiles = {}  # fileno -> socket
            conIds = {}  # fileno -> key into conns
            for conn in conns:
                sock = conns[conn].sock
                if sock:
                    ifiles[sock.fileno()] = sock
                    conIds[sock.fileno()] = conn
            sleep = nextReport - time.time()
            if sleep <= 0:
                break
            try:
                r = select.select(list(ifiles.keys()), [], [], sleep)
                # nb: delay from actual packet arrival to select is ca. 105ms!
                now = time.time()
            except KeyboardInterrupt:
                running = False
                break
            except SystemExit:
                log("daemon exit, running was %s" % running)
                if running:
                    running = False
                break
            except Exception:
                if running:
                    syslogtrace("select")
                    running = False
                break
            for rfh in r[0]:
                conn = conIds[rfh]
                data, addr = ifiles[rfh].recvfrom(MAXRECV)
                if _handle_datagram(conn, data, addr, now):
                    dorestart = True
                    break
            if dorestart:
                running = False
                break
        if not running:
            break
        for conn in conns:
            msg = {"acks": conns[conn].ackcount, "rtt": conns[conn].rtts[-1]}
            conns[conn].sendto(msg)
            # N.B. Linux (i.e. Rasperry Pi 3 drops the second pkg unless delayed
            time.sleep(0.1)
        # Keep a fixed cadence unless we have already fallen behind.
        if nextReport + interval >= time.time():
            nextReport += interval
        else:
            nextReport = time.time() + interval
    if verbose:
        log("process: done running")
def cleanup():
# Announce shutdown on every connection and close the sockets.
# Does nothing when `running` is already False, so only the first call
# (e.g. from the SIGTERM handler) has an effect.
global running
if not running:
return
if verbose:
log("cleanup")
# Mark the client as stopped before sending, so a nested call returns early.
running = False
for conn in conns:
# Final packet carries a "shutdown" flag and the number of ACKs received.
msg = {"shutdown": 1, "acks": conns[conn].ackcount}
conns[conn].sendto(msg)
conns[conn].close()
# NOTE(review): the indentation of this file was lost, so it cannot be told
# whether this one-second pause runs once per connection or once after the
# loop - confirm against the original source before re-indenting.
time.sleep(1)
closeall()
def closeall():
    """Close the socket of every registered connection."""
    if verbose:
        syslog.syslog(syslog.LOG_ERR, "closecall")
    for key in conns:
        conns[key].close()
def daemonize(
    working_dir="/", stdin="/dev/zero", stdout="/dev/null", stderr="/dev/null"
):
    """Detach the process from its parent using the UNIX double fork.

    See Stevens, "Advanced Programming in the UNIX Environment"
    (ISBN 0201563177) and
    http://www.yendor.com/programming/unix/apue/proc/fork2.c

    working_dir          -- directory to change into after the first fork
    stdin/stdout/stderr  -- paths the standard descriptors are redirected to

    Both parents exit with status 0; a failing fork writes a message to
    stderr and exits with status 1.
    """
    # First fork: the original parent leaves immediately.
    try:
        if os.fork() > 0:
            os._exit(0)
    except OSError as err:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (err.errno, err.strerror))
        os._exit(1)

    # Decouple from the parent environment.
    os.chdir(working_dir)
    os.setsid()
    os.umask(0)

    # Second fork: the intermediate process leaves as well.
    try:
        if os.fork() > 0:
            os._exit(0)
    except OSError as err:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (err.errno, err.strerror))
        sys.exit(1)

    # Redirect the standard file descriptors; all three targets are opened
    # before any descriptor is replaced.
    sys.stdout.flush()
    sys.stderr.flush()
    redirections = (
        (open(stdin, "r"), sys.stdin),
        (open(stdout, "a+"), sys.stdout),
        (open(stderr, "a+"), sys.stderr),
    )
    for target, standard in redirections:
        os.dup2(target.fileno(), standard.fileno())
#
# Main program
#
def build_parser():
    """Create the argparse parser describing the hbc command line."""
    parser = argparse.ArgumentParser(
        prog="hbc",
        description="HeartBeatClient - send a heatbeat message to a HeartBeatDaemon",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # (flags, keyword arguments) in the order they appear in --help.
    options = (
        (("-b", "--boot"), {"action": "store_true", "help": "Send a boot message"}),
        (("-c", "--config"), {"dest": "configfile", "help": "Config file path (YAML)"}),
        (("-m", "--message"), {"dest": "message", "help": "Send a message"}),
        (("-n", "--name"), {"dest": "name", "help": "Name to use in heartbeat message"}),
        (("-f", "--daemon"), {"action": "store_true", "help": "Run in daemon mode"}),
        (("-v", "--verbose"), {"action": "store_true", "help": "Verbose output"}),
        (("-x", "--debug"), {"action": "count", "default": 0, "help": "Increase debug level"}),
        (("hosts",), {"nargs": "+", "help": "Heartbeat daemon hosts to send to"}),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
    return parser
def main(argv=None):
    """Command-line entry point: parse options, connect and run the client.

    argv -- list of arguments without the program name; None means
            sys.argv[1:] (the argparse default).

    Loads the configuration, applies command-line overrides, resolves the
    given hosts (retrying every two seconds until at least one connection
    exists), optionally sends a boot/message packet, and then runs the
    heartbeat loop until it stops. In message-only mode the process exits
    with status 0 right after sending. After a successful self-update the
    program re-executes itself.
    """
    global msgonly, helpflag, verbose, fdaemon, daemonized, optlist, msgboot
    global home, configfile, cmdargs, iam, hb_port, conns, interval, hb_hosts
    global running
    parser = build_parser()
    args = parser.parse_args(argv)
    config = load_config(args.configfile)
    # Apply CLI overrides
    if args.boot:
        msgboot["boot"] = 1
    if args.message:
        msgboot["service"] = "service"
        msgboot["msg"] = args.message
        msgonly = True
    if args.name:
        iam = args.name
    if args.daemon:
        fdaemon = True
    if args.verbose:
        verbose = True
    if args.debug:
        config.setdefault("debug", 0)
        config["debug"] += args.debug
    # Remember the arguments for restart(). argv is None when started as a
    # script, in which case argparse used sys.argv[1:] - do the same here
    # instead of failing on "cmdargs += None".
    cmdargs += sys.argv[1:] if argv is None else list(argv)
    if verbose:
        print("cmdargs for restart are %s" % cmdargs)
    #
    # set defaults
    hb_hosts = args.hosts
    hb_port = config.get("hb_port", PORT)
    interval = config.get("interval", INTERVAL)
    #
    if verbose:
        print("notice: hb_hosts: %s" % str(hb_hosts))
        print("notice: hb_port: %s" % hb_port)
        print("notice: interval: %s" % interval)
        print("notice: iam: %s" % iam)
        print("notice: msgonly: %s" % msgonly)
        print("notice: msgboot: %s" % msgboot)
    if not msgonly:
        msgboot["interval"] = interval
    conns = {}
    while True:
        if verbose:
            log("create connections")
        createConnections(hb_hosts)
        if len(conns) != 0:
            break
        if verbose:
            log("no connections yet, sleep a bit")
        time.sleep(2)
    if verbose:
        log("%s connections created" % (len(conns)))
    if len(msgboot) > 0:
        if verbose:
            print("on boot")
        msgboot["acks"] = 0
        for conn in conns:
            conns[conn].sendto(msgboot)
    if msgonly:
        if verbose:
            print("msgboot done msgonly=%s" % msgonly)
        closeall()
        sys.exit(0)
    #
    syslog.openlog("hbc", syslog.LOG_PID, syslog.LOG_DAEMON)
    if fdaemon:
        print("daemoinizing.")
        daemonize()
        daemonized = True
    syslog.syslog(syslog.LOG_ERR, "starting heartbeat to %s" % ",".join(hb_hosts))
    signal.signal(signal.SIGTERM, handler)
    # "running" is declared global above so that this assignment resets the
    # module flag read by process() rather than creating an unused local.
    running = True
    try:
        process()
    except Exception as e:
        syslogtrace("process")
        if verbose:
            print("err: process exit: %s" % e)
    if verbose:
        log("main: cleanup")
    cleanup()
    if dorestart:
        restart()


if __name__ == "__main__":
    main()
+6 -2
View File
@@ -131,10 +131,14 @@ async def start(
return web.Response(text="restart request")
async def live(request):
# render template from templates/live.html using Jinja2
env = jinja2.Environment(loader=jinja2.FileSystemLoader(config.get("templates_dir", "templates")))
# render template from hbd/templates/live.html using Jinja2
# Resolve templates directory relative to the hbd package
pkg_dir = os.path.dirname(__file__)
templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))
host = config.get("hb_host", "localhost")
extra_scripts = config.get("http_extra_scripts", "")
host = request.host.split(":")[0]
heartbeat_ws_url = f"ws://{host}:{config.get('ws_port', 50005)}/hbd"
tmpl = env.get_template("live.html")
body = tmpl.render(
+6
View File
@@ -0,0 +1,6 @@
#!/bin/sh
# Install (or force-reinstall) the heartbeat-hbd package from the wheel
# published with the latest GitHub release, without touching dependencies.
# NOTE(review): the original header also mentions creating hbd/hbc symlinks
# in /usr/local/bin; no command below does that explicitly - presumably the
# package's entry points are meant. Confirm.
set -e

RELEASE_URL="https://github.com/andreas-h/heartbeat/releases/latest/download/"

pip install \
    --upgrade \
    --force-reinstall \
    --no-deps \
    --find-links "$RELEASE_URL" \
    heartbeat-hbd
+3 -2
View File
@@ -9,6 +9,7 @@ from . import __version__
from . import udp
from . import hbdclass
from . import ws as ws_mod
logger = logging.getLogger(__name__)
@@ -73,7 +74,7 @@ async def _run_async(config):
# prepare runtime dependencies
import threading
from . import hbdclass
# from . import hbdclass
from . import http as http_mod
from . import dns as dns_mod
from . import notify as notify_mod
@@ -254,7 +255,7 @@ def load_pickled_hosts(config, hbdclass):
lastfm = ["", "", ""]
pickf.close()
except Exception as e:
print(("load pickled failed: %s" % e))
logger.exception("load pickled failed: %s", e)
os.unlink(pickfile)
hbdclass.Connection.htab = {}
for h in list(hbdclass.Host.hosts.keys()):
+5
View File
@@ -0,0 +1,5 @@
<footer>
  <div id="copyright">
    <!-- The closing </p> had no matching opening tag; the copyright line is now a proper paragraph. -->
    <p>&copy;2002-2021 <a href="mailto:andreas@wrede.ca">Andreas Wrede</a> All Rights Reserved.</p>
  </div>
</footer>
+7
View File
@@ -0,0 +1,7 @@
<head>
  <meta http-equiv="content-type" content="text/html; charset=utf-8" />
  <link rel="stylesheet" href="/static/style.css" type="text/css" />
  <link rel="icon" href="/static/images/favicon.ico" sizes="32x32" />
  <title>{{ title }}</title>
  {# Emit the extra script tag only when a URL is configured: an empty src attribute is not valid HTML. #}
  {% if extra_scripts %}<script src="{{ extra_scripts }}"></script>{% endif %}
</head>
+281
View File
@@ -0,0 +1,281 @@
<!DOCTYPE html>
<html>
{% include 'head.html' %}
<style>
  /* Page layout: host table on top, event log below, stacked vertically. */
  .content {
    display: flex;
    flex-direction: column;
  }
  .table {
    /* flex: 1; */
    /* "none" is not a valid flex-grow value and was ignored by browsers;
       0 is the value that was effectively in use. */
    flex-grow: 0;
  }
  .log {
    flex: 2;
    flex-grow: 1;
  }

  /* Host table */
  #ntable {
    border-collapse: collapse;
    font-size: 95%;
    /* width: 100%; */
  }
  #ntable td,
  #ntable th {
    border: 1px solid #ddd;
    text-align: left;
    padding: 0px;
  }
  #ntable tr:nth-child(even) {
    background-color: #f2f2f2;
  }
  #ntable tr:hover {
    background-color: #ddd;
  }
  #ntable th {
    padding-top: 12px;
    padding-bottom: 12px;
    background-color: #9d9d9d;
    color: white;
  }
  /* Up/down arrow on sortable headers that are not currently sorted. */
  #ntable
    th:not(.sorttable_sorted):not(.sorttable_sorted_reverse):not(.sorttable_nosort):after {
    content: " \2195";
  }

  /* Modal for connection status messages */
  .connection-modal {
    display: none;
    position: fixed;
    z-index: 1000;
    left: 0;
    top: 0;
    width: 100%;
    height: 100%;
    background-color: rgba(0, 0, 0, 0.4);
  }
  .connection-modal.show {
    display: flex;
    justify-content: center;
    align-items: center;
  }
  .connection-modal-content {
    background-color: #f9f9f9;
    padding: 20px;
    border: 1px solid #888;
    border-radius: 5px;
    text-align: center;
    box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
    min-width: 300px;
  }
  .connection-modal-content p {
    margin: 10px 0;
    font-size: 16px;
    color: #333;
  }
</style>
<script type="text/javascript">
// Counter incremented at the end of every handled WebSocket message.
var cnt = 0;
// Placeholder value; setup() replaces it with the #ntable element.
var nTable = document;
// Lookup from host name to its table row; rebuilt by setup().
var name_idx = {};
// NOTE(review): `c` is not referenced anywhere in the visible script - confirm before removing.
var c = 0;
/**
 * Rebuild the host-name -> table-row index from the current #ntable rows.
 * The first row (the header) is skipped; the name is read from the text of
 * each row's first cell.
 */
function setup() {
  name_idx = {};
  nTable = document.getElementById("ntable");
  for (var i = 1; i < nTable.rows.length; i++) {
    var row = nTable.rows[i];
    // Use a local variable: assigning to an undeclared `name` overwrote
    // the browser's window.name property.
    var hostName = row.cells[0].innerText;
    name_idx[hostName] = row;
  }
}
/**
 * Append a new row for the host described by `data` to the table body and
 * register it in name_idx under the host name.
 *
 * Column layout: 0 name, 1 version, then for IPv4 (2-5) and IPv6 (6-9):
 * address, state, latency, state timestamp. Only name, version, addresses
 * and states are filled in here.
 *
 * Values come from the WebSocket feed, so they are inserted as text
 * (textContent) rather than parsed as HTML.
 */
function createRow(data) {
  var row = document.createElement("tr");
  var cells = [];
  for (var i = 0; i < 10; i++) {
    var cell = document.createElement("td");
    row.appendChild(cell);
    cells.push(cell);
  }
  // Latency and timestamp columns are right-aligned.
  [4, 5, 8, 9].forEach(function (idx) {
    cells[idx].style.textAlign = "right";
  });

  if (data.dyn) {
    // Dynamic hosts are shown in bold.
    var bold = document.createElement("b");
    bold.textContent = data.name;
    cells[0].appendChild(bold);
  } else {
    cells[0].textContent = data.name;
  }
  cells[1].textContent = data.cver;
  cells[2].textContent = data.connections[0].addr;
  cells[3].textContent = data.connections[0].state;
  if (data.connections.length > 1) {
    cells[6].textContent = data.connections[1].addr;
    cells[7].textContent = data.connections[1].state;
  }

  var table = document.getElementById("ntablebody"); // find table to append to
  table.appendChild(row); // append row to table
  // Index by the host name; the previous code used the <td> element itself
  // as the key, which never matched a lookup by name.
  name_idx[data.name] = row;
}
/**
 * Format a timestamp given in seconds since the epoch as a German-locale
 * ("de-DE") date/time string.
 */
function formatTS(ts) {
  return new Date(ts * 1000).toLocaleString("de-DE");
}
/**
 * Refresh the table row of the host described by `data`, creating the row
 * first when the host is not yet listed.
 *
 * For connection i the cells 2+4i .. 5+4i hold address, state, latency and
 * state timestamp. State "up" shows the first RTT with two decimals,
 * "unknown" blanks all four cells, any other state is shown in bold with
 * "-" as latency.
 *
 * Values come from the WebSocket feed and are inserted as text, not HTML.
 */
function update_table(data) {
  if (!(data.name in name_idx)) {
    createRow(data);
    setup();
  }
  var cells = name_idx[data.name].cells;
  for (var i = 0; i < data.connections.length; i++) {
    var conn = data.connections[i];
    var addrCell = cells[2 + i * 4];
    var stateCell = cells[3 + i * 4];
    var latencyCell = cells[4 + i * 4];
    var stampCell = cells[5 + i * 4];

    addrCell.textContent = conn.addr;
    stampCell.textContent = formatTS(conn.statetime);
    stateCell.textContent = ""; // also drops a previous <b> child

    if (conn.state == "up") {
      stateCell.textContent = "up";
      latencyCell.textContent = Number.parseFloat(conn.rtts[0]).toFixed(2);
    } else if (conn.state == "unknown") {
      addrCell.textContent = "";
      stampCell.textContent = "";
      latencyCell.textContent = "";
    } else {
      var bold = document.createElement("b");
      bold.textContent = conn.state;
      stateCell.appendChild(bold);
      latencyCell.textContent = "-";
    }
  }
}
/**
 * Open the WebSocket to the heartbeat daemon and wire up its handlers.
 *
 * On open the connection-status modal is hidden and "heartbeat_web" is
 * sent. "host" messages update the table, "message" messages are prepended
 * to the event log. When the socket closes the modal is shown and a new
 * connection attempt is scheduled after three seconds.
 *
 * NOTE(review): log messages are inserted as HTML; this relies on the
 * server sending trusted markup.
 */
function WS_Connect() {
  if (!("WebSocket" in window)) {
    // The browser doesn't support WebSocket
    console.log("WebSocket NOT supported by your Browser!");
    return;
  }

  // Show or hide the connection-status modal, if it exists in the page.
  function setModalVisible(visible) {
    var modal = document.getElementById("connectionModal");
    if (modal) {
      modal.classList.toggle("show", visible);
    }
  }

  //N.B: subprotocol field causes chrome to error 1006
  var ws_hbd = new WebSocket("{{heartbeat_ws_url}}" /*, "hdb" */);

  ws_hbd.onopen = function () {
    // Web Socket is connected, send data using send()
    console.log("ws connect");
    setModalVisible(false);
    ws_hbd.send("heartbeat_web");
  };

  ws_hbd.onerror = function (event) {
    console.log(event);
  };

  ws_hbd.onmessage = function (event) {
    var state = JSON.parse(event.data);
    switch (state.type) {
      case "host":
        update_table(state.data);
        break;
      case "message":
        document
          .getElementById("messages")
          .insertAdjacentHTML("afterbegin", state.data + "<br>");
        break;
    }
    cnt++;
  };

  ws_hbd.onclose = function (event) {
    console.log("Connection is closed, reopening");
    setModalVisible(true);
    setTimeout(WS_Connect, 3000);
  };
}
WS_Connect();
</script>
<body>
  {% include 'menu.html' %}
  <div id="content" class="content" style="overflow: hidden">
    <!-- Host table; rows are added and updated by the script above. -->
    <div id="table" class="table" style="overflow: hidden">
      <!-- <h2>{{title}}</h2> -->
      <table id="ntable" class="sortable">
        <thead>
          <tr>
            <th>Name</th>
            <th>Ver</th>
            <th>IPv4 Addr</th>
            <th>State</th>
            <th style="text-align: right">Latency</th>
            <th style="text-align: right">Last State</th>
            <th>IPv6 Addr</th>
            <th>State</th>
            <th style="text-align: right">Latency</th>
            <th style="text-align: right">Last State</th>
          </tr>
        </thead>
        <tbody id="ntablebody"></tbody>
      </table>
    </div>
    <!-- Event log; new messages are inserted at the top of #messages. -->
    <div id="log" class="log" style="overflow: auto;">
      <h2>Log of Events</h2>
      <div id="messages">
      </div>
    </div>
  </div>
  {% include 'foot.html' %}
  <!-- Connection status modal -->
  <div id="connectionModal" class="connection-modal">
    <div class="connection-modal-content">
      <p>⚠️ Connection is closed, reopening...</p>
    </div>
  </div>
  <script>
    setup();
  </script>
</body>
</html>
+3
View File
@@ -0,0 +1,3 @@
<!-- Menu fragment: an empty label bound to the element with id "drawer-toggle" (not defined in this fragment), followed by the page header text supplied by the rendering view. -->
<label for="drawer-toggle" id="drawer-toggle-label"></label>
<header>{{ header }}</header>
+1 -1
View File
@@ -1,6 +1,6 @@
"""UDP listener and datagram processing."""
import asyncio
from compression import zlib
import zlib
import logging
logger = logging.getLogger(__name__)