"""
Threshold checking and alerting for plugin metrics.

This module provides a flexible threshold checking system that:

- Evaluates plugin metrics against configured warning/critical thresholds
- Tracks alert states per host and metric
- Prevents alert flapping with hysteresis
- Optionally requires several consecutive exceedances before alerting
- Triggers notifications only on state changes, plus periodic reminders for
  unacknowledged CRITICAL alerts
- Supports multiple comparison operators
"""

import asyncio
import logging
import math
import time
from enum import Enum
from typing import Dict, Any, Optional, Tuple, Callable

from . import notify as notify_mod
from .config import THRESHOLD_DEFAULTS

logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog

# Display template used when a threshold does not configure its own "display".
DEFAULT_DISPLAY = "(threshold: {op_symbol} {threshold_value})"

# Strong references to in-flight journal-logging tasks. The event loop keeps
# only weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it completes.
_background_tasks: set = set()


def _on_journal_task_done(task: "asyncio.Task") -> None:
    """Drop the finished task's strong reference and log (at debug) any failure."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.debug("Failed to log threshold event to journal: %s", exc)


class AlertLevel(Enum):
    """Alert severity levels."""
    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3


class ComparisonOperator(Enum):
    """Supported comparison operators for threshold checks."""
    GT = ">"     # Greater than
    GTE = ">="   # Greater than or equal
    LT = "<"     # Less than
    LTE = "<="   # Less than or equal
    EQ = "=="    # Equal to
    NEQ = "!="   # Not equal to


class AlertState:
    """Represents the current alert state for a specific metric."""

    def __init__(self, metric_path: str):
        """
        Initialize alert state.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        self.since = time.time()
        self.last_value = None
        self.last_check = time.time()
        self.notification_count = 0
        # Reminder-timer anchor used by ThresholdChecker._check_renotify; None
        # means "no reminder bookkeeping yet for the current alert episode".
        self.last_notification = None
        self.threshold_value = None  # The threshold value that triggered alert
        self.operator = None  # The comparison operator (>, <, >=, etc.)
        self.formatted_message = None  # Formatted display message for UI
        self.acknowledged = False  # Whether alert has been acknowledged
        self.acknowledged_at = None  # Timestamp when acknowledged
        self.consecutive_count = 0  # Consecutive exceedances while still OK (for count gating)

    def update(
        self,
        level: AlertLevel,
        value: Any,
        threshold_value: Optional[float] = None,
        operator: Optional[str] = None,
    ) -> bool:
        """
        Update alert state.

        Args:
            level: New alert level
            value: Current metric value
            threshold_value: The threshold value that was exceeded (if applicable)
            operator: The comparison operator (>, <, >=, etc.)

        Returns:
            True if state changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        if level != AlertLevel.OK:
            # Remember which threshold is active while alerting
            self.threshold_value = threshold_value
            self.operator = operator
        else:
            # Clear threshold info when returning to OK
            self.threshold_value = None
            self.operator = None

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path, self.level.name, level.name, value
        )
        self.level = level
        self.since = now
        self.notification_count = 0
        # Restart the reminder timer for the new episode. Otherwise a timestamp
        # left over from an earlier alert episode makes the very next check send
        # a REMINDER right after the state-change notification.
        self.last_notification = None

        # A new alert level needs a fresh acknowledgment; recovering to OK
        # leaves the acknowledgment fields as they were.
        if level != AlertLevel.OK:
            self.acknowledged = False
            self.acknowledged_at = None

        return True

    def to_dict(self) -> dict:
        """Convert alert state to a JSON-friendly dictionary.

        Infinite floats are rendered as "overdue" and NaN as None, since JSON
        has no representation for either.
        """
        def sanitize_value(val):
            if isinstance(val, float):
                if math.isinf(val):
                    return "overdue"
                if math.isnan(val):
                    return None
            return val

        result = {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": sanitize_value(self.last_value),
            "last_check": self.last_check,
            "notification_count": self.notification_count,
            "acknowledged": self.acknowledged,
        }

        # Optional fields are only emitted when set
        if self.acknowledged_at is not None:
            result["acknowledged_at"] = self.acknowledged_at
        if self.threshold_value is not None:
            result["threshold_value"] = sanitize_value(self.threshold_value)
        if self.operator is not None:
            result["operator"] = self.operator
        if self.formatted_message is not None:
            result["formatted_message"] = self.formatted_message

        return result

    def __setstate__(self, state):
        """Restore from pickle, backfilling fields added after the pickle was written."""
        self.__dict__.update(state)
        if not hasattr(self, 'consecutive_count'):
            self.consecutive_count = 0

    def acknowledge(self):
        """Acknowledge this alert to stop reminder notifications."""
        self.acknowledged = True
        self.acknowledged_at = time.time()
        logger.info("Alert acknowledged for %s", self.metric_path)

    def __str__(self):
        return str(self.to_dict())


class ThresholdConfig:
    """Configuration for a single threshold check."""

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        display: Optional[str] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
        count: int = 1,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            display: str.format template for the threshold part of messages
                (fields: value, threshold_value, op_symbol, plus plugin data);
                None means the default "(threshold: <op> <value>)" text
            operator: Comparison operator (>, >=, <, <=, ==, !=)
            hysteresis: Hysteresis fraction to prevent flapping (0.0-1.0);
                None is treated as 0.0 (no hysteresis)
            enabled: Whether this threshold is enabled
            count: Number of consecutive exceedances required before alerting
                (default 1); invalid values fall back to 1
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        # A YAML "hysteresis:" with no value yields None; treat it as disabled
        self.hysteresis = hysteresis if hysteresis is not None else 0.0
        self.display = display
        self.count = self._parse_count(count, metric_path)

        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator, metric_path
            )
            self.operator = ComparisonOperator.GT

    @staticmethod
    def _parse_count(count: Any, metric_path: str) -> int:
        """Coerce a configured count to an int >= 1, defaulting to 1 if invalid."""
        try:
            return max(1, int(count))
        except (TypeError, ValueError):
            logger.warning(
                "Invalid count '%s' for %s, using 1 as default", count, metric_path
            )
            return 1

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check

        Returns:
            AlertLevel indicating the severity (UNKNOWN if value is not numeric;
            OK if the threshold is disabled)
        """
        if not self.enabled:
            return AlertLevel.OK

        try:
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        # Critical takes precedence over warning
        if self.critical is not None and self._compare(value, self.critical):
            return AlertLevel.CRITICAL
        if self.warning is not None and self._compare(value, self.warning):
            return AlertLevel.WARNING
        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        Only recoveries (moves to a lower severity) are damped: for ordered
        operators the value must clear the active threshold by
        ``|threshold * hysteresis|`` before the level drops.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # Disabled thresholds always report OK, so hysteresis must not hold a
        # stale alert open after a threshold is switched off.
        if not self.enabled or self.hysteresis == 0.0:
            return new_level

        if new_level.value >= current_level.value:
            return new_level

        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        # evaluate() already proved the value is numeric (new_level is not
        # UNKNOWN here) but converted only a local copy; convert again so
        # numeric strings such as "85" do not raise TypeError below.
        numeric = float(value)
        hysteresis_amount = abs(threshold * self.hysteresis)

        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # "Greater than" thresholds recover once the value is below by the margin
            if numeric >= threshold - hysteresis_amount:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # "Less than" thresholds recover once the value is above by the margin
            if numeric <= threshold + hysteresis_amount:
                return current_level

        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Perform comparison based on operator."""
        if self.operator == ComparisonOperator.GT:
            return value > threshold
        if self.operator == ComparisonOperator.GTE:
            return value >= threshold
        if self.operator == ComparisonOperator.LT:
            return value < threshold
        if self.operator == ComparisonOperator.LTE:
            return value <= threshold
        if self.operator == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9  # Float comparison
        if self.operator == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False


class ThresholdChecker:
    """Main threshold checking and alerting system."""

    def __init__(
        self,
        config: Dict[str, Any],
        renotify_interval: int = 3600,
        journal: Optional[Any] = None,
    ):
        """
        Initialize threshold checker.

        Args:
            config: Threshold configuration dictionary from YAML
            renotify_interval: Seconds between repeat notifications (default: 1 hour)
            journal: Optional MessageJournal instance for logging threshold events
                (its log_threshold_event is scheduled as an asyncio task)
        """
        # Named threshold configurations: {config_name: {metric_path: ThresholdConfig}}
        self.threshold_configs = {}
        # Single threshold set for backward compatibility: {metric_path: ThresholdConfig}
        self.thresholds = {}
        # Host to config name mapping: {host_name: config_name}
        self.host_config_mapping = {}
        # Default config name to use when no mapping exists
        self.default_config = "default"
        self.renotify_interval = renotify_interval
        self.journal = journal

        self._parse_config(config)

        total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
        if total_thresholds == 0 and len(self.thresholds) > 0:
            total_thresholds = len(self.thresholds)
            logger.info("ThresholdChecker initialized with %d thresholds (legacy format)", total_thresholds)
        else:
            logger.info(
                "ThresholdChecker initialized with %d named configurations (%d total thresholds)",
                len(self.threshold_configs),
                total_thresholds
            )

    def reload(self, config: Dict[str, Any]):
        """Reload threshold configuration from a new config dict.

        Clears all existing thresholds and re-parses the new configuration.
        Alert states are owned by callers (passed into the check methods) and
        are not touched here, so hysteresis carries over across reloads.

        Args:
            config: New configuration dictionary
        """
        logger.info("Reloading threshold configuration...")

        self.threshold_configs.clear()
        self.thresholds.clear()
        self.host_config_mapping.clear()
        # Do not keep a default config name from the previous configuration
        self.default_config = "default"

        self._parse_config(config)

        total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
        if total_thresholds == 0 and len(self.thresholds) > 0:
            total_thresholds = len(self.thresholds)

        logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)

    def _parse_config(self, config: Dict[str, Any]):
        """Parse threshold configuration from YAML structure.

        Supports two formats:
        1. Legacy format with direct 'thresholds' section
        2. New format with 'threshold_configs' and 'host_threshold_mapping'
        """
        if "threshold_configs" in config:
            self._parse_multi_config(config)
        elif "thresholds" in config:
            self._parse_legacy_config(config)
        else:
            logger.info("No thresholds configured")

    def _parse_multi_config(self, config: Dict[str, Any]):
        """Parse multiple named threshold configurations."""
        threshold_configs = config.get("threshold_configs", {})

        if not threshold_configs:
            logger.info("No threshold configurations defined")
            return

        # Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
        # All other configs inherit any metric not explicitly defined from effective_defaults.
        effective_defaults: Dict[str, ThresholdConfig] = {}
        for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
            if isinstance(plugin_thresholds, dict):
                self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)

        if "default" in threshold_configs:
            default_data = threshold_configs["default"]
            if isinstance(default_data, dict) and "thresholds" in default_data:
                for plugin_name, plugin_thresholds in default_data["thresholds"].items():
                    if isinstance(plugin_thresholds, dict):
                        self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)

        self.threshold_configs["default"] = dict(effective_defaults)
        logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))

        # Parse each named configuration, seeding it with effective_defaults first
        for config_name, config_data in threshold_configs.items():
            if config_name == "default":
                continue  # already handled above

            if not isinstance(config_data, dict):
                logger.warning("Invalid threshold config '%s', skipping", config_name)
                continue

            if "thresholds" not in config_data:
                logger.warning("No thresholds in config '%s', skipping", config_name)
                continue

            logger.info("Parsing threshold configuration: %s", config_name)
            self.threshold_configs[config_name] = dict(effective_defaults)

            for plugin_name, plugin_thresholds in config_data["thresholds"].items():
                if not isinstance(plugin_thresholds, dict):
                    continue
                self._parse_plugin_thresholds(
                    plugin_name,
                    plugin_thresholds,
                    target_dict=self.threshold_configs[config_name]
                )

        # Parse host to config mapping from two possible sources
        # 1. New format: hosts section with threshold_config attribute
        if "hosts" in config:
            hosts_config = config["hosts"]
            if isinstance(hosts_config, dict):
                for host_name, host_attrs in hosts_config.items():
                    if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
                        self.host_config_mapping[host_name] = host_attrs["threshold_config"]

        # 2. Legacy format: host_threshold_mapping section (for backward compatibility)
        if "host_threshold_mapping" in config:
            legacy_mapping = config.get("host_threshold_mapping", {})
            if isinstance(legacy_mapping, dict):
                self.host_config_mapping.update(legacy_mapping)

        # Default config: explicitly set, else "default", else first alphabetically
        self.default_config = config.get("default_threshold_config", "default")
        if self.default_config not in self.threshold_configs and self.threshold_configs:
            self.default_config = sorted(self.threshold_configs.keys())[0]
            logger.info("Using '%s' as default threshold config", self.default_config)

        logger.info(
            "Loaded %d threshold configurations with %d host mappings",
            len(self.threshold_configs),
            len(self.host_config_mapping)
        )

    def _parse_legacy_config(self, config: Dict[str, Any]):
        """Parse legacy single threshold configuration for backward compatibility."""
        if not config or "thresholds" not in config:
            logger.info("No thresholds configured")
            return

        for plugin_name, plugin_thresholds in config["thresholds"].items():
            if not isinstance(plugin_thresholds, dict):
                continue
            self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds)

    @staticmethod
    def _make_threshold(metric_path: str, threshold_config: Dict[str, Any]) -> Optional[ThresholdConfig]:
        """Build a ThresholdConfig from one config mapping, or None if it has no levels.

        Shared by the plugin, partition and RTT parsers so that every threshold
        honours the same keys and defaults (display template, 10% hysteresis,
        count of 1).
        """
        warning = threshold_config.get("warning")
        critical = threshold_config.get("critical")
        if warning is None and critical is None:
            return None
        return ThresholdConfig(
            metric_path=metric_path,
            warning=warning,
            critical=critical,
            display=threshold_config.get("display", DEFAULT_DISPLAY),
            operator=threshold_config.get("operator", ">"),
            hysteresis=threshold_config.get("hysteresis", 0.1),  # 10% default
            enabled=threshold_config.get("enabled", True),
            count=threshold_config.get("count", 1),
        )

    def _parse_plugin_thresholds(
        self,
        plugin_name: str,
        thresholds: Dict[str, Any],
        target_dict: Optional[Dict[str, ThresholdConfig]] = None
    ):
        """Parse thresholds for a specific plugin.

        Args:
            plugin_name: Name of the plugin
            thresholds: Threshold configuration dictionary
            target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
        """
        if target_dict is None:
            target_dict = self.thresholds

        # RTT thresholds use a flat, non-per-metric layout
        if plugin_name == "rtt":
            self._parse_rtt_thresholds(thresholds, target_dict)
            return

        for metric_name, threshold_config in thresholds.items():
            if not isinstance(threshold_config, dict):
                continue

            # Nested metrics (e.g., partitions./.percent)
            if metric_name == "partitions":
                self._parse_partition_thresholds(plugin_name, threshold_config, target_dict)
                continue

            metric_path = f"{plugin_name}.{metric_name}"
            threshold = self._make_threshold(metric_path, threshold_config)
            if threshold is None:
                logger.warning("No thresholds defined for %s, skipping", metric_path)
                continue

            target_dict[metric_path] = threshold
            logger.debug(
                "Registered threshold for %s: warn=%s, crit=%s, op=%s",
                metric_path, threshold.warning, threshold.critical,
                threshold_config.get("operator", ">")
            )

    def _parse_partition_thresholds(
        self,
        plugin_name: str,
        partitions: Dict[str, Any],
        target_dict: Optional[Dict[str, ThresholdConfig]] = None
    ):
        """Parse partition-specific thresholds for disk monitoring.

        Args:
            plugin_name: Name of the plugin
            partitions: Partition threshold configuration
            target_dict: Dictionary to store parsed thresholds
        """
        if target_dict is None:
            target_dict = self.thresholds

        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue

            for metric_name, threshold_config in metrics.items():
                if not isinstance(threshold_config, dict):
                    continue

                # Metric path like "disk_monitor./dev/sda1.percent"
                metric_path = f"{plugin_name}.{partition}.{metric_name}"
                threshold = self._make_threshold(metric_path, threshold_config)
                if threshold is not None:
                    target_dict[metric_path] = threshold

    def _parse_rtt_thresholds(
        self,
        rtt_thresholds: Dict[str, Any],
        target_dict: Optional[Dict[str, ThresholdConfig]] = None
    ):
        """Parse RTT thresholds (network latency thresholds).

        RTT thresholds are configured as:
            thresholds:
              rtt:
                warning: 100.0  # ms
                critical: 500.0  # ms

        Args:
            rtt_thresholds: RTT threshold configuration
            target_dict: Dictionary to store parsed thresholds
        """
        if target_dict is None:
            target_dict = self.thresholds

        if not isinstance(rtt_thresholds, dict):
            return

        # Metric path is simply "rtt" (not per-host)
        threshold = self._make_threshold("rtt", rtt_thresholds)
        if threshold is None:
            logger.warning("No RTT thresholds defined, skipping")
            return

        target_dict["rtt"] = threshold
        logger.debug(
            "Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
            threshold.warning, threshold.critical, threshold.count,
        )

    def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]:
        """Get the appropriate threshold configuration for a host.

        Args:
            host_name: Name of the host

        Returns:
            Dictionary of thresholds for this host
        """
        # Legacy mode: single threshold set for all hosts
        if self.thresholds and not self.threshold_configs:
            return self.thresholds

        if self.threshold_configs:
            config_name = self.host_config_mapping.get(host_name, self.default_config)
            if config_name in self.threshold_configs:
                return self.threshold_configs[config_name]
            logger.warning(
                "Threshold config '%s' not found for host '%s', using default '%s'",
                config_name, host_name, self.default_config
            )
            return self.threshold_configs.get(self.default_config, {})

        return {}

    @staticmethod
    def _active_threshold_value(threshold: ThresholdConfig, level: AlertLevel) -> Optional[float]:
        """Return the configured threshold matching *level*, or None (OK/UNKNOWN/unset)."""
        if level == AlertLevel.CRITICAL and threshold.critical is not None:
            return threshold.critical
        if level == AlertLevel.WARNING and threshold.warning is not None:
            return threshold.warning
        return None

    @staticmethod
    def _display_value(value: Any) -> Any:
        """Map the infinity sentinel to "overdue" for messages; pass anything else through."""
        return "overdue" if isinstance(value, float) and math.isinf(value) else value

    @staticmethod
    def _passes_count_gate(
        host_name: str,
        metric_path: str,
        threshold: ThresholdConfig,
        alert_state: AlertState,
        new_level: AlertLevel,
    ) -> bool:
        """Apply consecutive-count gating; return False to suppress this evaluation.

        While the alert is OK, an escalation is only accepted after
        ``threshold.count`` consecutive non-OK evaluations. An OK evaluation
        resets the pending counter. With count == 1 this never suppresses.
        """
        if new_level == AlertLevel.OK:
            alert_state.consecutive_count = 0
            return True
        if alert_state.level != AlertLevel.OK:
            return True

        alert_state.consecutive_count += 1
        if alert_state.consecutive_count < threshold.count:
            logger.debug(
                "Threshold exceeded %d/%d consecutive times for %s on %s",
                alert_state.consecutive_count, threshold.count, metric_path, host_name,
            )
            return False

        # Count reached: let the alert fire and start counting afresh next time
        alert_state.consecutive_count = 0
        return True

    def _evaluate_metric(
        self,
        host_name: str,
        metric_path: str,
        value: Any,
        threshold: ThresholdConfig,
        alert_states: Dict[str, AlertState],
        plugin_data: Optional[Dict[str, Any]] = None,
    ) -> Optional[Tuple[AlertLevel, AlertLevel]]:
        """Evaluate one value, update its AlertState and send any notification.

        Returns:
            (old_level, new_level) if the alert state changed, otherwise None
        """
        if metric_path not in alert_states:
            alert_states[metric_path] = AlertState(metric_path)
        alert_state = alert_states[metric_path]

        new_level = threshold.evaluate_with_hysteresis(value, alert_state.level)

        if not self._passes_count_gate(host_name, metric_path, threshold, alert_state, new_level):
            return None

        threshold_value = self._active_threshold_value(threshold, new_level)
        old_level = alert_state.level

        if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
            lvl, message, formatted_msg = self._trigger_notification(
                host_name, metric_path, old_level, new_level, value, threshold, plugin_data
            )
            alert_state.formatted_message = formatted_msg
            self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
            return (old_level, new_level)

        if new_level != AlertLevel.OK:
            self._check_renotify(host_name, alert_state, metric_path, value, threshold, plugin_data)
        return None

    def check_value(
        self,
        host_name: str,
        metric_path: str,
        value: float,
        alert_states: Dict[str, AlertState],
    ) -> Optional[Tuple[AlertLevel, AlertLevel]]:
        """
        Check a single value against configured threshold.

        Args:
            host_name: Name of the host
            metric_path: Full metric path (e.g., "rtt")
            value: The metric value to check
            alert_states: Host's alert_states dictionary (created entries are
                stored into it)

        Returns:
            Tuple of (old_level, new_level) if state changed, None otherwise
        """
        threshold = self.get_thresholds_for_host(host_name).get(metric_path)
        if threshold is None:
            return None
        # No full plugin data is available here for display templates
        return self._evaluate_metric(host_name, metric_path, value, threshold, alert_states, None)

    def check_plugin_data(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
    ) -> list:
        """
        Check plugin data against configured thresholds.

        Args:
            host_name: Name of the host
            plugin_name: Name of the plugin
            data: Plugin data dictionary
            alert_states: Host's alert_states dictionary

        Returns:
            List of (metric_path, old_level, new_level, value) tuples for state changes
        """
        state_changes = []
        thresholds = self.get_thresholds_for_host(host_name)

        for metric_name, value in data.items():
            metric_path = f"{plugin_name}.{metric_name}"
            threshold = thresholds.get(metric_path)
            if threshold is None:
                continue
            change = self._evaluate_metric(host_name, metric_path, value, threshold, alert_states, data)
            if change is not None:
                old_level, new_level = change
                state_changes.append((metric_path, old_level, new_level, value))

        # Nested metrics (e.g., partition data in disk_monitor)
        self._check_nested_metrics(host_name, plugin_name, data, alert_states, state_changes)

        return state_changes

    def _check_nested_metrics(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
        state_changes: list,
    ):
        """Check nested metrics like partition-specific thresholds.

        Appends (metric_path, old_level, new_level, value) to *state_changes*.
        """
        if plugin_name != "disk_monitor" or "partitions" not in data:
            return

        partitions = data["partitions"]
        if not isinstance(partitions, dict):
            return

        thresholds = self.get_thresholds_for_host(host_name)
        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue

            for metric_name, value in metrics.items():
                metric_path = f"{plugin_name}.{partition}.{metric_name}"
                threshold = thresholds.get(metric_path)
                if threshold is None:
                    continue
                # Pass full plugin data so display templates can reference it
                change = self._evaluate_metric(
                    host_name, metric_path, value, threshold, alert_states, data
                )
                if change is not None:
                    old_level, new_level = change
                    state_changes.append((metric_path, old_level, new_level, value))

    def _trigger_notification(
        self,
        host_name: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
        threshold: ThresholdConfig,
        plugin_data: Optional[Dict[str, Any]] = None,
    ):
        """Build the notification for an alert state change.

        Args:
            host_name: Name of the host
            metric_path: Full metric path
            old_level: Previous alert level
            new_level: New alert level
            value: Current metric value
            threshold: Threshold configuration
            plugin_data: Optional dictionary of all plugin data fields for format string

        Returns:
            (lvl, message, formatted_threshold_msg) where lvl is "RECOVERED",
            "WARNING", "CRITICAL" or "UNKNOWN", and formatted_threshold_msg is
            the rendered display template (None when no threshold is active).
        """
        threshold_value = self._active_threshold_value(threshold, new_level)
        display_value = self._display_value(value)

        formatted_threshold_msg = None
        if threshold_value is not None and new_level != AlertLevel.OK:
            formatted_threshold_msg = self._format_display(
                threshold.display,
                value=display_value,
                threshold_value=threshold_value,
                op_symbol=threshold.operator.value,
                plugin_data=plugin_data
            )

        if new_level == AlertLevel.OK:
            lvl = "RECOVERED"
            message = f"{metric_path} = {display_value} ({old_level.name} -> OK)"
        else:
            lvl = new_level.name  # "WARNING", "CRITICAL" or "UNKNOWN"
            message = f"{metric_path} = {display_value}"
            if formatted_threshold_msg is not None:
                message = f"{message} {formatted_threshold_msg}"

        return lvl, message, formatted_threshold_msg

    def _send_notification(
        self,
        host_name: str,
        lvl: str,
        message: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
    ):
        """Send notification and log to journal/eventlog."""
        try:
            notify_mod.pushmsg_for_host(host_name, f"{lvl}: {host_name} - {message}")
            logger.info("Notification sent: %s", message)
        except Exception as e:
            logger.error("Failed to send notification: %s", e)

        if self.journal is not None:
            self._schedule_journal_event(host_name, metric_path, old_level, new_level, value)

        eventlog(host_name, lvl, message, service="threshold")

    def _schedule_journal_event(
        self,
        host_name: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
    ):
        """Best-effort: schedule journal.log_threshold_event on the running event loop.

        Skipped (with a debug log) when called outside a running loop. The
        coroutine is only created once a loop is known to exist, so no
        "never awaited" warning is produced.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.debug("Failed to log threshold event to journal: no running event loop")
            return

        try:
            task = loop.create_task(self.journal.log_threshold_event(
                host_name=host_name,
                metric_path=metric_path,
                old_level=old_level.name,
                new_level=new_level.name,
                value=value,
            ))
        except Exception as e:
            logger.debug("Failed to log threshold event to journal: %s", e)
            return

        _background_tasks.add(task)
        task.add_done_callback(_on_journal_task_done)

    def _format_display(
        self,
        display_format: Optional[str],
        value: Any,
        threshold_value: float,
        op_symbol: str,
        plugin_data: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Format the display string using available data.

        Args:
            display_format: str.format template from threshold config; None
                selects the default "(threshold: <op> <value>)" text
            value: Current metric value
            threshold_value: Threshold value that was exceeded
            op_symbol: Comparison operator symbol
            plugin_data: Optional dictionary of plugin data fields (these
                override the standard fields on name clashes)

        Returns:
            Formatted display string (the default text if formatting fails)
        """
        fallback = f"(threshold: {op_symbol} {threshold_value})"
        if display_format is None:
            return fallback

        format_context = {
            'value': value,
            'threshold_value': threshold_value,
            'op_symbol': op_symbol,
        }
        if plugin_data:
            format_context.update(plugin_data)

        try:
            return display_format.format(**format_context)
        except KeyError as e:
            logger.warning(
                "Missing format variable in display string '%s': %s",
                display_format, e
            )
            return fallback
        except Exception as e:
            logger.error(
                "Error formatting display string '%s': %s",
                display_format, e
            )
            return fallback

    def _check_renotify(
        self,
        host_name: str,
        alert_state: AlertState,
        metric_path: str,
        value: Any,
        threshold: ThresholdConfig,
        plugin_data: Optional[Dict[str, Any]] = None,
    ):
        """Send a REMINDER for an ongoing, unacknowledged CRITICAL alert when due.

        The first call after a state change only anchors the reminder timer
        (the state-change notification has already been sent); later calls
        re-notify every ``renotify_interval`` seconds.

        Args:
            host_name: Name of the host
            alert_state: Current alert state
            metric_path: Full metric path
            value: Current metric value
            threshold: Threshold configuration
            plugin_data: Optional dictionary of all plugin data fields
        """
        if alert_state.level != AlertLevel.CRITICAL:
            return
        if alert_state.acknowledged:
            return

        now = time.time()

        if alert_state.last_notification is None:
            alert_state.last_notification = now
            alert_state.notification_count = 1
            return

        if (now - alert_state.last_notification) < self.renotify_interval:
            return

        threshold_value = self._active_threshold_value(threshold, alert_state.level)
        display_value = self._display_value(value)
        elapsed = int(now - alert_state.since)
        prefix = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {display_value}"

        if threshold_value is not None:
            threshold_info = self._format_display(
                threshold.display,
                value=display_value,
                threshold_value=threshold_value,
                op_symbol=threshold.operator.value,
                plugin_data=plugin_data
            )
            message = f"{prefix} {threshold_info}, ongoing for {elapsed}s"
        else:
            message = f"{prefix} (ongoing for {elapsed}s)"

        try:
            notify_mod.pushmsg_for_host(host_name, message)
            alert_state.last_notification = now
            alert_state.notification_count += 1
            logger.info("Re-notification sent: %s", message)
        except Exception as e:
            logger.error("Failed to send re-notification: %s", e)

    def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
        """
        Get all currently active (non-OK) alerts.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            List of AlertState objects that are not OK
        """
        return [
            state for state in alert_states.values()
            if state.level != AlertLevel.OK
        ]

    def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
        """
        Get summary counts of alert levels.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            Dictionary with counts: {"ok": N, "warning": N, "critical": N, "unknown": N}
        """
        summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}
        for state in alert_states.values():
            key = state.level.name.lower()
            if key in summary:
                summary[key] += 1
        return summary