"""Configuration loader and defaults for hbd (HeartBeat Daemon/Server).""" import asyncio import logging import os try: import yaml except Exception: yaml = None SERVER_DEFAULTS = { # Network settings "hb_port": 50003, # Port to listen for heartbeats "hbd_port": 50004, # HTTP API port "hbd_host": "", # Bind address (empty = all interfaces) # Persistence "pickfile": "/tmp/hb.pick", # Logging "logfile": "/var/log/heartbeat.log", "logfmt": "text", # text or msg or json # Notification channels "notification_channels": {}, # Named channels with type and credentials "default_notification_channels": [], # Default channels if host doesn't specify # Monitoring settings "interval": 20, # Expected heartbeat interval (for server checks) "grace": 2, # Grace multiplier (interval * grace = timeout) "threshold_renotify_interval": 3600, # Seconds between threshold re-notifications # User management "users": {}, # username -> {full_name, avatar, password, admin, notification_channels} "default_owner": None, # Username that owns hosts with no explicit owner # Host management "hosts": {}, # New unified host definitions (optional) "watchhosts": [], # Hosts to monitor and notify about (legacy) "dyndnshosts": [], # Hosts with dynamic DNS (legacy) "drophosts": [], # Hosts to ignore "dyndomains": ["wrede.org"], # DNS updates "nsupdate_bin": "/usr/bin/nsupdate", # WebSocket settings "ws_port": 50005, "wss_port": None, "cert_path": "/usr/local/etc/ssl/", "wss_pem": "fullchain.pem", "wss_key": "privkey.pem", # Message journal configuration "journal_enabled": True, "journal_dir": "/var/log/heartbeat", "journal_file": "messages.journal", "journal_max_size": 100 * 1024 * 1024, # 100MB "journal_max_backups": 10, # Runtime flags "foreground": False, "verbose": False, "debug": 0, # Plugin/threshold configs (for clients reporting to this server) "plugins": {}, "thresholds": {}, } THRESHOLD_DEFAULTS = { 'thresholds': { 'cpu_monitor': { 'cpu_percent': { 'warning': 80.0, 'critical': 90.0 } }, 'memory_monitor': { 'percent': { 'warning': 85.0, 'critical': 95.0 } }, 'disk_monitor': { 'partitions': { '/': { 'percent': { 'warning': 85.0, 'critical': 90.0 } } } }, 'rtt': { 'warning': 200, 'critical': 250.0 'count': 3 # Optional: number of consecutive breaches before alerting } } } def load_config(path=None): """Load configuration from a YAML file and merge with server defaults. If YAML is not available or the file does not exist, defaults are returned. Args: path: Path to YAML config file (default: ~/.hb.yaml) Returns: Dictionary with configuration """ cfg = SERVER_DEFAULTS.copy() if not path: # default path (~/.hb.yaml) path = os.path.join(os.path.expanduser("~"), ".hb.yaml") if os.path.exists(path): if yaml: with open(path) as fh: data = yaml.safe_load(fh) # Merge YAML data with defaults # Keep all keys from YAML to support plugin configs and future extensions for k, v in data.items(): cfg[k] = v else: # yaml not installed: do not attempt to parse; user must ensure defaults pass return cfg class ReloadableConfig: """Thread-safe/async-safe configuration wrapper that supports runtime reloading. This class wraps the configuration dictionary and provides: - Thread-safe config reloading via SIGHUP - Backward-compatible dict-like access - Async lock to prevent concurrent reloads """ def __init__(self, initial_config, config_path=None): """Initialize with initial configuration. Args: initial_config: Initial configuration dictionary config_path: Path to config file for reloading (optional) """ self._config = initial_config self._config_path = config_path self._lock = asyncio.Lock() self._logger = logging.getLogger(__name__) async def reload(self, config_path=None): """Reload configuration from file. Args: config_path: Path to config file (uses stored path if not provided) Returns: New configuration dictionary Raises: Exception if reload fails (keeps existing config) """ path = config_path or self._config_path if not path: raise ValueError("No config path specified for reload") async with self._lock: try: # Load new config new_config = load_config(path) # Store old config for rollback if needed old_config = self._config # Update config self._config = new_config self._logger.info(f"Configuration reloaded from {path}") return new_config except Exception as e: self._logger.error(f"Failed to reload config from {path}: {e}", exc_info=True) # Keep existing config on error raise def get(self, key, default=None): """Get a config value (dict-compatible).""" return self._config.get(key, default) def __getitem__(self, key): """Get a config value via subscript (dict-compatible).""" return self._config[key] def __contains__(self, key): """Check if key exists (dict-compatible).""" return key in self._config def keys(self): """Return config keys (dict-compatible).""" return self._config.keys() def items(self): """Return config items (dict-compatible).""" return self._config.items() def values(self): """Return config values (dict-compatible).""" return self._config.values() @property def config(self): """Get the underlying config dict (for components that need full dict).""" return self._config def get_watchhosts(config): """Extract watchhosts from config, supporting both new and legacy formats. Args: config: Configuration dictionary Returns: List of hostnames to watch """ watchhosts = [] # New format: hosts section with watch attribute if "hosts" in config: hosts_config = config["hosts"] if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): if isinstance(host_attrs, dict) and host_attrs.get("watch", False): watchhosts.append(host_name) # Legacy format: watchhosts list if "watchhosts" in config: legacy_watchhosts = config.get("watchhosts", []) if isinstance(legacy_watchhosts, (list, set)): watchhosts.extend(legacy_watchhosts) elif isinstance(legacy_watchhosts, dict): # Old dict format: {"host1": {attrs}, "host2": {attrs}} watchhosts.extend(legacy_watchhosts.keys()) return list(set(watchhosts)) # Remove duplicates def get_dyndnshosts(config): """Extract dyndnshosts from config, supporting both new and legacy formats. Args: config: Configuration dictionary Returns: List of hostnames with dynamic DNS """ dyndnshosts = [] # New format: hosts section with dyndns attribute if "hosts" in config: hosts_config = config["hosts"] if isinstance(hosts_config, dict): for host_name, host_attrs in hosts_config.items(): if isinstance(host_attrs, dict) and host_attrs.get("dyndns", False): dyndnshosts.append(host_name) # Legacy format: dyndnshosts list/set if "dyndnshosts" in config: legacy_dyndnshosts = config.get("dyndnshosts", []) if isinstance(legacy_dyndnshosts, (list, set)): dyndnshosts.extend(legacy_dyndnshosts) return list(set(dyndnshosts)) # Remove duplicates def get_host_config(config, hostname): """Get configuration for a specific host. Args: config: Configuration dictionary hostname: Host name Returns: Dictionary with host attributes or empty dict """ if "hosts" in config: hosts_config = config.get("hosts", {}) if isinstance(hosts_config, dict) and hostname in hosts_config: return hosts_config[hostname] if isinstance(hosts_config[hostname], dict) else {} # Check legacy watchhosts for notification settings if "watchhosts" in config: watchhosts = config.get("watchhosts", {}) if isinstance(watchhosts, dict) and hostname in watchhosts: legacy_attrs = watchhosts[hostname] if isinstance(legacy_attrs, dict): # Convert legacy format to new format return { "watch": True, "notify": legacy_attrs.get("notify"), "notify_src": legacy_attrs.get("src"), } return {} def get_notification_channels_for_host(config, hostname): """Get notification channels configured for a specific host. Args: config: Configuration dictionary hostname: Host name Returns: List of channel names to use for this host """ host_config = get_host_config(config, hostname) # Check if host specifies notification channels channels = host_config.get("notification_channels", []) if channels: if isinstance(channels, str): return [channels] elif isinstance(channels, list): return channels # Fall back to default channels default_channels = config.get("default_notification_channels", []) if default_channels: if isinstance(default_channels, str): return [default_channels] elif isinstance(default_channels, list): return default_channels # No channels configured, return empty list (will use legacy global config) return [] def get_channel_config(config, channel_name): """Get configuration for a specific notification channel. Args: config: Configuration dictionary channel_name: Name of the notification channel Returns: Dictionary with channel configuration or None if not found """ channels = config.get("notification_channels", {}) if isinstance(channels, dict) and channel_name in channels: return channels[channel_name] return None def get_notification_channels_config(config, hostname): """Get list of notification channel configurations for a host. Args: config: Configuration dictionary hostname: Host name Returns: List of (channel_name, channel_config) tuples """ channel_names = get_notification_channels_for_host(config, hostname) channels = [] for channel_name in channel_names: channel_config = get_channel_config(config, channel_name) if channel_config and channel_config.get("type"): channels.append((channel_name, channel_config)) return channels # --------------------------------------------------------------------------- # User / host-access helpers # --------------------------------------------------------------------------- def get_default_owner(config) -> str | None: """Return the configured default_owner username, or the first admin user, or None.""" explicit = config.get("default_owner") if explicit: return explicit # Fall back to first admin user found in config users_cfg = config.get("users", {}) if isinstance(users_cfg, dict): for username, attrs in users_cfg.items(): if isinstance(attrs, dict) and attrs.get("admin", False): return username return None def get_host_access(config, hostname) -> dict: """Return the access dict for *hostname*: owner, managers, monitors. Falls back to default_owner for hosts without an explicit owner. Returns: { "owner": str | None, "managers": list[str], "monitors": list[str], } """ host_cfg = get_host_config(config, hostname) owner = host_cfg.get("owner") or get_default_owner(config) managers = host_cfg.get("managers", []) if isinstance(managers, str): managers = [managers] monitors = host_cfg.get("monitors", []) if isinstance(monitors, str): monitors = [monitors] return { "owner": owner, "managers": list(managers), "monitors": list(monitors), }