diff --git a/.hb.yaml b/.hb.yaml index f4cb140..4324a67 100644 --- a/.hb.yaml +++ b/.hb.yaml @@ -64,4 +64,17 @@ thresholds: percent: warning: 8.0 critical: 90.0 - + nagios_runner: + load_status: + warning: WARNING + operator: = "=" + critical: CRITICAL + operator: = "=" + UPS_status_code: + warning: 1 + critical: 2 + operator: ">=" + rtt: + y: + warning: 0.1 + critical: 10.0 diff --git a/.hb.yaml.swp b/.hb.yaml.swp new file mode 100644 index 0000000..63c02b4 Binary files /dev/null and b/.hb.yaml.swp differ diff --git a/.vscode/launch.json b/.vscode/launch.json index f91b797..9229db8 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,7 +9,7 @@ "type": "debugpy", "request": "launch", "module": "hbd.server.cli", - "args": ["-c", "/home/andreas/git/heartbeat/.hb.yaml", "-f", "-v", "-x", "-x", "-x"], + "args": ["-c", "/home/andreas/git/heartbeat/.hb.yaml", "-f", "-v", "-x", "-x", "-x", "-x"], "cwd": "${workspaceFolder}", "env": { "PYTHONPATH": "${workspaceFolder}" diff --git a/README.md b/README.md index 906c575..55cc86b 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,18 @@ Heartbeat includes a sophisticated threshold alerting system that monitors plugi ```yaml thresholds: + # RTT (Round-Trip Time) thresholds for heartbeat monitoring + # These are checked on every HTB message arrival + rtt: + webserver01: + warning: 100.0 # Warn when RTT > 100ms + critical: 500.0 # Critical when RTT > 500ms + + database01: + warning: 50.0 + critical: 200.0 + + # Plugin metric thresholds cpu_monitor: cpu_percent: warning: 80.0 # Warn when CPU > 80% @@ -177,6 +189,38 @@ thresholds: threshold_renotify_interval: 3600 # Re-notify every hour for ongoing alerts ``` +### RTT Monitoring + +Heartbeat monitors network latency (Round-Trip Time) for each host's heartbeat messages. RTT thresholds are **fully integrated with the threshold alerting system**: + +- **Per-host configuration**: Set different thresholds for each monitored host +- **Real-time checking**: Thresholds evaluated on every HTB message arrival +- **Alert state tracking**: RTT alerts use the same state management as plugin metrics +- **Hysteresis support**: Configurable hysteresis prevents rapid state transitions +- **Alerts dashboard**: RTT alerts visible on the `/alerts` web page alongside plugin alerts +- **Smart notifications**: Only triggers on state changes (OK → WARNING → CRITICAL) +- **Re-notification**: Periodic reminders for ongoing RTT issues +- **Event & journal logging**: All RTT events logged for audit trail + +**Configuration format:** +```yaml +thresholds: + rtt: + : + warning: # Warn when RTT > this value + critical: # Critical when RTT > this value + hysteresis: 0.1 # Optional: 10% hysteresis (default) +``` + +**Example alerts:** +``` +WARNING: webserver01 - rtt.webserver01 = 125.3 +CRITICAL: database01 - rtt.database01 = 520.1 +RECOVERED: webserver01 - rtt.webserver01 = 45.2 (WARNING -> OK) +``` + +RTT alerts appear on the Alerts dashboard and can be filtered by severity level. The `metric_path` format is `rtt.`, making it easy to distinguish from plugin metrics. + ### Alert Behavior 1. **State Changes**: Notifications sent when crossing thresholds diff --git a/hbd/server/cli.py b/hbd/server/cli.py index 2c54bb2..0a4f0a6 100644 --- a/hbd/server/cli.py +++ b/hbd/server/cli.py @@ -43,9 +43,8 @@ def main(argv=None): config["verbose"] = True if args.pushsrv: config["pushsrv"] = args.pushsrv - if args.debug: - config.setdefault("debug", 0) - config["debug"] += args.debug + if args.debug > 0: + config["debug"] = args.debug run_server(config) diff --git a/hbd/server/data.py b/hbd/server/data.py new file mode 100644 index 0000000..99a785a --- /dev/null +++ b/hbd/server/data.py @@ -0,0 +1,12 @@ +msgs = [] # in-memory list of recent messages for new websocket clients; also logged to file via notify.eventlog +class Data: + + def __init__(self, config): + self.config = config + self.data = {} + + def update(self, new_data): + self.data.update(new_data) + + def get(self, key, default=None): + return self.data.get(key, default) \ No newline at end of file diff --git a/hbd/server/hbdclass.py b/hbd/server/hbdclass.py index bd01d97..ffb7a97 100644 --- a/hbd/server/hbdclass.py +++ b/hbd/server/hbdclass.py @@ -42,6 +42,11 @@ class Connection: self.statetime = self.lastbeat self.deltastatetime = "computed" self.state = Connection.UNKNOWN + + # Timer-based reachability monitoring + self.overdue_timer = None + self.overdue_callback = None + self.timeout_duration = None if host: Connection.htab[addr] = self.host.name @@ -49,6 +54,27 @@ class Connection: log(self.host.name, "dns update %s" % self.addr) Host.dnsQ.put((self.host.name, self.addr)) + def __getstate__(self): + """Prepare Connection for pickling by excluding non-serializable timer objects.""" + state = self.__dict__.copy() + # Remove asyncio timer objects that can't be pickled + # These will be recreated when the next HTB arrives after unpickling + state['overdue_timer'] = None + state['overdue_callback'] = None + state['timeout_duration'] = None + return state + + def __setstate__(self, state): + """Restore Connection from pickle, reinitializing timer fields.""" + self.__dict__.update(state) + # Ensure timer fields are initialized (they'll be recreated when HTB arrives) + if not hasattr(self, 'overdue_timer'): + self.overdue_timer = None + if not hasattr(self, 'overdue_callback'): + self.overdue_callback = None + if not hasattr(self, 'timeout_duration'): + self.timeout_duration = None + def registerDns(self): Host.dnsQ.put((self.host.name, self.addr)) @@ -123,7 +149,18 @@ class Connection: return d def jsons(self): - return json.dumps(self.__dict__) + """Serialize connection to JSON, excluding non-serializable timer objects.""" + data = {} + for key, value in self.__dict__.items(): + # Skip timer-related fields that can't be serialized + if key in ['overdue_timer', 'overdue_callback', 'timeout_duration']: + continue + # Handle host backpointer by converting to name + if key == 'host': + data[key] = value.name if value else None + else: + data[key] = value + return json.dumps(data) # set new state, return number of secs in previous state def newstate(self, state, now, when=0): @@ -151,10 +188,87 @@ class Connection: except Exception: pass self.addr = addr - Connection.htab[addr] = self.host.name + Connection.htab[addr] = self.host.nameconnection_count if self.host.isDynDns(): Host.dnsQ.put((self.host.name, self.addr)) return r + + def reset_overdue_timer(self, timeout_seconds, callback): + """Reset the overdue timer for this connection. + + Cancels any existing timer and sets a new one that will mark + the connection as overdue if no heartbeat arrives before timeout. + + Args: + timeout_seconds: Seconds before marking as overdue + callback: Async function to call when timer expires + """ + import asyncio + + # Cancel existing timer if any + if self.overdue_timer and not self.overdue_timer.cancelled(): + self.overdue_timer.cancel() + + # Store parameters for later reference + self.timeout_duration = timeout_seconds + self.overdue_callback = callback + + # Create new timer + async def timer_expired(): + await callback(self) + + try: + loop = asyncio.get_event_loop() + self.overdue_timer = loop.call_later(timeout_seconds, + lambda: asyncio.create_task(timer_expired())) + except RuntimeError: + # No event loop running yet + pass + + def cancel_overdue_timer(self): + """Cancel the overdue timer if it exists and clear all timer references.""" + if self.overdue_timer: + try: + if not self.overdue_timer.cancelled(): + self.overdue_timer.cancel() + except Exception: + pass + # Clear all timer-related references + self.overdue_timer = None + self.overdue_callback = None + self.timeout_duration = None + + def get_avg_rtt(self): + """Get average RTT from recent samples.""" + valid_rtts = [r for r in self.rtts if r > 0] + if valid_rtts: + return sum(valid_rtts) / len(valid_rtts) + return 0 + + def get_current_rtt(self): + """Get most recent RTT value.""" + return self.rtts[-1] if self.rtts else 0 + + def check_rtt_threshold(self, warning_threshold=None, critical_threshold=None): + """Check if RTT exceeds thresholds. + + Args: + warning_threshold: RTT in ms for warning level + critical_threshold: RTT in ms for critical level + + Returns: + Tuple of (level, rtt_value) where level is None, 'WARNING', or 'CRITICAL' + """ + rtt = self.get_current_rtt() + if rtt <= 0: + return (None, rtt) + + if critical_threshold and rtt > critical_threshold: + return ('CRITICAL', rtt) + elif warning_threshold and rtt > warning_threshold: + return ('WARNING', rtt) + + return (None, rtt) # @@ -224,14 +338,30 @@ class Host: def stateinfo(self): ddict = {} for d in self.__dict__: + if d in ["alert_states", "plugin_data"]: + continue if d == "connections": cl = [] for c in ["IPv4", "IPv6"]: if c not in self.connections: continue - # dirty ugly hack: fix conn to host backpointer - cld = copy.deepcopy(self.connections[c].__dict__) - cld["host"] = cld["host"].name + # Create connection dict, excluding non-serializable timer objects + conn = self.connections[c] + cld = {} + for key, value in conn.__dict__.items(): + # Skip timer-related fields that can't be serialized + if key in ['overdue_timer', 'overdue_callback', 'timeout_duration']: + continue + # Handle host backpointer by converting to name + if key == 'host': + cld[key] = value.name if value else None + else: + # Safe copy for serializable values + try: + cld[key] = copy.deepcopy(value) + except Exception: + # If deepcopy fails, use shallow copy + cld[key] = value cl.append(cld) ddict[d] = cl else: diff --git a/hbd/server/http.py b/hbd/server/http.py index fb96828..6638888 100644 --- a/hbd/server/http.py +++ b/hbd/server/http.py @@ -8,6 +8,7 @@ import os import logging from aiohttp import web import jinja2 +from . import data logger = logging.getLogger(__name__) @@ -22,7 +23,6 @@ async def start( port: int, config, hbdclass, - msgs_getter, log=None, email=None, pushmsg=None, @@ -52,7 +52,7 @@ async def start( res.append('') res.append(f"

Heartbeat status {VER}

") res += hbdclass.ubHost.buildhosttable() - res += hbdclass.ubHost.buildmsgtable(msgs_getter()) + res += hbdclass.ubHost.buildmsgtable(data.msgs) res.append( "

%s (%s)

" % ( @@ -69,7 +69,7 @@ async def start( return web.json_response(json.loads("[" + ",".join(lst) + "]")) async def api_messages(request): - lst = msgs_getter()[-30:] + lst = data.msgs[-30:] return web.json_response(lst) async def cmd(request): @@ -155,7 +155,7 @@ async def start( hosts=[ hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts) ], - messages=msgs_getter()[-30:], + messages=data.msgs[-30:], ) return web.Response(text=body, content_type="text/html") diff --git a/hbd/server/main.py b/hbd/server/main.py index e7424ed..c681e16 100644 --- a/hbd/server/main.py +++ b/hbd/server/main.py @@ -14,28 +14,35 @@ from . import hbdclass from . import ws as ws_mod from . import notify as notify_mod +from . import data logger = logging.getLogger(__name__) msg_to_websockets = ws_mod.broadcast -eventlog = notify_mod.log - -lastfm = ["", "", ""] +eventlog = notify_mod.eventlog # shared runtime collections and helpers -msgs = notify_mod.msgs -def cleanup_function(config): +def cleanup_function(config, hbdclass): """This function will be executed upon program exit.""" logger.info("Running cleanup function...") import pickle + # Ensure all timer references are cleared before pickling + for hostname, host in list(hbdclass.Host.hosts.items()): + for conn_type, conn in host.connections.items(): + if hasattr(conn, 'overdue_timer'): + conn.overdue_timer = None + if hasattr(conn, 'overdue_callback'): + conn.overdue_callback = None + if hasattr(conn, 'timeout_duration'): + conn.timeout_duration = None + pickfile = config.get("pickfile", "hbd.pickle") pickf = open(pickfile, "wb") pick = pickle.Pickler(pickf) pick.dump(hbdclass.Host.hosts) - pick.dump(msgs) - pick.dump(lastfm) + pick.dump(data.msgs) pickf.close() logger.info("Cleanup complete.") @@ -124,7 +131,6 @@ async def _run_async(config): port=config.get("hbd_port", 50004), config=config, hbdclass=hbdclass, - msgs_getter=lambda: msgs, log=eventlog, pushmsg=pushmsg, msg_to_websockets=msg_to_websockets, @@ -186,7 +192,7 @@ async def _run_async(config): hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts) ], - get_msgs=lambda: msgs, +# get_msgs=lambda: msgs, config=config, ) ) @@ -200,7 +206,6 @@ async def _run_async(config): monitor_mod.start( config=config, hbdclass=hbdclass, - log=eventlog, pushmsg=pushmsg, msg_to_websockets=msg_to_websockets, ) @@ -216,6 +221,13 @@ async def _run_async(config): except Exception as e: logger.exception("Error in main loop: %s", e) finally: + # Clean up connection timers + try: + logger.info("Cleaning up connection timers...") + await monitor_mod.cleanup_connections(hbdclass) + except Exception as e: + logger.warning("Error cleaning up connection timers: %s", e) + # Cancel all running tasks logger.info("Cancelling tasks...") try: @@ -279,7 +291,6 @@ async def _run_async(config): def load_pickled_hosts(config, hbdclass): """Load pickled hosts from file, if available.""" - global lastfm, msgs import os import pickle @@ -294,11 +305,7 @@ def load_pickled_hosts(config, hbdclass): pick = pickle.Unpickler(pickf) try: hbdclass.Host.hosts = pick.load() - msgs = pick.load() - try: - lastfm = pick.load() - except Exception: - lastfm = ["", "", ""] + data.msgs = pick.load() pickf.close() except Exception as e: logger.exception("load pickled failed: %s", e) @@ -331,7 +338,7 @@ def run(config): load_pickled_hosts(config, hbdclass) notify_mod.initlog(logfile=config.get("logfile", "messages.log")) - eventlog(None, f"hbd version {__version__} starting up") + eventlog(None, "INFO", f"hbd version {__version__} starting up") # Create and set the event loop manually loop = asyncio.new_event_loop() @@ -344,8 +351,9 @@ def run(config): except Exception as e: logger.exception("Unhandled exception in main: %s", e) finally: - cleanup_function(config) + cleanup_function(config, hbdclass) logger.info("hbd shutdown complete") + eventlog(None, "INFO", f"hbd version {__version__} shutdown") notify_mod.closelog() # Explicitly close the loop try: diff --git a/hbd/server/monitor.py b/hbd/server/monitor.py index f459689..0b69886 100644 --- a/hbd/server/monitor.py +++ b/hbd/server/monitor.py @@ -1,50 +1,66 @@ -"""monitor helper and thread for heartbeat daemon.""" +"""Monitor helper for heartbeat daemon. + +This module provides monitoring tasks for the heartbeat daemon. +The primary reachability monitoring is now event-driven (timers set/reset +on HTB arrival in udp.py) rather than periodic polling. + +This module can be extended for additional monitoring tasks. +""" from __future__ import annotations import asyncio import time +from . import notify as notify_mod DROPOVERDUE = 7 * 24 * 3600 +eventlog = notify_mod.eventlog -def checkoverdue( - config: dict, - hbdclass, - log: callable, - pushmsg: callable, - msg_to_websockets: callable, -): - now = time.time() - for h in list(hbdclass.Host.hosts.keys()): - pmsg = [] - for c in hbdclass.Host.hosts[h].connections: - conn = hbdclass.Host.hosts[h].connections[c] - if conn.state == hbdclass.Connection.DOWN: - continue - timeout = hbdclass.Host.hosts[h].interval + config.get("grace", 10) - if conn.state == hbdclass.Connection.UP and (now - conn.lastbeat) > timeout: - conn.newstate(hbdclass.Connection.OVERDUE, now, config.get("grace", 10)) - pmsg.append(conn.afam) - if ( - conn.state == hbdclass.Connection.OVERDUE - and (now - conn.lastbeat) > DROPOVERDUE - ): - conn.newstate(hbdclass.Connection.UNKNOWN, conn.lastbeat) - if pmsg != []: - if h in config.get("watchhosts", []): - pushmsg("%s %s overdue" % (h, " and ".join(pmsg))) - log(h, "%s overdue" % " and ".join(pmsg)) - msg_to_websockets("host", hbdclass.Host.hosts[h].stateinfo()) +async def cleanup_connections(hbdclass): + """Clean up connection timers on shutdown. + + Cancels all active overdue timers to prevent callbacks after shutdown. + """ + for hostname, host in list(hbdclass.Host.hosts.items()): + for conn_type, conn in host.connections.items(): + if hasattr(conn, 'cancel_overdue_timer'): + conn.cancel_overdue_timer() async def start( config: dict, hbdclass: callable, - log=None, pushmsg=None, msg_to_websockets=None, ): - """start a monitor loop that checks for overdue hosts every minute""" + """Start monitor background tasks. + + Note: Reachability monitoring is now timer-based and happens in udp.py + when HTB messages arrive. This function can be used for additional + monitoring tasks. + + Currently runs a simple status logger every 5 minutes. + """ + import logging + logger = logging.getLogger(__name__) + logger_interval = 300 # Log status every 5 minutes + while True: - await asyncio.sleep(15) # 15 seconds between checks - checkoverdue(config, hbdclass, log, pushmsg, msg_to_websockets) + await asyncio.sleep(logger_interval) + + # Log monitoring status + total_hosts = len(hbdclass.Host.hosts) + up_count = sum( + 1 for h in hbdclass.Host.hosts.values() + for c in h.connections.values() + if c.state == hbdclass.Connection.UP + ) + overdue_count = sum( + 1 for h in hbdclass.Host.hosts.values() + for c in h.connections.values() + if c.state == hbdclass.Connection.OVERDUE + ) + + logger.debug( + f"Monitor status: {total_hosts} hosts, {up_count} UP, {overdue_count} OVERDUE" + ) diff --git a/hbd/server/notify.py b/hbd/server/notify.py index d215c06..feaedb3 100644 --- a/hbd/server/notify.py +++ b/hbd/server/notify.py @@ -8,7 +8,9 @@ import subprocess import smtplib import time import sys +from . import data from . import ws as ws_mod +from . import main as main_mod DEFAULT_PUSHPROVIDERS = ["all", "pushover", "mattermost", "signal"] msg_to_websockets = ws_mod.broadcast @@ -17,19 +19,18 @@ msg_to_websockets = ws_mod.broadcast _config = {} logger = logging.getLogger(__name__) -msgs = [] logf = None def initlog(logfile): global logf try: logf = open(logfile, "a+") - return logf except Exception as e: import sys print("cannot open logfile %s, using STDERR: %s" % (logfile, e)) - return sys.stderr + logf = sys.stderr + return logf def closelog(): global logf @@ -39,10 +40,13 @@ def closelog(): except Exception: pass -def log(host, m, service=None): +def eventlog(host, lvl, m, service=None): ts = time.time() - s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {host or ''} {m}" - msgs.append(s) + s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {lvl} " + if host: + s += f"{host} " + s += m + data.msgs.append(s) logger.info(s) if logf: try: diff --git a/hbd/server/threshold.py b/hbd/server/threshold.py index f0d4e3f..0a4b891 100644 --- a/hbd/server/threshold.py +++ b/hbd/server/threshold.py @@ -16,7 +16,7 @@ from typing import Dict, Any, Optional, Tuple, Callable from . import notify as notify_mod logger = logging.getLogger(__name__) -eventlog = notify_mod.log +eventlog = notify_mod.eventlog class AlertLevel(Enum): """Alert severity levels.""" @@ -96,6 +96,8 @@ class AlertState: "notification_count": self.notification_count, } + def __str__(self): + return self.to_dict().__str__() class ThresholdConfig: """Configuration for a single threshold check.""" @@ -280,6 +282,11 @@ class ThresholdChecker: def _parse_plugin_thresholds(self, plugin_name: str, thresholds: Dict[str, Any]): """Parse thresholds for a specific plugin.""" + # Special handling for RTT thresholds (per-host) + if plugin_name == "rtt": + self._parse_rtt_thresholds(thresholds) + return + for metric_name, threshold_config in thresholds.items(): if not isinstance(threshold_config, dict): continue @@ -353,6 +360,97 @@ class ThresholdChecker: self.thresholds[metric_path] = threshold + def _parse_rtt_thresholds(self, rtt_thresholds: Dict[str, Any]): + """Parse RTT thresholds (per-host network latency thresholds). + + RTT thresholds are configured as: + thresholds: + rtt: + hostname1: + warning: 100.0 # ms + critical: 500.0 # ms + """ + for hostname, threshold_config in rtt_thresholds.items(): + if not isinstance(threshold_config, dict): + continue + + # Metric path is "rtt." + metric_path = f"rtt.{hostname}" + + warning = threshold_config.get("warning") + critical = threshold_config.get("critical") + operator = threshold_config.get("operator", ">") + hysteresis = threshold_config.get("hysteresis", 0.1) # 10% default + enabled = threshold_config.get("enabled", True) + + if warning is None and critical is None: + logger.warning("No RTT thresholds defined for %s, skipping", hostname) + continue + + threshold = ThresholdConfig( + metric_path=metric_path, + warning=warning, + critical=critical, + operator=operator, + hysteresis=hysteresis, + enabled=enabled, + ) + + self.thresholds[metric_path] = threshold + logger.debug( + "Registered RTT threshold for %s: warn=%s ms, crit=%s ms", + hostname, + warning, + critical + ) + + def check_value( + self, + host_name: str, + metric_path: str, + value: float, + alert_states: Dict[str, AlertState], + ) -> Optional[Tuple[AlertLevel, AlertLevel]]: + """ + Check a single value against configured threshold. + + Args: + host_name: Name of the host + metric_path: Full metric path (e.g., "rtt.hostname") + value: The metric value to check + alert_states: Host's alert_states dictionary + + Returns: + Tuple of (old_level, new_level) if state changed, None otherwise + """ + if metric_path not in self.thresholds: + return None + + threshold = self.thresholds[metric_path] + + # Get or create alert state + if metric_path not in alert_states: + alert_states[metric_path] = AlertState(metric_path) + + alert_state = alert_states[metric_path] + + # Evaluate threshold with hysteresis + new_level = threshold.evaluate_with_hysteresis( + value, + alert_state.level + ) + + # Update state and check for changes + old_level = alert_state.level + if alert_state.update(new_level, value): + self._trigger_notification(host_name, metric_path, old_level, new_level, value) + return (old_level, new_level) + elif new_level != AlertLevel.OK: + # Check if we should re-notify + self._check_renotify(host_name, alert_state, metric_path, value) + + return None + def check_plugin_data( self, host_name: str, @@ -476,18 +574,22 @@ class ThresholdChecker: """Trigger a notification for an alert state change.""" # Format message if new_level == AlertLevel.OK: - message = f"RECOVERED: {host_name} - {metric_path} = {value} ({old_level.name} -> OK)" + lvl = "RECOVERED" + message = f"{metric_path} = {value} ({old_level.name} -> OK)" elif new_level == AlertLevel.WARNING: - message = f"WARNING: {host_name} - {metric_path} = {value}" + lvl = "WARNING" + message = f"{metric_path} = {value}" elif new_level == AlertLevel.CRITICAL: - message = f"CRITICAL: {host_name} - {metric_path} = {value}" + lvl = "CRITICAL" + message = f"{metric_path} = {value}" else: - message = f"UNKNOWN: {host_name} - {metric_path} = {value}" + lvl = "UNKNOWN" + message = f"{metric_path} = {value}" # Send notification if self.notification_callback is not None: try: - self.notification_callback(message) + self.notification_callback(f"{lvl}: {host_name} - {message}") logger.info("Notification sent: %s", message) except Exception as e: logger.error("Failed to send notification: %s", e) @@ -507,7 +609,7 @@ class ThresholdChecker: except Exception as e: logger.debug(f"Failed to log threshold event to journal: {e}") # Log to eventlog as well - eventlog(host_name, message, service="threshold") + eventlog(host_name, lvl, message, service="threshold") def _check_renotify( self, diff --git a/hbd/server/udp.py b/hbd/server/udp.py index 8579381..1709e16 100644 --- a/hbd/server/udp.py +++ b/hbd/server/udp.py @@ -6,8 +6,10 @@ import logging from ..common.proto import stodict, oldmtodict from ..common.utils import dur +from . import notify as notify_mod logger = logging.getLogger(__name__) +eventlog = notify_mod.eventlog class EchoServerProtocol(asyncio.DatagramProtocol): @@ -170,8 +172,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): return if res: - if log: - log(uname, res) + eventlog(uname, "WARNING", res) if uname in cfg.get("watchhosts", []): if pushmsg: pushmsg("%s %s" % (host.name, res)) @@ -183,15 +184,13 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): boot = msg.get("boot", 0) if boot: - if log: - log(uname, "booted") + eventlog(uname, "INFO", "booted") if uname in cfg.get("watchhosts", []): m = "%s booted" % (host.name) if pushmsg: pushmsg(m) if message: - if log: - log(uname, "msg: %s" % message, service=service) + eventlog(uname, "INFO", "msg: %s" % message, service=service) if uname in cfg.get("watchhosts", []): if pushmsg: pushmsg(message) @@ -199,9 +198,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if conn.getstate() != hbdcls.Connection.UP: lasts = conn.state d = conn.newstate(hbdcls.Connection.UP, now) - m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d)) - if log: - log(uname, m) + if d == 0 or lasts == "unknown": + m = "%s is up" % (conn.afam) + else: + m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d)) + eventlog(uname, "RECOVER", m) if uname in cfg.get("watchhosts", []): if pushmsg: pushmsg("%s %s is back" % (uname, conn.afam)) @@ -212,8 +213,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): host.upcount += 1 if shutdown: - if log: - log(uname, "%s shutdown" % conn.afam) + eventlog(uname, "INFO", "%s shutdown" % conn.afam) if uname in cfg.get("watchhosts", []): if pushmsg: pushmsg("%s %s shutdown" % (uname, conn.afam)) @@ -221,6 +221,61 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if interval > 0: host.interval = interval + + # Timer-based reachability monitoring + # Reset overdue timer on every heartbeat + if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN: + grace = cfg.get("grace", 2) + timeout_seconds = (interval + grace) if interval > 0 else 30 + + # Create callback for timer expiration + async def on_overdue(connection): + """Called when connection timer expires (no heartbeat received).""" + import time + now = time.time() + + # Only mark as overdue if still in UP state (not already marked) + if connection.getstate() == hbdcls.Connection.UP: + connection.newstate(hbdcls.Connection.OVERDUE, now, cfg.get("grace", 2)) + + msg = f"{connection.afam} overdue" + eventlog(uname, "CRITICAL" if uname in cfg.get("watchhosts", []) else "WARNING", msg) + + if uname in cfg.get("watchhosts", []): + if pushmsg: + pushmsg(f"{uname} {msg}") + + # Notify websockets + if msg_to_websockets: + msg_to_websockets("host", host.stateinfo()) + + # Set a longer timer for marking as UNKNOWN (7 days) + DROPOVERDUE = 7 * 24 * 3600 + + async def on_unknown(connection): + """Mark connection as unknown after extended absence.""" + connection.newstate(hbdcls.Connection.UNKNOWN, connection.lastbeat) + if msg_to_websockets: + msg_to_websockets("host", host.stateinfo()) + + connection.reset_overdue_timer(DROPOVERDUE, on_unknown) + + # Reset the timer + conn.reset_overdue_timer(timeout_seconds, on_overdue) + + # Check RTT thresholds using the threshold checker + threshold_checker = ctx.get("threshold_checker") + if threshold_checker and rtt and rtt > 0: + # Metric path for RTT is "rtt." + metric_path = f"rtt.{uname}" + + # Check against configured thresholds (handles alerts, notifications, etc.) + threshold_checker.check_value( + host_name=uname, + metric_path=metric_path, + value=rtt, + alert_states=host.alert_states + ) # send ACK back rmsg = {"time": __import__("time").time()} @@ -266,5 +321,6 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if msg_to_websockets: try: msg_to_websockets("host", host.stateinfo()) - except Exception: - pass + except Exception as e: + if DEBUG > 0: + print(("cannot send websocket message: %s" % e)) diff --git a/hbd/server/ws.py b/hbd/server/ws.py index 3ebac21..de7f3aa 100644 --- a/hbd/server/ws.py +++ b/hbd/server/ws.py @@ -8,6 +8,7 @@ import asyncio import json import logging from typing import Callable, Iterable, Optional +from . import data import websockets @@ -16,7 +17,7 @@ logger.setLevel(logging.INFO) _connections = set() _loop: Optional[asyncio.AbstractEventLoop] = None _get_hosts: Optional[Callable[[], Iterable]] = None -_get_msgs: Optional[Callable[[], Iterable]] = None +#_get_msgs: Optional[Callable[[], Iterable]] = None _verbose = False @@ -38,11 +39,11 @@ async def _handler(websocket, path=None): except Exception as e: logger.error("Error sending initial hosts: %s", e, exc_info=True) # send recent messages - if _get_msgs: + if data.msgs: try: - msgs = list(_get_msgs())[-100:] - logger.debug("Sending %d recent messages to new WebSocket client", len(msgs)) - for m in msgs: +# msgs = list(_get_msgs())[-100:] + logger.debug("Sending %d recent messages to new WebSocket client", len(data.msgs)) + for m in data.msgs: jmsg = json.dumps({"type": "message", "data": m}) await websocket.send(jmsg) except Exception as e: @@ -77,7 +78,7 @@ async def start( wss_port: Optional[int] = None, ssl_context=None, get_hosts: Optional[Callable] = None, - get_msgs: Optional[Callable] = None, +# get_msgs: Optional[Callable] = None, config: dict = {}, ): """Start WebSocket servers and block until cancelled. @@ -86,17 +87,19 @@ async def start( If `wss_port` and `ssl_context` are provided, a WSS server will also be started. """ - global _loop, _get_hosts, _get_msgs, _verbose + global _loop, _get_hosts, _verbose _loop = asyncio.get_running_loop() _get_hosts = get_hosts - _get_msgs = get_msgs _verbose = config.get("verbose", False), - _debug = config.get("debug", False), + _debug = config.get("debug", 0), servers = [] # plain WebSocket websockets_logger = logging.getLogger("websockets.server") - websockets_logger.setLevel(logging.DEBUG if _debug > 2 else logging.INFO) + #if _debug > 2: + # websockets_logger.setLevel(logging.DEBUG) + #else: + # websockets_logger.setLevel(logging.INFO) # regular WebSocket ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"]) servers.append(ws_server) diff --git a/hbdclass.py b/hbdclass.py- similarity index 100% rename from hbdclass.py rename to hbdclass.py-