Files
heartbeat/hbd/server/users.py
T
2026-05-08 13:34:57 -04:00

272 lines
8.7 KiB
Python

"""User management: loading, authentication, and session tracking.
Users are defined in the config file under the ``users`` key:
users:
alice:
full_name: Alice Smith
avatar: /path/to/avatar.png # file path, URL, or base64 data URI
password: pbkdf2:sha256:... # generated with: hbd passwd
admin: true # optional server-level admin
notification_channels: [pushover_standard]
Roles are assigned per-host:
hosts:
webserver01:
owner: alice
managers: [bob]
monitors: [carol]
If no users are defined the server runs in unauthenticated mode (backwards
compatible). When users are defined every API call must carry a valid session
token in an ``Authorization: Bearer <token>`` or ``X-Auth-Token`` header,
obtained via ``POST /api/0/auth/login``.
"""
import hashlib
import hmac
import logging
import secrets
import time
logger = logging.getLogger(__name__)
# Session lifetime in seconds (24 hours).
SESSION_TTL = 86400
# Global session store: token -> {"username": str, "expires": float, "created": float}
_sessions: dict = {}
# ---------------------------------------------------------------------------
# User class
# ---------------------------------------------------------------------------
class User:
def __init__(
self,
username: str,
full_name: str = "",
avatar: str = "",
password_hash: str = "",
admin: bool = False,
notification_channels: list | None = None,
):
self.username = username
self.full_name = full_name
self.avatar = avatar
self.password_hash = password_hash
self.admin = admin
self.notification_channels: list = notification_channels or []
def check_password(self, password: str) -> bool:
if not self.password_hash:
return False
return _verify_password(password, self.password_hash)
def avatar_is_local(self) -> bool:
"""Return True when the avatar is a local filesystem path (starts with '/')."""
return bool(self.avatar and self.avatar.startswith("/"))
def avatar_url(self) -> str:
"""Return the URL to use as an <img src>.
Local file paths are served via the /api/0/users/{username}/avatar
endpoint. External URLs and data URIs are returned as-is.
"""
if self.avatar_is_local():
return f"/api/0/users/{self.username}/avatar"
return self.avatar
def to_dict(self) -> dict:
return {
"username": self.username,
"full_name": self.full_name,
"avatar": self.avatar,
"avatar_url": self.avatar_url(),
"admin": self.admin,
"notification_channels": self.notification_channels,
}
# ---------------------------------------------------------------------------
# Password hashing (PBKDF2-HMAC-SHA256, stdlib only)
# ---------------------------------------------------------------------------
def hash_password(password: str) -> str:
"""Return a storable hash for *password*.
Format: ``pbkdf2:sha256:<iterations>:<salt>:<hex-digest>``
Use this to generate the ``password`` value in the config file::
python -c "from hbd.server.users import hash_password; print(hash_password('secret'))"
Or via the CLI::
hbd passwd
"""
salt = secrets.token_hex(16)
iterations = 260_000
dk = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
)
return f"pbkdf2:sha256:{iterations}:{salt}:{dk.hex()}"
def _verify_password(password: str, stored_hash: str) -> bool:
"""Return True if *password* matches *stored_hash*."""
try:
parts = stored_hash.split(":")
if len(parts) != 5 or parts[0] != "pbkdf2" or parts[1] != "sha256":
return False
_, _, iterations_str, salt, expected_hex = parts
iterations = int(iterations_str)
dk = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
)
return hmac.compare_digest(dk.hex(), expected_hex)
except Exception:
return False
# ---------------------------------------------------------------------------
# Global user registry
# ---------------------------------------------------------------------------
# username -> User
users: dict = {}
def load_users(config: dict) -> dict:
"""Populate the global user registry from *config*.
Called once at startup and again on SIGHUP config reload.
Returns the new ``users`` dict.
"""
global users
old_users = dict(users) # snapshot before rebuild
users_cfg = config.get("users", {})
if not isinstance(users_cfg, dict):
users = {}
# Preserve OAuth-provisioned users (password_hash == "") that aren't in config.
for username, existing_user in old_users.items():
if not existing_user.password_hash and username not in users:
users[username] = existing_user
return users
result: dict = {}
for username, attrs in users_cfg.items():
if not isinstance(attrs, dict):
logger.warning("Skipping user %r: expected a mapping", username)
continue
result[username] = User(
username=username,
full_name=attrs.get("full_name", ""),
avatar=attrs.get("avatar", ""),
password_hash=attrs.get("password", ""),
admin=bool(attrs.get("admin", False)),
notification_channels=attrs.get("notification_channels", []),
)
users = result
# Preserve OAuth-provisioned users (password_hash == "") that aren't in config.
for username, existing_user in old_users.items():
if not existing_user.password_hash and username not in users:
users[username] = existing_user
logger.info("Loaded %d user(s) from config", len(users))
return users
def users_enabled() -> bool:
"""Return True if at least one user is configured (auth-required mode)."""
return bool(users)
def get_user(username: str) -> "User | None":
return users.get(username)
def authenticate(username: str, password: str) -> "User | None":
"""Return the User if credentials are valid, else None."""
user = users.get(username)
if user and user.check_password(password):
return user
return None
def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User":
"""Create or update a user sourced from an OAuth2 provider.
New users are inserted with no password_hash — they can only authenticate
via OAuth. Existing users (e.g. defined in config with a password) have
their display name and avatar refreshed; all other attributes are preserved.
"""
user = users.get(username)
if user is None:
user = User(username=username, full_name=full_name, avatar=avatar)
users[username] = user
logger.info("Provisioned OAuth user %r", username)
else:
if full_name:
user.full_name = full_name
if avatar:
user.avatar = avatar
return user
# ---------------------------------------------------------------------------
# Session management
# ---------------------------------------------------------------------------
def create_session(username: str) -> str:
"""Create a new session for *username* and return the opaque token."""
_purge_expired_sessions()
token = secrets.token_hex(32)
_sessions[token] = {
"username": username,
"expires": time.time() + SESSION_TTL,
"created": time.time(),
}
return token
def get_session_user(token: str) -> "User | None":
"""Return the User for a valid *token*, or None if missing/expired."""
if not token:
return None
session = _sessions.get(token)
if not session:
return None
if session["expires"] < time.time():
del _sessions[token]
return None
return get_user(session["username"])
def delete_session(token: str) -> None:
"""Invalidate *token* (logout)."""
_sessions.pop(token, None)
def _purge_expired_sessions() -> None:
now = time.time()
expired = [t for t, s in list(_sessions.items()) if s["expires"] < now]
for t in expired:
del _sessions[t]
def save_sessions() -> dict:
"""Return a snapshot of non-expired sessions suitable for pickling."""
_purge_expired_sessions()
return dict(_sessions)
def load_sessions(snapshot: dict) -> None:
"""Restore sessions from a pickled snapshot, dropping any that have expired."""
global _sessions
now = time.time()
_sessions = {t: s for t, s in snapshot.items() if s.get("expires", 0) > now}
logger.debug("Restored %d session(s) from pickle", len(_sessions))