"""YAML round-trip read/write for .hb.yaml, with backup and atomic writes.""" import glob import os import threading from datetime import datetime from ruamel.yaml import YAML _write_lock = threading.Lock() def _make_yaml() -> YAML: y = YAML() y.preserve_quotes = True return y # Top-level keys managed by the 'server' logical section _SERVER_KEYS = [ "hbd_port", "hbd_host", "ws_port", "wss_port", "hb_port", "interval", "grace", "base_url", "threshold_renotify_interval", "logfile", "pidfile", "pickfile", "journal_enabled", "journal_dir", "journal_max_size", "journal_max_backups", "default_owner", "default_threshold_config", ] # Top-level keys managed by the 'dns' logical section _DNS_KEYS = ["nsupdate_bin", "dyndomains", "dyndnshosts", "drophosts"] def read_roundtrip(path: str): """Load .hb.yaml with ruamel.yaml, preserving comments and ordering.""" with open(path, "r", encoding="utf-8") as f: return _make_yaml().load(f) def write_config(path: str, data) -> None: """Backup current file then atomically write data. Backup naming: {path}.bak.YYYYMMDD-HHMMSS Rotation: keep the 10 most recent backups, delete older ones. Atomic write: write to {path}.tmp then os.replace({path}.tmp, path). Acquires _write_lock for the full backup+write sequence. """ with _write_lock: ts = datetime.now().strftime("%Y%m%d-%H%M%S") backup_path = f"{path}.bak.{ts}" n = 0 while os.path.exists(backup_path): n += 1 backup_path = f"{path}.bak.{ts}-{n}" orig_mode = None if os.path.exists(path): orig_mode = os.stat(path).st_mode with open(path, "rb") as src, open(backup_path, "wb") as dst: dst.write(src.read()) os.chmod(backup_path, orig_mode) backups = sorted(glob.glob(f"{path}.bak.*"), reverse=True) for old in backups[10:]: os.unlink(old) tmp = f"{path}.tmp" try: with open(tmp, "w", encoding="utf-8") as f: _make_yaml().dump(data, f) if orig_mode is not None: os.chmod(tmp, orig_mode) os.replace(tmp, path) except Exception: try: os.unlink(tmp) except OSError: pass raise def list_backups(path: str) -> list: """Return backup paths sorted newest-first.""" return sorted(glob.glob(f"{path}.bak.*"), reverse=True) def apply_structured_section(data, section: str, values: dict) -> None: """Merge a dict of scalar/list values into data for the named logical section. For 'server': updates each known key individually, preserving comments on unchanged keys. For 'users': replaces the entire users dict. """ if section == "server": for key in _SERVER_KEYS: if key in values: data[key] = values[key] elif section == "users": data["users"] = values elif section == "hosts": data["hosts"] = values else: raise ValueError(f"Unknown structured section: {section!r}") def apply_channel(data, name: str, channel_cfg: dict) -> None: """Insert or replace a single notification channel entry, preserving others.""" if not data.get("notification_channels"): data["notification_channels"] = {} data["notification_channels"][name] = channel_cfg def delete_channel(data, name: str) -> None: """Remove a notification channel by name. No-op if not found.""" nc = data.get("notification_channels") or {} nc.pop(name, None) def apply_yaml_section(data, section: str, yaml_text: str) -> None: """Replace the named logical section by parsing yaml_text.""" parsed = _make_yaml().load(yaml_text) if section == "notification_channels": data["notification_channels"] = parsed elif section == "thresholds": data["threshold_configs"] = parsed elif section == "hosts": data["hosts"] = parsed elif section == "dns": if parsed: for key in _DNS_KEYS: if key in parsed: data[key] = parsed[key] else: for key in _DNS_KEYS: data.pop(key, None) else: raise ValueError(f"Unknown YAML section: {section!r}")