Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botframework-streaming
/botframework
/streaming
/protocol_adapter.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| import asyncio | |
| from uuid import UUID, uuid4 | |
| from botframework.streaming.payloads import ( | |
| PayloadAssemblerManager, | |
| RequestManager, | |
| SendOperations, | |
| StreamManager, | |
| ) | |
| from botframework.streaming.payloads.assemblers import PayloadStreamAssembler | |
| from botframework.streaming.payload_transport import PayloadSender, PayloadReceiver | |
| from .receive_request import ReceiveRequest | |
| from .receive_response import ReceiveResponse | |
| from .request_handler import RequestHandler | |
| from .streaming_request import StreamingRequest | |
class ProtocolAdapter:
    """Glue between a request handler and the streaming payload transport.

    Outgoing requests, responses and stream cancellations are written through
    a ``SendOperations`` built on the payload sender. Incoming payloads are
    routed by subscribing a ``PayloadAssemblerManager`` to the payload
    receiver; that manager is given this adapter's ``_on_receive_request`` and
    ``_on_receive_response`` callbacks.
    """

    def __init__(
        self,
        request_handler: RequestHandler,
        request_manager: RequestManager,
        payload_sender: PayloadSender,
        payload_receiver: PayloadReceiver,
        handler_context: object = None,
    ):
        """Store the collaborators and subscribe to the payload receiver.

        :param request_handler: Handler whose ``process_request`` is awaited
            for each received request; a falsy handler means received
            requests are ignored.
        :param request_manager: Used to await and to signal responses by
            request id.
        :param payload_sender: Sender wrapped by ``SendOperations``.
        :param payload_receiver: Receiver the assembler manager subscribes to.
        :param handler_context: Opaque value passed through to
            ``process_request`` as its third argument.
        """
        self._request_handler = request_handler
        self._request_manager = request_manager
        self._payload_sender = payload_sender
        self._payload_receiver = payload_receiver
        self._handler_context = handler_context

        # Strong references to fire-and-forget tasks started by
        # _on_cancel_stream. The event loop only keeps weak references to
        # tasks, so without this a pending task could be garbage-collected
        # before it finishes. Created before the stream manager so the cancel
        # callback can never observe it missing.
        self._background_tasks = set()

        self._send_operations = SendOperations(self._payload_sender)
        # TODO: might be able to remove
        self._stream_manager = StreamManager(self._on_cancel_stream)
        self._assembler_manager = PayloadAssemblerManager(
            self._stream_manager, self._on_receive_request, self._on_receive_response
        )

        self._payload_receiver.subscribe(
            self._assembler_manager.get_payload_stream,
            self._assembler_manager.on_receive,
        )

    async def send_request(self, request: StreamingRequest) -> ReceiveResponse:
        """Send ``request`` under a fresh UUID and return its response.

        The send and the wait for the response are awaited together with
        ``asyncio.gather``; only the response is returned.

        :param request: The request to send.
        :raises TypeError: If ``request`` is falsy (e.g. ``None``).
        """
        if not request:
            raise TypeError(
                f"'request: {request.__class__.__name__}' argument can't be None"
            )

        request_id = uuid4()
        response_task = self._request_manager.get_response(request_id)
        request_task = self._send_operations.send_request(request_id, request)

        # NOTE(review): if the send raises, gather does not cancel the
        # response wait; presumably it stays pending in the request manager.
        # Confirm against RequestManager before changing.
        _, response = await asyncio.gather(request_task, response_task)
        return response

    async def _on_receive_request(self, identifier: UUID, request: ReceiveRequest):
        """Pass a fully received request to the handler and send back its response.

        Nothing is sent when there is no handler or the handler returns a
        falsy response.
        """
        # request is done, we can handle it
        if self._request_handler:
            response = await self._request_handler.process_request(
                request, None, self._handler_context
            )

            if response:
                await self._send_operations.send_response(identifier, response)

    async def _on_receive_response(self, identifier: UUID, response: ReceiveResponse):
        """Signal the request manager that the response for ``identifier`` arrived."""
        # we received the response to something, signal it
        await self._request_manager.signal_response(identifier, response)

    def _on_cancel_stream(self, content_stream_assembler: PayloadStreamAssembler):
        """Schedule a cancel-stream message for the given assembler's stream.

        This is a synchronous callback, so the send is scheduled as a task on
        the running event loop rather than awaited.
        """
        # TODO: on original C# code content_stream_assembler is typed as IAssembler
        task = asyncio.create_task(
            self._send_operations.send_cancel_stream(
                content_stream_assembler.identifier
            )
        )
        # Hold a strong reference until the task completes, then drop it.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)