Files
heartbeat/hbd/client/plugins/network_monitor.py
T
Andreas Wrede 0543266c92 Major refactoring of the codebase, including restructuring of files and directories, renaming of modules and classes, and improvements to the overall organization and readability of the code. This refactoring aims to enhance maintainability, scalability, and clarity of the codebase while preserving existing functionality. The changes include:
- Restructuring of the project directory into client and server components
- Renaming of modules and classes to better reflect their purpose and functionality
- Moving common utilities and configurations to a shared location
- Updating import statements to reflect the new structure
- Adding new documentation files for better clarity on various aspects of the project
- Removing deprecated or unused code to streamline the codebase
- Ensuring that all existing functionality is preserved and that the codebase remains functional after the refactoring.
2026-03-29 11:13:40 -04:00

241 lines
9.4 KiB
Python

"""
Network monitoring plugin for Heartbeat.
Collects network interface statistics and connection information using psutil.
"""
import logging
from typing import Dict, Any, Optional, List
try:
import psutil
except ImportError:
psutil = None
from hbd.client.plugin import MonitorPlugin
logger = logging.getLogger(__name__)
class NetworkMonitorPlugin(MonitorPlugin):
    """
    Monitor network interface statistics and connections.

    Collects:
    - Network interface I/O counters (bytes sent/received, packets, errors, drops)
    - Per-interface deltas since the previous collection
    - Network connection counts by state (optional)
    - Interface addresses (optional)
    - Interface status (up/down, duplex, speed, MTU)

    Configuration:
        interval: Collection interval in seconds (default: 300)
        interfaces: List of interfaces to monitor (default: all)
        include_connections: Include connection statistics (default: True)
        include_addresses: Include interface addresses (default: False)
    """

    name = "network_monitor"
    interval = 300  # Collect every 5 minutes by default

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the network monitor plugin.

        The settings are read from ``self.config`` after the base-class
        constructor has run (assumes ``MonitorPlugin.__init__`` exposes the
        configuration as a dict-like ``self.config`` - confirm in the base
        class).

        Args:
            config: Optional configuration dict with keys:
                - interval: Collection interval in seconds (default: 300)
                - interfaces: Specific interfaces to monitor. A list of names;
                  a single string is also accepted and is split on commas
                  and whitespace.
                - include_connections: Include connection stats (default: True)
                - include_addresses: Include interface addresses (default: False)

        Raises:
            ImportError: If the psutil library could not be imported.
        """
        super().__init__(config)

        interfaces = self.config.get('interfaces', None)  # None = all interfaces
        if isinstance(interfaces, str):
            # A bare string would turn the later ``name in self.interfaces``
            # test into a substring match (e.g. "lo" would match "wlo1"),
            # so normalise it to a list of interface names first.
            interfaces = interfaces.replace(',', ' ').split()
        self.interfaces: Optional[List[str]] = interfaces

        self.include_connections = self.config.get('include_connections', True)
        self.include_addresses = self.config.get('include_addresses', False)
        self.interval = self.config.get('interval', 300)

        if psutil is None:
            raise ImportError("psutil library is required for network_monitor plugin")

        # Previous per-interface I/O counters, used for delta calculation
        self._prev_io: Dict[str, Any] = {}

    async def initialize(self) -> bool:
        """
        Prepare the plugin for collection.

        Takes a first snapshot of the per-interface I/O counters so that the
        first call to ``collect`` can already report deltas. A failure to take
        that snapshot is logged and does not prevent initialization.

        Returns:
            False if psutil is unavailable, True otherwise.
        """
        if psutil is None:
            logger.error("psutil not available - network_monitor cannot run")
            return False

        logger.info(
            "Network monitor initialized (interval: %ss, connections: %s)",
            self.interval,
            self.include_connections,
        )

        # Initialize I/O counters
        try:
            self._prev_io = psutil.net_io_counters(pernic=True)
        except Exception as e:
            logger.warning("Could not initialize network I/O counters: %s", e)
        return True

    async def collect(self) -> Dict[str, Any]:
        """
        Collect current network statistics.

        Returns:
            Dictionary with network metrics. Each section is present only if
            it was collected successfully:
            - interfaces: Dict of interface statistics, keyed by interface name
                - bytes_sent / bytes_recv: Total bytes sent / received
                - packets_sent / packets_recv: Total packets sent / received
                - errin / errout: Total incoming / outgoing errors
                - dropin / dropout: Total incoming / outgoing packets dropped
                - bytes_sent_delta, bytes_recv_delta, packets_sent_delta,
                  packets_recv_delta: Change since the last collection (only
                  when a previous sample exists for the interface)
            - connections: Connection counts keyed by state, e.g. ESTABLISHED,
              LISTEN, TIME_WAIT (if include_connections)
            - addresses: Per-interface list of address details
              (if include_addresses)
            - interface_stats: Per-interface isup / duplex / speed / mtu

            An empty dict is returned when psutil is unavailable, and
            ``{"error": <message>}`` when collection raises unexpectedly.
        """
        if psutil is None:
            logger.error("psutil not available")
            return {}

        try:
            data = await self._collect_metrics()
        except Exception as e:
            # logger.exception keeps the traceback, which str(e) alone loses.
            logger.exception("Error collecting network metrics: %s", e)
            return {"error": str(e)}

        logger.debug(
            "Collected network metrics: %s interfaces",
            len(data.get('interfaces', {})),
        )
        return data

    async def _collect_metrics(self) -> Dict[str, Any]:
        """
        Collect network metrics from psutil.

        Each section is gathered independently and on a best-effort basis:
        a section that fails is logged by its helper and simply left out of
        the result, without affecting the other sections.
        """
        metrics: Dict[str, Any] = {}

        interfaces_data = self._collect_io_counters()
        if interfaces_data is not None:
            metrics['interfaces'] = interfaces_data

        if self.include_connections:
            conn_stats = self._collect_connections()
            if conn_stats is not None:
                metrics['connections'] = conn_stats

        if self.include_addresses:
            addr_data = self._collect_addresses()
            if addr_data is not None:
                metrics['addresses'] = addr_data

        stats_data = self._collect_interface_stats()
        if stats_data is not None:
            metrics['interface_stats'] = stats_data

        return metrics

    def _is_monitored(self, iface_name: str) -> bool:
        """Return True if the interface passes the configured filter (no filter = all)."""
        return not self.interfaces or iface_name in self.interfaces

    def _collect_io_counters(self) -> Optional[Dict[str, Dict[str, int]]]:
        """Return per-interface I/O totals and deltas, or None if collection failed."""
        try:
            io_counters = psutil.net_io_counters(pernic=True)
            interfaces_data = {}
            for iface_name, counters in io_counters.items():
                if not self._is_monitored(iface_name):
                    continue

                iface_stats = {
                    'bytes_sent': counters.bytes_sent,
                    'bytes_recv': counters.bytes_recv,
                    'packets_sent': counters.packets_sent,
                    'packets_recv': counters.packets_recv,
                    'errin': counters.errin,
                    'errout': counters.errout,
                    'dropin': counters.dropin,
                    'dropout': counters.dropout,
                }

                # Deltas are only reported when a previous sample exists
                prev = self._prev_io.get(iface_name)
                if prev is not None:
                    iface_stats['bytes_sent_delta'] = counters.bytes_sent - prev.bytes_sent
                    iface_stats['bytes_recv_delta'] = counters.bytes_recv - prev.bytes_recv
                    iface_stats['packets_sent_delta'] = counters.packets_sent - prev.packets_sent
                    iface_stats['packets_recv_delta'] = counters.packets_recv - prev.packets_recv

                interfaces_data[iface_name] = iface_stats

            # Store the full (unfiltered) snapshot for the next delta calculation
            self._prev_io = io_counters
            return interfaces_data
        except Exception as e:
            logger.warning("Could not collect network I/O counters: %s", e)
            return None

    def _collect_connections(self) -> Optional[Dict[str, int]]:
        """Return inet connection counts keyed by state, or None if unavailable."""
        try:
            conn_stats: Dict[str, int] = {}
            for conn in psutil.net_connections(kind='inet'):
                state = conn.status
                conn_stats[state] = conn_stats.get(state, 0) + 1
            return conn_stats
        except (PermissionError, psutil.AccessDenied):
            # Logged at debug level only: lack of privileges is an expected condition
            logger.debug("Permission denied for net_connections (requires root/admin)")
        except Exception as e:
            logger.warning("Could not collect connection statistics: %s", e)
        return None

    def _collect_addresses(self) -> Optional[Dict[str, List[Dict[str, Any]]]]:
        """Return address details per monitored interface, or None if collection failed."""
        try:
            addr_data = {}
            for iface_name, addrs in psutil.net_if_addrs().items():
                if not self._is_monitored(iface_name):
                    continue

                iface_addrs = []
                for addr in addrs:
                    addr_info = {
                        'family': str(addr.family),
                        'address': addr.address,
                    }
                    # netmask / broadcast are included only when set
                    if addr.netmask:
                        addr_info['netmask'] = addr.netmask
                    if addr.broadcast:
                        addr_info['broadcast'] = addr.broadcast
                    iface_addrs.append(addr_info)

                addr_data[iface_name] = iface_addrs
            return addr_data
        except Exception as e:
            logger.warning("Could not collect interface addresses: %s", e)
            return None

    def _collect_interface_stats(self) -> Optional[Dict[str, Dict[str, Any]]]:
        """Return up/down status, duplex, speed and MTU per monitored interface, or None on failure."""
        try:
            stats_data = {}
            for iface_name, stats in psutil.net_if_stats().items():
                if not self._is_monitored(iface_name):
                    continue

                stats_data[iface_name] = {
                    'isup': stats.isup,
                    'duplex': str(stats.duplex) if hasattr(stats, 'duplex') else None,
                    'speed': stats.speed,
                    'mtu': stats.mtu,
                }
            return stats_data
        except Exception as e:
            logger.warning("Could not collect interface stats: %s", e)
            return None

    async def cleanup(self) -> None:
        """Cleanup (nothing to release for this plugin; only logs)."""
        logger.info("Network monitor cleanup")
# Expose the plugin class under the module-level name ``plugin``.
# NOTE(review): this binds the class itself, not an instance; presumably the
# plugin loader looks this name up for automatic discovery - confirm.
plugin = NetworkMonitorPlugin