"""Configuration loader and defaults for hbd (HeartBeat Daemon/Server).""" import asyncio import logging import os try: import yaml except Exception: yaml = None SERVER_DEFAULTS = { # Network settings "hb_port": 50003, # Port to listen for heartbeats "hbd_port": 50004, # HTTP API port "hbd_host": "", # Bind address (empty = all interfaces) # Persistence "pickfile": os.path.join(os.path.expanduser("~"), ".hb.pick"), # File to store host state between restarts "pidfile": os.path.join(os.path.expanduser("~"), ".hb.pid"), # PID file for stop/restart/reload # Logging "logfile": os.path.join(os.path.expanduser("~"), ".hb.log"), # Notification channels "notification_channels": {}, # Named channels with type and credentials "base_url": "", # Base URL for notification links (e.g. https://hbd.example.com) # Monitoring settings "interval": 20, # Expected heartbeat interval (for server checks) "grace": 2, # Grace multiplier (interval * grace = timeout) "threshold_renotify_interval": 3600, # Seconds between threshold re-notifications # User management "users": {}, # username -> {full_name, avatar, password, admin, notification_channels} "default_owner": None, # Username that owns hosts with no explicit owner # Host management "hosts": {}, # Unified host definitions "dyndnshosts": [], # Hosts with dynamic DNS (legacy) "drophosts": [], # Hosts to ignore "dyndomains": ["wrede.org"], # DNS updates "nsupdate_bin": "/usr/bin/nsupdate", # WebSocket settings "ws_port": 50005, "wss_port": None, "cert_path": "/usr/local/etc/ssl/", "wss_pem": "fullchain.pem", "wss_key": "privkey.pem", # Message journal configuration "journal_enabled": True, "journal_dir": "/var/log/heartbeat", "journal_file": "messages.journal", "journal_max_size": 100 * 1024 * 1024, # 100MB "journal_max_backups": 10, # Runtime flags "foreground": False, "verbose": False, "debug": 0, # Plugin/threshold configs (for clients reporting to this server) "plugins": {}, "thresholds": {}, } THRESHOLD_DEFAULTS = { 'thresholds': { 'cpu_monitor': { 'cpu_percent': { 'warning': 80.0, 'critical': 90.0 } }, 'memory_monitor': { 'percent': { 'warning': 85.0, 'critical': 95.0 } }, 'disk_monitor': { 'partitions': { '/': { 'percent': { 'warning': 85.0, 'critical': 90.0 } } } }, 'rtt': { 'warning': 200, 'critical': 250.0, 'count': 3 # Optional: number of consecutive breaches before alerting }, 'nagios_runner': { 'status_code': { 'display': '{check_name} {output}', 'operator': "nagios" } } } } def load_config(path=None): """Load configuration from a YAML file and merge with server defaults. If YAML is not available or the file does not exist, defaults are returned. Args: path: Path to YAML config file (default: ~/.hb.yaml) Returns: Dictionary with configuration """ cfg = SERVER_DEFAULTS.copy() if not path: # default path (~/.hb.yaml) path = os.path.join(os.path.expanduser("~"), ".hb.yaml") if os.path.exists(path): if yaml: with open(path) as fh: data = yaml.safe_load(fh) # Merge YAML data with defaults # Keep all keys from YAML to support plugin configs and future extensions for k, v in data.items(): cfg[k] = v else: # yaml not installed: do not attempt to parse; user must ensure defaults pass return cfg class ReloadableConfig: """Thread-safe/async-safe configuration wrapper that supports runtime reloading. This class wraps the configuration dictionary and provides: - Thread-safe config reloading via SIGHUP - Backward-compatible dict-like access - Async lock to prevent concurrent reloads """ def __init__(self, initial_config, config_path=None): """Initialize with initial configuration. Args: initial_config: Initial configuration dictionary config_path: Path to config file for reloading (optional) """ self._config = initial_config self._config_path = config_path self._lock = asyncio.Lock() self._logger = logging.getLogger(__name__) async def reload(self, config_path=None): """Reload configuration from file. Args: config_path: Path to config file (uses stored path if not provided) Returns: New configuration dictionary Raises: Exception if reload fails (keeps existing config) """ path = config_path or self._config_path if not path: raise ValueError("No config path specified for reload") async with self._lock: try: # Load new config new_config = load_config(path) # Store old config for rollback if needed old_config = self._config # Update config self._config = new_config self._logger.info(f"Configuration reloaded from {path}") return new_config except Exception as e: self._logger.error(f"Failed to reload config from {path}: {e}", exc_info=True) # Keep existing config on error raise def get(self, key, default=None): """Get a config value (dict-compatible).""" return self._config.get(key, default) def __getitem__(self, key): """Get a config value via subscript (dict-compatible).""" return self._config[key] def __contains__(self, key): """Check if key exists (dict-compatible).""" return key in self._config def keys(self): """Return config keys (dict-compatible).""" return self._config.keys() def items(self): """Return config items (dict-compatible).""" return self._config.items() def values(self): """Return config values (dict-compatible).""" return self._config.values() @property def config(self): """Get the underlying config dict (for components that need full dict).""" return self._config def get_watchhosts(config): """Extract watched hostnames from config (hosts with watch: true). Returns: List of hostnames to watch """ watchhosts = [] hosts_config = config.get("hosts", {}) if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): if isinstance(host_attrs, dict) and host_attrs.get("watch", True): watchhosts.append(host_name) return watchhosts def get_dyndnshosts(config): """Extract dyndnshosts from config, supporting both new and legacy formats. Args: config: Configuration dictionary Returns: List of hostnames with dynamic DNS """ dyndnshosts = [] # New format: hosts section with dyndns attribute if "hosts" in config: hosts_config = config["hosts"] if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): if isinstance(host_attrs, dict) and host_attrs.get("dyndns", False): dyndnshosts.append(host_name) # Legacy format: dyndnshosts list/set if "dyndnshosts" in config: legacy_dyndnshosts = config.get("dyndnshosts", []) if isinstance(legacy_dyndnshosts, (list, set)): dyndnshosts.extend(legacy_dyndnshosts) return list(set(dyndnshosts)) # Remove duplicates def get_host_config(config, hostname): """Get configuration for a specific host from the hosts section. Returns: Dictionary with host attributes or empty dict """ hosts_config = config.get("hosts", {}) if isinstance(hosts_config, dict) and hostname in hosts_config: val = hosts_config[hostname] return val if isinstance(val, dict) else {} return {} # --------------------------------------------------------------------------- # User / host-access helpers # --------------------------------------------------------------------------- def get_default_owner(config) -> str | None: """Return the configured default_owner username, or the first admin user, or None.""" explicit = config.get("default_owner") if explicit: return explicit # Fall back to first admin user found in config users_cfg = config.get("users", {}) if isinstance(users_cfg, dict): for username, attrs in users_cfg.items(): if isinstance(attrs, dict) and attrs.get("admin", False): return username return None def get_host_access(config, hostname) -> dict: """Return the access dict for *hostname*: owner, managers, monitors. Falls back to default_owner for hosts without an explicit owner. Returns: { "owner": str | None, "managers": list[str], "monitors": list[str], } """ host_cfg = get_host_config(config, hostname) owner = host_cfg.get("owner") # or get_default_owner(config) managers = host_cfg.get("managers", []) if isinstance(managers, str): managers = [managers] monitors = host_cfg.get("monitors", []) if isinstance(monitors, str): monitors = [monitors] return { "owner": owner, "managers": list(managers), "monitors": list(monitors), }