#!/usr/bin/env python3 """ Test script for message journal functionality. Tests: - Journal initialization - Message logging - File rotation based on size - Backup management """ import asyncio import sys import os import json import tempfile import shutil from pathlib import Path # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from hbd.journal import MessageJournal, get_journal async def test_basic_logging(): """Test basic message logging.""" print("="*60) print("Test 1: Basic Message Logging") print("="*60) # Create temporary directory for journal temp_dir = tempfile.mkdtemp(prefix="journal_test_") print(f"Using temp directory: {temp_dir}") try: # Create journal with config config = { 'journal_enabled': True, 'journal_dir': temp_dir, 'journal_file': 'test.journal', 'journal_max_size': 1024, # 1KB for testing 'journal_max_backups': 3 } journal = MessageJournal(config) await journal.initialize() # Log some test messages test_messages = [ { 'ID': 'HTB', 'name': 'testhost1', 'interval': 30, }, { 'ID': 'PLG', 'plugin': 'cpu_monitor', 'cpu_percent': 45.2, 'load_1min': 1.5 }, { 'ID': 'HTB', 'name': 'testhost2', 'interval': 60, 'boot': 1 } ] for i, msg in enumerate(test_messages): await journal.log_message(msg, ('192.168.1.100', 50000 + i), 1000.0 + i) print(f"✓ Logged message {i+1}: {msg['ID']}") # Check journal file exists journal_path = Path(temp_dir) / 'test.journal' if journal_path.exists(): print(f"✓ Journal file created: {journal_path}") # Read and verify content with open(journal_path, 'r') as f: lines = f.readlines() print(f"✓ Journal has {len(lines)} entries") # Parse first entry entry = json.loads(lines[0]) print(f"✓ First entry structure: {list(entry.keys())}") assert 'timestamp' in entry assert 'datetime' in entry assert 'source_ip' in entry assert 'message' in entry print("✓ Entry structure validated") else: print("✗ Journal file not created") return False # Get stats stats = journal.get_stats() print(f"\nJournal stats:") print(f" Enabled: {stats['enabled']}") print(f" Current size: {stats['current_size']} bytes") print(f" Max size: {stats['max_size']} bytes") print(f" Rotation threshold: {stats['rotation_threshold']}") await journal.close() print("\n✅ Test 1 PASSED") return True except Exception as e: print(f"\n✗ Test 1 FAILED: {e}") import traceback traceback.print_exc() return False finally: # Cleanup shutil.rmtree(temp_dir, ignore_errors=True) async def test_rotation(): """Test log rotation based on size.""" print("\n" + "="*60) print("Test 2: Log Rotation") print("="*60) # Create temporary directory for journal temp_dir = tempfile.mkdtemp(prefix="journal_test_") print(f"Using temp directory: {temp_dir}") try: # Create journal with small max size config = { 'journal_enabled': True, 'journal_dir': temp_dir, 'journal_file': 'test.journal', 'journal_max_size': 500, # 500 bytes - very small for testing 'journal_max_backups': 3 } journal = MessageJournal(config) await journal.initialize() # Log many messages to trigger rotation print("Logging messages to trigger rotation...") for i in range(20): msg = { 'ID': 'HTB', 'name': f'testhost{i}', 'interval': 30, 'data': 'x' * 50 # Add some padding } await journal.log_message(msg, ('192.168.1.100', 50000 + i), 1000.0 + i) # Give rotation time to complete await asyncio.sleep(0.01) print(f"✓ Logged 20 messages") # Check for rotated files journal_dir = Path(temp_dir) all_files = list(journal_dir.glob('test.journal*')) print(f"✓ Found {len(all_files)} journal files") for f in sorted(all_files): size = f.stat().st_size print(f" - {f.name}: {size} bytes") # Should have current file + some backups if len(all_files) > 1: print(f"✓ Rotation occurred ({len(all_files) - 1} backup files)") else: print("⚠ No rotation occurred (may not have reached threshold)") # Check max backups limit backup_files = [f for f in all_files if f.name != 'test.journal'] if len(backup_files) <= config['journal_max_backups']: print(f"✓ Backup count within limit: {len(backup_files)} <= {config['journal_max_backups']}") else: print(f"✗ Too many backups: {len(backup_files)} > {config['journal_max_backups']}") return False await journal.close() print("\n✅ Test 2 PASSED") return True except Exception as e: print(f"\n✗ Test 2 FAILED: {e}") import traceback traceback.print_exc() return False finally: # Cleanup shutil.rmtree(temp_dir, ignore_errors=True) async def test_disabled_journal(): """Test that disabled journal doesn't write anything.""" print("\n" + "="*60) print("Test 3: Disabled Journal") print("="*60) temp_dir = tempfile.mkdtemp(prefix="journal_test_") print(f"Using temp directory: {temp_dir}") try: config = { 'journal_enabled': False, 'journal_dir': temp_dir, 'journal_file': 'test.journal' } journal = MessageJournal(config) await journal.initialize() # Try to log a message msg = {'ID': 'HTB', 'name': 'testhost'} await journal.log_message(msg, ('192.168.1.100', 50000), 1000.0) # Check that no file was created journal_path = Path(temp_dir) / 'test.journal' if not journal_path.exists(): print("✓ No journal file created (as expected)") else: print("✗ Journal file was created despite being disabled") return False await journal.close() print("\n✅ Test 3 PASSED") return True except Exception as e: print(f"\n✗ Test 3 FAILED: {e}") import traceback traceback.print_exc() return False finally: shutil.rmtree(temp_dir, ignore_errors=True) async def test_global_instance(): """Test global journal instance.""" print("\n" + "="*60) print("Test 4: Global Journal Instance") print("="*60) temp_dir = tempfile.mkdtemp(prefix="journal_test_") try: config = { 'journal_enabled': True, 'journal_dir': temp_dir, 'journal_file': 'global.journal' } # Get global instance journal1 = get_journal(config) journal2 = get_journal() # Should return same instance if journal1 is journal2: print("✓ Global instance returns same object") else: print("✗ Global instance returns different objects") return False await journal1.initialize() # Log through convenience function from hbd.journal import log_message msg = {'ID': 'HTB', 'name': 'testhost'} await log_message(msg, ('192.168.1.100', 50000)) journal_path = Path(temp_dir) / 'global.journal' if journal_path.exists(): print("✓ Global journal logged message") else: print("✗ Global journal did not log message") return False await journal1.close() print("\n✅ Test 4 PASSED") return True except Exception as e: print(f"\n✗ Test 4 FAILED: {e}") import traceback traceback.print_exc() return False finally: shutil.rmtree(temp_dir, ignore_errors=True) async def main(): """Run all tests.""" print("Message Journal Test Suite") print("="*60) tests = [ test_basic_logging, test_rotation, test_disabled_journal, test_global_instance ] results = [] for test in tests: result = await test() results.append(result) # Summary print("\n" + "="*60) print("Test Summary") print("="*60) passed = sum(results) total = len(results) print(f"Passed: {passed}/{total}") for i, (test, result) in enumerate(zip(tests, results), 1): status = "✅ PASS" if result else "❌ FAIL" print(f" {status} - Test {i}: {test.__name__}") if passed == total: print("\n🎉 All tests passed!") return 0 else: print(f"\n⚠ {total - passed} test(s) failed") return 1 if __name__ == '__main__': exit_code = asyncio.run(main()) sys.exit(exit_code)