Spaces:
Build error
Build error
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| import uuid | |
| from typing import Dict, List, Union | |
| from unittest.mock import Mock | |
| import pytest | |
| from botbuilder.schema import Activity, ConversationReference, ChannelAccount, RoleTypes | |
| from botframework.connector import Channels | |
| from botframework.connector.auth import ( | |
| AuthenticationConfiguration, | |
| AuthenticationConstants, | |
| JwtTokenValidation, | |
| SimpleCredentialProvider, | |
| EmulatorValidation, | |
| EnterpriseChannelValidation, | |
| ChannelValidation, | |
| ClaimsIdentity, | |
| MicrosoftAppCredentials, | |
| # GovernmentConstants, | |
| GovernmentChannelValidation, | |
| SimpleChannelProvider, | |
| ChannelProvider, | |
| # AppCredentials, | |
| ) | |
| async def jwt_token_validation_validate_auth_header_with_channel_service_succeeds( | |
| app_id: str, | |
| pwd: str, | |
| channel_service_or_provider: Union[str, ChannelProvider], | |
| header: str = None, | |
| ): | |
| if header is None: | |
| header = f"Bearer {MicrosoftAppCredentials(app_id, pwd).get_access_token()}" | |
| credentials = SimpleCredentialProvider(app_id, pwd) | |
| result = await JwtTokenValidation.validate_auth_header( | |
| header, | |
| credentials, | |
| channel_service_or_provider, | |
| "", | |
| "https://webchat.botframework.com/", | |
| ) | |
| assert result.is_authenticated | |
| # TODO: Consider changing to unittest to use ddt for Credentials tests | |
| class TestAuth: | |
| EmulatorValidation.TO_BOT_FROM_EMULATOR_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = ( | |
| True | |
| ) | |
| ChannelValidation.TO_BOT_FROM_CHANNEL_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = ( | |
| True | |
| ) | |
| async def test_claims_validation(self): | |
| claims: List[Dict] = {} | |
| default_auth_config = AuthenticationConfiguration() | |
| # No validator should pass. | |
| await JwtTokenValidation.validate_claims(default_auth_config, claims) | |
| mock_validator = Mock() | |
| auth_with_validator = AuthenticationConfiguration( | |
| claims_validator=mock_validator | |
| ) | |
| # Configure IClaimsValidator to fail | |
| mock_validator.side_effect = PermissionError("Invalid claims.") | |
| with pytest.raises(PermissionError) as excinfo: | |
| await JwtTokenValidation.validate_claims(auth_with_validator, claims) | |
| assert "Invalid claims." in str(excinfo.value) | |
| # No validator with not skill cliams should pass. | |
| default_auth_config.claims_validator = None | |
| claims: List[Dict] = { | |
| AuthenticationConstants.VERSION_CLAIM: "1.0", | |
| AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id", | |
| AuthenticationConstants.APP_ID_CLAIM: "this_bot_id", # Skill claims aud!=azp | |
| } | |
| await JwtTokenValidation.validate_claims(default_auth_config, claims) | |
| # No validator with skill cliams should fail. | |
| claims: List[Dict] = { | |
| AuthenticationConstants.VERSION_CLAIM: "1.0", | |
| AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id", | |
| AuthenticationConstants.APP_ID_CLAIM: "not_this_bot_id", # Skill claims aud!=azp | |
| } | |
| mock_validator.side_effect = PermissionError( | |
| "Unauthorized Access. Request is not authorized. Skill Claims require validation." | |
| ) | |
| with pytest.raises(PermissionError) as excinfo_skill: | |
| await JwtTokenValidation.validate_claims(auth_with_validator, claims) | |
| assert ( | |
| "Unauthorized Access. Request is not authorized. Skill Claims require validation." | |
| in str(excinfo_skill.value) | |
| ) | |
| # @pytest.mark.asyncio | |
| # async def test_connector_auth_header_correct_app_id_and_service_url_should_validate( | |
| # self, | |
| # ): | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "", "" | |
| # ) | |
| # result = await JwtTokenValidation.validate_auth_header( | |
| # header, credentials, "", "https://webchat.botframework.com/" | |
| # ) | |
| # | |
| # result_with_provider = await JwtTokenValidation.validate_auth_header( | |
| # header, | |
| # credentials, | |
| # SimpleChannelProvider(), | |
| # "https://webchat.botframework.com/", | |
| # ) | |
| # | |
| # assert result | |
| # assert result_with_provider | |
| # @pytest.mark.asyncio | |
| # async def test_connector_auth_header_with_different_bot_app_id_should_not_validate( | |
| # self, | |
| # ): | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "00000000-0000-0000-0000-000000000000", "" | |
| # ) | |
| # with pytest.raises(Exception) as excinfo: | |
| # await JwtTokenValidation.validate_auth_header( | |
| # header, credentials, "", "https://webchat.botframework.com/" | |
| # ) | |
| # assert "Unauthorized" in str(excinfo.value) | |
| # | |
| # with pytest.raises(Exception) as excinfo2: | |
| # await JwtTokenValidation.validate_auth_header( | |
| # header, | |
| # credentials, | |
| # SimpleChannelProvider(), | |
| # "https://webchat.botframework.com/", | |
| # ) | |
| # assert "Unauthorized" in str(excinfo2.value) | |
| # @pytest.mark.asyncio | |
| # async def test_connector_auth_header_and_no_credential_should_not_validate(self): | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider("", "") | |
| # with pytest.raises(Exception) as excinfo: | |
| # await JwtTokenValidation.validate_auth_header( | |
| # header, credentials, "", "https://webchat.botframework.com/" | |
| # ) | |
| # assert "Unauthorized" in str(excinfo.value) | |
| # | |
| # with pytest.raises(Exception) as excinfo2: | |
| # await JwtTokenValidation.validate_auth_header( | |
| # header, | |
| # credentials, | |
| # SimpleChannelProvider(), | |
| # "https://webchat.botframework.com/", | |
| # ) | |
| # assert "Unauthorized" in str(excinfo2.value) | |
| async def test_empty_header_and_no_credential_should_throw(self): | |
| header = "" | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await JwtTokenValidation.validate_auth_header(header, credentials, "", None) | |
| assert "auth_header" in str(excinfo.value) | |
| with pytest.raises(Exception) as excinfo2: | |
| await JwtTokenValidation.validate_auth_header( | |
| header, credentials, SimpleChannelProvider(), None | |
| ) | |
| assert "auth_header" in str(excinfo2.value) | |
| # @pytest.mark.asyncio | |
| # async def test_emulator_msa_header_correct_app_id_and_service_url_should_validate( | |
| # self, | |
| # ): | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "", "" | |
| # ) | |
| # result = await JwtTokenValidation.validate_auth_header( | |
| # header, credentials, "", "https://webchat.botframework.com/" | |
| # ) | |
| # | |
| # result_with_provider = await JwtTokenValidation.validate_auth_header( | |
| # header, | |
| # credentials, | |
| # SimpleChannelProvider(), | |
| # "https://webchat.botframework.com/", | |
| # ) | |
| # | |
| # assert result | |
| # assert result_with_provider | |
| # @pytest.mark.asyncio | |
| # async def test_emulator_msa_header_and_no_credential_should_not_validate(self): | |
| # # pylint: disable=protected-access | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "00000000-0000-0000-0000-000000000000", "" | |
| # ) | |
| # with pytest.raises(Exception) as excinfo: | |
| # await JwtTokenValidation.validate_auth_header(header, credentials, "", None) | |
| # assert "Unauthorized" in str(excinfo._excinfo) | |
| # | |
| # with pytest.raises(Exception) as excinfo2: | |
| # await JwtTokenValidation.validate_auth_header( | |
| # header, credentials, SimpleChannelProvider(), None | |
| # ) | |
| # assert "Unauthorized" in str(excinfo2._excinfo) | |
| # Tests with a valid Token and service url; and ensures that Service url is added to Trusted service url list. | |
| # @pytest.mark.asyncio | |
| # async def test_channel_msa_header_valid_service_url_should_be_trusted(self): | |
| # activity = Activity( | |
| # service_url="https://smba.trafficmanager.net/amer-client-ss.msg/" | |
| # ) | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "", "" | |
| # ) | |
| # | |
| # await JwtTokenValidation.authenticate_request(activity, header, credentials) | |
| # | |
| # assert AppCredentials.is_trusted_service( | |
| # "https://smba.trafficmanager.net/amer-client-ss.msg/" | |
| # ) | |
| # @pytest.mark.asyncio | |
| # # Tests with a valid Token and invalid service url and ensures that Service url is NOT added to | |
| # # Trusted service url list. | |
| # async def test_channel_msa_header_invalid_service_url_should_not_be_trusted(self): | |
| # activity = Activity(service_url="https://webchat.botframework.com/") | |
| # header = ( | |
| # "Bearer " | |
| # MicrosoftAppCredentials( | |
| # "", "" | |
| # ).get_access_token() | |
| # ) | |
| # credentials = SimpleCredentialProvider( | |
| # "7f74513e-6f96-4dbc-be9d-9a81fea22b88", "" | |
| # ) | |
| # | |
| # with pytest.raises(Exception) as excinfo: | |
| # await JwtTokenValidation.authenticate_request(activity, header, credentials) | |
| # assert "Unauthorized" in str(excinfo.value) | |
| # | |
| # assert not MicrosoftAppCredentials.is_trusted_service( | |
| # "https://webchat.botframework.com/" | |
| # ) | |
| # Tests with a valid Token and invalid service url and ensures that Service url is NOT added to | |
| # Trusted service url list. | |
| async def test_channel_authentication_disabled_and_skill_should_be_anonymous(self): | |
| activity = Activity( | |
| channel_id=Channels.emulator, | |
| service_url="https://webchat.botframework.com/", | |
| relates_to=ConversationReference(), | |
| recipient=ChannelAccount(role=RoleTypes.skill), | |
| ) | |
| header = "" | |
| credentials = SimpleCredentialProvider("", "") | |
| claims_principal = await JwtTokenValidation.authenticate_request( | |
| activity, header, credentials | |
| ) | |
| assert ( | |
| claims_principal.authentication_type | |
| == AuthenticationConstants.ANONYMOUS_AUTH_TYPE | |
| ) | |
| assert ( | |
| JwtTokenValidation.get_app_id_from_claims(claims_principal.claims) | |
| == AuthenticationConstants.ANONYMOUS_SKILL_APP_ID | |
| ) | |
| # @pytest.mark.asyncio | |
| # async def test_channel_msa_header_from_user_specified_tenant(self): | |
| # activity = Activity( | |
| # service_url="https://smba.trafficmanager.net/amer-client-ss.msg/" | |
| # ) | |
| # header = "Bearer " MicrosoftAppCredentials( | |
| # "", "", "microsoft.com" | |
| # ).get_access_token(True) | |
| # credentials = SimpleCredentialProvider( | |
| # "", "" | |
| # ) | |
| # | |
| # claims = await JwtTokenValidation.authenticate_request( | |
| # activity, header, credentials | |
| # ) | |
| # | |
| # assert claims.get_claim_value("tid") == "72f988bf-86f1-41af-91ab-2d7cd011db47" | |
| # Tests with no authentication header and makes sure the service URL is not added to the trusted list. | |
| async def test_channel_authentication_disabled_should_be_anonymous(self): | |
| activity = Activity(service_url="https://webchat.botframework.com/") | |
| header = "" | |
| credentials = SimpleCredentialProvider("", "") | |
| claims_principal = await JwtTokenValidation.authenticate_request( | |
| activity, header, credentials | |
| ) | |
| assert ( | |
| claims_principal.authentication_type | |
| == AuthenticationConstants.ANONYMOUS_AUTH_TYPE | |
| ) | |
| async def test_government_channel_validation_succeeds(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity( | |
| {"iss": "https://api.botframework.us", "aud": credentials.app_id}, True | |
| ), | |
| credentials, | |
| ) | |
| async def test_government_channel_validation_no_authentication_fails(self): | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity({}, False), None | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_government_channel_validation_no_issuer_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity({"peanut": "peanut"}, True), credentials | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_government_channel_validation_wrong_issuer_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity({"iss": "peanut"}, True), credentials | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| # @pytest.mark.asyncio | |
| # async def test_government_channel_validation_no_audience_fails(self): | |
| # credentials = SimpleCredentialProvider( | |
| # "", "" | |
| # ) | |
| # with pytest.raises(Exception) as excinfo: | |
| # await GovernmentChannelValidation.validate_identity( | |
| # ClaimsIdentity({"iss": "https://api.botframework.us"}, True), | |
| # credentials, | |
| # ) | |
| # assert "Unauthorized" in str(excinfo.value) | |
| async def test_government_channel_validation_wrong_audience_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity( | |
| {"iss": "https://api.botframework.us", "aud": "peanut"}, True | |
| ), | |
| credentials, | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_enterprise_channel_validation_succeeds(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| await EnterpriseChannelValidation.validate_identity( | |
| ClaimsIdentity( | |
| {"iss": "https://api.botframework.com", "aud": credentials.app_id}, True | |
| ), | |
| credentials, | |
| ) | |
| async def test_enterprise_channel_validation_no_authentication_fails(self): | |
| with pytest.raises(Exception) as excinfo: | |
| await EnterpriseChannelValidation.validate_identity( | |
| ClaimsIdentity({}, False), None | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_enterprise_channel_validation_no_issuer_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await EnterpriseChannelValidation.validate_identity( | |
| ClaimsIdentity({"peanut": "peanut"}, True), credentials | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_enterprise_channel_validation_wrong_issuer_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await EnterpriseChannelValidation.validate_identity( | |
| ClaimsIdentity({"iss": "peanut"}, True), credentials | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_enterprise_channel_validation_no_audience_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity({"iss": "https://api.botframework.com"}, True), | |
| credentials, | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| async def test_enterprise_channel_validation_wrong_audience_fails(self): | |
| credentials = SimpleCredentialProvider("", "") | |
| with pytest.raises(Exception) as excinfo: | |
| await GovernmentChannelValidation.validate_identity( | |
| ClaimsIdentity( | |
| {"iss": "https://api.botframework.com", "aud": "peanut"}, True | |
| ), | |
| credentials, | |
| ) | |
| assert "Unauthorized" in str(excinfo.value) | |
| def test_get_app_id_from_claims(self): | |
| v1_claims = {} | |
| v2_claims = {} | |
| app_id = str(uuid.uuid4()) | |
| # Empty list | |
| assert not JwtTokenValidation.get_app_id_from_claims(v1_claims) | |
| # AppId there but no version (assumes v1) | |
| v1_claims[AuthenticationConstants.APP_ID_CLAIM] = app_id | |
| assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id | |
| # AppId there with v1 version | |
| v1_claims[AuthenticationConstants.VERSION_CLAIM] = "1.0" | |
| assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id | |
| # v2 version but no azp | |
| v2_claims[AuthenticationConstants.VERSION_CLAIM] = "2.0" | |
| assert not JwtTokenValidation.get_app_id_from_claims(v2_claims) | |
| # v2 version but no azp | |
| v2_claims[AuthenticationConstants.AUTHORIZED_PARTY] = app_id | |
| assert JwtTokenValidation.get_app_id_from_claims(v2_claims) == app_id | |