Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-azure
/botbuilder
/azure
/cosmosdb_partitioned_storage.py
| """Implements a CosmosDB based storage provider using partitioning for a bot. | |
| """ | |
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| from typing import Dict, List | |
| from threading import Lock | |
| import json | |
| from azure.cosmos import documents, http_constants | |
| from jsonpickle.pickler import Pickler | |
| from jsonpickle.unpickler import Unpickler | |
| import azure.cosmos.cosmos_client as cosmos_client # pylint: disable=no-name-in-module,import-error | |
| import azure.cosmos.errors as cosmos_errors # pylint: disable=no-name-in-module,import-error | |
| from botbuilder.core.storage import Storage | |
| from botbuilder.azure import CosmosDbKeyEscape | |
class CosmosDbPartitionedConfig:
    """The class for partitioned CosmosDB configuration for the Azure Bot Framework."""

    def __init__(
        self,
        cosmos_db_endpoint: str = None,
        auth_key: str = None,
        database_id: str = None,
        container_id: str = None,
        cosmos_client_options: dict = None,
        container_throughput: int = 400,
        key_suffix: str = "",
        compatibility_mode: bool = False,
        **kwargs,
    ):
        """Create the Config object.

        Every setting uses the explicit argument when it is truthy and otherwise
        falls back to the same-named entry in ``kwargs``. When ``kwargs`` contains
        ``filename``, the JSON object loaded from that file replaces ``kwargs`` as
        the fallback source.

        :param cosmos_db_endpoint: The CosmosDB endpoint.
        :param auth_key: The authentication key for Cosmos DB.
        :param database_id: The database identifier for Cosmos DB instance.
        :param container_id: The container identifier.
        :param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
        consistency_level. Falls back to an empty dict when not supplied anywhere.
        :param container_throughput: The throughput set when creating the Container. Defaults to 400.
        :param key_suffix: The suffix to be added to every key. The keySuffix must contain only valid CosmosDb
        key characters. (e.g. not: '\\', '?', '/', '#', '*')
        :param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
        max key length of 255.
        :param kwargs: Fallback settings; ``filename`` names a UTF-8 JSON file to load them from instead.
        :return CosmosDbPartitionedConfig:
        """
        self.__config_file = kwargs.get("filename")
        if self.__config_file:
            # Use a context manager so the file handle is closed deterministically
            # instead of being left for the garbage collector.
            with open(self.__config_file, encoding="utf-8") as config_file:
                kwargs = json.load(config_file)

        self.cosmos_db_endpoint = cosmos_db_endpoint or kwargs.get("cosmos_db_endpoint")
        self.auth_key = auth_key or kwargs.get("auth_key")
        self.database_id = database_id or kwargs.get("database_id")
        self.container_id = container_id or kwargs.get("container_id")
        # Never store None here: CosmosDbPartitionedStorage.initialize calls .get() on it.
        self.cosmos_client_options = (
            cosmos_client_options or kwargs.get("cosmos_client_options") or {}
        )
        # NOTE(review): the default of 400 is truthy, so a container_throughput coming
        # from kwargs / the config file is only used when the caller passes a falsy
        # value. Kept as-is to preserve the constructor's defaults — confirm intent.
        self.container_throughput = container_throughput or kwargs.get(
            "container_throughput"
        )
        self.key_suffix = key_suffix or kwargs.get("key_suffix")
        self.compatibility_mode = compatibility_mode or kwargs.get("compatibility_mode")
class CosmosDbPartitionedStorage(Storage):
    """A CosmosDB based storage provider using partitioning for a bot.

    Each item is stored as a document of the form
    ``{"id": <escaped key>, "realId": <original key>, "document": <flattened item>}``.
    The client, database and container are created lazily by :meth:`initialize`.
    """

    def __init__(self, config: CosmosDbPartitionedConfig):
        """Create the storage object.

        :param config: Connection and container settings. A ``None`` key_suffix on
            ``config`` is normalized to ``""`` in place.
        :raises Exception: If a non-empty key_suffix is combined with
            compatibility_mode, or if the key_suffix is changed by
            ``CosmosDbKeyEscape.sanitize_key`` (i.e. contains invalid characters).
        """
        super().__init__()
        self.config = config
        self.client = None
        self.database = None
        self.container = None
        # True when an existing container has no partitionKey definition or is
        # partitioned on "/partitionKey"; __get_partition_key then returns None.
        self.compatability_mode_partition_key = False
        # Lock used for synchronizing database and container creation
        self.__lock = Lock()

        if config.key_suffix is None:
            config.key_suffix = ""

        if config.key_suffix != "":
            if config.compatibility_mode:
                raise Exception(
                    "compatibilityMode cannot be true while using a keySuffix."
                )
            suffix_escaped = CosmosDbKeyEscape.sanitize_key(config.key_suffix)
            if suffix_escaped != config.key_suffix:
                raise Exception(
                    f"Cannot use invalid Row Key characters: {config.key_suffix} in keySuffix."
                )

    async def read(self, keys: List[str]) -> Dict[str, object]:
        """Read storeitems from storage.

        :param keys: The keys of the items to read.
        :return dict: A mapping of original key ("realId") to restored item. Keys
            that are not found in the container are omitted.
        :raises Exception: If ``keys`` is empty or None.
        """
        if not keys:
            raise Exception("Keys are required when reading")

        await self.initialize()

        store_items = {}

        for key in keys:
            escaped_key = CosmosDbKeyEscape.sanitize_key(
                key, self.config.key_suffix, self.config.compatibility_mode
            )
            try:
                document_store_item = self.client.ReadItem(
                    self.__item_link(escaped_key), self.__get_partition_key(escaped_key)
                )
            # When an item is not found a CosmosException is thrown, but we want to
            # return an empty collection so in this instance we catch and do not rethrow.
            # Throw for any other exception.
            except cosmos_errors.HTTPFailure as err:
                if err.status_code == http_constants.StatusCodes.NOT_FOUND:
                    continue
                raise

            if document_store_item:
                store_items[document_store_item["realId"]] = self.__create_si(
                    document_store_item
                )

        return store_items

    async def write(self, changes: Dict[str, object]):
        """Save storeitems to storage.

        :param changes: A mapping of key to item (a dict or an object). An item's
            ``e_tag`` (dict entry or attribute) controls the concurrency check.
        :return: None
        :raises Exception: If ``changes`` is None, or an item has an empty-string e_tag.
        """
        if changes is None:
            raise Exception("Changes are required when writing")
        if not changes:
            return

        await self.initialize()

        for key, change in changes.items():
            e_tag = None
            if isinstance(change, dict):
                e_tag = change.get("e_tag", None)
            elif hasattr(change, "e_tag"):
                e_tag = change.e_tag

            doc = {
                "id": CosmosDbKeyEscape.sanitize_key(
                    key, self.config.key_suffix, self.config.compatibility_mode
                ),
                "realId": key,
                "document": self.__create_dict(change),
            }

            if e_tag == "":
                raise Exception("cosmosdb_storage.write(): etag missing")

            # Only a concrete e_tag is sent as an IfMatch access condition;
            # "*" or a missing e_tag sends no options at all.
            options = (
                {"accessCondition": {"type": "IfMatch", "condition": e_tag}}
                if e_tag and e_tag != "*"
                else None
            )

            self.client.UpsertItem(
                database_or_Container_link=self.__container_link,
                document=doc,
                options=options,
            )

    async def delete(self, keys: List[str]):
        """Remove storeitems from storage.

        :param keys: The keys of the items to delete. Keys that are not found
            are silently skipped.
        :return: None
        """
        await self.initialize()

        for key in keys:
            escaped_key = CosmosDbKeyEscape.sanitize_key(
                key, self.config.key_suffix, self.config.compatibility_mode
            )
            try:
                self.client.DeleteItem(
                    document_link=self.__item_link(escaped_key),
                    options=self.__get_partition_key(escaped_key),
                )
            except cosmos_errors.HTTPFailure as err:
                # Deleting an item that does not exist is not treated as an error.
                if err.status_code == http_constants.StatusCodes.NOT_FOUND:
                    continue
                raise

    async def initialize(self):
        """Lazily create the client, database and container on first use.

        Any HTTPFailure raised while creating the database falls back to reading
        the existing database; the container is handled by
        ``__get_or_create_container``.
        """
        if not self.container:
            if not self.client:
                self.client = cosmos_client.CosmosClient(
                    self.config.cosmos_db_endpoint,
                    {"masterKey": self.config.auth_key},
                    self.config.cosmos_client_options.get("connection_policy", None),
                    self.config.cosmos_client_options.get("consistency_level", None),
                )

            if not self.database:
                with self.__lock:
                    try:
                        # Re-check under the lock: another caller may have set it.
                        if not self.database:
                            self.database = self.client.CreateDatabase(
                                {"id": self.config.database_id}
                            )
                    except cosmos_errors.HTTPFailure:
                        self.database = self.client.ReadDatabase(self.__database_link)

            self.__get_or_create_container()

    def __get_or_create_container(self):
        """Create the container partitioned on "/id", or read it if it already exists.

        For an existing container, ``compatability_mode_partition_key`` is set when
        it has no partitionKey definition or is partitioned on "/partitionKey".

        :raises Exception: If the existing container uses any other partition key path.
        :raises cosmos_errors.HTTPFailure: For any create failure other than CONFLICT.
        """
        with self.__lock:
            container_def = {
                "id": self.config.container_id,
                "partitionKey": {
                    "paths": ["/id"],
                    "kind": documents.PartitionKind.Hash,
                },
            }
            try:
                if not self.container:
                    self.container = self.client.CreateContainer(
                        "dbs/" + self.database["id"],
                        container_def,
                        {"offerThroughput": self.config.container_throughput},
                    )
            except cosmos_errors.HTTPFailure as err:
                if err.status_code != http_constants.StatusCodes.CONFLICT:
                    raise

                # The container already exists: read it and inspect its partition key.
                self.container = self.client.ReadContainer(
                    "dbs/" + self.database["id"] + "/colls/" + container_def["id"]
                )
                if "partitionKey" not in self.container:
                    self.compatability_mode_partition_key = True
                else:
                    paths = self.container["partitionKey"]["paths"]
                    if "/partitionKey" in paths:
                        self.compatability_mode_partition_key = True
                    elif "/id" not in paths:
                        raise Exception(
                            f"Custom Partition Key Paths are not supported. {self.config.container_id} "
                            f"has a custom Partition Key Path of {paths[0]}."
                        )

    def __get_partition_key(self, key: str):
        """Return the request options carrying the partition key for ``key``.

        :param key: The escaped item key.
        :return: ``{"partitionKey": key}``, or None when
            ``compatability_mode_partition_key`` is set.
        """
        return None if self.compatability_mode_partition_key else {"partitionKey": key}

    @staticmethod
    def __create_si(result) -> object:
        """Create an object from a result out of CosmosDB.

        :param result: The document returned by CosmosDB.
        :return object: The stored item restored with jsonpickle; the document's
            ``_etag`` (when present) is injected as ``"e_tag"`` before restoring.
        """
        # get the document item from the result and turn into a dict
        doc = result.get("document")
        # read the e_tag from Cosmos
        if result.get("_etag"):
            doc["e_tag"] = result["_etag"]

        # NOTE(review): jsonpickle's Unpickler can instantiate arbitrary classes named
        # in the stored data; this is only safe if the container contents are trusted.
        return Unpickler().restore(doc)

    @staticmethod
    def __create_dict(store_item: object) -> Dict:
        """Return the dict of an object.

        The item is flattened with jsonpickle and any "e_tag" entry is removed,
        since the e_tag is managed by CosmosDB rather than stored in the document.

        :param store_item: The item to flatten.
        :return dict: The flattened item without "e_tag".
        """
        json_dict = Pickler().flatten(store_item)
        if "e_tag" in json_dict:
            del json_dict["e_tag"]
        return json_dict

    def __item_link(self, identifier) -> str:
        """Return the item link of a item in the container.

        :param identifier: The escaped item key.
        :return str: ``dbs/<database_id>/colls/<container_id>/docs/<identifier>``.
        """
        return self.__container_link + "/docs/" + identifier

    @property
    def __container_link(self) -> str:
        """Return the container link in the database.

        :return str: ``dbs/<database_id>/colls/<container_id>``.
        """
        return self.__database_link + "/colls/" + self.config.container_id

    @property
    def __database_link(self) -> str:
        """Return the database link.

        :return str: ``dbs/<database_id>``.
        """
        return "dbs/" + self.config.database_id