heartbeat/docs/MESSAGE_JOURNAL.md
2026-04-02 07:17:00 -04:00

Message Journal

The message journal provides persistent logging of all received heartbeat messages with automatic size-based log rotation.

Overview

The journal logs every message received by the heartbeat daemon (hbd) in JSON format, making it easy to:

  • Audit message history
  • Debug connection issues
  • Analyze traffic patterns
  • Replay messages for testing
  • Create historical reports

Features

  • JSON Format: Each message is logged as a single JSON line for easy parsing
  • Size-Based Rotation: Automatically rotates logs when size threshold is reached
  • Automatic Cleanup: Keeps only a configurable number of backup files
  • Thread-Safe: Safe for concurrent access from multiple async tasks
  • Configurable: All settings controllable via configuration file
  • Performance: Non-blocking async operation with minimal overhead

Configuration

Add these settings to your hbd configuration file (e.g., .hb.yaml):

# Message journal configuration
journal_enabled: true                          # Enable/disable journaling
journal_dir: /var/log/heartbeat                # Directory for journal files
journal_file: messages.journal                 # Base filename
journal_max_size: 104857600                    # Max size in bytes (100MB default)
journal_max_backups: 10                        # Number of backup files to keep

Configuration Options

| Option | Default | Description |
| --- | --- | --- |
| journal_enabled | true | Enable or disable message journaling |
| journal_dir | /var/log/heartbeat | Directory where journal files are stored |
| journal_file | messages.journal | Base filename for the journal |
| journal_max_size | 104857600 (100MB) | Maximum file size before rotation |
| journal_max_backups | 10 | Number of rotated backup files to keep |
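As a rough illustration of how the defaults in the table combine with a user's configuration, the sketch below overlays supplied `journal_*` keys on the documented defaults. The helper name `resolve_journal_settings` and its validation rules are assumptions made for this example; they are not taken from hbd.

```python
from typing import Any, Dict

# Defaults copied from the options table above.
JOURNAL_DEFAULTS: Dict[str, Any] = {
    "journal_enabled": True,
    "journal_dir": "/var/log/heartbeat",
    "journal_file": "messages.journal",
    "journal_max_size": 104857600,
    "journal_max_backups": 10,
}


def resolve_journal_settings(config: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay user-supplied journal_* keys on the documented defaults.

    Hypothetical helper for illustration; hbd may resolve settings differently.
    """
    settings = dict(JOURNAL_DEFAULTS)
    # Ignore keys that are not journal options.
    settings.update({k: v for k, v in config.items() if k in JOURNAL_DEFAULTS})
    # Assumed sanity checks, not documented hbd behavior.
    if settings["journal_max_size"] <= 0:
        raise ValueError("journal_max_size must be a positive number of bytes")
    if settings["journal_max_backups"] < 0:
        raise ValueError("journal_max_backups must not be negative")
    return settings
```

Passing `{"journal_max_size": 524288000}` would leave every other option at its default.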

File Format

Messages are logged in JSONL (JSON Lines) format - one JSON object per line:

{"timestamp":1711234567.123,"datetime":"2026-03-28T12:34:56","source_ip":"192.168.1.100","source_port":50003,"message":{"ID":"HTB","name":"webserver1","interval":30}}
{"timestamp":1711234597.456,"datetime":"2026-03-28T12:35:26","source_ip":"192.168.1.101","source_port":50003,"message":{"ID":"PLG","plugin":"cpu_monitor","cpu_percent":45.2,"load_1min":1.5}}

Entry Structure

Each journal entry contains:

| Field | Type | Description |
| --- | --- | --- |
| timestamp | float | Unix timestamp (seconds since epoch) |
| datetime | string | ISO 8601 formatted datetime |
| source_ip | string | Source IP address |
| source_port | integer | Source UDP port |
| message | object | Complete parsed message dictionary |
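To make the entry structure concrete, here is a minimal sketch of how a received message could be turned into one journal line. The function name `format_entry` is hypothetical, and rendering `datetime` in UTC without an offset is an assumption; the source does not state which timezone hbd uses.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def format_entry(msg: Dict[str, Any], addr: Tuple[str, int], timestamp: float) -> str:
    """Serialize one received message as a single JSONL line."""
    # Assumption: the datetime field is rendered in UTC without an offset.
    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    entry = {
        "timestamp": timestamp,
        "datetime": dt.strftime("%Y-%m-%dT%H:%M:%S"),
        "source_ip": addr[0],
        "source_port": addr[1],
        "message": msg,
    }
    # Compact separators match the style of the sample lines above.
    return json.dumps(entry, separators=(",", ":")) + "\n"
```

Because `json.dumps` escapes any newline inside string values, each entry stays on exactly one line, which is what lets tools such as `wc -l` and `grep` work on the file.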

Log Rotation

How Rotation Works

  1. Journal writes messages to the current file
  2. When the next message would push the file past journal_max_size, rotation is triggered
  3. Current file is renamed with timestamp: messages.journal.YYYYMMDD-HHMMSS
  4. New empty file is created as the current journal
  5. Old backup files exceeding journal_max_backups are deleted
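The steps above can be sketched as follows. This is an illustrative outline, not hbd's actual implementation: the function name `rotate_if_needed` and its parameters are assumptions, and it ignores locking between concurrent writers.

```python
import os
import time
from pathlib import Path


def rotate_if_needed(journal: Path, next_len: int, max_size: int, max_backups: int) -> bool:
    """Rotate `journal` if appending `next_len` bytes would pass `max_size`."""
    current = journal.stat().st_size if journal.exists() else 0
    if current == 0 or current + next_len <= max_size:
        return False

    # Step 3: rename the current file with a YYYYMMDD-HHMMSS suffix.
    # Caveat of this sketch: two rotations within one second would collide.
    suffix = time.strftime("%Y%m%d-%H%M%S")
    os.rename(journal, journal.with_name(f"{journal.name}.{suffix}"))

    # Step 4: start a new, empty journal.
    journal.touch()

    # Step 5: delete the oldest backups beyond max_backups.
    # The timestamp suffix makes lexical order match chronological order.
    backups = sorted(journal.parent.glob(journal.name + ".*"))
    excess = len(backups) - max_backups
    for old in backups[:max(excess, 0)]:
        old.unlink()
    return True
```

Sorting by filename is enough here because the suffix format puts the most significant date component first.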

Example File Structure

/var/log/heartbeat/
├── messages.journal                    # Current active journal
├── messages.journal.20260328-120000   # Rotated backup (oldest)
├── messages.journal.20260328-140000   # Rotated backup
└── messages.journal.20260328-160000   # Rotated backup (newest)

Rotation Behavior

  • Rotation is triggered when the next message would exceed the size limit
  • Rotation is automatic and requires no manual intervention
  • Old backups are deleted in FIFO order (oldest first)
  • Rotation is thread-safe and won't lose messages

Usage Examples

Reading Journal Files

Using Python

import json

# Read all entries from current journal
with open('/var/log/heartbeat/messages.journal', 'r') as f:
    for line in f:
        entry = json.loads(line)
        print(f"{entry['datetime']} - {entry['source_ip']} - {entry['message']['ID']}")
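When reading a journal that is still being written, the last line may be incomplete, and `json.loads` raises on blank or truncated lines. A more tolerant reader might look like the sketch below; `iter_entries` is a hypothetical helper name, and silently skipping bad lines is a design choice for this example rather than documented hbd behavior.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_entries(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield parsed entries, skipping blank or malformed lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # e.g. a half-written final line
        # Journal entries are JSON objects; ignore anything else.
        if isinstance(entry, dict):
            yield entry
```

It accepts any iterable of strings, so an open file object can be passed directly: `for entry in iter_entries(f): ...`.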

Using jq (command line)

# View all messages
cat /var/log/heartbeat/messages.journal | jq .

# Filter by message type
cat /var/log/heartbeat/messages.journal | jq 'select(.message.ID == "HTB")'

# Filter by hostname
cat /var/log/heartbeat/messages.journal | jq 'select(.message.name == "webserver1")'

# Count messages by type
cat /var/log/heartbeat/messages.journal | jq -r '.message.ID' | sort | uniq -c

# Extract timestamps and source IPs
cat /var/log/heartbeat/messages.journal | jq -r '[.datetime, .source_ip, .message.ID] | @tsv'

Using shell tools

# Count total messages
wc -l /var/log/heartbeat/messages.journal

# View recent messages
tail -n 100 /var/log/heartbeat/messages.journal | jq .

# Search for specific host
grep -F '"name":"webserver1"' /var/log/heartbeat/messages.journal

# Check journal file size
du -h /var/log/heartbeat/messages.journal

Analyzing Historical Data

# Combine all journal files (current + backups) into one time-ordered JSON array
cat /var/log/heartbeat/messages.journal* | jq -s 'sort_by(.timestamp)' > all_messages.json

# Count messages per host
cat /var/log/heartbeat/messages.journal* | jq -r '.message.name // "unknown"' | sort | uniq -c

# Find all plugin messages
cat /var/log/heartbeat/messages.journal* | jq 'select(.message.ID == "PLG")'

# Extract CPU metrics from plugin messages (fall back to source IP when the message has no name)
cat /var/log/heartbeat/messages.journal* | \
    jq 'select(.message.plugin == "cpu_monitor") | {time: .datetime, host: (.message.name // .source_ip), cpu: .message.cpu_percent}'
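The same per-host count can be done in Python when jq is not available. The sketch below is one possible approach; `count_by_host` is a hypothetical name, and falling back to `source_ip` (instead of the literal "unknown" used in the jq example) is a choice made here so that plugin messages without a `name` field are still attributed to a sender.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Iterable


def count_by_host(paths: Iterable[Path]) -> Counter:
    """Count entries per host across several journal files."""
    counts: Counter = Counter()
    for path in paths:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                # Fall back to the source IP when the message has no name field.
                host = entry["message"].get("name") or entry["source_ip"]
                counts[host] += 1
    return counts
```

A typical call would be `count_by_host(sorted(Path("/var/log/heartbeat").glob("messages.journal*")))`.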

Integration with Log Management

Logrotate

While the journal has built-in rotation, you can also use logrotate for additional management:

/var/log/heartbeat/messages.journal.* {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
}

Elasticsearch/OpenSearch

Import journal data into Elasticsearch for advanced analysis:

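from elasticsearch import Elasticsearch
import json

# elasticsearch-py 8.x requires the scheme in the host URL
es = Elasticsearch('http://localhost:9200')

with open('/var/log/heartbeat/messages.journal', 'r') as f:
    for line in f:
        entry = json.loads(line)
        # 8.x takes the document via `document=`; older 7.x clients and opensearch-py use `body=`
        es.index(index='heartbeat-messages', document=entry)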
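Indexing one document per request is slow for large journals. A common alternative is the client's bulk helper, which consumes an iterable of action dictionaries. The generator below only builds those actions, so it has no dependency on the client library; `bulk_actions` is a hypothetical name and the index name is carried over from the example above.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def bulk_actions(lines: Iterable[str], index: str = "heartbeat-messages") -> Iterator[Dict[str, Any]]:
    """Turn JSONL journal lines into action dicts for a bulk helper."""
    for line in lines:
        # Skip blank lines; every other line is expected to be one JSON entry.
        if line.strip():
            yield {"_index": index, "_source": json.loads(line)}
```

With elasticsearch-py installed, the actions could then be sent with `elasticsearch.helpers.bulk(es, bulk_actions(f))`, where `f` is the open journal file.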

Splunk

Create a Splunk input for the journal:

[monitor:///var/log/heartbeat/messages.journal*]
sourcetype = heartbeat_json
index = heartbeat

Performance Considerations

Overhead

  • Journal writing is async and non-blocking
  • Typical overhead: < 1ms per message
  • Minimal impact on heartbeat processing

Disk Usage

Calculate expected disk usage:

Messages per day = (86400 seconds / interval) * number_of_hosts
Average message size ≈ 200-500 bytes
Daily disk usage = Messages per day * Average message size

Example:
- 100 hosts
- 30 second interval
- 2,880 messages/day per host (86400 / 30)
- 288,000 messages/day total
- ~58-144 MB/day (288,000 * 200-500 bytes)
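The arithmetic above is easy to script when sizing a deployment. The helpers below are a small sketch with hypothetical names; `retention_days` additionally assumes that every rotated backup is close to `journal_max_size`, which is an approximation.

```python
def daily_messages(hosts: int, interval_s: float) -> float:
    """Messages per day = (86400 seconds / interval) * number_of_hosts."""
    return 86400 / interval_s * hosts


def daily_bytes(hosts: int, interval_s: float, avg_message_bytes: int) -> float:
    """Daily disk usage = messages per day * average message size."""
    return daily_messages(hosts, interval_s) * avg_message_bytes


def retention_days(max_size: int, max_backups: int, bytes_per_day: float) -> float:
    """Approximate days of history held by the current file plus its backups."""
    return max_size * (max_backups + 1) / bytes_per_day
```

For the 100-host example, `daily_bytes(100, 30, 200)` gives 57,600,000 bytes and `daily_bytes(100, 30, 500)` gives 144,000,000 bytes. With the default 100MB limit and 10 backups, that works out to roughly 8 to 20 days of retained history.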

Recommendations

  • Small deployments (< 50 hosts): Default settings work well
  • Medium deployments (50-500 hosts): Increase journal_max_size to 500MB, journal_max_backups to 20
  • Large deployments (> 500 hosts): Consider 1GB+ journal files, 30+ backups, or external log aggregation

Monitoring

Check Journal Status

The journal exposes statistics that can be queried:

from hbd.journal import get_journal

journal = get_journal()
stats = journal.get_stats()
print(f"Current size: {stats['current_size']:,} bytes")
print(f"Rotation threshold: {stats['rotation_threshold']}")
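For alerting, it can help to reduce the statistics to a single figure. The sketch below works on a plain mapping with the two keys shown above and assumes both values are byte counts; `rotation_headroom` is a hypothetical helper, not part of `hbd.journal`.

```python
from typing import Mapping, Tuple


def rotation_headroom(stats: Mapping[str, int]) -> Tuple[int, float]:
    """Return (bytes left before rotation, fraction of the threshold used)."""
    size = stats["current_size"]
    threshold = stats["rotation_threshold"]
    # Clamp at zero in case the file briefly exceeds the threshold.
    return max(threshold - size, 0), size / threshold
```

A monitoring script could call this with the result of `journal.get_stats()` and warn when the fraction stays near 1.0 without a rotation event appearing in the logs.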

Log Messages

Journal operations are logged at appropriate levels:

  • INFO: Initialization, rotation events, cleanup
  • DEBUG: Individual message logging
  • WARNING: Non-critical issues
  • ERROR: Critical failures

Check hbd logs for journal-related messages:

grep journal /var/log/heartbeat.log

Troubleshooting

Journal Files Not Created

Problem: No journal files appear in the configured directory.

Solutions:

  • Check journal_enabled: true in configuration
  • Verify directory exists and hbd has write permissions
  • Check hbd logs for initialization errors
  • Verify disk space is available
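Several of these checks can be automated. The following sketch reports the directory, permission, and disk space problems listed above. The function name `preflight` and the 100MB free-space floor are assumptions for illustration, and the result is only meaningful when the script runs as the same user as hbd.

```python
import os
import shutil
from pathlib import Path
from typing import List


def preflight(journal_dir: str, min_free_bytes: int = 104857600) -> List[str]:
    """Return a list of problems that would stop journal files being created."""
    problems: List[str] = []
    path = Path(journal_dir)
    if not path.is_dir():
        return [f"{journal_dir} does not exist or is not a directory"]
    # Creating files needs write and search permission on the directory.
    if not os.access(path, os.W_OK | os.X_OK):
        problems.append(f"current user cannot create files in {journal_dir}")
    if shutil.disk_usage(path).free < min_free_bytes:
        problems.append(f"less than {min_free_bytes} bytes free on {journal_dir}")
    return problems
```

An empty list means none of the checked conditions failed; it does not rule out other causes such as `journal_enabled: false`.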

Rotation Not Working

Problem: Journal file grows beyond journal_max_size.

Solutions:

  • Check that journal_max_size is properly configured
  • Verify hbd has permission to rename/create files
  • Check for filesystem issues
  • Review hbd logs for rotation errors

Missing Messages

Problem: Some messages don't appear in journal.

Solutions:

  • Verify journal_enabled: true
  • Check for write errors in hbd logs
  • Verify sufficient disk space
  • Check if filesystem is read-only

Performance Issues

Problem: Journal causing slow message processing.

Solutions:

  • Use faster storage (SSD) for journal directory
  • Increase journal_max_size to reduce rotation frequency
  • Disable journal if not needed: journal_enabled: false
  • Consider async syslog forwarding instead

Security Considerations

File Permissions

Ensure proper permissions on journal files:

# Journal directory
chmod 750 /var/log/heartbeat
chown hbd:hbd /var/log/heartbeat

# Journal files
chmod 640 /var/log/heartbeat/messages.journal*
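To verify that the permissions stay in place after rotation creates new files, a periodic audit can list any journal file that is accessible to other users. This is a minimal sketch; `world_accessible` is a hypothetical name, and the default base filename is taken from the configuration defaults.

```python
import stat
from pathlib import Path
from typing import List


def world_accessible(journal_dir: str, base: str = "messages.journal") -> List[str]:
    """List journal files that grant any permission to 'other' users."""
    loose = []
    for path in sorted(Path(journal_dir).glob(base + "*")):
        mode = path.stat().st_mode
        # S_IRWXO masks the read/write/execute bits for 'other'.
        if mode & stat.S_IRWXO:
            loose.append(f"{path.name} {stat.filemode(mode)}")
    return loose
```

Files set to mode 640 as recommended above are not reported, while a file left at 644 would be.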

Sensitive Data

Journal files may contain:

  • Hostnames and IP addresses
  • System metrics
  • Custom message content

Recommendations:

  • Restrict read access to authorized users only
  • Consider encryption for archived journals
  • Implement log retention policies
  • Sanitize data if sharing for debugging

API Reference

MessageJournal Class

from typing import Any, Dict

class MessageJournal:
    def __init__(self, config: Dict[str, Any]): ...
    async def initialize(self) -> bool: ...
    async def log_message(self, msg: Dict, addr: tuple, timestamp: float): ...
    async def close(self): ...
    def get_stats(self) -> Dict[str, Any]: ...

Module Functions

def get_journal(config: Optional[Dict] = None) -> MessageJournal: ...
async def log_message(msg: Dict, addr: tuple, timestamp: Optional[float] = None): ...

Example: Custom Message Processing

Process journal messages in real-time:

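import asyncio
import json
import os
from pathlib import Path

async def tail_journal(journal_path):
    """Follow journal file and process new messages, surviving rotation."""
    path = Path(journal_path)
    f = open(path, 'r')
    f.seek(0, 2)  # Jump to end
    buffer = ''
    rotated = False
    try:
        while True:
            chunk = f.readline()
            if chunk:
                buffer += chunk
                if buffer.endswith('\n'):
                    # Only parse once the writer has finished the line
                    if buffer.strip():
                        await process_message(json.loads(buffer))
                    buffer = ''
                continue
            if rotated:
                # Old file is drained; switch to the new journal
                f.close()
                f = open(path, 'r')
                buffer, rotated = '', False
                continue
            await asyncio.sleep(0.1)
            try:
                # A different inode means the journal was renamed and recreated
                rotated = path.stat().st_ino != os.fstat(f.fileno()).st_ino
            except FileNotFoundError:
                pass  # New journal not created yet; retry on the next pass
    finally:
        f.close()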

async def process_message(entry):
    """Process a journal entry."""
    msg = entry['message']
    
    # Alert on boot messages
    if msg.get('boot'):
        print(f"ALERT: {msg.get('name', entry['source_ip'])} rebooted at {entry['datetime']}")
    
    # Track CPU usage
    if msg.get('ID') == 'PLG' and msg.get('plugin') == 'cpu_monitor':
        cpu = msg.get('cpu_percent', 0)
        if cpu > 90:
            print(f"WARNING: {entry['source_ip']} CPU usage: {cpu}%")

Future Enhancements

Potential improvements for future versions:

  • Compression of rotated logs (gzip)
  • Time-based rotation in addition to size-based
  • Filtering to exclude certain message types
  • Structured logging output formats (CEF, GELF)
  • Remote syslog forwarding
  • Message deduplication
  • Journal file encryption
  • Signed journal entries

See Also