feat: add config write API (POST /api/0/config, POST /api/0/config/rollback)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1075,6 +1075,92 @@ async def start(
|
||||
backups = configio_mod.list_backups(_config_path)
|
||||
return web.json_response({"backups": backups})
|
||||
|
||||
async def api_config_post(request):
|
||||
"""POST /api/0/config — publish staged changes to .hb.yaml. Admin only."""
|
||||
user, err = _require_auth(request)
|
||||
if err:
|
||||
return err
|
||||
if user and not user.admin:
|
||||
return web.json_response({"error": "Forbidden"}, status=403)
|
||||
if not _config_path:
|
||||
return web.json_response({"error": "Config path not available"}, status=503)
|
||||
try:
|
||||
payload = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({"error": "Invalid JSON"}, status=400)
|
||||
|
||||
try:
|
||||
data = configio_mod.read_roundtrip(_config_path)
|
||||
|
||||
if "server" in payload:
|
||||
configio_mod.apply_structured_section(data, "server", payload["server"])
|
||||
|
||||
if "users" in payload:
|
||||
# Hash any plaintext passwords; preserve existing hashes when omitted or "•••"
|
||||
existing_users = data.get("users") or {}
|
||||
users_payload = payload["users"]
|
||||
for username, attrs in users_payload.items():
|
||||
pw = attrs.get("password", "")
|
||||
if pw and pw != "•••" and not pw.startswith("pbkdf2:"):
|
||||
attrs["password"] = users_mod.hash_password(pw)
|
||||
elif not pw or pw == "•••":
|
||||
existing_hash = (existing_users.get(username) or {}).get("password", "")
|
||||
if existing_hash:
|
||||
attrs["password"] = existing_hash
|
||||
else:
|
||||
attrs.pop("password", None)
|
||||
configio_mod.apply_structured_section(data, "users", users_payload)
|
||||
|
||||
if "oauth" in payload:
|
||||
data["oauth"] = payload["oauth"]
|
||||
|
||||
for section in ("notification_channels", "thresholds", "hosts", "dns"):
|
||||
if section in payload:
|
||||
configio_mod.apply_yaml_section(data, section, payload[section])
|
||||
|
||||
configio_mod.write_config(_config_path, data)
|
||||
except Exception as exc:
|
||||
logger.error("Config write failed: %s", exc)
|
||||
return web.json_response({"error": str(exc)}, status=500)
|
||||
|
||||
if hasattr(config, "reload"):
|
||||
await config.reload()
|
||||
users_mod.load_users(config)
|
||||
|
||||
return web.json_response({"ok": True})
|
||||
|
||||
async def api_config_rollback(request):
|
||||
"""POST /api/0/config/rollback — restore a backup. Admin only."""
|
||||
user, err = _require_auth(request)
|
||||
if err:
|
||||
return err
|
||||
if user and not user.admin:
|
||||
return web.json_response({"error": "Forbidden"}, status=403)
|
||||
if not _config_path:
|
||||
return web.json_response({"error": "Config path not available"}, status=503)
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({"error": "Invalid JSON"}, status=400)
|
||||
|
||||
backup = body.get("backup", "")
|
||||
expected_prefix = _config_path + ".bak."
|
||||
if not backup or not backup.startswith(expected_prefix) or not os.path.exists(backup):
|
||||
return web.json_response({"error": "Invalid or missing backup"}, status=400)
|
||||
|
||||
try:
|
||||
backup_data = configio_mod.read_roundtrip(backup)
|
||||
configio_mod.write_config(_config_path, backup_data)
|
||||
except Exception as exc:
|
||||
logger.error("Rollback failed: %s", exc)
|
||||
return web.json_response({"error": str(exc)}, status=500)
|
||||
|
||||
if hasattr(config, "reload"):
|
||||
await config.reload()
|
||||
users_mod.load_users(config)
|
||||
|
||||
return web.json_response({"ok": True})
|
||||
|
||||
app = web.Application()
|
||||
app.add_routes(
|
||||
[
|
||||
@@ -1096,6 +1182,8 @@ async def start(
|
||||
web.get("/api/0/config", api_config_get),
|
||||
web.get("/api/0/config/section/{name}", api_config_section_get),
|
||||
web.get("/api/0/config/backups", api_config_backups_get),
|
||||
web.post("/api/0/config", api_config_post),
|
||||
web.post("/api/0/config/rollback", api_config_rollback),
|
||||
# Hosts
|
||||
web.get("/api/0/hosts", api_hosts),
|
||||
web.get("/api/0/alert_summary", api_alert_summary),
|
||||
|
||||
@@ -47,3 +47,67 @@ def test_mask_config_for_api_no_password_in_users_leaves_no_key():
|
||||
}
|
||||
result = http._mask_config_for_api(config)
|
||||
assert "password" not in result["users"]["bob"]
|
||||
|
||||
|
||||
# ---- configio integration for write path ----
|
||||
|
||||
def test_write_path_applies_server_section(tmp_path):
|
||||
cfg = tmp_path / ".hb.yaml"
|
||||
cfg.write_text("hbd_port: 50004\ninterval: 20\nusers: {}\n")
|
||||
from hbd.server import configio
|
||||
data = configio.read_roundtrip(str(cfg))
|
||||
configio.apply_structured_section(data, "server", {"interval": 60})
|
||||
configio.write_config(str(cfg), data)
|
||||
data2 = configio.read_roundtrip(str(cfg))
|
||||
assert data2["interval"] == 60
|
||||
assert data2["hbd_port"] == 50004 # unchanged
|
||||
|
||||
|
||||
def test_write_path_applies_yaml_section(tmp_path):
|
||||
cfg = tmp_path / ".hb.yaml"
|
||||
cfg.write_text(
|
||||
"hbd_port: 50004\nnotification_channels:\n old_ch:\n type: email\n"
|
||||
)
|
||||
from hbd.server import configio
|
||||
data = configio.read_roundtrip(str(cfg))
|
||||
configio.apply_yaml_section(data, "notification_channels", "new_ch:\n type: pushover\n")
|
||||
configio.write_config(str(cfg), data)
|
||||
data2 = configio.read_roundtrip(str(cfg))
|
||||
assert "new_ch" in data2["notification_channels"]
|
||||
assert "old_ch" not in data2["notification_channels"]
|
||||
|
||||
|
||||
def test_write_path_hashes_plaintext_password(tmp_path):
|
||||
cfg = tmp_path / ".hb.yaml"
|
||||
cfg.write_text("hbd_port: 50004\nusers:\n alice:\n full_name: Alice\n admin: true\n password: pbkdf2:sha256:old\n")
|
||||
from hbd.server import configio
|
||||
from hbd.server import users as users_mod
|
||||
data = configio.read_roundtrip(str(cfg))
|
||||
# Simulate what the POST handler does: hash plaintext password
|
||||
new_users = {"alice": {"full_name": "Alice", "admin": True, "password": "newplaintext"}}
|
||||
for username, attrs in new_users.items():
|
||||
pw = attrs.get("password", "")
|
||||
if pw and not pw.startswith("pbkdf2:"):
|
||||
attrs["password"] = users_mod.hash_password(pw)
|
||||
configio.apply_structured_section(data, "users", new_users)
|
||||
configio.write_config(str(cfg), data)
|
||||
data2 = configio.read_roundtrip(str(cfg))
|
||||
assert data2["users"]["alice"]["password"].startswith("pbkdf2:")
|
||||
assert data2["users"]["alice"]["password"] != "newplaintext"
|
||||
|
||||
|
||||
def test_rollback_restores_backup(tmp_path):
|
||||
cfg = tmp_path / ".hb.yaml"
|
||||
cfg.write_text("hbd_port: 50004\ninterval: 20\n")
|
||||
from hbd.server import configio
|
||||
# Make a change to create a backup
|
||||
data = configio.read_roundtrip(str(cfg))
|
||||
data["interval"] = 99
|
||||
configio.write_config(str(cfg), data)
|
||||
backups = configio.list_backups(str(cfg))
|
||||
assert len(backups) == 1
|
||||
# Read the backup and write it back (simulating rollback)
|
||||
backup_data = configio.read_roundtrip(backups[0])
|
||||
configio.write_config(str(cfg), backup_data)
|
||||
restored = configio.read_roundtrip(str(cfg))
|
||||
assert restored["interval"] == 20
|
||||
|
||||
Reference in New Issue
Block a user