Spaces:
Build error
Build error
File size: 7,255 Bytes
0827183 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Awaitable, Callable, Optional
from aiohttp.web import (
Request,
Response,
json_response,
WebSocketResponse,
HTTPBadRequest,
HTTPMethodNotAllowed,
HTTPUnauthorized,
HTTPUnsupportedMediaType,
)
from botbuilder.core import (
Bot,
CloudAdapterBase,
InvokeResponse,
TurnContext,
)
from botbuilder.core.streaming import (
StreamingActivityProcessor,
StreamingHttpDriver,
StreamingRequestHandler,
)
from botbuilder.schema import Activity
from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
from botframework.connector.aio import ConnectorClient
from botframework.connector.auth import (
AuthenticateRequestResult,
BotFrameworkAuthentication,
BotFrameworkAuthenticationFactory,
ConnectorFactory,
MicrosoftAppCredentials,
)
from .bot_framework_http_adapter_integration_base import (
BotFrameworkHttpAdapterIntegrationBase,
)
class CloudAdapter(CloudAdapterBase, BotFrameworkHttpAdapterIntegrationBase):
def __init__(self, bot_framework_authentication: BotFrameworkAuthentication = None):
"""
Initializes a new instance of the CloudAdapter class.
:param bot_framework_authentication: Optional BotFrameworkAuthentication instance
"""
# pylint: disable=invalid-name
if not bot_framework_authentication:
bot_framework_authentication = BotFrameworkAuthenticationFactory.create()
self._AUTH_HEADER_NAME = "authorization"
self._CHANNEL_ID_HEADER_NAME = "channelid"
super().__init__(bot_framework_authentication)
async def process(
self, request: Request, bot: Bot, ws_response: WebSocketResponse = None
) -> Optional[Response]:
if not request:
raise TypeError("request can't be None")
# if ws_response is None:
# raise TypeError("ws_response can't be None")
if not bot:
raise TypeError("bot can't be None")
try:
# Only GET requests for web socket connects are allowed
if (
request.method == "GET"
and ws_response
and ws_response.can_prepare(request)
):
# All socket communication will be handled by the internal streaming-specific BotAdapter
await self._connect(bot, request, ws_response)
elif request.method == "POST":
# Deserialize the incoming Activity
if "application/json" in request.headers["Content-Type"]:
body = await request.json()
else:
raise HTTPUnsupportedMediaType()
activity: Activity = Activity().deserialize(body)
# A POST request must contain an Activity
if not activity.type:
raise HTTPBadRequest
# Grab the auth header from the inbound http request
auth_header = (
request.headers["Authorization"]
if "Authorization" in request.headers
else ""
)
# Process the inbound activity with the bot
invoke_response = await self.process_activity(
auth_header, activity, bot.on_turn
)
# Write the response, serializing the InvokeResponse
if invoke_response:
return json_response(
data=invoke_response.body, status=invoke_response.status
)
return Response(status=201)
else:
raise HTTPMethodNotAllowed
except (HTTPUnauthorized, PermissionError) as _:
raise HTTPUnauthorized
async def _connect(
self, bot: Bot, request: Request, ws_response: WebSocketResponse
):
if ws_response is None:
raise TypeError("ws_response can't be None")
# Grab the auth header from the inbound http request
auth_header = request.headers.get(self._AUTH_HEADER_NAME)
# Grab the channelId which should be in the http headers
channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME)
authentication_request_result = (
await self.bot_framework_authentication.authenticate_streaming_request(
auth_header, channel_id
)
)
# Transition the request to a WebSocket connection
await ws_response.prepare(request)
bf_web_socket = AiohttpWebSocket(ws_response)
streaming_activity_processor = _StreamingActivityProcessor(
authentication_request_result, self, bot, bf_web_socket
)
await streaming_activity_processor.listen()
class _StreamingActivityProcessor(StreamingActivityProcessor):
def __init__(
self,
authenticate_request_result: AuthenticateRequestResult,
adapter: CloudAdapter,
bot: Bot,
web_socket: AiohttpWebSocket = None,
) -> None:
self._authenticate_request_result = authenticate_request_result
self._adapter = adapter
# Internal reuse of the existing StreamingRequestHandler class
self._request_handler = StreamingRequestHandler(bot, self, web_socket)
# Fix up the connector factory so connector create from it will send over this connection
self._authenticate_request_result.connector_factory = (
_StreamingConnectorFactory(self._request_handler)
)
async def listen(self):
await self._request_handler.listen()
async def process_streaming_activity(
self,
activity: Activity,
bot_callback_handler: Callable[[TurnContext], Awaitable],
) -> InvokeResponse:
return await self._adapter.process_activity(
self._authenticate_request_result, activity, bot_callback_handler
)
class _StreamingConnectorFactory(ConnectorFactory):
def __init__(self, request_handler: StreamingRequestHandler) -> None:
self._request_handler = request_handler
self._service_url = None
async def create(
self, service_url: str, audience: str # pylint: disable=unused-argument
) -> ConnectorClient:
if not self._service_url:
self._service_url = service_url
elif service_url != self._service_url:
raise RuntimeError(
"This is a streaming scenario, all connectors from this factory must all be for the same url."
)
# TODO: investigate if Driver and pipeline should be moved here
streaming_driver = StreamingHttpDriver(self._request_handler)
config = BotFrameworkConnectorConfiguration(
MicrosoftAppCredentials.empty(),
service_url,
pipeline_type=AsyncBfPipeline,
driver=streaming_driver,
)
streaming_driver.config = config
connector_client = ConnectorClient(None, custom_configuration=config)
return connector_client
|