0543266c92
- Restructuring of the project directory into client and server components - Renaming of modules and classes to better reflect their purpose and functionality - Moving common utilities and configurations to a shared location - Updating import statements to reflect the new structure - Adding new documentation files for better clarity on various aspects of the project - Removing deprecated or unused code to streamline the codebase - Ensuring that all existing functionality is preserved and that the codebase remains functional after the refactoring.
343 lines
12 KiB
Python
343 lines
12 KiB
Python
"""
Journal logging for heartbeat messages.

Appends received messages (the high-frequency ``HTB`` heartbeats are skipped)
and threshold state-change events to a size-rotated journal file. Each entry
is written as one JSON object per line for easy parsing and analysis.
"""

import asyncio
import json
import logging
import os
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)


class MessageJournal:
    """
    Journal logger for heartbeat messages with size-based rotation.

    Features:
    - Appends received messages (except ``HTB`` heartbeats) and threshold
      state-change events, one JSON object per line
    - Rotates the file before an entry would push a non-empty file past
      ``max_size``
    - Keeps a configurable number of rotated backups
    - Async-safe: writes are serialized with an ``asyncio.Lock``. That lock
      does NOT protect against concurrent use from multiple threads.
    - Configurable log directory and file naming

    Configuration keys (all optional, read from the ``config`` dict):
        journal_dir: Directory for journal files (default: /var/log/heartbeat)
        journal_file: Base filename (default: messages.journal)
        journal_max_size: Maximum file size in bytes before rotation
            (default: 100 MiB)
        journal_max_backups: Number of backup files to keep (default: 10)
        journal_enabled: Enable/disable journaling (default: True)
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the message journal. No file I/O happens until initialize().

        Args:
            config: Configuration dictionary with the ``journal_*`` keys
                listed in the class docstring
        """
        self.config = config or {}

        # Configuration options
        self.journal_dir = Path(self.config.get('journal_dir', '/var/log/heartbeat'))
        self.journal_file = self.config.get('journal_file', 'messages.journal')
        self.max_size = self.config.get('journal_max_size', 100 * 1024 * 1024)  # 100 MiB default
        self.max_backups = self.config.get('journal_max_backups', 10)
        self.enabled = self.config.get('journal_enabled', True)

        # Runtime state
        self._file_handle = None
        self._current_size = 0  # bytes in the live file, tracked without stat() per write
        self._lock = asyncio.Lock()
        self._initialized = False

        # Full path to current journal file
        self.journal_path = self.journal_dir / self.journal_file

    async def initialize(self) -> bool:
        """
        Initialize the journal.

        Creates the journal directory if needed and opens the journal file in
        append mode. Calling it again while already open is a no-op.

        Returns:
            True if initialization succeeded (or journaling is disabled),
            False otherwise. On failure journaling is disabled.
        """
        if not self.enabled:
            logger.info("Message journal disabled in configuration")
            return True

        if self._initialized and self._file_handle is not None:
            # Re-opening would leak the handle that is already open.
            return True

        try:
            # Create journal directory if it doesn't exist
            self.journal_dir.mkdir(parents=True, exist_ok=True)

            # Open journal file in append mode
            self._file_handle = open(self.journal_path, 'a', encoding='utf-8')

            # Continue size accounting from whatever is already on disk
            try:
                self._current_size = os.path.getsize(self.journal_path)
            except OSError:
                self._current_size = 0

            self._initialized = True
            logger.info(f"Message journal initialized: {self.journal_path} "
                        f"(current size: {self._current_size:,} bytes, "
                        f"max: {self.max_size:,} bytes)")
            return True

        except Exception as e:
            logger.error(f"Failed to initialize message journal: {e}")
            # Don't leak a handle that was opened before the failure.
            if self._file_handle is not None:
                try:
                    self._file_handle.close()
                except Exception:
                    pass
                self._file_handle = None
            self._initialized = False
            self.enabled = False
            return False

    @staticmethod
    def _split_addr(addr: Any) -> tuple:
        """
        Split a source address into ``(ip, port)`` for the journal entry.

        Tuples/lists yield their first two items (``None`` where missing);
        any other value is stringified as the ip with no port.
        """
        if isinstance(addr, (tuple, list)):
            ip = addr[0] if len(addr) > 0 else None
            port = addr[1] if len(addr) > 1 else None
            return ip, port
        return str(addr), None

    async def _write_line(self, line: str) -> bool:
        """
        Append one serialized entry, rotating first if it would not fit.

        Must be called with ``self._lock`` held.

        Args:
            line: Complete entry including the trailing newline

        Returns:
            True if the line was written, False if no journal file is open.
        """
        if self._file_handle is None:
            return False

        size = len(line.encode('utf-8'))
        # Only rotate a non-empty file: an entry larger than max_size must not
        # rotate (and create an empty backup of) a fresh journal on every write.
        if self._current_size > 0 and self._current_size + size > self.max_size:
            await self._rotate()
            if self._file_handle is None:
                return False

        self._file_handle.write(line)
        self._file_handle.flush()  # Ensure data is written
        self._current_size += size
        return True

    async def log_message(
        self,
        msg: Dict[str, Any],
        addr: tuple,
        timestamp: Optional[float] = None
    ):
        """
        Log a received message to the journal.

        Messages whose ``ID`` is ``HTB`` are skipped. Errors are logged, never
        raised.

        Args:
            msg: Parsed message dictionary
            addr: Source address (ip, port) tuple
            timestamp: Message timestamp (defaults to current time)
        """
        if not self.enabled or not self._initialized:
            return

        # Skip HTB (heartbeat) messages - too verbose
        if msg.get('ID', '') == 'HTB':
            return

        async with self._lock:
            try:
                if timestamp is None:
                    timestamp = time.time()

                source_ip, source_port = self._split_addr(addr)
                entry = {
                    'timestamp': timestamp,
                    'datetime': datetime.fromtimestamp(timestamp).isoformat(),
                    'source_ip': source_ip,
                    'source_port': source_port,
                    'message': msg
                }

                # Serialize to compact JSON (one line per entry)
                json_line = json.dumps(entry, separators=(',', ':')) + '\n'

                if await self._write_line(json_line):
                    logger.debug(f"Logged message from {source_ip}: {msg.get('ID', 'UNKNOWN')}")

            except Exception as e:
                logger.error(f"Error writing to journal: {e}")

    def _next_backup_path(self) -> Path:
        """
        Return an unused backup path ``<journal_file>.<YYYYmmdd-HHMMSS>``.

        If that name is taken (two rotations within one second), a ``.N``
        counter is appended. Otherwise ``Path.rename`` would silently overwrite
        the earlier backup on POSIX or fail on Windows.
        """
        timestamp_str = datetime.now().strftime('%Y%m%d-%H%M%S')
        base_name = f"{self.journal_file}.{timestamp_str}"
        candidate = self.journal_dir / base_name
        counter = 1
        while candidate.exists():
            candidate = self.journal_dir / f"{base_name}.{counter}"
            counter += 1
        return candidate

    async def _rotate(self):
        """
        Rotate the journal file.

        Closes the live file, renames it to a timestamped backup, opens a new
        live file and removes backups beyond ``max_backups``. On failure the
        live file is reopened if needed so journaling can continue. If even
        that fails, journaling is disabled.

        Must be called with ``self._lock`` held.
        """
        try:
            # Close current file (required before rename on some platforms)
            if self._file_handle:
                self._file_handle.close()
                self._file_handle = None

            backup_path = self._next_backup_path()

            # Rename current file to backup
            if self.journal_path.exists():
                self.journal_path.rename(backup_path)
                logger.info(f"Rotated journal: {backup_path} "
                            f"(size: {self._current_size:,} bytes)")

            # Open new journal file
            self._file_handle = open(self.journal_path, 'a', encoding='utf-8')
            self._current_size = 0

            # Clean up old backups
            await self._cleanup_old_backups()

        except Exception as e:
            logger.error(f"Error rotating journal: {e}")
            # Reopen only when there is no usable handle; opening over a live
            # one would leak it.
            try:
                if self._file_handle is None or self._file_handle.closed:
                    self._file_handle = open(self.journal_path, 'a', encoding='utf-8')
            except Exception as e2:
                logger.error(f"Failed to reopen journal after rotation error: {e2}")
                self._file_handle = None
                self.enabled = False
                return
            # The rename may or may not have happened; resync with the disk.
            try:
                self._current_size = os.path.getsize(self.journal_path)
            except OSError:
                pass

    async def _cleanup_old_backups(self):
        """
        Remove old backup files exceeding the max_backups limit.

        Only files named like the backups this class creates
        (``<journal_file>.YYYYmmdd-HHMMSS`` with an optional ``.N`` suffix)
        are considered, so unrelated files in the directory are never deleted.
        The oldest backups, ordered by timestamp then counter, are removed
        first.
        """
        try:
            backup_re = re.compile(
                re.escape(self.journal_file) + r"\.(\d{8}-\d{6})(?:\.(\d+))?"
            )
            backups = []
            for path in self.journal_dir.iterdir():
                match = backup_re.fullmatch(path.name)
                if match and path.is_file():
                    sort_key = (match.group(1), int(match.group(2) or 0))
                    backups.append((sort_key, path))
            backups.sort(key=lambda item: item[0])

            # Remove oldest backups if we have too many
            excess = len(backups) - self.max_backups
            if excess > 0:
                for _, backup_file in backups[:excess]:
                    try:
                        backup_file.unlink()
                        logger.info(f"Removed old backup: {backup_file.name}")
                    except Exception as e:
                        logger.warning(f"Failed to remove old backup {backup_file}: {e}")

        except Exception as e:
            logger.error(f"Error cleaning up old backups: {e}")

    async def log_threshold_event(
        self,
        host_name: str,
        metric_path: str,
        old_level: str,
        new_level: str,
        value: Any,
        timestamp: Optional[float] = None
    ):
        """
        Log a threshold state change event.

        Errors are logged, never raised.

        Args:
            host_name: Name of the host
            metric_path: Full metric path (e.g., "cpu_monitor.cpu_percent")
            old_level: Previous alert level
            new_level: New alert level
            value: Current metric value (must be JSON-serializable)
            timestamp: Event timestamp (default: current time)
        """
        if not self.enabled or not self._initialized:
            return

        try:
            if timestamp is None:
                timestamp = time.time()

            event = {
                'timestamp': timestamp,
                'iso_time': datetime.fromtimestamp(timestamp).isoformat(),
                'event_type': 'threshold',
                'host': host_name,
                'metric': metric_path,
                'old_level': old_level,
                'new_level': new_level,
                'value': value,
            }

            # Default json separators are kept so the on-disk format of
            # threshold events is unchanged.
            line = json.dumps(event) + '\n'

            async with self._lock:
                await self._write_line(line)

        except Exception as e:
            logger.error(f"Error logging threshold event: {e}")

    async def close(self):
        """
        Close the journal and release resources.

        Should be called during shutdown. Later log calls become no-ops until
        initialize() is called again.
        """
        async with self._lock:
            if self._file_handle:
                try:
                    self._file_handle.close()
                    logger.info("Message journal closed")
                except Exception as e:
                    logger.error(f"Error closing journal: {e}")
                finally:
                    self._file_handle = None
            self._initialized = False

    def get_stats(self) -> Dict[str, Any]:
        """
        Get journal statistics.

        Returns:
            Dictionary with journal stats. ``rotation_threshold`` is the live
            file's fill level relative to ``max_size`` as a percentage string,
            or ``"n/a"`` when ``max_size`` is not positive.
        """
        if self.max_size > 0:
            fill = f"{(self._current_size / self.max_size * 100):.1f}%"
        else:
            fill = "n/a"  # avoid ZeroDivisionError when max_size is 0

        return {
            'enabled': self.enabled,
            'initialized': self._initialized,
            'current_file': str(self.journal_path),
            'current_size': self._current_size,
            'max_size': self.max_size,
            'max_backups': self.max_backups,
            'rotation_threshold': fill
        }


# Global journal instance, created lazily by get_journal()
_journal_instance: Optional[MessageJournal] = None


def get_journal(config: Optional[Dict[str, Any]] = None) -> MessageJournal:
    """
    Get or create the global journal instance.

    The first call constructs the instance (with ``config``, or defaults when
    it is None). Later calls return that same instance. A ``config`` passed
    after the instance exists is ignored. A warning is logged when it differs
    from the configuration the instance was built with, so a call made before
    the configured one cannot silently pin default settings. An example is
    the module-level ``log_message()``, which passes no config.

    Args:
        config: Configuration dictionary (only used on first call)

    Returns:
        MessageJournal instance
    """
    global _journal_instance
    if _journal_instance is None:
        _journal_instance = MessageJournal(config)
    elif config is not None and config != _journal_instance.config:
        logger.warning(
            "get_journal(): ignoring config passed after the journal was "
            "created; the existing instance keeps its original settings"
        )
    return _journal_instance


async def log_message(msg: Dict[str, Any], addr: tuple, timestamp: Optional[float] = None):
    """
    Journal ``msg`` through the process-wide journal from get_journal().

    This is a shortcut for ``get_journal().log_message(...)``. It passes no
    config, so if no journal exists yet, one is created with default
    settings. Filtering, formatting and rotation all happen in
    MessageJournal.log_message.

    Args:
        msg: Parsed message dictionary
        addr: Source address, normally an (ip, port) tuple
        timestamp: Message timestamp; the current time is used when omitted
    """
    await get_journal().log_message(msg, addr, timestamp=timestamp)