Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-integration-aiohttp
/botbuilder
/integration
/aiohttp
/cloud_adapter.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| from typing import Awaitable, Callable, Optional | |
| from aiohttp.web import ( | |
| Request, | |
| Response, | |
| json_response, | |
| WebSocketResponse, | |
| HTTPBadRequest, | |
| HTTPMethodNotAllowed, | |
| HTTPUnauthorized, | |
| HTTPUnsupportedMediaType, | |
| ) | |
| from botbuilder.core import ( | |
| Bot, | |
| CloudAdapterBase, | |
| InvokeResponse, | |
| TurnContext, | |
| ) | |
| from botbuilder.core.streaming import ( | |
| StreamingActivityProcessor, | |
| StreamingHttpDriver, | |
| StreamingRequestHandler, | |
| ) | |
| from botbuilder.schema import Activity | |
| from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket | |
| from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration | |
| from botframework.connector.aio import ConnectorClient | |
| from botframework.connector.auth import ( | |
| AuthenticateRequestResult, | |
| BotFrameworkAuthentication, | |
| BotFrameworkAuthenticationFactory, | |
| ConnectorFactory, | |
| MicrosoftAppCredentials, | |
| ) | |
| from .bot_framework_http_adapter_integration_base import ( | |
| BotFrameworkHttpAdapterIntegrationBase, | |
| ) | |
| class CloudAdapter(CloudAdapterBase, BotFrameworkHttpAdapterIntegrationBase): | |
| def __init__(self, bot_framework_authentication: BotFrameworkAuthentication = None): | |
| """ | |
| Initializes a new instance of the CloudAdapter class. | |
| :param bot_framework_authentication: Optional BotFrameworkAuthentication instance | |
| """ | |
| # pylint: disable=invalid-name | |
| if not bot_framework_authentication: | |
| bot_framework_authentication = BotFrameworkAuthenticationFactory.create() | |
| self._AUTH_HEADER_NAME = "authorization" | |
| self._CHANNEL_ID_HEADER_NAME = "channelid" | |
| super().__init__(bot_framework_authentication) | |
| async def process( | |
| self, request: Request, bot: Bot, ws_response: WebSocketResponse = None | |
| ) -> Optional[Response]: | |
| if not request: | |
| raise TypeError("request can't be None") | |
| # if ws_response is None: | |
| # raise TypeError("ws_response can't be None") | |
| if not bot: | |
| raise TypeError("bot can't be None") | |
| try: | |
| # Only GET requests for web socket connects are allowed | |
| if ( | |
| request.method == "GET" | |
| and ws_response | |
| and ws_response.can_prepare(request) | |
| ): | |
| # All socket communication will be handled by the internal streaming-specific BotAdapter | |
| await self._connect(bot, request, ws_response) | |
| elif request.method == "POST": | |
| # Deserialize the incoming Activity | |
| if "application/json" in request.headers["Content-Type"]: | |
| body = await request.json() | |
| else: | |
| raise HTTPUnsupportedMediaType() | |
| activity: Activity = Activity().deserialize(body) | |
| # A POST request must contain an Activity | |
| if not activity.type: | |
| raise HTTPBadRequest | |
| # Grab the auth header from the inbound http request | |
| auth_header = ( | |
| request.headers["Authorization"] | |
| if "Authorization" in request.headers | |
| else "" | |
| ) | |
| # Process the inbound activity with the bot | |
| invoke_response = await self.process_activity( | |
| auth_header, activity, bot.on_turn | |
| ) | |
| # Write the response, serializing the InvokeResponse | |
| if invoke_response: | |
| return json_response( | |
| data=invoke_response.body, status=invoke_response.status | |
| ) | |
| return Response(status=201) | |
| else: | |
| raise HTTPMethodNotAllowed | |
| except (HTTPUnauthorized, PermissionError) as _: | |
| raise HTTPUnauthorized | |
| async def _connect( | |
| self, bot: Bot, request: Request, ws_response: WebSocketResponse | |
| ): | |
| if ws_response is None: | |
| raise TypeError("ws_response can't be None") | |
| # Grab the auth header from the inbound http request | |
| auth_header = request.headers.get(self._AUTH_HEADER_NAME) | |
| # Grab the channelId which should be in the http headers | |
| channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME) | |
| authentication_request_result = ( | |
| await self.bot_framework_authentication.authenticate_streaming_request( | |
| auth_header, channel_id | |
| ) | |
| ) | |
| # Transition the request to a WebSocket connection | |
| await ws_response.prepare(request) | |
| bf_web_socket = AiohttpWebSocket(ws_response) | |
| streaming_activity_processor = _StreamingActivityProcessor( | |
| authentication_request_result, self, bot, bf_web_socket | |
| ) | |
| await streaming_activity_processor.listen() | |
| class _StreamingActivityProcessor(StreamingActivityProcessor): | |
| def __init__( | |
| self, | |
| authenticate_request_result: AuthenticateRequestResult, | |
| adapter: CloudAdapter, | |
| bot: Bot, | |
| web_socket: AiohttpWebSocket = None, | |
| ) -> None: | |
| self._authenticate_request_result = authenticate_request_result | |
| self._adapter = adapter | |
| # Internal reuse of the existing StreamingRequestHandler class | |
| self._request_handler = StreamingRequestHandler(bot, self, web_socket) | |
| # Fix up the connector factory so connector create from it will send over this connection | |
| self._authenticate_request_result.connector_factory = ( | |
| _StreamingConnectorFactory(self._request_handler) | |
| ) | |
| async def listen(self): | |
| await self._request_handler.listen() | |
| async def process_streaming_activity( | |
| self, | |
| activity: Activity, | |
| bot_callback_handler: Callable[[TurnContext], Awaitable], | |
| ) -> InvokeResponse: | |
| return await self._adapter.process_activity( | |
| self._authenticate_request_result, activity, bot_callback_handler | |
| ) | |
| class _StreamingConnectorFactory(ConnectorFactory): | |
| def __init__(self, request_handler: StreamingRequestHandler) -> None: | |
| self._request_handler = request_handler | |
| self._service_url = None | |
| async def create( | |
| self, service_url: str, audience: str # pylint: disable=unused-argument | |
| ) -> ConnectorClient: | |
| if not self._service_url: | |
| self._service_url = service_url | |
| elif service_url != self._service_url: | |
| raise RuntimeError( | |
| "This is a streaming scenario, all connectors from this factory must all be for the same url." | |
| ) | |
| # TODO: investigate if Driver and pipeline should be moved here | |
| streaming_driver = StreamingHttpDriver(self._request_handler) | |
| config = BotFrameworkConnectorConfiguration( | |
| MicrosoftAppCredentials.empty(), | |
| service_url, | |
| pipeline_type=AsyncBfPipeline, | |
| driver=streaming_driver, | |
| ) | |
| streaming_driver.config = config | |
| connector_client = ConnectorClient(None, custom_configuration=config) | |
| return connector_client | |