From 2142667f28b2e70255ca677f1e593aa6a512a697 Mon Sep 17 00:00:00 2001 From: JoinProAI Date: Sat, 28 Mar 2026 15:09:40 -0400 Subject: [PATCH] fix(auth): respect explicitly-set client_metadata.scope during discovery The scope selection strategy in async_auth_flow unconditionally overwrites client_metadata.scope with server-advertised scopes. This is problematic when the caller has explicitly set scopes to limit permissions or to avoid rejection by servers that only permit certain scopes (e.g. SalesForce MCP server). Only apply automatic scope selection when client_metadata.scope is None, preserving any explicitly-set value. Github-Issue: #2317 Reported-by: jbweston --- src/mcp/client/auth/oauth2.py | 13 ++-- tests/client/test_auth.py | 135 ++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 5 deletions(-) diff --git a/src/mcp/client/auth/oauth2.py b/src/mcp/client/auth/oauth2.py index 25075dec3..1a45dae76 100644 --- a/src/mcp/client/auth/oauth2.py +++ b/src/mcp/client/auth/oauth2.py @@ -572,11 +572,14 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx. logger.debug(f"OAuth metadata discovery failed: {url}") # Step 3: Apply scope selection strategy - self.context.client_metadata.scope = get_client_metadata_scopes( - extract_scope_from_www_auth(response), - self.context.protected_resource_metadata, - self.context.oauth_metadata, - ) + # Respect explicitly-set scopes; only auto-select + # when the caller hasn't specified any. + if self.context.client_metadata.scope is None: + self.context.client_metadata.scope = get_client_metadata_scopes( + extract_scope_from_www_auth(response), + self.context.protected_resource_metadata, + self.context.oauth_metadata, + ) # Step 4: Register client or use URL-based client ID (CIMD) if not self.context.client_info: diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index 5aa985e36..3fbd2b113 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -1167,6 +1167,141 @@ async def test_auth_flow_with_no_tokens(self, oauth_provider: OAuthClientProvide assert oauth_provider.context.current_tokens.access_token == "new_access_token" assert oauth_provider.context.token_expiry_time is not None + @pytest.mark.anyio + async def test_auth_flow_preserves_explicit_scopes( + self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage + ): + """Test that explicitly-set client_metadata.scope is not overwritten during discovery.""" + oauth_provider.context.current_tokens = None + oauth_provider.context.token_expiry_time = None + oauth_provider._initialized = True + + # The fixture sets scope="read write" — verify it is preserved + assert oauth_provider.context.client_metadata.scope == "read write" + + test_request = httpx.Request("GET", "https://api.example.com/mcp") + auth_flow = oauth_provider.async_auth_flow(test_request) + + # First request — no auth header + await auth_flow.__anext__() + + # 401 triggers OAuth discovery + response = httpx.Response( + 401, + headers={ + "WWW-Authenticate": ( + 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource",' + ' scope="server:scope1 server:scope2"' + ) + }, + request=test_request, + ) + + # PRM discovery + prm_request = await auth_flow.asend(response) + prm_response = httpx.Response( + 200, + content=( + b'{"resource": "https://api.example.com/v1/mcp",' + b' "authorization_servers": ["https://auth.example.com"],' + b' "scopes_supported": ["server:scope1", "server:scope2"]}' + ), + request=prm_request, + ) + + # OAuth metadata discovery + oauth_request = await auth_flow.asend(prm_response) + oauth_response = httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com",' + b' "authorization_endpoint": "https://auth.example.com/authorize",' + b' "token_endpoint": "https://auth.example.com/token",' + b' "registration_endpoint": "https://auth.example.com/register"}' + ), + request=oauth_request, + ) + + # After scope selection (Step 3), the explicit scope must be preserved + await auth_flow.asend(oauth_response) + assert oauth_provider.context.client_metadata.scope == "read write" + + # Clean up the generator + await auth_flow.aclose() + + @pytest.mark.anyio + async def test_auth_flow_auto_selects_scopes_when_none(self, mock_storage: MockTokenStorage): + """Test that scope auto-selection works when no explicit scope is set.""" + + async def redirect_handler(url: str) -> None: + pass # pragma: no cover + + async def callback_handler() -> tuple[str, str | None]: + return "test_auth_code", "test_state" # pragma: no cover + + client_metadata = OAuthClientMetadata( + client_name="Test Client", + client_uri=AnyHttpUrl("https://example.com"), + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + scope=None, + ) + provider = OAuthClientProvider( + server_url="https://api.example.com/v1/mcp", + client_metadata=client_metadata, + storage=mock_storage, + redirect_handler=redirect_handler, + callback_handler=callback_handler, + ) + provider.context.current_tokens = None + provider.context.token_expiry_time = None + provider._initialized = True + + test_request = httpx.Request("GET", "https://api.example.com/mcp") + auth_flow = provider.async_auth_flow(test_request) + + await auth_flow.__anext__() + + response = httpx.Response( + 401, + headers={ + "WWW-Authenticate": ( + 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource",' + ' scope="server:scope1 server:scope2"' + ) + }, + request=test_request, + ) + + prm_request = await auth_flow.asend(response) + prm_response = httpx.Response( + 200, + content=( + b'{"resource": "https://api.example.com/v1/mcp",' + b' "authorization_servers": ["https://auth.example.com"],' + b' "scopes_supported": ["server:scope1", "server:scope2"]}' + ), + request=prm_request, + ) + + oauth_request = await auth_flow.asend(prm_response) + oauth_response = httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com",' + b' "authorization_endpoint": "https://auth.example.com/authorize",' + b' "token_endpoint": "https://auth.example.com/token",' + b' "registration_endpoint": "https://auth.example.com/register"}' + ), + request=oauth_request, + ) + + await auth_flow.asend(oauth_response) + # Scope should have been auto-selected from the server metadata + assert provider.context.client_metadata.scope is not None + assert provider.context.client_metadata.scope == "server:scope1 server:scope2" + + await auth_flow.aclose() + @pytest.mark.anyio async def test_auth_flow_no_unnecessary_retry_after_oauth( self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage, valid_tokens: OAuthToken