""" Threshold checking and alerting for plugin metrics. This module provides a flexible threshold checking system that: - Evaluates plugin metrics against configured warning/critical thresholds - Tracks alert states per host and metric - Prevents alert flapping with hysteresis - Triggers notifications only on state changes - Supports multiple comparison operators """ import asyncio import logging import time from enum import Enum from typing import Dict, List, Any, Optional, Tuple, Callable from . import notify as notify_mod from .config import THRESHOLD_DEFAULTS logger = logging.getLogger(__name__) eventlog = notify_mod.eventlog class AlertLevel(Enum): """Alert severity levels.""" OK = 0 WARNING = 1 CRITICAL = 2 UNKNOWN = 3 class ComparisonOperator(Enum): """Supported comparison operators for threshold checks.""" GT = ">" # Greater than GTE = ">=" # Greater than or equal LT = "<" # Less than LTE = "<=" # Less than or equal EQ = "==" # Equal to NEQ = "!=" # Not equal to NAGIOS = "nagios" # Nagios exit-code semantics: 0=OK 1=WARNING 2=CRITICAL 3=UNKNOWN class AlertState: """Represents the current alert state for a specific metric.""" def __init__(self, metric_path: str): """ Initialize alert state. Args: metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent") """ self.metric_path = metric_path self.level = AlertLevel.OK self.since = time.time() self.last_value = None self.last_check = time.time() self.notification_count = 0 self.last_notification = None self.threshold_value = None # The threshold value that triggered alert self.operator = None # The comparison operator (>, <, >=, etc.) self.hysteresis: Optional[float] = None # Hysteresis fraction used for recovery self.formatted_message = None # Formatted display message for UI self.acknowledged = False # Whether alert has been acknowledged self.acknowledged_at = None # Timestamp when acknowledged self.consecutive_count = 0 # Consecutive exceedances while still OK (for count gating) self.pending_since: Optional[float] = None # non-None while waiting out grace period before notifying def update( self, level: AlertLevel, value: Any, threshold_value: Optional[float] = None, operator: Optional[str] = None ) -> bool: """ Update alert state. Args: level: New alert level value: Current metric value threshold_value: The threshold value that was exceeded (if applicable) operator: The comparison operator (>, <, >=, etc.) Returns: True if state changed (notification needed), False otherwise """ now = time.time() self.last_check = now self.last_value = value # Update threshold info when alert is active if level != AlertLevel.OK: self.threshold_value = threshold_value self.operator = operator else: # Clear threshold info when returning to OK self.threshold_value = None self.operator = None # Check if state changed if level != self.level: logger.info( "Alert state change for %s: %s -> %s (value: %s)", self.metric_path, self.level.name, level.name, value ) self.level = level self.since = now self.notification_count = 0 self.last_notification = None # restart reminder interval on level change # Reset acknowledgment on state change if level != AlertLevel.OK: # Only reset if changing to a different alert level self.acknowledged = False self.acknowledged_at = None return True return False def to_dict(self) -> dict: """Convert alert state to dictionary for serialization.""" import math # Helper to sanitize numeric values for JSON (handle inf/nan) def sanitize_value(val): if isinstance(val, float): if math.isinf(val): return "overdue" if math.isnan(val): return None return val result = { "metric_path": self.metric_path, "level": self.level.name, "since": self.since, "last_value": sanitize_value(self.last_value), "last_check": self.last_check, "notification_count": self.notification_count, "acknowledged": self.acknowledged, } # Include acknowledgment timestamp if acknowledged if self.acknowledged_at is not None: result["acknowledged_at"] = self.acknowledged_at # Include threshold info if available if self.threshold_value is not None: result["threshold_value"] = sanitize_value(self.threshold_value) if self.operator is not None: result["operator"] = self.operator if self.formatted_message is not None: result["formatted_message"] = self.formatted_message # Compute and expose the recovery threshold so the UI can display it if (self.hysteresis and self.threshold_value is not None and self.operator is not None): ha = abs(self.threshold_value * self.hysteresis) if self.operator in ('>', '>='): result["recovery_threshold"] = round(self.threshold_value - ha, 4) elif self.operator in ('<', '<='): result["recovery_threshold"] = round(self.threshold_value + ha, 4) return result def __setstate__(self, state): """Restore from pickle, backfilling fields added after the pickle was written.""" self.__dict__.update(state) if not hasattr(self, 'consecutive_count'): self.consecutive_count = 0 if not hasattr(self, 'hysteresis'): self.hysteresis = None def acknowledge(self): """Acknowledge this alert to stop reminder notifications.""" self.acknowledged = True self.acknowledged_at = time.time() logger.info("Alert acknowledged for %s", self.metric_path) def __str__(self): return self.to_dict().__str__() class ThresholdConfig: """Configuration for a single threshold check.""" def __init__( self, metric_path: str, warning: Optional[float] = None, critical: Optional[float] = None, display: Optional[str] = None, operator: str = ">", hysteresis: float = 0.0, enabled: bool = True, count: int = 1, ): """ Initialize threshold configuration. Args: metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent") warning: Warning threshold value critical: Critical threshold value operator: Comparison operator (>, >=, <, <=, ==, !=) hysteresis: Hysteresis percentage to prevent flapping (0.0-1.0) enabled: Whether this threshold is enabled count: Number of consecutive exceedances required before alerting (default 1) """ self.metric_path = metric_path self.warning = warning self.critical = critical self.enabled = enabled self.hysteresis = hysteresis self.display = display self.count = max(1, int(count)) # Parse operator try: self.operator = ComparisonOperator(operator) except ValueError: logger.warning( "Invalid operator '%s' for %s, using '>' as default", operator, metric_path ) self.operator = ComparisonOperator.GT def evaluate(self, value: float) -> AlertLevel: """ Evaluate a value against this threshold. Args: value: Metric value to check Returns: AlertLevel indicating the severity """ if not self.enabled: return AlertLevel.OK # Nagios exit-code semantics: value IS the severity if self.operator == ComparisonOperator.NAGIOS: try: code = int(value) except (TypeError, ValueError): return AlertLevel.UNKNOWN return {0: AlertLevel.OK, 1: AlertLevel.WARNING, 2: AlertLevel.CRITICAL}.get( code, AlertLevel.UNKNOWN ) try: # Convert value to float for comparison value = float(value) except (TypeError, ValueError): logger.warning("Cannot convert value %s to float for %s", value, self.metric_path) return AlertLevel.UNKNOWN # Check critical threshold first if self.critical is not None: if self._compare(value, self.critical): return AlertLevel.CRITICAL # Then check warning threshold if self.warning is not None: if self._compare(value, self.warning): return AlertLevel.WARNING return AlertLevel.OK def evaluate_with_hysteresis( self, value: float, current_level: AlertLevel ) -> AlertLevel: """ Evaluate with hysteresis to prevent flapping. Args: value: Current metric value current_level: Current alert level Returns: New alert level considering hysteresis """ new_level = self.evaluate(value) # Nagios exit codes are discrete integers — hysteresis doesn't apply if self.operator == ComparisonOperator.NAGIOS: return new_level # If no hysteresis, return new level if self.hysteresis == 0.0: return new_level # If improving (going to a lower severity), apply hysteresis if new_level.value < current_level.value: # For recovery, value must be better by hysteresis amount if current_level == AlertLevel.CRITICAL and self.critical is not None: threshold = self.critical elif current_level == AlertLevel.WARNING and self.warning is not None: threshold = self.warning else: return new_level # Calculate hysteresis threshold hysteresis_amount = abs(threshold * self.hysteresis) if self.operator in [ComparisonOperator.GT, ComparisonOperator.GTE]: # For "greater than" thresholds, value must go below by hysteresis recovery_threshold = threshold - hysteresis_amount if value >= recovery_threshold: # Not enough improvement, keep current level return current_level elif self.operator in [ComparisonOperator.LT, ComparisonOperator.LTE]: # For "less than" thresholds, value must go above by hysteresis recovery_threshold = threshold + hysteresis_amount if value <= recovery_threshold: # Not enough improvement, keep current level return current_level return new_level def _compare(self, value: float, threshold: float) -> bool: """Perform comparison based on operator.""" if self.operator == ComparisonOperator.GT: return value > threshold elif self.operator == ComparisonOperator.GTE: return value >= threshold elif self.operator == ComparisonOperator.LT: return value < threshold elif self.operator == ComparisonOperator.LTE: return value <= threshold elif self.operator == ComparisonOperator.EQ: return abs(value - threshold) < 1e-9 # Float comparison elif self.operator == ComparisonOperator.NEQ: return abs(value - threshold) >= 1e-9 return False class ThresholdChecker: """Main threshold checking and alerting system.""" def __init__( self, config: Dict[str, Any], renotify_interval: int = 3600, journal: Optional[Any] = None, ): """ Initialize threshold checker. Args: config: Threshold configuration dictionary from YAML renotify_interval: Seconds between repeat notifications (default: 1 hour) journal: Optional MessageJournal instance for logging threshold events """ # Named threshold configurations (pre-merged: defaults + overrides): {config_name: {metric_path: ThresholdConfig}} self.threshold_configs = {} # Raw overrides only for each named config (no defaults baked in): {config_name: {metric_path: ThresholdConfig}} self.threshold_raw_configs: Dict[str, Dict[str, ThresholdConfig]] = {} # Single threshold set for backward compatibility: {metric_path: ThresholdConfig} self.thresholds = {} # Host to ordered list of config names: {host_name: [config_name, ...]} self.host_config_mapping: Dict[str, List[str]] = {} # Default config name to use when no mapping exists self.default_config = "default" self.renotify_interval = renotify_interval self.grace_seconds: float = float(config.get("grace", 2)) self.journal = journal # Parse configuration self._parse_config(config) total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values()) if total_thresholds == 0 and len(self.thresholds) > 0: # Backward compatibility: using single threshold set total_thresholds = len(self.thresholds) logger.info("ThresholdChecker initialized with %d thresholds (legacy format)", total_thresholds) else: logger.info( "ThresholdChecker initialized with %d named configurations (%d total thresholds)", len(self.threshold_configs), total_thresholds ) def reload(self, config: Dict[str, Any]): """Reload threshold configuration from new config dict. This clears all existing thresholds and re-parses from the new configuration. Alert states are preserved to maintain hysteresis across reloads. Args: config: New configuration dictionary """ logger.info("Reloading threshold configuration...") # Clear old configuration self.threshold_configs.clear() self.threshold_raw_configs.clear() self.thresholds.clear() self.host_config_mapping.clear() self.grace_seconds = float(config.get("grace", 2)) # Parse new configuration self._parse_config(config) total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values()) if total_thresholds == 0 and len(self.thresholds) > 0: total_thresholds = len(self.thresholds) logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds) def _parse_config(self, config: Dict[str, Any]): """Parse threshold configuration from YAML structure. Supports two formats: 1. Legacy format with direct 'thresholds' section 2. New format with 'threshold_configs' and 'host_threshold_mapping' In all cases, THRESHOLD_DEFAULTS are seeded into threshold_configs["default"] so the Settings page always shows the built-in defaults. _parse_multi_config() overwrites this with the fully-merged effective defaults. """ # Always expose built-in defaults through threshold_configs["default"] so # the Settings page has something to display even in legacy/no-config mode. seed: Dict[str, ThresholdConfig] = {} for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items(): if isinstance(plugin_thresholds, dict): self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=seed) if seed: self.threshold_configs["default"] = seed self.threshold_raw_configs["default"] = {} # Check for new multi-config format if "threshold_configs" in config: self._parse_multi_config(config) # overwrites threshold_configs["default"] elif "thresholds" in config: # Legacy single threshold configuration self._parse_legacy_config(config) else: logger.info("No thresholds configured") def _parse_multi_config(self, config: Dict[str, Any]): """Parse multiple named threshold configurations.""" threshold_configs = config.get("threshold_configs", {}) if not threshold_configs: logger.info("No threshold configurations defined") return # Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present). # All other configs inherit any metric not explicitly defined from effective_defaults. effective_defaults: Dict[str, ThresholdConfig] = {} for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items(): if isinstance(plugin_thresholds, dict): self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults) if "default" in threshold_configs: default_data = threshold_configs["default"] if isinstance(default_data, dict) and "thresholds" in default_data: for plugin_name, plugin_thresholds in default_data["thresholds"].items(): if isinstance(plugin_thresholds, dict): self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults) self.threshold_configs["default"] = dict(effective_defaults) self.threshold_raw_configs["default"] = {} logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults)) # Parse each named configuration for config_name, config_data in threshold_configs.items(): if config_name == "default": continue # already handled above if not isinstance(config_data, dict): logger.warning("Invalid threshold config '%s', skipping", config_name) continue if "thresholds" not in config_data: logger.warning("No thresholds in config '%s', skipping", config_name) continue logger.info("Parsing threshold configuration: %s", config_name) # Raw overrides only (used for multi-config layering) raw_overrides: Dict[str, ThresholdConfig] = {} thresholds_config = config_data["thresholds"] for plugin_name, plugin_thresholds in thresholds_config.items(): if isinstance(plugin_thresholds, dict): self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=raw_overrides) self.threshold_raw_configs[config_name] = raw_overrides # Pre-merged version (defaults + overrides) for single-config fast path self.threshold_configs[config_name] = dict(effective_defaults) self.threshold_configs[config_name].update(raw_overrides) # Parse host → config list mapping from two possible sources def _normalise(value) -> List[str]: """Accept a string or list; always return a list.""" if isinstance(value, list): return [str(v) for v in value] return [str(value)] # 1. hosts section with threshold_config attribute (string or list) if "hosts" in config: hosts_config = config["hosts"] if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): if isinstance(host_attrs, dict) and "threshold_config" in host_attrs: self.host_config_mapping[host_name] = _normalise(host_attrs["threshold_config"]) # 2. Legacy host_threshold_mapping section (string values only) if "host_threshold_mapping" in config: legacy_mapping = config.get("host_threshold_mapping", {}) if isinstance(legacy_mapping, dict): for host_name, value in legacy_mapping.items(): self.host_config_mapping[host_name] = _normalise(value) # Set default config (first one alphabetically or explicitly set) self.default_config = config.get("default_threshold_config", "default") if self.default_config not in self.threshold_configs and self.threshold_configs: # Use first available config as default self.default_config = sorted(self.threshold_configs.keys())[0] logger.info("Using '%s' as default threshold config", self.default_config) logger.info( "Loaded %d threshold configurations with %d host mappings", len(self.threshold_configs), len(self.host_config_mapping) ) def _parse_legacy_config(self, config: Dict[str, Any]): """Parse legacy single threshold configuration for backward compatibility.""" if not config or "thresholds" not in config: logger.info("No thresholds configured") return thresholds_config = config["thresholds"] for plugin_name, plugin_thresholds in thresholds_config.items(): if not isinstance(plugin_thresholds, dict): continue self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds) def _parse_plugin_thresholds( self, plugin_name: str, thresholds: Dict[str, Any], target_dict: Optional[Dict[str, ThresholdConfig]] = None ): """Parse thresholds for a specific plugin. Args: plugin_name: Name of the plugin thresholds: Threshold configuration dictionary target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds) """ if target_dict is None: target_dict = self.thresholds # Special handling for RTT thresholds (per-host) if plugin_name == "rtt": self._parse_rtt_thresholds(thresholds, target_dict) return for metric_name, threshold_config in thresholds.items(): if not isinstance(threshold_config, dict): continue # Handle nested metrics (e.g., partitions./.percent or pools.*.health_ok) if metric_name == "partitions": self._parse_partition_thresholds(plugin_name, threshold_config, target_dict) continue if metric_name == "pools": self._parse_pool_thresholds(plugin_name, threshold_config, target_dict) continue metric_path = f"{plugin_name}.{metric_name}" # Extract threshold values warning = threshold_config.get("warning") critical = threshold_config.get("critical") operator = threshold_config.get("operator", ">") # Nagios operator maps exit codes directly; no numeric thresholds needed is_nagios_op = (operator == "nagios") default_display = "{check_name}: {output}" if is_nagios_op else "(threshold: {op_symbol} {threshold_value})" display = threshold_config.get("display", default_display) hysteresis = threshold_config.get("hysteresis", 0.0 if is_nagios_op else 0.02) enabled = threshold_config.get("enabled", True) if warning is None and critical is None and not is_nagios_op: logger.warning("No thresholds defined for %s, skipping", metric_path) continue threshold = ThresholdConfig( metric_path=metric_path, warning=warning, critical=critical, operator=operator, hysteresis=hysteresis, enabled=enabled, display=display ) target_dict[metric_path] = threshold logger.debug( "Registered threshold for %s: warn=%s, crit=%s, op=%s", metric_path, warning, critical, operator ) def _parse_partition_thresholds( self, plugin_name: str, partitions: Dict[str, Any], target_dict: Optional[Dict[str, ThresholdConfig]] = None ): """Parse partition-specific thresholds for disk monitoring. Args: plugin_name: Name of the plugin partitions: Partition threshold configuration target_dict: Dictionary to store parsed thresholds """ if target_dict is None: target_dict = self.thresholds for partition, metrics in partitions.items(): if not isinstance(metrics, dict): continue for metric_name, threshold_config in metrics.items(): if not isinstance(threshold_config, dict): continue # Create metric path like "disk_monitor./dev/sda1.percent" metric_path = f"{plugin_name}.{partition}.{metric_name}" warning = threshold_config.get("warning") critical = threshold_config.get("critical") operator = threshold_config.get("operator", ">") hysteresis = threshold_config.get("hysteresis", 0.1) enabled = threshold_config.get("enabled", True) display = threshold_config.get("display") if warning is None and critical is None: continue threshold = ThresholdConfig( metric_path=metric_path, warning=warning, critical=critical, operator=operator, hysteresis=hysteresis, enabled=enabled, display=display ) target_dict[metric_path] = threshold def _parse_pool_thresholds( self, plugin_name: str, pools: Dict[str, Any], target_dict: Optional[Dict[str, ThresholdConfig]] = None, ): """Parse ZFS pool thresholds. Pool names may be literal or '*' (all pools). Config shape:: zfs_monitor: pools: '*': health_ok: critical: 1 operator: '<' tank: capacity: warning: 80 critical: 90 """ if target_dict is None: target_dict = self.thresholds for pool_name, metrics in pools.items(): if not isinstance(metrics, dict): continue for metric_name, threshold_config in metrics.items(): if not isinstance(threshold_config, dict): continue metric_path = f"{plugin_name}.{pool_name}.{metric_name}" warning = threshold_config.get("warning") critical = threshold_config.get("critical") operator = threshold_config.get("operator", ">") hysteresis = threshold_config.get("hysteresis", 0.02) enabled = threshold_config.get("enabled", True) display = threshold_config.get("display") if warning is None and critical is None: continue target_dict[metric_path] = ThresholdConfig( metric_path=metric_path, warning=warning, critical=critical, operator=operator, hysteresis=hysteresis, enabled=enabled, display=display, ) def _parse_rtt_thresholds( self, rtt_thresholds: Dict[str, Any], target_dict: Optional[Dict[str, ThresholdConfig]] = None ): """Parse RTT thresholds (network latency thresholds). RTT thresholds are configured as: thresholds: rtt: warning: 100.0 # ms critical: 500.0 # ms Args: rtt_thresholds: RTT threshold configuration target_dict: Dictionary to store parsed thresholds """ if target_dict is None: target_dict = self.thresholds if not isinstance(rtt_thresholds, dict): return # Metric path is simply "rtt" (not per-host) metric_path = "rtt" warning = rtt_thresholds.get("warning") critical = rtt_thresholds.get("critical") operator = rtt_thresholds.get("operator", ">") hysteresis = rtt_thresholds.get("hysteresis", 0.02) # 2% default enabled = rtt_thresholds.get("enabled", True) display = rtt_thresholds.get("display") count = rtt_thresholds.get("count", 1) if warning is None and critical is None: logger.warning("No RTT thresholds defined, skipping") return threshold = ThresholdConfig( metric_path=metric_path, warning=warning, critical=critical, operator=operator, hysteresis=hysteresis, enabled=enabled, display=display, count=count, ) target_dict[metric_path] = threshold logger.debug( "Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d", warning, critical, count, ) def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]: """Get the effective threshold configuration for a host. When threshold_config is a list, configs are applied left-to-right on top of the default thresholds so earlier entries can be overridden by later ones. Args: host_name: Name of the host Returns: Dictionary of thresholds for this host """ # Legacy mode: single threshold set for all hosts if self.thresholds and not self.threshold_configs: return self.thresholds if not self.threshold_configs: return {} config_names = self.host_config_mapping.get(host_name) # No host-specific mapping → return pre-merged default if not config_names: return self.threshold_configs.get(self.default_config, {}) # Single config → fast path using pre-merged copy if len(config_names) == 1: name = config_names[0] if name in self.threshold_configs: return self.threshold_configs[name] logger.warning( "Threshold config '%s' not found for host '%s', using default '%s'", name, host_name, self.default_config, ) return self.threshold_configs.get(self.default_config, {}) # Multiple configs → start from defaults, layer raw overrides in order result = dict(self.threshold_configs.get(self.default_config, {})) for name in config_names: if name == self.default_config: continue # defaults already the base raw = self.threshold_raw_configs.get(name) if raw is None: logger.warning( "Threshold config '%s' not found for host '%s', skipping", name, host_name, ) else: result.update(raw) return result def check_value( self, host_name: str, metric_path: str, value: float, alert_states: Dict[str, AlertState], ) -> Optional[Tuple[AlertLevel, AlertLevel]]: """ Check a single value against configured threshold. Args: host_name: Name of the host metric_path: Full metric path (e.g., "rtt.hostname") value: The metric value to check alert_states: Host's alert_states dictionary Returns: Tuple of (old_level, new_level) if state changed, None otherwise """ # Get host-specific thresholds thresholds = self.get_thresholds_for_host(host_name) if metric_path not in thresholds: return None threshold = thresholds[metric_path] # Get or create alert state if metric_path not in alert_states: alert_states[metric_path] = AlertState(metric_path) alert_state = alert_states[metric_path] # Evaluate threshold with hysteresis new_level = threshold.evaluate_with_hysteresis( value, alert_state.level ) # Apply consecutive-count gating: when currently OK, require threshold.count # consecutive exceedances before escalating to WARNING/CRITICAL. if new_level == AlertLevel.OK: # Value is fine (or recovered) — reset the pending counter immediately. alert_state.consecutive_count = 0 elif alert_state.level == AlertLevel.OK and new_level != AlertLevel.OK: # First time we exceed while still OK: count up. alert_state.consecutive_count += 1 if alert_state.consecutive_count < threshold.count: logger.debug( "RTT threshold exceeded %d/%d consecutive times for %s on %s", alert_state.consecutive_count, threshold.count, metric_path, host_name, ) return None # Count reached — fire the alert and reset the counter. alert_state.consecutive_count = 0 # Determine which threshold was exceeded threshold_value = None if new_level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif new_level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning # Keep hysteresis on the state so the UI can show the recovery threshold if new_level != AlertLevel.OK: alert_state.hysteresis = threshold.hysteresis else: alert_state.hysteresis = None # Update state and check for changes old_level = alert_state.level if alert_state.update(new_level, value, threshold_value, threshold.operator.value): self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, None) return (old_level, new_level) elif new_level != AlertLevel.OK: self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, None) return None def _find_threshold( self, thresholds: Dict[str, "ThresholdConfig"], metric_path: str ) -> Tuple[Optional["ThresholdConfig"], Optional[str]]: """Return (threshold, check_name) for *metric_path*, falling back to suffix matches. Allows generic thresholds like ``nagios_runner.status_code`` to match fully-qualified paths like ``nagios_runner.check_disk_root_status_code``. The exact match is always tried first; then successive leading underscore-delimited segments are stripped from the field name until a match is found or no segments remain. Returns: (ThresholdConfig, None) for an exact match. (ThresholdConfig, "check_disk_root") for a suffix match — the second element is the stripped prefix, available as ``{check_name}`` in display format templates. (None, None) when no threshold is found. """ if metric_path in thresholds: return thresholds[metric_path], None plugin, sep, field = metric_path.partition(".") if not sep: return None, None parts = field.split("_") for i in range(1, len(parts)): candidate = plugin + "." + "_".join(parts[i:]) if candidate in thresholds: return thresholds[candidate], "_".join(parts[:i]) return None, None def check_plugin_data( self, host_name: str, plugin_name: str, data: Dict[str, Any], alert_states: Dict[str, AlertState], ) -> list: """ Check plugin data against configured thresholds. Args: host_name: Name of the host plugin_name: Name of the plugin data: Plugin data dictionary alert_states: Host's alert_states dictionary Returns: List of (metric_path, old_level, new_level, value) tuples for state changes """ state_changes = [] # Get host-specific thresholds thresholds = self.get_thresholds_for_host(host_name) # Check flat metrics for metric_name, value in data.items(): metric_path = f"{plugin_name}.{metric_name}" threshold, check_name = self._find_threshold(thresholds, metric_path) if threshold is None: continue # Get or create alert state if metric_path not in alert_states: alert_states[metric_path] = AlertState(metric_path) alert_state = alert_states[metric_path] # Evaluate threshold with hysteresis new_level = threshold.evaluate_with_hysteresis( value, alert_state.level ) # Determine which threshold was exceeded threshold_value = None if new_level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif new_level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None # Update state and check for changes old_level = alert_state.level if alert_state.update(new_level, value, threshold_value, threshold.operator.value): state_changes.append((metric_path, old_level, new_level, value)) self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data, check_name=check_name, metric_name=metric_name) elif new_level != AlertLevel.OK: self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data, check_name=check_name, metric_name=metric_name) # Check nested metrics (e.g., partition data in disk_monitor) self._check_nested_metrics( host_name, plugin_name, data, alert_states, state_changes ) return state_changes def _check_nested_metrics( self, host_name: str, plugin_name: str, data: Dict[str, Any], alert_states: Dict[str, AlertState], state_changes: list, ): """Check nested metrics like partition-specific thresholds.""" # Get host-specific thresholds thresholds = self.get_thresholds_for_host(host_name) # ZFS pool health checks if plugin_name == "zfs_monitor" and "pools" in data: pools = data["pools"] if isinstance(pools, dict): for pool_name, pool_metrics in pools.items(): if not isinstance(pool_metrics, dict): continue for metric_name, value in pool_metrics.items(): # Try specific pool name first, then wildcard '*' metric_path = f"{plugin_name}.{pool_name}.{metric_name}" wildcard_path = f"{plugin_name}.*.{metric_name}" threshold = thresholds.get(metric_path) or thresholds.get(wildcard_path) if threshold is None: continue if metric_path not in alert_states: alert_states[metric_path] = AlertState(metric_path) alert_state = alert_states[metric_path] new_level = threshold.evaluate_with_hysteresis(value, alert_state.level) threshold_value = None if new_level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif new_level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None pool_context = dict(pool_metrics) pool_context["pool_name"] = pool_name old_level = alert_state.level if alert_state.update(new_level, value, threshold_value, threshold.operator.value): state_changes.append((metric_path, old_level, new_level, value)) self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, pool_context, metric_name=pool_name) elif new_level != AlertLevel.OK: self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, pool_context, metric_name=pool_name) # Look for partition data in disk_monitor if plugin_name == "disk_monitor" and "partitions" in data: partitions = data["partitions"] if not isinstance(partitions, dict): return for partition, metrics in partitions.items(): if not isinstance(metrics, dict): continue for metric_name, value in metrics.items(): metric_path = f"{plugin_name}.{partition}.{metric_name}" if metric_path not in thresholds: continue threshold = thresholds[metric_path] if metric_path not in alert_states: alert_states[metric_path] = AlertState(metric_path) alert_state = alert_states[metric_path] new_level = threshold.evaluate_with_hysteresis( value, alert_state.level ) # Determine which threshold was exceeded threshold_value = None if new_level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif new_level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None old_level = alert_state.level if alert_state.update(new_level, value, threshold_value, threshold.operator.value): state_changes.append((metric_path, old_level, new_level, value)) self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data) elif new_level != AlertLevel.OK: self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data) def _trigger_notification( self, host_name: str, metric_path: str, old_level: AlertLevel, new_level: AlertLevel, value: Any, threshold: ThresholdConfig, plugin_data: Optional[Dict[str, Any]] = None, check_name: Optional[str] = None, metric_name: Optional[str] = None, ): """Trigger a notification for an alert state change. Args: host_name: Name of the host metric_path: Full metric path old_level: Previous alert level new_level: New alert level value: Current metric value threshold: Threshold configuration plugin_data: Optional dictionary of all plugin data fields for format string """ # Determine which threshold was exceeded threshold_value = None if new_level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif new_level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning # Format operator symbol op_symbol = threshold.operator.value # Short metric label: strip the plugin-name prefix and _status_code suffix short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code") # Use a display-friendly value (inf is the sentinel for "overdue") import math display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value # Format message — for the nagios operator there is no numeric threshold_value; # render the display template whenever one is available. has_display = threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS def _fmt(): return self._format_display( threshold.display, value=display_value, threshold_value=threshold_value, op_symbol=op_symbol, plugin_data=plugin_data, check_name=check_name, metric_name=metric_name, ) if new_level == AlertLevel.OK: lvl = "RECOVER" message = f"{short_path} = {display_value} ({old_level.name} -> OK)" elif new_level == AlertLevel.WARNING: lvl = "WARNING" if has_display: message = f"{short_path} = {display_value} {_fmt()}" else: message = f"{short_path} = {display_value}" elif new_level == AlertLevel.CRITICAL: lvl = "CRITICAL" if has_display: message = f"{short_path} = {display_value} {_fmt()}" else: message = f"{short_path} = {display_value}" else: lvl = "UNKNOWN" if has_display: message = f"{short_path} = {display_value} {_fmt()}" else: message = f"{short_path} = {display_value}" # Formatted threshold info stored on AlertState for the UI formatted_threshold_msg = _fmt() if has_display and new_level != AlertLevel.OK else None return lvl, message, formatted_threshold_msg def _send_notification( self, host_name: str, lvl: str, message: str, metric_path: str, old_level: AlertLevel, new_level: AlertLevel, value: Any, ): """Send notification and log to journal/eventlog.""" from . import hbdclass host = hbdclass.Host.hosts.get(host_name) if host is not None and not host.watched: eventlog(host_name, lvl, message, service="threshold") return short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code") title = f"[{lvl}] {host_name} {short_path}" # Strip the "metric = " prefix from message so body is just the value/detail prefix = short_path + " = " body = message[len(prefix):] if message.startswith(prefix) else message asyncio.get_event_loop().create_task(notify_mod.send_notification( host_name, notify_mod.Notification( title=title, body=body, level=lvl, ), )) # Log to journal if self.journal is not None: try: loop = asyncio.get_event_loop() loop.create_task(self.journal.log_threshold_event( host_name=host_name, metric_path=metric_path, old_level=old_level.name, new_level=new_level.name, value=value, )) except Exception as e: logger.debug(f"Failed to log threshold event to journal: {e}") # Log to eventlog as well eventlog(host_name, lvl, message, service="threshold") def _format_display( self, display_format: str, value: Any, threshold_value: Optional[float], op_symbol: str, plugin_data: Optional[Dict[str, Any]] = None, check_name: Optional[str] = None, metric_name: Optional[str] = None, ) -> str: """Format the display string using available data. Available template variables: {value} - current metric value {threshold_value} - threshold that was exceeded {op_symbol} - comparison operator (>, <, >=, <=, ==, !=) {check_name} - prefix stripped for generic threshold match (e.g. "check_disk_root" when metric "check_disk_root_status_code" matched generic threshold "status_code") {metric_name} - field name within the plugin data dict Any key from plugin_data is also available. Returns: Formatted display string """ if not display_format: display_format = "(threshold: {op_symbol} {threshold_value})" if threshold_value is not None else "" # Build format context with standard variables format_context = { 'value': value, 'op_symbol': op_symbol, } if threshold_value is not None: format_context['threshold_value'] = threshold_value # Add generic-match context variables when available if check_name is not None: format_context['check_name'] = check_name if metric_name is not None: format_context['metric_name'] = metric_name # Add all plugin data fields if available if plugin_data: format_context.update(plugin_data) # For nagios_runner generic matches, expose the matched check's output # and status as short aliases {output} and {status} so display templates # don't need to use the full {check_disk_root_output} form. if check_name and plugin_data: if 'output' not in format_context: output = plugin_data.get(f"{check_name}_output") if output is not None: format_context['output'] = output if 'status' not in format_context: status = plugin_data.get(f"{check_name}_status") if status is not None: format_context['status'] = status try: # Format the display string return display_format.format(**format_context) except KeyError as e: logger.warning( "Missing format variable in display string '%s': %s", display_format, e ) # Fallback to default format return f"(threshold: {op_symbol} {threshold_value})" except Exception as e: logger.error( "Error formatting display string '%s': %s", display_format, e ) return f"(threshold: {op_symbol} {threshold_value})" def _apply_grace( self, host_name: str, alert_state: AlertState, metric_path: str, old_level: AlertLevel, new_level: AlertLevel, value: Any, threshold: ThresholdConfig, plugin_data: Optional[Dict[str, Any]], check_name: Optional[str] = None, metric_name: Optional[str] = None, ) -> None: """Handle a state-change transition with grace-period logic. Transitioning INTO alert (worsening): defers the notification for grace_seconds. De-escalation within alert states (e.g. CRITICAL→WARNING): no new notification; the metric is still alerting so no RECOVER was sent. Transitioning TO OK: - Still in grace window (pending_since set): suppresses both the alert and the recovery — the spike never warranted a page. - Past grace: fires the RECOVER notification normally. """ lvl, message, formatted_msg = self._trigger_notification( host_name, metric_path, old_level, new_level, value, threshold, plugin_data, check_name=check_name, metric_name=metric_name, ) alert_state.formatted_message = formatted_msg if new_level == AlertLevel.OK: if alert_state.pending_since is not None: logger.info( "Alert suppressed (recovered within %.0fs grace): %s on %s", self.grace_seconds, metric_path, host_name, ) alert_state.pending_since = None else: self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value) elif new_level.value > old_level.value: # Worsening (OK→WARNING, OK→CRITICAL, WARNING→CRITICAL): schedule notification. alert_state.pending_since = time.time() logger.debug( "Alert deferred (%.0fs grace): %s on %s = %s", self.grace_seconds, metric_path, host_name, value, ) else: # De-escalation within alert states (e.g. CRITICAL→WARNING): metric is still # alerting but did not recover, so no new notification. logger.debug( "De-escalation %s→%s for %s on %s, no notification", old_level.name, new_level.name, metric_path, host_name, ) def _check_pending_or_renotify( self, host_name: str, alert_state: AlertState, metric_path: str, value: Any, threshold: ThresholdConfig, plugin_data: Optional[Dict[str, Any]], check_name: Optional[str] = None, metric_name: Optional[str] = None, ) -> None: """Called when alert level is unchanged and non-OK. If a deferred notification is pending and grace_seconds have elapsed, fires it now. Otherwise falls through to normal reminder logic. """ if alert_state.pending_since is not None: if time.time() - alert_state.pending_since >= self.grace_seconds: lvl, message, formatted_msg = self._trigger_notification( host_name, metric_path, AlertLevel.OK, alert_state.level, value, threshold, plugin_data, check_name=check_name, metric_name=metric_name, ) alert_state.formatted_message = formatted_msg self._send_notification( host_name, lvl, message, metric_path, AlertLevel.OK, alert_state.level, value ) alert_state.pending_since = None # else: still within grace window, do nothing else: self._check_renotify(host_name, alert_state, metric_path, value, threshold, plugin_data, check_name=check_name, metric_name=metric_name) def _check_renotify( self, host_name: str, alert_state: AlertState, metric_path: str, value: Any, threshold: ThresholdConfig, plugin_data: Optional[Dict[str, Any]] = None, check_name: Optional[str] = None, metric_name: Optional[str] = None, ): """Check if we should send a repeat notification. Args: host_name: Name of the host alert_state: Current alert state metric_path: Full metric path value: Current metric value threshold: Threshold configuration plugin_data: Optional dictionary of all plugin data fields """ if alert_state.level != AlertLevel.CRITICAL: return # Skip reminders if alert has been acknowledged if alert_state.acknowledged: return now = time.time() # Check if we should re-notify if alert_state.last_notification is None: # First notification already sent during state change alert_state.last_notification = now alert_state.notification_count = 1 return if (now - alert_state.last_notification) >= self.renotify_interval: # Determine which threshold is active threshold_value = None if alert_state.level == AlertLevel.CRITICAL and threshold.critical is not None: threshold_value = threshold.critical elif alert_state.level == AlertLevel.WARNING and threshold.warning is not None: threshold_value = threshold.warning # Format operator symbol op_symbol = threshold.operator.value short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code") # Time to re-notify if threshold_value is not None: # Use display format string threshold_info = self._format_display( threshold.display, value=value, threshold_value=threshold_value, op_symbol=op_symbol, plugin_data=plugin_data, check_name=check_name, metric_name=metric_name, ) body = f"{value} {threshold_info}, ongoing for {int(now - alert_state.since)}s" else: body = f"{value} (ongoing for {int(now - alert_state.since)}s)" message = f"REMINDER ({alert_state.level.name}): {host_name} - {short_path} = {body}" from . import hbdclass host = hbdclass.Host.hosts.get(host_name) if host is None or host.watched: asyncio.get_event_loop().create_task(notify_mod.send_notification( host_name, notify_mod.Notification( title=f"[REMINDER/{alert_state.level.name}] {host_name} {short_path}", body=body, level=alert_state.level.name, ), )) logger.info("Re-notification sent: %s", message) alert_state.last_notification = now alert_state.notification_count += 1 def purge_stale_alerts(self, hbdclass) -> None: """Remove alert states that have no matching threshold configuration. Called after startup (pickle restore) and after each config reload so that alerts orphaned by configuration changes do not linger forever. Alerts whose metric_path is not present in the current threshold config for that host are silently dropped. """ for hostname, host in hbdclass.Host.hosts.items(): if not host.alert_states: continue configured = self.get_thresholds_for_host(hostname) stale = [mp for mp in host.alert_states if self._find_threshold(configured, mp)[0] is None] for mp in stale: logger.info( "Purging stale alert state for %s / %s (no threshold configured)", hostname, mp, ) del host.alert_states[mp] def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list: """ Get all currently active (non-OK) alerts. Args: alert_states: Host's alert_states dictionary Returns: List of AlertState objects that are not OK """ return [ state for state in alert_states.values() if state.level != AlertLevel.OK ] def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]: """ Get summary counts of alert levels. Args: alert_states: Host's alert_states dictionary Returns: Dictionary with counts: {"ok": N, "warning": N, "critical": N} """ summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0} for state in alert_states.values(): if state.level == AlertLevel.OK: summary["ok"] += 1 elif state.level == AlertLevel.WARNING: summary["warning"] += 1 elif state.level == AlertLevel.CRITICAL: summary["critical"] += 1 elif state.level == AlertLevel.UNKNOWN: summary["unknown"] += 1 return summary