cohit's picture
Upload folder using huggingface_hub
0827183 verified
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Awaitable, Callable, Optional
from aiohttp.web import (
Request,
Response,
json_response,
WebSocketResponse,
HTTPBadRequest,
HTTPMethodNotAllowed,
HTTPUnauthorized,
HTTPUnsupportedMediaType,
)
from botbuilder.core import (
Bot,
CloudAdapterBase,
InvokeResponse,
TurnContext,
)
from botbuilder.core.streaming import (
StreamingActivityProcessor,
StreamingHttpDriver,
StreamingRequestHandler,
)
from botbuilder.schema import Activity
from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
from botframework.connector.aio import ConnectorClient
from botframework.connector.auth import (
AuthenticateRequestResult,
BotFrameworkAuthentication,
BotFrameworkAuthenticationFactory,
ConnectorFactory,
MicrosoftAppCredentials,
)
from .bot_framework_http_adapter_integration_base import (
BotFrameworkHttpAdapterIntegrationBase,
)
class CloudAdapter(CloudAdapterBase, BotFrameworkHttpAdapterIntegrationBase):
def __init__(self, bot_framework_authentication: BotFrameworkAuthentication = None):
"""
Initializes a new instance of the CloudAdapter class.
:param bot_framework_authentication: Optional BotFrameworkAuthentication instance
"""
# pylint: disable=invalid-name
if not bot_framework_authentication:
bot_framework_authentication = BotFrameworkAuthenticationFactory.create()
self._AUTH_HEADER_NAME = "authorization"
self._CHANNEL_ID_HEADER_NAME = "channelid"
super().__init__(bot_framework_authentication)
async def process(
self, request: Request, bot: Bot, ws_response: WebSocketResponse = None
) -> Optional[Response]:
if not request:
raise TypeError("request can't be None")
# if ws_response is None:
# raise TypeError("ws_response can't be None")
if not bot:
raise TypeError("bot can't be None")
try:
# Only GET requests for web socket connects are allowed
if (
request.method == "GET"
and ws_response
and ws_response.can_prepare(request)
):
# All socket communication will be handled by the internal streaming-specific BotAdapter
await self._connect(bot, request, ws_response)
elif request.method == "POST":
# Deserialize the incoming Activity
if "application/json" in request.headers["Content-Type"]:
body = await request.json()
else:
raise HTTPUnsupportedMediaType()
activity: Activity = Activity().deserialize(body)
# A POST request must contain an Activity
if not activity.type:
raise HTTPBadRequest
# Grab the auth header from the inbound http request
auth_header = (
request.headers["Authorization"]
if "Authorization" in request.headers
else ""
)
# Process the inbound activity with the bot
invoke_response = await self.process_activity(
auth_header, activity, bot.on_turn
)
# Write the response, serializing the InvokeResponse
if invoke_response:
return json_response(
data=invoke_response.body, status=invoke_response.status
)
return Response(status=201)
else:
raise HTTPMethodNotAllowed
except (HTTPUnauthorized, PermissionError) as _:
raise HTTPUnauthorized
async def _connect(
self, bot: Bot, request: Request, ws_response: WebSocketResponse
):
if ws_response is None:
raise TypeError("ws_response can't be None")
# Grab the auth header from the inbound http request
auth_header = request.headers.get(self._AUTH_HEADER_NAME)
# Grab the channelId which should be in the http headers
channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME)
authentication_request_result = (
await self.bot_framework_authentication.authenticate_streaming_request(
auth_header, channel_id
)
)
# Transition the request to a WebSocket connection
await ws_response.prepare(request)
bf_web_socket = AiohttpWebSocket(ws_response)
streaming_activity_processor = _StreamingActivityProcessor(
authentication_request_result, self, bot, bf_web_socket
)
await streaming_activity_processor.listen()
class _StreamingActivityProcessor(StreamingActivityProcessor):
def __init__(
self,
authenticate_request_result: AuthenticateRequestResult,
adapter: CloudAdapter,
bot: Bot,
web_socket: AiohttpWebSocket = None,
) -> None:
self._authenticate_request_result = authenticate_request_result
self._adapter = adapter
# Internal reuse of the existing StreamingRequestHandler class
self._request_handler = StreamingRequestHandler(bot, self, web_socket)
# Fix up the connector factory so connector create from it will send over this connection
self._authenticate_request_result.connector_factory = (
_StreamingConnectorFactory(self._request_handler)
)
async def listen(self):
await self._request_handler.listen()
async def process_streaming_activity(
self,
activity: Activity,
bot_callback_handler: Callable[[TurnContext], Awaitable],
) -> InvokeResponse:
return await self._adapter.process_activity(
self._authenticate_request_result, activity, bot_callback_handler
)
class _StreamingConnectorFactory(ConnectorFactory):
def __init__(self, request_handler: StreamingRequestHandler) -> None:
self._request_handler = request_handler
self._service_url = None
async def create(
self, service_url: str, audience: str # pylint: disable=unused-argument
) -> ConnectorClient:
if not self._service_url:
self._service_url = service_url
elif service_url != self._service_url:
raise RuntimeError(
"This is a streaming scenario, all connectors from this factory must all be for the same url."
)
# TODO: investigate if Driver and pipeline should be moved here
streaming_driver = StreamingHttpDriver(self._request_handler)
config = BotFrameworkConnectorConfiguration(
MicrosoftAppCredentials.empty(),
service_url,
pipeline_type=AsyncBfPipeline,
driver=streaming_driver,
)
streaming_driver.config = config
connector_client = ConnectorClient(None, custom_configuration=config)
return connector_client