d190029728
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
143 lines
4.7 KiB
Python
143 lines
4.7 KiB
Python
"""Gitea OAuth2 support.
|
|
|
|
Config shape (in ~/.hb.yaml):

    oauth:
      gitea:
        url: https://git.example.com
        client_id: <client-id>
        client_secret: <client-secret>

Register a Gitea OAuth2 application at:
    Gitea → Settings → Applications → OAuth2
Set the redirect URI to:
    https://<hbd-host>/login/oauth/gitea/callback
|
|
"""
|
|
|
|
import logging
|
|
import secrets
|
|
import time
|
|
import urllib.parse
|
|
|
|
import aiohttp
|
|
|
|
logger = logging.getLogger(__name__)

# Lifetime of a CSRF state token, in seconds (10 minutes).
STATE_TTL = 10 * 60

# In-memory store of outstanding CSRF state tokens:
# state_token -> expiry timestamp (time.time() based).
_states: dict[str, float] = dict()
|
|
|
|
|
|
def make_state() -> str:
    """Create a fresh CSRF state token and register it with an expiry.

    Expired entries are purged first. The new token (64 hex characters) is
    then recorded in the module-level store with an expiry ``STATE_TTL``
    seconds from now, and returned to the caller.
    """
    _purge_states()
    state = secrets.token_hex(32)
    expires_at = time.time() + STATE_TTL
    _states[state] = expires_at
    return state
|
|
|
|
|
|
def validate_state(state: str) -> bool:
    """Consume *state* and report whether it was still valid.

    The token is removed from the store whatever the outcome, so a given
    token can be checked successfully at most once. Returns True only when
    the token was known and its expiry lies in the future.
    """
    try:
        expires_at = _states.pop(state)
    except KeyError:
        # Unknown (or already consumed) token.
        return False
    return expires_at > time.time()
|
|
|
|
|
|
def _purge_states() -> None:
    """Drop every CSRF state token whose expiry timestamp has passed."""
    cutoff = time.time()
    # Walk a snapshot of the items so entries can be deleted while looping.
    for token, expires_at in list(_states.items()):
        if expires_at < cutoff:
            del _states[token]
|
|
|
|
|
|
class OAuthError(Exception):
    """Signal that a step of the OAuth2 flow could not be completed."""
|
|
|
|
|
|
def _gitea_cfg(config: dict) -> dict:
    """Return the ``oauth.gitea`` sub-dict of *config*, or {} if unavailable.

    An empty dict is returned when either level is missing, is null (a YAML
    key written with no value loads as ``None``), or is not a mapping, so
    callers can always use ``.get()`` on the result. No check is made that
    the individual Gitea keys are present; see ``is_enabled`` for that.
    """
    # `or {}` covers both "key absent" and "key present but null".
    oauth = config.get("oauth") or {}
    if not isinstance(oauth, dict):
        return {}
    gitea = oauth.get("gitea") or {}
    return gitea if isinstance(gitea, dict) else {}
|
|
|
|
|
|
def is_enabled(config: dict) -> bool:
    """Report whether Gitea OAuth2 is fully configured.

    True only when ``url``, ``client_id`` and ``client_secret`` are all set
    to non-empty (truthy) values in the Gitea section of *config*.
    """
    gitea = _gitea_cfg(config)
    required = ("url", "client_id", "client_secret")
    return all(gitea.get(key) for key in required)
|
|
|
|
|
|
def authorization_url(config: dict, state: str, redirect_uri: str) -> str:
    """Build the Gitea authorization URL the browser should be sent to.

    The URL points at ``<gitea url>/login/oauth/authorize`` and carries the
    client id, *redirect_uri*, *state*, a ``code`` response type and the
    ``user:email`` scope as query parameters.

    Raises OAuthError when the Gitea OAuth2 settings are missing.
    """
    gitea = _gitea_cfg(config)
    configured = (
        gitea.get("url") and gitea.get("client_id") and gitea.get("client_secret")
    )
    if not configured:
        raise OAuthError("Gitea OAuth2 is not configured")

    query = urllib.parse.urlencode(
        [
            ("client_id", gitea["client_id"]),
            ("redirect_uri", redirect_uri),
            ("response_type", "code"),
            ("scope", "user:email"),
            ("state", state),
        ]
    )
    # Trim trailing slashes so the joined path never contains "//".
    base = gitea["url"].rstrip("/")
    return f"{base}/login/oauth/authorize?{query}"
|
|
|
|
|
|
async def exchange_code(config: dict, code: str, redirect_uri: str) -> str:
    """Exchange an authorization *code* for a Gitea access token.

    POSTs the client credentials, *code* and *redirect_uri* as JSON to
    ``<gitea url>/login/oauth/access_token`` with a 10 second total timeout.

    Returns the access token string.

    Raises OAuthError on any failure: missing configuration, timeout,
    network error, non-200 response, undecodable or non-object response
    body, or a response without an ``access_token``.
    """
    # Imported locally: before Python 3.11 asyncio.TimeoutError is not the
    # builtin TimeoutError, and it is what aiohttp raises on total timeout.
    import asyncio

    g = _gitea_cfg(config)
    if not (g.get("url") and g.get("client_id") and g.get("client_secret")):
        raise OAuthError("Gitea OAuth2 is not configured")

    url = f"{g['url'].rstrip('/')}/login/oauth/access_token"
    payload = {
        "client_id": g["client_id"],
        "client_secret": g["client_secret"],
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": redirect_uri,
    }
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                url, json=payload, headers={"Accept": "application/json"}
            ) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    raise OAuthError(f"Token exchange failed ({resp.status}): {text}")
                data = await resp.json()
    except asyncio.TimeoutError as exc:
        # Not an aiohttp.ClientError, so it needs its own handler.
        raise OAuthError("Token exchange timed out") from exc
    except aiohttp.ClientError as exc:
        raise OAuthError(f"Token exchange network error: {exc}") from exc
    except ValueError as exc:
        # json.JSONDecodeError / UnicodeDecodeError from decoding the body.
        raise OAuthError(f"Token exchange returned an undecodable response: {exc}") from exc

    # A JSON array/string/null body has no .get(); treat it as "no token".
    token = data.get("access_token") if isinstance(data, dict) else None
    if not token:
        raise OAuthError(f"No access_token in response: {data}")
    return token
|
|
|
|
|
|
async def fetch_user(config: dict, token: str) -> dict:
    """Fetch the authenticated user's profile from Gitea.

    GETs ``<gitea url>/api/v1/user`` with *token* in the ``Authorization``
    header and a 10 second total timeout.

    Returns a dict with keys: login, full_name, avatar_url. Any key missing
    from the response is returned as an empty string.

    Raises OAuthError on any failure: missing configuration, timeout,
    network error, non-200 response, or an undecodable or non-object
    response body.
    """
    # Imported locally: before Python 3.11 asyncio.TimeoutError is not the
    # builtin TimeoutError, and it is what aiohttp raises on total timeout.
    import asyncio

    g = _gitea_cfg(config)
    if not (g.get("url") and g.get("client_id") and g.get("client_secret")):
        raise OAuthError("Gitea OAuth2 is not configured")

    url = f"{g['url'].rstrip('/')}/api/v1/user"
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(
                url, headers={"Authorization": f"token {token}"}
            ) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    raise OAuthError(f"User fetch failed ({resp.status}): {text}")
                data = await resp.json()
    except asyncio.TimeoutError as exc:
        # Not an aiohttp.ClientError, so it needs its own handler.
        raise OAuthError("User fetch timed out") from exc
    except aiohttp.ClientError as exc:
        raise OAuthError(f"User fetch network error: {exc}") from exc
    except ValueError as exc:
        # json.JSONDecodeError / UnicodeDecodeError from decoding the body.
        raise OAuthError(f"User fetch returned an undecodable response: {exc}") from exc

    if not isinstance(data, dict):
        # A JSON array/string/null body cannot be read as a profile.
        raise OAuthError(f"User fetch returned an unexpected payload: {data}")
    return {
        "login": data.get("login", ""),
        "full_name": data.get("full_name", ""),
        "avatar_url": data.get("avatar_url", ""),
    }
|