refactor: move loose test files out of project root

- tests/test_threshold.py: has proper pytest test functions
- scripts/test_*.py: manual run scripts with no test functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Andreas Wrede
2026-05-12 23:52:34 -04:00
parent 7f17ddc2ff
commit ba96da9622
7 changed files with 0 additions and 0 deletions
+99
View File
@@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
Test all plugins together.
"""
import asyncio
import logging
from pathlib import Path
# Setup path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from hbd.plugin import PluginRegistry, PluginLoader
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s: %(message)s"
)
async def test_all_plugins():
"""Test loading all plugins."""
print("=" * 70)
print("Testing All Plugins")
print("=" * 70)
# Create registry and loader
registry = PluginRegistry()
loader = PluginLoader(registry)
# Configuration for plugins
config = {
"cpu_monitor": {
"interval": 30,
"per_core": False
},
"nagios_runner": {
"interval": 60,
"commands": [
{
"name": "test_ok",
"command": "echo 'OK - test passed | metric=100%;;;0;100'"
},
{
"name": "test_warning",
"command": "echo 'WARNING - test result | value=85%;80;90;0;100' && exit 1"
}
]
}
}
# Load plugins
plugin_dir = Path(__file__).parent / "hbd" / "plugins"
print(f"\n1. Loading plugins from: {plugin_dir}")
count = await loader.load_from_directory(plugin_dir, config)
print(f" ✓ Loaded {count} plugins")
# List loaded plugins
print(f"\n2. Loaded plugins:")
for plugin in registry.get_all():
print(f" - {plugin.name} v{plugin.version}")
print(f" Type: {plugin.__class__.__name__}")
print(f" Interval: {plugin.interval}s")
print(f" Description: {plugin.description}")
# Test collection for each plugin
print(f"\n3. Testing data collection:")
for plugin in registry.get_all():
print(f"\n {plugin.name}:")
try:
data = await plugin.collect()
print(f" ✓ Collected {len(data)} fields")
# Show sample of data
sample_count = min(5, len(data))
for key, value in list(data.items())[:sample_count]:
value_str = str(value)
if len(value_str) > 50:
value_str = value_str[:47] + "..."
print(f" {key}: {value_str}")
if len(data) > sample_count:
print(f" ... and {len(data) - sample_count} more fields")
except Exception as e:
print(f" ✗ Error: {e}")
# Cleanup
print(f"\n4. Cleanup...")
await loader.unload_all()
print(f" ✓ All plugins unloaded")
print(f"\n" + "=" * 70)
print(f"Successfully tested {count} plugins!")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(test_all_plugins())
+160
View File
@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
Test script for all monitoring plugins.
Tests all available plugins including the new ones:
- memory_monitor
- disk_monitor
- network_monitor
- filesystem_info
"""
import asyncio
import sys
import os
import logging
# Add parent directory to path so we can import hbd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from hbd.plugin import PluginLoader
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def format_bytes(bytes_val):
"""Format bytes into human readable format."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_val < 1024.0:
return f"{bytes_val:.2f} {unit}"
bytes_val /= 1024.0
return f"{bytes_val:.2f} PB"
def print_plugin_data(plugin_name, data, indent=2):
"""Pretty print plugin data."""
prefix = " " * indent
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, dict):
print(f"{prefix}{key}:")
print_plugin_data(plugin_name, value, indent + 2)
elif isinstance(value, list):
print(f"{prefix}{key}: [{len(value)} items]")
if len(value) <= 5: # Only show small lists
for item in value:
if isinstance(item, dict):
print_plugin_data(plugin_name, item, indent + 2)
else:
print(f"{prefix} - {item}")
else:
# Format output based on key name for better readability
if '_bytes' in key or key.endswith('_sent') or key.endswith('_recv') or 'memory_' in key or 'swap_' in key:
if isinstance(value, (int, float)) and value > 1024:
print(f"{prefix}{key}: {format_bytes(value)} ({value:,})")
else:
print(f"{prefix}{key}: {value}")
elif 'percent' in key:
print(f"{prefix}{key}: {value:.1f}%")
elif isinstance(value, float):
print(f"{prefix}{key}: {value:.2f}")
elif isinstance(value, int) and value > 1000:
print(f"{prefix}{key}: {value:,}")
else:
print(f"{prefix}{key}: {value}")
else:
print(f"{prefix}{data}")
async def main():
"""Main test function."""
print("="*60)
print("Plugin System Test Suite")
print("="*60)
# Load all available plugins using the plugin loader
from hbd.plugin import PluginRegistry, PluginLoader
from pathlib import Path
registry = PluginRegistry()
loader = PluginLoader(registry)
plugin_dir = Path(__file__).parent / "hbd" / "plugins"
if not plugin_dir.exists():
print(f"✗ Plugin directory not found: {plugin_dir}")
return 1
# Load plugins from directory
count = await loader.load_from_directory(plugin_dir, {})
print(f"\nLoaded {count} plugins:")
plugins = registry.get_all()
for plugin in plugins:
print(f" - {plugin.name}: {plugin.__class__.__doc__.split('.')[0] if plugin.__class__.__doc__ else 'No description'}")
# Test each plugin
results = {}
for plugin in plugins:
# Skip nagios_runner as it needs specific configuration
if plugin.name == 'nagios_runner':
print(f"\n{'='*60}")
print(f"Skipping: {plugin.name} (requires specific configuration)")
print(f"{'='*60}")
results[plugin.name] = True # Mark as success since it loaded OK
continue
print(f"\n{'='*60}")
print(f"Testing: {plugin.name}")
print(f"{'='*60}")
try:
# Collect data
data = await plugin.collect()
if data:
if 'error' in data:
print(f"✗ Collection error: {data['error']}")
results[plugin.name] = False
else:
print(f"✓ Data collected: {len(data)} top-level fields")
print_plugin_data(plugin.name, data)
results[plugin.name] = True
else:
print(f"⚠ No data collected")
results[plugin.name] = False
except Exception as e:
print(f"✗ Failed to collect data: {e}")
import traceback
traceback.print_exc()
results[plugin.name] = False
# Summary
print(f"\n{'='*60}")
print("Test Summary")
print(f"{'='*60}")
success_count = sum(1 for v in results.values() if v)
total_count = len(results)
print(f"\nResults: {success_count}/{total_count} plugins successful")
for name, success in results.items():
status = "" if success else ""
print(f" {status} {name}")
if success_count == total_count:
print("\n🎉 All plugins passed!")
return 0
else:
print(f"\n{total_count - success_count} plugin(s) failed")
return 1
if __name__ == '__main__':
exit_code = asyncio.run(main())
sys.exit(exit_code)
+331
View File
@@ -0,0 +1,331 @@
#!/usr/bin/env python3
"""
Test script for message journal functionality.
Tests:
- Journal initialization
- Message logging
- File rotation based on size
- Backup management
"""
import asyncio
import sys
import os
import json
import tempfile
import shutil
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from hbd.journal import MessageJournal, get_journal
async def test_basic_logging():
"""Test basic message logging."""
print("="*60)
print("Test 1: Basic Message Logging")
print("="*60)
# Create temporary directory for journal
temp_dir = tempfile.mkdtemp(prefix="journal_test_")
print(f"Using temp directory: {temp_dir}")
try:
# Create journal with config
config = {
'journal_enabled': True,
'journal_dir': temp_dir,
'journal_file': 'test.journal',
'journal_max_size': 1024, # 1KB for testing
'journal_max_backups': 3
}
journal = MessageJournal(config)
await journal.initialize()
# Log some test messages
test_messages = [
{
'ID': 'HTB',
'name': 'testhost1',
'interval': 30,
},
{
'ID': 'PLG',
'plugin': 'cpu_monitor',
'cpu_percent': 45.2,
'load_1min': 1.5
},
{
'ID': 'HTB',
'name': 'testhost2',
'interval': 60,
'boot': 1
}
]
for i, msg in enumerate(test_messages):
await journal.log_message(msg, ('192.168.1.100', 50000 + i), 1000.0 + i)
print(f"✓ Logged message {i+1}: {msg['ID']}")
# Check journal file exists
journal_path = Path(temp_dir) / 'test.journal'
if journal_path.exists():
print(f"✓ Journal file created: {journal_path}")
# Read and verify content
with open(journal_path, 'r') as f:
lines = f.readlines()
print(f"✓ Journal has {len(lines)} entries")
# Parse first entry
entry = json.loads(lines[0])
print(f"✓ First entry structure: {list(entry.keys())}")
assert 'timestamp' in entry
assert 'datetime' in entry
assert 'source_ip' in entry
assert 'message' in entry
print("✓ Entry structure validated")
else:
print("✗ Journal file not created")
return False
# Get stats
stats = journal.get_stats()
print(f"\nJournal stats:")
print(f" Enabled: {stats['enabled']}")
print(f" Current size: {stats['current_size']} bytes")
print(f" Max size: {stats['max_size']} bytes")
print(f" Rotation threshold: {stats['rotation_threshold']}")
await journal.close()
print("\n✅ Test 1 PASSED")
return True
except Exception as e:
print(f"\n✗ Test 1 FAILED: {e}")
import traceback
traceback.print_exc()
return False
finally:
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
async def test_rotation():
"""Test log rotation based on size."""
print("\n" + "="*60)
print("Test 2: Log Rotation")
print("="*60)
# Create temporary directory for journal
temp_dir = tempfile.mkdtemp(prefix="journal_test_")
print(f"Using temp directory: {temp_dir}")
try:
# Create journal with small max size
config = {
'journal_enabled': True,
'journal_dir': temp_dir,
'journal_file': 'test.journal',
'journal_max_size': 500, # 500 bytes - very small for testing
'journal_max_backups': 3
}
journal = MessageJournal(config)
await journal.initialize()
# Log many messages to trigger rotation
print("Logging messages to trigger rotation...")
for i in range(20):
msg = {
'ID': 'HTB',
'name': f'testhost{i}',
'interval': 30,
'data': 'x' * 50 # Add some padding
}
await journal.log_message(msg, ('192.168.1.100', 50000 + i), 1000.0 + i)
# Give rotation time to complete
await asyncio.sleep(0.01)
print(f"✓ Logged 20 messages")
# Check for rotated files
journal_dir = Path(temp_dir)
all_files = list(journal_dir.glob('test.journal*'))
print(f"✓ Found {len(all_files)} journal files")
for f in sorted(all_files):
size = f.stat().st_size
print(f" - {f.name}: {size} bytes")
# Should have current file + some backups
if len(all_files) > 1:
print(f"✓ Rotation occurred ({len(all_files) - 1} backup files)")
else:
print("⚠ No rotation occurred (may not have reached threshold)")
# Check max backups limit
backup_files = [f for f in all_files if f.name != 'test.journal']
if len(backup_files) <= config['journal_max_backups']:
print(f"✓ Backup count within limit: {len(backup_files)} <= {config['journal_max_backups']}")
else:
print(f"✗ Too many backups: {len(backup_files)} > {config['journal_max_backups']}")
return False
await journal.close()
print("\n✅ Test 2 PASSED")
return True
except Exception as e:
print(f"\n✗ Test 2 FAILED: {e}")
import traceback
traceback.print_exc()
return False
finally:
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
async def test_disabled_journal():
"""Test that disabled journal doesn't write anything."""
print("\n" + "="*60)
print("Test 3: Disabled Journal")
print("="*60)
temp_dir = tempfile.mkdtemp(prefix="journal_test_")
print(f"Using temp directory: {temp_dir}")
try:
config = {
'journal_enabled': False,
'journal_dir': temp_dir,
'journal_file': 'test.journal'
}
journal = MessageJournal(config)
await journal.initialize()
# Try to log a message
msg = {'ID': 'HTB', 'name': 'testhost'}
await journal.log_message(msg, ('192.168.1.100', 50000), 1000.0)
# Check that no file was created
journal_path = Path(temp_dir) / 'test.journal'
if not journal_path.exists():
print("✓ No journal file created (as expected)")
else:
print("✗ Journal file was created despite being disabled")
return False
await journal.close()
print("\n✅ Test 3 PASSED")
return True
except Exception as e:
print(f"\n✗ Test 3 FAILED: {e}")
import traceback
traceback.print_exc()
return False
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
async def test_global_instance():
"""Test global journal instance."""
print("\n" + "="*60)
print("Test 4: Global Journal Instance")
print("="*60)
temp_dir = tempfile.mkdtemp(prefix="journal_test_")
try:
config = {
'journal_enabled': True,
'journal_dir': temp_dir,
'journal_file': 'global.journal'
}
# Get global instance
journal1 = get_journal(config)
journal2 = get_journal() # Should return same instance
if journal1 is journal2:
print("✓ Global instance returns same object")
else:
print("✗ Global instance returns different objects")
return False
await journal1.initialize()
# Log through convenience function
from hbd.journal import log_message
msg = {'ID': 'HTB', 'name': 'testhost'}
await log_message(msg, ('192.168.1.100', 50000))
journal_path = Path(temp_dir) / 'global.journal'
if journal_path.exists():
print("✓ Global journal logged message")
else:
print("✗ Global journal did not log message")
return False
await journal1.close()
print("\n✅ Test 4 PASSED")
return True
except Exception as e:
print(f"\n✗ Test 4 FAILED: {e}")
import traceback
traceback.print_exc()
return False
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
async def main():
"""Run all tests."""
print("Message Journal Test Suite")
print("="*60)
tests = [
test_basic_logging,
test_rotation,
test_disabled_journal,
test_global_instance
]
results = []
for test in tests:
result = await test()
results.append(result)
# Summary
print("\n" + "="*60)
print("Test Summary")
print("="*60)
passed = sum(results)
total = len(results)
print(f"Passed: {passed}/{total}")
for i, (test, result) in enumerate(zip(tests, results), 1):
status = "✅ PASS" if result else "❌ FAIL"
print(f" {status} - Test {i}: {test.__name__}")
if passed == total:
print("\n🎉 All tests passed!")
return 0
else:
print(f"\n{total - passed} test(s) failed")
return 1
if __name__ == '__main__':
exit_code = asyncio.run(main())
sys.exit(exit_code)
+149
View File
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
Test the Nagios Runner Plugin.
"""
import asyncio
import logging
from pathlib import Path
# Setup path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from hbd.plugins.nagios_runner import NagiosRunnerPlugin, NAGIOS_OK, NAGIOS_WARNING
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s: %(message)s"
)
async def test_nagios_runner():
"""Test Nagios runner plugin."""
print("=" * 70)
print("Testing Nagios Runner Plugin")
print("=" * 70)
# Create test configuration with simple shell commands
# These mimic Nagios plugin output format
config = {
"interval": 60,
"timeout": 10,
"commands": [
{
"name": "check_uptime",
"command": "echo 'OK - uptime is 5 days | uptime=432000s;;;0'"
},
{
"name": "check_memory",
"command": "echo 'OK - Memory usage 45% | memory=45%;80;90;0;100'"
},
{
"name": "check_cpu",
"command": "echo 'WARNING - CPU load high | load1=5.2;5.0;10.0;0 load5=4.8;4.0;8.0;0 load15=3.2;3.0;6.0;0' && exit 1"
},
{
"name": "check_disk",
"command": "echo 'OK - Disk usage 62% | /=62%;80;90;0;100 /home=45%;80;90;0;100'"
}
]
}
print("\n1. Creating Nagios Runner plugin with test configuration")
print(f" Configured {len(config['commands'])} test commands")
plugin = NagiosRunnerPlugin(config)
print(f"\n2. Initializing plugin...")
initialized = await plugin.initialize()
print(f" Initialized: {initialized}")
if not initialized:
print(" ERROR: Plugin failed to initialize!")
return
print(f"\n3. Collecting metrics from Nagios plugins...")
data = await plugin.collect()
print(f" ✓ Collected {len(data)} data points")
print(f"\n4. Results:")
print(f" Data points collected: {len(data)}")
# Show individual plugin results
print(f"\n5. Individual Plugin Results:")
for cmd_config in config["commands"]:
name = cmd_config["name"]
status = data.get(f"{name}_status", "N/A")
status_code = data.get(f"{name}_status_code", "N/A")
output = data.get(f"{name}_output", "N/A")
print(f"\n {name}:")
print(f" Status: {status} (code: {status_code})")
print(f" Output: {output}")
# Show performance data if present
perf_keys = [k for k in data.keys() if k.startswith(f"{name}_") and
k not in [f"{name}_status", f"{name}_status_code", f"{name}_output"]]
if perf_keys:
print(f" Performance Data:")
for key in perf_keys:
metric_name = key.replace(f"{name}_", "")
print(f" {metric_name}: {data[key]}")
print(f"\n6. Testing Nagios plugin detection (if available)...")
# Try to find actual Nagios plugins on the system
common_nagios_paths = [
"/usr/lib/nagios/plugins",
"/usr/local/nagios/libexec",
"/usr/lib64/nagios/plugins"
]
nagios_plugin_dir = None
for path in common_nagios_paths:
if Path(path).exists():
nagios_plugin_dir = Path(path)
print(f" ✓ Found Nagios plugins at: {nagios_plugin_dir}")
break
if nagios_plugin_dir:
# Try check_users if it exists
check_users = nagios_plugin_dir / "check_users"
if check_users.exists():
print(f"\n Testing real Nagios plugin: check_users")
real_config = {
"commands": [{
"name": "users",
"command": f"{check_users} -w 10 -c 20"
}]
}
real_plugin = NagiosRunnerPlugin(real_config)
await real_plugin.initialize()
real_data = await real_plugin.collect()
print(f" Status: {real_data.get('users_status')}")
print(f" Output: {real_data.get('users_output')}")
# Show any performance data
for key in real_data:
if key.startswith("users_") and "status" not in key and "output" not in key:
print(f" {key}: {real_data[key]}")
else:
print(f" check_users not found at {check_users}")
else:
print(f" No Nagios plugins directory found")
print(f" Install nagios-plugins to test with real plugins:")
print(f" sudo apt-get install nagios-plugins # Debian/Ubuntu")
print(f" sudo yum install nagios-plugins-all # RHEL/CentOS")
print(f"\n7. Cleanup...")
await plugin.cleanup()
print(f" ✓ Cleanup complete")
print(f"\n" + "=" * 70)
print("Test complete!")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(test_nagios_runner())
+119
View File
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Test script for plugin system.
"""
import asyncio
import logging
from pathlib import Path
# Setup path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from hbd.plugin import PluginRegistry, PluginLoader
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s: %(message)s"
)
async def test_plugins():
"""Test plugin loading and collection."""
print("=" * 60)
print("Testing Plugin System")
print("=" * 60)
# Create registry and loader
registry = PluginRegistry()
loader = PluginLoader(registry)
# Load plugins
plugin_dir = Path(__file__).parent / "hbd" / "plugins"
print(f"\n1. Loading plugins from: {plugin_dir}")
if not plugin_dir.exists():
print(f" ERROR: Plugin directory does not exist!")
return
count = await loader.load_from_directory(plugin_dir)
print(f" Loaded {count} plugins")
# List loaded plugins
print(f"\n2. Loaded plugins:")
for plugin in registry.get_all():
print(f" - {plugin.name} v{plugin.version} ({plugin.__class__.__name__})")
print(f" Description: {plugin.description}")
print(f" Interval: {plugin.interval}s")
# Test InfoPlugins
print(f"\n3. Testing InfoPlugins (collect once):")
from hbd.plugin import InfoPlugin
for plugin in registry.get_by_type(InfoPlugin):
print(f"\n Collecting {plugin.name}...")
try:
data = await plugin.collect()
print(f" ✓ Success! Got {len(data)} fields")
for key, value in list(data.items())[:5]: # Show first 5 fields
print(f" {key}: {value}")
if len(data) > 5:
print(f" ... and {len(data) - 5} more fields")
except Exception as e:
print(f" ✗ Error: {e}")
# Test MonitorPlugins
print(f"\n4. Testing MonitorPlugins (periodic collection):")
from hbd.plugin import MonitorPlugin
for plugin in registry.get_by_type(MonitorPlugin):
print(f"\n Collecting {plugin.name}...")
try:
data = await plugin.collect()
print(f" ✓ Success! Got {len(data)} fields")
for key, value in list(data.items())[:8]: # Show first 8 fields
print(f" {key}: {value}")
if len(data) > 8:
print(f" ... and {len(data) - 8} more fields")
except Exception as e:
print(f" ✗ Error: {e}")
# Test protocol encoding
print(f"\n5. Testing protocol encoding:")
from hbd.proto import dicttos, stodict
# Create sample plugin data
test_data = {
"plugin": "test_plugin",
"cpu_percent": 42.5,
"memory_mb": 1024,
"processes": 156,
"load_avg": [1.2, 0.8, 0.5],
"disk_info": {"sda": {"used": 50, "total": 100}}
}
print(f" Original data: {test_data}")
# Encode
encoded = dicttos("PLG", test_data)
print(f" Encoded ({len(encoded)} bytes): {encoded[:50]}...")
# Decode
decoded = stodict(encoded)
print(f" Decoded: {decoded}")
# Verify
if decoded.get("ID") == "PLG" and decoded.get("plugin") == "test_plugin":
print(f" ✓ Protocol encoding/decoding works!")
else:
print(f" ✗ Protocol encoding/decoding failed!")
# Cleanup
print(f"\n6. Cleaning up...")
await loader.unload_all()
print(f" ✓ Cleanup complete")
print(f"\n" + "=" * 60)
print("Test complete!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(test_plugins())
+60
View File
@@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""
Debug plugin loading.
"""
import asyncio
import logging
from pathlib import Path
# Setup path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from hbd.plugin import PluginRegistry, PluginLoader
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s: %(message)s"
)
async def test_manual_load():
"""Test manual plugin loading."""
print("Testing manual plugin import...")
# Import plugins directly
from hbd.plugins.os_info import OSInfoPlugin
from hbd.plugins.cpu_monitor import CPUMonitorPlugin
# Create instances
os_plugin = OSInfoPlugin()
cpu_plugin = CPUMonitorPlugin()
print(f"OS Plugin: {os_plugin.name} v{os_plugin.version}")
print(f"CPU Plugin: {cpu_plugin.name} v{cpu_plugin.version}")
# Initialize
print("\nInitializing plugins...")
os_init = await os_plugin.initialize()
cpu_init = await cpu_plugin.initialize()
print(f"OS plugin initialized: {os_init}")
print(f"CPU plugin initialized: {cpu_init}")
# Collect data
if os_init:
print("\nCollecting OS info...")
os_data = await os_plugin.collect()
print(f"Got {len(os_data)} fields:")
for k, v in list(os_data.items())[:10]:
print(f" {k}: {v}")
if cpu_init:
print("\nCollecting CPU info...")
cpu_data = await cpu_plugin.collect()
print(f"Got {len(cpu_data)} fields:")
for k, v in cpu_data.items():
print(f" {k}: {v}")
if __name__ == "__main__":
asyncio.run(test_manual_load())