# Message Journal

The message journal provides persistent logging of all received heartbeat messages with automatic size-based log rotation.

## Overview

The journal logs every message received by the heartbeat daemon (hbd) in JSON format, making it easy to:

- Audit message history
- Debug connection issues
- Analyze traffic patterns
- Replay messages for testing
- Create historical reports

## Features

- **JSON Format**: Each message is logged as a single JSON line for easy parsing
- **Size-Based Rotation**: Automatically rotates logs when the size threshold is reached
- **Automatic Cleanup**: Keeps only a configurable number of backup files
- **Thread-Safe**: Safe for concurrent access from multiple async tasks
- **Configurable**: All settings are controllable via the configuration file
- **Performance**: Non-blocking async operation with minimal overhead

## Configuration

Add these settings to your hbd configuration file (e.g., `.hb.yaml`):

```yaml
# Message journal configuration
journal_enabled: true              # Enable/disable journaling
journal_dir: /var/log/heartbeat    # Directory for journal files
journal_file: messages.journal     # Base filename
journal_max_size: 104857600        # Max size in bytes (100 MiB default)
journal_max_backups: 10            # Number of backup files to keep
```

### Configuration Options

| Option | Default | Description |
|--------|---------|-------------|
| `journal_enabled` | `true` | Enable or disable message journaling |
| `journal_dir` | `/var/log/heartbeat` | Directory where journal files are stored |
| `journal_file` | `messages.journal` | Base filename for the journal |
| `journal_max_size` | `104857600` (100 MiB) | Maximum file size in bytes before rotation |
| `journal_max_backups` | `10` | Number of rotated backup files to keep |

## File Format

Messages are logged in JSONL (JSON Lines) format: one JSON object per line.

```json
{"timestamp":1774701296.123,"datetime":"2026-03-28T12:34:56","source_ip":"192.168.1.100","source_port":50003,"message":{"ID":"HTB","name":"webserver1","interval":30}}
{"timestamp":1774701326.456,"datetime":"2026-03-28T12:35:26","source_ip":"192.168.1.101","source_port":50003,"message":{"ID":"PLG","plugin":"cpu_monitor","cpu_percent":45.2,"load_1min":1.5}}
```

### Entry Structure

Each journal entry contains:

| Field | Type | Description |
|-------|------|-------------|
| `timestamp` | float | Unix timestamp (seconds since epoch) |
| `datetime` | string | ISO 8601 formatted datetime |
| `source_ip` | string | Source IP address |
| `source_port` | integer | Source UDP port |
| `message` | object | Complete parsed message dictionary |

## Log Rotation

### How Rotation Works

1. The journal writes messages to the current file
2. When the file size exceeds `journal_max_size`, rotation is triggered
3. The current file is renamed with a timestamp suffix: `messages.journal.YYYYMMDD-HHMMSS`
4. A new empty file is created as the current journal
5. Old backup files exceeding `journal_max_backups` are deleted

### Example File Structure

```
/var/log/heartbeat/
├── messages.journal                    # Current active journal
├── messages.journal.20260328-120000    # Rotated backup (oldest)
├── messages.journal.20260328-140000    # Rotated backup
└── messages.journal.20260328-160000    # Rotated backup (newest)
```

### Rotation Behavior

- Rotation is triggered when the next message would exceed the size limit
- Rotation is automatic and requires no manual intervention
- Old backups are deleted in FIFO order (oldest first)
- Rotation is thread-safe and won't lose messages

## Usage Examples

### Reading Journal Files

#### Using Python

```python
import json

# Read all entries from the current journal
with open('/var/log/heartbeat/messages.journal', 'r') as f:
    for line in f:
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        print(f"{entry['datetime']} - {entry['source_ip']} - {entry['message']['ID']}")
```

#### Using jq (command line)

```bash
# View all messages
cat /var/log/heartbeat/messages.journal | jq .

# Filter by message type
cat /var/log/heartbeat/messages.journal | jq 'select(.message.ID == "HTB")'

# Filter by hostname
cat /var/log/heartbeat/messages.journal | jq 'select(.message.name == "webserver1")'

# Count messages by type
cat /var/log/heartbeat/messages.journal | jq -r '.message.ID' | sort | uniq -c

# Extract datetime, source IP, and message type as TSV
cat /var/log/heartbeat/messages.journal | jq -r '[.datetime, .source_ip, .message.ID] | @tsv'
```

#### Using shell tools

```bash
# Count total messages
wc -l /var/log/heartbeat/messages.journal

# View recent messages
tail -n 100 /var/log/heartbeat/messages.journal | jq .

# Search for specific host
grep -F '"name":"webserver1"' /var/log/heartbeat/messages.journal

# Check journal file size
du -h /var/log/heartbeat/messages.journal
```

### Analyzing Historical Data

```bash
# Combine all journal files (current + backups) into one JSON array
cat /var/log/heartbeat/messages.journal* | jq -s . > all_messages.json

# Count messages per host
cat /var/log/heartbeat/messages.journal* | jq -r '.message.name // "unknown"' | sort | uniq -c

# Find all plugin messages
cat /var/log/heartbeat/messages.journal* | jq 'select(.message.ID == "PLG")'

# Extract CPU metrics from plugin messages
cat /var/log/heartbeat/messages.journal* | \
  jq 'select(.message.plugin == "cpu_monitor") | {time: .datetime, host: .message.name, cpu: .message.cpu_percent}'
```

## Integration with Log Management

### Logrotate

While the journal has built-in rotation, you can also use logrotate for additional management:

```
/var/log/heartbeat/messages.journal.* {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
}
```

### Elasticsearch/OpenSearch

Import journal data into Elasticsearch for advanced analysis:

```python
from elasticsearch import Elasticsearch
import json

# elasticsearch-py 8.x: the host URL needs a scheme, and the
# document is passed via `document=` (the older `body=` is deprecated)
es = Elasticsearch("http://localhost:9200")

with open('/var/log/heartbeat/messages.journal', 'r') as f:
    for line in f:
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        es.index(index='heartbeat-messages', document=entry)
```

### Splunk

Create a Splunk input for the journal:

```ini
[monitor:///var/log/heartbeat/messages.journal*]
sourcetype = heartbeat_json
index = heartbeat
```

## Performance Considerations

### Overhead

- Journal writing is async and non-blocking
- Typical overhead: < 1ms per message
- Minimal impact on heartbeat processing

### Disk Usage

Calculate expected disk usage:

```
Messages per day = (86400 seconds / interval) * number_of_hosts
Average message size ≈ 200-500 bytes
Daily disk usage = Messages per day * Average message size

Example:
- 100 hosts
- 30 second interval
- 2880 messages/day per host
- 288,000 messages/day total
- ~58-144 MB/day
```

### Recommendations

- **Small deployments** (< 50 hosts): Default settings work well
- **Medium deployments** (50-500 hosts): Increase `journal_max_size` to 500 MB and `journal_max_backups` to 20
- **Large deployments** (> 500 hosts): Consider 1 GB+ journal files, 30+ backups, or external log aggregation

## Monitoring

### Check Journal Status

The journal exposes statistics that can be queried:

```python
from hbd.journal import get_journal

journal = get_journal()
stats = journal.get_stats()

print(f"Current size: {stats['current_size']:,} bytes")
print(f"Rotation threshold: {stats['rotation_threshold']}")
```

### Log Messages

Journal operations are logged at the following levels:

- `INFO`: Initialization, rotation events, cleanup
- `DEBUG`: Individual message logging
- `WARNING`: Non-critical issues
- `ERROR`: Critical failures

Check hbd logs for journal-related messages:

```bash
grep journal /var/log/heartbeat.log
```

## Troubleshooting

### Journal Files Not Created

**Problem**: No journal files appear in the configured directory.

**Solutions**:

- Check that `journal_enabled: true` is set in the configuration
- Verify the directory exists and hbd has write permissions
- Check hbd logs for initialization errors
- Verify disk space is available

### Rotation Not Working

**Problem**: Journal file grows beyond `journal_max_size`.

**Solutions**:

- Check that `journal_max_size` is properly configured
- Verify hbd has permission to rename/create files
- Check for filesystem issues
- Review hbd logs for rotation errors

### Missing Messages

**Problem**: Some messages don't appear in the journal.

**Solutions**:

- Verify `journal_enabled: true`
- Check for write errors in hbd logs
- Verify sufficient disk space
- Check if the filesystem is read-only

### Performance Issues

**Problem**: Journal causing slow message processing.

**Solutions**:

- Use faster storage (SSD) for the journal directory
- Increase `journal_max_size` to reduce rotation frequency
- Disable the journal if not needed: `journal_enabled: false`
- Consider async syslog forwarding instead

## Security Considerations

### File Permissions

Ensure proper permissions on journal files:

```bash
# Journal directory
chmod 750 /var/log/heartbeat
chown hbd:hbd /var/log/heartbeat

# Journal files
chmod 640 /var/log/heartbeat/messages.journal*
```

### Sensitive Data

Journal files may contain:

- Hostnames and IP addresses
- System metrics
- Custom message content

**Recommendations**:

- Restrict read access to authorized users only
- Consider encryption for archived journals
- Implement log retention policies
- Sanitize data if sharing for debugging

## API Reference

### MessageJournal Class

```python
from typing import Any, Dict

# Signatures only; method bodies are omitted
class MessageJournal:
    def __init__(self, config: Dict[str, Any]): ...
    async def initialize(self) -> bool: ...
    async def log_message(self, msg: Dict, addr: tuple, timestamp: float): ...
    async def close(self): ...
    def get_stats(self) -> Dict[str, Any]: ...
```

### Module Functions

```python
from typing import Dict, Optional

# Signatures only; function bodies are omitted
def get_journal(config: Optional[Dict] = None) -> MessageJournal: ...
async def log_message(msg: Dict, addr: tuple, timestamp: Optional[float] = None): ...
```

## Example: Custom Message Processing

Process journal messages in real-time:

```python
import asyncio
import json
from pathlib import Path

async def tail_journal(journal_path):
    """Follow journal file and process new messages."""
    path = Path(journal_path)
    pending = ""  # holds a partially written line until its newline arrives

    # Note: after a rotation this handle keeps following the renamed file;
    # reopen the path to follow the new current journal.
    with open(path, 'r') as f:
        # Jump to end
        f.seek(0, 2)

        while True:
            chunk = f.readline()
            if not chunk:
                # No new data yet; yield to other tasks and poll again
                await asyncio.sleep(0.1)
                continue
            pending += chunk
            if not pending.endswith('\n'):
                continue  # line is incomplete, keep reading
            line, pending = pending, ""
            if line.strip():
                await process_message(json.loads(line))

async def process_message(entry):
    """Process a journal entry."""
    msg = entry['message']

    # Alert on boot messages (fall back to the source IP if no name is set)
    if msg.get('boot'):
        host = msg.get('name', entry['source_ip'])
        print(f"ALERT: {host} rebooted at {entry['datetime']}")

    # Track CPU usage
    if msg.get('ID') == 'PLG' and msg.get('plugin') == 'cpu_monitor':
        cpu = msg.get('cpu_percent', 0)
        if cpu > 90:
            print(f"WARNING: {entry['source_ip']} CPU usage: {cpu}%")

if __name__ == "__main__":
    asyncio.run(tail_journal('/var/log/heartbeat/messages.journal'))
```

## Future Enhancements

Potential improvements for future versions:

- Compression of rotated logs (gzip)
- Time-based rotation in addition to size-based
- Filtering to exclude certain message types
- Structured logging output formats (CEF, GELF)
- Remote syslog forwarding
- Message deduplication
- Journal file encryption
- Signed journal entries

## See Also

- [Configuration Guide](../hbd/config.py) - Full configuration options
- [UDP Protocol](../hbd/udp.py) - Message handling
- [Server Architecture](../hbd/server.py) - Server initialization
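
## Example: Rotation Sketch

The five steps listed under "How Rotation Works" can be illustrated with a small standalone function. This is a minimal sketch and not hbd's actual implementation: the name `rotate_if_needed` and its parameters are hypothetical, it runs synchronously with no locking, and it assumes rotation is checked against the current file size (step 2) rather than the size after the next message.

```python
import time
from pathlib import Path


def rotate_if_needed(journal: Path, max_size: int, max_backups: int) -> bool:
    """Rotate `journal` once it exceeds `max_size` bytes, then prune backups.

    Hypothetical helper for illustration only. Returns True if a rotation
    happened, False otherwise.
    """
    # Step 2: nothing to do unless the file exists and exceeds the threshold.
    if not journal.exists() or journal.stat().st_size <= max_size:
        return False

    # Step 3: rename the current file with a YYYYMMDD-HHMMSS suffix.
    # Simplification: two rotations within the same second would collide.
    stamp = time.strftime("%Y%m%d-%H%M%S")
    journal.rename(journal.with_name(f"{journal.name}.{stamp}"))

    # Step 4: create a new, empty current journal.
    journal.touch()

    # Step 5: delete the oldest backups beyond max_backups. The timestamp
    # suffix sorts lexicographically in chronological order.
    backups = sorted(journal.parent.glob(f"{journal.name}.*"))
    excess = max(0, len(backups) - max_backups)
    for old in backups[:excess]:
        old.unlink()
    return True
```

Calling `rotate_if_needed(Path("/var/log/heartbeat/messages.journal"), 104857600, 10)` would mirror the default configuration. A real implementation would additionally need to coordinate with concurrent writers so that no message is lost during the rename.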