Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-ai
/botbuilder
/ai
/luis
/luis_util.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| import platform | |
| from collections import OrderedDict | |
| from typing import Dict, List, Union | |
| import azure.cognitiveservices.language.luis.runtime.models as runtime_models | |
| from azure.cognitiveservices.language.luis.runtime.models import ( | |
| CompositeEntityModel, | |
| EntityModel, | |
| LuisResult, | |
| ) | |
| from msrest import Serializer | |
| from botbuilder.core import IntentScore, RecognizerResult | |
| from .. import __title__, __version__ | |
| class LuisUtil: | |
| """ | |
| Utility functions used to extract and transform data from Luis SDK | |
| """ | |
| _metadata_key: str = "$instance" | |
| def normalized_intent(intent: str) -> str: | |
| return intent.replace(".", "_").replace(" ", "_") | |
| def get_intents(luis_result: LuisResult) -> Dict[str, IntentScore]: | |
| if luis_result.intents is not None: | |
| return { | |
| LuisUtil.normalized_intent(i.intent): IntentScore(i.score or 0) | |
| for i in luis_result.intents | |
| } | |
| return { | |
| LuisUtil.normalized_intent( | |
| luis_result.top_scoring_intent.intent | |
| ): IntentScore(luis_result.top_scoring_intent.score or 0) | |
| } | |
| def extract_entities_and_metadata( | |
| entities: List[EntityModel], | |
| composite_entities: List[CompositeEntityModel], | |
| verbose: bool, | |
| ) -> Dict[str, object]: | |
| entities_and_metadata = {} | |
| if verbose: | |
| entities_and_metadata[LuisUtil._metadata_key] = {} | |
| composite_entity_types = set() | |
| # We start by populating composite entities so that entities covered by them are removed from the entities list | |
| if composite_entities: | |
| composite_entity_types = set(ce.parent_type for ce in composite_entities) | |
| current = entities | |
| for composite_entity in composite_entities: | |
| current = LuisUtil.populate_composite_entity_model( | |
| composite_entity, current, entities_and_metadata, verbose | |
| ) | |
| entities = current | |
| for entity in entities: | |
| # we'll address composite entities separately | |
| if entity.type in composite_entity_types: | |
| continue | |
| LuisUtil.add_property( | |
| entities_and_metadata, | |
| LuisUtil.extract_normalized_entity_name(entity), | |
| LuisUtil.extract_entity_value(entity), | |
| ) | |
| if verbose: | |
| LuisUtil.add_property( | |
| entities_and_metadata[LuisUtil._metadata_key], | |
| LuisUtil.extract_normalized_entity_name(entity), | |
| LuisUtil.extract_entity_metadata(entity), | |
| ) | |
| return entities_and_metadata | |
| def number(value: object) -> Union[int, float]: | |
| if value is None: | |
| return None | |
| try: | |
| str_value = str(value) | |
| int_value = int(str_value) | |
| return int_value | |
| except ValueError: | |
| float_value = float(str_value) | |
| return float_value | |
| def extract_entity_value(entity: EntityModel) -> object: | |
| if ( | |
| entity.additional_properties is None | |
| or "resolution" not in entity.additional_properties | |
| ): | |
| return entity.entity | |
| resolution = entity.additional_properties["resolution"] | |
| if entity.type.startswith("builtin.datetime."): | |
| return resolution | |
| if entity.type.startswith("builtin.datetimeV2."): | |
| if not resolution["values"]: | |
| return resolution | |
| resolution_values = resolution["values"] | |
| val_type = resolution["values"][0]["type"] | |
| timexes = [val["timex"] for val in resolution_values] | |
| distinct_timexes = list(OrderedDict.fromkeys(timexes)) | |
| return {"type": val_type, "timex": distinct_timexes} | |
| if entity.type in {"builtin.number", "builtin.ordinal"}: | |
| return LuisUtil.number(resolution["value"]) | |
| if entity.type == "builtin.percentage": | |
| svalue = str(resolution["value"]) | |
| if svalue.endswith("%"): | |
| svalue = svalue[:-1] | |
| return LuisUtil.number(svalue) | |
| if entity.type in { | |
| "builtin.age", | |
| "builtin.dimension", | |
| "builtin.currency", | |
| "builtin.temperature", | |
| }: | |
| units = resolution["unit"] | |
| val = LuisUtil.number(resolution["value"]) | |
| obj = {} | |
| if val is not None: | |
| obj["number"] = val | |
| obj["units"] = units | |
| return obj | |
| value = resolution.get("value") | |
| return value if value is not None else resolution.get("values") | |
| def extract_entity_metadata(entity: EntityModel) -> Dict: | |
| obj = dict( | |
| startIndex=int(entity.start_index), | |
| endIndex=int(entity.end_index + 1), | |
| text=entity.entity, | |
| type=entity.type, | |
| ) | |
| if entity.additional_properties is not None: | |
| if "score" in entity.additional_properties: | |
| obj["score"] = float(entity.additional_properties["score"]) | |
| resolution = entity.additional_properties.get("resolution") | |
| if resolution is not None and resolution.get("subtype") is not None: | |
| obj["subtype"] = resolution["subtype"] | |
| return obj | |
| def extract_normalized_entity_name(entity: EntityModel) -> str: | |
| # Type::Role -> Role | |
| type = entity.type.split(":")[-1] | |
| if type.startswith("builtin.datetimeV2."): | |
| type = "datetime" | |
| if type.startswith("builtin.currency"): | |
| type = "money" | |
| if type.startswith("builtin."): | |
| type = type[8:] | |
| role = ( | |
| entity.additional_properties["role"] | |
| if entity.additional_properties is not None | |
| and "role" in entity.additional_properties | |
| else "" | |
| ) | |
| if role and not role.isspace(): | |
| type = role | |
| return type.replace(".", "_").replace(" ", "_") | |
| def populate_composite_entity_model( | |
| composite_entity: CompositeEntityModel, | |
| entities: List[EntityModel], | |
| entities_and_metadata: Dict, | |
| verbose: bool, | |
| ) -> List[EntityModel]: | |
| children_entities = {} | |
| children_entities_metadata = {} | |
| if verbose: | |
| children_entities[LuisUtil._metadata_key] = {} | |
| # This is now implemented as O(n^2) search and can be reduced to O(2n) using a map as an optimization if n grows | |
| composite_entity_metadata = next( | |
| ( | |
| ent | |
| for ent in entities | |
| if ent.type == composite_entity.parent_type | |
| and ent.entity == composite_entity.value | |
| ), | |
| None, | |
| ) | |
| # This is an error case and should not happen in theory | |
| if composite_entity_metadata is None: | |
| return entities | |
| if verbose: | |
| children_entities_metadata = LuisUtil.extract_entity_metadata( | |
| composite_entity_metadata | |
| ) | |
| children_entities[LuisUtil._metadata_key] = {} | |
| covered_set: List[EntityModel] = [] | |
| for child in composite_entity.children: | |
| for entity in entities: | |
| # We already covered this entity | |
| if entity in covered_set: | |
| continue | |
| # This entity doesn't belong to this composite entity | |
| if child.type != entity.type or not LuisUtil.composite_contains_entity( | |
| composite_entity_metadata, entity | |
| ): | |
| continue | |
| # Add to the set to ensure that we don't consider the same child entity more than once per composite | |
| covered_set.append(entity) | |
| LuisUtil.add_property( | |
| children_entities, | |
| LuisUtil.extract_normalized_entity_name(entity), | |
| LuisUtil.extract_entity_value(entity), | |
| ) | |
| if verbose: | |
| LuisUtil.add_property( | |
| children_entities[LuisUtil._metadata_key], | |
| LuisUtil.extract_normalized_entity_name(entity), | |
| LuisUtil.extract_entity_metadata(entity), | |
| ) | |
| LuisUtil.add_property( | |
| entities_and_metadata, | |
| LuisUtil.extract_normalized_entity_name(composite_entity_metadata), | |
| children_entities, | |
| ) | |
| if verbose: | |
| LuisUtil.add_property( | |
| entities_and_metadata[LuisUtil._metadata_key], | |
| LuisUtil.extract_normalized_entity_name(composite_entity_metadata), | |
| children_entities_metadata, | |
| ) | |
| # filter entities that were covered by this composite entity | |
| return [entity for entity in entities if entity not in covered_set] | |
| def composite_contains_entity( | |
| composite_entity_metadata: EntityModel, entity: EntityModel | |
| ) -> bool: | |
| return ( | |
| entity.start_index >= composite_entity_metadata.start_index | |
| and entity.end_index <= composite_entity_metadata.end_index | |
| ) | |
| def add_property(obj: Dict[str, object], key: str, value: object) -> None: | |
| # If a property doesn't exist add it to a new array, otherwise append it to the existing array. | |
| if key in obj: | |
| obj[key].append(value) | |
| else: | |
| obj[key] = [value] | |
| def add_properties(luis: LuisResult, result: RecognizerResult) -> None: | |
| if luis.sentiment_analysis is not None: | |
| result.properties["sentiment"] = { | |
| "label": luis.sentiment_analysis.label, | |
| "score": luis.sentiment_analysis.score, | |
| } | |
| def get_user_agent(): | |
| package_user_agent = f"{__title__}/{__version__}" | |
| uname = platform.uname() | |
| os_version = f"{uname.machine}-{uname.system}-{uname.version}" | |
| py_version = f"Python,Version={platform.python_version()}" | |
| platform_user_agent = f"({os_version}; {py_version})" | |
| user_agent = f"{package_user_agent} {platform_user_agent}" | |
| return user_agent | |
| def recognizer_result_as_dict( | |
| recognizer_result: RecognizerResult, | |
| ) -> Dict[str, object]: | |
| # an internal method that returns a dict for json serialization. | |
| intents: Dict[str, Dict[str, float]] = ( | |
| { | |
| name: LuisUtil.intent_score_as_dict(intent_score) | |
| for name, intent_score in recognizer_result.intents.items() | |
| } | |
| if recognizer_result.intents is not None | |
| else None | |
| ) | |
| dictionary: Dict[str, object] = { | |
| "text": recognizer_result.text, | |
| "alteredText": recognizer_result.altered_text, | |
| "intents": intents, | |
| "entities": recognizer_result.entities, | |
| } | |
| if recognizer_result.properties is not None: | |
| for key, value in recognizer_result.properties.items(): | |
| if key not in dictionary: | |
| if isinstance(value, LuisResult): | |
| dictionary[key] = LuisUtil.luis_result_as_dict(value) | |
| else: | |
| dictionary[key] = value | |
| return dictionary | |
| def intent_score_as_dict(intent_score: IntentScore) -> Dict[str, float]: | |
| if intent_score is None: | |
| return None | |
| return {"score": intent_score.score} | |
| def luis_result_as_dict(luis_result: LuisResult) -> Dict[str, object]: | |
| if luis_result is None: | |
| return None | |
| client_models = { | |
| k: v for k, v in runtime_models.__dict__.items() if isinstance(v, type) | |
| } | |
| serializer = Serializer(client_models) | |
| result = serializer.body(luis_result, "LuisResult") | |
| return result | |