"""Gitea OAuth2 support. Config shape (in ~/.hb.yaml): oauth: gitea: url: https://git.example.com client_id: client_secret: Register a Gitea OAuth2 application at: Gitea → Settings → Applications → OAuth2 Set the redirect URI to: https:///login/oauth/gitea/callback """ import logging import secrets import time import urllib.parse import aiohttp logger = logging.getLogger(__name__) STATE_TTL = 600 # 10 minutes # state_token -> expiry timestamp _states: dict[str, float] = {} def make_state() -> str: """Generate a CSRF state token, store it with TTL, and return it.""" _purge_states() token = secrets.token_hex(32) _states[token] = time.time() + STATE_TTL return token def validate_state(state: str) -> bool: """Return True if *state* is known and unexpired; always removes it.""" expiry = _states.pop(state, None) if expiry is None: return False return time.time() < expiry def _purge_states() -> None: """Remove all expired CSRF state tokens from the in-memory store.""" now = time.time() expired = [k for k, exp in list(_states.items()) if exp < now] for k in expired: del _states[k] class OAuthError(Exception): """Raised when the OAuth2 flow fails for any reason.""" def _gitea_cfg(config: dict) -> dict: """Return the gitea sub-dict or {} if absent/incomplete.""" return config.get("oauth", {}).get("gitea", {}) def is_enabled(config: dict) -> bool: """Return True when all three required Gitea OAuth keys are present.""" g = _gitea_cfg(config) return bool(g.get("url") and g.get("client_id") and g.get("client_secret")) def authorization_url(config: dict, state: str, redirect_uri: str) -> str: """Return the Gitea OAuth2 authorization URL to redirect the browser to.""" g = _gitea_cfg(config) params = urllib.parse.urlencode({ "client_id": g["client_id"], "redirect_uri": redirect_uri, "response_type": "code", "scope": "user:email", "state": state, }) return f"{g['url'].rstrip('/')}/login/oauth/authorize?{params}" async def exchange_code(config: dict, code: str, redirect_uri: str) -> str: """Exchange an authorization *code* for a Gitea access token. Returns the access token string. Raises OAuthError on any failure. """ g = _gitea_cfg(config) url = f"{g['url'].rstrip('/')}/login/oauth/access_token" payload = { "client_id": g["client_id"], "client_secret": g["client_secret"], "code": code, "grant_type": "authorization_code", "redirect_uri": redirect_uri, } timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(url, json=payload, headers={"Accept": "application/json"}) as resp: if resp.status != 200: text = await resp.text() raise OAuthError(f"Token exchange failed ({resp.status}): {text}") data = await resp.json() except aiohttp.ClientError as exc: raise OAuthError(f"Token exchange network error: {exc}") from exc token = data.get("access_token") if not token: raise OAuthError(f"No access_token in response: {data}") return token async def fetch_user(config: dict, token: str) -> dict: """Fetch the authenticated user's profile from Gitea. Returns a dict with keys: login, full_name, avatar_url. Raises OAuthError on any failure. """ g = _gitea_cfg(config) url = f"{g['url'].rstrip('/')}/api/v1/user" timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url, headers={"Authorization": f"token {token}"}) as resp: if resp.status != 200: text = await resp.text() raise OAuthError(f"User fetch failed ({resp.status}): {text}") data = await resp.json() except aiohttp.ClientError as exc: raise OAuthError(f"User fetch network error: {exc}") from exc return { "login": data.get("login", ""), "full_name": data.get("full_name", ""), "avatar_url": data.get("avatar_url", ""), }