# Plugin Development Guide

This guide explains how to create custom plugins for the Heartbeat monitoring system.

## Table of Contents

- [Plugin Architecture](#plugin-architecture)
- [Plugin Types](#plugin-types)
- [Creating a Plugin](#creating-a-plugin)
- [Plugin Lifecycle](#plugin-lifecycle)
- [Server-initiated InfoPlugin refresh](#server-initiated-infoplugin-refresh)
- [Configuration](#configuration)
- [Best Practices](#best-practices)
- [Examples](#examples)
- [Testing](#testing)

## Plugin Architecture

Heartbeat's plugin system is designed to be simple yet powerful. Plugins are Python classes that inherit from one of the base plugin types and implement a few key methods.

### Key Concepts

- **Plugin Registry**: Central registry that manages all loaded plugins
- **Plugin Loader**: Automatically discovers and loads plugins from the `hbd/plugins/` directory
- **Plugin Types**: InfoPlugin (static data) and MonitorPlugin (periodic metrics)
- **Async/Await**: The lifecycle methods (`initialize()`, `collect()`, `cleanup()`) are async for non-blocking operation

## Plugin Types

### InfoPlugin

InfoPlugins collect static information that doesn't change frequently (OS version, hardware specs, etc.).

- **Runs once** at startup (interval = 0), and again only if the server requests a refresh
- **Cached** - data is collected once and reused
- **Lightweight** - no periodic overhead

**Use InfoPlugin for:**

- Operating system details
- Hardware information
- Software versions
- Configuration data
- Static inventory

### MonitorPlugin

MonitorPlugins collect metrics that change over time (CPU usage, memory, network traffic).

- **Runs periodically** based on configured interval
- **Scheduled** - collected at regular intervals
- **Dynamic** - captures changing system state

**Use MonitorPlugin for:**

- Resource usage (CPU, memory, disk, network)
- Performance metrics
- Counters and gauges
- Time-series data

## Creating a Plugin

### Step 1: Choose Plugin Type

Decide whether your plugin collects static information (InfoPlugin) or dynamic metrics (MonitorPlugin).
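To make the choice concrete, the sketch below shows how the two types differ from a scheduler's point of view. It deliberately does not import the real `hbd.plugin` base classes: `StaticFacts`, `LoadSample`, and `run_plugin` are hypothetical stand-ins, and the actual Heartbeat scheduler may be implemented differently. The only behavior taken from this guide is that `interval = 0` means "collect once" and a positive interval means "collect repeatedly".

```python
import asyncio
from typing import Any, Dict, List


class StaticFacts:
    """Hypothetical stand-in for an InfoPlugin (interval 0, collected once)."""

    name = "static_facts"
    interval = 0

    async def collect(self) -> Dict[str, Any]:
        return {"os": "example-os"}


class LoadSample:
    """Hypothetical stand-in for a MonitorPlugin (collected every `interval` seconds)."""

    name = "load_sample"
    interval = 0.01  # Tiny interval so the demo finishes quickly

    def __init__(self) -> None:
        self.samples = 0

    async def collect(self) -> Dict[str, Any]:
        self.samples += 1
        return {"sample": self.samples}


async def run_plugin(plugin: Any, rounds: int) -> List[Dict[str, Any]]:
    """Assumed scheduler logic: one-shot when interval is 0, periodic otherwise."""
    if plugin.interval == 0:
        return [await plugin.collect()]

    results = []
    for _ in range(rounds):
        results.append(await plugin.collect())
        await asyncio.sleep(plugin.interval)
    return results


def demo():
    """Run both stand-ins and return what each one produced."""

    async def main():
        info = await run_plugin(StaticFacts(), rounds=3)
        monitor = await run_plugin(LoadSample(), rounds=3)
        return info, monitor

    return asyncio.run(main())
```

Calling `demo()` yields a single result for the static plugin and three results for the periodic one. If your data would look the same on every round, it probably belongs in an InfoPlugin.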
### Step 2: Create Plugin File

Create a new Python file in the `hbd/plugins/` directory:

```python
"""
My awesome plugin for Heartbeat.

Brief description of what this plugin does.
"""

import logging
from typing import Dict, Any, Optional

# Import psutil or other dependencies if needed
try:
    import psutil
except ImportError:
    psutil = None

from hbd.plugin import MonitorPlugin  # or InfoPlugin

logger = logging.getLogger(__name__)


class MyAwesomePlugin(MonitorPlugin):  # or InfoPlugin
    """
    One-line description of the plugin.

    Collects:
    - List of metrics/data collected
    - Another metric

    Configuration:
        interval: Collection interval in seconds (default: 60)
        option1: Description of option1 (default: value)
        option2: Description of option2 (default: value)
    """

    name = "my_awesome_plugin"  # Unique plugin name
    interval = 60  # Seconds between collections for MonitorPlugin; use 0 for InfoPlugin

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the plugin with optional configuration."""
        super().__init__(config)

        # Extract configuration options
        self.option1 = self.config.get('option1', 'default_value')
        self.option2 = self.config.get('option2', True)

        # Check dependencies
        if psutil is None:
            raise ImportError("psutil is required for my_awesome_plugin")

    async def initialize(self) -> bool:
        """
        Initialize the plugin.

        This is called once when the plugin is loaded.
        Use this to verify dependencies, establish connections, etc.

        Returns:
            True if initialization successful, False otherwise
        """
        logger.info(f"My awesome plugin initialized (option1: {self.option1})")
        return True

    async def collect(self) -> Dict[str, Any]:
        """
        Collect data.

        This is called periodically (MonitorPlugin) or once (InfoPlugin).
        Returns:
            Dictionary of collected data (will be sent to server)
        """
        try:
            data = await self._collect_metrics()
            logger.debug(f"Collected {len(data)} metrics")
            return data
        except Exception as e:
            logger.error(f"Error collecting data: {e}")
            return {"error": str(e)}

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Internal method to collect actual metrics."""
        metrics = {}

        # Collect your data here
        metrics['metric1'] = self._get_metric1()
        metrics['metric2'] = self._get_metric2()

        return metrics

    def _get_metric1(self):
        """Helper method for metric collection."""
        # Implementation here
        return 42

    def _get_metric2(self):
        """Helper method for metric collection."""
        # Implementation here
        return "hello"

    async def cleanup(self):
        """
        Cleanup resources.

        This is called when the plugin is unloaded or the client shuts down.
        Use this to close connections, release resources, etc.
        """
        logger.info("My awesome plugin cleanup")


# Expose the plugin class for automatic discovery
plugin = MyAwesomePlugin
```

### Step 3: Test Your Plugin

Create a test script to verify your plugin works:

```python
#!/usr/bin/env python3
import asyncio
import sys
from pathlib import Path

# Add this script's directory to the import path
# (assumes the script sits next to the hbd/ package)
sys.path.insert(0, str(Path(__file__).parent))

from hbd.plugins.my_awesome_plugin import MyAwesomePlugin


async def test():
    # Create plugin instance
    plugin = MyAwesomePlugin({'option1': 'test_value'})

    # Initialize
    if not await plugin.initialize():
        print("Failed to initialize")
        return False

    # Collect data
    data = await plugin.collect()
    print(f"Collected data: {data}")

    # Cleanup
    await plugin.cleanup()
    return True


if __name__ == '__main__':
    success = asyncio.run(test())
    sys.exit(0 if success else 1)
```

## Plugin Lifecycle

Understanding the plugin lifecycle helps you implement plugins correctly:

```
1. Plugin Discovery
   └─> Loader scans hbd/plugins/ directory
   └─> Finds Python files (except those starting with _)
   └─> Imports modules

2. Plugin Instantiation
   └─> Creates instance with configuration
   └─> __init__() is called

3. Plugin Initialization
   └─> initialize() is called
   └─> Plugin verifies dependencies, establishes connections
   └─> Returns True/False for success/failure

4. Plugin Registration
   └─> If initialization succeeds, plugin is registered
   └─> Plugin becomes active

5. Data Collection
   └─> For InfoPlugin: collect() called once after initialization
   └─> For MonitorPlugin: collect() called periodically based on interval
   └─> Data is sent to server via PLG message

6. Plugin Shutdown
   └─> cleanup() is called
   └─> Plugin releases resources, closes connections
```

## Server-initiated InfoPlugin refresh

When a heartbeat packet arrives from a host the server has no plugin data for (e.g. after a server restart), the server sets `request_update = 1` in the ACK reply. The client detects this flag and immediately re-runs all InfoPlugins, clearing their cached results first, then resends the data as PLG messages.

This means the server recovers InfoPlugin data as soon as the host's next heartbeat is acknowledged, without requiring a client restart. No action is needed from plugin authors: the framework handles cache invalidation and re-collection automatically.

The lifecycle for this case looks like:

```
Server restarts, host reconnects
  └─> hbd receives HTB with no existing plugin_data for host
  └─> hbd sets request_update=1 in ACK

Client receives ACK
  └─> Detects request_update flag
  └─> Clears _cache on every registered InfoPlugin
  └─> Calls collect() on each InfoPlugin
  └─> Sends fresh PLG messages to server
```

If you write an `InfoPlugin` with side effects in `_collect_info()` (opening connections, writing files, etc.), be aware that it may be called more than once per client session when this mechanism triggers.
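The refresh flow above can be sketched roughly as follows. This is an illustration under stated assumptions, not the framework's actual code: `CachedInfoPlugin`, `handle_ack`, and the `send_plg` callback are hypothetical names, and the ACK is modeled as a plain dictionary. Only the `request_update` flag, the `_cache` attribute, and the `_collect_info()` hook are taken from the description in this guide.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional, Tuple


class CachedInfoPlugin:
    """Hypothetical stand-in for an InfoPlugin that caches its result in `_cache`."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._cache: Optional[Dict[str, Any]] = None
        self.collect_calls = 0

    async def _collect_info(self) -> Dict[str, Any]:
        # Keep this idempotent: a server-requested refresh runs it again.
        self.collect_calls += 1
        return {"collected": self.collect_calls}

    async def collect(self) -> Dict[str, Any]:
        # Serve the cached result unless the cache has been cleared.
        if self._cache is None:
            self._cache = await self._collect_info()
        return self._cache


async def handle_ack(
    ack: Dict[str, Any],
    plugins: List[CachedInfoPlugin],
    send_plg: Callable[[str, Dict[str, Any]], None],
) -> None:
    """Assumed client-side handling: refresh every InfoPlugin when asked to."""
    if ack.get("request_update") != 1:
        return
    for plugin in plugins:
        plugin._cache = None  # Invalidate the cached result
        send_plg(plugin.name, await plugin.collect())  # Re-collect and resend


def demo() -> List[Tuple[str, Dict[str, Any]]]:
    """Simulate startup collection, a normal ACK, and an ACK with request_update=1."""
    sent: List[Tuple[str, Dict[str, Any]]] = []
    plugin = CachedInfoPlugin("os_info")

    async def main() -> None:
        await plugin.collect()  # Startup collection
        await plugin.collect()  # Served from cache, no second collection
        await handle_ack({"request_update": 0}, [plugin], lambda n, d: sent.append((n, d)))
        await handle_ack({"request_update": 1}, [plugin], lambda n, d: sent.append((n, d)))

    asyncio.run(main())
    return sent
```

In this sketch `demo()` returns a single message carrying the second collection, which shows why `_collect_info()` should tolerate being run again: the ordinary ACK triggers nothing, while the flagged ACK clears the cache and forces a fresh run.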
## Configuration

### Plugin-Specific Configuration

Plugins receive configuration through the `config` parameter in `__init__`:

```python
def __init__(self, config: Optional[Dict[str, Any]] = None):
    super().__init__(config)

    # Access configuration with defaults
    self.interval = self.config.get('interval', 60)
    self.threshold = self.config.get('threshold', 80)
    self.enabled_features = self.config.get('features', ['feature1', 'feature2'])
```

### Client Configuration File

Users configure plugins in the client configuration YAML:

```yaml
plugins:
  my_awesome_plugin:
    enabled: true
    interval: 120
    option1: custom_value
    option2: false
```

## Best Practices

### 1. Error Handling

Always handle errors gracefully:

```python
async def collect(self) -> Dict[str, Any]:
    try:
        return await self._collect_metrics()
    except Exception as e:
        logger.error(f"Error collecting metrics: {e}")
        return {"error": str(e)}
```

### 2. Logging

Use appropriate log levels:

```python
logger.debug("Detailed information for debugging")
logger.info("Normal operation messages")
logger.warning("Warning messages for unusual but handled situations")
logger.error("Error messages for failures")
```

### 3. Dependencies

Check for optional dependencies:

```python
try:
    import some_optional_library
except ImportError:
    some_optional_library = None

# Later in __init__:
if some_optional_library is None:
    raise ImportError("some_optional_library is required")
```

### 4. Performance

- Keep collection methods fast (< 1 second)
- Use async/await for I/O operations
- Cache expensive computations
- Don't block the event loop

### 5. Data Structure

Return clean, structured data:

```python
{
    'metric_name': value,
    'nested_data': {
        'sub_metric': value
    },
    'list_data': [item1, item2],
    'timestamp': time.time()  # Optional timestamp
}
```

### 6. Documentation

Document your plugin thoroughly:

- Class docstring with description and configuration
- Method docstrings explaining purpose and return values
- Inline comments for complex logic

## Examples

### Example 1: Simple InfoPlugin

```python
import platform
from typing import Any, Dict

from hbd.plugin import InfoPlugin


class SimpleInfoPlugin(InfoPlugin):
    """Collect basic system information."""

    name = "simple_info"
    interval = 0  # InfoPlugin

    async def initialize(self):
        return True

    async def collect(self) -> Dict[str, Any]:
        return {
            'hostname': platform.node(),
            'system': platform.system(),
            'python_version': platform.python_version()
        }

    async def cleanup(self):
        pass


plugin = SimpleInfoPlugin
```

### Example 2: MonitorPlugin with State

```python
import time
from typing import Any, Dict

from hbd.plugin import MonitorPlugin


class CounterPlugin(MonitorPlugin):
    """Track a counter over time."""

    name = "counter"
    interval = 30

    def __init__(self, config=None):
        super().__init__(config)
        self._counter = 0
        self._start_time = time.time()

    async def initialize(self):
        return True

    async def collect(self) -> Dict[str, Any]:
        self._counter += 1
        uptime = time.time() - self._start_time
        return {
            'count': self._counter,
            'uptime': uptime,
            # Guard against a zero uptime on the very first collection
            'rate': self._counter / uptime if uptime > 0 else 0.0
        }

    async def cleanup(self):
        pass


plugin = CounterPlugin
```

### Example 3: Plugin with External Command

```python
import asyncio
from typing import Any, Dict

from hbd.plugin import MonitorPlugin


class CommandPlugin(MonitorPlugin):
    """Execute external command and capture output."""

    name = "command_executor"
    interval = 60

    def __init__(self, config=None):
        super().__init__(config)
        self.command = self.config.get('command', 'echo "no command"')

    async def initialize(self):
        return True

    async def collect(self) -> Dict[str, Any]:
        try:
            process = await asyncio.create_subprocess_shell(
                self.command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=30
            )
            return {
                'exit_code': process.returncode,
                'stdout': stdout.decode('utf-8'),
                'stderr':
                    stderr.decode('utf-8')
            }
        except asyncio.TimeoutError:
            # Stop the command if it runs past the 30-second limit
            process.kill()
            await process.wait()
            return {'error': 'command timed out'}
        except Exception as e:
            return {'error': str(e)}

    async def cleanup(self):
        pass


plugin = CommandPlugin
```

## Testing

### Unit Testing

Create unit tests for your plugins:

```python
import asyncio
import unittest

from hbd.plugins.my_awesome_plugin import MyAwesomePlugin


class TestMyPlugin(unittest.TestCase):
    def setUp(self):
        self.plugin = MyAwesomePlugin({'option1': 'test'})

    def test_initialization(self):
        result = asyncio.run(self.plugin.initialize())
        self.assertTrue(result)

    def test_collection(self):
        asyncio.run(self.plugin.initialize())
        data = asyncio.run(self.plugin.collect())
        self.assertIsInstance(data, dict)
        self.assertIn('metric1', data)
        self.assertGreater(data['metric1'], 0)

    def tearDown(self):
        asyncio.run(self.plugin.cleanup())


if __name__ == '__main__':
    unittest.main()
```

### Integration Testing

Test your plugin with the actual client:

```bash
# Create test configuration
cat > test_config.yaml <
```

1. Check that `interval` is set correctly (> 0 for MonitorPlugin)
2. Verify `collect()` returns a dictionary
3. Check for exceptions in the `collect()` method
4. Enable DEBUG logging to see detailed errors

### Data isn't appearing on server

1. Verify the client is connected to the server
2. Check server logs for PLG message handling
3. Verify returned data is JSON-serializable
4. Check for large data sizes (may exceed UDP packet size)

## Further Reading

- [Plugin Framework Source](../hbd/plugin.py) - Core plugin implementation
- [Built-in Plugins](../hbd/plugins/) - Examples of working plugins
- [Nagios Integration](NAGIOS_INTEGRATION.md) - Running external plugins
- [Configuration Guide](../hbd/config_example.yaml) - Full configuration reference