From 18769afd379b0a49a8c0b377b1fc049728ee5f77 Mon Sep 17 00:00:00 2001 From: Andreas Wrede Date: Sat, 9 May 2026 11:25:06 -0400 Subject: [PATCH] feat: add config read API (GET /api/0/config, /section/{name}, /backups) Co-Authored-By: Claude Sonnet 4.6 --- hbd/server/http.py | 93 +++++++++++++++++++++++++++++++++++ tests/test_http_config_api.py | 49 ++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 tests/test_http_config_api.py diff --git a/hbd/server/http.py b/hbd/server/http.py index a95b087..4af214a 100644 --- a/hbd/server/http.py +++ b/hbd/server/http.py @@ -19,6 +19,7 @@ from . import settings as settings_mod from . import users as users_mod from . import oauth as oauth_mod from . import ws as ws_mod +from . import configio as configio_mod logger = logging.getLogger(__name__) @@ -101,6 +102,36 @@ def _can_own_host(user, host) -> bool: return host.is_owner(user.username) +def _mask_config_for_api(config) -> dict: + """Return a JSON-serializable config dict with secrets masked.""" + _SERVER_KEYS = [ + "hbd_port", "hbd_host", "ws_port", "wss_port", "hb_port", + "interval", "grace", "base_url", "threshold_renotify_interval", + "logfile", "pidfile", "pickfile", "journal_enabled", "journal_dir", + "journal_max_size", "journal_max_backups", "default_owner", + ] + result = {} + result["server"] = {k: config.get(k) for k in _SERVER_KEYS} + + users = {} + for username, attrs in (config.get("users") or {}).items(): + u = dict(attrs) + if "password" in u: + u["password"] = "•••" + users[username] = u + result["users"] = users + + oauth = {} + for name, attrs in (config.get("oauth") or {}).items(): + o = dict(attrs) + if "client_secret" in o: + o["client_secret"] = "•••" + oauth[name] = o + result["oauth"] = oauth + + return result + + async def start( host: str, port: int, @@ -988,6 +1019,64 @@ async def start( ) raise resp + # ------------------------------------------------------------------------- + # Config API (admin only) + # ------------------------------------------------------------------------- + + _config_path = getattr(config, "_config_path", "") or "" + + async def api_config_get(request): + """GET /api/0/config — full config as JSON, secrets masked. Admin only.""" + user, err = _require_auth(request) + if err: + return err + if user and not user.admin: + return web.json_response({"error": "Admin only"}, status=403) + return web.json_response(_mask_config_for_api(config)) + + async def api_config_section_get(request): + """GET /api/0/config/section/{name} — raw YAML text for a YAML-editor section.""" + user, err = _require_auth(request) + if err: + return err + if user and not user.admin: + return web.json_response({"error": "Admin only"}, status=403) + if not _config_path: + return web.json_response({"error": "Config path not available"}, status=503) + + name = request.match_info["name"] + _DNS_KEYS = ["nsupdate_bin", "dyndomains", "dyndnshosts", "drophosts"] + _YAML_EXTRACTORS = { + "notification_channels": lambda d: d.get("notification_channels") or {}, + "thresholds": lambda d: d.get("threshold_configs") or {}, + "hosts": lambda d: d.get("hosts") or {}, + "dns": lambda d: {k: d[k] for k in _DNS_KEYS if k in d}, + } + if name not in _YAML_EXTRACTORS: + return web.json_response({"error": "Unknown section"}, status=404) + + import io as _io + from ruamel.yaml import YAML as _YAML + data = configio_mod.read_roundtrip(_config_path) + section_data = _YAML_EXTRACTORS[name](data) + _sy = _YAML() + _sy.preserve_quotes = True + buf = _io.StringIO() + _sy.dump(section_data, buf) + return web.json_response({"yaml": buf.getvalue()}) + + async def api_config_backups_get(request): + """GET /api/0/config/backups — list of backup paths, newest first.""" + user, err = _require_auth(request) + if err: + return err + if user and not user.admin: + return web.json_response({"error": "Admin only"}, status=403) + if not _config_path: + return web.json_response({"backups": []}) + backups = configio_mod.list_backups(_config_path) + return web.json_response({"backups": backups}) + app = web.Application() app.add_routes( [ @@ -1005,6 +1094,10 @@ async def start( web.get("/api/0/users", api_users), web.get("/api/0/users/me", api_user_self), web.get("/api/0/users/{username}/avatar", api_user_avatar), + # Config API (admin) + web.get("/api/0/config", api_config_get), + web.get("/api/0/config/section/{name}", api_config_section_get), + web.get("/api/0/config/backups", api_config_backups_get), # Hosts web.get("/api/0/hosts", api_hosts), web.get("/api/0/alert_summary", api_alert_summary), diff --git a/tests/test_http_config_api.py b/tests/test_http_config_api.py new file mode 100644 index 0000000..a035e36 --- /dev/null +++ b/tests/test_http_config_api.py @@ -0,0 +1,49 @@ +"""Tests for the config read/write API helpers in http.py.""" +import pytest +from hbd.server import http + + +def test_mask_config_for_api_masks_user_passwords(): + config = { + "hbd_port": 50004, + "interval": 20, + "users": { + "alice": {"full_name": "Alice", "admin": True, "password": "pbkdf2:sha256:abc"}, + }, + "oauth": {}, + } + result = http._mask_config_for_api(config) + assert result["users"]["alice"]["password"] == "•••" + assert result["users"]["alice"]["full_name"] == "Alice" + + +def test_mask_config_for_api_masks_oauth_client_secret(): + config = { + "hbd_port": 50004, + "interval": 20, + "users": {}, + "oauth": { + "gitea": {"type": "gitea", "url": "https://git.example.com", + "client_id": "cid", "client_secret": "verysecret"}, + }, + } + result = http._mask_config_for_api(config) + assert result["oauth"]["gitea"]["client_secret"] == "•••" + assert result["oauth"]["gitea"]["client_id"] == "cid" + + +def test_mask_config_for_api_includes_server_keys(): + config = {"hbd_port": 50004, "interval": 20, "users": {}, "oauth": {}} + result = http._mask_config_for_api(config) + assert result["server"]["hbd_port"] == 50004 + assert result["server"]["interval"] == 20 + + +def test_mask_config_for_api_no_password_in_users_leaves_no_key(): + config = { + "hbd_port": 50004, + "users": {"bob": {"full_name": "Bob", "admin": False}}, + "oauth": {}, + } + result = http._mask_config_for_api(config) + assert "password" not in result["users"]["bob"]