967e05ed74
Older hbd clients send zfs_monitor data with a `health` string but no `health_ok` numeric field (added in a recent plugin update). Without health_ok in the data, the wildcard threshold check found nothing and no CRITICAL alert was raised for DEGRADED/SUSPENDED pools. Synthesize health_ok from the health string in the server's nested-metric loop so alerts fire regardless of client version. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1534 lines
62 KiB
Python
1534 lines
62 KiB
Python
"""
|
|
Threshold checking and alerting for plugin metrics.
|
|
|
|
This module provides a flexible threshold checking system that:
|
|
- Evaluates plugin metrics against configured warning/critical thresholds
|
|
- Tracks alert states per host and metric
|
|
- Prevents alert flapping with hysteresis
|
|
- Triggers notifications only on state changes
|
|
- Supports multiple comparison operators
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from enum import Enum
|
|
from typing import Dict, List, Any, Optional, Tuple, Callable
|
|
from . import notify as notify_mod
|
|
from .config import THRESHOLD_DEFAULTS
|
|
|
|
# Logger shared by every class in this module.
logger = logging.getLogger(__name__)
# Module-level alias of the notify module's ``eventlog`` object, so code in
# this module can refer to it without the ``notify_mod.`` prefix.
eventlog = notify_mod.eventlog
|
|
|
|
class AlertLevel(Enum):
    """Alert severity levels.

    OK < WARNING < CRITICAL by integer value. Hysteresis logic treats a lower
    value as an improvement. UNKNOWN (3) marks a value that could not be
    evaluated.
    """

    OK = 0        # within thresholds
    WARNING = 1   # warning threshold crossed
    CRITICAL = 2  # critical threshold crossed
    UNKNOWN = 3   # value could not be evaluated
|
|
|
|
|
|
class ComparisonOperator(Enum):
    """Comparison applied between a metric value and its configured threshold.

    Members are looked up by their string value (``ComparisonOperator(">")``).
    The values therefore double as the operator strings accepted in
    threshold configuration.
    """

    GT = ">"     # alert when value >  threshold
    GTE = ">="   # alert when value >= threshold
    LT = "<"     # alert when value <  threshold
    LTE = "<="   # alert when value <= threshold
    EQ = "=="    # alert when value equals threshold (small float tolerance)
    NEQ = "!="   # alert when value differs from threshold
    # The metric value is itself a Nagios exit code:
    # 0=OK 1=WARNING 2=CRITICAL 3=UNKNOWN
    NAGIOS = "nagios"
|
|
|
|
|
|
class AlertState:
    """Represents the current alert state for a specific metric.

    Instances are restored from pickle. ``__setstate__`` backfills every
    attribute that a state pickled by an older version of this class may lack.
    """

    # Defaults applied by __setstate__ for attributes missing from older
    # pickles. Keep in sync with the attributes initialised in __init__.
    _BACKFILL_DEFAULTS = {
        "notification_count": 0,
        "last_notification": None,
        "threshold_value": None,
        "operator": None,
        "hysteresis": None,
        "formatted_message": None,
        "acknowledged": False,
        "acknowledged_at": None,
        "consecutive_count": 0,
        "pending_since": None,
    }

    def __init__(self, metric_path: str):
        """
        Initialize alert state.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        self.since = time.time()
        self.last_value = None
        self.last_check = time.time()
        self.notification_count = 0
        self.last_notification = None
        self.threshold_value = None  # The threshold value that triggered alert
        self.operator = None  # The comparison operator (>, <, >=, etc.)
        self.hysteresis: Optional[float] = None  # Hysteresis fraction used for recovery
        self.formatted_message = None  # Formatted display message for UI
        self.acknowledged = False  # Whether alert has been acknowledged
        self.acknowledged_at = None  # Timestamp when acknowledged
        self.consecutive_count = 0  # Consecutive exceedances while still OK (for count gating)
        self.pending_since: Optional[float] = None  # non-None while waiting out grace period before notifying

    def update(
        self,
        level: "AlertLevel",
        value: Any,
        threshold_value: Optional[float] = None,
        operator: Optional[str] = None
    ) -> bool:
        """
        Update alert state.

        Args:
            level: New alert level
            value: Current metric value
            threshold_value: The threshold value that was exceeded (if applicable)
            operator: The comparison operator (>, <, >=, etc.)

        Returns:
            True if state changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        # Threshold info is only meaningful while an alert is active.
        if level != AlertLevel.OK:
            self.threshold_value = threshold_value
            self.operator = operator
        else:
            self.threshold_value = None
            self.operator = None

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path,
            self.level.name,
            level.name,
            value
        )
        self.level = level
        self.since = now
        self.notification_count = 0
        self.last_notification = None  # restart reminder interval on level change
        # A new non-OK level needs a fresh acknowledgment. Returning to OK
        # keeps the flag until the next alert level resets it.
        if level != AlertLevel.OK:
            self.acknowledged = False
            self.acknowledged_at = None
        return True

    def to_dict(self) -> dict:
        """Convert alert state to a JSON-friendly dictionary.

        Infinite floats are reported as "overdue" and NaN as None. When
        hysteresis and threshold info are present, the value at which the
        alert recovers is included as ``recovery_threshold``.
        """
        import math

        def sanitize_value(val):
            # JSON cannot encode inf/nan.
            if isinstance(val, float):
                if math.isinf(val):
                    return "overdue"
                if math.isnan(val):
                    return None
            return val

        result = {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": sanitize_value(self.last_value),
            "last_check": self.last_check,
            "notification_count": self.notification_count,
            "acknowledged": self.acknowledged,
        }

        if self.acknowledged_at is not None:
            result["acknowledged_at"] = self.acknowledged_at
        if self.threshold_value is not None:
            result["threshold_value"] = sanitize_value(self.threshold_value)
        if self.operator is not None:
            result["operator"] = self.operator
        if self.formatted_message is not None:
            result["formatted_message"] = self.formatted_message

        # Expose the recovery threshold so the UI can display it.
        if (self.hysteresis and self.threshold_value is not None
                and self.operator is not None):
            ha = abs(self.threshold_value * self.hysteresis)
            if self.operator in ('>', '>='):
                result["recovery_threshold"] = round(self.threshold_value - ha, 4)
            elif self.operator in ('<', '<='):
                result["recovery_threshold"] = round(self.threshold_value + ha, 4)

        return result

    def __setstate__(self, state):
        """Restore from pickle, backfilling fields added after the pickle was written."""
        self.__dict__.update(state)
        for attr, default in self._BACKFILL_DEFAULTS.items():
            self.__dict__.setdefault(attr, default)

    def acknowledge(self):
        """Acknowledge this alert to stop reminder notifications."""
        self.acknowledged = True
        self.acknowledged_at = time.time()
        logger.info("Alert acknowledged for %s", self.metric_path)

    def __str__(self):
        return str(self.to_dict())
|
|
|
|
class ThresholdConfig:
    """Configuration for a single threshold check."""

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        display: Optional[str] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
        count: int = 1,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            display: Optional display format template for alert messages
            operator: Comparison operator (>, >=, <, <=, ==, !=, nagios);
                unknown operators fall back to '>'
            hysteresis: Fraction of the threshold a value must recover past
                before the alert clears (0.0-1.0). 0.0 or None disables it.
            enabled: Whether this threshold is enabled
            count: Number of consecutive exceedances required before alerting
                (default 1; None or values below 1 are treated as 1)
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        self.hysteresis = hysteresis
        self.display = display
        # An empty YAML value ("count:") arrives as None; use the default.
        self.count = max(1, int(count)) if count is not None else 1

        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator,
                metric_path
            )
            self.operator = ComparisonOperator.GT

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check (anything convertible to float, or a
                Nagios exit code for the ``nagios`` operator)

        Returns:
            AlertLevel indicating the severity. OK when the threshold is
            disabled. UNKNOWN when the value is not numeric, or is not a
            known Nagios exit code.
        """
        if not self.enabled:
            return AlertLevel.OK

        # Nagios exit-code semantics: value IS the severity
        if self.operator == ComparisonOperator.NAGIOS:
            try:
                code = int(value)
            except (TypeError, ValueError):
                return AlertLevel.UNKNOWN
            return {0: AlertLevel.OK, 1: AlertLevel.WARNING, 2: AlertLevel.CRITICAL}.get(
                code, AlertLevel.UNKNOWN
            )

        try:
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        # Critical takes precedence over warning.
        if self.critical is not None and self._compare(value, self.critical):
            return AlertLevel.CRITICAL
        if self.warning is not None and self._compare(value, self.warning):
            return AlertLevel.WARNING
        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        Escalation is immediate. Recovery to a lower level only happens once
        the value has moved past the threshold by ``hysteresis * |threshold|``.
        A threshold of 0 therefore gets no hysteresis band.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # Nagios exit codes are discrete integers — hysteresis doesn't apply.
        if self.operator == ComparisonOperator.NAGIOS:
            return new_level

        # A disabled threshold must clear immediately, not stay latched.
        # Falsy hysteresis (0.0, or None from an empty YAML key) means no damping.
        if not self.enabled or not self.hysteresis:
            return new_level

        # Only recovery (moving to a lower severity) is damped.
        if new_level.value >= current_level.value:
            return new_level

        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        # Compare numerically. evaluate() converted only its own local copy,
        # and a numeric string would otherwise raise TypeError here.
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            return new_level

        hysteresis_amount = abs(threshold * self.hysteresis)

        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # "Greater than" alerts recover only below threshold - band.
            if numeric >= threshold - hysteresis_amount:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # "Less than" alerts recover only above threshold + band.
            if numeric <= threshold + hysteresis_amount:
                return current_level

        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Return True when *value* breaches *threshold* under this operator."""
        if self.operator == ComparisonOperator.GT:
            return value > threshold
        elif self.operator == ComparisonOperator.GTE:
            return value >= threshold
        elif self.operator == ComparisonOperator.LT:
            return value < threshold
        elif self.operator == ComparisonOperator.LTE:
            return value <= threshold
        elif self.operator == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9  # Float comparison
        elif self.operator == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False
|
|
|
|
|
|
class ThresholdChecker:
|
|
"""Main threshold checking and alerting system."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Dict[str, Any],
|
|
renotify_interval: int = 3600,
|
|
journal: Optional[Any] = None,
|
|
):
|
|
"""
|
|
Initialize threshold checker.
|
|
|
|
Args:
|
|
config: Threshold configuration dictionary from YAML
|
|
renotify_interval: Seconds between repeat notifications (default: 1 hour)
|
|
journal: Optional MessageJournal instance for logging threshold events
|
|
"""
|
|
# Named threshold configurations (pre-merged: defaults + overrides): {config_name: {metric_path: ThresholdConfig}}
|
|
self.threshold_configs = {}
|
|
|
|
# Raw overrides only for each named config (no defaults baked in): {config_name: {metric_path: ThresholdConfig}}
|
|
self.threshold_raw_configs: Dict[str, Dict[str, ThresholdConfig]] = {}
|
|
|
|
# Single threshold set for backward compatibility: {metric_path: ThresholdConfig}
|
|
self.thresholds = {}
|
|
|
|
# Host to ordered list of config names: {host_name: [config_name, ...]}
|
|
self.host_config_mapping: Dict[str, List[str]] = {}
|
|
|
|
# Default config name to use when no mapping exists
|
|
self.default_config = "default"
|
|
|
|
self.renotify_interval = renotify_interval
|
|
self.grace_seconds: float = float(config.get("grace", 2))
|
|
self.journal = journal
|
|
|
|
# Parse configuration
|
|
self._parse_config(config)
|
|
|
|
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
|
|
if total_thresholds == 0 and len(self.thresholds) > 0:
|
|
# Backward compatibility: using single threshold set
|
|
total_thresholds = len(self.thresholds)
|
|
logger.info("ThresholdChecker initialized with %d thresholds (legacy format)", total_thresholds)
|
|
else:
|
|
logger.info(
|
|
"ThresholdChecker initialized with %d named configurations (%d total thresholds)",
|
|
len(self.threshold_configs),
|
|
total_thresholds
|
|
)
|
|
|
|
def reload(self, config: Dict[str, Any]):
|
|
"""Reload threshold configuration from new config dict.
|
|
|
|
This clears all existing thresholds and re-parses from the new configuration.
|
|
Alert states are preserved to maintain hysteresis across reloads.
|
|
|
|
Args:
|
|
config: New configuration dictionary
|
|
"""
|
|
logger.info("Reloading threshold configuration...")
|
|
|
|
# Clear old configuration
|
|
self.threshold_configs.clear()
|
|
self.threshold_raw_configs.clear()
|
|
self.thresholds.clear()
|
|
self.host_config_mapping.clear()
|
|
self.grace_seconds = float(config.get("grace", 2))
|
|
|
|
# Parse new configuration
|
|
self._parse_config(config)
|
|
|
|
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
|
|
if total_thresholds == 0 and len(self.thresholds) > 0:
|
|
total_thresholds = len(self.thresholds)
|
|
|
|
logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)
|
|
|
|
def _parse_config(self, config: Dict[str, Any]):
|
|
"""Parse threshold configuration from YAML structure.
|
|
|
|
Supports two formats:
|
|
1. Legacy format with direct 'thresholds' section
|
|
2. New format with 'threshold_configs' and 'host_threshold_mapping'
|
|
|
|
In all cases, THRESHOLD_DEFAULTS are seeded into threshold_configs["default"]
|
|
so the Settings page always shows the built-in defaults.
|
|
_parse_multi_config() overwrites this with the fully-merged effective defaults.
|
|
"""
|
|
# Always expose built-in defaults through threshold_configs["default"] so
|
|
# the Settings page has something to display even in legacy/no-config mode.
|
|
seed: Dict[str, ThresholdConfig] = {}
|
|
for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=seed)
|
|
if seed:
|
|
self.threshold_configs["default"] = seed
|
|
self.threshold_raw_configs["default"] = {}
|
|
|
|
# Check for new multi-config format
|
|
if "threshold_configs" in config:
|
|
self._parse_multi_config(config) # overwrites threshold_configs["default"]
|
|
elif "thresholds" in config:
|
|
# Legacy single threshold configuration
|
|
self._parse_legacy_config(config)
|
|
else:
|
|
logger.info("No thresholds configured")
|
|
|
|
def _parse_multi_config(self, config: Dict[str, Any]):
|
|
"""Parse multiple named threshold configurations."""
|
|
threshold_configs = config.get("threshold_configs", {})
|
|
|
|
if not threshold_configs:
|
|
logger.info("No threshold configurations defined")
|
|
return
|
|
|
|
# Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
|
|
# All other configs inherit any metric not explicitly defined from effective_defaults.
|
|
effective_defaults: Dict[str, ThresholdConfig] = {}
|
|
for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|
|
|
|
if "default" in threshold_configs:
|
|
default_data = threshold_configs["default"]
|
|
if isinstance(default_data, dict) and "thresholds" in default_data:
|
|
for plugin_name, plugin_thresholds in default_data["thresholds"].items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|
|
|
|
self.threshold_configs["default"] = dict(effective_defaults)
|
|
self.threshold_raw_configs["default"] = {}
|
|
logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))
|
|
|
|
# Parse each named configuration
|
|
for config_name, config_data in threshold_configs.items():
|
|
if config_name == "default":
|
|
continue # already handled above
|
|
|
|
if not isinstance(config_data, dict):
|
|
logger.warning("Invalid threshold config '%s', skipping", config_name)
|
|
continue
|
|
|
|
if "thresholds" not in config_data:
|
|
logger.warning("No thresholds in config '%s', skipping", config_name)
|
|
continue
|
|
|
|
logger.info("Parsing threshold configuration: %s", config_name)
|
|
|
|
# Raw overrides only (used for multi-config layering)
|
|
raw_overrides: Dict[str, ThresholdConfig] = {}
|
|
thresholds_config = config_data["thresholds"]
|
|
for plugin_name, plugin_thresholds in thresholds_config.items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=raw_overrides)
|
|
self.threshold_raw_configs[config_name] = raw_overrides
|
|
|
|
# Pre-merged version (defaults + overrides) for single-config fast path
|
|
self.threshold_configs[config_name] = dict(effective_defaults)
|
|
self.threshold_configs[config_name].update(raw_overrides)
|
|
|
|
# Parse host → config list mapping from two possible sources
|
|
|
|
def _normalise(value) -> List[str]:
|
|
"""Accept a string or list; always return a list."""
|
|
if isinstance(value, list):
|
|
return [str(v) for v in value]
|
|
return [str(value)]
|
|
|
|
# 1. hosts section with threshold_config attribute (string or list)
|
|
if "hosts" in config:
|
|
hosts_config = config["hosts"]
|
|
if isinstance(hosts_config, dict):
|
|
for host_name, host_attrs in hosts_config.items():
|
|
if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
|
|
self.host_config_mapping[host_name] = _normalise(host_attrs["threshold_config"])
|
|
|
|
# 2. Legacy host_threshold_mapping section (string values only)
|
|
if "host_threshold_mapping" in config:
|
|
legacy_mapping = config.get("host_threshold_mapping", {})
|
|
if isinstance(legacy_mapping, dict):
|
|
for host_name, value in legacy_mapping.items():
|
|
self.host_config_mapping[host_name] = _normalise(value)
|
|
|
|
# Set default config (first one alphabetically or explicitly set)
|
|
self.default_config = config.get("default_threshold_config", "default")
|
|
if self.default_config not in self.threshold_configs and self.threshold_configs:
|
|
# Use first available config as default
|
|
self.default_config = sorted(self.threshold_configs.keys())[0]
|
|
logger.info("Using '%s' as default threshold config", self.default_config)
|
|
|
|
logger.info(
|
|
"Loaded %d threshold configurations with %d host mappings",
|
|
len(self.threshold_configs),
|
|
len(self.host_config_mapping)
|
|
)
|
|
|
|
def _parse_legacy_config(self, config: Dict[str, Any]):
|
|
"""Parse legacy single threshold configuration for backward compatibility."""
|
|
if not config or "thresholds" not in config:
|
|
logger.info("No thresholds configured")
|
|
return
|
|
|
|
thresholds_config = config["thresholds"]
|
|
|
|
for plugin_name, plugin_thresholds in thresholds_config.items():
|
|
if not isinstance(plugin_thresholds, dict):
|
|
continue
|
|
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds)
|
|
|
|
def _parse_plugin_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse thresholds for a specific plugin.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
thresholds: Threshold configuration dictionary
|
|
target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
# Special handling for RTT thresholds (per-host)
|
|
if plugin_name == "rtt":
|
|
self._parse_rtt_thresholds(thresholds, target_dict)
|
|
return
|
|
|
|
for metric_name, threshold_config in thresholds.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Handle nested metrics (e.g., partitions./.percent or pools.*.health_ok)
|
|
if metric_name == "partitions":
|
|
self._parse_partition_thresholds(plugin_name, threshold_config, target_dict)
|
|
continue
|
|
if metric_name == "pools":
|
|
self._parse_pool_thresholds(plugin_name, threshold_config, target_dict)
|
|
continue
|
|
|
|
metric_path = f"{plugin_name}.{metric_name}"
|
|
|
|
# Extract threshold values
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
# Nagios operator maps exit codes directly; no numeric thresholds needed
|
|
is_nagios_op = (operator == "nagios")
|
|
default_display = "{check_name}: {output}" if is_nagios_op else "(threshold: {op_symbol} {threshold_value})"
|
|
display = threshold_config.get("display", default_display)
|
|
hysteresis = threshold_config.get("hysteresis", 0.0 if is_nagios_op else 0.02)
|
|
enabled = threshold_config.get("enabled", True)
|
|
|
|
if warning is None and critical is None and not is_nagios_op:
|
|
logger.warning("No thresholds defined for %s, skipping", metric_path)
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered threshold for %s: warn=%s, crit=%s, op=%s",
|
|
metric_path,
|
|
warning,
|
|
critical,
|
|
operator
|
|
)
|
|
|
|
def _parse_partition_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
partitions: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse partition-specific thresholds for disk monitoring.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
partitions: Partition threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, threshold_config in metrics.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Create metric path like "disk_monitor./dev/sda1.percent"
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
hysteresis = threshold_config.get("hysteresis", 0.1)
|
|
enabled = threshold_config.get("enabled", True)
|
|
display = threshold_config.get("display")
|
|
if warning is None and critical is None:
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
|
|
def _parse_pool_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
pools: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None,
|
|
):
|
|
"""Parse ZFS pool thresholds. Pool names may be literal or '*' (all pools).
|
|
|
|
Config shape::
|
|
|
|
zfs_monitor:
|
|
pools:
|
|
'*':
|
|
health_ok:
|
|
critical: 1
|
|
operator: '<'
|
|
tank:
|
|
capacity:
|
|
warning: 80
|
|
critical: 90
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
for pool_name, metrics in pools.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
for metric_name, threshold_config in metrics.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
hysteresis = threshold_config.get("hysteresis", 0.02)
|
|
enabled = threshold_config.get("enabled", True)
|
|
display = threshold_config.get("display")
|
|
if warning is None and critical is None:
|
|
continue
|
|
target_dict[metric_path] = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
)
|
|
|
|
def _parse_rtt_thresholds(
|
|
self,
|
|
rtt_thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse RTT thresholds (network latency thresholds).
|
|
|
|
RTT thresholds are configured as:
|
|
thresholds:
|
|
rtt:
|
|
warning: 100.0 # ms
|
|
critical: 500.0 # ms
|
|
|
|
Args:
|
|
rtt_thresholds: RTT threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
if not isinstance(rtt_thresholds, dict):
|
|
return
|
|
|
|
# Metric path is simply "rtt" (not per-host)
|
|
metric_path = "rtt"
|
|
|
|
warning = rtt_thresholds.get("warning")
|
|
critical = rtt_thresholds.get("critical")
|
|
operator = rtt_thresholds.get("operator", ">")
|
|
hysteresis = rtt_thresholds.get("hysteresis", 0.02) # 2% default
|
|
enabled = rtt_thresholds.get("enabled", True)
|
|
display = rtt_thresholds.get("display")
|
|
count = rtt_thresholds.get("count", 1)
|
|
|
|
if warning is None and critical is None:
|
|
logger.warning("No RTT thresholds defined, skipping")
|
|
return
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
count=count,
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
|
|
warning,
|
|
critical,
|
|
count,
|
|
)
|
|
|
|
def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]:
|
|
"""Get the effective threshold configuration for a host.
|
|
|
|
When threshold_config is a list, configs are applied left-to-right on top
|
|
of the default thresholds so earlier entries can be overridden by later ones.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
|
|
Returns:
|
|
Dictionary of thresholds for this host
|
|
"""
|
|
# Legacy mode: single threshold set for all hosts
|
|
if self.thresholds and not self.threshold_configs:
|
|
return self.thresholds
|
|
|
|
if not self.threshold_configs:
|
|
return {}
|
|
|
|
config_names = self.host_config_mapping.get(host_name)
|
|
|
|
# No host-specific mapping → return pre-merged default
|
|
if not config_names:
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Single config → fast path using pre-merged copy
|
|
if len(config_names) == 1:
|
|
name = config_names[0]
|
|
if name in self.threshold_configs:
|
|
return self.threshold_configs[name]
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', using default '%s'",
|
|
name, host_name, self.default_config,
|
|
)
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Multiple configs → start from defaults, layer raw overrides in order
|
|
result = dict(self.threshold_configs.get(self.default_config, {}))
|
|
for name in config_names:
|
|
if name == self.default_config:
|
|
continue # defaults already the base
|
|
raw = self.threshold_raw_configs.get(name)
|
|
if raw is None:
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', skipping",
|
|
name, host_name,
|
|
)
|
|
else:
|
|
result.update(raw)
|
|
return result
|
|
|
|
def check_value(
|
|
self,
|
|
host_name: str,
|
|
metric_path: str,
|
|
value: float,
|
|
alert_states: Dict[str, AlertState],
|
|
) -> Optional[Tuple[AlertLevel, AlertLevel]]:
|
|
"""
|
|
Check a single value against configured threshold.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
metric_path: Full metric path (e.g., "rtt.hostname")
|
|
value: The metric value to check
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
Tuple of (old_level, new_level) if state changed, None otherwise
|
|
"""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
if metric_path not in thresholds:
|
|
return None
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
# Get or create alert state
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
# Evaluate threshold with hysteresis
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Apply consecutive-count gating: when currently OK, require threshold.count
|
|
# consecutive exceedances before escalating to WARNING/CRITICAL.
|
|
if new_level == AlertLevel.OK:
|
|
# Value is fine (or recovered) — reset the pending counter immediately.
|
|
alert_state.consecutive_count = 0
|
|
elif alert_state.level == AlertLevel.OK and new_level != AlertLevel.OK:
|
|
# First time we exceed while still OK: count up.
|
|
alert_state.consecutive_count += 1
|
|
if alert_state.consecutive_count < threshold.count:
|
|
logger.debug(
|
|
"RTT threshold exceeded %d/%d consecutive times for %s on %s",
|
|
alert_state.consecutive_count,
|
|
threshold.count,
|
|
metric_path,
|
|
host_name,
|
|
)
|
|
return None
|
|
# Count reached — fire the alert and reset the counter.
|
|
alert_state.consecutive_count = 0
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Keep hysteresis on the state so the UI can show the recovery threshold
|
|
if new_level != AlertLevel.OK:
|
|
alert_state.hysteresis = threshold.hysteresis
|
|
else:
|
|
alert_state.hysteresis = None
|
|
|
|
# Update state and check for changes
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, None)
|
|
return (old_level, new_level)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, None)
|
|
|
|
return None
|
|
def _find_threshold(
|
|
self, thresholds: Dict[str, "ThresholdConfig"], metric_path: str
|
|
) -> Tuple[Optional["ThresholdConfig"], Optional[str]]:
|
|
"""Return (threshold, check_name) for *metric_path*, falling back to suffix matches.
|
|
|
|
Allows generic thresholds like ``nagios_runner.status_code`` to match
|
|
fully-qualified paths like ``nagios_runner.check_disk_root_status_code``.
|
|
The exact match is always tried first; then successive leading
|
|
underscore-delimited segments are stripped from the field name until
|
|
a match is found or no segments remain.
|
|
|
|
Returns:
|
|
(ThresholdConfig, None) for an exact match.
|
|
(ThresholdConfig, "check_disk_root") for a suffix match — the second
|
|
element is the stripped prefix, available as ``{check_name}`` in
|
|
display format templates.
|
|
(None, None) when no threshold is found.
|
|
"""
|
|
if metric_path in thresholds:
|
|
return thresholds[metric_path], None
|
|
plugin, sep, field = metric_path.partition(".")
|
|
if not sep:
|
|
return None, None
|
|
parts = field.split("_")
|
|
for i in range(1, len(parts)):
|
|
candidate = plugin + "." + "_".join(parts[i:])
|
|
if candidate in thresholds:
|
|
return thresholds[candidate], "_".join(parts[:i])
|
|
return None, None
|
|
|
|
def check_plugin_data(
    self,
    host_name: str,
    plugin_name: str,
    data: Dict[str, Any],
    alert_states: Dict[str, "AlertState"],
) -> list:
    """Evaluate one plugin report against the host's configured thresholds.

    Every top-level field of *data* is checked as ``<plugin_name>.<field>``
    (exact or generic-suffix threshold match via ``_find_threshold``); nested
    per-instance metrics are then handled by ``_check_nested_metrics``.

    Args:
        host_name: Name of the host
        plugin_name: Name of the plugin
        data: Plugin data dictionary
        alert_states: Host's alert_states dictionary (created entries are
            added to it in place)

    Returns:
        List of (metric_path, old_level, new_level, value) tuples, one per
        metric whose alert level changed (top-level and nested).
    """
    changes = []
    host_thresholds = self.get_thresholds_for_host(host_name)

    for field, reading in data.items():
        path = f"{plugin_name}.{field}"
        threshold, check_name = self._find_threshold(host_thresholds, path)
        if threshold is None:
            continue

        if path not in alert_states:
            alert_states[path] = AlertState(path)
        state = alert_states[path]

        level = threshold.evaluate_with_hysteresis(reading, state.level)

        # Numeric level that was crossed, if the threshold defines one.
        if level == AlertLevel.CRITICAL and threshold.critical is not None:
            limit = threshold.critical
        elif level == AlertLevel.WARNING and threshold.warning is not None:
            limit = threshold.warning
        else:
            limit = None

        state.hysteresis = None if level == AlertLevel.OK else threshold.hysteresis

        previous = state.level
        if state.update(level, reading, limit, threshold.operator.value):
            changes.append((path, previous, level, reading))
            self._apply_grace(host_name, state, path, previous, level, reading, threshold, data, check_name=check_name, metric_name=field)
        elif level != AlertLevel.OK:
            self._check_pending_or_renotify(host_name, state, path, reading, threshold, data, check_name=check_name, metric_name=field)

    self._check_nested_metrics(host_name, plugin_name, data, alert_states, changes)
    return changes
|
|
|
|
def _check_nested_metrics(
|
|
self,
|
|
host_name: str,
|
|
plugin_name: str,
|
|
data: Dict[str, Any],
|
|
alert_states: Dict[str, AlertState],
|
|
state_changes: list,
|
|
):
|
|
"""Check nested metrics like partition-specific thresholds."""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
# ZFS pool health checks
|
|
if plugin_name == "zfs_monitor" and "pools" in data:
|
|
pools = data["pools"]
|
|
if isinstance(pools, dict):
|
|
for pool_name, pool_metrics in pools.items():
|
|
if not isinstance(pool_metrics, dict):
|
|
continue
|
|
# Synthesize health_ok from health string for older clients
|
|
# that predate the health_ok field.
|
|
pool_metrics_effective = dict(pool_metrics)
|
|
if "health" in pool_metrics and "health_ok" not in pool_metrics:
|
|
pool_metrics_effective["health_ok"] = 1 if pool_metrics["health"] == "ONLINE" else 0
|
|
for metric_name, value in pool_metrics_effective.items():
|
|
# Try specific pool name first, then wildcard '*'
|
|
metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
|
|
wildcard_path = f"{plugin_name}.*.{metric_name}"
|
|
threshold = thresholds.get(metric_path) or thresholds.get(wildcard_path)
|
|
if threshold is None:
|
|
continue
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
alert_state = alert_states[metric_path]
|
|
new_level = threshold.evaluate_with_hysteresis(value, alert_state.level)
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None
|
|
pool_context = dict(pool_metrics_effective)
|
|
pool_context["pool_name"] = pool_name
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, pool_context, metric_name=pool_name)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, pool_context, metric_name=pool_name)
|
|
|
|
# Look for partition data in disk_monitor
|
|
if plugin_name == "disk_monitor" and "partitions" in data:
|
|
partitions = data["partitions"]
|
|
if not isinstance(partitions, dict):
|
|
return
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, value in metrics.items():
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
if metric_path not in thresholds:
|
|
continue
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None
|
|
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
|
|
|
|
def _trigger_notification(
    self,
    host_name: str,
    metric_path: str,
    old_level: "AlertLevel",
    new_level: "AlertLevel",
    value: Any,
    threshold: "ThresholdConfig",
    plugin_data: Optional[Dict[str, Any]] = None,
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
) -> Tuple[str, str, Optional[str]]:
    """Build the notification text for an alert state change.

    Despite its name this method sends nothing. It only computes the pieces
    that callers hand to ``_send_notification`` and store on the AlertState.

    Args:
        host_name: Name of the host (unused in the text; kept for the interface)
        metric_path: Full metric path ("plugin.field")
        old_level: Previous alert level
        new_level: New alert level
        value: Current metric value (``inf`` is shown as "overdue")
        threshold: Threshold configuration
        plugin_data: Optional dictionary of plugin data fields for the
            display template
        check_name: Prefix stripped by a generic threshold match, exposed
            to the template as ``{check_name}``
        metric_name: Field/instance name, exposed as ``{metric_name}``

    Returns:
        ``(lvl, message, formatted_threshold_msg)``:

        * ``lvl`` is "RECOVER", "WARNING", "CRITICAL" or "UNKNOWN".
        * ``message`` is "<short_path> = <value> ...".
        * ``formatted_threshold_msg`` is the rendered display template, or
          None for recoveries and when there is nothing to render.
    """
    import math

    # Determine which threshold was exceeded
    threshold_value = None
    if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
        threshold_value = threshold.critical
    elif new_level == AlertLevel.WARNING and threshold.warning is not None:
        threshold_value = threshold.warning

    # Short metric label: strip the plugin-name prefix and _status_code suffix
    short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")

    # Use a display-friendly value (inf is the sentinel for "overdue")
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value

    if new_level == AlertLevel.OK:
        return "RECOVER", f"{short_path} = {display_value} ({old_level.name} -> OK)", None

    lvl = {AlertLevel.WARNING: "WARNING", AlertLevel.CRITICAL: "CRITICAL"}.get(new_level, "UNKNOWN")

    # The nagios operator has no numeric threshold_value but still renders
    # its display template.
    has_display = threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS
    if not has_display:
        return lvl, f"{short_path} = {display_value}", None

    # Render once: the same text is used in the message and stored for the
    # UI, and a broken template is only logged once.
    formatted = self._format_display(
        threshold.display,
        value=display_value,
        threshold_value=threshold_value,
        op_symbol=threshold.operator.value,
        plugin_data=plugin_data,
        check_name=check_name,
        metric_name=metric_name,
    )
    return lvl, f"{short_path} = {display_value} {formatted}", formatted
|
|
|
|
def _send_notification(
    self,
    host_name: str,
    lvl: str,
    message: str,
    metric_path: str,
    old_level: "AlertLevel",
    new_level: "AlertLevel",
    value: Any,
):
    """Dispatch an alert notification and record it in the journal and eventlog.

    For a host that is known and not watched, only the eventlog entry is
    written. Otherwise the method:

    1. schedules ``notify_mod.send_notification`` on the asyncio event loop
       (title "[<lvl>] <host> <short_path>", body = message without its
       "<short_path> = " prefix);
    2. schedules ``self.journal.log_threshold_event`` when a journal is set;
    3. writes the eventlog entry.

    Failing to schedule a task (e.g. no usable event loop) is logged and
    does not prevent the later steps, so the event always reaches the
    eventlog and the caller's threshold loop is not aborted.
    """
    from . import hbdclass

    host = hbdclass.Host.hosts.get(host_name)
    if host is not None and not host.watched:
        eventlog(host_name, lvl, message, service="threshold")
        return

    short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
    title = f"[{lvl}] {host_name} {short_path}"
    # Strip the "metric = " prefix from message so body is just the value/detail
    body = message.removeprefix(short_path + " = ")

    notification = notify_mod.send_notification(
        host_name,
        notify_mod.Notification(
            title=title,
            body=body,
            level=lvl,
        ),
    )
    try:
        # NOTE(review): the Task is not retained; the asyncio docs recommend
        # keeping a reference so it cannot be garbage-collected mid-flight.
        asyncio.get_event_loop().create_task(notification)
    except RuntimeError as e:
        # No usable/open loop: close the coroutine so it does not warn about
        # never being awaited, and carry on with journal/eventlog logging.
        notification.close()
        logger.warning("Failed to schedule notification for %s %s: %s", host_name, short_path, e)

    # Log to journal
    if self.journal is not None:
        try:
            loop = asyncio.get_event_loop()
            loop.create_task(self.journal.log_threshold_event(
                host_name=host_name,
                metric_path=metric_path,
                old_level=old_level.name,
                new_level=new_level.name,
                value=value,
            ))
        except Exception as e:
            logger.debug(f"Failed to log threshold event to journal: {e}")

    # Log to eventlog as well
    eventlog(host_name, lvl, message, service="threshold")
|
|
|
|
def _format_display(
|
|
self,
|
|
display_format: str,
|
|
value: Any,
|
|
threshold_value: Optional[float],
|
|
op_symbol: str,
|
|
plugin_data: Optional[Dict[str, Any]] = None,
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
) -> str:
|
|
"""Format the display string using available data.
|
|
|
|
Available template variables:
|
|
{value} - current metric value
|
|
{threshold_value} - threshold that was exceeded
|
|
{op_symbol} - comparison operator (>, <, >=, <=, ==, !=)
|
|
{check_name} - prefix stripped for generic threshold match
|
|
(e.g. "check_disk_root" when metric
|
|
"check_disk_root_status_code" matched generic
|
|
threshold "status_code")
|
|
{metric_name} - field name within the plugin data dict
|
|
Any key from plugin_data is also available.
|
|
|
|
Returns:
|
|
Formatted display string
|
|
"""
|
|
if not display_format:
|
|
display_format = "(threshold: {op_symbol} {threshold_value})" if threshold_value is not None else ""
|
|
|
|
# Build format context with standard variables
|
|
format_context = {
|
|
'value': value,
|
|
'op_symbol': op_symbol,
|
|
}
|
|
if threshold_value is not None:
|
|
format_context['threshold_value'] = threshold_value
|
|
|
|
# Add generic-match context variables when available
|
|
if check_name is not None:
|
|
format_context['check_name'] = check_name
|
|
if metric_name is not None:
|
|
format_context['metric_name'] = metric_name
|
|
|
|
# Add all plugin data fields if available
|
|
if plugin_data:
|
|
format_context.update(plugin_data)
|
|
|
|
# For nagios_runner generic matches, expose the matched check's output
|
|
# and status as short aliases {output} and {status} so display templates
|
|
# don't need to use the full {check_disk_root_output} form.
|
|
if check_name and plugin_data:
|
|
if 'output' not in format_context:
|
|
output = plugin_data.get(f"{check_name}_output")
|
|
if output is not None:
|
|
format_context['output'] = output
|
|
if 'status' not in format_context:
|
|
status = plugin_data.get(f"{check_name}_status")
|
|
if status is not None:
|
|
format_context['status'] = status
|
|
|
|
try:
|
|
# Format the display string
|
|
return display_format.format(**format_context)
|
|
except KeyError as e:
|
|
logger.warning(
|
|
"Missing format variable in display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
# Fallback to default format
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error formatting display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
|
|
def _apply_grace(
    self,
    host_name: str,
    alert_state: "AlertState",
    metric_path: str,
    old_level: "AlertLevel",
    new_level: "AlertLevel",
    value: Any,
    threshold: "ThresholdConfig",
    plugin_data: Optional[Dict[str, Any]],
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
) -> None:
    """React to an alert-level change, honouring the notification grace period.

    The UI-facing ``alert_state.formatted_message`` is refreshed on every
    transition. Then, depending on the direction of the change:

    * Back to OK: if a notification is still pending (``pending_since``
      set, i.e. the alert was never delivered), both the alert and its
      recovery are suppressed and the pending marker is cleared. Otherwise
      a RECOVER notification is sent.
    * To a higher ``AlertLevel.value`` (e.g. OK->WARNING, WARNING->CRITICAL):
      nothing is sent yet; ``pending_since`` is stamped with the current time
      so ``_check_pending_or_renotify`` can deliver it once
      ``self.grace_seconds`` have elapsed.
    * To a lower, still-alerting level (e.g. CRITICAL->WARNING): debug log
      only. The metric never recovered, so no notification is sent.
    """
    lvl, message, formatted_msg = self._trigger_notification(
        host_name, metric_path, old_level, new_level, value, threshold, plugin_data,
        check_name=check_name, metric_name=metric_name,
    )
    alert_state.formatted_message = formatted_msg

    if new_level == AlertLevel.OK:
        if alert_state.pending_since is None:
            self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
            return
        logger.info(
            "Alert suppressed (recovered within %.0fs grace): %s on %s",
            self.grace_seconds, metric_path, host_name,
        )
        alert_state.pending_since = None
        return

    escalating = new_level.value > old_level.value
    if not escalating:
        logger.debug(
            "De-escalation %s→%s for %s on %s, no notification",
            old_level.name, new_level.name, metric_path, host_name,
        )
        return

    alert_state.pending_since = time.time()
    logger.debug(
        "Alert deferred (%.0fs grace): %s on %s = %s",
        self.grace_seconds, metric_path, host_name, value,
    )
|
|
|
|
def _check_pending_or_renotify(
|
|
self,
|
|
host_name: str,
|
|
alert_state: AlertState,
|
|
metric_path: str,
|
|
value: Any,
|
|
threshold: ThresholdConfig,
|
|
plugin_data: Optional[Dict[str, Any]],
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
) -> None:
|
|
"""Called when alert level is unchanged and non-OK.
|
|
|
|
If a deferred notification is pending and grace_seconds have elapsed,
|
|
fires it now. Otherwise falls through to normal reminder logic.
|
|
"""
|
|
if alert_state.pending_since is not None:
|
|
if time.time() - alert_state.pending_since >= self.grace_seconds:
|
|
lvl, message, formatted_msg = self._trigger_notification(
|
|
host_name, metric_path, AlertLevel.OK, alert_state.level, value, threshold, plugin_data,
|
|
check_name=check_name, metric_name=metric_name,
|
|
)
|
|
alert_state.formatted_message = formatted_msg
|
|
self._send_notification(
|
|
host_name, lvl, message, metric_path, AlertLevel.OK, alert_state.level, value
|
|
)
|
|
alert_state.pending_since = None
|
|
# else: still within grace window, do nothing
|
|
else:
|
|
self._check_renotify(host_name, alert_state, metric_path, value, threshold, plugin_data, check_name=check_name, metric_name=metric_name)
|
|
|
|
def _check_renotify(
    self,
    host_name: str,
    alert_state: "AlertState",
    metric_path: str,
    value: Any,
    threshold: "ThresholdConfig",
    plugin_data: Optional[Dict[str, Any]] = None,
    check_name: Optional[str] = None,
    metric_name: Optional[str] = None,
):
    """Send a periodic reminder for an unacknowledged CRITICAL alert.

    Only CRITICAL, unacknowledged alerts get reminders. The first call after
    a state change just starts the reminder clock (the initial notification
    was sent by the state-change path). Later calls send a reminder once
    ``self.renotify_interval`` seconds have passed since the last one.

    Reminders are only dispatched for hosts that are unknown or watched, but
    the reminder clock and count advance either way.

    Args:
        host_name: Name of the host
        alert_state: Current alert state
        metric_path: Full metric path
        value: Current metric value (``inf`` is shown as "overdue")
        threshold: Threshold configuration
        plugin_data: Optional dictionary of all plugin data fields
        check_name: Prefix stripped by a generic threshold match
        metric_name: Field/instance name for the display template
    """
    import math

    if alert_state.level != AlertLevel.CRITICAL or alert_state.acknowledged:
        return

    now = time.time()

    if alert_state.last_notification is None:
        # First notification already sent during state change
        alert_state.last_notification = now
        alert_state.notification_count = 1
        return

    if (now - alert_state.last_notification) < self.renotify_interval:
        return

    # Reminders are CRITICAL-only (guard above), so the critical level is the
    # active threshold (None when the threshold has no numeric critical value).
    threshold_value = threshold.critical
    short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
    # Same presentation as the initial alert: inf is the "overdue" sentinel.
    display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value
    ongoing = int(now - alert_state.since)

    # Like _trigger_notification, render the template for the nagios
    # operator even though it has no numeric threshold value.
    threshold_info = ""
    if threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS:
        threshold_info = self._format_display(
            threshold.display,
            value=display_value,
            threshold_value=threshold_value,
            op_symbol=threshold.operator.value,
            plugin_data=plugin_data,
            check_name=check_name,
            metric_name=metric_name,
        )
    if threshold_info:
        body = f"{display_value} {threshold_info}, ongoing for {ongoing}s"
    else:
        body = f"{display_value} (ongoing for {ongoing}s)"

    level_name = alert_state.level.name
    message = f"REMINDER ({level_name}): {host_name} - {short_path} = {body}"

    from . import hbdclass
    host = hbdclass.Host.hosts.get(host_name)
    if host is None or host.watched:
        reminder = notify_mod.send_notification(
            host_name,
            notify_mod.Notification(
                title=f"[REMINDER/{level_name}] {host_name} {short_path}",
                body=body,
                level=level_name,
            ),
        )
        try:
            asyncio.get_event_loop().create_task(reminder)
        except RuntimeError as e:
            # No usable/open loop: close the coroutine (avoids a "never
            # awaited" warning) instead of aborting the caller's check loop.
            reminder.close()
            logger.warning("Failed to schedule re-notification: %s (%s)", message, e)
        else:
            logger.info("Re-notification sent: %s", message)
    else:
        logger.debug("Re-notification skipped (host not watched): %s", message)

    alert_state.last_notification = now
    alert_state.notification_count += 1
|
|
|
|
def purge_stale_alerts(self, hbdclass) -> None:
    """Remove alert states that no longer have a matching threshold.

    Called after startup (pickle restore) and after each config reload so
    that alerts orphaned by configuration changes do not linger forever.

    A state is kept when its metric path resolves through
    ``_find_threshold`` (exact or generic suffix match), or through the
    per-instance wildcard form ``<plugin>.*.<metric>`` that
    ``_check_nested_metrics`` uses for ZFS pools. ``_find_threshold`` does
    not know the wildcard form. Without this check every pool alert (e.g.
    ``zfs_monitor.tank.health_ok`` matched by ``zfs_monitor.*.health_ok``)
    was purged on each reload, so an ongoing pool alert was forgotten and
    fired again on the next report.

    Each purged state is logged at INFO level.
    """
    def _still_configured(configured, metric_path):
        if self._find_threshold(configured, metric_path)[0] is not None:
            return True
        # plugin.<instance>.<metric> -> plugin.*.<metric>; rpartition keeps an
        # instance name that itself contains '.' intact.
        plugin, _, rest = metric_path.partition(".")
        instance, dot, metric = rest.rpartition(".")
        return bool(instance and dot) and f"{plugin}.*.{metric}" in configured

    for hostname, host in hbdclass.Host.hosts.items():
        if not host.alert_states:
            continue
        configured = self.get_thresholds_for_host(hostname)
        stale = [mp for mp in host.alert_states if not _still_configured(configured, mp)]
        for mp in stale:
            logger.info(
                "Purging stale alert state for %s / %s (no threshold configured)",
                hostname, mp,
            )
            del host.alert_states[mp]
|
|
|
|
def get_active_alerts(self, alert_states: Dict[str, "AlertState"]) -> list:
    """Return the alert states whose level is anything other than OK.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        List of the non-OK AlertState objects, in the dictionary's
        iteration order.
    """
    return [alert for alert in alert_states.values() if alert.level != AlertLevel.OK]
|
|
|
|
def get_alert_summary(self, alert_states: Dict[str, "AlertState"]) -> Dict[str, int]:
    """Count alert states per severity level.

    Args:
        alert_states: Host's alert_states dictionary

    Returns:
        Dictionary ``{"ok": N, "warning": N, "critical": N, "unknown": N}``.
        All four keys are always present, even when their count is zero.
    """
    summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}

    for state in alert_states.values():
        # AlertLevel member names (OK/WARNING/CRITICAL/UNKNOWN) map 1:1 onto
        # the summary keys; anything else is ignored.
        key = state.level.name.lower()
        if key in summary:
            summary[key] += 1

    return summary
|