# Gitea OAuth2 Authentication Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Add Gitea as an OAuth2 login provider that coexists with password auth, auto-provisioning new users on first login. **Architecture:** A new `oauth.py` module owns all Gitea-specific logic (CSRF state, URL building, token exchange, user-info fetch). `users.py` gains one function to upsert an OAuth-sourced user. `http.py` gets two new route handlers and a small login-page change. No new dependencies — `aiohttp.ClientSession` is already used in the codebase. **Tech Stack:** Python 3.12, aiohttp 3.x, pytest, pytest-asyncio --- ## File Map | Action | Path | Responsibility | |--------|------|----------------| | Modify | `hbd/server/config.py` | Add `"oauth": {}` default | | Create | `hbd/server/oauth.py` | CSRF state, URL builder, token exchange, user-info fetch | | Modify | `hbd/server/users.py` | Add `provision_oauth_user()` | | Modify | `hbd/server/http.py` | Import oauth, two new routes, login page button | | Create | `tests/test_oauth.py` | All new unit tests | --- ## Task 1: Add config default and `is_enabled()` **Files:** - Modify: `hbd/server/config.py:34` (after the `"users"` line) - Create: `hbd/server/oauth.py` - Create: `tests/test_oauth.py` - [ ] **Step 1: Write the failing test** Create `tests/test_oauth.py`: ```python import pytest from hbd.server import oauth CFG_OFF = {} CFG_ON = { "oauth": { "gitea": { "url": "https://git.example.com", "client_id": "cid", "client_secret": "csec", } } } CFG_PARTIAL = {"oauth": {"gitea": {"url": "https://git.example.com"}}} def test_is_enabled_when_all_keys_present(): assert oauth.is_enabled(CFG_ON) is True def test_is_enabled_false_when_no_oauth_key(): assert oauth.is_enabled(CFG_OFF) is False def test_is_enabled_false_when_partial_config(): assert oauth.is_enabled(CFG_PARTIAL) is False ``` - [ ] **Step 2: Run to confirm failure** ``` pytest tests/test_oauth.py -v ``` Expected: `ModuleNotFoundError: No module named 'hbd.server.oauth'` - [ ] **Step 3: Add config default** In `hbd/server/config.py`, add after the `"default_owner"` line (currently line 35): ```python # OAuth2 providers "oauth": {}, # oauth.gitea.{url,client_id,client_secret} ``` - [ ] **Step 4: Create `hbd/server/oauth.py` with `is_enabled`** ```python """Gitea OAuth2 support. Config shape (in ~/.hb.yaml): oauth: gitea: url: https://git.example.com client_id: client_secret: Register a Gitea OAuth2 application at: Gitea → Settings → Applications → OAuth2 Set the redirect URI to: https:///login/oauth/gitea/callback """ import logging import secrets import time import aiohttp logger = logging.getLogger(__name__) STATE_TTL = 600 # 10 minutes # state_token -> expiry timestamp _states: dict[str, float] = {} class OAuthError(Exception): """Raised when the OAuth2 flow fails for any reason.""" def _gitea_cfg(config: dict) -> dict: """Return the gitea sub-dict or {} if absent/incomplete.""" return config.get("oauth", {}).get("gitea", {}) def is_enabled(config: dict) -> bool: """Return True when all three required Gitea OAuth keys are present.""" g = _gitea_cfg(config) return bool(g.get("url") and g.get("client_id") and g.get("client_secret")) ``` - [ ] **Step 5: Run to confirm tests pass** ``` pytest tests/test_oauth.py -v ``` Expected: 3 passed - [ ] **Step 6: Commit** ```bash git add hbd/server/config.py hbd/server/oauth.py tests/test_oauth.py git commit -m "feat: add oauth module skeleton and is_enabled()" ``` --- ## Task 2: CSRF state management **Files:** - Modify: `hbd/server/oauth.py` (add `make_state`, `validate_state`) - Modify: `tests/test_oauth.py` (add state tests) - [ ] **Step 1: Write the failing tests** Append to `tests/test_oauth.py`: ```python import time as time_mod def test_make_state_returns_unique_tokens(): s1 = oauth.make_state() s2 = oauth.make_state() assert s1 != s2 assert len(s1) == 64 # 32 bytes hex def test_validate_state_valid(): state = oauth.make_state() assert oauth.validate_state(state) is True def test_validate_state_consumed_on_use(): state = oauth.make_state() oauth.validate_state(state) assert oauth.validate_state(state) is False # replay rejected def test_validate_state_unknown(): assert oauth.validate_state("notastate") is False def test_validate_state_expired(monkeypatch): state = oauth.make_state() # Wind expiry into the past monkeypatch.setitem(oauth._states, state, time_mod.time() - 1) assert oauth.validate_state(state) is False ``` - [ ] **Step 2: Run to confirm failure** ``` pytest tests/test_oauth.py -v -k "state" ``` Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'make_state'` - [ ] **Step 3: Implement state functions** Add to `hbd/server/oauth.py` after the `_states` dict definition: ```python def make_state() -> str: """Generate a CSRF state token, store it with TTL, and return it.""" _purge_states() token = secrets.token_hex(32) _states[token] = time.time() + STATE_TTL return token def validate_state(state: str) -> bool: """Return True if *state* is known and unexpired; always removes it.""" expiry = _states.pop(state, None) if expiry is None: return False return time.time() < expiry def _purge_states() -> None: now = time.time() expired = [k for k, exp in list(_states.items()) if exp < now] for k in expired: del _states[k] ``` - [ ] **Step 4: Run to confirm tests pass** ``` pytest tests/test_oauth.py -v ``` Expected: 8 passed - [ ] **Step 5: Commit** ```bash git add hbd/server/oauth.py tests/test_oauth.py git commit -m "feat: add OAuth2 CSRF state management" ``` --- ## Task 3: `provision_oauth_user` in users.py **Files:** - Modify: `hbd/server/users.py` (add `provision_oauth_user`) - Modify: `tests/test_oauth.py` (add provisioning tests) - [ ] **Step 1: Write the failing tests** Append to `tests/test_oauth.py`: ```python from hbd.server import users as users_mod from hbd.server.users import User def _reset_users(entries=None): users_mod.users = entries or {} def test_provision_oauth_user_new(): _reset_users() user = users_mod.provision_oauth_user("gituser", "Git User", "https://example.com/avatar.png") assert user.username == "gituser" assert user.full_name == "Git User" assert user.avatar == "https://example.com/avatar.png" assert user.admin is False assert user.password_hash == "" assert "gituser" in users_mod.users def test_provision_oauth_user_no_password_login(): _reset_users() user = users_mod.provision_oauth_user("gituser", "Git User", "") assert user.check_password("anything") is False def test_provision_oauth_user_existing_updates_profile(): existing = User( username="alice", full_name="Old Name", avatar="old.png", password_hash="pbkdf2:sha256:1:salt:abc", admin=True, notification_channels=["chan1"], ) _reset_users({"alice": existing}) user = users_mod.provision_oauth_user("alice", "New Name", "new.png") assert user.full_name == "New Name" assert user.avatar == "new.png" # Preserved assert user.admin is True assert user.password_hash == "pbkdf2:sha256:1:salt:abc" assert user.notification_channels == ["chan1"] def test_provision_oauth_user_does_not_overwrite_with_empty(): existing = User(username="bob", full_name="Bob", avatar="bob.png") _reset_users({"bob": existing}) user = users_mod.provision_oauth_user("bob", "", "") assert user.full_name == "Bob" assert user.avatar == "bob.png" ``` - [ ] **Step 2: Run to confirm failure** ``` pytest tests/test_oauth.py -v -k "provision" ``` Expected: `AttributeError: module 'hbd.server.users' has no attribute 'provision_oauth_user'` - [ ] **Step 3: Implement `provision_oauth_user`** Add to `hbd/server/users.py` after the `authenticate()` function (after line 187): ```python def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User": """Create or update a user sourced from an OAuth2 provider. New users are inserted with no password_hash — they can only authenticate via OAuth. Existing users (e.g. defined in config with a password) have their display name and avatar refreshed; all other attributes are preserved. """ user = users.get(username) if user is None: user = User(username=username, full_name=full_name, avatar=avatar) users[username] = user logger.info("Provisioned OAuth user %r", username) else: if full_name: user.full_name = full_name if avatar: user.avatar = avatar return user ``` - [ ] **Step 4: Run to confirm tests pass** ``` pytest tests/test_oauth.py -v ``` Expected: 12 passed - [ ] **Step 5: Commit** ```bash git add hbd/server/users.py tests/test_oauth.py git commit -m "feat: add provision_oauth_user() to users module" ``` --- ## Task 4: URL builder, token exchange, and user-info fetch **Files:** - Modify: `hbd/server/oauth.py` (add `authorization_url`, `exchange_code`, `fetch_user`) - Modify: `tests/test_oauth.py` (add async tests with mocked HTTP) - [ ] **Step 1: Write the failing tests** Append to `tests/test_oauth.py`: ```python import pytest from unittest.mock import AsyncMock, MagicMock, patch from urllib.parse import urlparse, parse_qs def test_authorization_url_shape(): state = "teststate" redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" url = oauth.authorization_url(CFG_ON, state, redirect_uri) parsed = urlparse(url) qs = parse_qs(parsed.query) assert parsed.scheme == "https" assert parsed.netloc == "git.example.com" assert parsed.path == "/login/oauth/authorize" assert qs["client_id"] == ["cid"] assert qs["state"] == ["teststate"] assert qs["redirect_uri"] == [redirect_uri] assert qs["scope"] == ["user:email"] assert qs["response_type"] == ["code"] @pytest.mark.asyncio async def test_exchange_code_returns_token(): redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" mock_response = AsyncMock() mock_response.status = 200 mock_response.json = AsyncMock(return_value={"access_token": "tok123"}) mock_session = MagicMock() mock_session.post = MagicMock(return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock(return_value=False), )) with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_session), __aexit__=AsyncMock(return_value=False), )): token = await oauth.exchange_code(CFG_ON, "mycode", redirect_uri) assert token == "tok123" @pytest.mark.asyncio async def test_exchange_code_raises_on_error_status(): redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback" mock_response = AsyncMock() mock_response.status = 401 mock_response.text = AsyncMock(return_value="unauthorized") mock_session = MagicMock() mock_session.post = MagicMock(return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock(return_value=False), )) with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_session), __aexit__=AsyncMock(return_value=False), )): with pytest.raises(oauth.OAuthError): await oauth.exchange_code(CFG_ON, "badcode", redirect_uri) @pytest.mark.asyncio async def test_fetch_user_returns_profile(): mock_response = AsyncMock() mock_response.status = 200 mock_response.json = AsyncMock(return_value={ "login": "alice", "full_name": "Alice Smith", "avatar_url": "https://git.example.com/avatars/alice.png", }) mock_session = MagicMock() mock_session.get = MagicMock(return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock(return_value=False), )) with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock( __aenter__=AsyncMock(return_value=mock_session), __aexit__=AsyncMock(return_value=False), )): profile = await oauth.fetch_user(CFG_ON, "tok123") assert profile == { "login": "alice", "full_name": "Alice Smith", "avatar_url": "https://git.example.com/avatars/alice.png", } ``` - [ ] **Step 2: Run to confirm failure** ``` pytest tests/test_oauth.py -v -k "url or exchange or fetch" ``` Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'authorization_url'` - [ ] **Step 3: Implement the three functions** Add to `hbd/server/oauth.py`: ```python import urllib.parse def authorization_url(config: dict, state: str, redirect_uri: str) -> str: """Return the Gitea OAuth2 authorization URL to redirect the browser to.""" g = _gitea_cfg(config) params = urllib.parse.urlencode({ "client_id": g["client_id"], "redirect_uri": redirect_uri, "response_type": "code", "scope": "user:email", "state": state, }) return f"{g['url'].rstrip('/')}/login/oauth/authorize?{params}" async def exchange_code(config: dict, code: str, redirect_uri: str) -> str: """Exchange an authorization *code* for a Gitea access token. Returns the access token string. Raises OAuthError on any failure. """ g = _gitea_cfg(config) url = f"{g['url'].rstrip('/')}/login/oauth/access_token" payload = { "client_id": g["client_id"], "client_secret": g["client_secret"], "code": code, "grant_type": "authorization_code", "redirect_uri": redirect_uri, } timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(url, json=payload, headers={"Accept": "application/json"}) as resp: if resp.status != 200: text = await resp.text() raise OAuthError(f"Token exchange failed ({resp.status}): {text}") data = await resp.json() except aiohttp.ClientError as exc: raise OAuthError(f"Token exchange network error: {exc}") from exc token = data.get("access_token") if not token: raise OAuthError(f"No access_token in response: {data}") return token async def fetch_user(config: dict, token: str) -> dict: """Fetch the authenticated user's profile from Gitea. Returns a dict with keys: login, full_name, avatar_url. Raises OAuthError on any failure. """ g = _gitea_cfg(config) url = f"{g['url'].rstrip('/')}/api/v1/user" timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url, headers={"Authorization": f"token {token}"}) as resp: if resp.status != 200: text = await resp.text() raise OAuthError(f"User fetch failed ({resp.status}): {text}") data = await resp.json() except aiohttp.ClientError as exc: raise OAuthError(f"User fetch network error: {exc}") from exc return { "login": data.get("login", ""), "full_name": data.get("full_name", ""), "avatar_url": data.get("avatar_url", ""), } ``` Also add `import urllib.parse` at the top of `oauth.py` (alongside the existing imports). - [ ] **Step 4: Run to confirm tests pass** ``` pytest tests/test_oauth.py -v ``` Expected: 17 passed - [ ] **Step 5: Commit** ```bash git add hbd/server/oauth.py tests/test_oauth.py git commit -m "feat: add authorization_url, exchange_code, fetch_user to oauth module" ``` --- ## Task 5: HTTP routes — redirect and callback **Files:** - Modify: `hbd/server/http.py` `http.py` defines all handlers inside `async def start(...)`. The two new handlers go in the same block, just before the `app = web.Application()` line (~line 900). The import goes at the top of the file. - [ ] **Step 1: Add the import** In `hbd/server/http.py`, add after the existing local imports (after `from . import users as users_mod`): ```python from . import oauth as oauth_mod ``` - [ ] **Step 2: Add the two route handlers** In `hbd/server/http.py`, add the two handlers immediately before the `app = web.Application()` line: ```python async def oauth_gitea_redirect(request): """GET /login/oauth/gitea — kick off the Gitea OAuth2 flow.""" if not oauth_mod.is_enabled(config): return web.Response(status=404, text="OAuth not configured") state = oauth_mod.make_state() redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback" raise web.HTTPFound(oauth_mod.authorization_url(config, state, redirect_uri)) async def oauth_gitea_callback(request): """GET /login/oauth/gitea/callback — handle Gitea's redirect back.""" if not oauth_mod.is_enabled(config): return web.Response(status=404, text="OAuth not configured") code = request.rel_url.query.get("code", "") state = request.rel_url.query.get("state", "") if not code or not state: return web.Response(status=400, text="Missing code or state") if not oauth_mod.validate_state(state): raise web.HTTPFound("/login?error=1") redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback" try: token = await oauth_mod.exchange_code(config, code, redirect_uri) profile = await oauth_mod.fetch_user(config, token) except oauth_mod.OAuthError as exc: logger.warning("OAuth error: %s", exc) raise web.HTTPFound("/login?error=1") user = users_mod.provision_oauth_user( profile["login"], profile["full_name"], profile["avatar_url"], ) session_token = users_mod.create_session(user.username) resp = web.HTTPFound("/") resp.set_cookie( SESSION_COOKIE, session_token, max_age=users_mod.SESSION_TTL, httponly=True, samesite="Lax", ) raise resp ``` - [ ] **Step 3: Register the routes** In `hbd/server/http.py`, add to the route list after the existing auth routes (after `web.post("/api/0/auth/logout", api_logout)`): ```python web.get("/login/oauth/gitea", oauth_gitea_redirect), web.get("/login/oauth/gitea/callback", oauth_gitea_callback), ``` - [ ] **Step 4: Manual smoke test** Start the server locally with OAuth configured in `~/.hb.yaml`: ```yaml oauth: gitea: url: https://your-gitea-instance.example.com client_id: your-client-id client_secret: your-client-secret ``` Visit `http://localhost:50004/login/oauth/gitea` — confirm you are redirected to Gitea's authorization page. - [ ] **Step 5: Commit** ```bash git add hbd/server/http.py git commit -m "feat: add Gitea OAuth2 redirect and callback routes" ``` --- ## Task 6: Login page — "Sign in with Gitea" button **Files:** - Modify: `hbd/server/http.py` (update `login_page` handler, ~line 625) - [ ] **Step 1: Replace the login page HTML** In `hbd/server/http.py`, find the `html = f"""` block inside `login_page` and replace it with: ```python gitea_button = "" if oauth_mod.is_enabled(config): gitea_url = _gitea_cfg_url(config) gitea_button = f"""
or
Sign in with Gitea """ html = f""" Heartbeat — Login

Heartbeat

{'

Invalid username, password, or OAuth error.

' if error else ''}
{gitea_button}
""" ``` - [ ] **Step 2: Add the `_gitea_cfg_url` helper** Add this small helper in `hbd/server/http.py` just before the `login_page` handler (around line 600) so the template can read the Gitea display URL without importing internal oauth details: ```python def _gitea_cfg_url(config: dict) -> str: return config.get("oauth", {}).get("gitea", {}).get("url", "") ``` Also update the `login_page` handler's `error` logic to show the error when the `?error=1` query param is present (set by the callback on OAuth failure): ```python async def login_page(request): """GET /login — show login form; POST /login — process and redirect.""" if not users_mod.users_enabled(): raise web.HTTPFound("/") error = "" if request.method == "POST": form = await request.post() username = form.get("username", "") password = form.get("password", "") user = users_mod.authenticate(username, password) if user: token = users_mod.create_session(username) redirect_to = request.rel_url.query.get("next", "/") resp = web.HTTPFound(redirect_to) resp.set_cookie( SESSION_COOKIE, token, max_age=users_mod.SESSION_TTL, httponly=True, samesite="Lax", ) raise resp error = "Invalid username or password." elif request.rel_url.query.get("error"): error = "Sign-in failed. Please try again." ``` - [ ] **Step 3: Manual verification** Start the server with OAuth configured. Visit `/login`. Confirm: - The "Sign in with Gitea" button appears (green, below a divider) - Clicking it redirects to Gitea - After authorising on Gitea, you are redirected back and land on `/` with a valid session cookie Without OAuth configured, confirm the button does not appear. - [ ] **Step 4: Commit** ```bash git add hbd/server/http.py git commit -m "feat: add Sign in with Gitea button to login page" ``` --- ## Self-Review Notes - All 5 spec requirements covered: coexist ✓, auto-provision ✓, regular user ✓, any Gitea user ✓, config-driven ✓ - `exchange_code` signature in Task 4 matches usage in Task 5 (`config, code, redirect_uri`) ✓ - `fetch_user` returns `{login, full_name, avatar_url}` — matched in callback handler ✓ - `validate_state` removes state on use (replay protection) ✓ - `provision_oauth_user` skips empty strings so existing avatar/name aren't erased ✓ - `_gitea_cfg_url` is a plain `def`, not `async` — safe to call in template prep ✓