Files
heartbeat/hbd/client/plugins/nagios_runner.py
T
Andreas Wrede 0543266c92 Major refactoring of the codebase, including restructuring of files and directories, renaming of modules and classes, and improvements to the overall organization and readability of the code. This refactoring aims to enhance maintainability, scalability, and clarity of the codebase while preserving existing functionality. The changes include:
- Restructuring of the project directory into client and server components
- Renaming of modules and classes to better reflect their purpose and functionality
- Moving common utilities and configurations to a shared location
- Updating import statements to reflect the new structure
- Adding new documentation files for better clarity on various aspects of the project
- Removing deprecated or unused code to streamline the codebase
- Ensuring that all existing functionality is preserved and that the codebase remains functional after the refactoring.
2026-03-29 11:13:40 -04:00

284 lines
9.4 KiB
Python

"""Nagios Plugin Runner for Heartbeat.
Executes Nagios-compatible monitoring plugins and parses their output.
Nagios Plugin Standard:
- Exit codes: 0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN
- Output format: Single line status message, optional performance data
- Performance data format: 'label'=value[UOM];[warn];[crit];[min];[max]
Example configuration in ~/.hb.yaml:
```yaml
nagios_runner:
  interval: 60
  commands:
    - name: check_disk_root
      command: /usr/lib/nagios/plugins/check_disk -w 20% -c 10% -p /
    - name: check_procs
      command: /usr/lib/nagios/plugins/check_procs -w 250 -c 400
    - name: check_load
      command: /usr/lib/nagios/plugins/check_load -w 5,4,3 -c 10,8,6
```
"""
import asyncio
import functools
import re
import shlex
import subprocess
from typing import Any, Dict, List, Optional, Tuple

from hbd.client.plugin import MonitorPlugin
# Exit codes a Nagios-compatible plugin reports: OK, WARNING, CRITICAL, UNKNOWN.
NAGIOS_OK, NAGIOS_WARNING, NAGIOS_CRITICAL, NAGIOS_UNKNOWN = range(4)

# Human-readable label for each exit code.
STATUS_NAMES = dict(
    zip(
        (NAGIOS_OK, NAGIOS_WARNING, NAGIOS_CRITICAL, NAGIOS_UNKNOWN),
        ("OK", "WARNING", "CRITICAL", "UNKNOWN"),
    )
)
class NagiosRunnerPlugin(MonitorPlugin):
    """Run Nagios-compatible monitoring plugins.

    This plugin executes external Nagios plugins and collects their output,
    including status codes, messages, and performance data.

    Configuration:
        interval: Collection interval in seconds (default: 300)
        commands: List of command definitions with 'name' and 'command' keys
        timeout: Command execution timeout in seconds (default: 30)
        shell: Whether to execute commands via shell (default: True).
            When False the command string is split into an argument list
            with ``shlex.split`` before being executed.

    Example:
        nagios_runner:
          interval: 300  # Check every 5 minutes
          timeout: 30
          commands:
            - name: check_disk
              command: /usr/lib/nagios/plugins/check_disk -w 20% -c 10%
            - name: check_load
              command: /usr/lib/nagios/plugins/check_load -w 5,4,3 -c 10,8,6
    """

    name = "nagios_runner"
    version = "1.0.0"
    description = "Execute Nagios-compatible monitoring plugins"
    interval = 300  # MonitorPlugin: collect every 5 minutes by default

    # One performance-data item: 'label'=value[UOM];[warn];[crit];[min];[max]
    # - the label is either single-quoted (may contain spaces) or a bare token
    # - the value may be signed
    # - each threshold field is captured verbatim up to the next ';' or
    #   whitespace, so range expressions such as "10:20" are consumed as part
    #   of this item instead of leaking into the next item's label
    _PERFDATA_RE = re.compile(
        r"(?:'(?P<quoted>[^']+)'|(?P<bare>[^\s'=|]+))"
        r"=(?P<value>[-+]?(?:\d+\.?\d*|\.\d+))"
        r"(?P<uom>[a-zA-Z%]*)"
        r"(?:;(?P<warn>[^;\s]*))?"
        r"(?:;(?P<crit>[^;\s]*))?"
        r"(?:;(?P<min>[^;\s]*))?"
        r"(?:;(?P<max>[^;\s]*))?"
    )

    # Optional fields following the value; each is stored as "<label>_<field>".
    _PERFDATA_FIELDS = ("warn", "crit", "min", "max")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read the plugin settings from *config*.

        Args:
            config: Plugin configuration mapping; ``None`` or an empty
                mapping selects the defaults for every setting.
        """
        super().__init__(config)

        cfg = config or {}
        # "commands: null" in YAML yields None; normalise it to an empty list
        # so later iteration and len() are always safe.
        self.commands: List[Dict[str, str]] = cfg.get("commands") or []
        self.timeout: int = cfg.get("timeout", 30)
        self.shell: bool = cfg.get("shell", True)
        self.interval = cfg.get("interval", type(self).interval)

        if not self.commands:
            self.logger.warning(
                "No Nagios commands configured. Add 'nagios_runner.commands' to config."
            )

    async def initialize(self) -> bool:
        """Initialize the Nagios runner plugin.

        Logs every configured command.

        Returns:
            True if at least one command is configured, False otherwise
        """
        self.logger.info(f"Initializing {self.name} plugin")

        if not self.commands:
            self.logger.error("No Nagios commands configured")
            return False

        self.logger.info(f"Configured to run {len(self.commands)} Nagios plugin(s)")
        for cmd_config in self.commands:
            name = cmd_config.get("name", "unnamed")
            self.logger.info(f" - {name}: {cmd_config.get('command', 'N/A')}")
        return True

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Collect metrics from all configured Nagios plugins.

        For every command the result contains ``<name>_status``,
        ``<name>_status_code`` and ``<name>_output`` plus one
        ``<name>_<label>`` entry per parsed performance-data field.  The
        ``overall_status`` / ``overall_status_code`` entries hold the
        numerically highest status code seen, so UNKNOWN (3) outranks
        CRITICAL (2).

        Returns:
            Dictionary with results from all plugins
        """
        results: Dict[str, Any] = {}
        worst_status = NAGIOS_OK

        for cmd_config in self.commands:
            name = cmd_config.get("name")
            command = cmd_config.get("command")
            if not name or not command:
                self.logger.warning("Skipping command with missing name or command")
                continue

            # A failure in one plugin must never abort the whole collection
            # cycle, hence the broad handler around the per-plugin work.
            try:
                status_code, output, perfdata = await self._run_nagios_plugin(command)
                status_name = STATUS_NAMES.get(status_code, "UNKNOWN")

                # Performance data is written first so that a perfdata label
                # such as "status" or "output" cannot overwrite the check's
                # own result keys below.
                for metric_name, metric_value in perfdata.items():
                    results[f"{name}_{metric_name}"] = metric_value

                results[f"{name}_status"] = status_name
                results[f"{name}_status_code"] = status_code
                results[f"{name}_output"] = output

                worst_status = max(worst_status, status_code)

                self.logger.debug(f"Executed {name}: {status_name} - {output[:50]}")
            except Exception as e:
                self.logger.error(f"Error running {name}: {e}", exc_info=True)
                results[f"{name}_status"] = "ERROR"
                results[f"{name}_status_code"] = NAGIOS_UNKNOWN
                results[f"{name}_output"] = str(e)
                worst_status = max(worst_status, NAGIOS_UNKNOWN)

        results["overall_status"] = STATUS_NAMES.get(worst_status, "UNKNOWN")
        results["overall_status_code"] = worst_status
        results["plugin_count"] = len(self.commands)
        return results

    async def _run_nagios_plugin(
        self,
        command: str
    ) -> Tuple[int, str, Dict[str, Any]]:
        """Execute a Nagios plugin and parse its output.

        The process is run in the event loop's default executor so that a
        slow plugin does not block other coroutines while it executes.

        Args:
            command: Command string to execute

        Returns:
            Tuple of (status_code, output_message, performance_data_dict).
            Timeouts and execution errors are reported as NAGIOS_UNKNOWN with
            an explanatory message and empty performance data.
        """
        try:
            # Without a shell, subprocess needs an argument list; a plain
            # string would be treated as the name of the executable.
            args = command if self.shell else shlex.split(command)
            run = functools.partial(
                subprocess.run,
                args,
                shell=self.shell,
                capture_output=True,
                timeout=self.timeout,
                text=True,
            )
            result = await asyncio.get_running_loop().run_in_executor(None, run)
        except subprocess.TimeoutExpired:
            self.logger.error(f"Command timed out: {command}")
            return NAGIOS_UNKNOWN, f"Command timed out after {self.timeout}s", {}
        except Exception as e:
            self.logger.error(f"Error executing command: {e}")
            return NAGIOS_UNKNOWN, f"Execution error: {str(e)}", {}

        # Anything outside 0-3 is treated as UNKNOWN.  That covers codes > 3
        # and the negative return codes subprocess reports when the child
        # was terminated by a signal.
        status_code = result.returncode
        if status_code not in STATUS_NAMES:
            status_code = NAGIOS_UNKNOWN

        # Fall back to stderr when stdout is empty so diagnostics such as a
        # shell "not found" message are not lost.
        output = result.stdout.strip() or result.stderr.strip()

        # The status message is the text before the first pipe.
        message = output.partition("|")[0].strip()
        return status_code, message, self._parse_perfdata(output)

    def _parse_perfdata(self, output: str) -> Dict[str, Any]:
        """Parse Nagios performance data from plugin output.

        Nagios performance data format:
            'label'=value[UOM];[warn];[crit];[min];[max]
        Multiple metrics separated by spaces.

        For each metric the value is stored under ``label``; the unit, if
        any, under ``label_uom``; and every warn/crit/min/max field that is a
        plain number under ``label_warn`` etc.  Threshold fields that are not
        plain numbers (for example range expressions like ``10:20``) are
        skipped.

        Args:
            output: Plugin output string

        Returns:
            Dictionary of metric_name: value
        """
        perfdata: Dict[str, Any] = {}

        # Performance data comes after the first pipe character.
        _, pipe, perf_section = output.partition("|")
        if not pipe:
            return perfdata

        for match in self._PERFDATA_RE.finditer(perf_section):
            label = (match.group("quoted") or match.group("bare")).strip()
            if not label:
                continue

            perfdata[label] = float(match.group("value"))

            uom = match.group("uom")
            if uom:
                perfdata[f"{label}_uom"] = uom

            for field in self._PERFDATA_FIELDS:
                raw = match.group(field)
                if not raw:
                    continue
                try:
                    perfdata[f"{label}_{field}"] = float(raw)
                except ValueError:
                    # Not a plain number (e.g. a range expression); skip it.
                    continue

        return perfdata