6282077fe0
- Use `is not None` for pathconf values so 0 is not silently dropped - Broaden connectivity prefix check to catch bare "connectivity" key Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
6.1 KiB
Python
169 lines
6.1 KiB
Python
"""
|
|
Filesystem information plugin for Heartbeat.
|
|
|
|
Collects static filesystem and partition information using psutil.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
|
|
try:
|
|
import psutil
|
|
except ImportError:
|
|
psutil = None
|
|
|
|
from hbd.client.plugin import InfoPlugin
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FilesystemInfoPlugin(InfoPlugin):
    """
    Collect filesystem and partition information.

    This is an InfoPlugin that collects static information once during startup.

    By default, only reports physical mounted filesystems (e.g., ext4, xfs, btrfs).
    Set include_pseudo=True to also include pseudo filesystems (proc, sysfs, tmpfs, etc.).

    Collects:
        - List of mounted filesystems
        - Partition details (device, mount point, filesystem type, options)
        - Filesystem capabilities and features

    Configuration:
        include_pseudo: Include pseudo/virtual filesystems (default: False)
        exclude_types: Filesystem type, or list of types, to exclude (default: [])
    """

    name = "filesystem_info"
    interval = 0  # InfoPlugin - collect once

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the filesystem info plugin.

        Args:
            config: Optional configuration dict with keys:
                - include_pseudo: Include pseudo/virtual filesystems (default: False)
                - exclude_types: Filesystem type(s) to exclude (default: []).
                  A single string is treated as one type; None means no exclusions.

        Raises:
            ImportError: If psutil could not be imported.
        """
        super().__init__(config)
        self.include_pseudo = self.config.get('include_pseudo', False)

        # By default, no exclusions since all=False filters most pseudo filesystems.
        # Users can add specific types to exclude if needed.
        excluded = self.config.get('exclude_types', [])
        if excluded is None:
            # An explicit null in the config means "nothing excluded".
            excluded = []
        elif isinstance(excluded, str):
            # set("tmpfs") would yield a set of characters and never match a
            # real fstype, so wrap a lone string in a list first.
            excluded = [excluded]
        self.exclude_types = set(excluded)

        if psutil is None:
            raise ImportError("psutil library is required for filesystem_info plugin")

    async def initialize(self) -> bool:
        """Initialize the plugin (check psutil availability).

        Returns:
            True when psutil is importable, False otherwise.
        """
        if psutil is None:
            logger.error("psutil not available - filesystem_info cannot run")
            return False

        logger.info(f"Filesystem info initialized (pseudo: {self.include_pseudo})")
        return True

    async def collect(self) -> Dict[str, Any]:
        """
        Collect filesystem information.

        Returns only physical mounted filesystems by default.

        Returns:
            Dictionary with filesystem data:
                - filesystems: List of filesystem dictionaries:
                    - device: Device name (e.g., /dev/sda1)
                    - mountpoint: Mount point path
                    - fstype: Filesystem type (e.g., ext4, xfs, btrfs)
                    - opts: Mount options (comma-separated string)
                    - maxfile: Maximum filename length (when pathconf reports it)
                    - maxpath: Maximum path length (when pathconf reports it)
                - filesystem_types: Sorted list of unique filesystem types found
                - mount_count: Number of filesystems reported
                - boot_time: Value of psutil.boot_time() (when available)

            An empty dict is returned when psutil is unavailable, and
            ``{"error": <message>}`` when collection raises.
        """
        if psutil is None:
            logger.error("psutil not available")
            return {}

        try:
            data = await self._collect_info()
            logger.info(f"Collected filesystem info: {len(data.get('filesystems', []))} filesystems")
            return data
        except Exception as e:
            # Boundary handler: report the failure to the caller instead of raising.
            logger.error(f"Error collecting filesystem info: {e}")
            return {"error": str(e)}

    async def _collect_info(self) -> Dict[str, Any]:
        """Collect filesystem information from psutil.

        Returns:
            Dict with the keys 'filesystems', 'filesystem_types', 'mount_count'
            and, when psutil can provide it, 'boot_time'.
        """
        # Function-scope import, resolved once per call rather than once per
        # partition; the pathconf capability probe is loop-invariant as well.
        import os

        has_pathconf = hasattr(os, 'pathconf')

        # Output key -> pathconf name for the per-mount limits we report.
        pathconf_fields = (
            ('maxfile', 'PC_NAME_MAX'),  # Maximum filename length
            ('maxpath', 'PC_PATH_MAX'),  # Maximum path length
        )

        # Get mounted disk partitions.
        # all=False returns only physical devices (real mounted filesystems);
        # all=True would include pseudo filesystems (proc, sysfs, etc.).
        partitions = psutil.disk_partitions(all=self.include_pseudo)

        filesystems = []
        for partition in partitions:
            # Additional filtering if exclude_types is specified
            if partition.fstype in self.exclude_types:
                continue

            fs_info = {
                'device': partition.device,
                'mountpoint': partition.mountpoint,
                'fstype': partition.fstype,
                'opts': partition.opts,
            }

            # Try to get filesystem capabilities (best effort: a failing
            # lookup just leaves the corresponding key out).
            if has_pathconf:
                for key, conf_name in pathconf_fields:
                    try:
                        limit = os.pathconf(partition.mountpoint, conf_name)
                    except (OSError, ValueError):
                        # This limit is unavailable here; try the next one.
                        continue
                    except Exception as e:
                        # Anything unexpected: give up on pathconf for this mount.
                        logger.debug(f"Could not get pathconf for {partition.mountpoint}: {e}")
                        break
                    # Compare against None rather than truthiness so that a
                    # reported value of 0 is not silently dropped.
                    if limit is not None:
                        fs_info[key] = limit

            filesystems.append(fs_info)

        info: Dict[str, Any] = {
            'filesystems': filesystems,
            'filesystem_types': sorted({fs['fstype'] for fs in filesystems}),
            'mount_count': len(filesystems),
        }

        # Add some additional filesystem statistics
        try:
            # Get boot time (useful for determining filesystem mount times)
            info['boot_time'] = psutil.boot_time()
        except Exception as e:
            logger.debug(f"Could not get boot time: {e}")

        return info

    async def cleanup(self) -> None:
        """Cleanup (nothing to do for this plugin)."""
        logger.info("Filesystem info cleanup")
|
|
|
|
|
|
# Plugin class (not an instance) exported for automatic discovery
|
|
plugin = FilesystemInfoPlugin
|