"""Plugin system for extending Heartbeat data collection and monitoring.

This module provides the base classes and infrastructure for the plugin
system that enables extending hbc (client) data collection and hbd (server)
processing.

Plugin Types:
    - InfoPlugin: Collects static or rarely-changing information (OS, hardware)
    - MonitorPlugin: Collects periodic monitoring data (CPU, memory, disk usage)

Plugins run on the client (hbc) to gather data, which is then sent to the
server (hbd) for storage, threshold checking, and display.
"""

import importlib.util
import inspect
import logging
import sys
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Type


class Plugin(ABC):
    """Base class for all plugins.

    Attributes:
        name: Unique plugin identifier (e.g., "os_info", "cpu_monitor")
        version: Plugin version string
        description: Human-readable description
        interval: Collection interval in seconds (0 for InfoPlugin = collect once)
        enabled: Whether plugin is active (can be disabled via config)
        skip_reason: Set by plugin before returning False from initialize();
            causes loader to log INFO instead of WARNING.
    """

    name: str = ""
    version: str = "1.0.0"
    description: str = ""
    interval: int = 0
    enabled: bool = True

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize plugin with optional configuration.

        Args:
            config: Plugin-specific configuration from YAML
                (e.g., thresholds, paths). ``None`` is treated as ``{}``.
        """
        self.config = config or {}
        self.logger = logging.getLogger(f"plugin.{self.name}")
        self._initialized = False
        self.skip_reason: Optional[str] = None

    @abstractmethod
    async def initialize(self) -> bool:
        """Initialize plugin (load resources, check dependencies).

        Called once when plugin is loaded. Plugins should validate
        dependencies (e.g., check if psutil is available) and prepare any
        resources.

        Returns:
            True if initialization succeeded, False otherwise
        """

    @abstractmethod
    async def collect(self) -> Dict[str, Any]:
        """Collect data from the system.

        This is the main method called on each collection interval. Should
        return a dictionary of key-value pairs representing the collected
        data.

        Keys should be strings (metric names). Values can be:
            - Scalars: int, float, str, bool
            - Lists/dicts (will be serialized appropriately)

        Returns:
            Dictionary of collected metrics, or empty dict on error
        """

    async def cleanup(self) -> None:
        """Cleanup plugin resources before shutdown.

        Called when plugin is being unloaded or on system shutdown.
        Override to release resources, close connections, etc.
        """

    def validate_data(self, data: Dict[str, Any]) -> bool:
        """Validate collected data before sending to server.

        Override to implement custom validation logic. The default only
        checks that ``data`` is a dict.

        Args:
            data: Data returned from collect()

        Returns:
            True if data is valid, False otherwise
        """
        return isinstance(data, dict)


class InfoPlugin(Plugin):
    """Plugin for collecting static or rarely-changing information.

    InfoPlugins collect data that doesn't change frequently:
        - OS name and version
        - Hardware specifications (CPU model, RAM size)
        - Network interface MAC addresses

    Characteristics:
        - interval = 0 (collected once at startup by default)
        - A non-empty result is cached and returned by every later collect()
          call until invalidate_cache() is called. collect() itself does not
          look at ``interval``; a caller that wants a periodic refresh must
          invalidate the cache first.
    """

    interval: int = 0  # Collect once at startup

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._cached_data: Optional[Dict[str, Any]] = None

    async def get_cached_data(self) -> Optional[Dict[str, Any]]:
        """Get cached data if available (avoids re-collection).

        Returns:
            The result of the most recent collection, or None if nothing has
            been collected yet (or the cache was invalidated)
        """
        return self._cached_data

    async def collect(self) -> Dict[str, Any]:
        """Collect and cache static information.

        An empty result is the documented "error" value of collect(), so it
        is not treated as a valid cache entry: the next call retries instead
        of returning the failure forever.

        Returns:
            The cached data when a non-empty result is available, otherwise
            the result of a fresh _collect_info() call
        """
        if not self._cached_data:
            self._cached_data = await self._collect_info()
        return self._cached_data

    @abstractmethod
    async def _collect_info(self) -> Dict[str, Any]:
        """Internal method to perform actual data collection.

        Override this method instead of collect() for InfoPlugins.
        """

    def invalidate_cache(self) -> None:
        """Force re-collection on next collect() call."""
        self._cached_data = None


class MonitorPlugin(Plugin):
    """Plugin for collecting periodic monitoring data.

    MonitorPlugins collect time-series metrics that change frequently:
        - CPU usage percentage
        - Memory consumption
        - Disk I/O statistics
        - Network traffic

    Characteristics:
        - interval > 0 (e.g., 30 seconds for CPU, 60 for disk)
        - Collected continuously on schedule
        - Data includes timestamps for time-series tracking
    """

    interval: int = 30  # Default: collect every 30 seconds

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._last_reading: Optional[Dict[str, Any]] = None

    def get_last_reading(self) -> Optional[Dict[str, Any]]:
        """Get the last collected reading.

        Returns:
            Last non-empty reading dict (with ``_timestamp``), or None if no
            non-empty reading has been collected yet
        """
        return self._last_reading

    async def collect(self) -> Dict[str, Any]:
        """Collect monitoring data and store as last reading.

        A non-empty result is stamped in place with ``_timestamp``
        (``time.time()``) and remembered as the last reading. An empty result
        is returned untouched and leaves the previous reading in place.
        """
        data = await self._collect_metrics()
        if data:
            data["_timestamp"] = time.time()
            self._last_reading = data
        return data

    @abstractmethod
    async def _collect_metrics(self) -> Dict[str, Any]:
        """Internal method to perform actual metric collection.

        Override this method instead of collect() for MonitorPlugins.
        """


class PluginRegistry:
    """Registry for managing loaded plugins.

    Maintains a collection of loaded plugins and provides methods to query
    plugins by name, type, or interval.
    """

    def __init__(self):
        self._plugins: Dict[str, Plugin] = {}
        self.logger = logging.getLogger("plugin.registry")

    def register(self, plugin: Plugin) -> bool:
        """Register a plugin instance.

        Args:
            plugin: Plugin instance to register

        Returns:
            True if registered successfully, False if name conflict
        """
        if plugin.name in self._plugins:
            self.logger.error(f"Plugin '{plugin.name}' already registered")
            return False

        self._plugins[plugin.name] = plugin
        self.logger.info(f"Registered plugin: {plugin.name} v{plugin.version}")
        return True

    def unregister(self, name: str) -> bool:
        """Unregister a plugin by name.

        Args:
            name: Plugin name to unregister

        Returns:
            True if unregistered, False if not found
        """
        if name not in self._plugins:
            return False

        del self._plugins[name]
        self.logger.info(f"Unregistered plugin: {name}")
        return True

    def get(self, name: str) -> Optional[Plugin]:
        """Get plugin by name.

        Args:
            name: Plugin name

        Returns:
            Plugin instance or None if not found
        """
        return self._plugins.get(name)

    def get_all(self) -> List[Plugin]:
        """Get all registered plugins (as a new list)."""
        return list(self._plugins.values())

    def get_enabled(self) -> List[Plugin]:
        """Get all enabled plugins."""
        return [p for p in self._plugins.values() if p.enabled]

    def get_by_type(self, plugin_type: Type[Plugin]) -> List[Plugin]:
        """Get all plugins of a specific type.

        Args:
            plugin_type: Plugin class (InfoPlugin or MonitorPlugin)

        Returns:
            List of plugins matching the type
        """
        return [p for p in self._plugins.values() if isinstance(p, plugin_type)]

    def get_by_interval(self, interval: int) -> List[Plugin]:
        """Get all plugins with a specific collection interval.

        Args:
            interval: Interval in seconds (0 for one-time collection)

        Returns:
            List of plugins with matching interval
        """
        return [p for p in self._plugins.values() if p.interval == interval]


class PluginLoader:
    """Load plugins from filesystem and instantiate them.

    Scans plugin directories for Python modules containing Plugin subclasses,
    loads them dynamically, and registers them with the PluginRegistry.
    """

    def __init__(self, registry: PluginRegistry):
        self.registry = registry
        self.logger = logging.getLogger("plugin.loader")
        # Modules imported by this loader, keyed by their sys.modules name.
        self._loaded_modules: Dict[str, Any] = {}

    async def load_from_directory(
        self,
        directory: Path,
        config: Optional[Dict[str, Any]] = None
    ) -> int:
        """Load all plugins from a directory.

        Scans for .py files, imports them, finds Plugin subclasses,
        instantiates them with config, initializes, and registers. A failure
        in one file or one plugin class is logged and does not stop the
        remaining ones from loading.

        Args:
            directory: Path to plugin directory
            config: Configuration dict (may contain per-plugin config)

        Returns:
            Number of plugins successfully loaded
        """
        if not directory.is_dir():
            self.logger.warning(f"Plugin directory not found: {directory}")
            return 0

        raw_config = config or {}

        # Per-plugin config lives under the 'plugins' key or at top-level;
        # both layouts are checked. The 'plugins' value may be None (e.g. an
        # empty `plugins:` key in YAML), so fall back to an empty dict.
        plugins_subconfig = raw_config.get("plugins") or {}

        loaded_count = 0

        # Sorted so load order (and therefore which plugin wins a name
        # conflict) does not depend on filesystem enumeration order.
        for plugin_file in sorted(directory.glob("*.py")):
            if plugin_file.name.startswith("_"):
                continue  # Skip __init__.py and private modules

            self.logger.debug(f"Processing plugin file: {plugin_file.name}")

            try:
                module = self._import_module(plugin_file)
                if module is None:
                    continue

                for plugin_class in self._find_plugin_classes(module):
                    if await self._load_plugin(
                        plugin_class, raw_config, plugins_subconfig
                    ):
                        loaded_count += 1

            except Exception as e:
                self.logger.error(
                    f"Error loading plugin from {plugin_file}: {e}",
                    exc_info=True
                )

        return loaded_count

    def _import_module(self, plugin_file: Path) -> Optional[Any]:
        """Import one plugin file as module ``plugins.<stem>``.

        Returns:
            The executed module, or None if no import spec could be created.

        Raises:
            Whatever the module's top-level code raises. In that case the
            sys.modules entry is rolled back first.
        """
        module_name = f"plugins.{plugin_file.stem}"
        spec = importlib.util.spec_from_file_location(module_name, plugin_file)

        if not spec or not spec.loader:
            self.logger.warning(f"Could not create spec for {plugin_file}")
            return None

        module = importlib.util.module_from_spec(spec)
        previous = sys.modules.get(module_name)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module importable by name.
            if previous is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            raise

        self._loaded_modules[module_name] = module
        self.logger.debug(f"Loaded module: {module_name}")
        return module

    def _find_plugin_classes(self, module: Any) -> List[Type[Plugin]]:
        """Return the concrete Plugin subclasses visible in ``module``."""
        # Track which plugin classes we've already seen so that module-level
        # aliases of the same class are only processed once.
        processed_classes = set()
        found: List[Type[Plugin]] = []

        for name, obj in inspect.getmembers(module, inspect.isclass):
            if obj in (Plugin, InfoPlugin, MonitorPlugin):
                self.logger.debug(f"Skipping base class: {name}")
                continue

            if not issubclass(obj, Plugin):
                self.logger.debug(f"Skipping non-Plugin class: {name}")
                continue

            if id(obj) in processed_classes:
                self.logger.debug(
                    f"Skipping duplicate reference to: {obj.__name__}"
                )
                continue
            processed_classes.add(id(obj))

            # Intermediate base classes with unimplemented abstract methods
            # cannot be instantiated; skip them instead of raising.
            if inspect.isabstract(obj):
                self.logger.debug(f"Skipping abstract plugin class: {name}")
                continue

            self.logger.debug(f"Found plugin class: {name}")
            found.append(obj)

        return found

    async def _load_plugin(
        self,
        plugin_class: Type[Plugin],
        raw_config: Dict[str, Any],
        plugins_subconfig: Dict[str, Any]
    ) -> bool:
        """Instantiate, initialize and register one plugin class.

        Returns:
            True if the plugin ended up registered, False otherwise (the
            reason is logged).
        """
        try:
            # Check the plugins subdict first, then top-level keys
            # (e.g. nagios_runner: ... at root of config).
            plugin_instance_config = dict(
                plugins_subconfig.get(plugin_class.name)
                or raw_config.get(plugin_class.name)
                or {}
            )

            # Propagate top-level owner so os_info (and any future plugin)
            # can report it.
            if "owner" in raw_config and "owner" not in plugin_instance_config:
                plugin_instance_config["owner"] = raw_config["owner"]

            plugin = plugin_class(config=plugin_instance_config)
        except Exception as e:
            self.logger.error(
                f"Error instantiating plugin {plugin_class.__name__}: {e}",
                exc_info=True
            )
            return False

        try:
            initialized = await plugin.initialize()
        except Exception as e:
            self.logger.error(
                f"Error initializing plugin {plugin.name}: {e}",
                exc_info=True
            )
            return False

        if not initialized:
            if plugin.skip_reason:
                self.logger.info(
                    f"Plugin {plugin.name} skipped: {plugin.skip_reason}"
                )
            else:
                self.logger.warning(
                    f"Plugin {plugin.name} failed initialization, skipping"
                )
            return False

        if not self.registry.register(plugin):
            # The instance was initialized but will never be used or
            # unloaded through the registry, so release its resources now.
            try:
                await plugin.cleanup()
            except Exception as e:
                self.logger.error(
                    f"Error cleaning up plugin {plugin.name}: {e}",
                    exc_info=True
                )
            return False

        self.logger.info(
            f"Loaded plugin: {plugin.name} v{plugin.version} "
            f"(interval: {plugin.interval}s)"
        )
        return True

    async def unload_all(self) -> None:
        """Unload all plugins and cleanup resources.

        Every plugin currently in the registry is cleaned up and
        unregistered; a failing cleanup() is logged and does not prevent the
        plugin from being unregistered. Modules imported by this loader are
        then removed from sys.modules.
        """
        for plugin in self.registry.get_all():
            try:
                await plugin.cleanup()
            except Exception as e:
                self.logger.error(
                    f"Error cleaning up plugin {plugin.name}: {e}",
                    exc_info=True
                )
            self.registry.unregister(plugin.name)

        # Only drop entries that still point at the module we loaded, so a
        # module registered under the same name by someone else survives.
        for module_name, module in self._loaded_modules.items():
            if sys.modules.get(module_name) is module:
                del sys.modules[module_name]
        self._loaded_modules.clear()