Files
heartbeat/docs/superpowers/plans/2026-05-08-gitea-oauth2.md
T

24 KiB

Gitea OAuth2 Authentication Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add Gitea as an OAuth2 login provider that coexists with password auth, auto-provisioning new users on first login.

Architecture: A new oauth.py module owns all Gitea-specific logic (CSRF state, URL building, token exchange, user-info fetch). users.py gains one function to upsert an OAuth-sourced user. http.py gets two new route handlers and a small login-page change. No new dependencies — aiohttp.ClientSession is already used in the codebase.

Tech Stack: Python 3.12, aiohttp 3.x, pytest, pytest-asyncio


File Map

Action Path Responsibility
Modify hbd/server/config.py Add "oauth": {} default
Create hbd/server/oauth.py CSRF state, URL builder, token exchange, user-info fetch
Modify hbd/server/users.py Add provision_oauth_user()
Modify hbd/server/http.py Import oauth, two new routes, login page button
Create tests/test_oauth.py All new unit tests

Task 1: Add config default and is_enabled()

Files:

  • Modify: hbd/server/config.py:34 (after the "users" line)

  • Create: hbd/server/oauth.py

  • Create: tests/test_oauth.py

  • Step 1: Write the failing test

Create tests/test_oauth.py:

import pytest
from hbd.server import oauth


CFG_OFF = {}
CFG_ON = {
    "oauth": {
        "gitea": {
            "url": "https://git.example.com",
            "client_id": "cid",
            "client_secret": "csec",
        }
    }
}
CFG_PARTIAL = {"oauth": {"gitea": {"url": "https://git.example.com"}}}


def test_is_enabled_when_all_keys_present():
    assert oauth.is_enabled(CFG_ON) is True


def test_is_enabled_false_when_no_oauth_key():
    assert oauth.is_enabled(CFG_OFF) is False


def test_is_enabled_false_when_partial_config():
    assert oauth.is_enabled(CFG_PARTIAL) is False
  • Step 2: Run to confirm failure
pytest tests/test_oauth.py -v

Expected: ModuleNotFoundError: No module named 'hbd.server.oauth'

  • Step 3: Add config default

In hbd/server/config.py, add after the "default_owner" line (currently line 35):

    # OAuth2 providers
    "oauth": {},                 # oauth.gitea.{url,client_id,client_secret}
  • Step 4: Create hbd/server/oauth.py with is_enabled
"""Gitea OAuth2 support.

Config shape (in ~/.hb.yaml):

    oauth:
      gitea:
        url: https://git.example.com
        client_id: <client-id>
        client_secret: <client-secret>

Register a Gitea OAuth2 application at:
  Gitea → Settings → Applications → OAuth2
Set the redirect URI to:
  https://<hbd-host>/login/oauth/gitea/callback
"""

import logging
import secrets
import time

import aiohttp

logger = logging.getLogger(__name__)

STATE_TTL = 600  # 10 minutes

# state_token -> expiry timestamp
_states: dict[str, float] = {}


class OAuthError(Exception):
    """Raised when the OAuth2 flow fails for any reason."""


def _gitea_cfg(config: dict) -> dict:
    """Return the gitea sub-dict or {} if absent/incomplete."""
    return config.get("oauth", {}).get("gitea", {})


def is_enabled(config: dict) -> bool:
    """Return True when all three required Gitea OAuth keys are present."""
    g = _gitea_cfg(config)
    return bool(g.get("url") and g.get("client_id") and g.get("client_secret"))
  • Step 5: Run to confirm tests pass
pytest tests/test_oauth.py -v

Expected: 3 passed

  • Step 6: Commit
git add hbd/server/config.py hbd/server/oauth.py tests/test_oauth.py
git commit -m "feat: add oauth module skeleton and is_enabled()"

Task 2: CSRF state management

Files:

  • Modify: hbd/server/oauth.py (add make_state, validate_state)

  • Modify: tests/test_oauth.py (add state tests)

  • Step 1: Write the failing tests

Append to tests/test_oauth.py:

import time as time_mod


def test_make_state_returns_unique_tokens():
    s1 = oauth.make_state()
    s2 = oauth.make_state()
    assert s1 != s2
    assert len(s1) == 64  # 32 bytes hex


def test_validate_state_valid():
    state = oauth.make_state()
    assert oauth.validate_state(state) is True


def test_validate_state_consumed_on_use():
    state = oauth.make_state()
    oauth.validate_state(state)
    assert oauth.validate_state(state) is False  # replay rejected


def test_validate_state_unknown():
    assert oauth.validate_state("notastate") is False


def test_validate_state_expired(monkeypatch):
    state = oauth.make_state()
    # Wind expiry into the past
    monkeypatch.setitem(oauth._states, state, time_mod.time() - 1)
    assert oauth.validate_state(state) is False
  • Step 2: Run to confirm failure
pytest tests/test_oauth.py -v -k "state"

Expected: AttributeError: module 'hbd.server.oauth' has no attribute 'make_state'

  • Step 3: Implement state functions

Add to hbd/server/oauth.py after the _states dict definition:

def make_state() -> str:
    """Generate a CSRF state token, store it with TTL, and return it."""
    _purge_states()
    token = secrets.token_hex(32)
    _states[token] = time.time() + STATE_TTL
    return token


def validate_state(state: str) -> bool:
    """Return True if *state* is known and unexpired; always removes it."""
    expiry = _states.pop(state, None)
    if expiry is None:
        return False
    return time.time() < expiry


def _purge_states() -> None:
    now = time.time()
    expired = [k for k, exp in list(_states.items()) if exp < now]
    for k in expired:
        del _states[k]
  • Step 4: Run to confirm tests pass
pytest tests/test_oauth.py -v

Expected: 8 passed

  • Step 5: Commit
git add hbd/server/oauth.py tests/test_oauth.py
git commit -m "feat: add OAuth2 CSRF state management"

Task 3: provision_oauth_user in users.py

Files:

  • Modify: hbd/server/users.py (add provision_oauth_user)

  • Modify: tests/test_oauth.py (add provisioning tests)

  • Step 1: Write the failing tests

Append to tests/test_oauth.py:

from hbd.server import users as users_mod
from hbd.server.users import User


def _reset_users(entries=None):
    users_mod.users = entries or {}


def test_provision_oauth_user_new():
    _reset_users()
    user = users_mod.provision_oauth_user("gituser", "Git User", "https://example.com/avatar.png")
    assert user.username == "gituser"
    assert user.full_name == "Git User"
    assert user.avatar == "https://example.com/avatar.png"
    assert user.admin is False
    assert user.password_hash == ""
    assert "gituser" in users_mod.users


def test_provision_oauth_user_no_password_login():
    _reset_users()
    user = users_mod.provision_oauth_user("gituser", "Git User", "")
    assert user.check_password("anything") is False


def test_provision_oauth_user_existing_updates_profile():
    existing = User(
        username="alice",
        full_name="Old Name",
        avatar="old.png",
        password_hash="pbkdf2:sha256:1:salt:abc",
        admin=True,
        notification_channels=["chan1"],
    )
    _reset_users({"alice": existing})
    user = users_mod.provision_oauth_user("alice", "New Name", "new.png")
    assert user.full_name == "New Name"
    assert user.avatar == "new.png"
    # Preserved
    assert user.admin is True
    assert user.password_hash == "pbkdf2:sha256:1:salt:abc"
    assert user.notification_channels == ["chan1"]


def test_provision_oauth_user_does_not_overwrite_with_empty():
    existing = User(username="bob", full_name="Bob", avatar="bob.png")
    _reset_users({"bob": existing})
    user = users_mod.provision_oauth_user("bob", "", "")
    assert user.full_name == "Bob"
    assert user.avatar == "bob.png"
  • Step 2: Run to confirm failure
pytest tests/test_oauth.py -v -k "provision"

Expected: AttributeError: module 'hbd.server.users' has no attribute 'provision_oauth_user'

  • Step 3: Implement provision_oauth_user

Add to hbd/server/users.py after the authenticate() function (after line 187):

def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User":
    """Create or update a user sourced from an OAuth2 provider.

    New users are inserted with no password_hash — they can only authenticate
    via OAuth.  Existing users (e.g. defined in config with a password) have
    their display name and avatar refreshed; all other attributes are preserved.
    """
    user = users.get(username)
    if user is None:
        user = User(username=username, full_name=full_name, avatar=avatar)
        users[username] = user
        logger.info("Provisioned OAuth user %r", username)
    else:
        if full_name:
            user.full_name = full_name
        if avatar:
            user.avatar = avatar
    return user
  • Step 4: Run to confirm tests pass
pytest tests/test_oauth.py -v

Expected: 12 passed

  • Step 5: Commit
git add hbd/server/users.py tests/test_oauth.py
git commit -m "feat: add provision_oauth_user() to users module"

Task 4: URL builder, token exchange, and user-info fetch

Files:

  • Modify: hbd/server/oauth.py (add authorization_url, exchange_code, fetch_user)

  • Modify: tests/test_oauth.py (add async tests with mocked HTTP)

  • Step 1: Write the failing tests

Append to tests/test_oauth.py:

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import urlparse, parse_qs


def test_authorization_url_shape():
    state = "teststate"
    redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
    url = oauth.authorization_url(CFG_ON, state, redirect_uri)
    parsed = urlparse(url)
    qs = parse_qs(parsed.query)
    assert parsed.scheme == "https"
    assert parsed.netloc == "git.example.com"
    assert parsed.path == "/login/oauth/authorize"
    assert qs["client_id"] == ["cid"]
    assert qs["state"] == ["teststate"]
    assert qs["redirect_uri"] == [redirect_uri]
    assert qs["scope"] == ["user:email"]
    assert qs["response_type"] == ["code"]


@pytest.mark.asyncio
async def test_exchange_code_returns_token():
    redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
    mock_response = AsyncMock()
    mock_response.status = 200
    mock_response.json = AsyncMock(return_value={"access_token": "tok123"})

    mock_session = MagicMock()
    mock_session.post = MagicMock(return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_response),
        __aexit__=AsyncMock(return_value=False),
    ))

    with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_session),
        __aexit__=AsyncMock(return_value=False),
    )):
        token = await oauth.exchange_code(CFG_ON, "mycode", redirect_uri)
    assert token == "tok123"


@pytest.mark.asyncio
async def test_exchange_code_raises_on_error_status():
    redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
    mock_response = AsyncMock()
    mock_response.status = 401
    mock_response.text = AsyncMock(return_value="unauthorized")

    mock_session = MagicMock()
    mock_session.post = MagicMock(return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_response),
        __aexit__=AsyncMock(return_value=False),
    ))

    with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_session),
        __aexit__=AsyncMock(return_value=False),
    )):
        with pytest.raises(oauth.OAuthError):
            await oauth.exchange_code(CFG_ON, "badcode", redirect_uri)


@pytest.mark.asyncio
async def test_fetch_user_returns_profile():
    mock_response = AsyncMock()
    mock_response.status = 200
    mock_response.json = AsyncMock(return_value={
        "login": "alice",
        "full_name": "Alice Smith",
        "avatar_url": "https://git.example.com/avatars/alice.png",
    })

    mock_session = MagicMock()
    mock_session.get = MagicMock(return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_response),
        __aexit__=AsyncMock(return_value=False),
    ))

    with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
        __aenter__=AsyncMock(return_value=mock_session),
        __aexit__=AsyncMock(return_value=False),
    )):
        profile = await oauth.fetch_user(CFG_ON, "tok123")
    assert profile == {
        "login": "alice",
        "full_name": "Alice Smith",
        "avatar_url": "https://git.example.com/avatars/alice.png",
    }
  • Step 2: Run to confirm failure
pytest tests/test_oauth.py -v -k "url or exchange or fetch"

Expected: AttributeError: module 'hbd.server.oauth' has no attribute 'authorization_url'

  • Step 3: Implement the three functions

Add to hbd/server/oauth.py:

import urllib.parse


def authorization_url(config: dict, state: str, redirect_uri: str) -> str:
    """Return the Gitea OAuth2 authorization URL to redirect the browser to."""
    g = _gitea_cfg(config)
    params = urllib.parse.urlencode({
        "client_id": g["client_id"],
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "user:email",
        "state": state,
    })
    return f"{g['url'].rstrip('/')}/login/oauth/authorize?{params}"


async def exchange_code(config: dict, code: str, redirect_uri: str) -> str:
    """Exchange an authorization *code* for a Gitea access token.

    Returns the access token string.  Raises OAuthError on any failure.
    """
    g = _gitea_cfg(config)
    url = f"{g['url'].rstrip('/')}/login/oauth/access_token"
    payload = {
        "client_id": g["client_id"],
        "client_secret": g["client_secret"],
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": redirect_uri,
    }
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload, headers={"Accept": "application/json"}) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    raise OAuthError(f"Token exchange failed ({resp.status}): {text}")
                data = await resp.json()
    except aiohttp.ClientError as exc:
        raise OAuthError(f"Token exchange network error: {exc}") from exc
    token = data.get("access_token")
    if not token:
        raise OAuthError(f"No access_token in response: {data}")
    return token


async def fetch_user(config: dict, token: str) -> dict:
    """Fetch the authenticated user's profile from Gitea.

    Returns a dict with keys: login, full_name, avatar_url.
    Raises OAuthError on any failure.
    """
    g = _gitea_cfg(config)
    url = f"{g['url'].rstrip('/')}/api/v1/user"
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers={"Authorization": f"token {token}"}) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    raise OAuthError(f"User fetch failed ({resp.status}): {text}")
                data = await resp.json()
    except aiohttp.ClientError as exc:
        raise OAuthError(f"User fetch network error: {exc}") from exc
    return {
        "login": data.get("login", ""),
        "full_name": data.get("full_name", ""),
        "avatar_url": data.get("avatar_url", ""),
    }

Also add import urllib.parse at the top of oauth.py (alongside the existing imports).

  • Step 4: Run to confirm tests pass
pytest tests/test_oauth.py -v

Expected: 17 passed

  • Step 5: Commit
git add hbd/server/oauth.py tests/test_oauth.py
git commit -m "feat: add authorization_url, exchange_code, fetch_user to oauth module"

Task 5: HTTP routes — redirect and callback

Files:

  • Modify: hbd/server/http.py

http.py defines all handlers inside async def start(...). The two new handlers go in the same block, just before the app = web.Application() line (~line 900). The import goes at the top of the file.

  • Step 1: Add the import

In hbd/server/http.py, add after the existing local imports (after from . import users as users_mod):

from . import oauth as oauth_mod
  • Step 2: Add the two route handlers

In hbd/server/http.py, add the two handlers immediately before the app = web.Application() line:

    async def oauth_gitea_redirect(request):
        """GET /login/oauth/gitea — kick off the Gitea OAuth2 flow."""
        if not oauth_mod.is_enabled(config):
            return web.Response(status=404, text="OAuth not configured")
        state = oauth_mod.make_state()
        redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback"
        raise web.HTTPFound(oauth_mod.authorization_url(config, state, redirect_uri))

    async def oauth_gitea_callback(request):
        """GET /login/oauth/gitea/callback — handle Gitea's redirect back."""
        if not oauth_mod.is_enabled(config):
            return web.Response(status=404, text="OAuth not configured")
        code = request.rel_url.query.get("code", "")
        state = request.rel_url.query.get("state", "")
        if not code or not state:
            return web.Response(status=400, text="Missing code or state")
        if not oauth_mod.validate_state(state):
            raise web.HTTPFound("/login?error=1")
        redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback"
        try:
            token = await oauth_mod.exchange_code(config, code, redirect_uri)
            profile = await oauth_mod.fetch_user(config, token)
        except oauth_mod.OAuthError as exc:
            logger.warning("OAuth error: %s", exc)
            raise web.HTTPFound("/login?error=1")
        user = users_mod.provision_oauth_user(
            profile["login"],
            profile["full_name"],
            profile["avatar_url"],
        )
        session_token = users_mod.create_session(user.username)
        resp = web.HTTPFound("/")
        resp.set_cookie(
            SESSION_COOKIE,
            session_token,
            max_age=users_mod.SESSION_TTL,
            httponly=True,
            samesite="Lax",
        )
        raise resp
  • Step 3: Register the routes

In hbd/server/http.py, add to the route list after the existing auth routes (after web.post("/api/0/auth/logout", api_logout)):

            web.get("/login/oauth/gitea",          oauth_gitea_redirect),
            web.get("/login/oauth/gitea/callback", oauth_gitea_callback),
  • Step 4: Manual smoke test

Start the server locally with OAuth configured in ~/.hb.yaml:

oauth:
  gitea:
    url: https://your-gitea-instance.example.com
    client_id: your-client-id
    client_secret: your-client-secret

Visit http://localhost:50004/login/oauth/gitea — confirm you are redirected to Gitea's authorization page.

  • Step 5: Commit
git add hbd/server/http.py
git commit -m "feat: add Gitea OAuth2 redirect and callback routes"

Task 6: Login page — "Sign in with Gitea" button

Files:

  • Modify: hbd/server/http.py (update login_page handler, ~line 625)

  • Step 1: Replace the login page HTML

In hbd/server/http.py, find the html = f""" block inside login_page and replace it with:

        gitea_button = ""
        if oauth_mod.is_enabled(config):
            gitea_url = _gitea_cfg_url(config)
            gitea_button = f"""
    <div class="divider">or</div>
    <a href="/login/oauth/gitea" class="gitea-btn">
      Sign in with Gitea
    </a>"""

        html = f"""<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>Heartbeat — Login</title>
  <style>
    body {{ font-family: sans-serif; background: #f5f5f5; display: flex;
            justify-content: center; align-items: center; height: 100vh; margin: 0; }}
    .box {{ background: #fff; padding: 2em 2.5em; border-radius: 8px;
             box-shadow: 0 2px 12px rgba(0,0,0,.15); min-width: 300px; }}
    h2 {{ margin: 0 0 1.2em; color: #333; font-size: 1.4em; }}
    label {{ display: block; margin-bottom: .3em; font-size: .9em; color: #555; }}
    input {{ width: 100%; padding: .5em .7em; border: 1px solid #ccc;
              border-radius: 4px; font-size: 1em; box-sizing: border-box; }}
    button {{ margin-top: 1.2em; width: 100%; padding: .6em; background: #0066cc;
               color: #fff; border: none; border-radius: 4px; font-size: 1em; cursor: pointer; }}
    button:hover {{ background: #0055aa; }}
    .error {{ color: #c00; font-size: .9em; margin-bottom: .8em; }}
    .field {{ margin-bottom: .9em; }}
    .divider {{ text-align: center; margin: 1.2em 0 .8em; color: #999;
                font-size: .85em; border-top: 1px solid #eee; padding-top: .8em; }}
    .gitea-btn {{ display: block; width: 100%; padding: .6em; background: #609926;
                  color: #fff; border-radius: 4px; font-size: 1em; text-align: center;
                  text-decoration: none; box-sizing: border-box; }}
    .gitea-btn:hover {{ background: #4e7d1e; }}
  </style>
</head>
<body>
  <div class="box">
    <h2>Heartbeat</h2>
    {'<p class="error">Invalid username, password, or OAuth error.</p>' if error else ''}
    <form method="post">
      <div class="field"><label>Username</label><input name="username" autofocus></div>
      <div class="field"><label>Password</label><input name="password" type="password"></div>
      <button type="submit">Sign in</button>
    </form>{gitea_button}
  </div>
</body>
</html>"""
  • Step 2: Add the _gitea_cfg_url helper

Add this small helper in hbd/server/http.py just before the login_page handler (around line 600) so the template can read the Gitea display URL without importing internal oauth details:

def _gitea_cfg_url(config: dict) -> str:
    return config.get("oauth", {}).get("gitea", {}).get("url", "")

Also update the login_page handler's error logic to show the error when the ?error=1 query param is present (set by the callback on OAuth failure):

    async def login_page(request):
        """GET /login — show login form; POST /login — process and redirect."""
        if not users_mod.users_enabled():
            raise web.HTTPFound("/")

        error = ""
        if request.method == "POST":
            form = await request.post()
            username = form.get("username", "")
            password = form.get("password", "")
            user = users_mod.authenticate(username, password)
            if user:
                token = users_mod.create_session(username)
                redirect_to = request.rel_url.query.get("next", "/")
                resp = web.HTTPFound(redirect_to)
                resp.set_cookie(
                    SESSION_COOKIE,
                    token,
                    max_age=users_mod.SESSION_TTL,
                    httponly=True,
                    samesite="Lax",
                )
                raise resp
            error = "Invalid username or password."
        elif request.rel_url.query.get("error"):
            error = "Sign-in failed. Please try again."
  • Step 3: Manual verification

Start the server with OAuth configured. Visit /login. Confirm:

  • The "Sign in with Gitea" button appears (green, below a divider)
  • Clicking it redirects to Gitea
  • After authorising on Gitea, you are redirected back and land on / with a valid session cookie

Without OAuth configured, confirm the button does not appear.

  • Step 4: Commit
git add hbd/server/http.py
git commit -m "feat: add Sign in with Gitea button to login page"

Self-Review Notes

  • All 5 spec requirements covered: coexist ✓, auto-provision ✓, regular user ✓, any Gitea user ✓, config-driven ✓
  • exchange_code signature in Task 4 matches usage in Task 5 (config, code, redirect_uri) ✓
  • fetch_user returns {login, full_name, avatar_url} — matched in callback handler ✓
  • validate_state removes state on use (replay protection) ✓
  • provision_oauth_user skips empty strings so existing avatar/name aren't erased ✓
  • _gitea_cfg_url is a plain def, not async — safe to call in template prep ✓