3da6976b53
These entries are set by the connection state machine, not by threshold config, so they have no threshold entry and were being deleted on every startup. Guard them explicitly so overdue/down alerts survive the purge. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1614 lines
66 KiB
Python
1614 lines
66 KiB
Python
"""
|
|
Threshold checking and alerting for plugin metrics.
|
|
|
|
This module provides a flexible threshold checking system that:
|
|
- Evaluates plugin metrics against configured warning/critical thresholds
|
|
- Tracks alert states per host and metric
|
|
- Prevents alert flapping with hysteresis
|
|
- Triggers notifications only on state changes
|
|
- Supports multiple comparison operators
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from enum import Enum
|
|
from typing import Dict, List, Any, Optional, Tuple, Callable
|
|
from . import notify as notify_mod
|
|
from .config import THRESHOLD_DEFAULTS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
eventlog = notify_mod.eventlog
|
|
|
|
class AlertLevel(Enum):
|
|
"""Alert severity levels."""
|
|
OK = 0
|
|
WARNING = 1
|
|
CRITICAL = 2
|
|
UNKNOWN = 3
|
|
|
|
|
|
class ComparisonOperator(Enum):
|
|
"""Supported comparison operators for threshold checks."""
|
|
GT = ">" # Greater than
|
|
GTE = ">=" # Greater than or equal
|
|
LT = "<" # Less than
|
|
LTE = "<=" # Less than or equal
|
|
EQ = "==" # Equal to
|
|
NEQ = "!=" # Not equal to
|
|
NAGIOS = "nagios" # Nagios exit-code semantics: 0=OK 1=WARNING 2=CRITICAL 3=UNKNOWN
|
|
|
|
|
|
class AlertState:
|
|
"""Represents the current alert state for a specific metric."""
|
|
|
|
def __init__(self, metric_path: str):
|
|
"""
|
|
Initialize alert state.
|
|
|
|
Args:
|
|
metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
|
|
"""
|
|
self.metric_path = metric_path
|
|
self.level = AlertLevel.OK
|
|
self.since = time.time()
|
|
self.last_value = None
|
|
self.last_check = time.time()
|
|
self.notification_count = 0
|
|
self.last_notification = None
|
|
self.threshold_value = None # The threshold value that triggered alert
|
|
self.operator = None # The comparison operator (>, <, >=, etc.)
|
|
self.hysteresis: Optional[float] = None # Hysteresis fraction used for recovery
|
|
self.formatted_message = None # Formatted display message for UI
|
|
self.acknowledged = False # Whether alert has been acknowledged
|
|
self.acknowledged_at = None # Timestamp when acknowledged
|
|
self.consecutive_count = 0 # Consecutive exceedances while still OK (for count gating)
|
|
self.pending_since: Optional[float] = None # non-None while waiting out grace period before notifying
|
|
|
|
def update(
|
|
self,
|
|
level: AlertLevel,
|
|
value: Any,
|
|
threshold_value: Optional[float] = None,
|
|
operator: Optional[str] = None
|
|
) -> bool:
|
|
"""
|
|
Update alert state.
|
|
|
|
Args:
|
|
level: New alert level
|
|
value: Current metric value
|
|
threshold_value: The threshold value that was exceeded (if applicable)
|
|
operator: The comparison operator (>, <, >=, etc.)
|
|
|
|
Returns:
|
|
True if state changed (notification needed), False otherwise
|
|
"""
|
|
now = time.time()
|
|
self.last_check = now
|
|
self.last_value = value
|
|
|
|
# Update threshold info when alert is active
|
|
if level != AlertLevel.OK:
|
|
self.threshold_value = threshold_value
|
|
self.operator = operator
|
|
else:
|
|
# Clear threshold info when returning to OK
|
|
self.threshold_value = None
|
|
self.operator = None
|
|
|
|
# Check if state changed
|
|
if level != self.level:
|
|
logger.info(
|
|
"Alert state change for %s: %s -> %s (value: %s)",
|
|
self.metric_path,
|
|
self.level.name,
|
|
level.name,
|
|
value
|
|
)
|
|
self.level = level
|
|
self.since = now
|
|
self.notification_count = 0
|
|
self.last_notification = None # restart reminder interval on level change
|
|
# Reset acknowledgment on state change
|
|
if level != AlertLevel.OK:
|
|
# Only reset if changing to a different alert level
|
|
self.acknowledged = False
|
|
self.acknowledged_at = None
|
|
return True
|
|
|
|
return False
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Convert alert state to dictionary for serialization."""
|
|
import math
|
|
|
|
# Helper to sanitize numeric values for JSON (handle inf/nan)
|
|
def sanitize_value(val):
|
|
if isinstance(val, float):
|
|
if math.isinf(val):
|
|
return "overdue"
|
|
if math.isnan(val):
|
|
return None
|
|
return val
|
|
|
|
result = {
|
|
"metric_path": self.metric_path,
|
|
"level": self.level.name,
|
|
"since": self.since,
|
|
"last_value": sanitize_value(self.last_value),
|
|
"last_check": self.last_check,
|
|
"notification_count": self.notification_count,
|
|
"acknowledged": self.acknowledged,
|
|
}
|
|
|
|
# Include acknowledgment timestamp if acknowledged
|
|
if self.acknowledged_at is not None:
|
|
result["acknowledged_at"] = self.acknowledged_at
|
|
|
|
# Include threshold info if available
|
|
if self.threshold_value is not None:
|
|
result["threshold_value"] = sanitize_value(self.threshold_value)
|
|
if self.operator is not None:
|
|
result["operator"] = self.operator
|
|
if self.formatted_message is not None:
|
|
result["formatted_message"] = self.formatted_message
|
|
|
|
# Compute and expose the recovery threshold so the UI can display it
|
|
if (self.hysteresis and self.threshold_value is not None
|
|
and self.operator is not None):
|
|
ha = abs(self.threshold_value * self.hysteresis)
|
|
if self.operator in ('>', '>='):
|
|
result["recovery_threshold"] = round(self.threshold_value - ha, 4)
|
|
elif self.operator in ('<', '<='):
|
|
result["recovery_threshold"] = round(self.threshold_value + ha, 4)
|
|
|
|
return result
|
|
|
|
def __setstate__(self, state):
|
|
"""Restore from pickle, backfilling fields added after the pickle was written."""
|
|
self.__dict__.update(state)
|
|
if not hasattr(self, 'consecutive_count'):
|
|
self.consecutive_count = 0
|
|
if not hasattr(self, 'hysteresis'):
|
|
self.hysteresis = None
|
|
|
|
def acknowledge(self):
|
|
"""Acknowledge this alert to stop reminder notifications."""
|
|
self.acknowledged = True
|
|
self.acknowledged_at = time.time()
|
|
logger.info("Alert acknowledged for %s", self.metric_path)
|
|
|
|
def __str__(self):
|
|
return self.to_dict().__str__()
|
|
|
|
class ThresholdConfig:
|
|
"""Configuration for a single threshold check."""
|
|
|
|
def __init__(
|
|
self,
|
|
metric_path: str,
|
|
warning: Optional[float] = None,
|
|
critical: Optional[float] = None,
|
|
display: Optional[str] = None,
|
|
operator: str = ">",
|
|
hysteresis: float = 0.0,
|
|
enabled: bool = True,
|
|
count: int = 1,
|
|
grace: Optional[float] = None,
|
|
):
|
|
"""
|
|
Initialize threshold configuration.
|
|
|
|
Args:
|
|
metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
|
|
warning: Warning threshold value
|
|
critical: Critical threshold value
|
|
operator: Comparison operator (>, >=, <, <=, ==, !=)
|
|
hysteresis: Hysteresis percentage to prevent flapping (0.0-1.0)
|
|
enabled: Whether this threshold is enabled
|
|
count: Number of consecutive exceedances required before alerting (default 1)
|
|
grace: Per-metric grace period in seconds; overrides global grace when set
|
|
"""
|
|
self.metric_path = metric_path
|
|
self.warning = warning
|
|
self.critical = critical
|
|
self.enabled = enabled
|
|
self.hysteresis = hysteresis
|
|
self.display = display
|
|
self.count = max(1, int(count))
|
|
self.grace = float(grace) if grace is not None else None
|
|
|
|
# Parse operator
|
|
try:
|
|
self.operator = ComparisonOperator(operator)
|
|
except ValueError:
|
|
logger.warning(
|
|
"Invalid operator '%s' for %s, using '>' as default",
|
|
operator,
|
|
metric_path
|
|
)
|
|
self.operator = ComparisonOperator.GT
|
|
|
|
def evaluate(self, value: float) -> AlertLevel:
|
|
"""
|
|
Evaluate a value against this threshold.
|
|
|
|
Args:
|
|
value: Metric value to check
|
|
|
|
Returns:
|
|
AlertLevel indicating the severity
|
|
"""
|
|
if not self.enabled:
|
|
return AlertLevel.OK
|
|
|
|
# Nagios exit-code semantics: value IS the severity
|
|
if self.operator == ComparisonOperator.NAGIOS:
|
|
try:
|
|
code = int(value)
|
|
except (TypeError, ValueError):
|
|
return AlertLevel.UNKNOWN
|
|
return {0: AlertLevel.OK, 1: AlertLevel.WARNING, 2: AlertLevel.CRITICAL}.get(
|
|
code, AlertLevel.UNKNOWN
|
|
)
|
|
|
|
try:
|
|
# Convert value to float for comparison
|
|
value = float(value)
|
|
except (TypeError, ValueError):
|
|
logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
|
|
return AlertLevel.UNKNOWN
|
|
|
|
# Check critical threshold first
|
|
if self.critical is not None:
|
|
if self._compare(value, self.critical):
|
|
return AlertLevel.CRITICAL
|
|
|
|
# Then check warning threshold
|
|
if self.warning is not None:
|
|
if self._compare(value, self.warning):
|
|
return AlertLevel.WARNING
|
|
|
|
return AlertLevel.OK
|
|
|
|
def evaluate_with_hysteresis(
|
|
self,
|
|
value: float,
|
|
current_level: AlertLevel
|
|
) -> AlertLevel:
|
|
"""
|
|
Evaluate with hysteresis to prevent flapping.
|
|
|
|
Args:
|
|
value: Current metric value
|
|
current_level: Current alert level
|
|
|
|
Returns:
|
|
New alert level considering hysteresis
|
|
"""
|
|
new_level = self.evaluate(value)
|
|
|
|
# Nagios exit codes are discrete integers — hysteresis doesn't apply
|
|
if self.operator == ComparisonOperator.NAGIOS:
|
|
return new_level
|
|
|
|
# If no hysteresis, return new level
|
|
if self.hysteresis == 0.0:
|
|
return new_level
|
|
|
|
# If improving (going to a lower severity), apply hysteresis
|
|
if new_level.value < current_level.value:
|
|
# For recovery, value must be better by hysteresis amount
|
|
if current_level == AlertLevel.CRITICAL and self.critical is not None:
|
|
threshold = self.critical
|
|
elif current_level == AlertLevel.WARNING and self.warning is not None:
|
|
threshold = self.warning
|
|
else:
|
|
return new_level
|
|
|
|
# Calculate hysteresis threshold
|
|
hysteresis_amount = abs(threshold * self.hysteresis)
|
|
|
|
if self.operator in [ComparisonOperator.GT, ComparisonOperator.GTE]:
|
|
# For "greater than" thresholds, value must go below by hysteresis
|
|
recovery_threshold = threshold - hysteresis_amount
|
|
if value >= recovery_threshold:
|
|
# Not enough improvement, keep current level
|
|
return current_level
|
|
elif self.operator in [ComparisonOperator.LT, ComparisonOperator.LTE]:
|
|
# For "less than" thresholds, value must go above by hysteresis
|
|
recovery_threshold = threshold + hysteresis_amount
|
|
if value <= recovery_threshold:
|
|
# Not enough improvement, keep current level
|
|
return current_level
|
|
|
|
return new_level
|
|
|
|
def _compare(self, value: float, threshold: float) -> bool:
|
|
"""Perform comparison based on operator."""
|
|
if self.operator == ComparisonOperator.GT:
|
|
return value > threshold
|
|
elif self.operator == ComparisonOperator.GTE:
|
|
return value >= threshold
|
|
elif self.operator == ComparisonOperator.LT:
|
|
return value < threshold
|
|
elif self.operator == ComparisonOperator.LTE:
|
|
return value <= threshold
|
|
elif self.operator == ComparisonOperator.EQ:
|
|
return abs(value - threshold) < 1e-9 # Float comparison
|
|
elif self.operator == ComparisonOperator.NEQ:
|
|
return abs(value - threshold) >= 1e-9
|
|
return False
|
|
|
|
|
|
class ThresholdChecker:
|
|
"""Main threshold checking and alerting system."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Dict[str, Any],
|
|
renotify_interval: int = 3600,
|
|
journal: Optional[Any] = None,
|
|
):
|
|
"""
|
|
Initialize threshold checker.
|
|
|
|
Args:
|
|
config: Threshold configuration dictionary from YAML
|
|
renotify_interval: Seconds between repeat notifications (default: 1 hour)
|
|
journal: Optional MessageJournal instance for logging threshold events
|
|
"""
|
|
# Named threshold configurations (pre-merged: defaults + overrides): {config_name: {metric_path: ThresholdConfig}}
|
|
self.threshold_configs = {}
|
|
|
|
# Raw overrides only for each named config (no defaults baked in): {config_name: {metric_path: ThresholdConfig}}
|
|
self.threshold_raw_configs: Dict[str, Dict[str, ThresholdConfig]] = {}
|
|
|
|
# Single threshold set for backward compatibility: {metric_path: ThresholdConfig}
|
|
self.thresholds = {}
|
|
|
|
# Host to ordered list of config names: {host_name: [config_name, ...]}
|
|
self.host_config_mapping: Dict[str, List[str]] = {}
|
|
|
|
# Default config name to use when no mapping exists
|
|
self.default_config = "default"
|
|
|
|
self.renotify_interval = renotify_interval
|
|
self.grace_seconds: float = float(config.get("grace", 2))
|
|
self.journal = journal
|
|
|
|
# Parse configuration
|
|
self._parse_config(config)
|
|
|
|
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
|
|
if total_thresholds == 0 and len(self.thresholds) > 0:
|
|
# Backward compatibility: using single threshold set
|
|
total_thresholds = len(self.thresholds)
|
|
logger.info("ThresholdChecker initialized with %d thresholds (legacy format)", total_thresholds)
|
|
else:
|
|
logger.info(
|
|
"ThresholdChecker initialized with %d named configurations (%d total thresholds)",
|
|
len(self.threshold_configs),
|
|
total_thresholds
|
|
)
|
|
|
|
def reload(self, config: Dict[str, Any]):
|
|
"""Reload threshold configuration from new config dict.
|
|
|
|
This clears all existing thresholds and re-parses from the new configuration.
|
|
Alert states are preserved to maintain hysteresis across reloads.
|
|
|
|
Args:
|
|
config: New configuration dictionary
|
|
"""
|
|
logger.info("Reloading threshold configuration...")
|
|
|
|
# Clear old configuration
|
|
self.threshold_configs.clear()
|
|
self.threshold_raw_configs.clear()
|
|
self.thresholds.clear()
|
|
self.host_config_mapping.clear()
|
|
self.grace_seconds = float(config.get("grace", 2))
|
|
|
|
# Parse new configuration
|
|
self._parse_config(config)
|
|
|
|
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
|
|
if total_thresholds == 0 and len(self.thresholds) > 0:
|
|
total_thresholds = len(self.thresholds)
|
|
|
|
logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)
|
|
|
|
def _parse_config(self, config: Dict[str, Any]):
|
|
"""Parse threshold configuration from YAML structure.
|
|
|
|
Supports two formats:
|
|
1. Legacy format with direct 'thresholds' section
|
|
2. New format with 'threshold_configs' and 'host_threshold_mapping'
|
|
|
|
In all cases, THRESHOLD_DEFAULTS are seeded into threshold_configs["default"]
|
|
so the Settings page always shows the built-in defaults.
|
|
_parse_multi_config() overwrites this with the fully-merged effective defaults.
|
|
"""
|
|
# Always expose built-in defaults through threshold_configs["default"] so
|
|
# the Settings page has something to display even in legacy/no-config mode.
|
|
seed: Dict[str, ThresholdConfig] = {}
|
|
for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=seed)
|
|
if seed:
|
|
self.threshold_configs["default"] = seed
|
|
self.threshold_raw_configs["default"] = {}
|
|
|
|
# Check for new multi-config format
|
|
if "threshold_configs" in config:
|
|
self._parse_multi_config(config) # overwrites threshold_configs["default"]
|
|
elif "thresholds" in config:
|
|
# Legacy single threshold configuration
|
|
self._parse_legacy_config(config)
|
|
else:
|
|
logger.info("No thresholds configured")
|
|
|
|
def _parse_multi_config(self, config: Dict[str, Any]):
|
|
"""Parse multiple named threshold configurations."""
|
|
threshold_configs = config.get("threshold_configs", {})
|
|
|
|
if not threshold_configs:
|
|
logger.info("No threshold configurations defined")
|
|
return
|
|
|
|
# Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
|
|
# All other configs inherit any metric not explicitly defined from effective_defaults.
|
|
effective_defaults: Dict[str, ThresholdConfig] = {}
|
|
for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|
|
|
|
if "default" in threshold_configs:
|
|
default_data = threshold_configs["default"]
|
|
if isinstance(default_data, dict) and "thresholds" in default_data:
|
|
for plugin_name, plugin_thresholds in default_data["thresholds"].items():
|
|
if isinstance(plugin_thresholds, dict):
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
|
|
|
|
self.threshold_configs["default"] = dict(effective_defaults)
|
|
self.threshold_raw_configs["default"] = {}
|
|
logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))
|
|
|
|
# Parse each named configuration
|
|
for config_name, config_data in threshold_configs.items():
|
|
if config_name == "default":
|
|
continue # already handled above
|
|
|
|
if not isinstance(config_data, dict):
|
|
logger.warning("Invalid threshold config '%s', skipping", config_name)
|
|
continue
|
|
|
|
if "thresholds" not in config_data:
|
|
logger.warning("No thresholds in config '%s', skipping", config_name)
|
|
continue
|
|
|
|
logger.info("Parsing threshold configuration: %s", config_name)
|
|
|
|
# Raw overrides only (used for multi-config layering)
|
|
raw_overrides: Dict[str, ThresholdConfig] = {}
|
|
thresholds_config = config_data["thresholds"]
|
|
for plugin_name, plugin_thresholds in thresholds_config.items():
|
|
if not isinstance(plugin_thresholds, dict):
|
|
continue
|
|
plugin_enabled = plugin_thresholds.get('enabled', plugin_thresholds.get('enable', True))
|
|
if not plugin_enabled:
|
|
# raw_overrides is empty at this point so there's nothing to delete.
|
|
# Instead, inject disabled stubs for every matching effective_default so
|
|
# the merge step overwrites the inherited defaults.
|
|
for key, tc in effective_defaults.items():
|
|
if key.startswith(f"{plugin_name}."):
|
|
raw_overrides[key] = ThresholdConfig(
|
|
metric_path=key,
|
|
warning=tc.warning,
|
|
critical=tc.critical,
|
|
operator=tc.operator.value,
|
|
enabled=False,
|
|
)
|
|
logger.info(
|
|
"Plugin-level disable in config '%s': disabled all thresholds for %s",
|
|
config_name, plugin_name,
|
|
)
|
|
else:
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=raw_overrides)
|
|
self.threshold_raw_configs[config_name] = raw_overrides
|
|
|
|
# Pre-merged version (defaults + overrides) for single-config fast path
|
|
self.threshold_configs[config_name] = dict(effective_defaults)
|
|
self.threshold_configs[config_name].update(raw_overrides)
|
|
|
|
# Parse host → config list mapping from two possible sources
|
|
|
|
def _normalise(value) -> List[str]:
|
|
"""Accept a string or list; always return a list."""
|
|
if isinstance(value, list):
|
|
return [str(v) for v in value]
|
|
return [str(value)]
|
|
|
|
# 1. hosts section with threshold_config attribute (string or list)
|
|
if "hosts" in config:
|
|
hosts_config = config["hosts"]
|
|
if isinstance(hosts_config, dict):
|
|
for host_name, host_attrs in hosts_config.items():
|
|
if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
|
|
self.host_config_mapping[host_name] = _normalise(host_attrs["threshold_config"])
|
|
|
|
# 2. Legacy host_threshold_mapping section (string values only)
|
|
if "host_threshold_mapping" in config:
|
|
legacy_mapping = config.get("host_threshold_mapping", {})
|
|
if isinstance(legacy_mapping, dict):
|
|
for host_name, value in legacy_mapping.items():
|
|
self.host_config_mapping[host_name] = _normalise(value)
|
|
|
|
# Set default config (first one alphabetically or explicitly set)
|
|
self.default_config = config.get("default_threshold_config", "default")
|
|
if self.default_config not in self.threshold_configs and self.threshold_configs:
|
|
# Use first available config as default
|
|
self.default_config = sorted(self.threshold_configs.keys())[0]
|
|
logger.info("Using '%s' as default threshold config", self.default_config)
|
|
|
|
logger.info(
|
|
"Loaded %d threshold configurations with %d host mappings",
|
|
len(self.threshold_configs),
|
|
len(self.host_config_mapping)
|
|
)
|
|
|
|
def _parse_legacy_config(self, config: Dict[str, Any]):
|
|
"""Parse legacy single threshold configuration for backward compatibility."""
|
|
if not config or "thresholds" not in config:
|
|
logger.info("No thresholds configured")
|
|
return
|
|
|
|
thresholds_config = config["thresholds"]
|
|
|
|
for plugin_name, plugin_thresholds in thresholds_config.items():
|
|
if not isinstance(plugin_thresholds, dict):
|
|
continue
|
|
|
|
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds)
|
|
|
|
def _parse_plugin_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse thresholds for a specific plugin.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
thresholds: Threshold configuration dictionary
|
|
target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
# Special handling for RTT thresholds (per-host)
|
|
if plugin_name == "rtt":
|
|
self._parse_rtt_thresholds(thresholds, target_dict)
|
|
return
|
|
|
|
# Plugin-level enabled: false (also accept 'enable' as a common typo) removes all
|
|
# thresholds for this plugin — e.g. memory_monitor: {enabled: false}.
|
|
plugin_enabled = thresholds.get('enabled', thresholds.get('enable', True))
|
|
if not plugin_enabled:
|
|
for key in [k for k in target_dict if k.startswith(f"{plugin_name}.")]:
|
|
del target_dict[key]
|
|
logger.info("Plugin-level disable: removed all thresholds for %s", plugin_name)
|
|
return
|
|
|
|
for metric_name, threshold_config in thresholds.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Handle nested metrics (e.g., partitions./.percent or pools.*.status)
|
|
if metric_name == "partitions":
|
|
self._parse_partition_thresholds(plugin_name, threshold_config, target_dict)
|
|
continue
|
|
if metric_name == "pools":
|
|
self._parse_pool_thresholds(plugin_name, threshold_config, target_dict)
|
|
continue
|
|
|
|
metric_path = f"{plugin_name}.{metric_name}"
|
|
|
|
# Extract threshold values
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
# Nagios operator maps exit codes directly; no numeric thresholds needed
|
|
is_nagios_op = (operator == "nagios")
|
|
default_display = "{check_name}: {output}" if is_nagios_op else "(threshold: {op_symbol} {threshold_value})"
|
|
display = threshold_config.get("display", default_display)
|
|
hysteresis = threshold_config.get("hysteresis", 0.0 if is_nagios_op else 0.02)
|
|
enabled = threshold_config.get("enabled", True)
|
|
grace = threshold_config.get("grace", None)
|
|
|
|
if warning is None and critical is None and not is_nagios_op:
|
|
logger.warning("No thresholds defined for %s, skipping", metric_path)
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
grace=grace,
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered threshold for %s: warn=%s, crit=%s, op=%s",
|
|
metric_path,
|
|
warning,
|
|
critical,
|
|
operator
|
|
)
|
|
|
|
def _parse_partition_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
partitions: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse partition-specific thresholds for disk monitoring.
|
|
|
|
Args:
|
|
plugin_name: Name of the plugin
|
|
partitions: Partition threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, threshold_config in metrics.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
|
|
# Create metric path like "disk_monitor./dev/sda1.percent"
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
hysteresis = threshold_config.get("hysteresis", 0.1)
|
|
enabled = threshold_config.get("enabled", True)
|
|
display = threshold_config.get("display")
|
|
grace = threshold_config.get("grace", None)
|
|
if warning is None and critical is None:
|
|
continue
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
grace=grace,
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
|
|
def _parse_pool_thresholds(
|
|
self,
|
|
plugin_name: str,
|
|
pools: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None,
|
|
):
|
|
"""Parse ZFS pool thresholds. Pool names may be literal or '*' (all pools).
|
|
|
|
Config shape::
|
|
|
|
zfs_monitor:
|
|
pools:
|
|
'*':
|
|
status:
|
|
warning: 1
|
|
critical: 2
|
|
operator: '>'
|
|
tank:
|
|
capacity:
|
|
warning: 80
|
|
critical: 90
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
for pool_name, metrics in pools.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
for metric_name, threshold_config in metrics.items():
|
|
if not isinstance(threshold_config, dict):
|
|
continue
|
|
metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
|
|
warning = threshold_config.get("warning")
|
|
critical = threshold_config.get("critical")
|
|
operator = threshold_config.get("operator", ">")
|
|
hysteresis = threshold_config.get("hysteresis", 0.02)
|
|
enabled = threshold_config.get("enabled", True)
|
|
display = threshold_config.get("display")
|
|
grace = threshold_config.get("grace", None)
|
|
if warning is None and critical is None:
|
|
continue
|
|
target_dict[metric_path] = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
grace=grace,
|
|
)
|
|
|
|
def _parse_rtt_thresholds(
|
|
self,
|
|
rtt_thresholds: Dict[str, Any],
|
|
target_dict: Optional[Dict[str, ThresholdConfig]] = None
|
|
):
|
|
"""Parse RTT thresholds (network latency thresholds).
|
|
|
|
RTT thresholds are configured as:
|
|
thresholds:
|
|
rtt:
|
|
warning: 100.0 # ms
|
|
critical: 500.0 # ms
|
|
|
|
Args:
|
|
rtt_thresholds: RTT threshold configuration
|
|
target_dict: Dictionary to store parsed thresholds
|
|
"""
|
|
if target_dict is None:
|
|
target_dict = self.thresholds
|
|
|
|
if not isinstance(rtt_thresholds, dict):
|
|
return
|
|
|
|
# Metric path is simply "rtt" (not per-host)
|
|
metric_path = "rtt"
|
|
|
|
warning = rtt_thresholds.get("warning")
|
|
critical = rtt_thresholds.get("critical")
|
|
operator = rtt_thresholds.get("operator", ">")
|
|
hysteresis = rtt_thresholds.get("hysteresis", 0.02) # 2% default
|
|
enabled = rtt_thresholds.get("enabled", True)
|
|
display = rtt_thresholds.get("display")
|
|
count = rtt_thresholds.get("count", 1)
|
|
grace = rtt_thresholds.get("grace", None)
|
|
|
|
if warning is None and critical is None:
|
|
logger.warning("No RTT thresholds defined, skipping")
|
|
return
|
|
|
|
threshold = ThresholdConfig(
|
|
metric_path=metric_path,
|
|
warning=warning,
|
|
critical=critical,
|
|
operator=operator,
|
|
hysteresis=hysteresis,
|
|
enabled=enabled,
|
|
display=display,
|
|
count=count,
|
|
grace=grace,
|
|
)
|
|
|
|
target_dict[metric_path] = threshold
|
|
logger.debug(
|
|
"Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
|
|
warning,
|
|
critical,
|
|
count,
|
|
)
|
|
|
|
def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]:
|
|
"""Get the effective threshold configuration for a host.
|
|
|
|
When threshold_config is a list, configs are applied left-to-right on top
|
|
of the default thresholds so earlier entries can be overridden by later ones.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
|
|
Returns:
|
|
Dictionary of thresholds for this host
|
|
"""
|
|
# Legacy mode: single threshold set for all hosts
|
|
if self.thresholds and not self.threshold_configs:
|
|
return self.thresholds
|
|
|
|
if not self.threshold_configs:
|
|
return {}
|
|
|
|
config_names = self.host_config_mapping.get(host_name)
|
|
|
|
# No host-specific mapping → return pre-merged default
|
|
if not config_names:
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Single config → fast path using pre-merged copy
|
|
if len(config_names) == 1:
|
|
name = config_names[0]
|
|
if name in self.threshold_configs:
|
|
return self.threshold_configs[name]
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', using default '%s'",
|
|
name, host_name, self.default_config,
|
|
)
|
|
return self.threshold_configs.get(self.default_config, {})
|
|
|
|
# Multiple configs → start from defaults, layer raw overrides in order
|
|
result = dict(self.threshold_configs.get(self.default_config, {}))
|
|
for name in config_names:
|
|
if name == self.default_config:
|
|
continue # defaults already the base
|
|
raw = self.threshold_raw_configs.get(name)
|
|
if raw is None:
|
|
logger.warning(
|
|
"Threshold config '%s' not found for host '%s', skipping",
|
|
name, host_name,
|
|
)
|
|
else:
|
|
result.update(raw)
|
|
return result
|
|
|
|
def check_value(
|
|
self,
|
|
host_name: str,
|
|
metric_path: str,
|
|
value: float,
|
|
alert_states: Dict[str, AlertState],
|
|
) -> Optional[Tuple[AlertLevel, AlertLevel]]:
|
|
"""
|
|
Check a single value against configured threshold.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
metric_path: Full metric path (e.g., "rtt.hostname")
|
|
value: The metric value to check
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
Tuple of (old_level, new_level) if state changed, None otherwise
|
|
"""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
if metric_path not in thresholds:
|
|
return None
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
# Get or create alert state
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
# Evaluate threshold with hysteresis
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Apply consecutive-count gating: when currently OK, require threshold.count
|
|
# consecutive exceedances before escalating to WARNING/CRITICAL.
|
|
if new_level == AlertLevel.OK:
|
|
# Value is fine (or recovered) — reset the pending counter immediately.
|
|
alert_state.consecutive_count = 0
|
|
elif alert_state.level == AlertLevel.OK and new_level != AlertLevel.OK:
|
|
# First time we exceed while still OK: count up.
|
|
alert_state.consecutive_count += 1
|
|
if alert_state.consecutive_count < threshold.count:
|
|
logger.debug(
|
|
"RTT threshold exceeded %d/%d consecutive times for %s on %s",
|
|
alert_state.consecutive_count,
|
|
threshold.count,
|
|
metric_path,
|
|
host_name,
|
|
)
|
|
return None
|
|
# Count reached — fire the alert and reset the counter.
|
|
alert_state.consecutive_count = 0
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Keep hysteresis on the state so the UI can show the recovery threshold
|
|
if new_level != AlertLevel.OK:
|
|
alert_state.hysteresis = threshold.hysteresis
|
|
else:
|
|
alert_state.hysteresis = None
|
|
|
|
# Update state and check for changes
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, None)
|
|
return (old_level, new_level)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, None)
|
|
|
|
return None
|
|
def _find_threshold(
|
|
self, thresholds: Dict[str, "ThresholdConfig"], metric_path: str
|
|
) -> Tuple[Optional["ThresholdConfig"], Optional[str]]:
|
|
"""Return (threshold, check_name) for *metric_path*, falling back to suffix matches.
|
|
|
|
Allows generic thresholds like ``nagios_runner.status_code`` to match
|
|
fully-qualified paths like ``nagios_runner.check_disk_root_status_code``.
|
|
The exact match is always tried first; then successive leading
|
|
underscore-delimited segments are stripped from the field name until
|
|
a match is found or no segments remain.
|
|
|
|
Returns:
|
|
(ThresholdConfig, None) for an exact match.
|
|
(ThresholdConfig, "check_disk_root") for a suffix match — the second
|
|
element is the stripped prefix, available as ``{check_name}`` in
|
|
display format templates.
|
|
(None, None) when no threshold is found.
|
|
"""
|
|
if metric_path in thresholds:
|
|
return thresholds[metric_path], None
|
|
plugin, sep, field = metric_path.partition(".")
|
|
if not sep:
|
|
return None, None
|
|
parts = field.split("_")
|
|
for i in range(1, len(parts)):
|
|
candidate = plugin + "." + "_".join(parts[i:])
|
|
if candidate in thresholds:
|
|
return thresholds[candidate], "_".join(parts[:i])
|
|
return None, None
|
|
|
|
def check_plugin_data(
|
|
self,
|
|
host_name: str,
|
|
plugin_name: str,
|
|
data: Dict[str, Any],
|
|
alert_states: Dict[str, AlertState],
|
|
) -> list:
|
|
"""
|
|
Check plugin data against configured thresholds.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
plugin_name: Name of the plugin
|
|
data: Plugin data dictionary
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
List of (metric_path, old_level, new_level, value) tuples for state changes
|
|
"""
|
|
state_changes = []
|
|
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
# Check flat metrics
|
|
for metric_name, value in data.items():
|
|
metric_path = f"{plugin_name}.{metric_name}"
|
|
|
|
threshold, check_name = self._find_threshold(thresholds, metric_path)
|
|
if threshold is None:
|
|
continue
|
|
|
|
# Get or create alert state
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
# Evaluate threshold with hysteresis
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None
|
|
|
|
# Update state and check for changes
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data, check_name=check_name, metric_name=metric_name)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data, check_name=check_name, metric_name=metric_name)
|
|
|
|
# Check nested metrics (e.g., partition data in disk_monitor)
|
|
self._check_nested_metrics(
|
|
host_name,
|
|
plugin_name,
|
|
data,
|
|
alert_states,
|
|
state_changes
|
|
)
|
|
|
|
return state_changes
|
|
|
|
def _check_nested_metrics(
|
|
self,
|
|
host_name: str,
|
|
plugin_name: str,
|
|
data: Dict[str, Any],
|
|
alert_states: Dict[str, AlertState],
|
|
state_changes: list,
|
|
):
|
|
"""Check nested metrics like partition-specific thresholds."""
|
|
# Get host-specific thresholds
|
|
thresholds = self.get_thresholds_for_host(host_name)
|
|
|
|
# ZFS pool health checks
|
|
if plugin_name == "zfs_monitor" and "pools" in data:
|
|
pools = data["pools"]
|
|
if isinstance(pools, dict):
|
|
for pool_name, pool_metrics in pools.items():
|
|
if not isinstance(pool_metrics, dict):
|
|
continue
|
|
# Synthesize status from health string for older clients
|
|
# that predate the status field.
|
|
pool_metrics_effective = dict(pool_metrics)
|
|
if "health" in pool_metrics and "status" not in pool_metrics:
|
|
pool_metrics_effective["status"] = 0 if pool_metrics["health"] == "ONLINE" else 1
|
|
for metric_name, value in pool_metrics_effective.items():
|
|
# Try specific pool name first, then wildcard '*'
|
|
metric_path = f"{plugin_name}.{pool_name}.{metric_name}"
|
|
wildcard_path = f"{plugin_name}.*.{metric_name}"
|
|
threshold = thresholds.get(metric_path) or thresholds.get(wildcard_path)
|
|
if threshold is None:
|
|
continue
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
alert_state = alert_states[metric_path]
|
|
new_level = threshold.evaluate_with_hysteresis(value, alert_state.level)
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None
|
|
pool_context = dict(pool_metrics_effective)
|
|
pool_context["pool_name"] = pool_name
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, pool_context, metric_name=pool_name)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, pool_context, metric_name=pool_name)
|
|
|
|
# Look for partition data in disk_monitor
|
|
if plugin_name == "disk_monitor" and "partitions" in data:
|
|
partitions = data["partitions"]
|
|
if not isinstance(partitions, dict):
|
|
return
|
|
|
|
for partition, metrics in partitions.items():
|
|
if not isinstance(metrics, dict):
|
|
continue
|
|
|
|
for metric_name, value in metrics.items():
|
|
metric_path = f"{plugin_name}.{partition}.{metric_name}"
|
|
|
|
if metric_path not in thresholds:
|
|
continue
|
|
|
|
threshold = thresholds[metric_path]
|
|
|
|
if metric_path not in alert_states:
|
|
alert_states[metric_path] = AlertState(metric_path)
|
|
|
|
alert_state = alert_states[metric_path]
|
|
|
|
new_level = threshold.evaluate_with_hysteresis(
|
|
value,
|
|
alert_state.level
|
|
)
|
|
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
alert_state.hysteresis = threshold.hysteresis if new_level != AlertLevel.OK else None
|
|
|
|
old_level = alert_state.level
|
|
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
|
|
state_changes.append((metric_path, old_level, new_level, value))
|
|
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
|
|
elif new_level != AlertLevel.OK:
|
|
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
|
|
|
|
def _trigger_notification(
|
|
self,
|
|
host_name: str,
|
|
metric_path: str,
|
|
old_level: AlertLevel,
|
|
new_level: AlertLevel,
|
|
value: Any,
|
|
threshold: ThresholdConfig,
|
|
plugin_data: Optional[Dict[str, Any]] = None,
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
):
|
|
"""Trigger a notification for an alert state change.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
metric_path: Full metric path
|
|
old_level: Previous alert level
|
|
new_level: New alert level
|
|
value: Current metric value
|
|
threshold: Threshold configuration
|
|
plugin_data: Optional dictionary of all plugin data fields for format string
|
|
"""
|
|
# Determine which threshold was exceeded
|
|
threshold_value = None
|
|
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Format operator symbol
|
|
op_symbol = threshold.operator.value
|
|
|
|
# Short metric label: strip the plugin-name prefix and _status_code suffix
|
|
short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
|
|
|
|
# Use a display-friendly value (inf is the sentinel for "overdue")
|
|
import math
|
|
display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value
|
|
|
|
# Format message — for the nagios operator there is no numeric threshold_value;
|
|
# render the display template whenever one is available.
|
|
has_display = threshold_value is not None or threshold.operator == ComparisonOperator.NAGIOS
|
|
|
|
def _fmt():
|
|
return self._format_display(
|
|
threshold.display,
|
|
value=display_value,
|
|
threshold_value=threshold_value,
|
|
op_symbol=op_symbol,
|
|
plugin_data=plugin_data,
|
|
check_name=check_name,
|
|
metric_name=metric_name,
|
|
)
|
|
|
|
if new_level == AlertLevel.OK:
|
|
lvl = "RECOVER"
|
|
message = f"{short_path} = {display_value} ({old_level.name} -> OK)"
|
|
elif new_level == AlertLevel.WARNING:
|
|
lvl = "WARNING"
|
|
if has_display:
|
|
message = f"{short_path} = {display_value} {_fmt()}"
|
|
else:
|
|
message = f"{short_path} = {display_value}"
|
|
elif new_level == AlertLevel.CRITICAL:
|
|
lvl = "CRITICAL"
|
|
if has_display:
|
|
message = f"{short_path} = {display_value} {_fmt()}"
|
|
else:
|
|
message = f"{short_path} = {display_value}"
|
|
else:
|
|
lvl = "UNKNOWN"
|
|
if has_display:
|
|
message = f"{short_path} = {display_value} {_fmt()}"
|
|
else:
|
|
message = f"{short_path} = {display_value}"
|
|
|
|
# Formatted threshold info stored on AlertState for the UI
|
|
formatted_threshold_msg = _fmt() if has_display and new_level != AlertLevel.OK else None
|
|
|
|
return lvl, message, formatted_threshold_msg
|
|
|
|
def _send_notification(
|
|
self,
|
|
host_name: str,
|
|
lvl: str,
|
|
message: str,
|
|
metric_path: str,
|
|
old_level: AlertLevel,
|
|
new_level: AlertLevel,
|
|
value: Any,
|
|
):
|
|
"""Send notification and log to journal/eventlog."""
|
|
from . import hbdclass
|
|
host = hbdclass.Host.hosts.get(host_name)
|
|
if host is not None and not host.watched:
|
|
eventlog(host_name, lvl, message, service="threshold")
|
|
return
|
|
short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
|
|
title = f"[{lvl}] {host_name} {short_path}"
|
|
# Strip the "metric = " prefix from message so body is just the value/detail
|
|
prefix = short_path + " = "
|
|
body = message[len(prefix):] if message.startswith(prefix) else message
|
|
asyncio.get_event_loop().create_task(notify_mod.send_notification(
|
|
host_name,
|
|
notify_mod.Notification(
|
|
title=title,
|
|
body=body,
|
|
level=lvl,
|
|
),
|
|
))
|
|
|
|
# Log to journal
|
|
if self.journal is not None:
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
loop.create_task(self.journal.log_threshold_event(
|
|
host_name=host_name,
|
|
metric_path=metric_path,
|
|
old_level=old_level.name,
|
|
new_level=new_level.name,
|
|
value=value,
|
|
))
|
|
except Exception as e:
|
|
logger.debug(f"Failed to log threshold event to journal: {e}")
|
|
# Log to eventlog as well
|
|
eventlog(host_name, lvl, message, service="threshold")
|
|
|
|
def _format_display(
|
|
self,
|
|
display_format: str,
|
|
value: Any,
|
|
threshold_value: Optional[float],
|
|
op_symbol: str,
|
|
plugin_data: Optional[Dict[str, Any]] = None,
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
) -> str:
|
|
"""Format the display string using available data.
|
|
|
|
Available template variables:
|
|
{value} - current metric value
|
|
{threshold_value} - threshold that was exceeded
|
|
{op_symbol} - comparison operator (>, <, >=, <=, ==, !=)
|
|
{check_name} - prefix stripped for generic threshold match
|
|
(e.g. "check_disk_root" when metric
|
|
"check_disk_root_status_code" matched generic
|
|
threshold "status_code")
|
|
{metric_name} - field name within the plugin data dict
|
|
Any key from plugin_data is also available.
|
|
|
|
Returns:
|
|
Formatted display string
|
|
"""
|
|
if not display_format:
|
|
display_format = "(threshold: {op_symbol} {threshold_value})" if threshold_value is not None else ""
|
|
|
|
# Build format context with standard variables
|
|
format_context = {
|
|
'value': value,
|
|
'op_symbol': op_symbol,
|
|
}
|
|
if threshold_value is not None:
|
|
format_context['threshold_value'] = threshold_value
|
|
|
|
# Add generic-match context variables when available
|
|
if check_name is not None:
|
|
format_context['check_name'] = check_name
|
|
if metric_name is not None:
|
|
format_context['metric_name'] = metric_name
|
|
|
|
# Add all plugin data fields if available
|
|
if plugin_data:
|
|
format_context.update(plugin_data)
|
|
|
|
# For nagios_runner generic matches, expose the matched check's output
|
|
# and status as short aliases {output} and {status} so display templates
|
|
# don't need to use the full {check_disk_root_output} form.
|
|
if check_name and plugin_data:
|
|
if 'output' not in format_context:
|
|
output = plugin_data.get(f"{check_name}_output")
|
|
if output is not None:
|
|
format_context['output'] = output
|
|
if 'status' not in format_context:
|
|
status = plugin_data.get(f"{check_name}_status")
|
|
if status is not None:
|
|
format_context['status'] = status
|
|
|
|
try:
|
|
# Format the display string
|
|
return display_format.format(**format_context)
|
|
except KeyError as e:
|
|
logger.warning(
|
|
"Missing format variable in display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
# Fallback to default format
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error formatting display string '%s': %s",
|
|
display_format,
|
|
e
|
|
)
|
|
return f"(threshold: {op_symbol} {threshold_value})"
|
|
|
|
def _apply_grace(
|
|
self,
|
|
host_name: str,
|
|
alert_state: AlertState,
|
|
metric_path: str,
|
|
old_level: AlertLevel,
|
|
new_level: AlertLevel,
|
|
value: Any,
|
|
threshold: ThresholdConfig,
|
|
plugin_data: Optional[Dict[str, Any]],
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
) -> None:
|
|
"""Handle a state-change transition with grace-period logic.
|
|
|
|
Transitioning INTO alert (worsening): defers the notification for the effective
|
|
grace period (threshold.grace if set, else self.grace_seconds). Grace of 0 fires
|
|
the notification immediately with no deferral.
|
|
De-escalation within alert states (e.g. CRITICAL→WARNING): no new notification;
|
|
the metric is still alerting so no RECOVER was sent.
|
|
Transitioning TO OK:
|
|
- Still in grace window (pending_since set): suppresses both the alert
|
|
and the recovery — the spike never warranted a page.
|
|
- Past grace: fires the RECOVER notification normally.
|
|
"""
|
|
effective_grace = threshold.grace if threshold.grace is not None else self.grace_seconds
|
|
|
|
lvl, message, formatted_msg = self._trigger_notification(
|
|
host_name, metric_path, old_level, new_level, value, threshold, plugin_data,
|
|
check_name=check_name, metric_name=metric_name,
|
|
)
|
|
alert_state.formatted_message = formatted_msg
|
|
|
|
if new_level == AlertLevel.OK:
|
|
if alert_state.pending_since is not None:
|
|
logger.info(
|
|
"Alert suppressed (recovered within %.0fs grace): %s on %s",
|
|
effective_grace, metric_path, host_name,
|
|
)
|
|
alert_state.pending_since = None
|
|
else:
|
|
self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
|
|
elif new_level.value > old_level.value:
|
|
# Worsening (OK→WARNING, OK→CRITICAL, WARNING→CRITICAL).
|
|
if effective_grace <= 0:
|
|
# No grace period — fire immediately.
|
|
self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
|
|
now = time.time()
|
|
alert_state.last_notification = now
|
|
alert_state.notification_count = 1
|
|
else:
|
|
alert_state.pending_since = time.time()
|
|
logger.debug(
|
|
"Alert deferred (%.0fs grace): %s on %s = %s",
|
|
effective_grace, metric_path, host_name, value,
|
|
)
|
|
else:
|
|
# De-escalation within alert states (e.g. CRITICAL→WARNING): metric is still
|
|
# alerting but did not recover, so no new notification.
|
|
logger.debug(
|
|
"De-escalation %s→%s for %s on %s, no notification",
|
|
old_level.name, new_level.name, metric_path, host_name,
|
|
)
|
|
|
|
def _check_pending_or_renotify(
|
|
self,
|
|
host_name: str,
|
|
alert_state: AlertState,
|
|
metric_path: str,
|
|
value: Any,
|
|
threshold: ThresholdConfig,
|
|
plugin_data: Optional[Dict[str, Any]],
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
) -> None:
|
|
"""Called when alert level is unchanged and non-OK.
|
|
|
|
If a deferred notification is pending and grace_seconds have elapsed,
|
|
fires it now. Otherwise falls through to normal reminder logic.
|
|
"""
|
|
effective_grace = threshold.grace if threshold.grace is not None else self.grace_seconds
|
|
if alert_state.pending_since is not None:
|
|
if time.time() - alert_state.pending_since >= effective_grace:
|
|
lvl, message, formatted_msg = self._trigger_notification(
|
|
host_name, metric_path, AlertLevel.OK, alert_state.level, value, threshold, plugin_data,
|
|
check_name=check_name, metric_name=metric_name,
|
|
)
|
|
alert_state.formatted_message = formatted_msg
|
|
self._send_notification(
|
|
host_name, lvl, message, metric_path, AlertLevel.OK, alert_state.level, value
|
|
)
|
|
alert_state.pending_since = None
|
|
now = time.time()
|
|
alert_state.last_notification = now
|
|
alert_state.notification_count = 1
|
|
# else: still within grace window, do nothing
|
|
else:
|
|
self._check_renotify(host_name, alert_state, metric_path, value, threshold, plugin_data, check_name=check_name, metric_name=metric_name)
|
|
|
|
@staticmethod
|
|
def _human_duration(seconds: float) -> str:
|
|
s = int(seconds)
|
|
if s < 120:
|
|
return f"{s}s"
|
|
if s < 3600:
|
|
return f"{s // 60}m {s % 60}s"
|
|
h, rem = divmod(s, 3600)
|
|
m = rem // 60
|
|
return f"{h}h {m}m" if m else f"{h}h"
|
|
|
|
def _check_renotify(
|
|
self,
|
|
host_name: str,
|
|
alert_state: AlertState,
|
|
metric_path: str,
|
|
value: Any,
|
|
threshold: ThresholdConfig,
|
|
plugin_data: Optional[Dict[str, Any]] = None,
|
|
check_name: Optional[str] = None,
|
|
metric_name: Optional[str] = None,
|
|
):
|
|
"""Check if we should send a repeat notification.
|
|
|
|
Args:
|
|
host_name: Name of the host
|
|
alert_state: Current alert state
|
|
metric_path: Full metric path
|
|
value: Current metric value
|
|
threshold: Threshold configuration
|
|
plugin_data: Optional dictionary of all plugin data fields
|
|
"""
|
|
if alert_state.level != AlertLevel.CRITICAL:
|
|
return
|
|
|
|
# Skip reminders if alert has been acknowledged
|
|
if alert_state.acknowledged:
|
|
return
|
|
|
|
now = time.time()
|
|
|
|
# Check if we should re-notify
|
|
if alert_state.last_notification is None:
|
|
# First notification already sent during state change
|
|
alert_state.last_notification = now
|
|
alert_state.notification_count = 1
|
|
return
|
|
|
|
if (now - alert_state.last_notification) >= self.renotify_interval:
|
|
# Determine which threshold is active
|
|
threshold_value = None
|
|
if alert_state.level == AlertLevel.CRITICAL and threshold.critical is not None:
|
|
threshold_value = threshold.critical
|
|
elif alert_state.level == AlertLevel.WARNING and threshold.warning is not None:
|
|
threshold_value = threshold.warning
|
|
|
|
# Format operator symbol
|
|
op_symbol = threshold.operator.value
|
|
short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
|
|
|
|
# Time to re-notify
|
|
if threshold_value is not None:
|
|
# Use display format string
|
|
threshold_info = self._format_display(
|
|
threshold.display,
|
|
value=value,
|
|
threshold_value=threshold_value,
|
|
op_symbol=op_symbol,
|
|
plugin_data=plugin_data,
|
|
check_name=check_name,
|
|
metric_name=metric_name,
|
|
)
|
|
body = f"{value} {threshold_info}, ongoing for {self._human_duration(now - alert_state.since)}"
|
|
else:
|
|
body = f"{value} (ongoing for {self._human_duration(now - alert_state.since)})"
|
|
message = f"REMINDER ({alert_state.level.name}): {host_name} - {short_path} = {body}"
|
|
|
|
from . import hbdclass
|
|
host = hbdclass.Host.hosts.get(host_name)
|
|
if host is None or host.watched:
|
|
asyncio.get_event_loop().create_task(notify_mod.send_notification(
|
|
host_name,
|
|
notify_mod.Notification(
|
|
title=f"[REMINDER/{alert_state.level.name}] {host_name} {short_path}",
|
|
body=body,
|
|
level=alert_state.level.name,
|
|
),
|
|
))
|
|
logger.info("Re-notification sent: %s", message)
|
|
alert_state.last_notification = now
|
|
alert_state.notification_count += 1
|
|
|
|
def purge_stale_alerts(self, hbdclass) -> None:
|
|
"""Remove alert states that have no matching threshold configuration.
|
|
|
|
Called after startup (pickle restore) and after each config reload so
|
|
that alerts orphaned by configuration changes do not linger forever.
|
|
Alerts whose metric_path is not present in the current threshold config
|
|
for that host are silently dropped.
|
|
"""
|
|
for hostname, host in hbdclass.Host.hosts.items():
|
|
if not host.alert_states:
|
|
continue
|
|
configured = self.get_thresholds_for_host(hostname)
|
|
stale = []
|
|
for mp in host.alert_states:
|
|
# connectivity.* and rtt are managed by the connection state
|
|
# machine, not by threshold config — never purge them.
|
|
if mp == "rtt" or mp.startswith("connectivity."):
|
|
continue
|
|
if self._find_threshold(configured, mp)[0] is not None:
|
|
continue
|
|
# Also match wildcard pool/partition thresholds (e.g. "zfs_monitor.*.status"
|
|
# covers alert state "zfs_monitor.tank.status").
|
|
parts = mp.split(".")
|
|
if len(parts) == 3 and f"{parts[0]}.*.{parts[2]}" in configured:
|
|
continue
|
|
stale.append(mp)
|
|
for mp in stale:
|
|
logger.info(
|
|
"Purging stale alert state for %s / %s (no threshold configured)",
|
|
hostname, mp,
|
|
)
|
|
del host.alert_states[mp]
|
|
|
|
def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
|
|
"""
|
|
Get all currently active (non-OK) alerts.
|
|
|
|
Args:
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
List of AlertState objects that are not OK
|
|
"""
|
|
return [
|
|
state for state in alert_states.values()
|
|
if state.level != AlertLevel.OK
|
|
]
|
|
|
|
def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
|
|
"""
|
|
Get summary counts of alert levels.
|
|
|
|
Args:
|
|
alert_states: Host's alert_states dictionary
|
|
|
|
Returns:
|
|
Dictionary with counts: {"ok": N, "warning": N, "critical": N}
|
|
"""
|
|
summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}
|
|
|
|
for state in alert_states.values():
|
|
if state.level == AlertLevel.OK:
|
|
summary["ok"] += 1
|
|
elif state.level == AlertLevel.WARNING:
|
|
summary["warning"] += 1
|
|
elif state.level == AlertLevel.CRITICAL:
|
|
summary["critical"] += 1
|
|
elif state.level == AlertLevel.UNKNOWN:
|
|
summary["unknown"] += 1
|
|
|
|
return summary
|