"""User management: loading, authentication, and session tracking. Users are defined in the config file under the ``users`` key: users: alice: full_name: Alice Smith avatar: /path/to/avatar.png # file path, URL, or base64 data URI password: pbkdf2:sha256:... # generated with: hbd passwd admin: true # optional server-level admin notification_channels: [pushover_standard] Roles are assigned per-host: hosts: webserver01: owner: alice managers: [bob] monitors: [carol] If no users are defined the server runs in unauthenticated mode (backwards compatible). When users are defined every API call must carry a valid session token in an ``Authorization: Bearer `` or ``X-Auth-Token`` header, obtained via ``POST /api/0/auth/login``. """ import hashlib import hmac import logging import secrets import time logger = logging.getLogger(__name__) # Session lifetime in seconds (24 hours). SESSION_TTL = 86400 # Global session store: token -> {"username": str, "expires": float, "created": float} _sessions: dict = {} # --------------------------------------------------------------------------- # User class # --------------------------------------------------------------------------- class User: def __init__( self, username: str, full_name: str = "", avatar: str = "", password_hash: str = "", admin: bool = False, notification_channels: list | None = None, ): self.username = username self.full_name = full_name self.avatar = avatar self.password_hash = password_hash self.admin = admin self.notification_channels: list = notification_channels or [] def check_password(self, password: str) -> bool: if not self.password_hash: return False return _verify_password(password, self.password_hash) def avatar_is_local(self) -> bool: """Return True when the avatar is a local filesystem path (starts with '/').""" return bool(self.avatar and self.avatar.startswith("/")) def avatar_url(self) -> str: """Return the URL to use as an . Local file paths are served via the /api/0/users/{username}/avatar endpoint. External URLs and data URIs are returned as-is. """ if self.avatar_is_local(): return f"/api/0/users/{self.username}/avatar" return self.avatar def to_dict(self) -> dict: return { "username": self.username, "full_name": self.full_name, "avatar": self.avatar, "avatar_url": self.avatar_url(), "admin": self.admin, "notification_channels": self.notification_channels, } # --------------------------------------------------------------------------- # Password hashing (PBKDF2-HMAC-SHA256, stdlib only) # --------------------------------------------------------------------------- def hash_password(password: str) -> str: """Return a storable hash for *password*. Format: ``pbkdf2:sha256:::`` Use this to generate the ``password`` value in the config file:: python -c "from hbd.server.users import hash_password; print(hash_password('secret'))" Or via the CLI:: hbd passwd """ salt = secrets.token_hex(16) iterations = 260_000 dk = hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations ) return f"pbkdf2:sha256:{iterations}:{salt}:{dk.hex()}" def _verify_password(password: str, stored_hash: str) -> bool: """Return True if *password* matches *stored_hash*.""" try: parts = stored_hash.split(":") if len(parts) != 5 or parts[0] != "pbkdf2" or parts[1] != "sha256": return False _, _, iterations_str, salt, expected_hex = parts iterations = int(iterations_str) dk = hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations ) return hmac.compare_digest(dk.hex(), expected_hex) except Exception: return False # --------------------------------------------------------------------------- # Global user registry # --------------------------------------------------------------------------- # username -> User users: dict = {} def load_users(config: dict) -> dict: """Populate the global user registry from *config*. Called once at startup and again on SIGHUP config reload. Returns the new ``users`` dict. """ global users users_cfg = config.get("users", {}) if not isinstance(users_cfg, dict): users = {} return users result: dict = {} for username, attrs in users_cfg.items(): if not isinstance(attrs, dict): logger.warning("Skipping user %r: expected a mapping", username) continue result[username] = User( username=username, full_name=attrs.get("full_name", ""), avatar=attrs.get("avatar", ""), password_hash=attrs.get("password", ""), admin=bool(attrs.get("admin", False)), notification_channels=attrs.get("notification_channels", []), ) users = result logger.info("Loaded %d user(s) from config", len(users)) return users def users_enabled() -> bool: """Return True if at least one user is configured (auth-required mode).""" return bool(users) def get_user(username: str) -> "User | None": return users.get(username) def authenticate(username: str, password: str) -> "User | None": """Return the User if credentials are valid, else None.""" user = users.get(username) if user and user.check_password(password): return user return None def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User": """Create or update a user sourced from an OAuth2 provider. New users are inserted with no password_hash — they can only authenticate via OAuth. Existing users (e.g. defined in config with a password) have their display name and avatar refreshed; all other attributes are preserved. """ user = users.get(username) if user is None: user = User(username=username, full_name=full_name, avatar=avatar) users[username] = user logger.info("Provisioned OAuth user %r", username) else: if full_name: user.full_name = full_name if avatar: user.avatar = avatar return user # --------------------------------------------------------------------------- # Session management # --------------------------------------------------------------------------- def create_session(username: str) -> str: """Create a new session for *username* and return the opaque token.""" _purge_expired_sessions() token = secrets.token_hex(32) _sessions[token] = { "username": username, "expires": time.time() + SESSION_TTL, "created": time.time(), } return token def get_session_user(token: str) -> "User | None": """Return the User for a valid *token*, or None if missing/expired.""" if not token: return None session = _sessions.get(token) if not session: return None if session["expires"] < time.time(): del _sessions[token] return None return get_user(session["username"]) def delete_session(token: str) -> None: """Invalidate *token* (logout).""" _sessions.pop(token, None) def _purge_expired_sessions() -> None: now = time.time() expired = [t for t, s in list(_sessions.items()) if s["expires"] < now] for t in expired: del _sessions[t] def save_sessions() -> dict: """Return a snapshot of non-expired sessions suitable for pickling.""" _purge_expired_sessions() return dict(_sessions) def load_sessions(snapshot: dict) -> None: """Restore sessions from a pickled snapshot, dropping any that have expired.""" global _sessions now = time.time() _sessions = {t: s for t, s in snapshot.items() if s.get("expires", 0) > now} logger.debug("Restored %d session(s) from pickle", len(_sessions))