782 lines
24 KiB
Markdown
782 lines
24 KiB
Markdown
# Gitea OAuth2 Authentication Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Add Gitea as an OAuth2 login provider that coexists with password auth, auto-provisioning new users on first login.
|
|
|
|
**Architecture:** A new `oauth.py` module owns all Gitea-specific logic (CSRF state, URL building, token exchange, user-info fetch). `users.py` gains one function to upsert an OAuth-sourced user. `http.py` gets two new route handlers and a small login-page change. No new dependencies — `aiohttp.ClientSession` is already used in the codebase.
|
|
|
|
**Tech Stack:** Python 3.12, aiohttp 3.x, pytest, pytest-asyncio
|
|
|
|
---
|
|
|
|
## File Map
|
|
|
|
| Action | Path | Responsibility |
|
|
|--------|------|----------------|
|
|
| Modify | `hbd/server/config.py` | Add `"oauth": {}` default |
|
|
| Create | `hbd/server/oauth.py` | CSRF state, URL builder, token exchange, user-info fetch |
|
|
| Modify | `hbd/server/users.py` | Add `provision_oauth_user()` |
|
|
| Modify | `hbd/server/http.py` | Import oauth, two new routes, login page button |
|
|
| Create | `tests/test_oauth.py` | All new unit tests |
|
|
|
|
---
|
|
|
|
## Task 1: Add config default and `is_enabled()`
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/config.py:34` (after the `"users"` line)
|
|
- Create: `hbd/server/oauth.py`
|
|
- Create: `tests/test_oauth.py`
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
Create `tests/test_oauth.py`:
|
|
|
|
```python
|
|
import pytest
|
|
from hbd.server import oauth
|
|
|
|
|
|
CFG_OFF = {}
|
|
CFG_ON = {
|
|
"oauth": {
|
|
"gitea": {
|
|
"url": "https://git.example.com",
|
|
"client_id": "cid",
|
|
"client_secret": "csec",
|
|
}
|
|
}
|
|
}
|
|
CFG_PARTIAL = {"oauth": {"gitea": {"url": "https://git.example.com"}}}
|
|
|
|
|
|
def test_is_enabled_when_all_keys_present():
|
|
assert oauth.is_enabled(CFG_ON) is True
|
|
|
|
|
|
def test_is_enabled_false_when_no_oauth_key():
|
|
assert oauth.is_enabled(CFG_OFF) is False
|
|
|
|
|
|
def test_is_enabled_false_when_partial_config():
|
|
assert oauth.is_enabled(CFG_PARTIAL) is False
|
|
```
|
|
|
|
- [ ] **Step 2: Run to confirm failure**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v
|
|
```
|
|
|
|
Expected: `ModuleNotFoundError: No module named 'hbd.server.oauth'`
|
|
|
|
- [ ] **Step 3: Add config default**
|
|
|
|
In `hbd/server/config.py`, add after the `"default_owner"` line (currently line 35):
|
|
|
|
```python
|
|
# OAuth2 providers
|
|
"oauth": {}, # oauth.gitea.{url,client_id,client_secret}
|
|
```
|
|
|
|
- [ ] **Step 4: Create `hbd/server/oauth.py` with `is_enabled`**
|
|
|
|
```python
|
|
"""Gitea OAuth2 support.
|
|
|
|
Config shape (in ~/.hb.yaml):
|
|
|
|
oauth:
|
|
gitea:
|
|
url: https://git.example.com
|
|
client_id: <client-id>
|
|
client_secret: <client-secret>
|
|
|
|
Register a Gitea OAuth2 application at:
|
|
Gitea → Settings → Applications → OAuth2
|
|
Set the redirect URI to:
|
|
https://<hbd-host>/login/oauth/gitea/callback
|
|
"""
|
|
|
|
import logging
|
|
import secrets
|
|
import time
|
|
|
|
import aiohttp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
STATE_TTL = 600 # 10 minutes
|
|
|
|
# state_token -> expiry timestamp
|
|
_states: dict[str, float] = {}
|
|
|
|
|
|
class OAuthError(Exception):
|
|
"""Raised when the OAuth2 flow fails for any reason."""
|
|
|
|
|
|
def _gitea_cfg(config: dict) -> dict:
|
|
"""Return the gitea sub-dict or {} if absent/incomplete."""
|
|
return config.get("oauth", {}).get("gitea", {})
|
|
|
|
|
|
def is_enabled(config: dict) -> bool:
|
|
"""Return True when all three required Gitea OAuth keys are present."""
|
|
g = _gitea_cfg(config)
|
|
return bool(g.get("url") and g.get("client_id") and g.get("client_secret"))
|
|
```
|
|
|
|
- [ ] **Step 5: Run to confirm tests pass**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v
|
|
```
|
|
|
|
Expected: 3 passed
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/config.py hbd/server/oauth.py tests/test_oauth.py
|
|
git commit -m "feat: add oauth module skeleton and is_enabled()"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 2: CSRF state management
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/oauth.py` (add `make_state`, `validate_state`)
|
|
- Modify: `tests/test_oauth.py` (add state tests)
|
|
|
|
- [ ] **Step 1: Write the failing tests**
|
|
|
|
Append to `tests/test_oauth.py`:
|
|
|
|
```python
|
|
import time as time_mod
|
|
|
|
|
|
def test_make_state_returns_unique_tokens():
|
|
s1 = oauth.make_state()
|
|
s2 = oauth.make_state()
|
|
assert s1 != s2
|
|
assert len(s1) == 64 # 32 bytes hex
|
|
|
|
|
|
def test_validate_state_valid():
|
|
state = oauth.make_state()
|
|
assert oauth.validate_state(state) is True
|
|
|
|
|
|
def test_validate_state_consumed_on_use():
|
|
state = oauth.make_state()
|
|
oauth.validate_state(state)
|
|
assert oauth.validate_state(state) is False # replay rejected
|
|
|
|
|
|
def test_validate_state_unknown():
|
|
assert oauth.validate_state("notastate") is False
|
|
|
|
|
|
def test_validate_state_expired(monkeypatch):
|
|
state = oauth.make_state()
|
|
# Wind expiry into the past
|
|
monkeypatch.setitem(oauth._states, state, time_mod.time() - 1)
|
|
assert oauth.validate_state(state) is False
|
|
```
|
|
|
|
- [ ] **Step 2: Run to confirm failure**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v -k "state"
|
|
```
|
|
|
|
Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'make_state'`
|
|
|
|
- [ ] **Step 3: Implement state functions**
|
|
|
|
Add to `hbd/server/oauth.py` after the `_states` dict definition:
|
|
|
|
```python
|
|
def make_state() -> str:
|
|
"""Generate a CSRF state token, store it with TTL, and return it."""
|
|
_purge_states()
|
|
token = secrets.token_hex(32)
|
|
_states[token] = time.time() + STATE_TTL
|
|
return token
|
|
|
|
|
|
def validate_state(state: str) -> bool:
|
|
"""Return True if *state* is known and unexpired; always removes it."""
|
|
expiry = _states.pop(state, None)
|
|
if expiry is None:
|
|
return False
|
|
return time.time() < expiry
|
|
|
|
|
|
def _purge_states() -> None:
|
|
now = time.time()
|
|
expired = [k for k, exp in list(_states.items()) if exp < now]
|
|
for k in expired:
|
|
del _states[k]
|
|
```
|
|
|
|
- [ ] **Step 4: Run to confirm tests pass**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v
|
|
```
|
|
|
|
Expected: 8 passed
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/oauth.py tests/test_oauth.py
|
|
git commit -m "feat: add OAuth2 CSRF state management"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 3: `provision_oauth_user` in users.py
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/users.py` (add `provision_oauth_user`)
|
|
- Modify: `tests/test_oauth.py` (add provisioning tests)
|
|
|
|
- [ ] **Step 1: Write the failing tests**
|
|
|
|
Append to `tests/test_oauth.py`:
|
|
|
|
```python
|
|
from hbd.server import users as users_mod
|
|
from hbd.server.users import User
|
|
|
|
|
|
def _reset_users(entries=None):
|
|
users_mod.users = entries or {}
|
|
|
|
|
|
def test_provision_oauth_user_new():
|
|
_reset_users()
|
|
user = users_mod.provision_oauth_user("gituser", "Git User", "https://example.com/avatar.png")
|
|
assert user.username == "gituser"
|
|
assert user.full_name == "Git User"
|
|
assert user.avatar == "https://example.com/avatar.png"
|
|
assert user.admin is False
|
|
assert user.password_hash == ""
|
|
assert "gituser" in users_mod.users
|
|
|
|
|
|
def test_provision_oauth_user_no_password_login():
|
|
_reset_users()
|
|
user = users_mod.provision_oauth_user("gituser", "Git User", "")
|
|
assert user.check_password("anything") is False
|
|
|
|
|
|
def test_provision_oauth_user_existing_updates_profile():
|
|
existing = User(
|
|
username="alice",
|
|
full_name="Old Name",
|
|
avatar="old.png",
|
|
password_hash="pbkdf2:sha256:1:salt:abc",
|
|
admin=True,
|
|
notification_channels=["chan1"],
|
|
)
|
|
_reset_users({"alice": existing})
|
|
user = users_mod.provision_oauth_user("alice", "New Name", "new.png")
|
|
assert user.full_name == "New Name"
|
|
assert user.avatar == "new.png"
|
|
# Preserved
|
|
assert user.admin is True
|
|
assert user.password_hash == "pbkdf2:sha256:1:salt:abc"
|
|
assert user.notification_channels == ["chan1"]
|
|
|
|
|
|
def test_provision_oauth_user_does_not_overwrite_with_empty():
|
|
existing = User(username="bob", full_name="Bob", avatar="bob.png")
|
|
_reset_users({"bob": existing})
|
|
user = users_mod.provision_oauth_user("bob", "", "")
|
|
assert user.full_name == "Bob"
|
|
assert user.avatar == "bob.png"
|
|
```
|
|
|
|
- [ ] **Step 2: Run to confirm failure**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v -k "provision"
|
|
```
|
|
|
|
Expected: `AttributeError: module 'hbd.server.users' has no attribute 'provision_oauth_user'`
|
|
|
|
- [ ] **Step 3: Implement `provision_oauth_user`**
|
|
|
|
Add to `hbd/server/users.py` after the `authenticate()` function (after line 187):
|
|
|
|
```python
|
|
def provision_oauth_user(username: str, full_name: str, avatar: str) -> "User":
|
|
"""Create or update a user sourced from an OAuth2 provider.
|
|
|
|
New users are inserted with no password_hash — they can only authenticate
|
|
via OAuth. Existing users (e.g. defined in config with a password) have
|
|
their display name and avatar refreshed; all other attributes are preserved.
|
|
"""
|
|
user = users.get(username)
|
|
if user is None:
|
|
user = User(username=username, full_name=full_name, avatar=avatar)
|
|
users[username] = user
|
|
logger.info("Provisioned OAuth user %r", username)
|
|
else:
|
|
if full_name:
|
|
user.full_name = full_name
|
|
if avatar:
|
|
user.avatar = avatar
|
|
return user
|
|
```
|
|
|
|
- [ ] **Step 4: Run to confirm tests pass**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v
|
|
```
|
|
|
|
Expected: 12 passed
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/users.py tests/test_oauth.py
|
|
git commit -m "feat: add provision_oauth_user() to users module"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 4: URL builder, token exchange, and user-info fetch
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/oauth.py` (add `authorization_url`, `exchange_code`, `fetch_user`)
|
|
- Modify: `tests/test_oauth.py` (add async tests with mocked HTTP)
|
|
|
|
- [ ] **Step 1: Write the failing tests**
|
|
|
|
Append to `tests/test_oauth.py`:
|
|
|
|
```python
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
|
|
def test_authorization_url_shape():
|
|
state = "teststate"
|
|
redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
|
|
url = oauth.authorization_url(CFG_ON, state, redirect_uri)
|
|
parsed = urlparse(url)
|
|
qs = parse_qs(parsed.query)
|
|
assert parsed.scheme == "https"
|
|
assert parsed.netloc == "git.example.com"
|
|
assert parsed.path == "/login/oauth/authorize"
|
|
assert qs["client_id"] == ["cid"]
|
|
assert qs["state"] == ["teststate"]
|
|
assert qs["redirect_uri"] == [redirect_uri]
|
|
assert qs["scope"] == ["user:email"]
|
|
assert qs["response_type"] == ["code"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exchange_code_returns_token():
|
|
redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
|
|
mock_response = AsyncMock()
|
|
mock_response.status = 200
|
|
mock_response.json = AsyncMock(return_value={"access_token": "tok123"})
|
|
|
|
mock_session = MagicMock()
|
|
mock_session.post = MagicMock(return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_response),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
))
|
|
|
|
with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_session),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)):
|
|
token = await oauth.exchange_code(CFG_ON, "mycode", redirect_uri)
|
|
assert token == "tok123"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exchange_code_raises_on_error_status():
|
|
redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
|
|
mock_response = AsyncMock()
|
|
mock_response.status = 401
|
|
mock_response.text = AsyncMock(return_value="unauthorized")
|
|
|
|
mock_session = MagicMock()
|
|
mock_session.post = MagicMock(return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_response),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
))
|
|
|
|
with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_session),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)):
|
|
with pytest.raises(oauth.OAuthError):
|
|
await oauth.exchange_code(CFG_ON, "badcode", redirect_uri)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fetch_user_returns_profile():
|
|
mock_response = AsyncMock()
|
|
mock_response.status = 200
|
|
mock_response.json = AsyncMock(return_value={
|
|
"login": "alice",
|
|
"full_name": "Alice Smith",
|
|
"avatar_url": "https://git.example.com/avatars/alice.png",
|
|
})
|
|
|
|
mock_session = MagicMock()
|
|
mock_session.get = MagicMock(return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_response),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
))
|
|
|
|
with patch("hbd.server.oauth.aiohttp.ClientSession", return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=mock_session),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)):
|
|
profile = await oauth.fetch_user(CFG_ON, "tok123")
|
|
assert profile == {
|
|
"login": "alice",
|
|
"full_name": "Alice Smith",
|
|
"avatar_url": "https://git.example.com/avatars/alice.png",
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run to confirm failure**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v -k "url or exchange or fetch"
|
|
```
|
|
|
|
Expected: `AttributeError: module 'hbd.server.oauth' has no attribute 'authorization_url'`
|
|
|
|
- [ ] **Step 3: Implement the three functions**
|
|
|
|
Add to `hbd/server/oauth.py`:
|
|
|
|
```python
|
|
import urllib.parse
|
|
|
|
|
|
def authorization_url(config: dict, state: str, redirect_uri: str) -> str:
|
|
"""Return the Gitea OAuth2 authorization URL to redirect the browser to."""
|
|
g = _gitea_cfg(config)
|
|
params = urllib.parse.urlencode({
|
|
"client_id": g["client_id"],
|
|
"redirect_uri": redirect_uri,
|
|
"response_type": "code",
|
|
"scope": "user:email",
|
|
"state": state,
|
|
})
|
|
return f"{g['url'].rstrip('/')}/login/oauth/authorize?{params}"
|
|
|
|
|
|
async def exchange_code(config: dict, code: str, redirect_uri: str) -> str:
|
|
"""Exchange an authorization *code* for a Gitea access token.
|
|
|
|
Returns the access token string. Raises OAuthError on any failure.
|
|
"""
|
|
g = _gitea_cfg(config)
|
|
url = f"{g['url'].rstrip('/')}/login/oauth/access_token"
|
|
payload = {
|
|
"client_id": g["client_id"],
|
|
"client_secret": g["client_secret"],
|
|
"code": code,
|
|
"grant_type": "authorization_code",
|
|
"redirect_uri": redirect_uri,
|
|
}
|
|
timeout = aiohttp.ClientTimeout(total=10)
|
|
try:
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
async with session.post(url, json=payload, headers={"Accept": "application/json"}) as resp:
|
|
if resp.status != 200:
|
|
text = await resp.text()
|
|
raise OAuthError(f"Token exchange failed ({resp.status}): {text}")
|
|
data = await resp.json()
|
|
except aiohttp.ClientError as exc:
|
|
raise OAuthError(f"Token exchange network error: {exc}") from exc
|
|
token = data.get("access_token")
|
|
if not token:
|
|
raise OAuthError(f"No access_token in response: {data}")
|
|
return token
|
|
|
|
|
|
async def fetch_user(config: dict, token: str) -> dict:
|
|
"""Fetch the authenticated user's profile from Gitea.
|
|
|
|
Returns a dict with keys: login, full_name, avatar_url.
|
|
Raises OAuthError on any failure.
|
|
"""
|
|
g = _gitea_cfg(config)
|
|
url = f"{g['url'].rstrip('/')}/api/v1/user"
|
|
timeout = aiohttp.ClientTimeout(total=10)
|
|
try:
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
async with session.get(url, headers={"Authorization": f"token {token}"}) as resp:
|
|
if resp.status != 200:
|
|
text = await resp.text()
|
|
raise OAuthError(f"User fetch failed ({resp.status}): {text}")
|
|
data = await resp.json()
|
|
except aiohttp.ClientError as exc:
|
|
raise OAuthError(f"User fetch network error: {exc}") from exc
|
|
return {
|
|
"login": data.get("login", ""),
|
|
"full_name": data.get("full_name", ""),
|
|
"avatar_url": data.get("avatar_url", ""),
|
|
}
|
|
```
|
|
|
|
Also add `import urllib.parse` at the top of `oauth.py` (alongside the existing imports).
|
|
|
|
- [ ] **Step 4: Run to confirm tests pass**
|
|
|
|
```
|
|
pytest tests/test_oauth.py -v
|
|
```
|
|
|
|
Expected: 17 passed
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/oauth.py tests/test_oauth.py
|
|
git commit -m "feat: add authorization_url, exchange_code, fetch_user to oauth module"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 5: HTTP routes — redirect and callback
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/http.py`
|
|
|
|
`http.py` defines all handlers inside `async def start(...)`. The two new handlers go in the same block, just before the `app = web.Application()` line (~line 900). The import goes at the top of the file.
|
|
|
|
- [ ] **Step 1: Add the import**
|
|
|
|
In `hbd/server/http.py`, add after the existing local imports (after `from . import users as users_mod`):
|
|
|
|
```python
|
|
from . import oauth as oauth_mod
|
|
```
|
|
|
|
- [ ] **Step 2: Add the two route handlers**
|
|
|
|
In `hbd/server/http.py`, add the two handlers immediately before the `app = web.Application()` line:
|
|
|
|
```python
|
|
async def oauth_gitea_redirect(request):
|
|
"""GET /login/oauth/gitea — kick off the Gitea OAuth2 flow."""
|
|
if not oauth_mod.is_enabled(config):
|
|
return web.Response(status=404, text="OAuth not configured")
|
|
state = oauth_mod.make_state()
|
|
redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback"
|
|
raise web.HTTPFound(oauth_mod.authorization_url(config, state, redirect_uri))
|
|
|
|
async def oauth_gitea_callback(request):
|
|
"""GET /login/oauth/gitea/callback — handle Gitea's redirect back."""
|
|
if not oauth_mod.is_enabled(config):
|
|
return web.Response(status=404, text="OAuth not configured")
|
|
code = request.rel_url.query.get("code", "")
|
|
state = request.rel_url.query.get("state", "")
|
|
if not code or not state:
|
|
return web.Response(status=400, text="Missing code or state")
|
|
if not oauth_mod.validate_state(state):
|
|
raise web.HTTPFound("/login?error=1")
|
|
redirect_uri = f"{request.url.origin()}/login/oauth/gitea/callback"
|
|
try:
|
|
token = await oauth_mod.exchange_code(config, code, redirect_uri)
|
|
profile = await oauth_mod.fetch_user(config, token)
|
|
except oauth_mod.OAuthError as exc:
|
|
logger.warning("OAuth error: %s", exc)
|
|
raise web.HTTPFound("/login?error=1")
|
|
user = users_mod.provision_oauth_user(
|
|
profile["login"],
|
|
profile["full_name"],
|
|
profile["avatar_url"],
|
|
)
|
|
session_token = users_mod.create_session(user.username)
|
|
resp = web.HTTPFound("/")
|
|
resp.set_cookie(
|
|
SESSION_COOKIE,
|
|
session_token,
|
|
max_age=users_mod.SESSION_TTL,
|
|
httponly=True,
|
|
samesite="Lax",
|
|
)
|
|
raise resp
|
|
```
|
|
|
|
- [ ] **Step 3: Register the routes**
|
|
|
|
In `hbd/server/http.py`, add to the route list after the existing auth routes (after `web.post("/api/0/auth/logout", api_logout)`):
|
|
|
|
```python
|
|
web.get("/login/oauth/gitea", oauth_gitea_redirect),
|
|
web.get("/login/oauth/gitea/callback", oauth_gitea_callback),
|
|
```
|
|
|
|
- [ ] **Step 4: Manual smoke test**
|
|
|
|
Start the server locally with OAuth configured in `~/.hb.yaml`:
|
|
|
|
```yaml
|
|
oauth:
|
|
gitea:
|
|
url: https://your-gitea-instance.example.com
|
|
client_id: your-client-id
|
|
client_secret: your-client-secret
|
|
```
|
|
|
|
Visit `http://localhost:50004/login/oauth/gitea` — confirm you are redirected to Gitea's authorization page.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/http.py
|
|
git commit -m "feat: add Gitea OAuth2 redirect and callback routes"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 6: Login page — "Sign in with Gitea" button
|
|
|
|
**Files:**
|
|
- Modify: `hbd/server/http.py` (update `login_page` handler, ~line 625)
|
|
|
|
- [ ] **Step 1: Replace the login page HTML**
|
|
|
|
In `hbd/server/http.py`, find the `html = f"""` block inside `login_page` and replace it with:
|
|
|
|
```python
|
|
gitea_button = ""
|
|
if oauth_mod.is_enabled(config):
|
|
gitea_url = _gitea_cfg_url(config)
|
|
gitea_button = f"""
|
|
<div class="divider">or</div>
|
|
<a href="/login/oauth/gitea" class="gitea-btn">
|
|
Sign in with Gitea
|
|
</a>"""
|
|
|
|
html = f"""<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<meta charset="utf-8">
|
|
<title>Heartbeat — Login</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; background: #f5f5f5; display: flex;
|
|
justify-content: center; align-items: center; height: 100vh; margin: 0; }}
|
|
.box {{ background: #fff; padding: 2em 2.5em; border-radius: 8px;
|
|
box-shadow: 0 2px 12px rgba(0,0,0,.15); min-width: 300px; }}
|
|
h2 {{ margin: 0 0 1.2em; color: #333; font-size: 1.4em; }}
|
|
label {{ display: block; margin-bottom: .3em; font-size: .9em; color: #555; }}
|
|
input {{ width: 100%; padding: .5em .7em; border: 1px solid #ccc;
|
|
border-radius: 4px; font-size: 1em; box-sizing: border-box; }}
|
|
button {{ margin-top: 1.2em; width: 100%; padding: .6em; background: #0066cc;
|
|
color: #fff; border: none; border-radius: 4px; font-size: 1em; cursor: pointer; }}
|
|
button:hover {{ background: #0055aa; }}
|
|
.error {{ color: #c00; font-size: .9em; margin-bottom: .8em; }}
|
|
.field {{ margin-bottom: .9em; }}
|
|
.divider {{ text-align: center; margin: 1.2em 0 .8em; color: #999;
|
|
font-size: .85em; border-top: 1px solid #eee; padding-top: .8em; }}
|
|
.gitea-btn {{ display: block; width: 100%; padding: .6em; background: #609926;
|
|
color: #fff; border-radius: 4px; font-size: 1em; text-align: center;
|
|
text-decoration: none; box-sizing: border-box; }}
|
|
.gitea-btn:hover {{ background: #4e7d1e; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="box">
|
|
<h2>Heartbeat</h2>
|
|
{'<p class="error">Invalid username, password, or OAuth error.</p>' if error else ''}
|
|
<form method="post">
|
|
<div class="field"><label>Username</label><input name="username" autofocus></div>
|
|
<div class="field"><label>Password</label><input name="password" type="password"></div>
|
|
<button type="submit">Sign in</button>
|
|
</form>{gitea_button}
|
|
</div>
|
|
</body>
|
|
</html>"""
|
|
```
|
|
|
|
- [ ] **Step 2: Add the `_gitea_cfg_url` helper**
|
|
|
|
Add this small helper in `hbd/server/http.py` just before the `login_page` handler (around line 600) so the template can read the Gitea display URL without importing internal oauth details:
|
|
|
|
```python
|
|
def _gitea_cfg_url(config: dict) -> str:
|
|
return config.get("oauth", {}).get("gitea", {}).get("url", "")
|
|
```
|
|
|
|
Also update the `login_page` handler's `error` logic to show the error when the `?error=1` query param is present (set by the callback on OAuth failure):
|
|
|
|
```python
|
|
async def login_page(request):
|
|
"""GET /login — show login form; POST /login — process and redirect."""
|
|
if not users_mod.users_enabled():
|
|
raise web.HTTPFound("/")
|
|
|
|
error = ""
|
|
if request.method == "POST":
|
|
form = await request.post()
|
|
username = form.get("username", "")
|
|
password = form.get("password", "")
|
|
user = users_mod.authenticate(username, password)
|
|
if user:
|
|
token = users_mod.create_session(username)
|
|
redirect_to = request.rel_url.query.get("next", "/")
|
|
resp = web.HTTPFound(redirect_to)
|
|
resp.set_cookie(
|
|
SESSION_COOKIE,
|
|
token,
|
|
max_age=users_mod.SESSION_TTL,
|
|
httponly=True,
|
|
samesite="Lax",
|
|
)
|
|
raise resp
|
|
error = "Invalid username or password."
|
|
elif request.rel_url.query.get("error"):
|
|
error = "Sign-in failed. Please try again."
|
|
```
|
|
|
|
- [ ] **Step 3: Manual verification**
|
|
|
|
Start the server with OAuth configured. Visit `/login`. Confirm:
|
|
- The "Sign in with Gitea" button appears (green, below a divider)
|
|
- Clicking it redirects to Gitea
|
|
- After authorising on Gitea, you are redirected back and land on `/` with a valid session cookie
|
|
|
|
Without OAuth configured, confirm the button does not appear.
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add hbd/server/http.py
|
|
git commit -m "feat: add Sign in with Gitea button to login page"
|
|
```
|
|
|
|
---
|
|
|
|
## Self-Review Notes
|
|
|
|
- All 5 spec requirements covered: coexist ✓, auto-provision ✓, regular user ✓, any Gitea user ✓, config-driven ✓
|
|
- `exchange_code` signature in Task 4 matches usage in Task 5 (`config, code, redirect_uri`) ✓
|
|
- `fetch_user` returns `{login, full_name, avatar_url}` — matched in callback handler ✓
|
|
- `validate_state` removes state on use (replay protection) ✓
|
|
- `provision_oauth_user` skips empty strings so existing avatar/name aren't erased ✓
|
|
- `_gitea_cfg_url` is a plain `def`, not `async` — safe to call in template prep ✓
|