Files
heartbeat/hbd/server/threshold.py
T
Andreas Wrede c4f09e9ced
Release / release (push) Successful in 5s
version 5.1.8
- fix: matrix/sms_voipms notifications blocked the event loop on timeout;
  make send_notification async, dispatch all channel drivers as non-blocking
  tasks (asyncio.to_thread for sync drivers, asyncio.wait_for for async);
  update all call sites to fire-and-forget via create_task
- feat: add /about page with version, runtime, uptime counter, and repo link
- fix: hbc_mini plugin data format now matches full hbc client so Host
  Overview displays memory, disk, and network metrics correctly

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-01 05:33:27 -04:00

1244 lines
47 KiB
Python

"""
Threshold checking and alerting for plugin metrics.
This module provides a flexible threshold checking system that:
- Evaluates plugin metrics against configured warning/critical thresholds
- Tracks alert states per host and metric
- Prevents alert flapping with hysteresis
- Triggers notifications only on state changes
- Supports multiple comparison operators
"""
import logging
import time
from enum import Enum
from typing import Dict, Any, Optional, Tuple, Callable
from . import notify as notify_mod
from .config import THRESHOLD_DEFAULTS
logger = logging.getLogger(__name__)
eventlog = notify_mod.eventlog
class AlertLevel(Enum):
"""Alert severity levels."""
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3
class ComparisonOperator(Enum):
"""Supported comparison operators for threshold checks."""
GT = ">" # Greater than
GTE = ">=" # Greater than or equal
LT = "<" # Less than
LTE = "<=" # Less than or equal
EQ = "==" # Equal to
NEQ = "!=" # Not equal to
class AlertState:
"""Represents the current alert state for a specific metric."""
def __init__(self, metric_path: str):
"""
Initialize alert state.
Args:
metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
"""
self.metric_path = metric_path
self.level = AlertLevel.OK
self.since = time.time()
self.last_value = None
self.last_check = time.time()
self.notification_count = 0
self.last_notification = None
self.threshold_value = None # The threshold value that triggered alert
self.operator = None # The comparison operator (>, <, >=, etc.)
self.formatted_message = None # Formatted display message for UI
self.acknowledged = False # Whether alert has been acknowledged
self.acknowledged_at = None # Timestamp when acknowledged
self.consecutive_count = 0 # Consecutive exceedances while still OK (for count gating)
self.pending_since: Optional[float] = None # non-None while waiting out grace period before notifying
def update(
self,
level: AlertLevel,
value: Any,
threshold_value: Optional[float] = None,
operator: Optional[str] = None
) -> bool:
"""
Update alert state.
Args:
level: New alert level
value: Current metric value
threshold_value: The threshold value that was exceeded (if applicable)
operator: The comparison operator (>, <, >=, etc.)
Returns:
True if state changed (notification needed), False otherwise
"""
now = time.time()
self.last_check = now
self.last_value = value
# Update threshold info when alert is active
if level != AlertLevel.OK:
self.threshold_value = threshold_value
self.operator = operator
else:
# Clear threshold info when returning to OK
self.threshold_value = None
self.operator = None
# Check if state changed
if level != self.level:
logger.info(
"Alert state change for %s: %s -> %s (value: %s)",
self.metric_path,
self.level.name,
level.name,
value
)
self.level = level
self.since = now
self.notification_count = 0
self.last_notification = None # restart reminder interval on level change
# Reset acknowledgment on state change
if level != AlertLevel.OK:
# Only reset if changing to a different alert level
self.acknowledged = False
self.acknowledged_at = None
return True
return False
def to_dict(self) -> dict:
"""Convert alert state to dictionary for serialization."""
import math
# Helper to sanitize numeric values for JSON (handle inf/nan)
def sanitize_value(val):
if isinstance(val, float):
if math.isinf(val):
return "overdue"
if math.isnan(val):
return None
return val
result = {
"metric_path": self.metric_path,
"level": self.level.name,
"since": self.since,
"last_value": sanitize_value(self.last_value),
"last_check": self.last_check,
"notification_count": self.notification_count,
"acknowledged": self.acknowledged,
}
# Include acknowledgment timestamp if acknowledged
if self.acknowledged_at is not None:
result["acknowledged_at"] = self.acknowledged_at
# Include threshold info if available
if self.threshold_value is not None:
result["threshold_value"] = sanitize_value(self.threshold_value)
if self.operator is not None:
result["operator"] = self.operator
if self.formatted_message is not None:
result["formatted_message"] = self.formatted_message
return result
def __setstate__(self, state):
"""Restore from pickle, backfilling fields added after the pickle was written."""
self.__dict__.update(state)
if not hasattr(self, 'consecutive_count'):
self.consecutive_count = 0
def acknowledge(self):
"""Acknowledge this alert to stop reminder notifications."""
self.acknowledged = True
self.acknowledged_at = time.time()
logger.info("Alert acknowledged for %s", self.metric_path)
def __str__(self):
return self.to_dict().__str__()
class ThresholdConfig:
"""Configuration for a single threshold check."""
def __init__(
self,
metric_path: str,
warning: Optional[float] = None,
critical: Optional[float] = None,
display: Optional[str] = None,
operator: str = ">",
hysteresis: float = 0.0,
enabled: bool = True,
count: int = 1,
):
"""
Initialize threshold configuration.
Args:
metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
warning: Warning threshold value
critical: Critical threshold value
operator: Comparison operator (>, >=, <, <=, ==, !=)
hysteresis: Hysteresis percentage to prevent flapping (0.0-1.0)
enabled: Whether this threshold is enabled
count: Number of consecutive exceedances required before alerting (default 1)
"""
self.metric_path = metric_path
self.warning = warning
self.critical = critical
self.enabled = enabled
self.hysteresis = hysteresis
self.display = display
self.count = max(1, int(count))
# Parse operator
try:
self.operator = ComparisonOperator(operator)
except ValueError:
logger.warning(
"Invalid operator '%s' for %s, using '>' as default",
operator,
metric_path
)
self.operator = ComparisonOperator.GT
def evaluate(self, value: float) -> AlertLevel:
"""
Evaluate a value against this threshold.
Args:
value: Metric value to check
Returns:
AlertLevel indicating the severity
"""
if not self.enabled:
return AlertLevel.OK
try:
# Convert value to float for comparison
value = float(value)
except (TypeError, ValueError):
logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
return AlertLevel.UNKNOWN
# Check critical threshold first
if self.critical is not None:
if self._compare(value, self.critical):
return AlertLevel.CRITICAL
# Then check warning threshold
if self.warning is not None:
if self._compare(value, self.warning):
return AlertLevel.WARNING
return AlertLevel.OK
def evaluate_with_hysteresis(
self,
value: float,
current_level: AlertLevel
) -> AlertLevel:
"""
Evaluate with hysteresis to prevent flapping.
Args:
value: Current metric value
current_level: Current alert level
Returns:
New alert level considering hysteresis
"""
new_level = self.evaluate(value)
# If no hysteresis, return new level
if self.hysteresis == 0.0:
return new_level
# If improving (going to a lower severity), apply hysteresis
if new_level.value < current_level.value:
# For recovery, value must be better by hysteresis amount
if current_level == AlertLevel.CRITICAL and self.critical is not None:
threshold = self.critical
elif current_level == AlertLevel.WARNING and self.warning is not None:
threshold = self.warning
else:
return new_level
# Calculate hysteresis threshold
hysteresis_amount = abs(threshold * self.hysteresis)
if self.operator in [ComparisonOperator.GT, ComparisonOperator.GTE]:
# For "greater than" thresholds, value must go below by hysteresis
recovery_threshold = threshold - hysteresis_amount
if value >= recovery_threshold:
# Not enough improvement, keep current level
return current_level
elif self.operator in [ComparisonOperator.LT, ComparisonOperator.LTE]:
# For "less than" thresholds, value must go above by hysteresis
recovery_threshold = threshold + hysteresis_amount
if value <= recovery_threshold:
# Not enough improvement, keep current level
return current_level
return new_level
def _compare(self, value: float, threshold: float) -> bool:
"""Perform comparison based on operator."""
if self.operator == ComparisonOperator.GT:
return value > threshold
elif self.operator == ComparisonOperator.GTE:
return value >= threshold
elif self.operator == ComparisonOperator.LT:
return value < threshold
elif self.operator == ComparisonOperator.LTE:
return value <= threshold
elif self.operator == ComparisonOperator.EQ:
return abs(value - threshold) < 1e-9 # Float comparison
elif self.operator == ComparisonOperator.NEQ:
return abs(value - threshold) >= 1e-9
return False
class ThresholdChecker:
"""Main threshold checking and alerting system."""
def __init__(
self,
config: Dict[str, Any],
renotify_interval: int = 3600,
journal: Optional[Any] = None,
):
"""
Initialize threshold checker.
Args:
config: Threshold configuration dictionary from YAML
renotify_interval: Seconds between repeat notifications (default: 1 hour)
journal: Optional MessageJournal instance for logging threshold events
"""
# Named threshold configurations: {config_name: {metric_path: ThresholdConfig}}
self.threshold_configs = {}
# Single threshold set for backward compatibility: {metric_path: ThresholdConfig}
self.thresholds = {}
# Host to config name mapping: {host_name: config_name}
self.host_config_mapping = {}
# Default config name to use when no mapping exists
self.default_config = "default"
self.renotify_interval = renotify_interval
self.grace_seconds: float = float(config.get("grace", 2))
self.journal = journal
# Parse configuration
self._parse_config(config)
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
if total_thresholds == 0 and len(self.thresholds) > 0:
# Backward compatibility: using single threshold set
total_thresholds = len(self.thresholds)
logger.info("ThresholdChecker initialized with %d thresholds (legacy format)", total_thresholds)
else:
logger.info(
"ThresholdChecker initialized with %d named configurations (%d total thresholds)",
len(self.threshold_configs),
total_thresholds
)
def reload(self, config: Dict[str, Any]):
"""Reload threshold configuration from new config dict.
This clears all existing thresholds and re-parses from the new configuration.
Alert states are preserved to maintain hysteresis across reloads.
Args:
config: New configuration dictionary
"""
logger.info("Reloading threshold configuration...")
# Clear old configuration
self.threshold_configs.clear()
self.thresholds.clear()
self.host_config_mapping.clear()
self.grace_seconds = float(config.get("grace", 2))
# Parse new configuration
self._parse_config(config)
total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
if total_thresholds == 0 and len(self.thresholds) > 0:
total_thresholds = len(self.thresholds)
logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)
def _parse_config(self, config: Dict[str, Any]):
"""Parse threshold configuration from YAML structure.
Supports two formats:
1. Legacy format with direct 'thresholds' section
2. New format with 'threshold_configs' and 'host_threshold_mapping'
"""
# Check for new multi-config format
if "threshold_configs" in config:
self._parse_multi_config(config)
elif "thresholds" in config:
# Legacy single threshold configuration
self._parse_legacy_config(config)
else:
logger.info("No thresholds configured")
def _parse_multi_config(self, config: Dict[str, Any]):
"""Parse multiple named threshold configurations."""
threshold_configs = config.get("threshold_configs", {})
if not threshold_configs:
logger.info("No threshold configurations defined")
return
# Build effective_defaults: THRESHOLD_DEFAULTS merged with the 'default' config (if present).
# All other configs inherit any metric not explicitly defined from effective_defaults.
effective_defaults: Dict[str, ThresholdConfig] = {}
for plugin_name, plugin_thresholds in THRESHOLD_DEFAULTS.get("thresholds", {}).items():
if isinstance(plugin_thresholds, dict):
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
if "default" in threshold_configs:
default_data = threshold_configs["default"]
if isinstance(default_data, dict) and "thresholds" in default_data:
for plugin_name, plugin_thresholds in default_data["thresholds"].items():
if isinstance(plugin_thresholds, dict):
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=effective_defaults)
self.threshold_configs["default"] = dict(effective_defaults)
logger.info("Registered 'default' threshold config with %d metrics", len(effective_defaults))
# Parse each named configuration, seeding it with effective_defaults first
for config_name, config_data in threshold_configs.items():
if config_name == "default":
continue # already handled above
if not isinstance(config_data, dict):
logger.warning("Invalid threshold config '%s', skipping", config_name)
continue
if "thresholds" not in config_data:
logger.warning("No thresholds in config '%s', skipping", config_name)
continue
logger.info("Parsing threshold configuration: %s", config_name)
self.threshold_configs[config_name] = dict(effective_defaults)
thresholds_config = config_data["thresholds"]
for plugin_name, plugin_thresholds in thresholds_config.items():
if not isinstance(plugin_thresholds, dict):
continue
self._parse_plugin_thresholds(
plugin_name,
plugin_thresholds,
target_dict=self.threshold_configs[config_name]
)
# Parse host to config mapping from two possible sources
# 1. New format: hosts section with threshold_config attribute
if "hosts" in config:
hosts_config = config["hosts"]
if isinstance(hosts_config, dict):
for host_name, host_attrs in hosts_config.items():
if isinstance(host_attrs, dict) and "threshold_config" in host_attrs:
self.host_config_mapping[host_name] = host_attrs["threshold_config"]
# 2. Legacy format: host_threshold_mapping section (for backward compatibility)
if "host_threshold_mapping" in config:
legacy_mapping = config.get("host_threshold_mapping", {})
if isinstance(legacy_mapping, dict):
self.host_config_mapping.update(legacy_mapping)
# Set default config (first one alphabetically or explicitly set)
self.default_config = config.get("default_threshold_config", "default")
if self.default_config not in self.threshold_configs and self.threshold_configs:
# Use first available config as default
self.default_config = sorted(self.threshold_configs.keys())[0]
logger.info("Using '%s' as default threshold config", self.default_config)
logger.info(
"Loaded %d threshold configurations with %d host mappings",
len(self.threshold_configs),
len(self.host_config_mapping)
)
def _parse_legacy_config(self, config: Dict[str, Any]):
"""Parse legacy single threshold configuration for backward compatibility."""
if not config or "thresholds" not in config:
logger.info("No thresholds configured")
return
thresholds_config = config["thresholds"]
for plugin_name, plugin_thresholds in thresholds_config.items():
if not isinstance(plugin_thresholds, dict):
continue
self._parse_plugin_thresholds(plugin_name, plugin_thresholds, target_dict=self.thresholds)
def _parse_plugin_thresholds(
self,
plugin_name: str,
thresholds: Dict[str, Any],
target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
"""Parse thresholds for a specific plugin.
Args:
plugin_name: Name of the plugin
thresholds: Threshold configuration dictionary
target_dict: Dictionary to store parsed thresholds (defaults to self.thresholds)
"""
if target_dict is None:
target_dict = self.thresholds
# Special handling for RTT thresholds (per-host)
if plugin_name == "rtt":
self._parse_rtt_thresholds(thresholds, target_dict)
return
for metric_name, threshold_config in thresholds.items():
if not isinstance(threshold_config, dict):
continue
# Handle nested metrics (e.g., partitions./.percent)
if metric_name == "partitions":
self._parse_partition_thresholds(plugin_name, threshold_config, target_dict)
continue
metric_path = f"{plugin_name}.{metric_name}"
# Extract threshold values
warning = threshold_config.get("warning")
critical = threshold_config.get("critical")
operator = threshold_config.get("operator", ">")
display = threshold_config.get("display", "(threshold: {op_symbol} {threshold_value})")
hysteresis = threshold_config.get("hysteresis", 0.1) # 10% default
enabled = threshold_config.get("enabled", True)
if warning is None and critical is None:
logger.warning("No thresholds defined for %s, skipping", metric_path)
continue
threshold = ThresholdConfig(
metric_path=metric_path,
warning=warning,
critical=critical,
operator=operator,
hysteresis=hysteresis,
enabled=enabled,
display=display
)
target_dict[metric_path] = threshold
logger.debug(
"Registered threshold for %s: warn=%s, crit=%s, op=%s",
metric_path,
warning,
critical,
operator
)
def _parse_partition_thresholds(
self,
plugin_name: str,
partitions: Dict[str, Any],
target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
"""Parse partition-specific thresholds for disk monitoring.
Args:
plugin_name: Name of the plugin
partitions: Partition threshold configuration
target_dict: Dictionary to store parsed thresholds
"""
if target_dict is None:
target_dict = self.thresholds
for partition, metrics in partitions.items():
if not isinstance(metrics, dict):
continue
for metric_name, threshold_config in metrics.items():
if not isinstance(threshold_config, dict):
continue
# Create metric path like "disk_monitor./dev/sda1.percent"
metric_path = f"{plugin_name}.{partition}.{metric_name}"
warning = threshold_config.get("warning")
critical = threshold_config.get("critical")
operator = threshold_config.get("operator", ">")
hysteresis = threshold_config.get("hysteresis", 0.1)
enabled = threshold_config.get("enabled", True)
display = threshold_config.get("display")
if warning is None and critical is None:
continue
threshold = ThresholdConfig(
metric_path=metric_path,
warning=warning,
critical=critical,
operator=operator,
hysteresis=hysteresis,
enabled=enabled,
display=display
)
target_dict[metric_path] = threshold
def _parse_rtt_thresholds(
self,
rtt_thresholds: Dict[str, Any],
target_dict: Optional[Dict[str, ThresholdConfig]] = None
):
"""Parse RTT thresholds (network latency thresholds).
RTT thresholds are configured as:
thresholds:
rtt:
warning: 100.0 # ms
critical: 500.0 # ms
Args:
rtt_thresholds: RTT threshold configuration
target_dict: Dictionary to store parsed thresholds
"""
if target_dict is None:
target_dict = self.thresholds
if not isinstance(rtt_thresholds, dict):
return
# Metric path is simply "rtt" (not per-host)
metric_path = "rtt"
warning = rtt_thresholds.get("warning")
critical = rtt_thresholds.get("critical")
operator = rtt_thresholds.get("operator", ">")
hysteresis = rtt_thresholds.get("hysteresis", 0.1) # 10% default
enabled = rtt_thresholds.get("enabled", True)
display = rtt_thresholds.get("display")
count = rtt_thresholds.get("count", 1)
if warning is None and critical is None:
logger.warning("No RTT thresholds defined, skipping")
return
threshold = ThresholdConfig(
metric_path=metric_path,
warning=warning,
critical=critical,
operator=operator,
hysteresis=hysteresis,
enabled=enabled,
display=display,
count=count,
)
target_dict[metric_path] = threshold
logger.debug(
"Registered RTT threshold: warn=%s ms, crit=%s ms, count=%d",
warning,
critical,
count,
)
def get_thresholds_for_host(self, host_name: str) -> Dict[str, ThresholdConfig]:
"""Get the appropriate threshold configuration for a host.
Args:
host_name: Name of the host
Returns:
Dictionary of thresholds for this host
"""
# Legacy mode: single threshold set for all hosts
if self.thresholds and not self.threshold_configs:
return self.thresholds
# Multi-config mode: look up host-specific configuration
if self.threshold_configs:
config_name = self.host_config_mapping.get(host_name, self.default_config)
if config_name in self.threshold_configs:
return self.threshold_configs[config_name]
else:
logger.warning(
"Threshold config '%s' not found for host '%s', using default '%s'",
config_name,
host_name,
self.default_config
)
return self.threshold_configs.get(self.default_config, {})
# No thresholds configured
return {}
def check_value(
self,
host_name: str,
metric_path: str,
value: float,
alert_states: Dict[str, AlertState],
) -> Optional[Tuple[AlertLevel, AlertLevel]]:
"""
Check a single value against configured threshold.
Args:
host_name: Name of the host
metric_path: Full metric path (e.g., "rtt.hostname")
value: The metric value to check
alert_states: Host's alert_states dictionary
Returns:
Tuple of (old_level, new_level) if state changed, None otherwise
"""
# Get host-specific thresholds
thresholds = self.get_thresholds_for_host(host_name)
if metric_path not in thresholds:
return None
threshold = thresholds[metric_path]
# Get or create alert state
if metric_path not in alert_states:
alert_states[metric_path] = AlertState(metric_path)
alert_state = alert_states[metric_path]
# Evaluate threshold with hysteresis
new_level = threshold.evaluate_with_hysteresis(
value,
alert_state.level
)
# Apply consecutive-count gating: when currently OK, require threshold.count
# consecutive exceedances before escalating to WARNING/CRITICAL.
if new_level == AlertLevel.OK:
# Value is fine (or recovered) — reset the pending counter immediately.
alert_state.consecutive_count = 0
elif alert_state.level == AlertLevel.OK and new_level != AlertLevel.OK:
# First time we exceed while still OK: count up.
alert_state.consecutive_count += 1
if alert_state.consecutive_count < threshold.count:
logger.debug(
"RTT threshold exceeded %d/%d consecutive times for %s on %s",
alert_state.consecutive_count,
threshold.count,
metric_path,
host_name,
)
return None
# Count reached — fire the alert and reset the counter.
alert_state.consecutive_count = 0
# Determine which threshold was exceeded
threshold_value = None
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
threshold_value = threshold.critical
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
threshold_value = threshold.warning
# Update state and check for changes
old_level = alert_state.level
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, None)
return (old_level, new_level)
elif new_level != AlertLevel.OK:
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, None)
return None
def check_plugin_data(
self,
host_name: str,
plugin_name: str,
data: Dict[str, Any],
alert_states: Dict[str, AlertState],
) -> list:
"""
Check plugin data against configured thresholds.
Args:
host_name: Name of the host
plugin_name: Name of the plugin
data: Plugin data dictionary
alert_states: Host's alert_states dictionary
Returns:
List of (metric_path, old_level, new_level, value) tuples for state changes
"""
state_changes = []
# Get host-specific thresholds
thresholds = self.get_thresholds_for_host(host_name)
# Check flat metrics
for metric_name, value in data.items():
metric_path = f"{plugin_name}.{metric_name}"
if metric_path not in thresholds:
continue
threshold = thresholds[metric_path]
# Get or create alert state
if metric_path not in alert_states:
alert_states[metric_path] = AlertState(metric_path)
alert_state = alert_states[metric_path]
# Evaluate threshold with hysteresis
new_level = threshold.evaluate_with_hysteresis(
value,
alert_state.level
)
# Determine which threshold was exceeded
threshold_value = None
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
threshold_value = threshold.critical
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
threshold_value = threshold.warning
# Update state and check for changes
old_level = alert_state.level
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
state_changes.append((metric_path, old_level, new_level, value))
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
elif new_level != AlertLevel.OK:
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
# Check nested metrics (e.g., partition data in disk_monitor)
self._check_nested_metrics(
host_name,
plugin_name,
data,
alert_states,
state_changes
)
return state_changes
def _check_nested_metrics(
self,
host_name: str,
plugin_name: str,
data: Dict[str, Any],
alert_states: Dict[str, AlertState],
state_changes: list,
):
"""Check nested metrics like partition-specific thresholds."""
# Get host-specific thresholds
thresholds = self.get_thresholds_for_host(host_name)
# Look for partition data in disk_monitor
if plugin_name == "disk_monitor" and "partitions" in data:
partitions = data["partitions"]
if not isinstance(partitions, dict):
return
for partition, metrics in partitions.items():
if not isinstance(metrics, dict):
continue
for metric_name, value in metrics.items():
metric_path = f"{plugin_name}.{partition}.{metric_name}"
if metric_path not in thresholds:
continue
threshold = thresholds[metric_path]
if metric_path not in alert_states:
alert_states[metric_path] = AlertState(metric_path)
alert_state = alert_states[metric_path]
new_level = threshold.evaluate_with_hysteresis(
value,
alert_state.level
)
# Determine which threshold was exceeded
threshold_value = None
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
threshold_value = threshold.critical
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
threshold_value = threshold.warning
old_level = alert_state.level
if alert_state.update(new_level, value, threshold_value, threshold.operator.value):
state_changes.append((metric_path, old_level, new_level, value))
self._apply_grace(host_name, alert_state, metric_path, old_level, new_level, value, threshold, data)
elif new_level != AlertLevel.OK:
self._check_pending_or_renotify(host_name, alert_state, metric_path, value, threshold, data)
def _trigger_notification(
self,
host_name: str,
metric_path: str,
old_level: AlertLevel,
new_level: AlertLevel,
value: Any,
threshold: ThresholdConfig,
plugin_data: Optional[Dict[str, Any]] = None,
):
"""Trigger a notification for an alert state change.
Args:
host_name: Name of the host
metric_path: Full metric path
old_level: Previous alert level
new_level: New alert level
value: Current metric value
threshold: Threshold configuration
plugin_data: Optional dictionary of all plugin data fields for format string
"""
# Determine which threshold was exceeded
threshold_value = None
if new_level == AlertLevel.CRITICAL and threshold.critical is not None:
threshold_value = threshold.critical
elif new_level == AlertLevel.WARNING and threshold.warning is not None:
threshold_value = threshold.warning
# Format operator symbol
op_symbol = threshold.operator.value
# Use a display-friendly value (inf is the sentinel for "overdue")
import math
display_value = "overdue" if isinstance(value, float) and math.isinf(value) else value
# Format message
if new_level == AlertLevel.OK:
lvl = "RECOVER"
message = f"{metric_path} = {display_value} ({old_level.name} -> OK)"
elif new_level == AlertLevel.WARNING:
lvl = "WARNING"
if threshold_value is not None:
threshold_info = self._format_display(
threshold.display,
value=display_value,
threshold_value=threshold_value,
op_symbol=op_symbol,
plugin_data=plugin_data
)
message = f"{metric_path} = {display_value} {threshold_info}"
else:
message = f"{metric_path} = {display_value}"
elif new_level == AlertLevel.CRITICAL:
lvl = "CRITICAL"
if threshold_value is not None:
threshold_info = self._format_display(
threshold.display,
value=display_value,
threshold_value=threshold_value,
op_symbol=op_symbol,
plugin_data=plugin_data
)
message = f"{metric_path} = {display_value} {threshold_info}"
else:
message = f"{metric_path} = {display_value}"
else:
lvl = "UNKNOWN"
message = f"{metric_path} = {display_value}"
# Return the formatted threshold info for storing in AlertState
formatted_threshold_msg = None
if threshold_value is not None and new_level != AlertLevel.OK:
formatted_threshold_msg = self._format_display(
threshold.display,
value=display_value,
threshold_value=threshold_value,
op_symbol=op_symbol,
plugin_data=plugin_data
)
return lvl, message, formatted_threshold_msg
def _send_notification(
self,
host_name: str,
lvl: str,
message: str,
metric_path: str,
old_level: AlertLevel,
new_level: AlertLevel,
value: Any,
):
"""Send notification and log to journal/eventlog."""
asyncio.get_event_loop().create_task(notify_mod.send_notification(
host_name,
notify_mod.Notification(
title=f"[{lvl}] {host_name}",
body=message,
level=lvl,
),
))
# Log to journal
if self.journal is not None:
try:
import asyncio
loop = asyncio.get_event_loop()
loop.create_task(self.journal.log_threshold_event(
host_name=host_name,
metric_path=metric_path,
old_level=old_level.name,
new_level=new_level.name,
value=value,
))
except Exception as e:
logger.debug(f"Failed to log threshold event to journal: {e}")
# Log to eventlog as well
eventlog(host_name, lvl, message, service="threshold")
def _format_display(
self,
display_format: str,
value: Any,
threshold_value: float,
op_symbol: str,
plugin_data: Optional[Dict[str, Any]] = None,
) -> str:
"""Format the display string using available data.
Args:
display_format: Format string from threshold config
value: Current metric value
threshold_value: Threshold value that was exceeded
op_symbol: Comparison operator symbol
plugin_data: Optional dictionary of plugin data fields
Returns:
Formatted display string
"""
# Build format context with standard variables
format_context = {
'value': value,
'threshold_value': threshold_value,
'op_symbol': op_symbol,
}
# Add all plugin data fields if available
if plugin_data:
format_context.update(plugin_data)
try:
# Format the display string
return display_format.format(**format_context)
except KeyError as e:
logger.warning(
"Missing format variable in display string '%s': %s",
display_format,
e
)
# Fallback to default format
return f"(threshold: {op_symbol} {threshold_value})"
except Exception as e:
logger.error(
"Error formatting display string '%s': %s",
display_format,
e
)
return f"(threshold: {op_symbol} {threshold_value})"
def _apply_grace(
self,
host_name: str,
alert_state: AlertState,
metric_path: str,
old_level: AlertLevel,
new_level: AlertLevel,
value: Any,
threshold: ThresholdConfig,
plugin_data: Optional[Dict[str, Any]],
) -> None:
"""Handle a state-change transition with grace-period logic.
Transitioning INTO alert: defers the notification for grace_seconds.
Transitioning TO OK:
- Still in grace window (pending_since set): suppresses both the alert
and the recovery — the spike never warranted a page.
- Past grace: fires the RECOVER notification normally.
"""
lvl, message, formatted_msg = self._trigger_notification(
host_name, metric_path, old_level, new_level, value, threshold, plugin_data
)
alert_state.formatted_message = formatted_msg
if new_level == AlertLevel.OK:
if alert_state.pending_since is not None:
logger.info(
"Alert suppressed (recovered within %.0fs grace): %s on %s",
self.grace_seconds, metric_path, host_name,
)
alert_state.pending_since = None
else:
self._send_notification(host_name, lvl, message, metric_path, old_level, new_level, value)
else:
alert_state.pending_since = time.time()
logger.debug(
"Alert deferred (%.0fs grace): %s on %s = %s",
self.grace_seconds, metric_path, host_name, value,
)
def _check_pending_or_renotify(
self,
host_name: str,
alert_state: AlertState,
metric_path: str,
value: Any,
threshold: ThresholdConfig,
plugin_data: Optional[Dict[str, Any]],
) -> None:
"""Called when alert level is unchanged and non-OK.
If a deferred notification is pending and grace_seconds have elapsed,
fires it now. Otherwise falls through to normal reminder logic.
"""
if alert_state.pending_since is not None:
if time.time() - alert_state.pending_since >= self.grace_seconds:
lvl, message, formatted_msg = self._trigger_notification(
host_name, metric_path, AlertLevel.OK, alert_state.level, value, threshold, plugin_data
)
alert_state.formatted_message = formatted_msg
self._send_notification(
host_name, lvl, message, metric_path, AlertLevel.OK, alert_state.level, value
)
alert_state.pending_since = None
# else: still within grace window, do nothing
else:
self._check_renotify(host_name, alert_state, metric_path, value, threshold, plugin_data)
def _check_renotify(
self,
host_name: str,
alert_state: AlertState,
metric_path: str,
value: Any,
threshold: ThresholdConfig,
plugin_data: Optional[Dict[str, Any]] = None,
):
"""Check if we should send a repeat notification.
Args:
host_name: Name of the host
alert_state: Current alert state
metric_path: Full metric path
value: Current metric value
threshold: Threshold configuration
plugin_data: Optional dictionary of all plugin data fields
"""
if alert_state.level != AlertLevel.CRITICAL:
return
# Skip reminders if alert has been acknowledged
if alert_state.acknowledged:
return
now = time.time()
# Check if we should re-notify
if alert_state.last_notification is None:
# First notification already sent during state change
alert_state.last_notification = now
alert_state.notification_count = 1
return
if (now - alert_state.last_notification) >= self.renotify_interval:
# Determine which threshold is active
threshold_value = None
if alert_state.level == AlertLevel.CRITICAL and threshold.critical is not None:
threshold_value = threshold.critical
elif alert_state.level == AlertLevel.WARNING and threshold.warning is not None:
threshold_value = threshold.warning
# Format operator symbol
op_symbol = threshold.operator.value
# Time to re-notify
if threshold_value is not None:
# Use display format string
threshold_info = self._format_display(
threshold.display,
value=value,
threshold_value=threshold_value,
op_symbol=op_symbol,
plugin_data=plugin_data
)
message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} {threshold_info}, ongoing for {int(now - alert_state.since)}s"
else:
message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} (ongoing for {int(now - alert_state.since)}s)"
asyncio.get_event_loop().create_task(notify_mod.send_notification(
host_name,
notify_mod.Notification(
title=f"[REMINDER/{alert_state.level.name}] {host_name}",
body=message,
level=alert_state.level.name,
),
))
alert_state.last_notification = now
alert_state.notification_count += 1
logger.info("Re-notification sent: %s", message)
def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
"""
Get all currently active (non-OK) alerts.
Args:
alert_states: Host's alert_states dictionary
Returns:
List of AlertState objects that are not OK
"""
return [
state for state in alert_states.values()
if state.level != AlertLevel.OK
]
def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
"""
Get summary counts of alert levels.
Args:
alert_states: Host's alert_states dictionary
Returns:
Dictionary with counts: {"ok": N, "warning": N, "critical": N}
"""
summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}
for state in alert_states.values():
if state.level == AlertLevel.OK:
summary["ok"] += 1
elif state.level == AlertLevel.WARNING:
summary["warning"] += 1
elif state.level == AlertLevel.CRITICAL:
summary["critical"] += 1
elif state.level == AlertLevel.UNKNOWN:
summary["unknown"] += 1
return summary