Files
heartbeat/hbd/server/config.py
T
Andreas Wrede ab0132a38d fix: correct THRESHOLD_DEFAULTS metric keys and add missing defaults
- Rename memory_monitor threshold key from 'percent' to 'memory_percent'
  so it matches exactly rather than relying on suffix stripping, which was
  causing swap_percent to be evaluated against the memory threshold
- Add swap_percent default thresholds (warning: 40%, critical: 75%)
- Add zfs_monitor pool capacity default thresholds (warning: 80%, critical: 90%)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-10 14:03:44 -04:00

351 lines
11 KiB
Python

"""Configuration loader and defaults for hbd (HeartBeat Daemon/Server)."""
import asyncio
import logging
import os
try:
import yaml
except Exception:
yaml = None
# Baseline server configuration. load_config() starts from these values and
# lets keys from the YAML file override them.
SERVER_DEFAULTS = {
    # --- Network ---------------------------------------------------------
    "hb_port": 50003,   # port on which heartbeats are received
    "hbd_port": 50004,  # port of the HTTP API
    "hbd_host": "",     # bind address; the empty string means all interfaces

    # --- Persistence -----------------------------------------------------
    # Host state kept between restarts.
    "pickfile": os.path.join(os.path.expanduser("~"), ".hb.pick"),
    # PID file used for stop/restart/reload.
    "pidfile": os.path.join(os.path.expanduser("~"), ".hb.pid"),

    # --- Logging ---------------------------------------------------------
    "logfile": os.path.join(os.path.expanduser("~"), ".hb.log"),

    # --- Notification channels -------------------------------------------
    "notification_channels": {},  # named channels, each with type and credentials
    "base_url": "",               # base URL for links in notifications (e.g. https://hbd.example.com)

    # --- Monitoring ------------------------------------------------------
    "interval": 20,  # expected heartbeat interval (used by server-side checks)
    "grace": 2,      # extra seconds to wait after a missed heartbeat before notifying
    "threshold_renotify_interval": 3600,  # seconds between repeated threshold notifications

    # --- User management -------------------------------------------------
    "users": {},            # username -> {full_name, avatar, password, admin, notification_channels}
    "default_owner": None,  # username owning hosts that have no explicit owner

    # --- OAuth2 providers ------------------------------------------------
    "oauth": {},  # e.g. oauth.gitea.{url,client_id,client_secret}

    # --- Host management -------------------------------------------------
    "hosts": {},        # unified host definitions
    "dyndnshosts": [],  # hosts with dynamic DNS (legacy format)
    "drophosts": [],    # hosts to ignore
    "dyndomains": ["wrede.org"],

    # --- DNS updates -----------------------------------------------------
    "nsupdate_bin": "/usr/bin/nsupdate",

    # --- WebSocket -------------------------------------------------------
    "ws_port": 50005,
    "wss_port": None,
    "cert_path": "/usr/local/etc/ssl/",
    "wss_pem": "fullchain.pem",
    "wss_key": "privkey.pem",

    # --- Message journal -------------------------------------------------
    "journal_enabled": True,
    "journal_dir": "/var/log/heartbeat",
    "journal_file": "messages.journal",
    "journal_max_size": 100 * 1024 * 1024,  # 100 MB
    "journal_max_backups": 10,

    # --- Runtime flags ---------------------------------------------------
    "foreground": False,
    "verbose": False,
    "debug": 0,

    # --- Plugin / threshold configs (for clients reporting to this server)
    "plugins": {},
    "thresholds": {},
}
# Built-in threshold levels, grouped by monitor/plugin name and then by the
# metric key that monitor reports.
THRESHOLD_DEFAULTS = {
    "thresholds": {
        "cpu_monitor": {
            "cpu_percent": {"warning": 80.0, "critical": 90.0},
        },
        "memory_monitor": {
            # Memory and swap each carry their own entry.
            "memory_percent": {"warning": 85.0, "critical": 95.0},
            "swap_percent": {"warning": 40.0, "critical": 75.0},
        },
        "disk_monitor": {
            "partitions": {
                "/": {
                    "percent": {"warning": 85.0, "critical": 90.0},
                },
            },
        },
        "rtt": {
            "warning": 200,
            "critical": 250.0,
            # Optional: number of consecutive breaches before alerting.
            "count": 3,
        },
        "nagios_runner": {
            "status_code": {
                "display": "{check_name} {output}",
                "operator": "nagios",
            },
        },
        "zfs_monitor": {
            "pools": {
                # The "*" entry holds the defaults for pools.
                "*": {
                    "status": {
                        "warning": 1,
                        "critical": 2,
                        "operator": ">",
                        "hysteresis": 0.0,
                        "display": "ZFS pool {pool_name} is {health}",
                    },
                    "capacity": {"warning": 80.0, "critical": 90.0},
                },
            },
        },
    },
}
def load_config(path=None):
    """Load configuration from a YAML file and merge it over the server defaults.

    The result starts as a deep copy of ``SERVER_DEFAULTS`` so that mutating
    the returned dictionary (including nested values such as ``hosts`` or
    ``users``) never alters the module-level defaults. Every top-level key
    found in the YAML file then replaces the corresponding default; keys that
    have no default are kept as well, to support plugin configs and future
    extensions.

    Defaults alone are returned when the file does not exist, when it is
    empty, or when PyYAML is not installed (a warning is logged in the last
    case).

    Args:
        path: Path to YAML config file (default: ~/.hb.yaml)

    Returns:
        Dictionary with configuration

    Raises:
        ValueError: if the YAML document's top level is not a mapping.
    """
    # Imported locally so this function does not rely on a module-level import.
    import copy

    cfg = copy.deepcopy(SERVER_DEFAULTS)

    if not path:
        # default path (~/.hb.yaml)
        path = os.path.join(os.path.expanduser("~"), ".hb.yaml")

    if not os.path.exists(path):
        return cfg

    if not yaml:
        # yaml not installed: the file cannot be parsed, so say so instead of
        # silently ignoring the user's configuration.
        logging.getLogger(__name__).warning(
            "PyYAML is not installed; ignoring config file %s and using defaults",
            path,
        )
        return cfg

    with open(path, encoding="utf-8") as fh:
        data = yaml.safe_load(fh)

    if data is None:
        # An empty (or comment-only) file parses to None: nothing to merge.
        return cfg

    if not isinstance(data, dict):
        raise ValueError(
            f"Config file {path} must contain a mapping at the top level, "
            f"got {type(data).__name__}"
        )

    # Top-level merge: each YAML key replaces the default wholesale.
    cfg.update(data)
    return cfg
class ReloadableConfig:
    """Configuration wrapper that supports reloading at runtime.

    This class wraps the configuration dictionary and provides:
    - ``reload()``, which re-reads the config file and swaps in the result
      (presumably triggered by a SIGHUP handler elsewhere - confirm)
    - Read-only, dict-like access (``get``, ``[]``, ``in``, iteration,
      ``keys``/``items``/``values``) that always reflects the current config
    - An ``asyncio.Lock`` so that concurrent ``reload()`` coroutines run one
      at a time

    Note: the lock is an asyncio lock, so it only serializes coroutines; it
    does not provide protection between OS threads.
    """

    def __init__(self, initial_config, config_path=None):
        """Initialize with initial configuration.

        Args:
            initial_config: Initial configuration dictionary
            config_path: Path to config file for reloading (optional)
        """
        self._config = initial_config
        self._config_path = config_path
        self._lock = asyncio.Lock()
        self._logger = logging.getLogger(__name__)

    async def reload(self, config_path=None):
        """Reload configuration from file.

        The current configuration is replaced only after ``load_config``
        returns successfully; if it raises, the existing configuration stays
        in place.

        Args:
            config_path: Path to config file (uses stored path if not provided)

        Returns:
            New configuration dictionary

        Raises:
            ValueError: if neither ``config_path`` nor a stored path is set.
            Exception: whatever ``load_config`` raises, after logging it.
        """
        path = config_path or self._config_path
        if not path:
            raise ValueError("No config path specified for reload")

        async with self._lock:
            try:
                # NOTE: load_config is a synchronous call made from within
                # this coroutine.
                new_config = load_config(path)
            except Exception as e:
                self._logger.error(
                    "Failed to reload config from %s: %s", path, e, exc_info=True
                )
                # self._config has not been touched, so the old config remains.
                raise

            self._config = new_config
            self._logger.info("Configuration reloaded from %s", path)
            return new_config

    def get(self, key, default=None):
        """Get a config value (dict-compatible)."""
        return self._config.get(key, default)

    def __getitem__(self, key):
        """Get a config value via subscript (dict-compatible)."""
        return self._config[key]

    def __contains__(self, key):
        """Check if key exists (dict-compatible)."""
        return key in self._config

    def __iter__(self):
        """Iterate over config keys (dict-compatible).

        Without this, ``for key in cfg`` would fall back to calling
        ``__getitem__`` with 0, 1, ... and fail with ``KeyError: 0``.
        """
        return iter(self._config)

    def keys(self):
        """Return config keys (dict-compatible)."""
        return self._config.keys()

    def items(self):
        """Return config items (dict-compatible)."""
        return self._config.items()

    def values(self):
        """Return config values (dict-compatible)."""
        return self._config.values()

    @property
    def config(self):
        """Get the underlying config dict (for components that need full dict)."""
        return self._config
def get_watchhosts(config):
    """Collect the names of the hosts that should be watched.

    A host counts as watched when its entry in the ``hosts`` section is a
    mapping whose ``watch`` value is truthy; a missing ``watch`` key is
    treated as true. Entries that are not mappings are skipped.

    Returns:
        List of hostnames to watch (empty if ``hosts`` is not a mapping)
    """
    hosts_section = config.get("hosts", {})
    if not isinstance(hosts_section, dict):
        return []
    return [
        name
        for name, attrs in hosts_section.items()
        if isinstance(attrs, dict) and attrs.get("watch", True)
    ]
def get_dyndnshosts(config):
    """Extract dyndnshosts from config, supporting both new and legacy formats.

    Hostnames come from two places:
    - New format: entries in the ``hosts`` section that are mappings with a
      truthy ``dyndns`` attribute.
    - Legacy format: the ``dyndnshosts`` list (or set).

    Duplicates are removed while keeping first-seen order (new-format hosts
    first, then legacy ones), so the result is stable between runs for list
    input. A legacy *set* is still iterated in whatever order the set yields.

    Args:
        config: Configuration dictionary

    Returns:
        List of unique hostnames with dynamic DNS
    """
    names = []

    # New format: hosts section with dyndns attribute
    hosts_config = config.get("hosts", {})
    if isinstance(hosts_config, dict):
        names.extend(
            host_name
            for host_name, host_attrs in hosts_config.items()
            if isinstance(host_attrs, dict) and host_attrs.get("dyndns", False)
        )

    # Legacy format: dyndnshosts list/set
    legacy_dyndnshosts = config.get("dyndnshosts", [])
    if isinstance(legacy_dyndnshosts, (list, set)):
        names.extend(legacy_dyndnshosts)

    # dict.fromkeys de-duplicates while preserving insertion order, unlike
    # set(), whose iteration order for strings can change from run to run.
    return list(dict.fromkeys(names))
def get_host_config(config, hostname):
    """Look up the attributes of one host in the ``hosts`` section.

    Returns:
        The host's attribute dictionary, or an empty dict when the ``hosts``
        section is not a mapping, the host is not listed, or its entry is
        not a mapping.
    """
    hosts_section = config.get("hosts", {})
    if not isinstance(hosts_section, dict):
        return {}
    entry = hosts_section.get(hostname)
    if isinstance(entry, dict):
        return entry
    return {}
# ---------------------------------------------------------------------------
# User / host-access helpers
# ---------------------------------------------------------------------------
def get_default_owner(config) -> str | None:
"""Return the configured default_owner username, or the first admin user, or None."""
explicit = config.get("default_owner")
if explicit:
return explicit
# Fall back to first admin user found in config
users_cfg = config.get("users", {})
if isinstance(users_cfg, dict):
for username, attrs in users_cfg.items():
if isinstance(attrs, dict) and attrs.get("admin", False):
return username
return None
def get_host_access(config, hostname) -> dict:
    """Return the access dict for *hostname*: owner, managers, monitors.

    Values are read from the host's entry in the ``hosts`` section (via
    ``get_host_config``). ``managers`` and ``monitors`` are normalized to
    lists: a single string becomes a one-element list, and a missing or null
    value becomes an empty list.

    The owner is the host's explicit ``owner`` attribute or None. No
    ``default_owner`` fallback is applied here.

    Returns:
        {
            "owner": str | None,
            "managers": list[str],
            "monitors": list[str],
        }
    """

    def _as_list(value):
        # A key present with a null value (e.g. an empty "managers:" entry
        # in YAML) must not reach list(), which would raise TypeError.
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    host_cfg = get_host_config(config, hostname)

    # NOTE(review): the fallback to get_default_owner(config) was commented
    # out in the original code; confirm whether that is intentional.
    owner = host_cfg.get("owner")

    return {
        "owner": owner,
        "managers": _as_list(host_cfg.get("managers")),
        "monitors": _as_list(host_cfg.get("monitors")),
    }