# Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from copy import deepcopy from typing import List from botframework.connector.token_api.models import TokenExchangeRequest from botbuilder.schema import ( Activity, ActivityTypes, ExpectedReplies, DeliveryModes, SignInConstants, TokenExchangeInvokeRequest, ) from botbuilder.core import BotAdapter, TurnContext, ExtendedUserTokenProvider from botbuilder.core.card_factory import ContentTypes from botbuilder.core.skills import SkillConversationIdFactoryOptions from botbuilder.dialogs import ( Dialog, DialogContext, DialogEvents, DialogReason, DialogInstance, ) from .begin_skill_dialog_options import BeginSkillDialogOptions from .skill_dialog_options import SkillDialogOptions class SkillDialog(Dialog): SKILLCONVERSATIONIDSTATEKEY = ( "Microsoft.Bot.Builder.Dialogs.SkillDialog.SkillConversationId" ) def __init__(self, dialog_options: SkillDialogOptions, dialog_id: str): super().__init__(dialog_id) if not dialog_options: raise TypeError("SkillDialog.__init__(): dialog_options cannot be None.") self.dialog_options = dialog_options self._deliver_mode_state_key = "deliverymode" async def begin_dialog(self, dialog_context: DialogContext, options: object = None): """ Method called when a new dialog has been pushed onto the stack and is being activated. :param dialog_context: The dialog context for the current turn of conversation. :param options: (Optional) additional argument(s) to pass to the dialog being started. """ dialog_args = self._validate_begin_dialog_args(options) # Create deep clone of the original activity to avoid altering it before forwarding it. skill_activity: Activity = deepcopy(dialog_args.activity) # Apply conversation reference and common properties from incoming activity before sending. TurnContext.apply_conversation_reference( skill_activity, TurnContext.get_conversation_reference(dialog_context.context.activity), is_incoming=True, ) # Store delivery mode in dialog state for later use. dialog_context.active_dialog.state[self._deliver_mode_state_key] = ( dialog_args.activity.delivery_mode ) # Create the conversationId and store it in the dialog context state so we can use it later skill_conversation_id = await self._create_skill_conversation_id( dialog_context.context, dialog_context.context.activity ) dialog_context.active_dialog.state[SkillDialog.SKILLCONVERSATIONIDSTATEKEY] = ( skill_conversation_id ) # Send the activity to the skill. eoc_activity = await self._send_to_skill( dialog_context.context, skill_activity, skill_conversation_id ) if eoc_activity: return await dialog_context.end_dialog(eoc_activity.value) return self.end_of_turn async def continue_dialog(self, dialog_context: DialogContext): if not self._on_validate_activity(dialog_context.context.activity): return self.end_of_turn # Handle EndOfConversation from the skill (this will be sent to the this dialog by the SkillHandler if # received from the Skill) if dialog_context.context.activity.type == ActivityTypes.end_of_conversation: return await dialog_context.end_dialog( dialog_context.context.activity.value ) # Create deep clone of the original activity to avoid altering it before forwarding it. skill_activity = deepcopy(dialog_context.context.activity) skill_activity.delivery_mode = dialog_context.active_dialog.state[ self._deliver_mode_state_key ] # Just forward to the remote skill skill_conversation_id = dialog_context.active_dialog.state[ SkillDialog.SKILLCONVERSATIONIDSTATEKEY ] eoc_activity = await self._send_to_skill( dialog_context.context, skill_activity, skill_conversation_id ) if eoc_activity: return await dialog_context.end_dialog(eoc_activity.value) return self.end_of_turn async def reprompt_dialog( # pylint: disable=unused-argument self, context: TurnContext, instance: DialogInstance ): # Create and send an event to the skill so it can resume the dialog. reprompt_event = Activity( type=ActivityTypes.event, name=DialogEvents.reprompt_dialog ) # Apply conversation reference and common properties from incoming activity before sending. TurnContext.apply_conversation_reference( reprompt_event, TurnContext.get_conversation_reference(context.activity), is_incoming=True, ) # connection Name is not applicable for a RePrompt, as we don't expect as OAuthCard in response. skill_conversation_id = instance.state[SkillDialog.SKILLCONVERSATIONIDSTATEKEY] await self._send_to_skill(context, reprompt_event, skill_conversation_id) async def resume_dialog( # pylint: disable=unused-argument self, dialog_context: "DialogContext", reason: DialogReason, result: object ): await self.reprompt_dialog(dialog_context.context, dialog_context.active_dialog) return self.end_of_turn async def end_dialog( self, context: TurnContext, instance: DialogInstance, reason: DialogReason ): # Send of of conversation to the skill if the dialog has been cancelled. if reason in (DialogReason.CancelCalled, DialogReason.ReplaceCalled): activity = Activity(type=ActivityTypes.end_of_conversation) # Apply conversation reference and common properties from incoming activity before sending. TurnContext.apply_conversation_reference( activity, TurnContext.get_conversation_reference(context.activity), is_incoming=True, ) activity.channel_data = context.activity.channel_data activity.additional_properties = context.activity.additional_properties # connection Name is not applicable for an EndDialog, as we don't expect as OAuthCard in response. skill_conversation_id = instance.state[ SkillDialog.SKILLCONVERSATIONIDSTATEKEY ] await self._send_to_skill(context, activity, skill_conversation_id) await super().end_dialog(context, instance, reason) def _validate_begin_dialog_args(self, options: object) -> BeginSkillDialogOptions: if not options: raise TypeError("options cannot be None.") dialog_args = BeginSkillDialogOptions.from_object(options) if not dialog_args: raise TypeError( "SkillDialog: options object not valid as BeginSkillDialogOptions." ) if not dialog_args.activity: raise TypeError( "SkillDialog: activity object in options as BeginSkillDialogOptions cannot be None." ) return dialog_args def _on_validate_activity( self, activity: Activity # pylint: disable=unused-argument ) -> bool: """ Validates the activity sent during continue_dialog. Override this method to implement a custom validator for the activity being sent during continue_dialog. This method can be used to ignore activities of a certain type if needed. If this method returns false, the dialog will end the turn without processing the activity. """ return True async def _send_to_skill( self, context: TurnContext, activity: Activity, skill_conversation_id: str ) -> Activity: if activity.type == ActivityTypes.invoke: # Force ExpectReplies for invoke activities so we can get the replies right away and send # them back to the channel if needed. This makes sure that the dialog will receive the Invoke # response from the skill and any other activities sent, including EoC. activity.delivery_mode = DeliveryModes.expect_replies # Always save state before forwarding # (the dialog stack won't get updated with the skillDialog and things won't work if you don't) await self.dialog_options.conversation_state.save_changes(context, True) skill_info = self.dialog_options.skill response = await self.dialog_options.skill_client.post_activity( self.dialog_options.bot_id, skill_info.app_id, skill_info.skill_endpoint, self.dialog_options.skill_host_endpoint, skill_conversation_id, activity, ) # Inspect the skill response status if not 200 <= response.status <= 299: raise Exception( f'Error invoking the skill id: "{skill_info.id}" at "{skill_info.skill_endpoint}"' f" (status is {response.status}). \r\n {response.body}" ) eoc_activity: Activity = None if activity.delivery_mode == DeliveryModes.expect_replies and response.body: # Process replies in the response.Body. response.body: List[Activity] response.body = ExpectedReplies().deserialize(response.body).activities # Track sent invoke responses, so more than one is not sent. sent_invoke_response = False for from_skill_activity in response.body: if from_skill_activity.type == ActivityTypes.end_of_conversation: # Capture the EndOfConversation activity if it was sent from skill eoc_activity = from_skill_activity # The conversation has ended, so cleanup the conversation id await self.dialog_options.conversation_id_factory.delete_conversation_reference( skill_conversation_id ) elif not sent_invoke_response and await self._intercept_oauth_cards( context, from_skill_activity, self.dialog_options.connection_name ): # Token exchange succeeded, so no oauthcard needs to be shown to the user sent_invoke_response = True else: # If an invoke response has already been sent we should ignore future invoke responses as this # represents a bug in the skill. if from_skill_activity.type == ActivityTypes.invoke_response: if sent_invoke_response: continue sent_invoke_response = True # Send the response back to the channel. await context.send_activity(from_skill_activity) return eoc_activity async def _create_skill_conversation_id( self, context: TurnContext, activity: Activity ) -> str: # Create a conversationId to interact with the skill and send the activity conversation_id_factory_options = SkillConversationIdFactoryOptions( from_bot_oauth_scope=context.turn_state.get(BotAdapter.BOT_OAUTH_SCOPE_KEY), from_bot_id=self.dialog_options.bot_id, activity=activity, bot_framework_skill=self.dialog_options.skill, ) skill_conversation_id = await self.dialog_options.conversation_id_factory.create_skill_conversation_id( conversation_id_factory_options ) return skill_conversation_id async def _intercept_oauth_cards( self, context: TurnContext, activity: Activity, connection_name: str ): """ Tells is if we should intercept the OAuthCard message. """ if not connection_name or not isinstance( context.adapter, ExtendedUserTokenProvider ): # The adapter may choose not to support token exchange, in which case we fallback to # showing an oauth card to the user. return False oauth_card_attachment = next( attachment for attachment in activity.attachments if attachment.content_type == ContentTypes.oauth_card ) if oauth_card_attachment: oauth_card = oauth_card_attachment.content if ( oauth_card and oauth_card.token_exchange_resource and oauth_card.token_exchange_resource.uri ): try: result = await context.adapter.exchange_token( turn_context=context, connection_name=connection_name, user_id=context.activity.from_property.id, exchange_request=TokenExchangeRequest( uri=oauth_card.token_exchange_resource.uri ), ) if result and result.token: # If token above is null, then SSO has failed and hence we return false. # If not, send an invoke to the skill with the token. return await self._send_token_exchange_invoke_to_skill( activity, oauth_card.token_exchange_resource.id, oauth_card.connection_name, result.token, ) except: # Failures in token exchange are not fatal. They simply mean that the user needs # to be shown the OAuth card. return False return False async def _send_token_exchange_invoke_to_skill( self, incoming_activity: Activity, request_id: str, connection_name: str, token: str, ): activity = incoming_activity.create_reply() activity.type = ActivityTypes.invoke activity.name = SignInConstants.token_exchange_operation_name activity.value = TokenExchangeInvokeRequest( id=request_id, token=token, connection_name=connection_name, ) # route the activity to the skill skill_info = self.dialog_options.skill response = await self.dialog_options.skill_client.post_activity( self.dialog_options.bot_id, skill_info.app_id, skill_info.skill_endpoint, self.dialog_options.skill_host_endpoint, incoming_activity.conversation.id, activity, ) # Check response status: true if success, false if failure return response.is_successful_status_code()