Files
heartbeat/hbd/client/threshold.py
T
Andreas Wrede 0543266c92 Major refactoring of the codebase, including restructuring of files and directories, renaming of modules and classes, and improvements to the overall organization and readability of the code. This refactoring aims to enhance maintainability, scalability, and clarity of the codebase while preserving existing functionality. The changes include:
- Restructuring of the project directory into client and server components
- Renaming of modules and classes to better reflect their purpose and functionality
- Moving common utilities and configurations to a shared location
- Updating import statements to reflect the new structure
- Adding new documentation files for better clarity on various aspects of the project
- Removing deprecated or unused code to streamline the codebase
- Ensuring that all existing functionality is preserved and that the codebase remains functional after the refactoring.
2026-03-29 11:13:40 -04:00

580 lines
20 KiB
Python

"""
Threshold checking and alerting for plugin metrics.
This module provides a flexible threshold checking system that:
- Evaluates plugin metrics against configured warning/critical thresholds
- Tracks alert states per host and metric
- Prevents alert flapping with hysteresis
- Triggers notifications only on state changes
- Supports multiple comparison operators
"""
import logging
import time
from enum import Enum
from typing import Dict, Any, Optional, Tuple, Callable
# Module-level logger; its name is this module's ``__name__``.
logger = logging.getLogger(__name__)
class AlertLevel(Enum):
    """Severity of an alert.

    The integer values matter: hysteresis handling in this module compares
    ``.value`` numerically and treats a lower number as a less severe level,
    so the numbering must stay stable.
    """

    OK = 0        # metric is within its configured limits
    WARNING = 1   # warning threshold matched
    CRITICAL = 2  # critical threshold matched
    UNKNOWN = 3   # metric could not be evaluated
class ComparisonOperator(Enum):
    """Comparison operators accepted in threshold definitions.

    Each member's value is the operator's textual symbol, so a symbol can be
    turned into a member with ``ComparisonOperator(">=")``; an unsupported
    symbol raises ``ValueError``.
    """

    GT = ">"    # strictly greater than the threshold
    GTE = ">="  # greater than or equal to the threshold
    LT = "<"    # strictly less than the threshold
    LTE = "<="  # less than or equal to the threshold
    EQ = "=="   # equal to the threshold
    NEQ = "!="  # different from the threshold
class AlertState:
    """Represents the current alert state for a specific metric.

    Attributes:
        metric_path: Full path to the metric this state belongs to.
        level: Current AlertLevel (starts as OK).
        since: Timestamp (``time.time()``) of the last level change.
        last_value: Most recent value passed to ``update``.
        last_check: Timestamp of the most recent ``update`` call.
        notification_count: Notifications counted for the current level.
        last_notification: Timestamp of the last notification for the
            current level, or None if none has been recorded yet.
    """

    def __init__(self, metric_path: str):
        """
        Initialize alert state.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
        """
        now = time.time()
        self.metric_path = metric_path
        self.level = AlertLevel.OK
        self.since = now
        self.last_value = None
        self.last_check = now
        self.notification_count = 0
        self.last_notification = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(metric_path={self.metric_path!r}, "
            f"level={self.level.name}, last_value={self.last_value!r})"
        )

    def update(self, level: AlertLevel, value: Any) -> bool:
        """
        Update alert state.

        Args:
            level: New alert level
            value: Current metric value

        Returns:
            True if state changed (notification needed), False otherwise
        """
        now = time.time()
        self.last_check = now
        self.last_value = value

        if level == self.level:
            return False

        logger.info(
            "Alert state change for %s: %s -> %s (value: %s)",
            self.metric_path,
            self.level.name,
            level.name,
            value,
        )
        self.level = level
        self.since = now
        # A level change starts a fresh notification cycle. The timestamp has
        # to be cleared together with the counter: a stale timestamp left over
        # from an earlier alert episode would make the re-notification check
        # believe the interval has already elapsed and send a reminder right
        # after the state-change notification.
        self.notification_count = 0
        self.last_notification = None
        return True

    def to_dict(self) -> dict:
        """Convert alert state to dictionary for serialization."""
        return {
            "metric_path": self.metric_path,
            "level": self.level.name,
            "since": self.since,
            "last_value": self.last_value,
            "last_check": self.last_check,
            "notification_count": self.notification_count,
        }
class ThresholdConfig:
    """Configuration and evaluation logic for a single threshold check.

    A value is compared against the critical threshold first and then the
    warning threshold, using one comparison operator for both. Optional
    hysteresis delays recovery to a lower severity until the value has moved
    past the threshold by a fraction of that threshold.
    """

    def __init__(
        self,
        metric_path: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        operator: str = ">",
        hysteresis: float = 0.0,
        enabled: bool = True,
    ):
        """
        Initialize threshold configuration.

        Args:
            metric_path: Full path to metric (e.g., "cpu_monitor.cpu_percent")
            warning: Warning threshold value
            critical: Critical threshold value
            operator: Comparison operator (>, >=, <, <=, ==, !=); an
                unsupported symbol is logged and replaced by ">"
            hysteresis: Hysteresis as a fraction of the threshold (0.0-1.0)
                used to prevent flapping; 0 or None disables it
            enabled: Whether this threshold is enabled
        """
        self.metric_path = metric_path
        self.warning = warning
        self.critical = critical
        self.enabled = enabled
        self.hysteresis = hysteresis

        try:
            self.operator = ComparisonOperator(operator)
        except ValueError:
            logger.warning(
                "Invalid operator '%s' for %s, using '>' as default",
                operator,
                metric_path,
            )
            self.operator = ComparisonOperator.GT

    def evaluate(self, value: float) -> AlertLevel:
        """
        Evaluate a value against this threshold.

        Args:
            value: Metric value to check; anything ``float()`` accepts

        Returns:
            OK when the threshold is disabled or no threshold matches,
            UNKNOWN when the value cannot be converted to float, otherwise
            CRITICAL or WARNING (critical takes precedence).
        """
        if not self.enabled:
            return AlertLevel.OK

        try:
            value = float(value)
        except (TypeError, ValueError):
            logger.warning("Cannot convert value %s to float for %s", value, self.metric_path)
            return AlertLevel.UNKNOWN

        # Critical is checked first so it wins when both thresholds match.
        if self.critical is not None and self._compare(value, self.critical):
            return AlertLevel.CRITICAL
        if self.warning is not None and self._compare(value, self.warning):
            return AlertLevel.WARNING
        return AlertLevel.OK

    def evaluate_with_hysteresis(
        self,
        value: float,
        current_level: AlertLevel
    ) -> AlertLevel:
        """
        Evaluate with hysteresis to prevent flapping.

        Hysteresis only affects recoveries (moves to a lower severity) and
        only for the >, >=, <, <= operators: the current level is kept until
        the value has cleared the current level's threshold by
        ``abs(threshold * hysteresis)``.

        Args:
            value: Current metric value
            current_level: Current alert level

        Returns:
            New alert level considering hysteresis
        """
        new_level = self.evaluate(value)

        # A disabled threshold must never hold an old alert open, and a
        # zero/unset hysteresis means plain evaluation.
        if not self.enabled or not self.hysteresis:
            return new_level

        # Escalations and unchanged levels take effect immediately.
        if new_level.value >= current_level.value:
            return new_level

        # Recovery is measured against the threshold of the level being left.
        if current_level == AlertLevel.CRITICAL and self.critical is not None:
            threshold = self.critical
        elif current_level == AlertLevel.WARNING and self.warning is not None:
            threshold = self.warning
        else:
            return new_level

        margin = abs(threshold * self.hysteresis)
        if margin == 0:
            # A threshold of 0 yields no margin. The inclusive comparisons
            # below would then keep the alert while the value sits exactly on
            # the threshold (e.g. "errors > 0" would never recover at 0).
            return new_level

        # evaluate() already converted the value successfully here: a failed
        # conversion yields UNKNOWN, which is never a lower severity. Converting
        # again keeps values such as "70" from raising TypeError when compared.
        numeric = float(value)

        if self.operator in (ComparisonOperator.GT, ComparisonOperator.GTE):
            # "Greater than" thresholds: value must drop below threshold - margin.
            if numeric >= threshold - margin:
                return current_level
        elif self.operator in (ComparisonOperator.LT, ComparisonOperator.LTE):
            # "Less than" thresholds: value must rise above threshold + margin.
            if numeric <= threshold + margin:
                return current_level

        return new_level

    def _compare(self, value: float, threshold: float) -> bool:
        """Return True when ``value`` matches ``threshold`` under the operator.

        Equality and inequality use an absolute tolerance of 1e-9 rather than
        exact float comparison.
        """
        op = self.operator
        if op == ComparisonOperator.GT:
            return value > threshold
        if op == ComparisonOperator.GTE:
            return value >= threshold
        if op == ComparisonOperator.LT:
            return value < threshold
        if op == ComparisonOperator.LTE:
            return value <= threshold
        if op == ComparisonOperator.EQ:
            return abs(value - threshold) < 1e-9
        if op == ComparisonOperator.NEQ:
            return abs(value - threshold) >= 1e-9
        return False
class ThresholdChecker:
    """Main threshold checking and alerting system.

    Holds one ThresholdConfig per metric path (parsed from the ``thresholds``
    section of the configuration), evaluates incoming plugin data against
    them, updates the caller-supplied per-host AlertState objects, and emits
    notifications on level changes plus periodic reminders while an alert
    stays active.
    """

    # Hysteresis fraction used when a threshold entry does not set one (10%).
    DEFAULT_HYSTERESIS = 0.1

    def __init__(
        self,
        config: Dict[str, Any],
        notification_callback: Optional[Callable] = None,
        renotify_interval: int = 3600,
        journal: Optional[Any] = None,
    ):
        """
        Initialize threshold checker.

        Args:
            config: Threshold configuration dictionary from YAML
            notification_callback: Function to call for notifications; it is
                called with a single message string
            renotify_interval: Seconds between repeat notifications (default: 1 hour)
            journal: Optional MessageJournal instance for logging threshold events
        """
        self.thresholds = {}  # {metric_path: ThresholdConfig}
        self.notification_callback = notification_callback
        self.renotify_interval = renotify_interval
        self.journal = journal
        # Strong references to in-flight journal tasks; the event loop itself
        # only keeps weak references to tasks.
        self._journal_tasks = set()

        self._parse_config(config)
        logger.info("ThresholdChecker initialized with %d thresholds", len(self.thresholds))

    # ------------------------------------------------------------------
    # Configuration parsing
    # ------------------------------------------------------------------

    def _parse_config(self, config: Dict[str, Any]):
        """Parse threshold configuration from YAML structure.

        Expects ``config["thresholds"]`` to map plugin names to dictionaries
        of per-metric threshold settings; non-dict plugin entries are skipped.
        """
        if not config or "thresholds" not in config:
            logger.info("No thresholds configured")
            return

        thresholds_config = config["thresholds"]
        if not isinstance(thresholds_config, dict):
            # e.g. an empty "thresholds:" key in YAML loads as None.
            logger.info("No thresholds configured")
            return

        for plugin_name, plugin_thresholds in thresholds_config.items():
            if isinstance(plugin_thresholds, dict):
                self._parse_plugin_thresholds(plugin_name, plugin_thresholds)

    def _parse_plugin_thresholds(self, plugin_name: str, thresholds: Dict[str, Any]):
        """Parse thresholds for a specific plugin.

        Each metric is registered under ``"<plugin>.<metric>"``; the special
        ``partitions`` key is handed to the partition parser instead.
        """
        for metric_name, threshold_config in thresholds.items():
            if not isinstance(threshold_config, dict):
                continue

            # Handle nested metrics (e.g., partitions./.percent)
            if metric_name == "partitions":
                self._parse_partition_thresholds(plugin_name, threshold_config)
                continue

            self._register_threshold(f"{plugin_name}.{metric_name}", threshold_config)

    def _parse_partition_thresholds(self, plugin_name: str, partitions: Dict[str, Any]):
        """Parse partition-specific thresholds for disk monitoring.

        Metric paths look like ``"disk_monitor./dev/sda1.percent"``.
        """
        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue
            for metric_name, threshold_config in metrics.items():
                if isinstance(threshold_config, dict):
                    self._register_threshold(
                        f"{plugin_name}.{partition}.{metric_name}",
                        threshold_config,
                    )

    def _register_threshold(self, metric_path: str, threshold_config: Dict[str, Any]) -> None:
        """Build a ThresholdConfig from one config entry and store it.

        Entries that define neither ``warning`` nor ``critical`` are skipped
        with a warning.
        """
        warning = threshold_config.get("warning")
        critical = threshold_config.get("critical")
        if warning is None and critical is None:
            logger.warning("No thresholds defined for %s, skipping", metric_path)
            return

        operator = threshold_config.get("operator", ">")
        self.thresholds[metric_path] = ThresholdConfig(
            metric_path=metric_path,
            warning=warning,
            critical=critical,
            operator=operator,
            hysteresis=threshold_config.get("hysteresis", self.DEFAULT_HYSTERESIS),
            enabled=threshold_config.get("enabled", True),
        )
        logger.debug(
            "Registered threshold for %s: warn=%s, crit=%s, op=%s",
            metric_path,
            warning,
            critical,
            operator,
        )

    # ------------------------------------------------------------------
    # Checking
    # ------------------------------------------------------------------

    def check_plugin_data(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
    ) -> list:
        """
        Check plugin data against configured thresholds.

        Args:
            host_name: Name of the host
            plugin_name: Name of the plugin
            data: Plugin data dictionary
            alert_states: Host's alert_states dictionary; missing entries are
                created and existing ones updated in place

        Returns:
            List of (metric_path, old_level, new_level, value) tuples for state changes
        """
        state_changes = []
        if not data:
            # Nothing reported (None or empty): nothing to evaluate.
            return state_changes

        # Flat metrics: "<plugin>.<metric>"
        for metric_name, value in data.items():
            self._process_metric(
                host_name,
                f"{plugin_name}.{metric_name}",
                value,
                alert_states,
                state_changes,
            )

        # Nested metrics (e.g., partition data in disk_monitor)
        self._check_nested_metrics(
            host_name,
            plugin_name,
            data,
            alert_states,
            state_changes,
        )
        return state_changes

    def _check_nested_metrics(
        self,
        host_name: str,
        plugin_name: str,
        data: Dict[str, Any],
        alert_states: Dict[str, AlertState],
        state_changes: list,
    ):
        """Check nested metrics like partition-specific thresholds.

        Looks at ``data["partitions"]`` (a dict of partition -> metrics dict)
        and evaluates each ``"<plugin>.<partition>.<metric>"`` path. This is
        done for every plugin, matching the parser, which accepts a
        ``partitions`` section under any plugin; paths without a registered
        threshold are ignored.
        """
        partitions = data.get("partitions") if isinstance(data, dict) else None
        if not isinstance(partitions, dict):
            return

        for partition, metrics in partitions.items():
            if not isinstance(metrics, dict):
                continue
            for metric_name, value in metrics.items():
                self._process_metric(
                    host_name,
                    f"{plugin_name}.{partition}.{metric_name}",
                    value,
                    alert_states,
                    state_changes,
                )

    def _process_metric(
        self,
        host_name: str,
        metric_path: str,
        value: Any,
        alert_states: Dict[str, AlertState],
        state_changes: list,
    ) -> None:
        """Evaluate one metric value and act on the result.

        Does nothing when no threshold is registered for ``metric_path``.
        Otherwise the metric's AlertState is created on first use and updated;
        a level change is appended to ``state_changes`` and notified, while an
        unchanged non-OK level is considered for a reminder.
        """
        threshold = self.thresholds.get(metric_path)
        if threshold is None:
            return

        if metric_path not in alert_states:
            alert_states[metric_path] = AlertState(metric_path)
        alert_state = alert_states[metric_path]

        # Capture the level before update() overwrites it.
        old_level = alert_state.level
        new_level = threshold.evaluate_with_hysteresis(value, old_level)

        if alert_state.update(new_level, value):
            state_changes.append((metric_path, old_level, new_level, value))
            self._trigger_notification(host_name, metric_path, old_level, new_level, value)
        elif new_level != AlertLevel.OK:
            self._check_renotify(host_name, alert_state, metric_path, value)

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    def _trigger_notification(
        self,
        host_name: str,
        metric_path: str,
        old_level: AlertLevel,
        new_level: AlertLevel,
        value: Any,
    ):
        """Trigger a notification for an alert state change.

        Sends a formatted message through the notification callback (if any)
        and schedules a journal entry (if a journal is configured). Failures
        in either step are logged and never propagate to the caller.
        """
        if new_level == AlertLevel.OK:
            message = f"RECOVERED: {host_name} - {metric_path} = {value} ({old_level.name} -> OK)"
        elif new_level == AlertLevel.WARNING:
            message = f"WARNING: {host_name} - {metric_path} = {value}"
        elif new_level == AlertLevel.CRITICAL:
            message = f"CRITICAL: {host_name} - {metric_path} = {value}"
        else:
            message = f"UNKNOWN: {host_name} - {metric_path} = {value}"

        if self.notification_callback is not None:
            try:
                self.notification_callback(message)
                logger.info("Notification sent: %s", message)
            except Exception as exc:
                logger.error("Failed to send notification: %s", exc)

        if self.journal is None:
            return

        try:
            import asyncio

            # NOTE(review): get_event_loop() is deprecated when no loop is
            # running; if this is always called from inside the running loop,
            # get_running_loop() would be the stricter choice - confirm.
            loop = asyncio.get_event_loop()
            task = loop.create_task(self.journal.log_threshold_event(
                host_name=host_name,
                metric_path=metric_path,
                old_level=old_level.name,
                new_level=new_level.name,
                value=value,
            ))
            # Keep the task alive until it finishes; without a strong
            # reference it may be garbage-collected before it runs.
            self._journal_tasks.add(task)
            task.add_done_callback(self._journal_tasks.discard)
        except Exception as exc:
            logger.debug("Failed to log threshold event to journal: %s", exc)

    def _check_renotify(
        self,
        host_name: str,
        alert_state: AlertState,
        metric_path: str,
        value: Any,
    ):
        """Check if we should send a repeat notification.

        The first call after a level change only records the current time as
        the last notification. Later calls send a REMINDER once
        ``renotify_interval`` seconds have passed since that timestamp; the
        timestamp and counter advance only when the callback succeeds.
        """
        if alert_state.level == AlertLevel.OK:
            return

        now = time.time()

        if alert_state.last_notification is None:
            # First notification already sent during state change
            alert_state.last_notification = now
            alert_state.notification_count = 1
            return

        if (now - alert_state.last_notification) < self.renotify_interval:
            return

        message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} (ongoing for {int(now - alert_state.since)}s)"
        if self.notification_callback is None:
            return
        try:
            self.notification_callback(message)
            alert_state.last_notification = now
            alert_state.notification_count += 1
            logger.info("Re-notification sent: %s", message)
        except Exception as exc:
            logger.error("Failed to send re-notification: %s", exc)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_active_alerts(self, alert_states: Dict[str, AlertState]) -> list:
        """
        Get all currently active (non-OK) alerts.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            List of AlertState objects that are not OK
        """
        return [
            state for state in alert_states.values()
            if state.level != AlertLevel.OK
        ]

    def get_alert_summary(self, alert_states: Dict[str, AlertState]) -> Dict[str, int]:
        """
        Get summary counts of alert levels.

        Args:
            alert_states: Host's alert_states dictionary

        Returns:
            Dictionary with counts:
            {"ok": N, "warning": N, "critical": N, "unknown": N}
        """
        keys = {
            AlertLevel.OK: "ok",
            AlertLevel.WARNING: "warning",
            AlertLevel.CRITICAL: "critical",
            AlertLevel.UNKNOWN: "unknown",
        }
        summary = {"ok": 0, "warning": 0, "critical": 0, "unknown": 0}
        for state in alert_states.values():
            key = keys.get(state.level)
            if key is not None:
                summary[key] += 1
        return summary