import logging
import time as time_mod
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import urlparse, parse_qs

import pytest

from hbd.server import oauth
from hbd.server import users as users_mod
from hbd.server.users import User

# Config with no "oauth" section at all.
CFG_OFF = {}

# Config with a single Gitea provider entry that omits the "type" field.
CFG_ON = {
    "oauth": {
        "gitea": {
            "url": "https://git.example.com",
            "client_id": "cid",
            "client_secret": "csec",
        }
    }
}

# Provider entry with a URL but no client credentials.
CFG_PARTIAL = {"oauth": {"gitea": {"url": "https://git.example.com"}}}


@pytest.fixture(autouse=True)
def clear_oauth_states():
    """Empty ``oauth._states`` before and after every test."""
    oauth._states.clear()
    yield
    oauth._states.clear()


@pytest.fixture(autouse=True)
def reset_users_dict():
    """Snapshot ``users_mod.users`` and rebind it to the snapshot after the test."""
    original = dict(users_mod.users)
    yield
    users_mod.users = original


def test_make_state_returns_unique_tokens():
    """Two consecutive state tokens differ and are 64 characters long."""
    s1 = oauth.make_state()
    s2 = oauth.make_state()
    assert s1 != s2
    assert len(s1) == 64  # 32 bytes hex


def test_validate_state_valid():
    """A freshly issued state token validates."""
    state = oauth.make_state()
    assert oauth.validate_state(state) is True


def test_validate_state_consumed_on_use():
    """A state token validates once; a second attempt is rejected."""
    state = oauth.make_state()
    # Assert the first use succeeds so that this test cannot pass merely
    # because validate_state() rejects everything.
    assert oauth.validate_state(state) is True
    assert oauth.validate_state(state) is False  # replay rejected


def test_validate_state_unknown():
    """A token that was never issued is rejected."""
    assert oauth.validate_state("notastate") is False


def test_validate_state_expired(monkeypatch):
    """A token whose stored timestamp lies in the past is rejected."""
    state = oauth.make_state()
    # Wind expiry into the past
    monkeypatch.setitem(oauth._states, state, time_mod.time() - 1000)
    assert oauth.validate_state(state) is False


def _reset_users(entries=None):
    """Rebind ``users_mod.users`` to *entries*, or to a new empty dict if omitted."""
    users_mod.users = {} if entries is None else entries


def test_provision_oauth_user_new():
    """Provisioning an unknown username creates a non-admin, password-less user."""
    _reset_users()
    user = users_mod.provision_oauth_user(
        "gituser", "Git User", "https://example.com/avatar.png"
    )
    assert user.username == "gituser"
    assert user.full_name == "Git User"
    assert user.avatar == "https://example.com/avatar.png"
    assert user.admin is False
    assert user.password_hash == ""
    assert "gituser" in users_mod.users


def test_provision_oauth_user_no_password_login():
    """A freshly provisioned OAuth user fails every password check."""
    _reset_users()
    user = users_mod.provision_oauth_user("gituser", "Git User", "")
    assert user.check_password("anything") is False
def test_provision_oauth_user_existing_updates_profile():
    """Re-provisioning refreshes name/avatar but keeps the other stored fields."""
    existing = User(
        username="alice",
        full_name="Old Name",
        avatar="old.png",
        password_hash="pbkdf2:sha256:1:salt:abc",
        admin=True,
        notification_channels=["chan1"],
    )
    _reset_users({"alice": existing})
    user = users_mod.provision_oauth_user("alice", "New Name", "new.png")
    assert user.full_name == "New Name"
    assert user.avatar == "new.png"
    # Preserved
    assert user.admin is True
    assert user.password_hash == "pbkdf2:sha256:1:salt:abc"
    assert user.notification_channels == ["chan1"]


def test_provision_oauth_user_does_not_overwrite_with_empty():
    """Empty name/avatar values leave the stored profile fields untouched."""
    existing = User(username="bob", full_name="Bob", avatar="bob.png")
    _reset_users({"bob": existing})
    user = users_mod.provision_oauth_user("bob", "", "")
    assert user.full_name == "Bob"
    assert user.avatar == "bob.png"


def test_provision_oauth_user_survives_config_reload():
    """A provisioned user is still present after ``load_users`` with empty config."""
    _reset_users()
    users_mod.provision_oauth_user(
        "oauthonly", "OAuth Only", "https://example.com/a.png"
    )
    assert "oauthonly" in users_mod.users
    # Reload with empty config — OAuth user should survive
    users_mod.load_users({})
    assert "oauthonly" in users_mod.users


# ---------------------------------------------------------------------------
# aiohttp mocking helpers
# ---------------------------------------------------------------------------


def _async_cm(entered):
    """Return a mock usable with ``async with`` whose ``__aenter__`` yields *entered*."""
    return AsyncMock(
        __aenter__=AsyncMock(return_value=entered),
        __aexit__=AsyncMock(return_value=False),
    )


def _json_response(payload):
    """Return a mock HTTP response with status 200 whose ``json()`` resolves to *payload*."""
    response = AsyncMock()
    response.status = 200
    response.json = AsyncMock(return_value=payload)
    return response


def _error_response(status, body):
    """Return a mock HTTP response with *status* whose ``text()`` resolves to *body*."""
    response = AsyncMock()
    response.status = status
    response.text = AsyncMock(return_value=body)
    return response


def _mock_session(*, post=None, get=None):
    """Return a mock client session.

    ``post`` / ``get`` are the responses yielded by ``async with session.post(...)``
    and ``async with session.get(...)``; a verb that is omitted keeps the default
    ``MagicMock`` attribute.
    """
    session = MagicMock()
    if post is not None:
        session.post = MagicMock(return_value=_async_cm(post))
    if get is not None:
        session.get = MagicMock(return_value=_async_cm(get))
    return session


def _patch_client_session(session):
    """Patch ``aiohttp.ClientSession`` in ``hbd.server.oauth`` to yield *session*."""
    return patch(
        "hbd.server.oauth.aiohttp.ClientSession",
        return_value=_async_cm(session),
    )


# ---------------------------------------------------------------------------
# Integration-style tests: callback logic chain
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_callback_invalid_state_rejects():
    """Verify validate_state returns False for unknown state tokens."""
    fake_state = "this-is-not-a-real-state"
    assert oauth.validate_state(fake_state) is False


@pytest.mark.asyncio
async def test_full_oauth_flow_chain():
    """Integration-style test: state → exchange → fetch → provision chain."""
    p = _gitea_provider()
    redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"

    state = oauth.make_state()
    assert oauth.validate_state(state) is True

    session = _mock_session(
        post=_json_response({"access_token": "flow_token"}),
        get=_json_response({
            "login": "flowuser",
            "full_name": "Flow User",
            "avatar_url": "https://git.example.com/avatars/flow.png",
        }),
    )

    with _patch_client_session(session):
        token = await oauth.exchange_code(p, "authcode", redirect_uri)
        profile = await oauth.fetch_user(p, token)

    assert token == "flow_token"
    assert profile["login"] == "flowuser"

    _reset_users()
    user = users_mod.provision_oauth_user(
        profile["login"], profile["full_name"], profile["avatar_url"]
    )
    assert user.username == "flowuser"
    assert user.check_password("anything") is False


# ---------------------------------------------------------------------------
# get_providers()
# ---------------------------------------------------------------------------

CFG_GITHUB = {
    "oauth": {
        "github": {"type": "github", "client_id": "ghid", "client_secret": "ghs"},
    }
}

CFG_NEXTCLOUD = {
    "oauth": {
        "nc": {
            "type": "nextcloud",
            "url": "https://nc.example.com",
            "client_id": "ncid",
            "client_secret": "ncs",
        }
    }
}

CFG_MULTI = {
    "oauth": {
        "mygitea": {
            "type": "gitea",
            "url": "https://git.example.com",
            "client_id": "cid",
            "client_secret": "cs",
            "label": "Work Gitea",
            "logo": "https://example.com/logo.png",
        },
        "github": {"type": "github", "client_id": "ghid", "client_secret": "ghs"},
        "nc": {
            "type": "nextcloud",
            "url": "https://nc.example.com",
            "client_id": "ncid",
            "client_secret": "ncs",
        },
    }
}


def test_get_providers_backward_compat_no_type_field():
    """Old config without 'type' defaults to gitea."""
    providers = oauth.get_providers(CFG_ON)
    assert len(providers) == 1
    p = providers[0]
    assert p.name == "gitea"
    assert p.type == "gitea"
    assert p.label == "Gitea"
    assert p.client_id == "cid"
    assert p.authorize_url == "https://git.example.com/login/oauth/authorize"
    assert p.token_url == "https://git.example.com/login/oauth/access_token"
    assert p.profile_url == "https://git.example.com/api/v1/user"
    assert p.scope == "user:email"
    assert p.profile_data_path == []


def test_get_providers_multiple():
    """Every fully configured entry yields one resolved provider."""
    providers = oauth.get_providers(CFG_MULTI)
    assert len(providers) == 3
    names = [p.name for p in providers]
    assert "mygitea" in names
    assert "github" in names
    assert "nc" in names


def test_get_providers_custom_label_and_logo():
    """Configured 'label' and 'logo' values are carried onto the provider."""
    providers = oauth.get_providers(CFG_MULTI)
    gitea = next(p for p in providers if p.name == "mygitea")
    assert gitea.label == "Work Gitea"
    assert gitea.logo == "https://example.com/logo.png"


def test_get_providers_github_default_label():
    """Without overrides, GitHub gets the label 'GitHub' and an empty logo."""
    providers = oauth.get_providers(CFG_GITHUB)
    assert providers[0].label == "GitHub"
    assert providers[0].logo == ""


def test_get_providers_github_fixed_urls():
    """GitHub endpoints and scope are resolved without a configured URL."""
    providers = oauth.get_providers(CFG_GITHUB)
    p = providers[0]
    assert p.authorize_url == "https://github.com/login/oauth/authorize"
    assert p.token_url == "https://github.com/login/oauth/access_token"
    assert p.profile_url == "https://api.github.com/user"
    assert p.scope == "read:user"


def test_get_providers_nextcloud_urls_and_path():
    """Nextcloud endpoints derive from the base URL; profile data is nested."""
    providers = oauth.get_providers(CFG_NEXTCLOUD)
    p = providers[0]
    assert p.authorize_url == "https://nc.example.com/apps/oauth2/authorize"
    assert p.token_url == "https://nc.example.com/apps/oauth2/api/v1/token"
    assert p.profile_url == "https://nc.example.com/ocs/v2.php/cloud/user?format=json"
    assert p.profile_data_path == ["ocs", "data"]
    assert p.scope == ""


def test_get_providers_skips_missing_client_id(caplog):
    """An entry without client_id is dropped and a warning is logged."""
    cfg = {"oauth": {"gitea": {"url": "https://git.example.com", "client_secret": "cs"}}}
    with caplog.at_level(logging.WARNING, logger="hbd.server.oauth"):
        result = oauth.get_providers(cfg)
    assert result == []
    assert "missing" in caplog.text.lower()


def test_get_providers_skips_missing_client_secret(caplog):
    """An entry without client_secret is dropped and a warning is logged."""
    cfg = {"oauth": {"gitea": {"url": "https://git.example.com", "client_id": "cid"}}}
    with caplog.at_level(logging.WARNING, logger="hbd.server.oauth"):
        result = oauth.get_providers(cfg)
    assert result == []
    assert "missing" in caplog.text.lower()


def test_get_providers_skips_missing_url_for_gitea(caplog):
    """A gitea entry without a URL is dropped and the warning mentions 'url'."""
    cfg = {"oauth": {"gitea": {"type": "gitea", "client_id": "cid", "client_secret": "cs"}}}
    with caplog.at_level(logging.WARNING, logger="hbd.server.oauth"):
        result = oauth.get_providers(cfg)
    assert result == []
    assert "url" in caplog.text.lower()


def test_get_providers_skips_missing_url_for_nextcloud(caplog):
    """A nextcloud entry without a URL is dropped and the warning mentions 'url'."""
    cfg = {"oauth": {"nc": {"type": "nextcloud", "client_id": "cid", "client_secret": "cs"}}}
    with caplog.at_level(logging.WARNING, logger="hbd.server.oauth"):
        result = oauth.get_providers(cfg)
    assert result == []
    assert "url" in caplog.text.lower()


def test_get_providers_github_no_url_required():
    """A github entry resolves even though it configures no URL."""
    providers = oauth.get_providers(CFG_GITHUB)
    assert len(providers) == 1


def test_get_providers_skips_unknown_type(caplog):
    """An entry with an unrecognised type is dropped and the type is logged."""
    cfg = {"oauth": {"mystery": {"type": "saml", "client_id": "cid", "client_secret": "cs"}}}
    with caplog.at_level(logging.WARNING, logger="hbd.server.oauth"):
        result = oauth.get_providers(cfg)
    assert result == []
    assert "saml" in caplog.text


def test_get_providers_empty_config():
    """Configs without an oauth section resolve to no providers."""
    assert oauth.get_providers({}) == []
    assert oauth.get_providers(CFG_OFF) == []


# ---------------------------------------------------------------------------
# build_auth_url / exchange_code / fetch_user (generic, ResolvedProvider-based)
# ---------------------------------------------------------------------------


def _gitea_provider() -> oauth.ResolvedProvider:
    """Return the single provider resolved from ``CFG_ON``."""
    return oauth.get_providers(CFG_ON)[0]


def _github_provider() -> oauth.ResolvedProvider:
    """Return the single provider resolved from ``CFG_GITHUB``."""
    return oauth.get_providers(CFG_GITHUB)[0]


def _nextcloud_provider() -> oauth.ResolvedProvider:
    """Return the single provider resolved from ``CFG_NEXTCLOUD``."""
    return oauth.get_providers(CFG_NEXTCLOUD)[0]


def test_build_auth_url_gitea():
    """The Gitea authorize URL carries all expected query parameters."""
    p = _gitea_provider()
    url = oauth.build_auth_url(
        p, "teststate", "https://hbd.example.com/login/oauth/gitea/callback"
    )
    parsed = urlparse(url)
    qs = parse_qs(parsed.query)
    assert parsed.netloc == "git.example.com"
    assert parsed.path == "/login/oauth/authorize"
    assert qs["client_id"] == ["cid"]
    assert qs["state"] == ["teststate"]
    assert qs["scope"] == ["user:email"]
    assert qs["response_type"] == ["code"]
    assert qs["redirect_uri"] == ["https://hbd.example.com/login/oauth/gitea/callback"]


def test_build_auth_url_github():
    """The GitHub authorize URL targets github.com with the read:user scope."""
    p = _github_provider()
    url = oauth.build_auth_url(
        p, "st", "https://hbd.example.com/login/oauth/github/callback"
    )
    parsed = urlparse(url)
    qs = parse_qs(parsed.query)
    assert parsed.netloc == "github.com"
    assert qs["scope"] == ["read:user"]


def test_build_auth_url_nextcloud_no_scope_param():
    """Nextcloud scope is empty — the 'scope' key must be absent from the URL."""
    p = _nextcloud_provider()
    url = oauth.build_auth_url(
        p, "st", "https://hbd.example.com/login/oauth/nc/callback"
    )
    qs = parse_qs(urlparse(url).query)
    assert "scope" not in qs


@pytest.mark.asyncio
async def test_exchange_code_generic_returns_token():
    """``exchange_code`` returns the access_token from the token response."""
    p = _gitea_provider()
    redirect_uri = "https://hbd.example.com/login/oauth/gitea/callback"
    session = _mock_session(post=_json_response({"access_token": "tok123"}))

    with _patch_client_session(session):
        token = await oauth.exchange_code(p, "mycode", redirect_uri)

    assert token == "tok123"


@pytest.mark.asyncio
async def test_exchange_code_sends_accept_json():
    """Accept: application/json must be present for all providers (required by GitHub)."""
    p = _github_provider()
    captured_headers = {}
    response = _json_response({"access_token": "ghtoken"})
    session = MagicMock()

    def capture_post(url, **kwargs):
        # Record the headers of the outgoing request; tolerate headers=None.
        captured_headers.update(kwargs.get("headers") or {})
        return _async_cm(response)

    session.post = capture_post

    with _patch_client_session(session):
        await oauth.exchange_code(
            p, "code", "https://hbd.example.com/login/oauth/github/callback"
        )

    assert captured_headers.get("Accept") == "application/json"


@pytest.mark.asyncio
async def test_exchange_code_raises_on_error_status():
    """A 401 token response raises ``OAuthError``."""
    p = _gitea_provider()
    session = _mock_session(post=_error_response(401, "unauthorized"))

    with _patch_client_session(session):
        with pytest.raises(oauth.OAuthError):
            await oauth.exchange_code(
                p, "badcode", "https://hbd.example.com/login/oauth/gitea/callback"
            )


@pytest.mark.asyncio
async def test_exchange_code_raises_when_no_access_token():
    """A 200 token response without access_token raises ``OAuthError``."""
    p = _gitea_provider()
    session = _mock_session(post=_json_response({"error": "bad_request"}))

    with _patch_client_session(session):
        with pytest.raises(oauth.OAuthError):
            await oauth.exchange_code(
                p, "mycode", "https://hbd.example.com/login/oauth/gitea/callback"
            )


@pytest.mark.asyncio
async def test_fetch_user_gitea_returns_profile():
    """A Gitea profile payload is returned with login, full_name and avatar_url."""
    p = _gitea_provider()
    session = _mock_session(get=_json_response({
        "login": "alice",
        "full_name": "Alice Smith",
        "avatar_url": "https://git.example.com/avatars/alice.png",
    }))

    with _patch_client_session(session):
        profile = await oauth.fetch_user(p, "tok123")

    assert profile == {
        "login": "alice",
        "full_name": "Alice Smith",
        "avatar_url": "https://git.example.com/avatars/alice.png",
    }


@pytest.mark.asyncio
async def test_fetch_user_github_maps_name_field():
    """GitHub's 'name' field is exposed as 'full_name' in the profile."""
    p = _github_provider()
    session = _mock_session(get=_json_response({
        "login": "bobgh",
        "name": "Bob GitHub",
        "avatar_url": "https://avatars.githubusercontent.com/u/1",
    }))

    with _patch_client_session(session):
        profile = await oauth.fetch_user(p, "ghtoken")

    assert profile["login"] == "bobgh"
    assert profile["full_name"] == "Bob GitHub"
    assert profile["avatar_url"] == "https://avatars.githubusercontent.com/u/1"


@pytest.mark.asyncio
async def test_fetch_user_nextcloud_nested_extraction():
    """Nextcloud profile is nested under ocs.data; avatar is absent."""
    p = _nextcloud_provider()
    session = _mock_session(get=_json_response({
        "ocs": {
            "meta": {"status": "ok", "statuscode": 200},
            "data": {
                "id": "ncuser",
                "display-name": "NC User",
                "email": "nc@example.com",
            },
        }
    }))

    with _patch_client_session(session):
        profile = await oauth.fetch_user(p, "nctoken")

    assert profile["login"] == "ncuser"
    assert profile["full_name"] == "NC User"
    assert profile["avatar_url"] == ""  # Nextcloud has no avatar field


@pytest.mark.asyncio
async def test_fetch_user_raises_on_error_status():
    """A 401 profile response raises ``OAuthError``."""
    p = _gitea_provider()
    session = _mock_session(get=_error_response(401, "unauthorized"))

    with _patch_client_session(session):
        with pytest.raises(oauth.OAuthError):
            await oauth.fetch_user(p, "badtoken")


def test_is_enabled_with_valid_provider():
    """A config with one complete provider counts as enabled."""
    assert oauth.is_enabled(CFG_ON) is True


def test_is_enabled_false_when_no_providers():
    """A config with no providers counts as disabled."""
    assert oauth.is_enabled(CFG_OFF) is False


def test_is_enabled_false_partial_config():
    """A provider entry lacking credentials does not enable OAuth."""
    assert oauth.is_enabled(CFG_PARTIAL) is False