a76d0fc840
- threshold.py: add _find_threshold() with suffix fallback so thresholds like ping_monitor.rtt_avg match ping_monitor.8_8_8_8_rtt_avg etc.; each pinged host keeps its own alert state - hbdclass.py: format RTT as integer ms (round()) - live.html: JS RTT display rounded to nearest ms (Math.round) Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
639 lines
21 KiB
Python
639 lines
21 KiB
Python
"""
|
|
host and connection class shared between hbd and
|
|
the website's heartbeat.py
|
|
|
|
"""
|
|
|
|
import time
|
|
import json
|
|
import copy
|
|
import queue
|
|
|
|
# Running count of named Host objects; Host.__init__ copies it into Host.num.
num: int = 0

# Maximum number of RTT samples a Connection keeps in its rtts list.
MAXRTTS: int = 10

# Debug verbosity: any non-zero value enables log(); values above 1 also
# print traces from buildhosttable().
DEBUG: int = 2
|
|
|
|
|
|
def log(host, m):
    """Print a debug message tagged with *host*, but only when DEBUG is set."""
    if not DEBUG:
        return
    print(f"class log: {host} {m}")
|
|
|
|
|
|
class Connection:
    """One address-family link (IPv4 or IPv6) of a monitored Host.

    Tracks the peer address, recent round-trip times, the time of the last
    heartbeat and the current reachability state, plus an optional asyncio
    timer that invokes a callback when heartbeats stop arriving.
    """

    # map of addrs to host names, shared by all connections
    htab = {}

    UNKNOWN = "unknown"
    UP = "up"
    DOWN = "down"
    OVERDUE = "overdue"

    # Instance attributes holding asyncio timer state. They are neither
    # picklable nor JSON-serializable, so export paths reset or skip them.
    _TIMER_FIELDS = ("overdue_timer", "overdue_callback", "timeout_duration")

    # Strong references to in-flight overdue-callback tasks. The event loop
    # only keeps weak references to tasks, so an unreferenced task may be
    # garbage collected before it finishes.
    _pending_tasks = set()

    def __init__(self, host, cid, addr, afam):
        """Create a connection for *host* at *addr* in family *afam*.

        An IPv4-mapped IPv6 prefix ("::ffff:") is stripped from *addr*.
        When *host* is given, the address is recorded in htab and, for
        dynamic-DNS hosts, queued on Host.dnsQ.
        """
        self.host = host
        self.cid = cid
        if addr[0:7] == "::ffff:":
            addr = addr[7:]
        self.addr = addr
        self.afam = afam
        # seeded with 0 so rtts[-1] is always valid; 0 means "no sample"
        self.rtts = [0]
        self.lastbeat = time.time()
        self.statetime = self.lastbeat
        self.deltastatetime = "computed"
        self.state = Connection.UNKNOWN

        # Timer-based reachability monitoring
        self.overdue_timer = None
        self.overdue_callback = None
        self.timeout_duration = None

        if host:
            Connection.htab[addr] = self.host.name
            if self.host.isDynDns():
                log(self.host.name, "dns update %s" % self.addr)
                Host.dnsQ.put((self.host.name, self.addr))

    def __getstate__(self):
        """Prepare Connection for pickling by clearing the timer fields.

        The timer state is recreated when the next heartbeat arrives after
        unpickling.
        """
        state = self.__dict__.copy()
        for field in Connection._TIMER_FIELDS:
            state[field] = None
        return state

    def __setstate__(self, state):
        """Restore Connection from a pickle, making sure timer fields exist."""
        self.__dict__.update(state)
        # Older pickles may predate the timer fields.
        for field in Connection._TIMER_FIELDS:
            if not hasattr(self, field):
                setattr(self, field, None)

    def registerDns(self):
        """Queue (host name, address) on Host.dnsQ."""
        Host.dnsQ.put((self.host.name, self.addr))

    def clearstate(self):
        """Return a display dict with every field set to the empty string."""
        d = {}
        d["addr"] = ""
        d["rtt"] = ""
        d["lastbeat"] = ""
        d["state"] = ""
        d["statetime"] = ""
        d["deltastatetime"] = ""
        d["rttstate"] = ""
        return d

    @staticmethod
    def _format_age(delta):
        """Render *delta* seconds as days, H:MM hrs, MM:SS mins or secs."""
        if delta > 86400:
            return "%0.1f days" % (delta / 86400.0)
        if delta > 3600:
            # Computed explicitly: strftime's "%k" is a glibc extension and
            # raises ValueError on platforms that lack it (e.g. Windows).
            hours, secs = divmod(int(delta), 3600)
            return "%2d:%02d hrs" % (hours, secs // 60)
        if delta > 60:
            return time.strftime("%M:%S mins", time.gmtime(delta))
        return "%i secs" % (delta)

    def statedict(self, Null=False):
        """Return the display fields of this connection as a dict.

        Args:
            Null: when true, return the all-empty row from clearstate().

        Unknown connections whose last heartbeat is older than 10 days are
        also rendered as an empty row.
        """
        d = self.clearstate()
        now = time.time()
        if not Null:
            d["addr"] = self.addr
            if self.rtts[-1]:
                d["rtt"] = "%d" % round(self.rtts[-1])
            elif self.state == Connection.UNKNOWN:
                d["rtt"] = ""
            else:
                d["rtt"] = "?"
            d["lastbeat"] = self.lastbeat

            if self.state == Connection.OVERDUE:
                d["state"] = "<b>%s</b>" % self.state
            else:
                d["state"] = self.state

            if self.state == Connection.UP:
                d["rttstate"] = d["rtt"]
            elif self.state == Connection.OVERDUE:
                d["rttstate"] = ""
            else:
                d["rttstate"] = d["state"]

            d["statetime"] = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(self.statetime)
            )

            if self.state == Connection.UNKNOWN:
                d["deltastatetime"] = ""
            else:
                d["deltastatetime"] = self._format_age(now - self.statetime)

            if self.state == Connection.UNKNOWN and now - self.lastbeat > 86400 * 10:
                d = self.clearstate()

        return d

    def headerdict(self, afam):
        """Return the column header labels for a connection of family *afam*."""
        d = {}
        d["addr"] = "%s Addr" % afam
        d["rtt"] = "Latency"
        d["lastbeat"] = "Last Contact"
        d["state"] = "State"
        d["statetime"] = "Last State"
        d["rttstate"] = "Reach"
        d["deltastatetime"] = "Last State"
        return d

    def jsons(self):
        """Serialize the connection to JSON.

        Timer fields are skipped and the host back-pointer is replaced by
        the host's name.
        """
        data = {}
        for key, value in self.__dict__.items():
            if key in Connection._TIMER_FIELDS:
                continue
            if key == "host":
                data[key] = value.name if value else None
            else:
                data[key] = value
        return json.dumps(data)

    # set new state, return number of secs in previous state
    def newstate(self, state, now, when=0):
        """Enter *state* as of ``now - when``.

        Returns:
            Seconds spent in the previous state.
        """
        self.state = state
        entered = now - when
        previous_duration = entered - self.statetime
        self.statetime = entered
        return previous_duration

    def getstate(self):
        """Return the current state string."""
        return self.state

    def newaddr(self, addr, rtt, now):
        """Record a heartbeat from *addr* at *now* with round-trip time *rtt*.

        Keeps at most MAXRTTS samples. If *addr* differs from the stored
        address, htab is updated and dynamic-DNS hosts are re-queued.

        Returns:
            None if the address is unchanged, else a "changed from X to Y"
            message.
        """
        self.lastbeat = now
        if rtt is not None:
            self.rtts.append(rtt)
            if len(self.rtts) > MAXRTTS:
                del self.rtts[0]

        if self.addr == addr:
            return None

        r = "changed from %s to %s" % (self.addr, addr)
        # The old address may never have been registered (e.g. after reload).
        Connection.htab.pop(self.addr, None)
        self.addr = addr
        Connection.htab[addr] = self.host.name
        if self.host.isDynDns():
            Host.dnsQ.put((self.host.name, self.addr))
        return r

    def reset_overdue_timer(self, timeout_seconds, callback):
        """Reset the overdue timer for this connection.

        Cancels any existing timer and sets a new one that will mark
        the connection as overdue if no heartbeat arrives before timeout.

        Args:
            timeout_seconds: Seconds before marking as overdue
            callback: Async function to call (with this connection) when the
                timer expires
        """
        import asyncio

        # Cancel existing timer if any
        if self.overdue_timer and not self.overdue_timer.cancelled():
            self.overdue_timer.cancel()

        # Store parameters for later reference
        self.timeout_duration = timeout_seconds
        self.overdue_callback = callback

        async def timer_expired():
            await callback(self)

        def spawn_callback():
            task = asyncio.create_task(timer_expired())
            # Hold a strong reference until the task completes.
            Connection._pending_tasks.add(task)
            task.add_done_callback(Connection._pending_tasks.discard)

        try:
            loop = asyncio.get_event_loop()
            self.overdue_timer = loop.call_later(timeout_seconds, spawn_callback)
        except RuntimeError:
            # No event loop available yet
            pass

    def cancel_overdue_timer(self):
        """Cancel the overdue timer if it exists and clear all timer references."""
        if self.overdue_timer:
            try:
                if not self.overdue_timer.cancelled():
                    self.overdue_timer.cancel()
            except Exception:
                # Best-effort: cancellation failure must not block cleanup.
                pass
        for field in Connection._TIMER_FIELDS:
            setattr(self, field, None)

    def get_avg_rtt(self):
        """Return the mean of the positive RTT samples, or 0 if there are none."""
        valid_rtts = [r for r in self.rtts if r > 0]
        if valid_rtts:
            return sum(valid_rtts) / len(valid_rtts)
        return 0

    def get_current_rtt(self):
        """Return the most recent RTT sample, or 0 if there is none."""
        return self.rtts[-1] if self.rtts else 0

    def check_rtt_threshold(self, warning_threshold=None, critical_threshold=None):
        """Check if RTT exceeds thresholds.

        Args:
            warning_threshold: RTT in ms for warning level (falsy = disabled)
            critical_threshold: RTT in ms for critical level (falsy = disabled)

        Returns:
            Tuple of (level, rtt_value) where level is None, 'WARNING', or 'CRITICAL'
        """
        rtt = self.get_current_rtt()
        if rtt <= 0:
            return (None, rtt)

        if critical_threshold and rtt > critical_threshold:
            return ('CRITICAL', rtt)
        if warning_threshold and rtt > warning_threshold:
            return ('WARNING', rtt)
        return (None, rtt)
|
|
|
|
|
|
#
|
|
class Host:
    """A monitored host and its per-address-family connections.

    Also stores plugin samples, alert state and user-access lists, and
    provides the helpers that render the HTML status tables.
    """

    # Table of Hosts, keyed by name
    hosts = {}
    # (host name, address) pairs queued for DNS updates
    dnsQ = queue.Queue()

    def __init__(self, name):
        """Create a host; a truthy *name* registers it in Host.hosts."""
        global num
        self.name = name
        if name:
            num += 1
            Host.hosts[name] = self
        self.num = num
        self.dyn = False
        self.watched = True
        self.upcount = 0
        self.interval = 0
        self.doesack = -1
        self.cmds = []
        self.connections = {}
        # Plugin data storage: {plugin_name: [(timestamp, data), ...]}
        self.plugin_data = {}
        self.plugin_retention = 100  # Keep last N samples per plugin
        # Alert state tracking: {metric_path: AlertState}
        self.alert_states = {}
        # User access control
        self.owner: str | None = None  # username of owner
        self.managers: list = []  # usernames with manager role
        self.monitors: list = []  # usernames with monitor role

    def _alert_counts(self):
        """Count alerts by level and acknowledgement.

        Returns:
            Dict with keys alert_warning_unacked, alert_warning_acked,
            alert_critical_unacked, alert_critical_acked (in that order).
        """
        counts = {
            "alert_warning_unacked": 0,
            "alert_warning_acked": 0,
            "alert_critical_unacked": 0,
            "alert_critical_acked": 0,
        }
        alert_states = getattr(self, "alert_states", None)
        if not alert_states:
            return counts

        # Imported here to avoid circular imports
        from .threshold import AlertLevel

        for alert_state in alert_states.values():
            if alert_state.level == AlertLevel.WARNING:
                level = "warning"
            elif alert_state.level == AlertLevel.CRITICAL:
                level = "critical"
            else:
                continue
            ack = "acked" if alert_state.acknowledged else "unacked"
            counts["alert_%s_%s" % (level, ack)] += 1
        return counts

    def statedict(self):
        """Return a flat dict of display fields for this host.

        Connection fields are keyed "IPv4.<field>" / "IPv6.<field>"; a missing
        family contributes the empty placeholder row.
        """
        d = {}
        d["raw_name"] = self.name
        name = self.name
        if self.dyn:
            name += "*"
        if self.watched:
            name = "<b>%s</b>" % name
        d["name"] = name
        d["dyn"] = str(self.dyn)
        d["num"] = self.num

        # Add alert counts (split by acknowledged status)
        d.update(self._alert_counts())

        for c in ["IPv4", "IPv6"]:
            if c in self.connections:
                cs = self.connections[c].statedict()
            else:
                cs = ubConnection.statedict(True)
            for key, value in cs.items():
                d["%s.%s" % (c, key)] = value

        return d

    def headerdict(self):
        """Return the column header labels matching statedict() keys."""
        d = {}
        d["name"] = "Name"
        d["dyn"] = "Dyn"
        d["num"] = "??"
        for c in ["IPv4", "IPv6"]:
            cs = ubConnection.headerdict(c)
            for key, value in cs.items():
                d["%s.%s" % (c, key)] = value
        return d

    def registerDns(self):
        """Queue a DNS update for every connection of this host."""
        for conn in self.connections.values():
            conn.registerDns()

    @staticmethod
    def _connection_stateinfo(conn):
        """Copy a connection's attributes into a plain dict.

        Timer fields are skipped and the host back-pointer is replaced by the
        host's name.
        """
        cld = {}
        for key, value in conn.__dict__.items():
            # Skip timer-related fields that can't be serialized
            if key in ('overdue_timer', 'overdue_callback', 'timeout_duration'):
                continue
            if key == 'host':
                cld[key] = value.name if value else None
                continue
            try:
                cld[key] = copy.deepcopy(value)
            except Exception:
                # Deliberate fallback: keep the original object if it
                # cannot be deep-copied.
                cld[key] = value
        return cld

    def stateinfo(self):
        """Return a dict snapshot of this host suitable for JSON export.

        alert_states and plugin_data are omitted. Alert counts, access lists
        and the hbc_version from the latest os_info sample are added.
        """
        ddict = {}
        for d in self.__dict__:
            if d in ["alert_states", "plugin_data"]:
                continue
            if d == "connections":
                ddict[d] = [
                    self._connection_stateinfo(self.connections[c])
                    for c in ["IPv4", "IPv6"]
                    if c in self.connections
                ]
            else:
                ddict[d] = self.__dict__[d]

        # Add alert counts (computed from alert_states)
        ddict.update(self._alert_counts())

        # User access
        ddict["owner"] = getattr(self, "owner", None)
        ddict["managers"] = list(getattr(self, "managers", []))
        ddict["monitors"] = list(getattr(self, "monitors", []))

        # hbc version from latest os_info plugin data
        hbc_version = None
        latest_os = self.get_latest_plugin_data("os_info")
        if latest_os:
            _, os_data = latest_os
            hbc_version = os_data.get("hbc_version")
        ddict["hbc_version"] = hbc_version

        return ddict

    def jsons(self):
        """Serialize stateinfo() to a JSON string."""
        return json.dumps(self.stateinfo())

    def isDynDns(self):
        """Return whether this host uses dynamic DNS."""
        return self.dyn

    def isIPv4(self, addr):
        """Return True if *addr* (a string or a tuple whose [0] is the
        address) contains a dot after its first character."""
        if isinstance(addr, tuple):
            return addr[0].find(".") > 0
        return addr.find(".") > 0

    def conndata(self, cid, addr, rtt, now):
        """Record a heartbeat from *addr*, creating the connection if needed.

        Returns:
            (connection, message) where message is Connection.newaddr's
            result (None unless the address changed).
        """
        if addr[0:7] == "::ffff:":
            addr = addr[7:]
        afam = "IPv4" if self.isIPv4(addr) else "IPv6"

        if afam not in self.connections:
            self.connections[afam] = Connection(self, cid, addr, afam)

        conn = self.connections[afam]
        res = conn.newaddr(addr, rtt, now)
        return conn, res

    # called when reloading class from pickle, add new fields here
    def fixup(self):
        """Normalize addresses and add attributes missing from old pickles."""
        for c in ["IPv4", "IPv6"]:
            if c in self.connections:
                addr = self.connections[c].addr
                if addr[0:7] == "::ffff:":
                    addr = addr[7:]
                self.connections[c].addr = addr

        # Add plugin_data if missing (for backward compatibility)
        if not hasattr(self, "plugin_data"):
            self.plugin_data = {}
        if not hasattr(self, "plugin_retention"):
            self.plugin_retention = 100
        if not hasattr(self, "alert_states"):
            self.alert_states = {}
        # User access fields (added in user-management feature)
        if not hasattr(self, "owner"):
            self.owner = None
        if not hasattr(self, "managers"):
            self.managers = []
        if not hasattr(self, "monitors"):
            self.monitors = []

    def add_plugin_data(self, plugin_name, data, timestamp=None):
        """Store plugin data with timestamp.

        Args:
            plugin_name: Name of the plugin (e.g., "cpu_monitor")
            data: Dict of plugin data
            timestamp: Optional timestamp (default: current time)

        Only the newest ``plugin_retention`` samples are kept; a retention of
        0 keeps none.
        """
        if timestamp is None:
            timestamp = time.time()

        samples = self.plugin_data.setdefault(plugin_name, [])
        samples.append((timestamp, data))

        # Enforce retention limit. Computed from the excess rather than
        # slicing with [-N:], which would keep everything when N == 0.
        excess = len(samples) - self.plugin_retention
        if excess > 0:
            self.plugin_data[plugin_name] = samples[excess:]

    def get_plugin_data(self, plugin_name, limit=None):
        """Retrieve plugin data for a specific plugin.

        Args:
            plugin_name: Name of the plugin
            limit: Optional limit on number of recent samples to return

        Returns:
            List of (timestamp, data) tuples, most recent last
        """
        data = self.plugin_data.get(plugin_name, [])
        if limit and len(data) > limit:
            return data[-limit:]
        return data

    def get_latest_plugin_data(self, plugin_name):
        """Get the most recent plugin data for a plugin.

        Args:
            plugin_name: Name of the plugin

        Returns:
            (timestamp, data) tuple or None if no data
        """
        data = self.plugin_data.get(plugin_name, [])
        return data[-1] if data else None

    def get_all_plugin_data(self):
        """Get all plugin data for this host.

        Returns:
            Dict of {plugin_name: [(timestamp, data), ...]}
        """
        return self.plugin_data

    # ------------------------------------------------------------------
    # User-role helpers
    # ------------------------------------------------------------------

    def apply_access(self, owner, managers, monitors):
        """Set owner/managers/monitors on this host (called from config load)."""
        self.owner = owner
        self.managers = list(managers)
        self.monitors = list(monitors)

    def is_owner(self, username: str) -> bool:
        """Return True if *username* owns this host."""
        return self.owner == username

    def is_manager(self, username: str) -> bool:
        """Return True if *username* is a manager or the owner."""
        return username in self.managers or self.is_owner(username)

    def is_monitor(self, username: str) -> bool:
        """Return True if *username* is a monitor, manager or the owner."""
        return username in self.monitors or self.is_manager(username)

    def access_dict(self) -> dict:
        """Return copies of the access lists plus the owner."""
        return {
            "owner": self.owner,
            "managers": list(self.managers),
            "monitors": list(self.monitors),
        }

    # Column layouts for htmltable(): a field name, or (field, td/th attrib).
    hostfields_long = [
        "name",
        "IPv4.addr",
        "IPv4.state",
        ("IPv4.rtt", 'style="text-align: right;"'),
        ("IPv4.statetime", 'style="text-align: right;"'),
        "IPv6.addr",
        "IPv6.state",
        ("IPv6.rtt", 'style="text-align: right;"'),
        ("IPv6.statetime", 'style="text-align: right;"'),
    ]

    hostfields_short = [
        "name",
        ("IPv4.rttstate", 'style="text-align: right;"'),
        ("IPv4.deltastatetime", 'style="text-align: right;"'),
        ("IPv6.rttstate", 'style="text-align: right;"'),
        ("IPv6.deltastatetime", 'style="text-align: right;"'),
    ]

    def gene(self, tag, v, attrib=None):
        """Wrap *v* in an HTML element *tag*, with optional raw *attrib* text."""
        a = " %s" % attrib if attrib else ""
        return "<%s%s>%s</%s>" % (tag, a, v, tag)

    def htmltable(self, tag, hd, short):
        """Render one table row of *hd* values using *tag* cells (th/td)."""
        hostfields = Host.hostfields_short if short else Host.hostfields_long
        h = []
        for f in hostfields:
            if isinstance(f, tuple):
                h.append(self.gene(tag, hd[f[0]], f[1]))
            else:
                h.append(self.gene(tag, hd[f]))
        return self.gene("tr", "\n".join(h))

    def buildhosttable(self, short=False):
        """Return the HTML host table as a list of lines, hosts sorted by name."""
        if DEBUG > 1:
            print("DBG buildhosttable: start")
        res = ['<table id="ntable" class="sortable">']
        # headerdict/htmltable do not depend on instance data, so any
        # instance (including a prototype) can render the header.
        res.append(self.htmltable("th", self.headerdict(), short))
        for h in sorted(Host.hosts):
            res.append(self.htmltable("td", Host.hosts[h].statedict(), short))
        res.append("</table>")
        if DEBUG > 1:
            print("DBG buildhosttable: %s" % res)
        return res

    def buildmsgtable(self, msgs):
        """Return the event-log section: a heading plus the newest messages.

        The number of messages shown shrinks as the host count grows
        (40 - number of hosts, but at least 3).
        """
        le = max(40 - len(Host.hosts), 3)
        res = ["<h4>Log of Events</h4>"]
        # msgs[-le:] keeps all messages when there are fewer than le; the
        # previous msgs[len(msgs) - le:] went negative and truncated them.
        for m in msgs[-le:]:
            res.append("%s<BR>" % m)
        return res
|
|
|
|
|
|
# Module-level prototype objects that are not tied to any real peer.
# Host(None) is not registered in Host.hosts, and Connection(None, ...) does
# not touch htab. Methods elsewhere in this module use them as stand-ins when
# they need an instance (for example, Host.statedict/headerdict call
# ubConnection for empty rows and column headers).
ubHost, ubConnection = Host(None), Connection(None, "", "", "")
|