"""Nagios Plugin Runner for Heartbeat. Executes Nagios-compatible monitoring plugins and parses their output. Nagios Plugin Standard: - Exit codes: 0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN - Output format: Single line status message, optional performance data - Performance data format: 'label'=value[UOM];[warn];[crit];[min];[max] Example configuration in ~/.hb.yaml: ```yaml nagios_runner: interval: 60 commands: - name: check_disk_root command: /usr/lib/nagios/plugins/check_disk -w 20% -c 10% -p / - name: check_procs command: /usr/lib/nagios/plugins/check_procs -w 250 -c 400 - name: check_load command: /usr/lib/nagios/plugins/check_load -w 5,4,3 -c 10,8,6 ``` """ import re import subprocess from typing import Any, Dict, List, Optional, Tuple from hbd.client.plugin import MonitorPlugin # Nagios exit codes NAGIOS_OK = 0 NAGIOS_WARNING = 1 NAGIOS_CRITICAL = 2 NAGIOS_UNKNOWN = 3 STATUS_NAMES = { NAGIOS_OK: "OK", NAGIOS_WARNING: "WARNING", NAGIOS_CRITICAL: "CRITICAL", NAGIOS_UNKNOWN: "UNKNOWN" } class NagiosRunnerPlugin(MonitorPlugin): """Run Nagios-compatible monitoring plugins. This plugin executes external Nagios plugins and collects their output, including status codes, messages, and performance data. Configuration: interval: Collection interval in seconds (default: 300) commands: List of command definitions with 'name' and 'command' keys timeout: Command execution timeout in seconds (default: 30) shell: Whether to execute commands via shell (default: True) Example: nagios_runner: interval: 300 # Check every 5 minutes timeout: 30 commands: - name: check_disk command: /usr/lib/nagios/plugins/check_disk -w 20% -c 10% - name: check_load command: /usr/lib/nagios/plugins/check_load -w 5,4,3 -c 10,8,6 """ name = "nagios_runner" version = "1.0.0" description = "Execute Nagios-compatible monitoring plugins" interval = 300 # MonitorPlugin: collect every 5 minutes by default def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) # Extract configuration self.commands: List[Dict[str, str]] = config.get("commands", []) if config else [] self.timeout: int = config.get("timeout", 30) if config else 30 self.shell: bool = config.get("shell", True) if config else True self.interval = config.get("interval", 300) if config else 300 # Validate commands if not self.commands: self.logger.info( "No Nagios commands configured. Add 'nagios_runner.commands' to config." ) async def initialize(self) -> bool: """Initialize the Nagios runner plugin. Returns: True if at least one command is configured, False otherwise """ self.logger.info(f"Initializing {self.name} plugin") if not self.commands: self.logger.info("No Nagios commands configured") return False self.logger.info(f"Configured to run {len(self.commands)} Nagios plugin(s)") for cmd_config in self.commands: name = cmd_config.get("name", "unnamed") self.logger.info(f" - {name}: {cmd_config.get('command', 'N/A')}") return True async def _collect_metrics(self) -> Dict[str, Any]: """Collect metrics from all configured Nagios plugins. Returns: Dictionary with results from all plugins """ results = {} # Track overall status (worst status wins) worst_status = NAGIOS_OK for cmd_config in self.commands: name = cmd_config.get("name") command = cmd_config.get("command") if not name or not command: self.logger.warning("Skipping command with missing name or command") continue # Execute plugin try: status_code, output, perfdata = await self._run_nagios_plugin(command) # Store results results[f"{name}_status"] = STATUS_NAMES.get(status_code, "UNKNOWN") results[f"{name}_status_code"] = status_code results[f"{name}_output"] = output # Track worst status if status_code > worst_status: worst_status = status_code # Parse and add performance data if perfdata: for metric_name, metric_value in perfdata.items(): results[f"{name}_{metric_name}"] = metric_value self.logger.debug( f"Executed {name}: {STATUS_NAMES.get(status_code, 'UNKNOWN')} - {output[:50]}" ) except Exception as e: self.logger.error(f"Error running {name}: {e}", exc_info=True) results[f"{name}_status"] = "ERROR" results[f"{name}_status_code"] = NAGIOS_UNKNOWN results[f"{name}_output"] = str(e) worst_status = NAGIOS_UNKNOWN # Add overall status results["overall_status"] = STATUS_NAMES.get(worst_status, "UNKNOWN") results["overall_status_code"] = worst_status results["plugin_count"] = len(self.commands) return results async def _run_nagios_plugin( self, command: str ) -> Tuple[int, str, Dict[str, Any]]: """Execute a Nagios plugin and parse its output. Args: command: Command string to execute Returns: Tuple of (status_code, output_message, performance_data_dict) """ try: # Run command result = subprocess.run( command, shell=self.shell, capture_output=True, timeout=self.timeout, text=True ) status_code = result.returncode output = result.stdout.strip() # Nagios plugins can return codes > 3, treat as UNKNOWN if status_code > 3: status_code = NAGIOS_UNKNOWN # Parse performance data perfdata = self._parse_perfdata(output) # Extract just the status message (before the pipe if present) if '|' in output: output_msg = output.split('|')[0].strip() else: output_msg = output return status_code, output_msg, perfdata except subprocess.TimeoutExpired: self.logger.error(f"Command timed out: {command}") return NAGIOS_UNKNOWN, f"Command timed out after {self.timeout}s", {} except Exception as e: self.logger.error(f"Error executing command: {e}") return NAGIOS_UNKNOWN, f"Execution error: {str(e)}", {} def _parse_perfdata(self, output: str) -> Dict[str, Any]: """Parse Nagios performance data from plugin output. Nagios performance data format: 'label'=value[UOM];[warn];[crit];[min];[max] Multiple metrics separated by spaces. Args: output: Plugin output string Returns: Dictionary of metric_name: value """ perfdata = {} # Performance data comes after the pipe character if '|' not in output: return perfdata perf_section = output.split('|', 1)[1].strip() # Regex to match performance data format # Matches: 'label'=value or label=value perf_regex = r"'?([^'=]+)'?=([\d.]+)([a-zA-Z%]*);?([\d.]*);?([\d.]*);?([\d.]*);?([\d.]*)" for match in re.finditer(perf_regex, perf_section): label = match.group(1).strip() value_str = match.group(2) uom = match.group(3) or "" warn = match.group(4) crit = match.group(5) min_val = match.group(6) max_val = match.group(7) # Convert value to float try: value = float(value_str) except ValueError: continue # Store the value perfdata[label] = value # Optionally store UOM as separate field if uom: perfdata[f"{label}_uom"] = uom # Store thresholds if present if warn: try: perfdata[f"{label}_warn"] = float(warn) except ValueError: pass if crit: try: perfdata[f"{label}_crit"] = float(crit) except ValueError: pass if min_val: try: perfdata[f"{label}_min"] = float(min_val) except ValueError: pass if max_val: try: perfdata[f"{label}_max"] = float(max_val) except ValueError: pass return perfdata