Move threshold to server, move eventlog to notify

This commit is contained in:
Andreas Wrede
2026-03-29 20:29:33 -04:00
parent 0543266c92
commit ad7178ebcb
7 changed files with 61 additions and 48 deletions
+1 -1
View File
@@ -299,7 +299,7 @@ async def start(
active_alerts = threshold_checker.get_active_alerts(host.alert_states)
else:
# Fallback if no threshold checker
from hbd.client.threshold import AlertLevel
from hbd.server.threshold import AlertLevel
active_alerts = [
state for state in host.alert_states.values()
if state.level != AlertLevel.OK
+12 -41
View File
@@ -13,40 +13,16 @@ from . import udp
from . import hbdclass
from . import ws as ws_mod
from . import notify as notify_mod
logger = logging.getLogger(__name__)
msg_to_websockets = ws_mod.broadcast
eventlog = notify_mod.log
logf = None
lastfm = ["", "", ""]
# shared runtime collections and helpers
msgs = []
def initlog(logfile):
try:
return open(logfile, "a+")
except Exception as e:
import sys
print("cannot open loffile %s, using STDERR: %s" % (logfile, e))
return sys.stderr
def log(host, m, service=None):
ts = time.time()
s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {host or ''} {m}"
msgs.append(s)
logger.info(s)
if logf:
try:
logf.write(s + "\n")
logf.flush()
except Exception as e:
logger.warning("failed to write to logfile: %s", e)
msg_to_websockets("message", s)
msgs = notify_mod.msgs
def cleanup_function(config):
"""This function will be executed upon program exit."""
@@ -84,7 +60,7 @@ async def _run_async(config):
from . import notify as notify_mod
from . import monitor as monitor_mod
from . import journal as journal_mod
from ..client import threshold as threshold_mod
from . import threshold as threshold_mod
notify_mod.setup(config)
@@ -125,7 +101,7 @@ async def _run_async(config):
ctx = dict(
config=config,
hbdclass=hbdclass,
log=log,
log=eventlog,
pushmsg=pushmsg,
msg_to_websockets=msg_to_websockets,
msg_journal=msg_journal,
@@ -149,7 +125,7 @@ async def _run_async(config):
config=config,
hbdclass=hbdclass,
msgs_getter=lambda: msgs,
log=log,
log=eventlog,
pushmsg=pushmsg,
msg_to_websockets=msg_to_websockets,
threshold_checker=threshold_checker,
@@ -172,7 +148,7 @@ async def _run_async(config):
dns_task = None
try:
dns_task = dns_mod.start_dns_worker(
hbdclass, config, log=log, pushmsg=pushmsg, loop=loop
hbdclass, config, log=eventlog, pushmsg=pushmsg, loop=loop
)
logger.info("dns update worker started")
except Exception as e:
@@ -211,7 +187,7 @@ async def _run_async(config):
for h in sorted(hbdclass.Host.hosts)
],
get_msgs=lambda: msgs,
verbose=config.get("verbose", False),
config=config,
)
)
logger.info("WebSocket task started")
@@ -224,7 +200,7 @@ async def _run_async(config):
monitor_mod.start(
config=config,
hbdclass=hbdclass,
log=log,
log=eventlog,
pushmsg=pushmsg,
msg_to_websockets=msg_to_websockets,
)
@@ -347,7 +323,6 @@ def run(config):
Manually manages the event loop to ensure clean shutdown.
"""
global logf
import os
logging.basicConfig(
@@ -355,8 +330,8 @@ def run(config):
)
load_pickled_hosts(config, hbdclass)
logf = initlog(logfile=config.get("logfile", "messages.log"))
log(None, f"hbd version {__version__} starting up")
notify_mod.initlog(logfile=config.get("logfile", "messages.log"))
eventlog(None, f"hbd version {__version__} starting up")
# Create and set the event loop manually
loop = asyncio.new_event_loop()
@@ -371,11 +346,7 @@ def run(config):
finally:
cleanup_function(config)
logger.info("hbd shutdown complete")
if logf and logf != sys.stderr:
try:
logf.close()
except Exception:
pass
notify_mod.closelog()
# Explicitly close the loop
try:
# Cancel all remaining tasks
+38
View File
@@ -7,13 +7,50 @@ import urllib.parse
import subprocess
import smtplib
import time
import sys
from . import ws as ws_mod
# Provider names; presumably the values accepted for the "pushsrv" setting
# read by pushmsg() — TODO confirm against its callers.
DEFAULT_PUSHPROVIDERS = ["all", "pushover", "mattermost", "signal"]
# Local alias for the WebSocket broadcast function; log() calls it with every
# formatted event line.
msg_to_websockets = ws_mod.broadcast
# module-level configuration set via setup()
_config = {}
logger = logging.getLogger(__name__)
# In-memory list of formatted event lines; log() appends one entry per call.
# NOTE(review): nothing visible here ever trims this list — confirm it cannot
# grow without bound in a long-running process.
msgs = []
# File object that log() writes event lines to; assigned by initlog() and
# None until then.
logf = None
def initlog(logfile):
    """Open the event log file in append mode and remember it module-wide.

    The opened file object is stored in the module-level ``logf`` so that
    ``log()`` can write to it. If the file cannot be opened, a message is
    printed and ``sys.stderr`` is used as the log destination instead.

    Args:
        logfile: Path of the file event lines should be appended to.

    Returns:
        The file object now stored in ``logf`` (``sys.stderr`` on failure).
    """
    global logf
    try:
        logf = open(logfile, "a+")
    except Exception as e:
        print("cannot open logfile %s, using STDERR: %s" % (logfile, e))
        # Store the fallback in the global too: callers may ignore the return
        # value, and log() only ever consults ``logf``.
        logf = sys.stderr
    return logf
def closelog():
    """Close the event log destination set up by initlog(), if any.

    ``logf`` is reset to ``None`` so that later ``log()`` calls skip the file
    write instead of failing on a closed file. ``sys.stderr`` is never closed.
    A failure while closing is logged and otherwise ignored.
    """
    global logf
    # Detach first so the module never keeps a reference to a closed file.
    handle, logf = logf, None
    if not handle or handle is sys.stderr:
        return
    try:
        handle.close()
    except Exception as e:
        # Best effort: shutdown must not fail because of the log file.
        logger.warning("failed to close logfile: %s", e)
def log(host, m, service=None):
    """Record one event line everywhere events are kept.

    The line is "<local timestamp> <host> <message>". It is appended to
    ``msgs``, sent to the module logger at INFO level, written to the open
    log file (when there is one) and broadcast to WebSocket clients as a
    "message" event.

    Args:
        host: Host the event refers to; a falsy value is rendered as "".
        m: Message text.
        service: Accepted for callers that pass it; not used here.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    line = f"{stamp} {host or ''} {m}"

    msgs.append(line)
    logger.info(line)

    if logf:
        try:
            logf.write(f"{line}\n")
            logf.flush()
        except Exception as err:
            # A broken log file must not stop the event from being broadcast.
            logger.warning("failed to write to logfile: %s", err)

    msg_to_websockets("message", line)
def setup(cfg: dict):
"""Initialize notifier defaults from a configuration dict."""
@@ -160,6 +197,7 @@ def pushmsg(cfg: dict, msg: str, debug: int = 0):
Returns a dict of results per provider.
"""
results = {}
p = cfg.get("pushsrv", "pushover")
if p in ("all", "pushover"):
+582
View File
@@ -0,0 +1,582 @@
"""
Threshold checking and alerting for plugin metrics.
This module provides a flexible threshold checking system that:
- Evaluates plugin metrics against configured warning/critical thresholds
- Tracks alert states per host and metric
- Prevents alert flapping with hysteresis
- Triggers notifications on state changes, plus periodic reminders while an alert persists
- Supports multiple comparison operators
"""
import logging
import time
from enum import Enum
from typing import Dict, Any, Optional, Tuple, Callable
from . import notify as notify_mod
# Module logger for diagnostics about threshold parsing and evaluation.
logger = logging.getLogger(__name__)
# Alias for the notify module's log() function; used to record alert state
# changes alongside other events.
eventlog = notify_mod.log
class AlertLevel(Enum):
    """Severity of a metric's alert state.

    The integer values are compared elsewhere in this module to decide
    whether a level change is an improvement (lower value) or not, so their
    relative order is significant.
    """

    # Metric is within its configured limits.
    OK = 0
    # Warning threshold matched.
    WARNING = 1
    # Critical threshold matched.
    CRITICAL = 2
    # Value could not be evaluated (for example, it is not numeric).
    UNKNOWN = 3
class ComparisonOperator(Enum):
    """Comparison operators a threshold may use.

    Each member's value is the operator's textual form, so a configuration
    string such as ">=" can be turned into a member with
    ``ComparisonOperator(">=")``.
    """

    # Greater than
    GT = ">"
    # Greater than or equal
    GTE = ">="
    # Less than
    LT = "<"
    # Less than or equal
    LTE = "<="
    # Equal to
    EQ = "=="
    # Not equal to
    NEQ = "!="
class AlertState:
    """Represents the current alert state for a specific metric."""

    def __init__(self, metric_path: str):
        """
        Initialize alert state.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        now = time.time()
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        # Time the current level was entered.
        self.since = now
        self.last_value = None
        self.last_check = now
        # Notification bookkeeping for the current level.
        self.notification_count = 0
        self.last_notification = None

    def update(self, level: AlertLevel, value: Any) -> bool:
        """
        Update alert state.

        Always records the value and check time. When the level differs from
        the stored one, the new level is stored and the per-level bookkeeping
        (``since``, ``notification_count``, ``last_notification``) starts over.

        Args:
            level: New alert level
            value: Current metric value

        Returns:
            True if state changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path,
            self.level.name,
            level.name,
            value,
        )
        self.level = level
        self.since = now
        self.notification_count = 0
        # Forget the reminder timestamp of the previous level; otherwise a
        # stale value makes the next reminder check fire immediately.
        self.last_notification = None
        return True

    def to_dict(self) -> dict:
        """Convert alert state to dictionary for serialization."""
        return {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": self.last_value,
            "last_check": self.last_check,
            "notification_count": self.notification_count,
        }
class ThresholdConfig:
    """Configuration for a single threshold check."""

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            operator: Comparison operator (>, >=, <, <=, ==, !=); an
                unrecognised string is logged and replaced by ">"
            hysteresis: Fraction of the threshold (0.0-1.0) by which a value
                must clear it before the alert level is lowered
            enabled: Whether this threshold is enabled
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        self.hysteresis = hysteresis

        # Parse operator
        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator,
                metric_path,
            )
            self.operator = ComparisonOperator.GT

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check; anything ``float()`` accepts

        Returns:
            AlertLevel indicating the severity. OK when the threshold is
            disabled, UNKNOWN when the value cannot be converted to float.
        """
        if not self.enabled:
            return AlertLevel.OK

        try:
            # Convert value to float for comparison
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        # Critical takes precedence over warning.
        if self.critical is not None and self._compare(value, self.critical):
            return AlertLevel.CRITICAL
        if self.warning is not None and self._compare(value, self.warning):
            return AlertLevel.WARNING
        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel,
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        Hysteresis only applies when the plain evaluation would lower the
        level from WARNING or CRITICAL: the value must then clear the
        threshold of the current level by ``threshold * hysteresis``,
        otherwise the current level is kept. It has no effect for the
        ``==`` and ``!=`` operators.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # No hysteresis configured (0 or None): plain evaluation wins.
        if not self.hysteresis:
            return new_level

        # Only a move to a lower severity is subject to hysteresis.
        if new_level.value >= current_level.value:
            return new_level

        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        # Compare numerically, the same way evaluate() does; a raw numeric
        # string would otherwise raise TypeError in the comparisons below.
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            return new_level

        margin = abs(threshold * self.hysteresis)

        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # For "greater than" thresholds, value must go below by hysteresis
            if numeric >= threshold - margin:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # For "less than" thresholds, value must go above by hysteresis
            if numeric <= threshold + margin:
                return current_level

        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Perform comparison based on operator."""
        if self.operator == ComparisonOperator.GT:
            return value > threshold
        if self.operator == ComparisonOperator.GTE:
            return value >= threshold
        if self.operator == ComparisonOperator.LT:
            return value < threshold
        if self.operator == ComparisonOperator.LTE:
            return value <= threshold
        # Equality is tested with a small absolute tolerance because the
        # operands are floats.
        if self.operator == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9
        if self.operator == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False
class ThresholdChecker:
    """Main threshold checking and alerting system."""

    def __init__(
        self,
        config: Dict[str, Any],
        notification_callback: Optional[Callable] = None,
        renotify_interval: int = 3600,
        journal: Optional[Any] = None,
    ):
        """
        Initialize threshold checker.

        Args:
            config: Threshold configuration dictionary from YAML
            notification_callback: Function to call for notifications
            renotify_interval: Seconds between repeat notifications (default: 1 hour)
            journal: Optional MessageJournal instance for logging threshold events
        """
        self.thresholds = {}  # {metric_path: ThresholdConfig}
        self.notification_callback = notification_callback
        self.renotify_interval = renotify_interval
        self.journal = journal
        # Strong references to in-flight journal tasks; the event loop itself
        # only keeps weak references to tasks.
        self._journal_tasks = set()

        # Parse configuration
        self._parse_config(config)

        logger.info("ThresholdChecker initialized with %d thresholds", len(self.thresholds))

    def _parse_config(self, config: Dict[str, Any]):
        """Parse threshold configuration from YAML structure."""
        thresholds_config = config.get("thresholds") if config else None
        # Also covers a "thresholds:" key with a null or non-mapping value.
        if not isinstance(thresholds_config, dict):
            logger.info("No thresholds configured")
            return

        for plugin_name, plugin_thresholds in thresholds_config.items():
            if not isinstance(plugin_thresholds, dict):
                continue
            self._parse_plugin_thresholds(plugin_name, plugin_thresholds)

    @staticmethod
    def _build_threshold(metric_path: str, threshold_config: Dict[str, Any]):
        """Build a ThresholdConfig from one config entry, or None if it sets no limits."""
        warning = threshold_config.get("warning")
        critical = threshold_config.get("critical")
        if warning is None and critical is None:
            return None
        return ThresholdConfig(
            metric_path=metric_path,
            warning=warning,
            critical=critical,
            operator=threshold_config.get("operator", ">"),
            hysteresis=threshold_config.get("hysteresis", 0.1),  # 10% default
            enabled=threshold_config.get("enabled", True),
        )

    def _parse_plugin_thresholds(self, plugin_name: str, thresholds: Dict[str, Any]):
        """Parse thresholds for a specific plugin."""
        for metric_name, threshold_config in thresholds.items():
            if not isinstance(threshold_config, dict):
                continue

            # Handle nested metrics (e.g., partitions./.percent)
            if metric_name == "partitions":
                self._parse_partition_thresholds(plugin_name, threshold_config)
                continue

            metric_path = f"{plugin_name}.{metric_name}"
            threshold = self._build_threshold(metric_path, threshold_config)
            if threshold is None:
                logger.warning("No thresholds defined for %s, skipping", metric_path)
                continue

            self.thresholds[metric_path] = threshold
            logger.debug(
                "Registered threshold for %s: warn=%s, crit=%s, op=%s",
                metric_path,
                threshold.warning,
                threshold.critical,
                threshold_config.get("operator", ">"),
            )

    def _parse_partition_thresholds(self, plugin_name: str, partitions: Dict[str, Any]):
        """Parse partition-specific thresholds for disk monitoring."""
        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue

            for metric_name, threshold_config in metrics.items():
                if not isinstance(threshold_config, dict):
                    continue

                # Create metric path like "disk_monitor./dev/sda1.percent"
                metric_path = f"{plugin_name}.{partition}.{metric_name}"
                threshold = self._build_threshold(metric_path, threshold_config)
                if threshold is not None:
                    self.thresholds[metric_path] = threshold

    def check_plugin_data(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
    ) -> list:
        """
        Check plugin data against configured thresholds.

        Args:
            host_name: Name of the host
            plugin_name: Name of the plugin
            data: Plugin data dictionary; anything else is ignored
            alert_states: Host's alert_states dictionary; missing entries are
                created in place

        Returns:
            List of (metric_path, old_level, new_level, value) tuples for state changes
        """
        state_changes = []
        if not isinstance(data, dict):
            return state_changes

        # Check flat metrics
        for metric_name, value in data.items():
            self._evaluate_metric(
                host_name,
                f"{plugin_name}.{metric_name}",
                value,
                alert_states,
                state_changes,
            )

        # Check nested metrics (e.g., partition data in disk_monitor)
        self._check_nested_metrics(
            host_name,
            plugin_name,
            data,
            alert_states,
            state_changes,
        )

        return state_changes

    def _check_nested_metrics(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
        state_changes: list,
    ):
        """Check nested metrics like partition-specific thresholds."""
        # Look for partition data in disk_monitor
        if plugin_name != "disk_monitor" or "partitions" not in data:
            return

        partitions = data["partitions"]
        if not isinstance(partitions, dict):
            return

        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue
            for metric_name, value in metrics.items():
                self._evaluate_metric(
                    host_name,
                    f"{plugin_name}.{partition}.{metric_name}",
                    value,
                    alert_states,
                    state_changes,
                )

    def _evaluate_metric(
        self,
        host_name: str,
        metric_path: str,
        value: Any,
        alert_states: Dict[str, AlertState],
        state_changes: list,
    ):
        """Evaluate one metric value, update its alert state and notify as needed."""
        if metric_path not in self.thresholds:
            return
        threshold = self.thresholds[metric_path]

        # Get or create alert state
        if metric_path not in alert_states:
            alert_states[metric_path] = AlertState(metric_path)
        alert_state = alert_states[metric_path]

        old_level = alert_state.level
        new_level = threshold.evaluate_with_hysteresis(value, old_level)

        if alert_state.update(new_level, value):
            state_changes.append((metric_path, old_level, new_level, value))
            self._trigger_notification(host_name, metric_path, old_level, new_level, value)
        elif new_level != AlertLevel.OK:
            # Level unchanged and still alerting: maybe send a reminder.
            self._check_renotify(host_name, alert_state, metric_path, value)

    def _trigger_notification(
        self,
        host_name: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
    ):
        """Trigger a notification for an alert state change.

        The message goes to the notification callback (if set), a journal
        entry is scheduled (if a journal is set and an event loop is
        running), and the message is written to the event log.
        """
        # Format message
        if new_level == AlertLevel.OK:
            message = f"RECOVERED: {host_name} - {metric_path} = {value} ({old_level.name} -> OK)"
        elif new_level == AlertLevel.WARNING:
            message = f"WARNING: {host_name} - {metric_path} = {value}"
        elif new_level == AlertLevel.CRITICAL:
            message = f"CRITICAL: {host_name} - {metric_path} = {value}"
        else:
            message = f"UNKNOWN: {host_name} - {metric_path} = {value}"

        # Send notification
        if self.notification_callback is not None:
            try:
                self.notification_callback(message)
                logger.info("Notification sent: %s", message)
            except Exception as e:
                logger.error("Failed to send notification: %s", e)

        # Log to journal
        if self.journal is not None:
            self._schedule_journal_entry(host_name, metric_path, old_level, new_level, value)

        # Log to eventlog as well
        eventlog(host_name, message, service="threshold")

    def _schedule_journal_entry(
        self,
        host_name: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
    ):
        """Schedule the journal's log_threshold_event coroutine on the running loop."""
        import asyncio

        try:
            # Resolve the loop before creating the coroutine so that a missing
            # loop does not leave a never-awaited coroutine behind.
            loop = asyncio.get_running_loop()
            task = loop.create_task(self.journal.log_threshold_event(
                host_name=host_name,
                metric_path=metric_path,
                old_level=old_level.name,
                new_level=new_level.name,
                value=value,
            ))
        except Exception as e:
            logger.debug("Failed to log threshold event to journal: %s", e)
            return

        # Keep the task referenced until it finishes so it cannot be
        # garbage-collected mid-execution.
        self._journal_tasks.add(task)
        task.add_done_callback(self._journal_tasks.discard)

    def _check_renotify(
        self,
        host_name: str,
        alert_state: AlertState,
        metric_path: str,
        value: Any,
    ):
        """Check if we should send a repeat notification.

        The first call for a state with no recorded notification time only
        records the time; later calls send a REMINDER once
        ``renotify_interval`` seconds have passed since the last one.
        """
        if alert_state.level == AlertLevel.OK:
            return

        now = time.time()

        if alert_state.last_notification is None:
            # First notification already sent during state change
            alert_state.last_notification = now
            alert_state.notification_count = 1
            return

        if (now - alert_state.last_notification) < self.renotify_interval:
            return

        # Time to re-notify
        message = (
            f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} "
            f"(ongoing for {int(now - alert_state.since)}s)"
        )

        if self.notification_callback:
            try:
                self.notification_callback(message)
                alert_state.last_notification = now
                alert_state.notification_count += 1
                logger.info("Re-notification sent: %s", message)
            except Exception as e:
                logger.error("Failed to send re-notification: %s", e)

    def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
        """
        Get all currently active (non-OK) alerts.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            List of AlertState objects that are not OK
        """
        return [
            state for state in alert_states.values()
            if state.level != AlertLevel.OK
        ]

    def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
        """
        Get summary counts of alert levels.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            Dictionary with counts:
            {"ok": N, "warning": N, "critical": N, "unknown": N}
        """
        keys = {
            AlertLevel.OK: "ok",
            AlertLevel.WARNING: "warning",
            AlertLevel.CRITICAL: "critical",
            AlertLevel.UNKNOWN: "unknown",
        }
        summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}

        for state in alert_states.values():
            key = keys.get(state.level)
            if key is not None:
                summary[key] += 1

        return summary
+5 -4
View File
@@ -12,7 +12,7 @@ from typing import Callable, Iterable, Optional
import websockets
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
_connections = set()
_loop: Optional[asyncio.AbstractEventLoop] = None
_get_hosts: Optional[Callable[[], Iterable]] = None
@@ -78,7 +78,7 @@ async def start(
ssl_context=None,
get_hosts: Optional[Callable] = None,
get_msgs: Optional[Callable] = None,
verbose: bool = False,
config: dict = {},
):
"""Start WebSocket servers and block until cancelled.
@@ -90,12 +90,13 @@ async def start(
_loop = asyncio.get_running_loop()
_get_hosts = get_hosts
_get_msgs = get_msgs
_verbose = verbose
_verbose = config.get("verbose", False),
_debug = config.get("debug", False),
servers = []
# plain WebSocket
websockets_logger = logging.getLogger("websockets.server")
websockets_logger.setLevel(logging.DEBUG if verbose else logging.INFO)
websockets_logger.setLevel(logging.DEBUG if _debug > 2 else logging.INFO)
# regular WebSocket
ws_server = websockets.serve(_handler, host, ws_port) # , subprotocols=["hbd"])
servers.append(ws_server)