Add user management and a settings page
This commit is contained in:
@@ -0,0 +1,228 @@
|
||||
"""User management: loading, authentication, and session tracking.
|
||||
|
||||
Users are defined in the config file under the ``users`` key:
|
||||
|
||||
users:
|
||||
alice:
|
||||
full_name: Alice Smith
|
||||
avatar: /path/to/avatar.png # file path, URL, or base64 data URI
|
||||
password: pbkdf2:sha256:... # generated with: hbd passwd
|
||||
admin: true # optional server-level admin
|
||||
notification_channels: [pushover_standard]
|
||||
|
||||
Roles are assigned per-host:
|
||||
|
||||
hosts:
|
||||
webserver01:
|
||||
owner: alice
|
||||
managers: [bob]
|
||||
monitors: [carol]
|
||||
|
||||
If no users are defined the server runs in unauthenticated mode (backwards
|
||||
compatible). When users are defined every API call must carry a valid session
|
||||
token in an ``Authorization: Bearer <token>`` or ``X-Auth-Token`` header,
|
||||
obtained via ``POST /api/0/auth/login``.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import logging
|
||||
import secrets
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Session lifetime in seconds (24 hours).
|
||||
SESSION_TTL = 86400
|
||||
|
||||
# Global session store: token -> {"username": str, "expires": float, "created": float}
|
||||
_sessions: dict = {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# User class
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class User:
|
||||
def __init__(
|
||||
self,
|
||||
username: str,
|
||||
full_name: str = "",
|
||||
avatar: str = "",
|
||||
password_hash: str = "",
|
||||
admin: bool = False,
|
||||
notification_channels: list | None = None,
|
||||
):
|
||||
self.username = username
|
||||
self.full_name = full_name
|
||||
self.avatar = avatar
|
||||
self.password_hash = password_hash
|
||||
self.admin = admin
|
||||
self.notification_channels: list = notification_channels or []
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
if not self.password_hash:
|
||||
return False
|
||||
return _verify_password(password, self.password_hash)
|
||||
|
||||
def avatar_is_local(self) -> bool:
|
||||
"""Return True when the avatar is a local filesystem path (starts with '/')."""
|
||||
return bool(self.avatar and self.avatar.startswith("/"))
|
||||
|
||||
def avatar_url(self) -> str:
|
||||
"""Return the URL to use as an <img src>.
|
||||
|
||||
Local file paths are served via the /api/0/users/{username}/avatar
|
||||
endpoint. External URLs and data URIs are returned as-is.
|
||||
"""
|
||||
if self.avatar_is_local():
|
||||
return f"/api/0/users/{self.username}/avatar"
|
||||
return self.avatar
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"username": self.username,
|
||||
"full_name": self.full_name,
|
||||
"avatar": self.avatar,
|
||||
"avatar_url": self.avatar_url(),
|
||||
"admin": self.admin,
|
||||
"notification_channels": self.notification_channels,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Password hashing (PBKDF2-HMAC-SHA256, stdlib only)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""Return a storable hash for *password*.
|
||||
|
||||
Format: ``pbkdf2:sha256:<iterations>:<salt>:<hex-digest>``
|
||||
|
||||
Use this to generate the ``password`` value in the config file::
|
||||
|
||||
python -c "from hbd.server.users import hash_password; print(hash_password('secret'))"
|
||||
|
||||
Or via the CLI::
|
||||
|
||||
hbd passwd
|
||||
"""
|
||||
salt = secrets.token_hex(16)
|
||||
iterations = 260_000
|
||||
dk = hashlib.pbkdf2_hmac(
|
||||
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
|
||||
)
|
||||
return f"pbkdf2:sha256:{iterations}:{salt}:{dk.hex()}"
|
||||
|
||||
|
||||
def _verify_password(password: str, stored_hash: str) -> bool:
|
||||
"""Return True if *password* matches *stored_hash*."""
|
||||
try:
|
||||
parts = stored_hash.split(":")
|
||||
if len(parts) != 5 or parts[0] != "pbkdf2" or parts[1] != "sha256":
|
||||
return False
|
||||
_, _, iterations_str, salt, expected_hex = parts
|
||||
iterations = int(iterations_str)
|
||||
dk = hashlib.pbkdf2_hmac(
|
||||
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
|
||||
)
|
||||
return hmac.compare_digest(dk.hex(), expected_hex)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Global user registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# username -> User
|
||||
users: dict = {}
|
||||
|
||||
|
||||
def load_users(config: dict) -> dict:
|
||||
"""Populate the global user registry from *config*.
|
||||
|
||||
Called once at startup and again on SIGHUP config reload.
|
||||
Returns the new ``users`` dict.
|
||||
"""
|
||||
global users
|
||||
users_cfg = config.get("users", {})
|
||||
if not isinstance(users_cfg, dict):
|
||||
users = {}
|
||||
return users
|
||||
|
||||
result: dict = {}
|
||||
for username, attrs in users_cfg.items():
|
||||
if not isinstance(attrs, dict):
|
||||
logger.warning("Skipping user %r: expected a mapping", username)
|
||||
continue
|
||||
result[username] = User(
|
||||
username=username,
|
||||
full_name=attrs.get("full_name", ""),
|
||||
avatar=attrs.get("avatar", ""),
|
||||
password_hash=attrs.get("password", ""),
|
||||
admin=bool(attrs.get("admin", False)),
|
||||
notification_channels=attrs.get("notification_channels", []),
|
||||
)
|
||||
|
||||
users = result
|
||||
logger.info("Loaded %d user(s) from config", len(users))
|
||||
return users
|
||||
|
||||
|
||||
def users_enabled() -> bool:
|
||||
"""Return True if at least one user is configured (auth-required mode)."""
|
||||
return bool(users)
|
||||
|
||||
|
||||
def get_user(username: str) -> "User | None":
|
||||
return users.get(username)
|
||||
|
||||
|
||||
def authenticate(username: str, password: str) -> "User | None":
|
||||
"""Return the User if credentials are valid, else None."""
|
||||
user = users.get(username)
|
||||
if user and user.check_password(password):
|
||||
return user
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Session management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_session(username: str) -> str:
|
||||
"""Create a new session for *username* and return the opaque token."""
|
||||
_purge_expired_sessions()
|
||||
token = secrets.token_hex(32)
|
||||
_sessions[token] = {
|
||||
"username": username,
|
||||
"expires": time.time() + SESSION_TTL,
|
||||
"created": time.time(),
|
||||
}
|
||||
return token
|
||||
|
||||
|
||||
def get_session_user(token: str) -> "User | None":
|
||||
"""Return the User for a valid *token*, or None if missing/expired."""
|
||||
if not token:
|
||||
return None
|
||||
session = _sessions.get(token)
|
||||
if not session:
|
||||
return None
|
||||
if session["expires"] < time.time():
|
||||
del _sessions[token]
|
||||
return None
|
||||
return get_user(session["username"])
|
||||
|
||||
|
||||
def delete_session(token: str) -> None:
|
||||
"""Invalidate *token* (logout)."""
|
||||
_sessions.pop(token, None)
|
||||
|
||||
|
||||
def _purge_expired_sessions() -> None:
|
||||
now = time.time()
|
||||
expired = [t for t, s in list(_sessions.items()) if s["expires"] < now]
|
||||
for t in expired:
|
||||
del _sessions[t]
|
||||
Reference in New Issue
Block a user