"""
Journal logging for heartbeat messages.

Provides size-based rotating log files for received heartbeat-protocol
messages (``HTB`` keep-alive messages are skipped as too verbose) and for
threshold state-change events. Each record is written as one JSON object per
line (JSON Lines) for easy parsing and analysis.
"""

import asyncio
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class MessageJournal:
    """
    Journal logger for heartbeat messages with size-based rotation.

    Features:
    - Logs received messages (except ``HTB``) and threshold events as JSON Lines
    - Rotates when the next record would push a non-empty file past the size limit
    - Keeps a configurable number of timestamped backups
    - Writes are serialised with an ``asyncio.Lock``: safe across coroutines on a
      single event loop, but NOT thread-safe
    - File I/O is synchronous and runs on the calling (event-loop) thread
    - Configurable log directory and file naming

    Configuration keys (all optional):
        journal_dir:         Directory for journal files (default: /var/log/heartbeat)
        journal_file:        Base filename (default: messages.journal)
        journal_max_size:    Size in bytes that triggers rotation (default: 100MB)
        journal_max_backups: Number of rotated backups to keep (default: 10)
        journal_enabled:     Enable/disable journaling (default: True)
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Store configuration; no files are touched until ``initialize()``.

        Args:
            config: Configuration dictionary with journal settings (see class docstring)
        """
        self.config = config or {}

        # Configuration options
        self.journal_dir = Path(self.config.get('journal_dir', '/var/log/heartbeat'))
        self.journal_file = self.config.get('journal_file', 'messages.journal')
        self.max_size = self.config.get('journal_max_size', 100 * 1024 * 1024)  # 100MB default
        self.max_backups = self.config.get('journal_max_backups', 10)
        self.enabled = self.config.get('journal_enabled', True)

        # Runtime state
        self._file_handle = None  # text-mode handle, opened in append mode
        self._current_size = 0  # tracked byte size of the current journal file
        self._lock = asyncio.Lock()
        self._initialized = False

        # Full path to current journal file
        self.journal_path = self.journal_dir / self.journal_file

    async def initialize(self) -> bool:
        """
        Initialize the journal.

        Creates the journal directory if needed and opens the journal file in
        append mode. Calling it again closes the previously opened handle first.

        Returns:
            True if initialization succeeded (or journaling is disabled),
            False otherwise. On failure journaling is disabled.
        """
        if not self.enabled:
            logger.info("Message journal disabled in configuration")
            return True

        try:
            self.journal_dir.mkdir(parents=True, exist_ok=True)

            # Re-initialisation must not leak the previous handle.
            old_handle, self._file_handle = self._file_handle, None
            if old_handle is not None:
                old_handle.close()

            self._file_handle = open(self.journal_path, 'a', encoding='utf-8')

            # Continue size accounting from whatever is already on disk.
            try:
                self._current_size = os.path.getsize(self.journal_path)
            except OSError:
                self._current_size = 0

            self._initialized = True
            logger.info(f"Message journal initialized: {self.journal_path} "
                        f"(current size: {self._current_size:,} bytes, "
                        f"max: {self.max_size:,} bytes)")
            return True

        except Exception as e:
            logger.error(f"Failed to initialize message journal: {e}")
            self.enabled = False
            return False

    @staticmethod
    def _split_addr(addr: Any) -> tuple:
        """Return ``(ip, port)`` from a source address; missing parts become None."""
        if isinstance(addr, (tuple, list)):
            ip = addr[0] if len(addr) > 0 else None
            port = addr[1] if len(addr) > 1 else None
            return ip, port
        return str(addr), None

    async def log_message(
        self,
        msg: Dict[str, Any],
        addr: tuple,
        timestamp: Optional[float] = None
    ):
        """
        Log a received message to the journal as one compact JSON line.

        Messages whose ``ID`` is ``'HTB'`` are skipped. Errors are logged, not raised.

        Args:
            msg: Parsed message dictionary
            addr: Source address (ip, port) tuple; any other value is stored via str()
            timestamp: Message timestamp (defaults to current time)
        """
        if not self.enabled or not self._initialized:
            return

        # Skip HTB (heartbeat) messages - too verbose
        if msg.get('ID', '') == 'HTB':
            return

        async with self._lock:
            try:
                if timestamp is None:
                    timestamp = time.time()

                source_ip, source_port = self._split_addr(addr)
                entry = {
                    'timestamp': timestamp,
                    'datetime': datetime.fromtimestamp(timestamp).isoformat(),
                    'source_ip': source_ip,
                    'source_port': source_port,
                    'message': msg,
                }

                # Serialize to JSON (one line per entry)
                json_line = json.dumps(entry, separators=(',', ':')) + '\n'

                if await self._append_line(json_line):
                    # Lazy %-formatting: skipped entirely when DEBUG is disabled.
                    logger.debug("Logged message from %s: %s",
                                 source_ip, msg.get('ID', 'UNKNOWN'))

            except Exception as e:
                logger.error(f"Error writing to journal: {e}")

    async def _append_line(self, line: str) -> bool:
        """
        Append one newline-terminated record, rotating first if it would overflow.

        Must be called with ``self._lock`` held. Rotation is skipped while the
        current file is empty, so a single record larger than ``max_size`` goes
        into the fresh file instead of producing an empty backup.

        Returns:
            True if the line was written, False if no file handle is available.
        """
        if self._file_handle is None:
            return False

        size = len(line.encode('utf-8'))
        if self._current_size > 0 and self._current_size + size > self.max_size:
            await self._rotate()
            if self._file_handle is None:  # rotation failed and reopening failed too
                return False

        self._file_handle.write(line)
        self._file_handle.flush()  # Ensure data is written
        self._current_size += size
        return True

    def _next_backup_path(self) -> Path:
        """
        Return an unused timestamped backup path for the current journal.

        The timestamp has one-second resolution, so a zero-padded counter is
        appended when that name is already taken. This stops a second rotation
        within the same second from overwriting the first backup, and keeps
        names in lexicographic (chronological) order for up to 999 collisions.
        """
        timestamp_str = datetime.now().strftime('%Y%m%d-%H%M%S')
        base_name = f"{self.journal_file}.{timestamp_str}"
        candidate = self.journal_dir / base_name
        counter = 0
        while candidate.exists():
            counter += 1
            candidate = self.journal_dir / f"{base_name}.{counter:03d}"
        return candidate

    async def _rotate(self):
        """
        Rotate the journal file.

        Renames the current file to a timestamped backup, opens a new file and
        removes old backups beyond ``max_backups``. If anything fails, the
        journal file is reopened (without resetting the size counter, so the
        next write retries rotation); if that also fails, journaling is disabled.
        """
        try:
            # Detach before closing so a failing close() still leaves the
            # handle unset and the error path below reopens the journal.
            old_handle, self._file_handle = self._file_handle, None
            if old_handle is not None:
                old_handle.close()

            if self.journal_path.exists():
                backup_path = self._next_backup_path()
                self.journal_path.rename(backup_path)
                logger.info(f"Rotated journal: {backup_path} "
                            f"(size: {self._current_size:,} bytes)")

            self._file_handle = open(self.journal_path, 'a', encoding='utf-8')
            self._current_size = 0

            await self._cleanup_old_backups()

        except Exception as e:
            logger.error(f"Error rotating journal: {e}")
            if self._file_handle is None:
                try:
                    self._file_handle = open(self.journal_path, 'a', encoding='utf-8')
                except Exception as e2:
                    logger.error(f"Failed to reopen journal after rotation error: {e2}")
                    self.enabled = False

    async def _cleanup_old_backups(self):
        """
        Remove backup files beyond the ``max_backups`` limit.

        Backups are ordered by filename; the embedded ``%Y%m%d-%H%M%S`` timestamp
        (plus optional collision counter) makes that order chronological, so the
        lexicographically smallest names are removed first. Errors are logged.
        """
        try:
            backup_files = sorted(self.journal_dir.glob(f"{self.journal_file}.*"))

            excess = len(backup_files) - self.max_backups
            if excess > 0:
                for backup_file in backup_files[:excess]:
                    try:
                        backup_file.unlink()
                        logger.info(f"Removed old backup: {backup_file.name}")
                    except Exception as e:
                        logger.warning(f"Failed to remove old backup {backup_file}: {e}")

        except Exception as e:
            logger.error(f"Error cleaning up old backups: {e}")

    async def log_threshold_event(
        self,
        host_name: str,
        metric_path: str,
        old_level: str,
        new_level: str,
        value: Any,
        timestamp: Optional[float] = None
    ):
        """
        Log a threshold state change event as one JSON line.

        Uses the same rotation rule as ``log_message``. Errors are logged, not raised.

        Args:
            host_name: Name of the host
            metric_path: Full metric path (e.g., "cpu_monitor.cpu_percent")
            old_level: Previous alert level
            new_level: New alert level
            value: Current metric value (must be JSON-serialisable)
            timestamp: Event timestamp (default: current time)
        """
        if not self.enabled or not self._initialized:
            return

        try:
            if timestamp is None:
                timestamp = time.time()

            event = {
                'timestamp': timestamp,
                'iso_time': datetime.fromtimestamp(timestamp).isoformat(),
                'event_type': 'threshold',
                'host': host_name,
                'metric': metric_path,
                'old_level': old_level,
                'new_level': new_level,
                'value': value,
            }
            line = json.dumps(event) + '\n'

            async with self._lock:
                await self._append_line(line)

        except Exception as e:
            logger.error(f"Error logging threshold event: {e}")

    async def close(self):
        """
        Close the journal and release resources.

        Should be called during shutdown. After this, logging calls are no-ops
        until ``initialize()`` is called again.
        """
        async with self._lock:
            handle, self._file_handle = self._file_handle, None
            if handle is not None:
                try:
                    handle.close()
                    logger.info("Message journal closed")
                except Exception as e:
                    logger.error(f"Error closing journal: {e}")
            self._initialized = False

    def get_stats(self) -> Dict[str, Any]:
        """
        Get journal statistics.

        Returns:
            Dictionary with journal stats; ``rotation_threshold`` is the current
            size as a percentage of ``max_size`` ("n/a" when ``max_size`` <= 0).
        """
        if self.max_size > 0:
            rotation_threshold = f"{(self._current_size / self.max_size * 100):.1f}%"
        else:
            rotation_threshold = "n/a"

        return {
            'enabled': self.enabled,
            'initialized': self._initialized,
            'current_file': str(self.journal_path),
            'current_size': self._current_size,
            'max_size': self.max_size,
            'max_backups': self.max_backups,
            'rotation_threshold': rotation_threshold,
        }


# Global journal instance
_journal_instance: Optional[MessageJournal] = None


def get_journal(config: Optional[Dict[str, Any]] = None) -> MessageJournal:
    """
    Get or create the global journal instance.

    Args:
        config: Configuration dictionary (only used on the first call)

    Returns:
        MessageJournal instance (not initialized; call ``initialize()`` on it)
    """
    global _journal_instance
    if _journal_instance is None:
        _journal_instance = MessageJournal(config)
    return _journal_instance


async def log_message(msg: Dict[str, Any], addr: tuple, timestamp: Optional[float] = None):
    """
    Convenience function to log a message using the global journal.

    Args:
        msg: Parsed message dictionary
        addr: Source address (ip, port) tuple
        timestamp: Message timestamp (defaults to current time)
    """
    journal = get_journal()
    await journal.log_message(msg, addr, timestamp)