"""
Network monitoring plugin for Heartbeat.

Collects network interface statistics and connection information using psutil.
"""

import logging
from collections import Counter
from typing import Dict, Any, Optional, List

try:
    import psutil
except ImportError:
    psutil = None

from hbd.client.plugin import MonitorPlugin

logger = logging.getLogger(__name__)


class NetworkMonitorPlugin(MonitorPlugin):
    """
    Monitor network interface statistics and connections.

    Collects:
    - Network interface I/O counters (bytes sent/received, packets, errors, drops)
    - Per-interface statistics
    - Network connection counts by state
    - Interface addresses and configuration

    Configuration:
        interval: Collection interval in seconds (default: 300)
        interfaces: List of interfaces to monitor (default: all)
        include_connections: Include connection statistics (default: True)
        include_addresses: Include interface addresses (default: False)
    """

    name = "network_monitor"
    interval = 300  # Collect every 5 minutes by default

    # psutil I/O counter fields copied verbatim into each interface entry.
    _COUNTER_FIELDS = (
        'bytes_sent',
        'bytes_recv',
        'packets_sent',
        'packets_recv',
        'errin',
        'errout',
        'dropin',
        'dropout',
    )
    # Subset of counter fields for which a "<field>_delta" value is reported.
    _DELTA_FIELDS = (
        'bytes_sent',
        'bytes_recv',
        'packets_sent',
        'packets_recv',
    )

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the network monitor plugin.

        Args:
            config: Optional configuration dict with keys:
                - interval: Collection interval in seconds (default: 300)
                - interfaces: List of specific interfaces to monitor
                  (a single interface name given as a string is also accepted)
                - include_connections: Include connection stats (default: True)
                - include_addresses: Include interface addresses (default: False)

        Raises:
            ImportError: If the psutil library could not be imported.
        """
        super().__init__(config)

        self.interfaces = self.config.get('interfaces', None)  # None = all interfaces
        self.include_connections = self.config.get('include_connections', True)
        self.include_addresses = self.config.get('include_addresses', False)
        self.interval = self.config.get('interval', 300)

        if psutil is None:
            raise ImportError("psutil library is required for network_monitor plugin")

        # Store previous I/O counters for delta calculation
        self._prev_io: Dict[str, Any] = {}

    async def initialize(self):
        """
        Initialize the plugin and take a baseline I/O counter sample.

        Returns:
            False if psutil is unavailable, True otherwise. A failure to read
            the baseline counters is logged as a warning and does not change
            the return value.
        """
        if psutil is None:
            logger.error("psutil not available - network_monitor cannot run")
            return False

        logger.info(f"Network monitor initialized (interval: {self.interval}s, "
                    f"connections: {self.include_connections})")

        # Initialize I/O counters so the first collection can already report deltas
        try:
            self._prev_io = psutil.net_io_counters(pernic=True)
        except Exception as e:
            logger.warning(f"Could not initialize network I/O counters: {e}")

        return True

    async def collect(self) -> Dict[str, Any]:
        """
        Collect current network statistics.

        Returns:
            Dictionary with network metrics. Each top-level key is present only
            if its collection step succeeded:
            - interfaces: Dict keyed by interface name; each value contains
                - bytes_sent, bytes_recv: Byte counters
                - packets_sent, packets_recv: Packet counters
                - errin, errout: Incoming / outgoing error counters
                - dropin, dropout: Incoming / outgoing dropped-packet counters
                - bytes_sent_delta, bytes_recv_delta, packets_sent_delta,
                  packets_recv_delta: Change since the previous sample (only
                  when a previous sample exists for that interface)
            - connections: Connection counts keyed by connection status
              (if include_connections; omitted when access is denied)
            - addresses: Dict keyed by interface name with a list of address
              entries (if include_addresses)
            - interface_stats: Dict keyed by interface name with isup, duplex,
              speed and mtu

            An empty dict is returned when psutil is unavailable, and
            ``{"error": <message>}`` when collection raises unexpectedly.
        """
        if psutil is None:
            logger.error("psutil not available")
            return {}

        try:
            data = await self._collect_metrics()
            logger.debug(f"Collected network metrics: {len(data.get('interfaces', {}))} interfaces")
            return data
        except Exception as e:
            logger.error(f"Error collecting network metrics: {e}")
            return {"error": str(e)}

    def _is_monitored(self, iface_name: str) -> bool:
        """Return True if *iface_name* passes the configured interface filter."""
        wanted = self.interfaces
        if not wanted:
            # None or an empty collection means "monitor everything".
            return True
        if isinstance(wanted, str):
            # A bare string would otherwise be matched by substring
            # (e.g. "eth" in "eth0"); compare the whole name instead.
            return iface_name == wanted
        return iface_name in wanted

    async def _collect_metrics(self) -> Dict[str, Any]:
        """
        Collect network metrics from psutil.

        Each metric group is gathered independently: a failure in one group is
        logged and that group's key is left out, while the others are still
        collected.
        """
        metrics: Dict[str, Any] = {}

        # Collect per-interface I/O counters
        try:
            metrics['interfaces'] = self._collect_io_counters()
        except Exception as e:
            logger.warning(f"Could not collect network I/O counters: {e}")

        # Collect connection statistics
        if self.include_connections:
            try:
                metrics['connections'] = self._collect_connections()
            except (PermissionError, psutil.AccessDenied):
                logger.debug("Permission denied for net_connections (requires root/admin)")
            except Exception as e:
                logger.warning(f"Could not collect connection statistics: {e}")

        # Collect interface addresses
        if self.include_addresses:
            try:
                metrics['addresses'] = self._collect_addresses()
            except Exception as e:
                logger.warning(f"Could not collect interface addresses: {e}")

        # Add interface stats (up/down status, speed, mtu)
        try:
            metrics['interface_stats'] = self._collect_interface_stats()
        except Exception as e:
            logger.warning(f"Could not collect interface stats: {e}")

        return metrics

    def _collect_io_counters(self) -> Dict[str, Dict[str, Any]]:
        """Return per-interface I/O counters (plus deltas) and store the new baseline."""
        io_counters = psutil.net_io_counters(pernic=True)
        interfaces_data: Dict[str, Dict[str, Any]] = {}

        for iface_name, counters in io_counters.items():
            if not self._is_monitored(iface_name):
                continue

            iface_stats = {
                field: getattr(counters, field) for field in self._COUNTER_FIELDS
            }

            # Calculate deltas from previous collection
            if iface_name in self._prev_io:
                prev = self._prev_io[iface_name]
                for field in self._DELTA_FIELDS:
                    iface_stats[f'{field}_delta'] = (
                        getattr(counters, field) - getattr(prev, field)
                    )

            interfaces_data[iface_name] = iface_stats

        # Store current counters for next delta calculation. This is done only
        # after the loop succeeded, so a failed sample keeps the old baseline.
        self._prev_io = io_counters
        return interfaces_data

    def _collect_connections(self) -> Dict[str, int]:
        """Return the number of inet connections per connection status."""
        connections = psutil.net_connections(kind='inet')
        return dict(Counter(conn.status for conn in connections))

    def _collect_addresses(self) -> Dict[str, List[Dict[str, Any]]]:
        """Return the addresses of each monitored interface."""
        addr_data: Dict[str, List[Dict[str, Any]]] = {}

        for iface_name, addrs in psutil.net_if_addrs().items():
            if not self._is_monitored(iface_name):
                continue

            iface_addrs = []
            for addr in addrs:
                addr_info = {
                    'family': str(addr.family),
                    'address': addr.address,
                }
                # netmask / broadcast are included only when psutil reports them
                if addr.netmask:
                    addr_info['netmask'] = addr.netmask
                if addr.broadcast:
                    addr_info['broadcast'] = addr.broadcast
                iface_addrs.append(addr_info)

            addr_data[iface_name] = iface_addrs

        return addr_data

    def _collect_interface_stats(self) -> Dict[str, Dict[str, Any]]:
        """Return isup / duplex / speed / mtu for each monitored interface."""
        stats_data: Dict[str, Dict[str, Any]] = {}

        for iface_name, stats in psutil.net_if_stats().items():
            if not self._is_monitored(iface_name):
                continue

            stats_data[iface_name] = {
                'isup': stats.isup,
                'duplex': str(stats.duplex) if hasattr(stats, 'duplex') else None,
                'speed': stats.speed,
                'mtu': stats.mtu,
            }

        return stats_data

    async def cleanup(self):
        """Cleanup (nothing to do for this plugin)."""
        logger.info("Network monitor cleanup")


# Plugin class exposed under the name `plugin` for automatic discovery
plugin = NetworkMonitorPlugin