""" Disk monitoring plugin for Heartbeat. Collects disk usage and I/O statistics using psutil. """ import logging from typing import Dict, Any, Optional, List try: import psutil except ImportError: psutil = None from hbd.client.plugin import MonitorPlugin logger = logging.getLogger(__name__) class DiskMonitorPlugin(MonitorPlugin): """ Monitor disk usage and I/O statistics. Collects: - Disk partition information - Disk usage per partition (total, used, free, percent) - Disk I/O counters (read/write bytes, read/write count) - Disk I/O time statistics Configuration: interval: Collection interval in seconds (default: 300) partitions: List of mount points to monitor (default: all) include_io: Include disk I/O statistics (default: True) exclude_types: List of filesystem types to exclude (default: tmpfs, devtmpfs, squashfs) """ name = "disk_monitor" interval = 300 # Collect every 5 minutes by default def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the disk monitor plugin. Args: config: Optional configuration dict with keys: - interval: Collection interval in seconds (default: 300) - partitions: List of specific mount points to monitor - include_io: Include I/O statistics (default: True) - exclude_types: List of filesystem types to exclude """ super().__init__(config) self.partitions = self.config.get('partitions', None) # None = all partitions self.include_io = self.config.get('include_io', True) self.exclude_types = set(self.config.get('exclude_types', ['tmpfs', 'devtmpfs', 'squashfs'])) self.interval = self.config.get('interval', 300) if psutil is None: raise ImportError("psutil library is required for disk_monitor plugin") # Store previous I/O counters for delta calculation self._prev_io = {} async def initialize(self): """Initialize the plugin (check psutil availability).""" if psutil is None: logger.error("psutil not available - disk_monitor cannot run") return False logger.info(f"Disk monitor initialized (interval: {self.interval}s, io: {self.include_io})") # Initialize I/O counters if available if self.include_io: try: self._prev_io = psutil.disk_io_counters(perdisk=True) except Exception as e: logger.warning(f"Could not initialize disk I/O counters: {e}") return True async def collect(self) -> Dict[str, Any]: """ Collect current disk statistics. Returns: Dictionary with disk metrics organized by partition: - partitions: Dict of partition data, keyed by mount point - device: Device name (e.g., /dev/sda1) - fstype: Filesystem type (e.g., ext4) - total: Total space in bytes - used: Used space in bytes - free: Free space in bytes - percent: Usage percentage - io_counters: Dict of I/O statistics, keyed by disk name (if include_io) - read_count: Number of reads - write_count: Number of writes - read_bytes: Bytes read - write_bytes: Bytes written - read_time: Time spent reading in ms - write_time: Time spent writing in ms - read_bytes_delta: Bytes read since last collection - write_bytes_delta: Bytes written since last collection """ if psutil is None: logger.error("psutil not available") return {} try: data = await self._collect_metrics() logger.debug(f"Collected disk metrics: {len(data.get('partitions', {}))} partitions") return data except Exception as e: logger.error(f"Error collecting disk metrics: {e}") return {"error": str(e)} async def _collect_metrics(self) -> Dict[str, Any]: """Collect disk metrics from psutil.""" metrics = {} # Collect partition usage partitions_data = {} partitions = psutil.disk_partitions(all=False) for partition in partitions: # Skip unwanted filesystem types if partition.fstype in self.exclude_types: continue # Skip if we're only monitoring specific partitions if self.partitions and partition.mountpoint not in self.partitions: continue try: usage = psutil.disk_usage(partition.mountpoint) partitions_data[partition.mountpoint] = { 'device': partition.device, 'fstype': partition.fstype, 'total': usage.total, 'used': usage.used, 'free': usage.free, 'percent': usage.percent } except PermissionError: logger.debug(f"Permission denied accessing {partition.mountpoint}") continue except Exception as e: logger.warning(f"Error reading {partition.mountpoint}: {e}") continue metrics['partitions'] = partitions_data # Collect I/O statistics if self.include_io: try: io_counters = psutil.disk_io_counters(perdisk=True) io_data = {} for disk_name, counters in io_counters.items(): disk_stats = { 'read_count': counters.read_count, 'write_count': counters.write_count, 'read_bytes': counters.read_bytes, 'write_bytes': counters.write_bytes, } # Add time statistics if available if hasattr(counters, 'read_time'): disk_stats['read_time'] = counters.read_time if hasattr(counters, 'write_time'): disk_stats['write_time'] = counters.write_time if hasattr(counters, 'busy_time'): disk_stats['busy_time'] = counters.busy_time # Calculate deltas from previous collection if disk_name in self._prev_io: prev = self._prev_io[disk_name] disk_stats['read_bytes_delta'] = counters.read_bytes - prev.read_bytes disk_stats['write_bytes_delta'] = counters.write_bytes - prev.write_bytes disk_stats['read_count_delta'] = counters.read_count - prev.read_count disk_stats['write_count_delta'] = counters.write_count - prev.write_count io_data[disk_name] = disk_stats metrics['io_counters'] = io_data # Store current counters for next delta calculation self._prev_io = io_counters except Exception as e: logger.warning(f"Could not collect disk I/O statistics: {e}") return metrics async def cleanup(self): """Cleanup (nothing to do for this plugin).""" logger.info("Disk monitor cleanup") # Plugin instance for automatic discovery plugin = DiskMonitorPlugin