422 lines
15 KiB
Python
422 lines
15 KiB
Python
"""Plugin system for extending Heartbeat data collection and monitoring.
|
|
|
|
This module provides the base classes and infrastructure for the plugin system
|
|
that enables extending hbc (client) data collection and hbd (server) processing.
|
|
|
|
Plugin Types:
|
|
- InfoPlugin: Collects static or rarely-changing information (OS, hardware)
|
|
- MonitorPlugin: Collects periodic monitoring data (CPU, memory, disk usage)
|
|
|
|
Plugins run on the client (hbc) to gather data, which is then sent to the server
|
|
(hbd) for storage, threshold checking, and display.
|
|
"""
|
|
|
|
import importlib.util
|
|
import inspect
|
|
import logging
|
|
import sys
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Type
|
|
|
|
|
|
class Plugin(ABC):
|
|
"""Base class for all plugins.
|
|
|
|
Attributes:
|
|
name: Unique plugin identifier (e.g., "os_info", "cpu_monitor")
|
|
version: Plugin version string
|
|
description: Human-readable description
|
|
interval: Collection interval in seconds (0 for InfoPlugin = collect once)
|
|
enabled: Whether plugin is active (can be disabled via config)
|
|
"""
|
|
|
|
name: str = ""
|
|
version: str = "1.0.0"
|
|
description: str = ""
|
|
interval: int = 0
|
|
enabled: bool = True
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
|
"""Initialize plugin with optional configuration.
|
|
|
|
Args:
|
|
config: Plugin-specific configuration from YAML (e.g., thresholds, paths)
|
|
"""
|
|
self.config = config or {}
|
|
self.logger = logging.getLogger(f"plugin.{self.name}")
|
|
self._initialized = False
|
|
self.skip_reason: Optional[str] = None
|
|
|
|
@abstractmethod
|
|
async def initialize(self) -> bool:
|
|
"""Initialize plugin (load resources, check dependencies).
|
|
|
|
Called once when plugin is loaded. Plugins should validate dependencies
|
|
(e.g., check if psutil is available) and prepare any resources.
|
|
|
|
Returns:
|
|
True if initialization succeeded, False otherwise
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
async def collect(self) -> Dict[str, Any]:
|
|
"""Collect data from the system.
|
|
|
|
This is the main method called on each collection interval. Should return
|
|
a dictionary of key-value pairs representing the collected data.
|
|
|
|
Keys should be strings (metric names). Values can be:
|
|
- Scalars: int, float, str, bool
|
|
- Lists/dicts (will be serialized appropriately)
|
|
|
|
Returns:
|
|
Dictionary of collected metrics, or empty dict on error
|
|
"""
|
|
pass
|
|
|
|
async def cleanup(self) -> None:
|
|
"""Cleanup plugin resources before shutdown.
|
|
|
|
Called when plugin is being unloaded or on system shutdown.
|
|
Override to release resources, close connections, etc.
|
|
"""
|
|
pass
|
|
|
|
def validate_data(self, data: Dict[str, Any]) -> bool:
|
|
"""Validate collected data before sending to server.
|
|
|
|
Override to implement custom validation logic.
|
|
|
|
Args:
|
|
data: Data returned from collect()
|
|
|
|
Returns:
|
|
True if data is valid, False otherwise
|
|
"""
|
|
return isinstance(data, dict)
|
|
|
|
|
|
class InfoPlugin(Plugin):
|
|
"""Plugin for collecting static or rarely-changing information.
|
|
|
|
InfoPlugins collect data that doesn't change frequently:
|
|
- OS name and version
|
|
- Hardware specifications (CPU model, RAM size)
|
|
- Network interface MAC addresses
|
|
|
|
Characteristics:
|
|
- interval = 0 (collected once at startup by default)
|
|
- Can specify interval > 0 for periodic refresh (e.g., check for hardware changes)
|
|
- Data is cached and reused until next collection
|
|
"""
|
|
|
|
interval: int = 0 # Collect once at startup
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
|
super().__init__(config)
|
|
self._cached_data: Optional[Dict[str, Any]] = None
|
|
|
|
async def get_cached_data(self) -> Optional[Dict[str, Any]]:
|
|
"""Get cached data if available (avoids re-collection).
|
|
|
|
Returns:
|
|
Cached data dict, or None if not yet collected
|
|
"""
|
|
return self._cached_data
|
|
|
|
async def collect(self) -> Dict[str, Any]:
|
|
"""Collect and cache static information."""
|
|
if self._cached_data is None:
|
|
self._cached_data = await self._collect_info()
|
|
return self._cached_data
|
|
|
|
@abstractmethod
|
|
async def _collect_info(self) -> Dict[str, Any]:
|
|
"""Internal method to perform actual data collection.
|
|
|
|
Override this method instead of collect() for InfoPlugins.
|
|
"""
|
|
pass
|
|
|
|
def invalidate_cache(self) -> None:
|
|
"""Force re-collection on next collect() call."""
|
|
self._cached_data = None
|
|
|
|
|
|
class MonitorPlugin(Plugin):
|
|
"""Plugin for collecting periodic monitoring data.
|
|
|
|
MonitorPlugins collect time-series metrics that change frequently:
|
|
- CPU usage percentage
|
|
- Memory consumption
|
|
- Disk I/O statistics
|
|
- Network traffic
|
|
|
|
Characteristics:
|
|
- interval > 0 (e.g., 30 seconds for CPU, 60 for disk)
|
|
- Collected continuously on schedule
|
|
- Data includes timestamps for time-series tracking
|
|
"""
|
|
|
|
interval: int = 30 # Default: collect every 30 seconds
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
|
super().__init__(config)
|
|
self._last_reading: Optional[Dict[str, Any]] = None
|
|
|
|
def get_last_reading(self) -> Optional[Dict[str, Any]]:
|
|
"""Get the last collected reading.
|
|
|
|
Returns:
|
|
Last reading dict with timestamp, or None if not yet collected
|
|
"""
|
|
return self._last_reading
|
|
|
|
async def collect(self) -> Dict[str, Any]:
|
|
"""Collect monitoring data and store as last reading."""
|
|
data = await self._collect_metrics()
|
|
if data:
|
|
# Add collection timestamp
|
|
import time
|
|
data['_timestamp'] = time.time()
|
|
self._last_reading = data
|
|
return data
|
|
|
|
@abstractmethod
|
|
async def _collect_metrics(self) -> Dict[str, Any]:
|
|
"""Internal method to perform actual metric collection.
|
|
|
|
Override this method instead of collect() for MonitorPlugins.
|
|
"""
|
|
pass
|
|
|
|
|
|
class PluginRegistry:
|
|
"""Registry for managing loaded plugins.
|
|
|
|
Maintains a collection of loaded plugins and provides methods to
|
|
query plugins by name, type, or interval.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._plugins: Dict[str, Plugin] = {}
|
|
self.logger = logging.getLogger("plugin.registry")
|
|
|
|
def register(self, plugin: Plugin) -> bool:
|
|
"""Register a plugin instance.
|
|
|
|
Args:
|
|
plugin: Plugin instance to register
|
|
|
|
Returns:
|
|
True if registered successfully, False if name conflict
|
|
"""
|
|
if plugin.name in self._plugins:
|
|
self.logger.error(f"Plugin '{plugin.name}' already registered")
|
|
return False
|
|
|
|
self._plugins[plugin.name] = plugin
|
|
self.logger.info(f"Registered plugin: {plugin.name} v{plugin.version}")
|
|
return True
|
|
|
|
def unregister(self, name: str) -> bool:
|
|
"""Unregister a plugin by name.
|
|
|
|
Args:
|
|
name: Plugin name to unregister
|
|
|
|
Returns:
|
|
True if unregistered, False if not found
|
|
"""
|
|
if name in self._plugins:
|
|
del self._plugins[name]
|
|
self.logger.info(f"Unregistered plugin: {name}")
|
|
return True
|
|
return False
|
|
|
|
def get(self, name: str) -> Optional[Plugin]:
|
|
"""Get plugin by name.
|
|
|
|
Args:
|
|
name: Plugin name
|
|
|
|
Returns:
|
|
Plugin instance or None if not found
|
|
"""
|
|
return self._plugins.get(name)
|
|
|
|
def get_all(self) -> List[Plugin]:
|
|
"""Get all registered plugins."""
|
|
return list(self._plugins.values())
|
|
|
|
def get_enabled(self) -> List[Plugin]:
|
|
"""Get all enabled plugins."""
|
|
return [p for p in self._plugins.values() if p.enabled]
|
|
|
|
def get_by_type(self, plugin_type: Type[Plugin]) -> List[Plugin]:
|
|
"""Get all plugins of a specific type.
|
|
|
|
Args:
|
|
plugin_type: Plugin class (InfoPlugin or MonitorPlugin)
|
|
|
|
Returns:
|
|
List of plugins matching the type
|
|
"""
|
|
return [p for p in self._plugins.values() if isinstance(p, plugin_type)]
|
|
|
|
def get_by_interval(self, interval: int) -> List[Plugin]:
|
|
"""Get all plugins with a specific collection interval.
|
|
|
|
Args:
|
|
interval: Interval in seconds (0 for one-time collection)
|
|
|
|
Returns:
|
|
List of plugins with matching interval
|
|
"""
|
|
return [p for p in self._plugins.values() if p.interval == interval]
|
|
|
|
|
|
class PluginLoader:
|
|
"""Load plugins from filesystem and instantiate them.
|
|
|
|
Scans plugin directories for Python modules containing Plugin subclasses,
|
|
loads them dynamically, and registers them with the PluginRegistry.
|
|
"""
|
|
|
|
def __init__(self, registry: PluginRegistry):
|
|
self.registry = registry
|
|
self.logger = logging.getLogger("plugin.loader")
|
|
self._loaded_modules: Dict[str, Any] = {}
|
|
|
|
async def load_from_directory(
|
|
self,
|
|
directory: Path,
|
|
config: Optional[Dict[str, Any]] = None
|
|
) -> int:
|
|
"""Load all plugins from a directory.
|
|
|
|
Scans for .py files, imports them, finds Plugin subclasses,
|
|
instantiates them with config, initializes, and registers.
|
|
|
|
Args:
|
|
directory: Path to plugin directory
|
|
config: Configuration dict (may contain per-plugin config)
|
|
|
|
Returns:
|
|
Number of plugins successfully loaded
|
|
"""
|
|
if not directory.exists() or not directory.is_dir():
|
|
self.logger.warning(f"Plugin directory not found: {directory}")
|
|
return 0
|
|
|
|
loaded_count = 0
|
|
raw_config = config or {}
|
|
# Per-plugin config lives under the 'plugins' key or at top-level.
|
|
# CLIENT_DEFAULTS seeds "plugins": {} so the key always exists; check
|
|
# both the subdict and top-level so that either layout in .hbc.yaml works.
|
|
plugins_subconfig = raw_config.get("plugins", {})
|
|
|
|
# Scan for Python files
|
|
for plugin_file in directory.glob("*.py"):
|
|
if plugin_file.name.startswith("_"):
|
|
continue # Skip __init__.py and private modules
|
|
|
|
self.logger.debug(f"Processing plugin file: {plugin_file.name}")
|
|
|
|
try:
|
|
# Load module dynamically
|
|
module_name = f"plugins.{plugin_file.stem}"
|
|
spec = importlib.util.spec_from_file_location(module_name, plugin_file)
|
|
if not spec or not spec.loader:
|
|
self.logger.warning(f"Could not create spec for {plugin_file}")
|
|
continue
|
|
|
|
module = importlib.util.module_from_spec(spec)
|
|
sys.modules[module_name] = module
|
|
spec.loader.exec_module(module)
|
|
self._loaded_modules[module_name] = module
|
|
|
|
self.logger.debug(f"Loaded module: {module_name}")
|
|
|
|
# Track which plugin classes we've already processed to avoid duplicates
|
|
processed_classes = set()
|
|
|
|
# Find Plugin subclasses in module
|
|
for name, obj in inspect.getmembers(module, inspect.isclass):
|
|
# Skip base classes and non-Plugin classes
|
|
if obj in (Plugin, InfoPlugin, MonitorPlugin):
|
|
self.logger.debug(f"Skipping base class: {name}")
|
|
continue
|
|
if not issubclass(obj, Plugin):
|
|
self.logger.debug(f"Skipping non-Plugin class: {name}")
|
|
continue
|
|
|
|
# Skip if we've already processed this class (handles module-level aliases)
|
|
if id(obj) in processed_classes:
|
|
self.logger.debug(f"Skipping duplicate reference to: {obj.__name__}")
|
|
continue
|
|
processed_classes.add(id(obj))
|
|
|
|
self.logger.debug(f"Found plugin class: {name}")
|
|
|
|
# Instantiate plugin with config — check plugins subdict first,
|
|
# then top-level keys (e.g. nagios_runner: ... at root of config).
|
|
plugin_instance_config = plugins_subconfig.get(obj.name) or raw_config.get(obj.name, {})
|
|
plugin = obj(config=plugin_instance_config)
|
|
|
|
# Initialize plugin
|
|
try:
|
|
initialized = await plugin.initialize()
|
|
if not initialized:
|
|
if plugin.skip_reason:
|
|
self.logger.info(
|
|
f"Plugin {plugin.name} skipped: {plugin.skip_reason}"
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
f"Plugin {plugin.name} failed initialization, skipping"
|
|
)
|
|
continue
|
|
except Exception as e:
|
|
self.logger.error(
|
|
f"Error initializing plugin {plugin.name}: {e}",
|
|
exc_info=True
|
|
)
|
|
continue
|
|
|
|
# Register with registry
|
|
if self.registry.register(plugin):
|
|
loaded_count += 1
|
|
self.logger.info(
|
|
f"Loaded plugin: {plugin.name} v{plugin.version} "
|
|
f"(interval: {plugin.interval}s)"
|
|
)
|
|
|
|
except Exception as e:
|
|
self.logger.error(
|
|
f"Error loading plugin from {plugin_file}: {e}",
|
|
exc_info=True
|
|
)
|
|
|
|
return loaded_count
|
|
|
|
async def unload_all(self) -> None:
|
|
"""Unload all plugins and cleanup resources."""
|
|
for plugin in self.registry.get_all():
|
|
try:
|
|
await plugin.cleanup()
|
|
except Exception as e:
|
|
self.logger.error(
|
|
f"Error cleaning up plugin {plugin.name}: {e}",
|
|
exc_info=True
|
|
)
|
|
self.registry.unregister(plugin.name)
|
|
|
|
# Remove loaded modules
|
|
for module_name in self._loaded_modules:
|
|
if module_name in sys.modules:
|
|
del sys.modules[module_name]
|
|
self._loaded_modules.clear()
|