3301dbfe34
Host Overview (plugins.html): show Update and Delete buttons in the host-right zone when the logged-in user is the host owner (or admin / unauthenticated mode). Buttons link to /u?h=<host> and /d?h=<host> with stopPropagation so they don't toggle the accordion; Delete prompts for confirmation first. ThresholdChecker.purge_stale_alerts(): removes alert states whose metric_path has no matching threshold in the current config. Called after startup pickle restore and after every SIGHUP config reload so alerts orphaned by upgrades or config changes do not persist indefinitely. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1337 lines
52 KiB
Python
1337 lines
52 KiB
Python
"""
|
|
Threshold checking and alerting for plugin metrics.
|
|
|
|
This module provides a flexible threshold checking system that:
|
|
- Evaluates plugin metrics against configured warning/critical thresholds
|
|
- Tracks alert states per host and metric
|
|
- Prevents alert flapping with hysteresis
|
|
- Triggers notifications only on state changes
|
|
- Supports multiple comparison operators
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from enum import Enum
|
|
from typing import Dict, List, Any, Optional, Tuple, Callable
|
|
from . import notify as notify_mod
|
|
from .config import THRESHOLD_DEFAULTS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
eventlog = notify_mod.eventlog
|
|
|
|
class AlertLevel(Enum):
    """Severity of an alert.

    The numeric values are ordered: code in this module compares
    ``level.value`` to decide whether a transition is an improvement
    (a move to a lower number).
    """

    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3
|
|
|
|
|
|
class ComparisonOperator(Enum):
    """Comparison operators a threshold may use.

    Each member's value is the operator symbol as written in configuration,
    so ``ComparisonOperator(">=")`` looks a member up by its symbol.
    """

    # Ordering comparisons
    GT = ">"
    GTE = ">="
    LT = "<"
    LTE = "<="
    # Equality comparisons
    EQ = "=="
    NEQ = "!="
|
|
|
|
|
|
class AlertState:
    """Current alert level and bookkeeping for a single metric.

    Instances are pickled and restored (see ``__setstate__``), so every
    attribute added after the first release must have a backfill default.
    """

    # Attributes an older pickle may lack, paired with the value __init__
    # assigns to them. __setstate__ fills in whichever ones are missing.
    _BACKFILL_DEFAULTS = (
        ("notification_count", 0),
        ("last_notification", None),
        ("threshold_value", None),
        ("operator", None),
        ("formatted_message", None),
        ("acknowledged", False),
        ("acknowledged_at", None),
        ("consecutive_count", 0),
        ("pending_since", None),
    )

    def __init__(self, metric_path: str):
        """
        Initialize alert state at level OK.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        self.since = time.time()
        self.last_value = None
        self.last_check = time.time()
        self.notification_count = 0
        self.last_notification = None
        self.threshold_value = None  # The threshold value that triggered alert
        self.operator = None  # The comparison operator (>, <, >=, etc.)
        self.formatted_message = None  # Formatted display message for UI
        self.acknowledged = False  # Whether alert has been acknowledged
        self.acknowledged_at = None  # Timestamp when acknowledged
        self.consecutive_count = 0  # Consecutive exceedances while still OK (for count gating)
        self.pending_since: Optional[float] = None  # non-None while waiting out grace period before notifying

    def update(
        self,
        level: "AlertLevel",
        value: Any,
        threshold_value: Optional[float] = None,
        operator: Optional[str] = None
    ) -> bool:
        """
        Record a new evaluation result.

        Args:
            level: New alert level
            value: Current metric value
            threshold_value: The threshold value that was exceeded (if applicable)
            operator: The comparison operator (>, <, >=, etc.)

        Returns:
            True if the level changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        # Threshold details are only kept while an alert is active.
        if level != AlertLevel.OK:
            self.threshold_value = threshold_value
            self.operator = operator
        else:
            self.threshold_value = None
            self.operator = None

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path,
            self.level.name,
            level.name,
            value
        )
        self.level = level
        self.since = now
        self.notification_count = 0
        self.last_notification = None  # restart reminder interval on level change
        # A new non-OK level needs a fresh acknowledgement; a recovery to OK
        # leaves the acknowledgement fields as they were.
        if level != AlertLevel.OK:
            self.acknowledged = False
            self.acknowledged_at = None
        return True

    def to_dict(self) -> dict:
        """Convert alert state to a dictionary for serialization.

        Infinite floats are emitted as the string "overdue" and NaN as None.
        Optional fields are included only when they are not None.
        """
        import math

        def sanitize_value(val):
            # inf/nan floats are replaced before serialization.
            if isinstance(val, float):
                if math.isinf(val):
                    return "overdue"
                if math.isnan(val):
                    return None
            return val

        result = {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": sanitize_value(self.last_value),
            "last_check": self.last_check,
            "notification_count": self.notification_count,
            "acknowledged": self.acknowledged,
        }

        if self.acknowledged_at is not None:
            result["acknowledged_at"] = self.acknowledged_at

        if self.threshold_value is not None:
            result["threshold_value"] = sanitize_value(self.threshold_value)
        if self.operator is not None:
            result["operator"] = self.operator
        if self.formatted_message is not None:
            result["formatted_message"] = self.formatted_message

        return result

    def __setstate__(self, state):
        """Restore from pickle, backfilling fields added after the pickle was written.

        Every attribute listed in ``_BACKFILL_DEFAULTS`` that the pickled
        state lacks is set to its ``__init__`` default, so code reading e.g.
        ``pending_since`` or ``consecutive_count`` does not hit
        AttributeError on a state restored from an older version.
        """
        self.__dict__.update(state)
        for name, default in self._BACKFILL_DEFAULTS:
            if not hasattr(self, name):
                setattr(self, name, default)

    def acknowledge(self):
        """Mark this alert as acknowledged and record when."""
        self.acknowledged = True
        self.acknowledged_at = time.time()
        logger.info("Alert acknowledged for %s", self.metric_path)

    def __str__(self):
        """Return the string form of ``to_dict()``."""
        return str(self.to_dict())
|
|
|
|
class ThresholdConfig:
    """Configuration for a single threshold check."""

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        display: Optional[str] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
        count: int = 1,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            display: Display format string stored for message formatting
            operator: Comparison operator (>, >=, <, <=, ==, !=); an
                unrecognised symbol is logged and replaced by ">"
            hysteresis: Hysteresis fraction of the threshold to prevent flapping (0.0-1.0)
            enabled: Whether this threshold is enabled
            count: Number of consecutive exceedances required before alerting
                (default 1; values below 1 are raised to 1)
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        self.hysteresis = hysteresis
        self.display = display
        self.count = max(1, int(count))

        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator,
                metric_path
            )
            self.operator = ComparisonOperator.GT

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check (anything ``float()`` accepts)

        Returns:
            OK when disabled or no threshold matches, UNKNOWN when the value
            cannot be converted to float, otherwise CRITICAL or WARNING
            (critical is checked first).
        """
        if not self.enabled:
            return AlertLevel.OK

        try:
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        if self.critical is not None and self._compare(value, self.critical):
            return AlertLevel.CRITICAL

        if self.warning is not None and self._compare(value, self.warning):
            return AlertLevel.WARNING

        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        When the plain evaluation is less severe than ``current_level``, the
        value must clear the current level's threshold by
        ``abs(threshold * hysteresis)`` before the lower level is reported.
        Hysteresis applies only to the >, >=, <, <= operators.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # No hysteresis configured (0, 0.0 or None): plain evaluation wins.
        if not self.hysteresis:
            return new_level

        # Hysteresis only delays improvements, never escalations.
        if new_level.value >= current_level.value:
            return new_level

        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        # evaluate() compared float(value); do the same here so a numeric
        # string does not raise TypeError in the recovery comparison.
        try:
            value = float(value)
        except (TypeError, ValueError):
            return new_level

        hysteresis_amount = abs(threshold * self.hysteresis)

        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # "Greater than" thresholds: value must drop below threshold - margin.
            if value >= threshold - hysteresis_amount:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # "Less than" thresholds: value must rise above threshold + margin.
            if value <= threshold + hysteresis_amount:
                return current_level

        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Return whether ``value <operator> threshold`` holds.

        Equality and inequality use an absolute tolerance of 1e-9.
        """
        if self.operator == ComparisonOperator.GT:
            return value > threshold
        if self.operator == ComparisonOperator.GTE:
            return value >= threshold
        if self.operator == ComparisonOperator.LT:
            return value < threshold
        if self.operator == ComparisonOperator.LTE:
            return value <= threshold
        if self.operator == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9  # Float comparison
        if self.operator == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False
|
|
|
|
|
|
class ThresholdChecker:
|
|
"""Main threshold checking and alerting system."""
|
|
|
|
def __init__(
    self,
    config: Dict[str, Any],
    renotify_interval: int = 3600,
    journal: Optional[Any] = None,
):
    """
    Build a checker and parse its threshold configuration.

    Args:
        config: Threshold configuration dictionary from YAML; its optional
            "grace" key (default 2) is stored as ``grace_seconds``
        renotify_interval: Seconds between repeat notifications (default: 1 hour)
        journal: Optional MessageJournal instance for logging threshold events
    """
    # {config_name: {metric_path: ThresholdConfig}}, defaults already merged in
    self.threshold_configs = {}
    # {config_name: {metric_path: ThresholdConfig}}, overrides only
    self.threshold_raw_configs: Dict[str, Dict[str, ThresholdConfig]] = {}
    # Legacy single set: {metric_path: ThresholdConfig}
    self.thresholds = {}
    # {host_name: [config_name, ...]} in application order
    self.host_config_mapping: Dict[str, List[str]] = {}
    # Config used for hosts that have no mapping
    self.default_config = "default"

    self.renotify_interval = renotify_interval
    self.grace_seconds: float = float(config.get("grace", 2))
    self.journal = journal

    self._parse_config(config)

    named_total = sum(map(len, self.threshold_configs.values()))
    if named_total == 0 and len(self.thresholds) > 0:
        # Only the legacy single threshold set is populated.
        logger.info(
            "ThresholdChecker initialized with %d thresholds (legacy format)",
            len(self.thresholds),
        )
        return

    logger.info(
        "ThresholdChecker initialized with %d named configurations (%d total thresholds)",
        len(self.threshold_configs),
        named_total
    )
|
|
|
|
def reload(self, config: Dict[str, Any]):
    """Replace the threshold configuration with one parsed from *config*.

    All parsed thresholds and host mappings are emptied in place and then
    rebuilt. This method does not touch any alert state, so callers'
    alert states carry over across a reload.

    Args:
        config: New configuration dictionary
    """
    logger.info("Reloading threshold configuration...")

    # Drop everything parsed from the previous configuration.
    for store in (
        self.threshold_configs,
        self.threshold_raw_configs,
        self.thresholds,
        self.host_config_mapping,
    ):
        store.clear()
    self.grace_seconds = float(config.get("grace", 2))

    self._parse_config(config)

    total = sum(map(len, self.threshold_configs.values()))
    if total == 0 and len(self.thresholds) > 0:
        total = len(self.thresholds)

    logger.info("Threshold configuration reloaded: %d total thresholds", total)
|
|
|
|
def _parse_config(self, config: Dict[str, Any]):
|
|
"""Parse threshold configuration from YAML structure.
|
|
|
|
Supports two formats:
|
|
1. Legacy format with direct 'thresholds' section
|
|
2. New format with 'threshold_configs' and 'host_threshold_mapping'
|
|
"""
|
|
# Check for new multi-config format
|
|
if "threshold_configs" in config:
|
|
self._parse_multi_config(config)
|
|
elif "thresholds" in config:
|
|
# Legacy single threshold configuration
|
|
self._parse_legacy_config(config)
|
|
else:
|
|
logger.info("No thresholds configured")
|
|
|
|
def _parse_multi_config(self, config: Dict[str, Any]):
|

"""Parse multiple named threshold configurations.

Reads config["threshold_configs"], fills self.threshold_configs
(defaults merged with overrides) and self.threshold_raw_configs
(overrides only), builds self.host_config_mapping from the "hosts" and
"host_threshold_mapping" sections, and chooses self.default_config.

Args:
config: Full configuration dictionary.
"""
|

threshold_configs = config.get("threshold_configs", {})
|

|

if not threshold_configs:
|

logger.info("No threshold configurations defined")
|

return
|

|

# Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
|

# All other configs inherit any metric not explicitly defined from effective_defaults.
|

effective_defaults: Dict[str, ThresholdConfig] = {}
|

for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
|

if isinstance(plugin_thresholds, dict):
|

self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|

|

if "default" in threshold_configs:
|

default_data = threshold_configs["default"]
|

if isinstance(default_data, dict) and "thresholds" in default_data:
|

# NOTE(review): default_data["thresholds"] is iterated with .items() without a type check - confirm it cannot be None.
for plugin_name, plugin_thresholds in default_data["thresholds"].items():
|

if isinstance(plugin_thresholds, dict):
|

self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|

|

# NOTE(review): indentation is lost in this copy, so it is unclear whether the next three
# statements run only when a 'default' config is present or unconditionally - confirm against the repository.
self.threshold_configs["default"] = dict(effective_defaults)
|

self.threshold_raw_configs["default"] = {}
|

logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))
|

|

# Parse each named configuration
|

for config_name, config_data in threshold_configs.items():
|

if config_name == "default":
|

continue # already handled above
|

|

if not isinstance(config_data, dict):
|

logger.warning("Invalid threshold config '%s', skipping", config_name)
|

continue
|

|

if "thresholds" not in config_data:
|

logger.warning("No thresholds in config '%s', skipping", config_name)
|

continue
|

|

logger.info("Parsing threshold configuration: %s", config_name)
|

|

# Raw overrides only (used for multi-config layering)
|

raw_overrides: Dict[str, ThresholdConfig] = {}
|

# NOTE(review): config_data["thresholds"] is iterated with .items() without a type check - confirm it cannot be None.
thresholds_config = config_data["thresholds"]
|

for plugin_name, plugin_thresholds in thresholds_config.items():
|

if isinstance(plugin_thresholds, dict):
|

self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=raw_overrides)
|

self.threshold_raw_configs[config_name] = raw_overrides
|

|

# Pre-merged version (defaults + overrides) for single-config fast path
|

self.threshold_configs[config_name] = dict(effective_defaults)
|

self.threshold_configs[config_name].update(raw_overrides)
|

|

# Parse host → config list mapping from two possible sources
|

|

def _normalise(value) -> List[str]:
|

"""Accept a string or list; always return a list."""
|

if isinstance(value, list):
|

return [str(v) for v in value]
|

return [str(value)]
|

|

# 1. hosts section with threshold_config attribute (string or list)
|

if "hosts" in config:
|

hosts_config = config["hosts"]
|

if isinstance(hosts_config, dict):
|

for host_name, host_attrs in hosts_config.items():
|

if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
|

self.host_config_mapping[host_name] = _normalise(host_attrs["threshold_config"])
|

|

# 2. Legacy host_threshold_mapping section (string values only)
|

# This section is processed after "hosts", so its entry overwrites the one from "hosts" for the same host name.
if "host_threshold_mapping" in config:
|

legacy_mapping = config.get("host_threshold_mapping", {})
|

if isinstance(legacy_mapping, dict):
|

for host_name, value in legacy_mapping.items():
|

self.host_config_mapping[host_name] = _normalise(value)
|

|

# Set default config (first one alphabetically or explicitly set)
|

self.default_config = config.get("default_threshold_config", "default")
|

if self.default_config not in self.threshold_configs and self.threshold_configs:
|

# Use first available config as default
|

self.default_config = sorted(self.threshold_configs.keys())[0]
|

logger.info("Using '%s' as default threshold config", self.default_config)
|

|

logger.info(
|

"Loaded %d threshold configurations with %d host mappings",
|

len(self.threshold_configs),
|

len(self.host_config_mapping)
|

)
|
|
|
|
def _parse_legacy_config(self, config: Dict[str, Any]):
    """Parse the legacy single 'thresholds' section into ``self.thresholds``.

    A missing section, or one whose value is not a mapping (for example an
    empty ``thresholds:`` key that loaded as None), is treated as "no
    thresholds configured" instead of raising on ``.items()``.

    Args:
        config: Configuration dictionary that may contain 'thresholds'
    """
    thresholds_config = config.get("thresholds") if config else None
    if not isinstance(thresholds_config, dict):
        logger.info("No thresholds configured")
        return

    for plugin_name, plugin_thresholds in thresholds_config.items():
        # Non-mapping plugin entries are ignored.
        if isinstance(plugin_thresholds, dict):
            self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds)
|
|
|
|
def _parse_plugin_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse thresholds for a specific plugin.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
thresholds: Threshold configuration dictionary
|
|
target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
# Special handling for RTT thresholds (per-host)
|
|
if plugin_name == "rtt":
|
|
self._parse_rtt_thresholds(thresholds, target_dict)
|
|
return
|
|
|
|
for metric_name, threshold_config in thresholds.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Handle nested metrics (e.g., partitions./.percent)
|
|
if metric_name == "partitions":
|
|
self._parse_partition_thresholds(plugin_name, threshold_config, target_dict)
|
|
continue
|
|
|
|
metric_path = f"{plugin_name}.{metric_name}"
|
|
|
|
# Extract threshold values
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
display = threshold_config.get("display", "(threshold: {op_symbol} {threshold_value})")
|
|
hysteresis = threshold_config.get("hysteresis", 0.1) # 10% default
|
|
enabled = threshold_config.get("enabled", True)
|
|
|
|
if warning is None and critical is None:
|
|
logger.warning("No thresholds defined for %s, skipping", metric_path)
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered threshold for %s: warn=%s, crit=%s, op=%s",
|
|
metric_path,
|
|
warning,
|
|
critical,
|
|
operator
|
|
)
|
|
|
|
def _parse_partition_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
partitions: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse partition-specific thresholds for disk monitoring.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
partitions: Partition threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, threshold_config in metrics.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Create metric path like "disk_monitor./dev/sda1.percent"
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
hysteresis = threshold_config.get("hysteresis", 0.1)
|
|
enabled = threshold_config.get("enabled", True)
|
|
display = threshold_config.get("display")
|
|
if warning is None and critical is None:
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
|
|
def _parse_rtt_thresholds(
|
|
self,
|
|
rtt_thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse RTT thresholds (network latency thresholds).
|
|
|
|
RTT thresholds are configured as:
|
|
thresholds:
|
|
rtt:
|
|
warning: 100.0 # ms
|
|
critical: 500.0 # ms
|
|
|
|
Args:
|
|
rtt_thresholds: RTT threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
if not isinstance(rtt_thresholds, dict):
|
|
return
|
|
|
|
# Metric path is simply "rtt" (not per-host)
|
|
metric_path = "rtt"
|
|
|
|
warning = rtt_thresholds.get("warning")
|
|
critical = rtt_thresholds.get("critical")
|
|
operator = rtt_thresholds.get("operator", ">")
|
|
hysteresis = rtt_thresholds.get("hysteresis", 0.1) # 10% default
|
|
enabled = rtt_thresholds.get("enabled", True)
|
|
display = rtt_thresholds.get("display")
|
|
count = rtt_thresholds.get("count", 1)
|
|
|
|
if warning is None and critical is None:
|
|
logger.warning("No RTT thresholds defined, skipping")
|
|
return
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
count=count,
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
|
|
warning,
|
|
critical,
|
|
count,
|
|
)
|
|
|
|
def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]:
|
|
"""Get the effective threshold configuration for a host.
|
|
|
|
When threshold_config is a list, configs are applied left-to-right on top
|
|
of the default thresholds so earlier entries can be overridden by later ones.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
|
|
Returns:
|
|
Dictionary of thresholds for this host
|
|
"""
|
|
# Legacy mode: single threshold set for all hosts
|
|
if self.thresholds and not self.threshold_configs:
|
|
return self.thresholds
|
|
|
|
if not self.threshold_configs:
|
|
return {}
|
|
|
|
config_names = self.host_config_mapping.get(host_name)
|
|
|
|
# No host-specific mapping → return pre-merged default
|
|
if not config_names:
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Single config → fast path using pre-merged copy
|
|
if len(config_names) == 1:
|
|
name = config_names[0]
|
|
if name in self.threshold_configs:
|
|
return self.threshold_configs[name]
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', using default '%s'",
|
|
name, host_name, self.default_config,
|
|
)
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Multiple configs → start from defaults, layer raw overrides in order
|
|
result = dict(self.threshold_configs.get(self.default_config, {}))
|
|
for name in config_names:
|
|
if name == self.default_config:
|
|
continue # defaults already the base
|
|
raw = self.threshold_raw_configs.get(name)
|
|
if raw is None:
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', skipping",
|
|
name, host_name,
|
|
)
|
|
else:
|
|
result.update(raw)
|
|
return result
|
|
|
|
def check_value(
|
|
self,
|
|
host_name: str,
|
|
metric_path: str,
|
|
value: float,
|
|
alert_states: Dict[str, AlertState],
|
|
) -> Optional[Tuple[AlertLevel, AlertLevel]]:
|
|
"""
|
|
Check a single value against configured threshold.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
metric_path: Full metric path (e.g., "rtt.hostname")
|
|
value: The metric value to check
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
Tuple of (old_level, new_level) if state changed, None otherwise
|
|
"""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
if metric_path not in thresholds:
|
|
return None
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
# Get or create alert state
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
# Evaluate threshold with hysteresis
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Apply consecutive-count gating: when currently OK, require threshold.count
|
|
# consecutive exceedances before escalating to WARNING/CRITICAL.
|
|
if new_level == AlertLevel.OK:
|
|
# Value is fine (or recovered) — reset the pending counter immediately.
|
|
alert_state.consecutive_count = 0
|
|
elif alert_state.level == AlertLevel.OK and new_level != AlertLevel.OK:
|
|
# First time we exceed while still OK: count up.
|
|
alert_state.consecutive_count += 1
|
|
if alert_state.consecutive_count < threshold.count:
|
|
logger.debug(
|
|
"RTT threshold exceeded %d/%d consecutive times for %s on %s",
|
|
alert_state.consecutive_count,
|
|
threshold.count,
|
|
metric_path,
|
|
host_name,
|
|
)
|
|
return None
|
|
# Count reached — fire the alert and reset the counter.
|
|
alert_state.consecutive_count = 0
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Update state and check for changes
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, None)
|
|
return (old_level, new_level)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, None)
|
|
|
|
return None
|
|
def _find_threshold(
|
|
self, thresholds: Dict[str, "ThresholdConfig"], metric_path: str
|
|
) -> Optional["ThresholdConfig"]:
|
|
"""Return the threshold for *metric_path*, falling back to suffix matches.
|
|
|
|
Allows generic thresholds like ``ping_monitor.rtt_avg`` to match
|
|
fully-qualified paths like ``ping_monitor.8_8_8_8_rtt_avg``.
|
|
The exact match is always tried first; then successive leading
|
|
underscore-delimited segments are stripped from the field name until
|
|
a match is found or no segments remain.
|
|
"""
|
|
if metric_path in thresholds:
|
|
return thresholds[metric_path]
|
|
plugin, sep, field = metric_path.partition(".")
|
|
if not sep:
|
|
return None
|
|
parts = field.split("_")
|
|
for i in range(1, len(parts)):
|
|
candidate = plugin + "." + "_".join(parts[i:])
|
|
if candidate in thresholds:
|
|
return thresholds[candidate]
|
|
return None
|
|
|
|
def check_plugin_data(
|
|
self,
|
|
host_name: str,
|
|
plugin_name: str,
|
|
data: Dict[str, Any],
|
|
alert_states: Dict[str, AlertState],
|
|
) -> list:
|
|
"""
|
|
Check plugin data against configured thresholds.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
plugin_name: Name of the plugin
|
|
data: Plugin data dictionary
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
List of (metric_path, old_level, new_level, value) tuples for state changes
|
|
"""
|
|
state_changes = []
|
|
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
# Check flat metrics
|
|
for metric_name, value in data.items():
|
|
metric_path = f"{plugin_name}.{metric_name}"
|
|
|
|
threshold = self._find_threshold(thresholds, metric_path)
|
|
if threshold is None:
|
|
continue
|
|
|
|
# Get or create alert state
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
# Evaluate threshold with hysteresis
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Update state and check for changes
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
|
|
|
|
# Check nested metrics (e.g., partition data in disk_monitor)
|
|
self._check_nested_metrics(
|
|
host_name,
|
|
plugin_name,
|
|
data,
|
|
alert_states,
|
|
state_changes
|
|
)
|
|
|
|
return state_changes
|
|
|
|
def _check_nested_metrics(
|
|
self,
|
|
host_name: str,
|
|
plugin_name: str,
|
|
data: Dict[str, Any],
|
|
alert_states: Dict[str, AlertState],
|
|
state_changes: list,
|
|
):
|
|
"""Check nested metrics like partition-specific thresholds."""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
# Look for partition data in disk_monitor
|
|
if plugin_name == "disk_monitor" and "partitions" in data:
|
|
partitions = data["partitions"]
|
|
if not isinstance(partitions, dict):
|
|
return
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, value in metrics.items():
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
if metric_path not in thresholds:
|
|
continue
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
|
|
|
|
def _trigger_notification(
    self,
    host_name: str,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]] = None,
):
    """Build the label and text for an alert state change.

    This method only formats; it does not send anything itself.

    Args:
        host_name: Name of the host (not used in the generated text).
        metric_path: Full metric path
        old_level: Previous alert level
        new_level: New alert level
        value: Current metric value
        threshold: Threshold configuration
        plugin_data: Optional dictionary of all plugin data fields for format string

    Returns:
        A ``(lvl, message, formatted_threshold_msg)`` tuple.  ``lvl`` is
        ``"RECOVER"``, ``"WARNING"``, ``"CRITICAL"`` or ``"UNKNOWN"``;
        ``message`` is the notification body; ``formatted_threshold_msg`` is
        the rendered threshold display string, or ``None`` when no
        warning/critical limit applies to ``new_level``.
    """
    import math

    # Determine which threshold was exceeded
    threshold_value = None
    if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
        threshold_value = threshold.critical
    elif new_level == AlertLevel.WARNING and threshold.warning is not None:
        threshold_value = threshold.warning

    # Format operator symbol
    op_symbol = threshold.operator.value

    # Use a display-friendly value (inf is the sentinel for "overdue")
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value

    # Render the threshold text exactly once and reuse it for both the message
    # and the value handed back for storage.  threshold_value is only set for
    # WARNING/CRITICAL, so no extra level check is needed here.
    formatted_threshold_msg = None
    if threshold_value is not None:
        formatted_threshold_msg = self._format_display(
            threshold.display,
            value=display_value,
            threshold_value=threshold_value,
            op_symbol=op_symbol,
            plugin_data=plugin_data,
        )

    if new_level == AlertLevel.OK:
        lvl = "RECOVER"
        message = f"{metric_path} = {display_value} ({old_level.name} -> OK)"
        return lvl, message, formatted_threshold_msg

    if new_level == AlertLevel.WARNING:
        lvl = "WARNING"
    elif new_level == AlertLevel.CRITICAL:
        lvl = "CRITICAL"
    else:
        lvl = "UNKNOWN"

    message = f"{metric_path} = {display_value}"
    if threshold_value is not None:
        message = f"{message} {formatted_threshold_msg}"

    return lvl, message, formatted_threshold_msg
|
|
|
|
def _send_notification(
    self,
    host_name: str,
    lvl: str,
    message: str,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
):
    """Dispatch an alert: notification task, journal entry and eventlog line.

    A known host that is not watched only gets the eventlog line.  For every
    other host (including names missing from the host registry) a
    notification task is scheduled on the event loop, the transition is
    written to the journal when one is configured, and the eventlog line is
    emitted last.

    Args:
        host_name: Name of the host the alert belongs to.
        lvl: Label used in the title and eventlog (e.g. ``"CRITICAL"``).
        message: Notification body / eventlog text.
        metric_path: Full metric path, recorded in the journal.
        old_level: Previous alert level, recorded in the journal by name.
        new_level: New alert level, recorded in the journal by name.
        value: Current metric value, recorded in the journal.
    """
    from . import hbdclass

    host = hbdclass.Host.hosts.get(host_name)
    muted = host is not None and not host.watched
    if muted:
        eventlog(host_name, lvl, message, service="threshold")
        return

    notification = notify_mod.Notification(
        title=f"[{lvl}] {host_name}",
        body=message,
        level=lvl,
    )
    asyncio.get_event_loop().create_task(
        notify_mod.send_notification(host_name, notification)
    )

    # Journal logging is best effort: a failure here is only noted at debug
    # level and must not stop the eventlog entry below.
    journal = self.journal
    if journal is not None:
        try:
            asyncio.get_event_loop().create_task(
                journal.log_threshold_event(
                    host_name=host_name,
                    metric_path=metric_path,
                    old_level=old_level.name,
                    new_level=new_level.name,
                    value=value,
                )
            )
        except Exception as err:
            logger.debug(f"Failed to log threshold event to journal: {err}")

    # Log to eventlog as well
    eventlog(host_name, lvl, message, service="threshold")
|
|
|
|
def _format_display(
|
|
self,
|
|
display_format: str,
|
|
value: Any,
|
|
threshold_value: float,
|
|
op_symbol: str,
|
|
plugin_data: Optional[Dict[str, Any]] = None,
|
|
) -> str:
|
|
"""Format the display string using available data.
|
|
|
|
Args:
|
|
display_format: Format string from threshold config
|
|
value: Current metric value
|
|
threshold_value: Threshold value that was exceeded
|
|
op_symbol: Comparison operator symbol
|
|
plugin_data: Optional dictionary of plugin data fields
|
|
|
|
Returns:
|
|
Formatted display string
|
|
"""
|
|
# Build format context with standard variables
|
|
format_context = {
|
|
'value': value,
|
|
'threshold_value': threshold_value,
|
|
'op_symbol': op_symbol,
|
|
}
|
|
|
|
# Add all plugin data fields if available
|
|
if plugin_data:
|
|
format_context.update(plugin_data)
|
|
|
|
try:
|
|
# Format the display string
|
|
return display_format.format(**format_context)
|
|
except KeyError as e:
|
|
logger.warning(
|
|
"Missing format variable in display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
# Fallback to default format
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error formatting display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
|
|
def _apply_grace(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    old_level: AlertLevel,
    new_level: AlertLevel,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]],
) -> None:
    """Apply grace-period rules to a level transition.

    The notification text is always rendered and its threshold part stored on
    ``alert_state.formatted_message``.  What happens next depends on the
    direction of the transition:

    - To OK while a deferred alert is still pending: nothing is sent and the
      pending marker is cleared -- neither the alert nor the recovery fires.
    - To OK with nothing pending: the RECOVER notification is sent.
    - To a higher level (OK→WARNING, OK→CRITICAL, WARNING→CRITICAL): the
      notification is deferred by stamping ``pending_since`` with the
      current time.
    - Any other transition (e.g. CRITICAL→WARNING): only a debug log entry;
      the metric is still alerting, so no new notification is produced.
    """
    lvl, message, formatted_msg = self._trigger_notification(
        host_name, metric_path, old_level, new_level, value, threshold, plugin_data
    )
    alert_state.formatted_message = formatted_msg

    if new_level == AlertLevel.OK:
        if alert_state.pending_since is None:
            # The alert had already been delivered, so report the recovery.
            self._send_notification(
                host_name, lvl, message, metric_path, old_level, new_level, value
            )
            return

        # Recovered before the deferred alert ever fired: drop both.
        logger.info(
            "Alert suppressed (recovered within %.0fs grace): %s on %s",
            self.grace_seconds, metric_path, host_name,
        )
        alert_state.pending_since = None
        return

    if new_level.value > old_level.value:
        # Worsening: start (or restart) the grace timer instead of notifying.
        alert_state.pending_since = time.time()
        logger.debug(
            "Alert deferred (%.0fs grace): %s on %s = %s",
            self.grace_seconds, metric_path, host_name, value,
        )
        return

    # De-escalation within alert states: still alerting, nothing to send.
    logger.debug(
        "De-escalation %s→%s for %s on %s, no notification",
        old_level.name, new_level.name, metric_path, host_name,
    )
|
|
|
|
def _check_pending_or_renotify(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]],
) -> None:
    """Handle a metric whose alert level is unchanged and non-OK.

    With no deferred notification pending, this delegates to the reminder
    logic.  With one pending, it is sent once ``grace_seconds`` have elapsed
    since ``pending_since`` (reported as a transition from OK to the current
    level) and the pending marker is cleared; before that nothing happens.
    """
    deferred_at = alert_state.pending_since
    if deferred_at is None:
        self._check_renotify(
            host_name, alert_state, metric_path, value, threshold, plugin_data
        )
        return

    grace_elapsed = time.time() - deferred_at >= self.grace_seconds
    if not grace_elapsed:
        # Still within the grace window, do nothing.
        return

    current_level = alert_state.level
    lvl, message, formatted_msg = self._trigger_notification(
        host_name, metric_path, AlertLevel.OK, current_level, value, threshold, plugin_data
    )
    alert_state.formatted_message = formatted_msg
    self._send_notification(
        host_name, lvl, message, metric_path, AlertLevel.OK, current_level, value
    )
    alert_state.pending_since = None
|
|
|
|
def _check_renotify(
    self,
    host_name: str,
    alert_state: AlertState,
    metric_path: str,
    value: Any,
    threshold: ThresholdConfig,
    plugin_data: Optional[Dict[str, Any]] = None,
):
    """Send a reminder for a CRITICAL alert that is still ongoing.

    Reminders are produced only for unacknowledged CRITICAL alerts.  The
    first call after an alert fired just starts the reminder clock; after
    that a reminder is due whenever ``renotify_interval`` seconds have passed
    since the previous one.  The reminder timestamp and counter advance even
    when the host is not watched and no notification is scheduled.

    Args:
        host_name: Name of the host
        alert_state: Current alert state
        metric_path: Full metric path
        value: Current metric value
        threshold: Threshold configuration
        plugin_data: Optional dictionary of all plugin data fields
    """
    import math

    if alert_state.level != AlertLevel.CRITICAL:
        return

    # Skip reminders if alert has been acknowledged
    if alert_state.acknowledged:
        return

    now = time.time()

    if alert_state.last_notification is None:
        # First notification already sent during state change
        alert_state.last_notification = now
        alert_state.notification_count = 1
        return

    if (now - alert_state.last_notification) < self.renotify_interval:
        return

    # The level is known to be CRITICAL here, so only the critical limit can
    # be the active threshold.
    threshold_value = threshold.critical

    # Format operator symbol
    op_symbol = threshold.operator.value

    # Use a display-friendly value (inf is the sentinel for "overdue"), the
    # same way the initial alert text is rendered.
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value

    ongoing = int(now - alert_state.since)
    if threshold_value is not None:
        # Use display format string
        threshold_info = self._format_display(
            threshold.display,
            value=display_value,
            threshold_value=threshold_value,
            op_symbol=op_symbol,
            plugin_data=plugin_data,
        )
        message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {display_value} {threshold_info}, ongoing for {ongoing}s"
    else:
        message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {display_value} (ongoing for {ongoing}s)"

    from . import hbdclass
    host = hbdclass.Host.hosts.get(host_name)
    if host is None or host.watched:
        asyncio.get_event_loop().create_task(notify_mod.send_notification(
            host_name,
            notify_mod.Notification(
                title=f"[REMINDER/{alert_state.level.name}] {host_name}",
                body=message,
                level=alert_state.level.name,
            ),
        ))
        # Only claim "sent" when a notification was actually scheduled.
        logger.info("Re-notification sent: %s", message)
    else:
        logger.debug("Re-notification suppressed (host not watched): %s", message)

    alert_state.last_notification = now
    alert_state.notification_count += 1
|
|
|
|
def purge_stale_alerts(self, hbdclass) -> None:
    """Drop alert states that no longer have a configured threshold.

    Walks every host in ``hbdclass.Host.hosts`` and deletes each alert state
    whose metric path is absent from that host's current thresholds, logging
    one info line per removal.  Hosts without any alert states are skipped
    without looking up their thresholds.

    Args:
        hbdclass: Object exposing the host registry as ``Host.hosts``
            (a mapping of host name to host object).
    """
    for hostname, host in hbdclass.Host.hosts.items():
        states = host.alert_states
        if not states:
            continue

        configured = self.get_thresholds_for_host(hostname)

        # Iterate over a snapshot of the keys so entries can be deleted safely.
        for metric_path in list(states):
            if metric_path in configured:
                continue
            logger.info(
                "Purging stale alert state for %s / %s (no threshold configured)",
                hostname, metric_path,
            )
            del states[metric_path]
|
|
|
|
def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
    """
    Collect every alert state whose level is not OK.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        List of the non-OK AlertState objects, in dictionary order
    """
    active = []
    for state in alert_states.values():
        if state.level != AlertLevel.OK:
            active.append(state)
    return active
|
|
|
|
def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
    """
    Count alert states per severity level.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        Dictionary with counts:
        {"ok": N, "warning": N, "critical": N, "unknown": N}
    """
    counts = dict.fromkeys(("ok", "warning", "critical", "unknown"), 0)

    for state in alert_states.values():
        level = state.level
        if level == AlertLevel.OK:
            bucket = "ok"
        elif level == AlertLevel.WARNING:
            bucket = "warning"
        elif level == AlertLevel.CRITICAL:
            bucket = "critical"
        elif level == AlertLevel.UNKNOWN:
            bucket = "unknown"
        else:
            # A level outside the known set is not counted.
            continue
        counts[bucket] += 1

    return counts
|