From 39f1b5de30a751024e57808d304a268082079481 Mon Sep 17 00:00:00 2001 From: Andreas Wrede Date: Fri, 8 May 2026 13:56:00 -0400 Subject: [PATCH] docs: add Gitea OAuth2 implementation plan --- .../plans/2026-05-08-gitea-oauth2.md | 781 ++++++++++++++++++ 1 file changed, 781 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-08-gitea-oauth2.md diff --git a/docs/superpowers/plans/2026-05-08-gitea-oauth2.md b/docs/superpowers/plans/2026-05-08-gitea-oauth2.md new file mode 100644 index 0000000..b68f843 --- /dev/null +++ b/docs/superpowers/plans/2026-05-08-gitea-oauth2.md @@ -0,0 +1,781 @@ +# Gitea OAuth2 Authentication Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add Gitea as an OAuth2 login provider that coexists with password auth, auto-provisioning new users on first login. + +**Architecture:** A new `oauth.py` module owns all Gitea-specific logic (CSRF state, URL building, token exchange, user-info fetch). `users.py` gains one function to upsert an OAuth-sourced user. `http.py` gets two new route handlers and a small login-page change. No new dependencies — `aiohttp.ClientSession` is already used in the codebase. + +**Tech Stack:** Python 3.12, aiohttp 3.x, pytest, pytest-asyncio + +--- + +## File Map + +| Action | Path | Responsibility | +|--------|------|----------------| +| Modify | `hbd/server/config.py` | Add `"oauth": {}` default | +| Create | `hbd/server/oauth.py` | CSRF state, URL builder, token exchange, user-info fetch | +| Modify | `hbd/server/users.py` | Add `provision_oauth_user()` | +| Modify | `hbd/server/http.py` | Import oauth, two new routes, login page button | +| Create | `tests/test_oauth.py` | All new unit tests | + +--- + +## Task 1: Add config default and `is_enabled()` + +**Files:** +- Modify: `hbd/server/config.py:34` (after the `"users"` line) +- Create: `hbd/server/oauth.py` +- Create: `tests/test_oauth.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_oauth.py`: + +```python +import pytest +from hbd.server import oauth + + +CFG_OFF = {} +CFG_ON = { + "oauth": { + "gitea": { + "url": "https://git.example.com", + "client_id": "cid", + "client_secret": "csec", + } + } +} +CFG_PARTIAL = {"oauth": {"gitea": {"url": "https://git.example.com"}}} + + +def test_is_enabled_when_all_keys_present(): + assert oauth.is_enabled(CFG_ON) is True + + +def test_is_enabled_false_when_no_oauth_key(): + assert oauth.is_enabled(CFG_OFF) is False + + +def test_is_enabled_false_when_partial_config(): + assert oauth.is_enabled(CFG_PARTIAL) is False +``` + +- [ ] **Step 2: Run to confirm failure** + +``` +pytest tests/test_oauth.py -v +``` + +Expected: `ModuleNotFoundError: No module named 'hbd.server.oauth'` + +- [ ] **Step 3: Add config default** + +In `hbd/server/config.py`, add after the `"default_owner"` line (currently line 35): + +```python + # OAuth2 providers + "oauth": {}, # oauth.gitea.{url,client_id,client_secret} +``` + +- [ ] **Step 4: Create `hbd/server/oauth.py` with `is_enabled`** + +```python +"""Gitea OAuth2 support. + +Config shape (in ~/.hb.yaml): + + oauth: + gitea: + url: https://git.example.com + client_id: + client_secret: + +Register a Gitea OAuth2 application at: + Gitea → Settings → Applications → OAuth2 +Set the redirect URI to: + https:///login/oauth/gitea/callback +""" + +import logging +import secrets +import time + +import aiohttp + +logger = logging.getLogger(__name__) + +STATE_TTL = 600 # 10 minutes + +# state_token -> expiry timestamp +_states: dict[str, float] = {} + + +class OAuthError(Exception): + """Raised when the OAuth2 flow fails for any reason.""" + + +def _gitea_cfg(config: dict) -> dict: + """Return the gitea sub-dict or {} if absent/incomplete.""" + return config.get("oauth", {}).get("gitea", {}) + + +def is_enabled(config: dict) -> bool: + """Return True when all three required Gitea OAuth keys are present.""" + g = _gitea_cfg(config) + return bool(g.get("url") and g.get("client_id") and g.get("client_secret")) +``` + +- [ ] **Step 5: Run to confirm tests pass** + +``` +pytest tests/test_oauth.py -v +``` + +Expected: 3 passed + +- [ ] **Step 6: Commit** + +```bash +git add hbd/server/config.py hbd/server/oauth.py tests/test_oauth.py +git commit -m "feat: add oauth module skeleton and is_enabled()" +``` + +--- + +## Task 2: CSRF state management + +**Files:** +- Modify: `hbd/server/oauth.py` (add `make_state`, `validate_state`) +- Modify: `tests/test_oauth.py` (add state tests) + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/test_oauth.py`: + +```python +import time as time_mod + + +def test_make_state_returns_unique_tokens(): + s1 = oauth.make_state() + s2 = oauth.make_state() + assert s1 != s2 + assert len(s1) == 64 # 32 bytes hex + + +def test_validate_state_valid(): + state = oauth.make_state() + assert oauth.validate_state(state) is True + + +def test_validate_state_consumed_on_use(): + state = oauth.make_state() + oauth.validate_state(state) + assert oauth.validate_state(state) is False # replay rejected + + +def test_validate_state_unknown(): + assert oauth.validate_state("notastate") is False + + +def test_validate_state_expired(monkeypatch): + state = oauth.make_state() + # Wind expiry into the past + monkeypatch.setitem(oauth._states, state, time_mod.time() - 1) + assert oauth.validate_state(state) is False +``` + +- [ ] **Step 2: Run to confirm failure** + +``` +pytest tests/test_oauth.py -v -k "state" +``` + +Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'make_state'` + +- [ ] **Step 3: Implement state functions** + +Add to `hbd/server/oauth.py` after the `_states` dict definition: + +```python +def make_state() -> str: + """Generate a CSRF state token, store it with TTL, and return it.""" + _purge_states() + token = secrets.token_hex(32) + _states[token] = time.time() + STATE_TTL + return token + + +def validate_state(state: str) -> bool: + """Return True if *state* is known and unexpired; always removes it.""" + expiry = _states.pop(state, None) + if expiry is None: + return False + return time.time() < expiry + + +def _purge_states() -> None: + now = time.time() + expired = [k for k, exp in list(_states.items()) if exp < now] + for k in expired: + del _states[k] +``` + +- [ ] **Step 4: Run to confirm tests pass** + +``` +pytest tests/test_oauth.py -v +``` + +Expected: 8 passed + +- [ ] **Step 5: Commit** + +```bash +git add hbd/server/oauth.py tests/test_oauth.py +git commit -m "feat: add OAuth2 CSRF state management" +``` + +--- + +## Task 3: `provision_oauth_user` in users.py + +**Files:** +- Modify: `hbd/server/users.py` (add `provision_oauth_user`) +- Modify: `tests/test_oauth.py` (add provisioning tests) + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/test_oauth.py`: + +```python +from hbd.server import users as users_mod +from hbd.server.users import User + + +def _reset_users(entries=None): + users_mod.users = entries or {} + + +def test_provision_oauth_user_new(): + _reset_users() + user = users_mod.provision_oauth_user("gituser", "Git User", "https://example.com/avatar.png") + assert user.username == "gituser" + assert user.full_name == "Git User" + assert user.avatar == "https://example.com/avatar.png" + assert user.admin is False + assert user.password_hash == "" + assert "gituser" in users_mod.users + + +def test_provision_oauth_user_no_password_login(): + _reset_users() + user = users_mod.provision_oauth_user("gituser", "Git User", "") + assert user.check_password("anything") is False + + +def test_provision_oauth_user_existing_updates_profile(): + existing = User( + username="alice", + full_name="Old Name", + avatar="old.png", + password_hash="pbkdf2:sha256:1:salt:abc", + admin=True, + notification_channels=["chan1"], + ) + _reset_users({"alice": existing}) + user = users_mod.provision_oauth_user("alice", "New Name", "new.png") + assert user.full_name == "New Name" + assert user.avatar == "new.png" + # Preserved + assert user.admin is True + assert user.password_hash == "pbkdf2:sha256:1:salt:abc" + assert user.notification_channels == ["chan1"] + + +def test_provision_oauth_user_does_not_overwrite_with_empty(): + existing = User(username="bob", full_name="Bob", avatar="bob.png") + _reset_users({"bob": existing}) + user = users_mod.provision_oauth_user("bob", "", "") + assert user.full_name == "Bob" + assert user.avatar == "bob.png" +``` + +- [ ] **Step 2: Run to confirm failure** + +``` +pytest tests/test_oauth.py -v -k "provision" +``` + +Expected: `AttributeError: module 'hbd.server.users' has no attribute 'provision_oauth_user'` + +- [ ] **Step 3: Implement `provision_oauth_user`** + +Add to `hbd/server/users.py` after the `authenticate()` function (after line 187): + +```python +def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User": + """Create or update a user sourced from an OAuth2 provider. + + New users are inserted with no password_hash — they can only authenticate + via OAuth. Existing users (e.g. defined in config with a password) have + their display name and avatar refreshed; all other attributes are preserved. + """ + user = users.get(username) + if user is None: + user = User(username=username, full_name=full_name, avatar=avatar) + users[username] = user + logger.info("Provisioned OAuth user %r", username) + else: + if full_name: + user.full_name = full_name + if avatar: + user.avatar = avatar + return user +``` + +- [ ] **Step 4: Run to confirm tests pass** + +``` +pytest tests/test_oauth.py -v +``` + +Expected: 12 passed + +- [ ] **Step 5: Commit** + +```bash +git add hbd/server/users.py tests/test_oauth.py +git commit -m "feat: add provision_oauth_user() to users module" +``` + +--- + +## Task 4: URL builder, token exchange, and user-info fetch + +**Files:** +- Modify: `hbd/server/oauth.py` (add `authorization_url`, `exchange_code`, `fetch_user`) +- Modify: `tests/test_oauth.py` (add async tests with mocked HTTP) + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/test_oauth.py`: + +```python +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from urllib.parse import urlparse, parse_qs + + +def test_authorization_url_shape(): + state = "teststate" + redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" + url = oauth.authorization_url(CFG_ON, state, redirect_uri) + parsed = urlparse(url) + qs = parse_qs(parsed.query) + assert parsed.scheme == "https" + assert parsed.netloc == "git.example.com" + assert parsed.path == "/login/oauth/authorize" + assert qs["client_id"] == ["cid"] + assert qs["state"] == ["teststate"] + assert qs["redirect_uri"] == [redirect_uri] + assert qs["scope"] == ["user:email"] + assert qs["response_type"] == ["code"] + + +@pytest.mark.asyncio +async def test_exchange_code_returns_token(): + redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"access_token": "tok123"}) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_response), + __aexit__=AsyncMock(return_value=False), + )) + + with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_session), + __aexit__=AsyncMock(return_value=False), + )): + token = await oauth.exchange_code(CFG_ON, "mycode", redirect_uri) + assert token == "tok123" + + +@pytest.mark.asyncio +async def test_exchange_code_raises_on_error_status(): + redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" + mock_response = AsyncMock() + mock_response.status = 401 + mock_response.text = AsyncMock(return_value="unauthorized") + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_response), + __aexit__=AsyncMock(return_value=False), + )) + + with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_session), + __aexit__=AsyncMock(return_value=False), + )): + with pytest.raises(oauth.OAuthError): + await oauth.exchange_code(CFG_ON, "badcode", redirect_uri) + + +@pytest.mark.asyncio +async def test_fetch_user_returns_profile(): + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "login": "alice", + "full_name": "Alice Smith", + "avatar_url": "https://git.example.com/avatars/alice.png", + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_response), + __aexit__=AsyncMock(return_value=False), + )) + + with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( + __aenter__=AsyncMock(return_value=mock_session), + __aexit__=AsyncMock(return_value=False), + )): + profile = await oauth.fetch_user(CFG_ON, "tok123") + assert profile == { + "login": "alice", + "full_name": "Alice Smith", + "avatar_url": "https://git.example.com/avatars/alice.png", + } +``` + +- [ ] **Step 2: Run to confirm failure** + +``` +pytest tests/test_oauth.py -v -k "url or exchange or fetch" +``` + +Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'authorization_url'` + +- [ ] **Step 3: Implement the three functions** + +Add to `hbd/server/oauth.py`: + +```python +import urllib.parse + + +def authorization_url(config: dict, state: str, redirect_uri: str) -> str: + """Return the Gitea OAuth2 authorization URL to redirect the browser to.""" + g = _gitea_cfg(config) + params = urllib.parse.urlencode({ + "client_id": g["client_id"], + "redirect_uri": redirect_uri, + "response_type": "code", + "scope": "user:email", + "state": state, + }) + return f"{g['url'].rstrip('/')}/login/oauth/authorize?{params}" + + +async def exchange_code(config: dict, code: str, redirect_uri: str) -> str: + """Exchange an authorization *code* for a Gitea access token. + + Returns the access token string. Raises OAuthError on any failure. + """ + g = _gitea_cfg(config) + url = f"{g['url'].rstrip('/')}/login/oauth/access_token" + payload = { + "client_id": g["client_id"], + "client_secret": g["client_secret"], + "code": code, + "grant_type": "authorization_code", + "redirect_uri": redirect_uri, + } + timeout = aiohttp.ClientTimeout(total=10) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(url, json=payload, headers={"Accept": "application/json"}) as resp: + if resp.status != 200: + text = await resp.text() + raise OAuthError(f"Token exchange failed ({resp.status}): {text}") + data = await resp.json() + except aiohttp.ClientError as exc: + raise OAuthError(f"Token exchange network error: {exc}") from exc + token = data.get("access_token") + if not token: + raise OAuthError(f"No access_token in response: {data}") + return token + + +async def fetch_user(config: dict, token: str) -> dict: + """Fetch the authenticated user's profile from Gitea. + + Returns a dict with keys: login, full_name, avatar_url. + Raises OAuthError on any failure. + """ + g = _gitea_cfg(config) + url = f"{g['url'].rstrip('/')}/api/v1/user" + timeout = aiohttp.ClientTimeout(total=10) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, headers={"Authorization": f"token {token}"}) as resp: + if resp.status != 200: + text = await resp.text() + raise OAuthError(f"User fetch failed ({resp.status}): {text}") + data = await resp.json() + except aiohttp.ClientError as exc: + raise OAuthError(f"User fetch network error: {exc}") from exc + return { + "login": data.get("login", ""), + "full_name": data.get("full_name", ""), + "avatar_url": data.get("avatar_url", ""), + } +``` + +Also add `import urllib.parse` at the top of `oauth.py` (alongside the existing imports). + +- [ ] **Step 4: Run to confirm tests pass** + +``` +pytest tests/test_oauth.py -v +``` + +Expected: 17 passed + +- [ ] **Step 5: Commit** + +```bash +git add hbd/server/oauth.py tests/test_oauth.py +git commit -m "feat: add authorization_url, exchange_code, fetch_user to oauth module" +``` + +--- + +## Task 5: HTTP routes — redirect and callback + +**Files:** +- Modify: `hbd/server/http.py` + +`http.py` defines all handlers inside `async def start(...)`. The two new handlers go in the same block, just before the `app = web.Application()` line (~line 900). The import goes at the top of the file. + +- [ ] **Step 1: Add the import** + +In `hbd/server/http.py`, add after the existing local imports (after `from . import users as users_mod`): + +```python +from . import oauth as oauth_mod +``` + +- [ ] **Step 2: Add the two route handlers** + +In `hbd/server/http.py`, add the two handlers immediately before the `app = web.Application()` line: + +```python + async def oauth_gitea_redirect(request): + """GET /login/oauth/gitea — kick off the Gitea OAuth2 flow.""" + if not oauth_mod.is_enabled(config): + return web.Response(status=404, text="OAuth not configured") + state = oauth_mod.make_state() + redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback" + raise web.HTTPFound(oauth_mod.authorization_url(config, state, redirect_uri)) + + async def oauth_gitea_callback(request): + """GET /login/oauth/gitea/callback — handle Gitea's redirect back.""" + if not oauth_mod.is_enabled(config): + return web.Response(status=404, text="OAuth not configured") + code = request.rel_url.query.get("code", "") + state = request.rel_url.query.get("state", "") + if not code or not state: + return web.Response(status=400, text="Missing code or state") + if not oauth_mod.validate_state(state): + raise web.HTTPFound("/login?error=1") + redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback" + try: + token = await oauth_mod.exchange_code(config, code, redirect_uri) + profile = await oauth_mod.fetch_user(config, token) + except oauth_mod.OAuthError as exc: + logger.warning("OAuth error: %s", exc) + raise web.HTTPFound("/login?error=1") + user = users_mod.provision_oauth_user( + profile["login"], + profile["full_name"], + profile["avatar_url"], + ) + session_token = users_mod.create_session(user.username) + resp = web.HTTPFound("/") + resp.set_cookie( + SESSION_COOKIE, + session_token, + max_age=users_mod.SESSION_TTL, + httponly=True, + samesite="Lax", + ) + raise resp +``` + +- [ ] **Step 3: Register the routes** + +In `hbd/server/http.py`, add to the route list after the existing auth routes (after `web.post("/api/0/auth/logout", api_logout)`): + +```python + web.get("/login/oauth/gitea", oauth_gitea_redirect), + web.get("/login/oauth/gitea/callback", oauth_gitea_callback), +``` + +- [ ] **Step 4: Manual smoke test** + +Start the server locally with OAuth configured in `~/.hb.yaml`: + +```yaml +oauth: + gitea: + url: https://your-gitea-instance.example.com + client_id: your-client-id + client_secret: your-client-secret +``` + +Visit `http://localhost:50004/login/oauth/gitea` — confirm you are redirected to Gitea's authorization page. + +- [ ] **Step 5: Commit** + +```bash +git add hbd/server/http.py +git commit -m "feat: add Gitea OAuth2 redirect and callback routes" +``` + +--- + +## Task 6: Login page — "Sign in with Gitea" button + +**Files:** +- Modify: `hbd/server/http.py` (update `login_page` handler, ~line 625) + +- [ ] **Step 1: Replace the login page HTML** + +In `hbd/server/http.py`, find the `html = f"""` block inside `login_page` and replace it with: + +```python + gitea_button = "" + if oauth_mod.is_enabled(config): + gitea_url = _gitea_cfg_url(config) + gitea_button = f""" +
or
+ + Sign in with Gitea + """ + + html = f""" + + + + Heartbeat — Login + + + +
+

Heartbeat

+ {'

Invalid username, password, or OAuth error.

' if error else ''} +
+
+
+ +
{gitea_button} +
+ +""" +``` + +- [ ] **Step 2: Add the `_gitea_cfg_url` helper** + +Add this small helper in `hbd/server/http.py` just before the `login_page` handler (around line 600) so the template can read the Gitea display URL without importing internal oauth details: + +```python +def _gitea_cfg_url(config: dict) -> str: + return config.get("oauth", {}).get("gitea", {}).get("url", "") +``` + +Also update the `login_page` handler's `error` logic to show the error when the `?error=1` query param is present (set by the callback on OAuth failure): + +```python + async def login_page(request): + """GET /login — show login form; POST /login — process and redirect.""" + if not users_mod.users_enabled(): + raise web.HTTPFound("/") + + error = "" + if request.method == "POST": + form = await request.post() + username = form.get("username", "") + password = form.get("password", "") + user = users_mod.authenticate(username, password) + if user: + token = users_mod.create_session(username) + redirect_to = request.rel_url.query.get("next", "/") + resp = web.HTTPFound(redirect_to) + resp.set_cookie( + SESSION_COOKIE, + token, + max_age=users_mod.SESSION_TTL, + httponly=True, + samesite="Lax", + ) + raise resp + error = "Invalid username or password." + elif request.rel_url.query.get("error"): + error = "Sign-in failed. Please try again." +``` + +- [ ] **Step 3: Manual verification** + +Start the server with OAuth configured. Visit `/login`. Confirm: +- The "Sign in with Gitea" button appears (green, below a divider) +- Clicking it redirects to Gitea +- After authorising on Gitea, you are redirected back and land on `/` with a valid session cookie + +Without OAuth configured, confirm the button does not appear. + +- [ ] **Step 4: Commit** + +```bash +git add hbd/server/http.py +git commit -m "feat: add Sign in with Gitea button to login page" +``` + +--- + +## Self-Review Notes + +- All 5 spec requirements covered: coexist ✓, auto-provision ✓, regular user ✓, any Gitea user ✓, config-driven ✓ +- `exchange_code` signature in Task 4 matches usage in Task 5 (`config, code, redirect_uri`) ✓ +- `fetch_user` returns `{login, full_name, avatar_url}` — matched in callback handler ✓ +- `validate_state` removes state on use (replay protection) ✓ +- `provision_oauth_user` skips empty strings so existing avatar/name aren't erased ✓ +- `_gitea_cfg_url` is a plain `def`, not `async` — safe to call in template prep ✓