500d256d76
Notification channels are now managed through a proper web form instead
of a raw YAML textarea. Any authenticated user can create channels; private
channels (owner-scoped) are hidden from other users. The user profile
channel selector becomes a tag/chip picker with a "My Channels" CRUD section.
- settings.py: add CHANNEL_TYPE_SCHEMAS for all 6 notifier types; channel
section switches to section_mode="channels"; cards include owner/private/min_level
- configio.py: add apply_channel() and delete_channel() for per-entry CRUD
- notify.py: strip owner/private metadata before dispatching to drivers
- http.py: add GET/POST /api/0/notification_channels, PUT/DELETE /{name},
GET /api/0/notification_channel_types; visibility helper filters private
channels per user; PUT /api/0/users/me validates against visible channels
- settings.html: card grid with edit/delete per channel; add/edit modal
with type dropdown and dynamically rendered type-specific fields
- profile.html: chip picker replaces checkbox list; My Channels section
for creating/editing/deleting user-owned channels
- tests: update test_settings_sections, test_http_users_me; add
test_notification_channels_api (16 new tests, 46 total passing)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
128 lines
4.2 KiB
Python
128 lines
4.2 KiB
Python
"""YAML round-trip read/write for .hb.yaml, with backup and atomic writes."""
|
|
|
|
import glob
|
|
import os
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
from ruamel.yaml import YAML
|
|
|
|
# Held by write_config() for its whole backup + write sequence so that
# writers running in different threads of this process do not interleave.
_write_lock = threading.Lock()
|
|
|
|
|
|
def _make_yaml() -> YAML:
    """Build a fresh ruamel ``YAML`` object with quote preservation enabled.

    A new instance is created on every call; nothing is cached or shared.
    """
    parser = YAML()
    # Keep the quoting style found in the source document when dumping.
    parser.preserve_quotes = True
    return parser
|
|
|
|
# Top-level config keys that belong to the 'server' logical section.
# apply_structured_section() only ever writes keys listed here.
_SERVER_KEYS = [
    "hbd_port",
    "hbd_host",
    "ws_port",
    "wss_port",
    "hb_port",
    "interval",
    "grace",
    "base_url",
    "threshold_renotify_interval",
    "logfile",
    "pidfile",
    "pickfile",
    "journal_enabled",
    "journal_dir",
    "journal_max_size",
    "journal_max_backups",
    "default_owner",
]
|
|
|
|
# Top-level config keys that belong to the 'dns' logical section.
_DNS_KEYS = [
    "nsupdate_bin",
    "dyndomains",
    "dyndnshosts",
    "drophosts",
]
|
|
|
|
|
|
def read_roundtrip(path: str):
    """Parse the YAML file at ``path`` and return the loaded document.

    The file is read as UTF-8 and loaded with the parser produced by
    ``_make_yaml()``, so the result is a ruamel.yaml structure intended to
    keep comments and key ordering for a later dump.
    """
    loader = _make_yaml()
    with open(path, encoding="utf-8") as handle:
        return loader.load(handle)
|
|
|
|
|
|
def write_config(path: str, data) -> None:
    """Back up the current file, then atomically write ``data`` to ``path``.

    Backup naming: {path}.bak.YYYYMMDD-HHMMSS (local time). If that name is
    already taken, ``-1``, ``-2``, ... is appended until a free name is found.
    Rotation: keep the 10 most recent backups (by name), delete older ones.
    Atomic write: dump to {path}.tmp, flush it to disk, then
    os.replace({path}.tmp, path).
    Acquires _write_lock for the full backup+write sequence.

    When ``path`` does not exist yet, no backup is taken and the new file
    keeps its default creation mode. Otherwise the mode of the existing file
    is carried over to both the backup and the newly written file.

    Args:
        path: Location of the YAML config file.
        data: Document to serialize with the parser from ``_make_yaml()``.

    Raises:
        OSError: If the backup, rotation, or write steps fail.
        Exception: Any error raised while dumping or replacing is re-raised
            after a best-effort removal of the temp file.
    """
    with _write_lock:
        ts = datetime.now().strftime("%Y%m%d-%H%M%S")
        backup_path = f"{path}.bak.{ts}"
        n = 0
        # Two writes within the same second must not overwrite each other's
        # backup, so probe for an unused name.
        while os.path.exists(backup_path):
            n += 1
            backup_path = f"{path}.bak.{ts}-{n}"

        orig_mode = None
        if os.path.exists(path):
            orig_mode = os.stat(path).st_mode
            with open(path, "rb") as src, open(backup_path, "wb") as dst:
                dst.write(src.read())
            os.chmod(backup_path, orig_mode)

            # Escape the path so glob metacharacters in it ('[', '*', '?')
            # are matched literally; otherwise backups of such a path would
            # never be found and never be pruned.
            pattern = f"{glob.escape(path)}.bak.*"
            backups = sorted(glob.glob(pattern), reverse=True)
            for old in backups[10:]:
                os.unlink(old)

        tmp = f"{path}.tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                if orig_mode is not None:
                    # Apply the original mode before any content is written
                    # so a restrictive mode covers the data from the start.
                    os.chmod(tmp, orig_mode)
                _make_yaml().dump(data, f)
                # Push the bytes to disk before the rename so a crash cannot
                # leave an empty or truncated file under the final name.
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, path)
        except Exception:
            # Best-effort cleanup; the original error is what matters.
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
|
|
|
|
|
|
def list_backups(path: str) -> list:
    """Return backup paths of ``path`` sorted newest-first.

    Backups are the files matching ``{path}.bak.*``. They are sorted by name
    in descending order, which places the latest timestamp suffix first.

    ``path`` itself is matched literally: glob metacharacters it may contain
    (``*``, ``?``, ``[``) are escaped, so a config file living under a
    directory such as ``conf[1]`` still finds its backups.

    Args:
        path: Location of the config file whose backups are listed.

    Returns:
        A list of matching file paths; empty when there are none.
    """
    pattern = f"{glob.escape(path)}.bak.*"
    return sorted(glob.glob(pattern), reverse=True)
|
|
|
|
|
|
def apply_structured_section(data, section: str, values: dict) -> None:
    """Write ``values`` into ``data`` for one structured logical section.

    'users': the whole ``users`` entry is replaced by ``values``.
    'server': every key from ``_SERVER_KEYS`` that is present in ``values``
    is assigned individually; other keys of ``data`` (and keys of ``values``
    that are not server keys) are left alone.

    Raises:
        ValueError: If ``section`` is neither 'server' nor 'users'.
    """
    if section == "users":
        data["users"] = values
        return

    if section != "server":
        raise ValueError(f"Unknown structured section: {section!r}")

    # Per-key assignment keeps the rest of the mapping untouched.
    for known in _SERVER_KEYS:
        if known not in values:
            continue
        data[known] = values[known]
|
|
|
|
|
|
def apply_channel(data, name: str, channel_cfg: dict) -> None:
    """Add or overwrite the channel ``name`` under ``notification_channels``.

    A missing or empty ``notification_channels`` entry is replaced by a new
    dict first. Other channels already present are kept as they are.
    """
    channels = data.get("notification_channels")
    if not channels:
        channels = {}
        data["notification_channels"] = channels
    channels[name] = channel_cfg
|
|
|
|
|
|
def delete_channel(data, name: str) -> None:
    """Drop the channel ``name`` from ``notification_channels``.

    Does nothing when the channel, or the whole ``notification_channels``
    entry, is missing or empty.
    """
    channels = data.get("notification_channels")
    if channels:
        channels.pop(name, None)
|
|
|
|
|
|
def apply_yaml_section(data, section: str, yaml_text: str) -> None:
    """Replace the named logical section by parsing yaml_text.

    ``yaml_text`` is parsed with the parser from ``_make_yaml()`` before the
    section name is examined, so a YAML syntax error is raised first.

    Section handling:
      - 'notification_channels' -> stored under ``notification_channels``
      - 'thresholds'            -> stored under ``threshold_configs``
      - 'hosts'                 -> stored under ``hosts``
      - 'dns'                   -> the parsed mapping is spread over the
        top-level keys in ``_DNS_KEYS``. Keys present in the mapping are
        assigned; keys absent from it are left unchanged. Empty input
        removes every DNS key from ``data``.

    Raises:
        ValueError: If ``section`` is unknown, or if the 'dns' payload is
            non-empty but not a mapping.
    """
    parsed = _make_yaml().load(yaml_text)
    if section == "notification_channels":
        data["notification_channels"] = parsed
    elif section == "thresholds":
        data["threshold_configs"] = parsed
    elif section == "hosts":
        data["hosts"] = parsed
    elif section == "dns":
        if not parsed:
            # Empty document: clear the whole DNS section.
            for key in _DNS_KEYS:
                data.pop(key, None)
            return
        # A scalar or list cannot be spread over the DNS keys; reject it
        # explicitly rather than failing with a TypeError on ``parsed[key]``
        # or silently ignoring the input.
        if not isinstance(parsed, dict):
            raise ValueError(
                f"DNS section must be a mapping, got {type(parsed).__name__}"
            )
        for key in _DNS_KEYS:
            if key in parsed:
                data[key] = parsed[key]
    else:
        raise ValueError(f"Unknown YAML section: {section!r}")
|