diff --git a/hbd/server/http.py b/hbd/server/http.py index e6d0525..542bfda 100644 --- a/hbd/server/http.py +++ b/hbd/server/http.py @@ -1089,6 +1089,9 @@ async def start( except Exception: return web.json_response({"error": "Invalid JSON"}, status=400) + if not isinstance(payload, dict): + return web.json_response({"error": "Invalid JSON"}, status=400) + try: data = configio_mod.read_roundtrip(_config_path) @@ -1112,7 +1115,17 @@ async def start( configio_mod.apply_structured_section(data, "users", users_payload) if "oauth" in payload: - data["oauth"] = payload["oauth"] + existing_oauth = data.get("oauth") or {} + new_oauth = payload["oauth"] + for name, attrs in new_oauth.items(): + cs = attrs.get("client_secret", "") + if not cs or cs == "•••": + existing_cs = (existing_oauth.get(name) or {}).get("client_secret", "") + if existing_cs: + attrs["client_secret"] = existing_cs + else: + attrs.pop("client_secret", None) + data["oauth"] = new_oauth for section in ("notification_channels", "thresholds", "hosts", "dns"): if section in payload: @@ -1144,8 +1157,7 @@ async def start( return web.json_response({"error": "Invalid JSON"}, status=400) backup = body.get("backup", "") - expected_prefix = _config_path + ".bak." - if not backup or not backup.startswith(expected_prefix) or not os.path.exists(backup): + if not backup or backup not in configio_mod.list_backups(_config_path): return web.json_response({"error": "Invalid or missing backup"}, status=400) try: diff --git a/tests/test_http_config_api.py b/tests/test_http_config_api.py index 21bce09..2a0d82c 100644 --- a/tests/test_http_config_api.py +++ b/tests/test_http_config_api.py @@ -111,3 +111,63 @@ def test_rollback_restores_backup(tmp_path): configio.write_config(str(cfg), backup_data) restored = configio.read_roundtrip(str(cfg)) assert restored["interval"] == 20 + + +def test_write_path_preserves_masked_password(tmp_path): + """The "•••" sentinel must preserve the existing hash, not write "•••" to disk.""" + cfg = tmp_path / ".hb.yaml" + original_hash = "pbkdf2:sha256:original_hash" + cfg.write_text( + f"hbd_port: 50004\nusers:\n alice:\n full_name: Alice\n admin: true\n password: {original_hash}\n" + ) + from hbd.server import configio + from hbd.server import users as users_mod + data = configio.read_roundtrip(str(cfg)) + # Simulate what api_config_post does when client sends "•••" back + existing_users = data.get("users") or {} + users_payload = {"alice": {"full_name": "Alice", "admin": True, "password": "•••"}} + for username, attrs in users_payload.items(): + pw = attrs.get("password", "") + if pw and pw != "•••" and not pw.startswith("pbkdf2:"): + attrs["password"] = users_mod.hash_password(pw) + elif not pw or pw == "•••": + existing_hash = (existing_users.get(username) or {}).get("password", "") + if existing_hash: + attrs["password"] = existing_hash + else: + attrs.pop("password", None) + configio.apply_structured_section(data, "users", users_payload) + configio.write_config(str(cfg), data) + data2 = configio.read_roundtrip(str(cfg)) + assert data2["users"]["alice"]["password"] == original_hash, ( + f"Expected original hash preserved, got: {data2['users']['alice']['password']!r}" + ) + + +def test_write_path_preserves_oauth_client_secret(tmp_path): + """The "•••" sentinel for oauth client_secret must preserve the existing secret.""" + cfg = tmp_path / ".hb.yaml" + original_secret = "real_client_secret_value" + cfg.write_text( + f"hbd_port: 50004\noauth:\n gitea:\n type: gitea\n url: https://git.example.com\n" + f" client_id: cid123\n client_secret: {original_secret}\n" + ) + from hbd.server import configio + data = configio.read_roundtrip(str(cfg)) + # Simulate what api_config_post does when client sends "•••" back for client_secret + existing_oauth = data.get("oauth") or {} + new_oauth = {"gitea": {"type": "gitea", "url": "https://git.example.com", "client_id": "cid123", "client_secret": "•••"}} + for name, attrs in new_oauth.items(): + cs = attrs.get("client_secret", "") + if not cs or cs == "•••": + existing_cs = (existing_oauth.get(name) or {}).get("client_secret", "") + if existing_cs: + attrs["client_secret"] = existing_cs + else: + attrs.pop("client_secret", None) + data["oauth"] = new_oauth + configio.write_config(str(cfg), data) + data2 = configio.read_roundtrip(str(cfg)) + assert data2["oauth"]["gitea"]["client_secret"] == original_secret, ( + f"Expected original secret preserved, got: {data2['oauth']['gitea']['client_secret']!r}" + )