Files
heartbeat/hbd/server/threshold.py
T
Andreas Wrede 3da6976b53 fix: don't purge connectivity/rtt alerts in purge_stale_alerts
These entries are set by the connection state machine, not by threshold
config, so they have no threshold entry and were being deleted on every
startup. Guard them explicitly so overdue/down alerts survive the purge.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-06 14:45:47 -04:00

1614 lines
66 KiB
Python

"""
Threshold checking and alerting for plugin metrics.
This module provides a flexible threshold checking system that:
- Evaluates plugin metrics against configured warning/critical thresholds
- Tracks alert states per host and metric
- Prevents alert flapping with hysteresis
- Triggers notifications only on state changes
- Supports multiple comparison operators
"""
import asyncio
import logging
import time
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple, Callable
from . import notify as notify_mod
from .config import THRESHOLD_DEFAULTS
logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog
class AlertLevel(Enum):
    """Severity of an alert.

    The integer values are meaningful: the hysteresis logic in this module
    compares ``.value`` to decide whether a transition is an improvement
    (lower value) or an escalation (higher value).
    """

    OK = 0        # metric is within limits
    WARNING = 1   # warning threshold crossed
    CRITICAL = 2  # critical threshold crossed
    UNKNOWN = 3   # value could not be evaluated
class ComparisonOperator(Enum):
    """Operators a threshold may use to compare a metric value with its limit.

    Each member's value is the literal string accepted in the configuration.
    """

    # Ordering comparisons
    GT = ">"
    GTE = ">="
    LT = "<"
    LTE = "<="
    # Equality comparisons
    EQ = "=="
    NEQ = "!="
    # Nagios exit-code semantics: the value itself is the severity
    # (0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN); no numeric limit is involved.
    NAGIOS = "nagios"
class AlertState:
    """Represents the current alert state for a specific metric.

    Instances can be restored from pickles written by older versions of this
    class (see ``__setstate__``). Any attribute added to ``__init__`` that has
    a static default should also be listed in ``_BACKFILL_DEFAULTS`` so old
    pickles do not raise ``AttributeError`` when the attribute is read.
    """

    # Defaults applied by __setstate__ to attributes missing from the pickle.
    _BACKFILL_DEFAULTS = {
        "last_value": None,
        "notification_count": 0,
        "last_notification": None,
        "threshold_value": None,
        "operator": None,
        "hysteresis": None,
        "formatted_message": None,
        "acknowledged": False,
        "acknowledged_at": None,
        "consecutive_count": 0,
        "pending_since": None,
    }

    def __init__(self, metric_path: str):
        """
        Initialize alert state.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        self.since = time.time()
        self.last_value = None
        self.last_check = time.time()
        self.notification_count = 0
        self.last_notification = None
        self.threshold_value = None  # The threshold value that triggered alert
        self.operator = None  # The comparison operator (>, <, >=, etc.)
        self.hysteresis: Optional[float] = None  # Hysteresis fraction used for recovery
        self.formatted_message = None  # Formatted display message for UI
        self.acknowledged = False  # Whether alert has been acknowledged
        self.acknowledged_at = None  # Timestamp when acknowledged
        self.consecutive_count = 0  # Consecutive exceedances while still OK (for count gating)
        self.pending_since: Optional[float] = None  # non-None while waiting out grace period before notifying

    def update(
        self,
        level: "AlertLevel",
        value: Any,
        threshold_value: Optional[float] = None,
        operator: Optional[str] = None
    ) -> bool:
        """
        Update alert state.

        Args:
            level: New alert level
            value: Current metric value
            threshold_value: The threshold value that was exceeded (if applicable)
            operator: The comparison operator (>, <, >=, etc.)

        Returns:
            True if state changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        # Threshold info is only meaningful while an alert is active.
        if level != AlertLevel.OK:
            self.threshold_value = threshold_value
            self.operator = operator
        else:
            self.threshold_value = None
            self.operator = None

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path,
            self.level.name,
            level.name,
            value
        )
        self.level = level
        self.since = now
        self.notification_count = 0
        self.last_notification = None  # restart reminder interval on level change
        # A change to a different non-OK level needs a fresh acknowledgment;
        # a recovery to OK leaves the acknowledgment fields untouched.
        if level != AlertLevel.OK:
            self.acknowledged = False
            self.acknowledged_at = None
        return True

    def to_dict(self) -> dict:
        """Convert alert state to dictionary for serialization.

        Infinite floats are reported as the string ``"overdue"`` and NaN as
        ``None`` for ``last_value`` and ``threshold_value``. Optional keys
        (``acknowledged_at``, ``threshold_value``, ``operator``,
        ``formatted_message``, ``recovery_threshold``) are only present when
        they have a value.
        """
        import math

        # Helper to sanitize numeric values for JSON (handle inf/nan)
        def sanitize_value(val):
            if isinstance(val, float):
                if math.isinf(val):
                    return "overdue"
                if math.isnan(val):
                    return None
            return val

        result = {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": sanitize_value(self.last_value),
            "last_check": self.last_check,
            "notification_count": self.notification_count,
            "acknowledged": self.acknowledged,
        }
        if self.acknowledged_at is not None:
            result["acknowledged_at"] = self.acknowledged_at
        if self.threshold_value is not None:
            result["threshold_value"] = sanitize_value(self.threshold_value)
        if self.operator is not None:
            result["operator"] = self.operator
        if self.formatted_message is not None:
            result["formatted_message"] = self.formatted_message

        # Compute and expose the recovery threshold so the UI can display it
        if (self.hysteresis and self.threshold_value is not None
                and self.operator is not None):
            ha = abs(self.threshold_value * self.hysteresis)
            if self.operator in ('>', '>='):
                result["recovery_threshold"] = round(self.threshold_value - ha, 4)
            elif self.operator in ('<', '<='):
                result["recovery_threshold"] = round(self.threshold_value + ha, 4)
        return result

    def __setstate__(self, state):
        """Restore from pickle, backfilling fields added after the pickle was written.

        Every attribute listed in ``_BACKFILL_DEFAULTS`` that is absent from
        *state* is set to its default, so code reading e.g. ``pending_since``
        or ``acknowledged`` works on objects restored from old pickles.
        """
        self.__dict__.update(state)
        for name, default in self._BACKFILL_DEFAULTS.items():
            if not hasattr(self, name):
                setattr(self, name, default)

    def acknowledge(self):
        """Acknowledge this alert to stop reminder notifications."""
        self.acknowledged = True
        self.acknowledged_at = time.time()
        logger.info("Alert acknowledged for %s", self.metric_path)

    def __str__(self):
        return str(self.to_dict())
class ThresholdConfig:
    """Configuration for a single threshold check."""

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        display: Optional[str] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
        count: int = 1,
        grace: Optional[float] = None,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            display: Display format template for alert messages (stored as-is)
            operator: Comparison operator (>, >=, <, <=, ==, !=, nagios);
                an unrecognised operator is logged and replaced by '>'
            hysteresis: Hysteresis percentage to prevent flapping (0.0-1.0);
                None is treated as 0.0
            enabled: Whether this threshold is enabled
            count: Number of consecutive exceedances required before alerting (default 1)
            grace: Per-metric grace period in seconds; overrides global grace when set
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        # An explicit null in the configuration arrives here as None; without
        # this guard the recovery calculation would fail on ``threshold * None``.
        self.hysteresis = 0.0 if hysteresis is None else hysteresis
        self.display = display
        self.count = max(1, int(count))
        self.grace = float(grace) if grace is not None else None

        # Parse operator
        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator,
                metric_path
            )
            self.operator = ComparisonOperator.GT

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check

        Returns:
            AlertLevel indicating the severity. A disabled threshold always
            yields OK; a value that cannot be converted to a number yields
            UNKNOWN.
        """
        if not self.enabled:
            return AlertLevel.OK

        # Nagios exit-code semantics: value IS the severity
        if self.operator == ComparisonOperator.NAGIOS:
            try:
                code = int(value)
            except (TypeError, ValueError):
                return AlertLevel.UNKNOWN
            return {0: AlertLevel.OK, 1: AlertLevel.WARNING, 2: AlertLevel.CRITICAL}.get(
                code, AlertLevel.UNKNOWN
            )

        try:
            # Convert value to float for comparison
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        # Critical is checked first so it wins when both limits are crossed.
        if self.critical is not None:
            if self._compare(value, self.critical):
                return AlertLevel.CRITICAL
        if self.warning is not None:
            if self._compare(value, self.warning):
                return AlertLevel.WARNING
        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        Hysteresis only affects recoveries (a move to a lower severity) for
        the ordering operators (>, >=, <, <=): the value must clear the
        threshold of the current level by ``abs(threshold * hysteresis)``
        before the level is allowed to drop.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # Nagios exit codes are discrete integers — hysteresis doesn't apply
        if self.operator == ComparisonOperator.NAGIOS:
            return new_level

        # No hysteresis configured (0, 0.0 or None)
        if not self.hysteresis:
            return new_level

        # Escalations and unchanged levels are never delayed.
        if new_level.value >= current_level.value:
            return new_level

        # Recovery: pick the limit belonging to the level we are leaving.
        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        # evaluate() compares a float-coerced copy; do the same here so that
        # numeric strings do not raise TypeError in the comparisons below.
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            return new_level

        hysteresis_amount = abs(threshold * self.hysteresis)
        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # For "greater than" thresholds, value must go below by hysteresis
            if numeric >= threshold - hysteresis_amount:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # For "less than" thresholds, value must go above by hysteresis
            if numeric <= threshold + hysteresis_amount:
                return current_level
        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Return True when *value* violates *threshold* under this operator.

        Equality operators use an absolute tolerance of 1e-9. The NAGIOS
        operator has no numeric comparison and always yields False here.
        """
        if self.operator == ComparisonOperator.GT:
            return value > threshold
        elif self.operator == ComparisonOperator.GTE:
            return value >= threshold
        elif self.operator == ComparisonOperator.LT:
            return value < threshold
        elif self.operator == ComparisonOperator.LTE:
            return value <= threshold
        elif self.operator == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9  # Float comparison
        elif self.operator == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False
class ThresholdChecker:
"""Main threshold checking and alerting system."""
def __init__(
    self,
    config: Dict[str, Any],
    renotify_interval: int = 3600,
    journal: Optional[Any] = None,
):
    """
    Build a checker from the parsed YAML configuration.

    Args:
        config: Threshold configuration dictionary from YAML
        renotify_interval: Seconds between repeat notifications (default: 1 hour)
        journal: Optional MessageJournal instance for logging threshold events
    """
    # {config_name: {metric_path: ThresholdConfig}} — defaults already merged in
    self.threshold_configs = {}
    # {config_name: {metric_path: ThresholdConfig}} — overrides only, no defaults
    self.threshold_raw_configs: Dict[str, Dict[str, ThresholdConfig]] = {}
    # {metric_path: ThresholdConfig} — legacy single threshold set
    self.thresholds = {}
    # {host_name: [config_name, ...]} — ordered list of configs per host
    self.host_config_mapping: Dict[str, List[str]] = {}
    # Config used for hosts without an explicit mapping
    self.default_config = "default"

    self.renotify_interval = renotify_interval
    self.grace_seconds: float = float(config.get("grace", 2))
    self.journal = journal

    self._parse_config(config)

    named_total = sum(map(len, self.threshold_configs.values()))
    legacy_only = named_total == 0 and len(self.thresholds) > 0
    if legacy_only:
        # Backward compatibility: only the single legacy threshold set exists
        logger.info(
            "ThresholdChecker initialized with %d thresholds (legacy format)",
            len(self.thresholds),
        )
        return
    logger.info(
        "ThresholdChecker initialized with %d named configurations (%d total thresholds)",
        len(self.threshold_configs),
        named_total
    )
def reload(self, config: Dict[str, Any]):
    """Replace the threshold configuration with one parsed from *config*.

    All parsed thresholds and host mappings are discarded and rebuilt. This
    method does not touch any alert state, so hysteresis carries across a
    reload.

    Args:
        config: New configuration dictionary
    """
    logger.info("Reloading threshold configuration...")

    # Drop everything derived from the previous configuration.
    for store in (
        self.threshold_configs,
        self.threshold_raw_configs,
        self.thresholds,
        self.host_config_mapping,
    ):
        store.clear()
    self.grace_seconds = float(config.get("grace", 2))

    self._parse_config(config)

    total_thresholds = sum(map(len, self.threshold_configs.values()))
    if total_thresholds == 0 and len(self.thresholds) > 0:
        total_thresholds = len(self.thresholds)
    logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)
def _parse_config(self, config: Dict[str, Any]):
    """Parse threshold configuration from YAML structure.

    Two layouts are recognised:
    1. New format with a 'threshold_configs' section (takes precedence)
    2. Legacy format with a direct 'thresholds' section

    Before either is parsed, THRESHOLD_DEFAULTS are loaded into
    threshold_configs["default"] so that the built-in defaults are visible
    even when the configuration defines nothing. _parse_multi_config()
    replaces that entry with the fully merged effective defaults.
    """
    builtin_defaults: Dict[str, ThresholdConfig] = {}
    builtin_section = THRESHOLD_DEFAULTS.get("thresholds", {})
    for plugin_name, plugin_thresholds in builtin_section.items():
        if not isinstance(plugin_thresholds, dict):
            continue
        self._parse_plugin_thresholds(
            plugin_name, plugin_thresholds, target_dict=builtin_defaults
        )
    if builtin_defaults:
        self.threshold_configs["default"] = builtin_defaults
        self.threshold_raw_configs["default"] = {}

    if "threshold_configs" in config:
        # Overwrites threshold_configs["default"] seeded above
        self._parse_multi_config(config)
        return
    if "thresholds" in config:
        self._parse_legacy_config(config)
        return
    logger.info("No thresholds configured")
def _parse_multi_config(self, config: Dict[str, Any]):
    """Parse multiple named threshold configurations.

    Populates ``threshold_configs`` (defaults + overrides per config),
    ``threshold_raw_configs`` (overrides only), ``host_config_mapping`` and
    ``default_config`` from *config*.

    Args:
        config: Full configuration dictionary; reads the keys
            'threshold_configs', 'hosts', 'host_threshold_mapping' and
            'default_threshold_config'.
    """
    threshold_configs = config.get("threshold_configs", {})
    if not threshold_configs:
        logger.info("No threshold configurations defined")
        return

    # Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
    # All other configs inherit any metric not explicitly defined from effective_defaults.
    effective_defaults: Dict[str, ThresholdConfig] = {}
    for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
        if isinstance(plugin_thresholds, dict):
            self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
    if "default" in threshold_configs:
        default_data = threshold_configs["default"]
        if isinstance(default_data, dict) and "thresholds" in default_data:
            for plugin_name, plugin_thresholds in default_data["thresholds"].items():
                if isinstance(plugin_thresholds, dict):
                    self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
    self.threshold_configs["default"] = dict(effective_defaults)
    self.threshold_raw_configs["default"] = {}
    logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))

    # Parse each named configuration
    for config_name, config_data in threshold_configs.items():
        if config_name == "default":
            continue  # already handled above
        if not isinstance(config_data, dict):
            logger.warning("Invalid threshold config '%s', skipping", config_name)
            continue
        if "thresholds" not in config_data:
            logger.warning("No thresholds in config '%s', skipping", config_name)
            continue
        logger.info("Parsing threshold configuration: %s", config_name)

        # Raw overrides only (used for multi-config layering)
        raw_overrides: Dict[str, ThresholdConfig] = {}
        for plugin_name, plugin_thresholds in config_data["thresholds"].items():
            if not isinstance(plugin_thresholds, dict):
                continue
            plugin_enabled = plugin_thresholds.get('enabled', plugin_thresholds.get('enable', True))
            if plugin_enabled:
                self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=raw_overrides)
                continue

            # Deleting from raw_overrides would not hide an inherited default,
            # so inject a disabled stub for every matching effective default;
            # the merge below then overwrites the inherited entry.
            # Most metrics are keyed "<plugin>.<metric>", but RTT is stored
            # under the bare key "rtt", so an exact match on the plugin name
            # must count as well.
            prefix = f"{plugin_name}."
            for key, tc in effective_defaults.items():
                if key == plugin_name or key.startswith(prefix):
                    raw_overrides[key] = ThresholdConfig(
                        metric_path=key,
                        warning=tc.warning,
                        critical=tc.critical,
                        operator=tc.operator.value,
                        enabled=False,
                    )
            logger.info(
                "Plugin-level disable in config '%s': disabled all thresholds for %s",
                config_name, plugin_name,
            )

        self.threshold_raw_configs[config_name] = raw_overrides
        # Pre-merged version (defaults + overrides) for single-config fast path
        self.threshold_configs[config_name] = dict(effective_defaults)
        self.threshold_configs[config_name].update(raw_overrides)

    # Parse host → config list mapping from two possible sources
    def _normalise(value) -> List[str]:
        """Accept a string or list; always return a list."""
        if isinstance(value, list):
            return [str(v) for v in value]
        return [str(value)]

    # 1. hosts section with threshold_config attribute (string or list)
    if "hosts" in config:
        hosts_config = config["hosts"]
        if isinstance(hosts_config, dict):
            for host_name, host_attrs in hosts_config.items():
                if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
                    self.host_config_mapping[host_name] = _normalise(host_attrs["threshold_config"])

    # 2. Legacy host_threshold_mapping section; applied second, so it wins
    #    over the hosts section for a host named in both.
    if "host_threshold_mapping" in config:
        legacy_mapping = config.get("host_threshold_mapping", {})
        if isinstance(legacy_mapping, dict):
            for host_name, value in legacy_mapping.items():
                self.host_config_mapping[host_name] = _normalise(value)

    # Default config: explicitly configured name, else "default"; if that
    # name does not exist, fall back to the alphabetically first config.
    self.default_config = config.get("default_threshold_config", "default")
    if self.default_config not in self.threshold_configs and self.threshold_configs:
        self.default_config = sorted(self.threshold_configs.keys())[0]
        logger.info("Using '%s' as default threshold config", self.default_config)

    logger.info(
        "Loaded %d threshold configurations with %d host mappings",
        len(self.threshold_configs),
        len(self.host_config_mapping)
    )
def _parse_legacy_config(self, config: Dict[str, Any]):
    """Load the legacy top-level 'thresholds' section into ``self.thresholds``.

    Plugin entries that are not dictionaries are ignored.
    """
    if not config or "thresholds" not in config:
        logger.info("No thresholds configured")
        return

    for plugin_name, plugin_thresholds in config["thresholds"].items():
        if isinstance(plugin_thresholds, dict):
            self._parse_plugin_thresholds(
                plugin_name, plugin_thresholds, target_dict=self.thresholds
            )
def _parse_plugin_thresholds(
    self,
    plugin_name: str,
    thresholds: Dict[str, Any],
    target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
    """Register every threshold defined for one plugin.

    Args:
        plugin_name: Name of the plugin
        thresholds: Threshold configuration dictionary for that plugin
        target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
    """
    store = self.thresholds if target_dict is None else target_dict

    # RTT has its own parser and is not keyed as "<plugin>.<metric>".
    if plugin_name == "rtt":
        self._parse_rtt_thresholds(thresholds, store)
        return

    # Plugin-level ``enabled: false`` ('enable' is accepted as a common typo)
    # drops every threshold already registered for this plugin,
    # e.g. ``memory_monitor: {enabled: false}``.
    if not thresholds.get('enabled', thresholds.get('enable', True)):
        prefix = f"{plugin_name}."
        doomed = [key for key in store if key.startswith(prefix)]
        for key in doomed:
            del store[key]
        logger.info("Plugin-level disable: removed all thresholds for %s", plugin_name)
        return

    for metric_name, spec in thresholds.items():
        if not isinstance(spec, dict):
            continue

        # Nested sections are delegated to their own parsers.
        if metric_name == "partitions":
            self._parse_partition_thresholds(plugin_name, spec, store)
            continue
        if metric_name == "pools":
            self._parse_pool_thresholds(plugin_name, spec, store)
            continue

        metric_path = f"{plugin_name}.{metric_name}"
        warning = spec.get("warning")
        critical = spec.get("critical")
        operator = spec.get("operator", ">")

        # The nagios operator maps exit codes directly, so it needs no
        # numeric limits, no hysteresis, and a different default display.
        is_nagios_op = (operator == "nagios")
        if warning is None and critical is None and not is_nagios_op:
            logger.warning("No thresholds defined for %s, skipping", metric_path)
            continue

        if is_nagios_op:
            default_display = "{check_name}: {output}"
            default_hysteresis = 0.0
        else:
            default_display = "(threshold: {op_symbol} {threshold_value})"
            default_hysteresis = 0.02

        store[metric_path] = ThresholdConfig(
            metric_path=metric_path,
            warning=warning,
            critical=critical,
            operator=operator,
            hysteresis=spec.get("hysteresis", default_hysteresis),
            enabled=spec.get("enabled", True),
            display=spec.get("display", default_display),
            grace=spec.get("grace", None),
        )
        logger.debug(
            "Registered threshold for %s: warn=%s, crit=%s, op=%s",
            metric_path,
            warning,
            critical,
            operator
        )
def _parse_partition_thresholds(
    self,
    plugin_name: str,
    partitions: Dict[str, Any],
    target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
    """Register per-partition thresholds for disk monitoring.

    Each threshold is stored under "<plugin>.<partition>.<metric>", e.g.
    "disk_monitor./dev/sda1.percent". Entries with neither a warning nor a
    critical value are silently ignored. Hysteresis defaults to 0.1 here.

    Args:
        plugin_name: Name of the plugin
        partitions: Partition threshold configuration
        target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
    """
    store = self.thresholds if target_dict is None else target_dict

    for partition, metrics in partitions.items():
        if not isinstance(metrics, dict):
            continue
        for metric_name, spec in metrics.items():
            if not isinstance(spec, dict):
                continue
            warning = spec.get("warning")
            critical = spec.get("critical")
            if warning is None and critical is None:
                continue
            metric_path = f"{plugin_name}.{partition}.{metric_name}"
            store[metric_path] = ThresholdConfig(
                metric_path=metric_path,
                warning=warning,
                critical=critical,
                operator=spec.get("operator", ">"),
                hysteresis=spec.get("hysteresis", 0.1),
                enabled=spec.get("enabled", True),
                display=spec.get("display"),
                grace=spec.get("grace", None),
            )
def _parse_pool_thresholds(
    self,
    plugin_name: str,
    pools: Dict[str, Any],
    target_dict: Optional[Dict[str, ThresholdConfig]] = None,
):
    """Register ZFS pool thresholds; a pool name may be literal or '*' (all pools).

    Each threshold is stored under "<plugin>.<pool>.<metric>". Entries with
    neither a warning nor a critical value are silently ignored. Hysteresis
    defaults to 0.02 here.

    Config shape::

        zfs_monitor:
          pools:
            '*':
              status:
                warning: 1
                critical: 2
                operator: '>'
            tank:
              capacity:
                warning: 80
                critical: 90
    """
    store = self.thresholds if target_dict is None else target_dict

    for pool_name, metrics in pools.items():
        if not isinstance(metrics, dict):
            continue
        for metric_name, spec in metrics.items():
            if not isinstance(spec, dict):
                continue
            warning = spec.get("warning")
            critical = spec.get("critical")
            if warning is None and critical is None:
                continue
            metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
            store[metric_path] = ThresholdConfig(
                metric_path=metric_path,
                warning=warning,
                critical=critical,
                operator=spec.get("operator", ">"),
                hysteresis=spec.get("hysteresis", 0.02),
                enabled=spec.get("enabled", True),
                display=spec.get("display"),
                grace=spec.get("grace", None),
            )
def _parse_rtt_thresholds(
    self,
    rtt_thresholds: Dict[str, Any],
    target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
    """Parse RTT thresholds (network latency thresholds).

    RTT thresholds are configured as:

        thresholds:
          rtt:
            warning: 100.0   # ms
            critical: 500.0  # ms

    The threshold is stored under the bare key "rtt". A section that only
    says ``enabled: false`` (or the common typo ``enable: false``) removes
    any RTT threshold already present in *target_dict*, matching the
    plugin-level disable behaviour of the other plugins.

    Args:
        rtt_thresholds: RTT threshold configuration
        target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
    """
    if target_dict is None:
        target_dict = self.thresholds
    if not isinstance(rtt_thresholds, dict):
        return

    # Metric path is simply "rtt" (not per-host)
    metric_path = "rtt"
    warning = rtt_thresholds.get("warning")
    critical = rtt_thresholds.get("critical")
    operator = rtt_thresholds.get("operator", ">")
    hysteresis = rtt_thresholds.get("hysteresis", 0.02)  # 2% default
    # Accept 'enable' as an alias, as the plugin-level switch does elsewhere.
    enabled = rtt_thresholds.get("enabled", rtt_thresholds.get("enable", True))
    display = rtt_thresholds.get("display")
    count = rtt_thresholds.get("count", 1)
    grace = rtt_thresholds.get("grace", None)

    if warning is None and critical is None:
        if not enabled:
            # A bare disable carries no limits to register; without this
            # branch an inherited RTT threshold would silently stay active.
            target_dict.pop(metric_path, None)
            logger.info("Plugin-level disable: removed all thresholds for %s", metric_path)
            return
        logger.warning("No RTT thresholds defined, skipping")
        return

    target_dict[metric_path] = ThresholdConfig(
        metric_path=metric_path,
        warning=warning,
        critical=critical,
        operator=operator,
        hysteresis=hysteresis,
        enabled=enabled,
        display=display,
        count=count,
        grace=grace,
    )
    logger.debug(
        "Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
        warning,
        critical,
        count,
    )
def get_thresholds_for_host(self, host_name: str) -> Dict[str, "ThresholdConfig"]:
    """Get the effective threshold configuration for a host.

    When threshold_config is a list, configs are applied left-to-right on top
    of the default thresholds so earlier entries can be overridden by later ones.

    Args:
        host_name: Name of the host

    Returns:
        Dictionary of thresholds for this host. The returned dictionary may
        be one of the checker's own tables, so callers must not mutate it.
    """
    # Legacy mode: single threshold set for all hosts.
    # ``self.thresholds`` is only filled by the legacy parser, so a non-empty
    # dict identifies legacy mode on its own. The previous extra condition
    # ``not self.threshold_configs`` could not hold once built-in defaults are
    # seeded into threshold_configs["default"], which made the configured
    # legacy thresholds unreachable.
    if self.thresholds:
        return self.thresholds

    if not self.threshold_configs:
        return {}

    config_names = self.host_config_mapping.get(host_name)

    # No host-specific mapping → return pre-merged default
    if not config_names:
        return self.threshold_configs.get(self.default_config, {})

    # Single config → fast path using pre-merged copy
    if len(config_names) == 1:
        name = config_names[0]
        if name in self.threshold_configs:
            return self.threshold_configs[name]
        logger.warning(
            "Threshold config '%s' not found for host '%s', using default '%s'",
            name, host_name, self.default_config,
        )
        return self.threshold_configs.get(self.default_config, {})

    # Multiple configs → start from defaults, layer raw overrides in order
    result = dict(self.threshold_configs.get(self.default_config, {}))
    for name in config_names:
        if name == self.default_config:
            continue  # defaults already the base
        raw = self.threshold_raw_configs.get(name)
        if raw is None:
            logger.warning(
                "Threshold config '%s' not found for host '%s', skipping",
                name, host_name,
            )
        else:
            result.update(raw)
    return result
def check_value(
    self,
    host_name: str,
    metric_path: str,
    value: float,
    alert_states: Dict[str, AlertState],
) -> Optional[Tuple[AlertLevel, AlertLevel]]:
    """
    Check one value against the threshold configured for its metric path.

    The threshold is looked up by exact match on *metric_path* only. While
    the metric is OK, ``threshold.count`` consecutive exceedances are
    required before the alert level is raised.

    Args:
        host_name: Name of the host
        metric_path: Full metric path used as the threshold key
        value: The metric value to check
        alert_states: Host's alert_states dictionary; an AlertState is
            created in it on first use of *metric_path*

    Returns:
        Tuple of (old_level, new_level) if state changed, None otherwise
    """
    host_thresholds = self.get_thresholds_for_host(host_name)
    if metric_path not in host_thresholds:
        return None
    threshold = host_thresholds[metric_path]

    if metric_path not in alert_states:
        alert_states[metric_path] = AlertState(metric_path)
    state = alert_states[metric_path]

    new_level = threshold.evaluate_with_hysteresis(value, state.level)
    exceeded = new_level != AlertLevel.OK

    # Consecutive-count gating applies only to the OK -> alert transition.
    if not exceeded:
        # Value is fine (or recovered): any pending streak is over.
        state.consecutive_count = 0
    elif state.level == AlertLevel.OK:
        state.consecutive_count += 1
        if state.consecutive_count < threshold.count:
            logger.debug(
                "RTT threshold exceeded %d/%d consecutive times for %s on %s",
                state.consecutive_count,
                threshold.count,
                metric_path,
                host_name,
            )
            return None
        # Streak is long enough: let the alert through and start over.
        state.consecutive_count = 0

    # Which limit was crossed, if any
    threshold_value = None
    if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
        threshold_value = threshold.critical
    elif new_level == AlertLevel.WARNING and threshold.warning is not None:
        threshold_value = threshold.warning

    # Kept on the state so the UI can show the recovery threshold
    state.hysteresis = threshold.hysteresis if exceeded else None

    # The previous level must be captured before update() overwrites it.
    previous_level = state.level
    changed = state.update(new_level, value, threshold_value, threshold.operator.value)
    if changed:
        self._apply_grace(
            host_name, state, metric_path, previous_level, new_level, value, threshold, None
        )
        return (previous_level, new_level)
    if exceeded:
        self._check_pending_or_renotify(
            host_name, state, metric_path, value, threshold, None
        )
    return None
def _find_threshold(
    self, thresholds: Dict[str, "ThresholdConfig"], metric_path: str
) -> Tuple[Optional["ThresholdConfig"], Optional[str]]:
    """Look up the threshold for *metric_path*, allowing generic suffix matches.

    A generic threshold such as ``nagios_runner.status_code`` also applies to
    a fully-qualified path such as
    ``nagios_runner.check_disk_root_status_code``. An exact match always
    wins. Otherwise the part after the first dot is split on underscores and
    leading segments are dropped one at a time until the shortened path is a
    known threshold.

    Returns:
        (ThresholdConfig, None) for an exact match.
        (ThresholdConfig, "check_disk_root") for a suffix match; the second
            element is the dropped prefix, available as ``{check_name}`` in
            display format templates.
        (None, None) when nothing matches.
    """
    if metric_path in thresholds:
        return thresholds[metric_path], None

    plugin, dot, field = metric_path.partition(".")
    if dot:
        segments = field.split("_")
        # cut = number of leading segments dropped; at least one remains
        for cut in range(1, len(segments)):
            shortened = f"{plugin}.{'_'.join(segments[cut:])}"
            if shortened in thresholds:
                return thresholds[shortened], "_".join(segments[:cut])
    return None, None
def check_plugin_data(
    self,
    host_name: str,
    plugin_name: str,
    data: Dict[str, Any],
    alert_states: Dict[str, AlertState],
) -> list:
    """
    Evaluate every metric reported by a plugin against the host's thresholds.

    Top-level entries of *data* are checked here; nested structures are
    handed to ``_check_nested_metrics`` afterwards, which appends to the
    same result list.

    Args:
        host_name: Name of the host
        plugin_name: Name of the plugin
        data: Plugin data dictionary
        alert_states: Host's alert_states dictionary; AlertState objects are
            created in it for metrics that have a threshold

    Returns:
        List of (metric_path, old_level, new_level, value) tuples for state changes
    """
    state_changes = []
    host_thresholds = self.get_thresholds_for_host(host_name)

    for metric_name, value in data.items():
        metric_path = f"{plugin_name}.{metric_name}"
        threshold, check_name = self._find_threshold(host_thresholds, metric_path)
        if threshold is None:
            continue

        if metric_path not in alert_states:
            alert_states[metric_path] = AlertState(metric_path)
        state = alert_states[metric_path]

        new_level = threshold.evaluate_with_hysteresis(value, state.level)
        alerting = new_level != AlertLevel.OK

        # Which limit was crossed, if any
        threshold_value = None
        if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
            threshold_value = threshold.critical
        elif new_level == AlertLevel.WARNING and threshold.warning is not None:
            threshold_value = threshold.warning

        state.hysteresis = threshold.hysteresis if alerting else None

        # Context forwarded to the notification helpers for message formatting
        context = {"check_name": check_name, "metric_name": metric_name}

        # The previous level must be captured before update() overwrites it.
        previous_level = state.level
        if state.update(new_level, value, threshold_value, threshold.operator.value):
            state_changes.append((metric_path, previous_level, new_level, value))
            self._apply_grace(
                host_name, state, metric_path, previous_level, new_level,
                value, threshold, data, **context
            )
        elif alerting:
            self._check_pending_or_renotify(
                host_name, state, metric_path, value, threshold, data, **context
            )

    # Check nested metrics (e.g., partition data in disk_monitor)
    self._check_nested_metrics(
        host_name,
        plugin_name,
        data,
        alert_states,
        state_changes
    )
    return state_changes
def _check_nested_metrics(
    self,
    host_name: str,
    plugin_name: str,
    data: Dict[str, Any],
    alert_states: Dict[str, AlertState],
    state_changes: list,
):
    """Check metrics nested one level below a plugin's top-level data.

    Two layouts are handled:

    - ``zfs_monitor``: ``data["pools"][pool_name][metric_name]``. A threshold
      keyed by the exact ``zfs_monitor.<pool>.<metric>`` path takes
      precedence over the ``zfs_monitor.*.<metric>`` wildcard.
    - ``disk_monitor``: ``data["partitions"][partition][metric_name]``.
      Only exact-path thresholds are consulted.

    Args:
        host_name: Name of the host
        plugin_name: Name of the plugin that produced ``data``
        data: Plugin data dictionary
        alert_states: Host's alert_states dictionary (updated in place)
        state_changes: List that receives a
            (metric_path, old_level, new_level, value) tuple per level change
    """
    # Get host-specific thresholds
    thresholds = self.get_thresholds_for_host(host_name)

    # ZFS pool health checks
    if plugin_name == "zfs_monitor" and "pools" in data:
        pools = data["pools"]
        if isinstance(pools, dict):
            for pool_name, pool_metrics in pools.items():
                if not isinstance(pool_metrics, dict):
                    continue
                # Synthesize status from health string for older clients
                # that predate the status field.
                effective_metrics = dict(pool_metrics)
                if "health" in pool_metrics and "status" not in pool_metrics:
                    effective_metrics["status"] = (
                        0 if pool_metrics["health"] == "ONLINE" else 1
                    )
                # Display-template context: every pool metric plus the pool
                # name. It is the same for all metrics of this pool, so it
                # is built once per pool.
                pool_context = dict(effective_metrics)
                pool_context["pool_name"] = pool_name

                for metric_name, value in effective_metrics.items():
                    # Try specific pool name first, then wildcard '*'
                    metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
                    wildcard_path = f"{plugin_name}.*.{metric_name}"
                    threshold = thresholds.get(metric_path) or thresholds.get(wildcard_path)
                    if threshold is None:
                        continue
                    # The pool name (not the field name) is handed on as
                    # ``metric_name``, exactly as before.
                    self._evaluate_nested_metric(
                        host_name, metric_path, value, threshold,
                        alert_states, state_changes, pool_context,
                        metric_name=pool_name,
                    )

    # Look for partition data in disk_monitor
    if plugin_name == "disk_monitor" and "partitions" in data:
        partitions = data["partitions"]
        if not isinstance(partitions, dict):
            return
        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue
            for metric_name, value in metrics.items():
                metric_path = f"{plugin_name}.{partition}.{metric_name}"
                if metric_path not in thresholds:
                    continue
                self._evaluate_nested_metric(
                    host_name, metric_path, value, thresholds[metric_path],
                    alert_states, state_changes, data,
                )

def _evaluate_nested_metric(
    self,
    host_name: str,
    metric_path: str,
    value: Any,
    threshold: ThresholdConfig,
    alert_states: Dict[str, AlertState],
    state_changes: list,
    plugin_data: Optional[Dict[str, Any]],
    metric_name: Optional[str] = None,
) -> None:
    """Evaluate one nested metric against its threshold and act on the result.

    Creates the AlertState on first sight, evaluates the value with
    hysteresis, records a level change in ``state_changes`` and hands off to
    the grace-period logic; an unchanged non-OK level goes to the
    pending/reminder logic instead.
    """
    # Get or create alert state
    if metric_path not in alert_states:
        alert_states[metric_path] = AlertState(metric_path)
    alert_state = alert_states[metric_path]

    old_level = alert_state.level
    new_level = threshold.evaluate_with_hysteresis(value, old_level)

    # Determine which threshold was exceeded
    threshold_value = None
    if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
        threshold_value = threshold.critical
    elif new_level == AlertLevel.WARNING and threshold.warning is not None:
        threshold_value = threshold.warning
    alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None

    # Update state and check for changes
    if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
        state_changes.append((metric_path, old_level, new_level, value))
        self._apply_grace(
            host_name, alert_state, metric_path, old_level, new_level,
            value, threshold, plugin_data, metric_name=metric_name,
        )
    elif new_level != AlertLevel.OK:
        self._check_pending_or_renotify(
            host_name, alert_state, metric_path, value, threshold,
            plugin_data, metric_name=metric_name,
        )
def _trigger_notification(
    self,
    host_name: str,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]] = None,
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
):
    """Build the notification text for an alert state change.

    Despite its name, this method sends nothing: it only formats the
    message and returns it to the caller.

    Args:
        host_name: Name of the host (not used when building the text)
        metric_path: Full metric path
        old_level: Previous alert level
        new_level: New alert level
        value: Current metric value
        threshold: Threshold configuration
        plugin_data: Optional dictionary of all plugin data fields for format string
        check_name: Optional check-name prefix forwarded to the display template
        metric_name: Optional metric name forwarded to the display template

    Returns:
        Tuple ``(lvl, message, formatted_threshold_msg)``: ``lvl`` is one of
        "RECOVER", "WARNING", "CRITICAL" or "UNKNOWN"; ``message`` is
        "<short metric> = <value> ..."; ``formatted_threshold_msg`` is the
        rendered display template, or None for recoveries and when there is
        nothing to render.
    """
    # Local import, as in the original block.
    import math

    # Determine which threshold was exceeded
    threshold_value = None
    if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
        threshold_value = threshold.critical
    elif new_level == AlertLevel.WARNING and threshold.warning is not None:
        threshold_value = threshold.warning

    # Short metric label: strip the plugin-name prefix and _status_code suffix
    short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")

    # Use a display-friendly value (inf is the sentinel for "overdue")
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value

    if new_level == AlertLevel.OK:
        return "RECOVER", f"{short_path} = {display_value} ({old_level.name} -> OK)", None

    if new_level == AlertLevel.WARNING:
        lvl = "WARNING"
    elif new_level == AlertLevel.CRITICAL:
        lvl = "CRITICAL"
    else:
        lvl = "UNKNOWN"

    # For the nagios operator there is no numeric threshold_value; the
    # display template is rendered anyway.
    has_display = threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS
    if not has_display:
        return lvl, f"{short_path} = {display_value}", None

    # Render the template once and reuse it for both the message and the
    # UI text, so a broken template is logged once rather than twice.
    formatted = self._format_display(
        threshold.display,
        value=display_value,
        threshold_value=threshold_value,
        op_symbol=threshold.operator.value,
        plugin_data=plugin_data,
        check_name=check_name,
        metric_name=metric_name,
    )
    return lvl, f"{short_path} = {display_value} {formatted}", formatted
def _send_notification(
    self,
    host_name: str,
    lvl: str,
    message: str,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
):
    """Dispatch a threshold notification and record it.

    A host that is known but not watched only gets an eventlog entry.
    Otherwise the notification is scheduled on the event loop, the event is
    scheduled for the journal when one is configured, and the message is
    written to the eventlog.
    """
    from . import hbdclass

    target = hbdclass.Host.hosts.get(host_name)
    if target is not None and not target.watched:
        eventlog(host_name, lvl, message, service="threshold")
        return

    # Same short label as in the message: no plugin prefix, no _status_code suffix.
    metric_label = metric_path.partition(".")[2] or metric_path
    metric_label = metric_label.removesuffix("_status_code")

    # The body is the message without its leading "metric = " part.
    lead = metric_label + " = "
    if message.startswith(lead):
        detail = message[len(lead):]
    else:
        detail = message

    outgoing = notify_mod.Notification(
        title=f"[{lvl}] {host_name} {metric_label}",
        body=detail,
        level=lvl,
    )
    asyncio.get_event_loop().create_task(
        notify_mod.send_notification(host_name, outgoing)
    )

    # Journal entry; a failure to schedule it is only logged at debug level.
    if self.journal is not None:
        try:
            asyncio.get_event_loop().create_task(
                self.journal.log_threshold_event(
                    host_name=host_name,
                    metric_path=metric_path,
                    old_level=old_level.name,
                    new_level=new_level.name,
                    value=value,
                )
            )
        except Exception as e:
            logger.debug(f"Failed to log threshold event to journal: {e}")

    # Eventlog entry for the watched (or unknown) host.
    eventlog(host_name, lvl, message, service="threshold")
def _format_display(
self,
display_format: str,
value: Any,
threshold_value: Optional[float],
op_symbol: str,
plugin_data: Optional[Dict[str, Any]] = None,
check_name: Optional[str] = None,
metric_name: Optional[str] = None,
) -> str:
"""Format the display string using available data.
Available template variables:
{value} - current metric value
{threshold_value} - threshold that was exceeded
{op_symbol} - comparison operator (>, <, >=, <=, ==, !=)
{check_name} - prefix stripped for generic threshold match
(e.g. "check_disk_root" when metric
"check_disk_root_status_code" matched generic
threshold "status_code")
{metric_name} - field name within the plugin data dict
Any key from plugin_data is also available.
Returns:
Formatted display string
"""
if not display_format:
display_format = "(threshold: {op_symbol} {threshold_value})" if threshold_value is not None else ""
# Build format context with standard variables
format_context = {
'value': value,
'op_symbol': op_symbol,
}
if threshold_value is not None:
format_context['threshold_value'] = threshold_value
# Add generic-match context variables when available
if check_name is not None:
format_context['check_name'] = check_name
if metric_name is not None:
format_context['metric_name'] = metric_name
# Add all plugin data fields if available
if plugin_data:
format_context.update(plugin_data)
# For nagios_runner generic matches, expose the matched check's output
# and status as short aliases {output} and {status} so display templates
# don't need to use the full {check_disk_root_output} form.
if check_name and plugin_data:
if 'output' not in format_context:
output = plugin_data.get(f"{check_name}_output")
if output is not None:
format_context['output'] = output
if 'status' not in format_context:
status = plugin_data.get(f"{check_name}_status")
if status is not None:
format_context['status'] = status
try:
# Format the display string
return display_format.format(**format_context)
except KeyError as e:
logger.warning(
"Missing format variable in display string '%s': %s",
display_format,
e
)
# Fallback to default format
return f"(threshold: {op_symbol} {threshold_value})"
except Exception as e:
logger.error(
"Error formatting display string '%s': %s",
display_format,
e
)
return f"(threshold: {op_symbol} {threshold_value})"
def _apply_grace(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]],
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
) -> None:
    """Handle a state-change transition with grace-period logic.

    Transitioning INTO alert (worsening): defers the notification for the effective
    grace period (threshold.grace if set, else self.grace_seconds). Grace of 0 fires
    the notification immediately with no deferral.

    De-escalation within alert states (e.g. CRITICAL→WARNING): no new notification;
    the metric is still alerting so no RECOVER was sent.

    Transitioning TO OK:
      - Still in grace window (pending_since set): suppresses both the alert
        and the recovery — the spike never warranted a page.
      - Past grace: fires the RECOVER notification normally.

    Args:
        host_name: Name of the host
        alert_state: Alert state of the metric; its formatted_message,
            pending_since, last_notification and notification_count are
            updated here
        metric_path: Full metric path
        old_level: Level before the change
        new_level: Level after the change
        value: Current metric value
        threshold: Threshold configuration
        plugin_data: Plugin data fields for the display template
        check_name: Optional check-name prefix for the display template
        metric_name: Optional metric name for the display template
    """
    effective_grace = threshold.grace if threshold.grace is not None else self.grace_seconds

    # The text is built for every transition because the formatted threshold
    # message is stored on the state even when the notification is deferred.
    lvl, message, formatted_msg = self._trigger_notification(
        host_name, metric_path, old_level, new_level, value, threshold, plugin_data,
        check_name=check_name, metric_name=metric_name,
    )
    alert_state.formatted_message = formatted_msg

    if new_level == AlertLevel.OK:
        if alert_state.pending_since is not None:
            logger.info(
                "Alert suppressed (recovered within %.0fs grace): %s on %s",
                effective_grace, metric_path, host_name,
            )
            alert_state.pending_since = None
        else:
            self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
    elif new_level.value > old_level.value:
        # Worsening (OK→WARNING, OK→CRITICAL, WARNING→CRITICAL).
        if effective_grace <= 0:
            # No grace period — fire immediately.
            self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
            # A deferral left over from an earlier transition is now
            # redundant; leaving it set would fire a second notification for
            # the same alert on the next check.
            alert_state.pending_since = None
            alert_state.last_notification = time.time()
            alert_state.notification_count = 1
        else:
            alert_state.pending_since = time.time()
            logger.debug(
                "Alert deferred (%.0fs grace): %s on %s = %s",
                effective_grace, metric_path, host_name, value,
            )
    else:
        # De-escalation within alert states (e.g. CRITICAL→WARNING): metric is still
        # alerting but did not recover, so no new notification.
        logger.debug(
            "De-escalation %s -> %s for %s on %s, no notification",
            old_level.name, new_level.name, metric_path, host_name,
        )
def _check_pending_or_renotify(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]],
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
) -> None:
    """Handle a check whose alert level is unchanged and non-OK.

    With no deferred notification outstanding, this delegates to the normal
    reminder logic. With one outstanding, it fires that notification once
    the effective grace period (threshold.grace if set, else
    self.grace_seconds) has elapsed, and otherwise does nothing.
    """
    deferred_at = alert_state.pending_since
    if deferred_at is None:
        self._check_renotify(
            host_name, alert_state, metric_path, value, threshold, plugin_data,
            check_name=check_name, metric_name=metric_name,
        )
        return

    grace_window = self.grace_seconds if threshold.grace is None else threshold.grace
    if time.time() - deferred_at >= grace_window:
        # The deferred alert is reported as a transition from OK to the
        # current level.
        current_level = alert_state.level
        lvl, message, formatted_msg = self._trigger_notification(
            host_name, metric_path, AlertLevel.OK, current_level, value, threshold, plugin_data,
            check_name=check_name, metric_name=metric_name,
        )
        alert_state.formatted_message = formatted_msg
        self._send_notification(
            host_name, lvl, message, metric_path, AlertLevel.OK, current_level, value
        )
        alert_state.pending_since = None
        alert_state.last_notification = time.time()
        alert_state.notification_count = 1
    # Still inside the grace window: nothing to do yet.
@staticmethod
def _human_duration(seconds: float) -> str:
s = int(seconds)
if s < 120:
return f"{s}s"
if s < 3600:
return f"{s // 60}m {s % 60}s"
h, rem = divmod(s, 3600)
m = rem // 60
return f"{h}h {m}m" if m else f"{h}h"
def _check_renotify(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]] = None,
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
):
    """Check if we should send a repeat notification.

    Reminders are only sent for CRITICAL alerts that have not been
    acknowledged, and at most once per self.renotify_interval seconds.

    Args:
        host_name: Name of the host
        alert_state: Current alert state
        metric_path: Full metric path
        value: Current metric value
        threshold: Threshold configuration
        plugin_data: Optional dictionary of all plugin data fields
        check_name: Optional check-name prefix for the display template
        metric_name: Optional metric name for the display template
    """
    # Local import, as _trigger_notification does.
    import math

    if alert_state.level != AlertLevel.CRITICAL:
        return
    # Skip reminders if alert has been acknowledged
    if alert_state.acknowledged:
        return

    now = time.time()
    if alert_state.last_notification is None:
        # First notification already sent during state change
        alert_state.last_notification = now
        alert_state.notification_count = 1
        return
    if (now - alert_state.last_notification) < self.renotify_interval:
        return

    # Only CRITICAL alerts reach this point, so the critical threshold is
    # the active one (it may be None, e.g. for the nagios operator).
    threshold_value = threshold.critical
    short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")

    # Same presentation rules as the initial notification: inf is the
    # sentinel for "overdue", and the nagios operator renders its display
    # template even though it has no numeric threshold.
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value
    threshold_info = ""
    if threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS:
        threshold_info = self._format_display(
            threshold.display,
            value=display_value,
            threshold_value=threshold_value,
            op_symbol=threshold.operator.value,
            plugin_data=plugin_data,
            check_name=check_name,
            metric_name=metric_name,
        )

    ongoing = self._human_duration(now - alert_state.since)
    if threshold_info:
        body = f"{display_value} {threshold_info}, ongoing for {ongoing}"
    else:
        body = f"{display_value} (ongoing for {ongoing})"
    message = f"REMINDER ({alert_state.level.name}): {host_name} - {short_path} = {body}"

    from . import hbdclass
    host = hbdclass.Host.hosts.get(host_name)
    # Hosts that are known but not watched get no reminder notification.
    if host is None or host.watched:
        asyncio.get_event_loop().create_task(notify_mod.send_notification(
            host_name,
            notify_mod.Notification(
                title=f"[REMINDER/{alert_state.level.name}] {host_name} {short_path}",
                body=body,
                level=alert_state.level.name,
            ),
        ))
    logger.info("Re-notification sent: %s", message)

    alert_state.last_notification = now
    alert_state.notification_count += 1
def purge_stale_alerts(self, hbdclass) -> None:
    """Remove alert states that have no matching threshold configuration.

    Called after startup (pickle restore) and after each config reload so
    that alerts orphaned by configuration changes do not linger forever.
    Alerts whose metric_path is not present in the current threshold config
    for that host are dropped; each removal is logged at info level.

    Args:
        hbdclass: Object exposing ``Host.hosts``, a mapping of host name to
            host; each host's ``alert_states`` dict is pruned in place.
    """
    for hostname, host in hbdclass.Host.hosts.items():
        if not host.alert_states:
            continue
        configured = self.get_thresholds_for_host(hostname)
        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale = []
        for mp in host.alert_states:
            # connectivity.* and rtt are managed by the connection state
            # machine, not by threshold config — never purge them.
            if mp == "rtt" or mp.startswith("connectivity."):
                continue
            if self._find_threshold(configured, mp)[0] is not None:
                continue
            # Also match wildcard pool/partition thresholds (e.g. "zfs_monitor.*.status"
            # covers alert state "zfs_monitor.tank.status"). The instance name
            # is everything between the first and the last dot, so a pool
            # whose name itself contains dots ("zfs_monitor.my.pool.status")
            # is still covered by the wildcard.
            plugin, _, remainder = mp.partition(".")
            _instance, has_instance, metric = remainder.rpartition(".")
            if has_instance and f"{plugin}.*.{metric}" in configured:
                continue
            stale.append(mp)
        for mp in stale:
            logger.info(
                "Purging stale alert state for %s / %s (no threshold configured)",
                hostname, mp,
            )
            del host.alert_states[mp]
def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
    """
    Collect the alerts that are currently firing.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        List of the AlertState objects whose level is anything other than
        OK, in the dictionary's iteration order
    """
    firing = (
        entry
        for entry in alert_states.values()
        if entry.level != AlertLevel.OK
    )
    return list(firing)
def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
    """
    Count the alert states per severity level.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        Dictionary with counts:
        {"ok": N, "warning": N, "critical": N, "unknown": N}
    """
    bucket_for = {
        AlertLevel.OK: "ok",
        AlertLevel.WARNING: "warning",
        AlertLevel.CRITICAL: "critical",
        AlertLevel.UNKNOWN: "unknown",
    }
    counts = dict.fromkeys(bucket_for.values(), 0)
    for entry in alert_states.values():
        bucket = bucket_for.get(entry.level)
        # A level outside the four known ones is not counted.
        if bucket is not None:
            counts[bucket] += 1
    return counts