Spaces:
Sleeping
Sleeping
| import datetime | |
| import logging | |
| import re | |
| import omagent_core.engine.http.models as http_models | |
| import six | |
| from omagent_core.engine.configuration.configuration import Configuration | |
| from omagent_core.engine.http import rest | |
| from requests.structures import CaseInsensitiveDict | |
| logger = logging.getLogger(Configuration.get_logging_formatted_name(__name__)) | |
| class ObjectMapper(object): | |
| PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types | |
| NATIVE_TYPES_MAPPING = { | |
| "int": int, | |
| "long": int if six.PY3 else long, # noqa: F821 | |
| "float": float, | |
| "str": str, | |
| "bool": bool, | |
| "date": datetime.date, | |
| "datetime": datetime.datetime, | |
| "object": object, | |
| } | |
| def to_json(self, obj): | |
| if obj is None: | |
| return None | |
| elif isinstance(obj, self.PRIMITIVE_TYPES): | |
| return obj | |
| elif isinstance(obj, list): | |
| return [self.to_json(sub_obj) for sub_obj in obj] | |
| elif isinstance(obj, tuple): | |
| return tuple(self.to_json(sub_obj) for sub_obj in obj) | |
| elif isinstance(obj, (datetime.datetime, datetime.date)): | |
| return obj.isoformat() | |
| if isinstance(obj, dict) or isinstance(obj, CaseInsensitiveDict): | |
| obj_dict = obj | |
| else: | |
| # Convert model obj to dict except | |
| # attributes `swagger_types`, `attribute_map` | |
| # and attributes which value is not None. | |
| # Convert attribute name to json key in | |
| # model definition for request. | |
| if hasattr(obj, "attribute_map") and hasattr(obj, "swagger_types"): | |
| obj_dict = { | |
| obj.attribute_map[attr]: getattr(obj, attr) | |
| for attr, _ in six.iteritems(obj.swagger_types) | |
| if getattr(obj, attr) is not None | |
| } | |
| else: | |
| obj_dict = { | |
| name: getattr(obj, name) | |
| for name in vars(obj) | |
| if getattr(obj, name) is not None | |
| } | |
| return {key: self.to_json(val) for key, val in six.iteritems(obj_dict)} | |
| def from_json(self, data, klass): | |
| return self.__deserialize(data, klass) | |
| def __deserialize(self, data, klass): | |
| if data is None: | |
| return None | |
| if type(klass) == str: | |
| if klass.startswith("list["): | |
| sub_kls = re.match(r"list\[(.*)\]", klass).group(1) | |
| return [self.__deserialize(sub_data, sub_kls) for sub_data in data] | |
| if klass.startswith("dict("): | |
| sub_kls = re.match(r"dict\(([^,]*), (.*)\)", klass).group(2) | |
| return { | |
| k: self.__deserialize(v, sub_kls) for k, v in six.iteritems(data) | |
| } | |
| # convert str to class | |
| if klass in self.NATIVE_TYPES_MAPPING: | |
| klass = self.NATIVE_TYPES_MAPPING[klass] | |
| else: | |
| klass = getattr(http_models, klass) | |
| if klass in self.PRIMITIVE_TYPES: | |
| return self.__deserialize_primitive(data, klass) | |
| elif klass == object: | |
| return self.__deserialize_object(data) | |
| elif klass == datetime.date: | |
| return self.__deserialize_date(data) | |
| elif klass == datetime.datetime: | |
| return self.__deserialize_datatime(data) | |
| else: | |
| return self.__deserialize_model(data, klass) | |
| def __deserialize_primitive(self, data, klass): | |
| """Deserializes string to primitive type. | |
| :param data: str. | |
| :param klass: class literal. | |
| :return: int, long, float, str, bool. | |
| """ | |
| try: | |
| if klass == str and type(data) == bytes: | |
| return self.__deserialize_bytes_to_str(data) | |
| return klass(data) | |
| except UnicodeEncodeError: | |
| return six.text_type(data) | |
| except TypeError: | |
| return data | |
| def __deserialize_bytes_to_str(self, data): | |
| return data.decode("utf-8") | |
| def __deserialize_object(self, value): | |
| """Return a original value. | |
| :return: object. | |
| """ | |
| return value | |
| def __deserialize_date(self, string): | |
| """Deserializes string to date. | |
| :param string: str. | |
| :return: date. | |
| """ | |
| try: | |
| from dateutil.parser import parse | |
| return parse(string).date() | |
| except ImportError: | |
| return string | |
| except ValueError: | |
| raise rest.ApiException( | |
| status=0, reason="Failed to parse `{0}` as date object".format(string) | |
| ) | |
| def __deserialize_datatime(self, string): | |
| """Deserializes string to datetime. | |
| The string should be in iso8601 datetime format. | |
| :param string: str. | |
| :return: datetime. | |
| """ | |
| try: | |
| from dateutil.parser import parse | |
| return parse(string) | |
| except ImportError: | |
| return string | |
| except ValueError: | |
| raise rest.ApiException( | |
| status=0, | |
| reason=("Failed to parse `{0}` as datetime object".format(string)), | |
| ) | |
| def __hasattr(self, object, name): | |
| return name in object.__class__.__dict__ | |
| def __deserialize_model(self, data, klass): | |
| if not klass.swagger_types and not self.__hasattr( | |
| klass, "get_real_child_model" | |
| ): | |
| return data | |
| kwargs = {} | |
| if klass.swagger_types is not None: | |
| for attr, attr_type in six.iteritems(klass.swagger_types): | |
| if ( | |
| data is not None | |
| and klass.attribute_map[attr] in data | |
| and isinstance(data, (list, dict)) | |
| ): | |
| value = data[klass.attribute_map[attr]] | |
| kwargs[attr] = self.__deserialize(value, attr_type) | |
| instance = klass(**kwargs) | |
| if ( | |
| isinstance(instance, dict) | |
| and klass.swagger_types is not None | |
| and isinstance(data, dict) | |
| ): | |
| for key, value in data.items(): | |
| if key not in klass.swagger_types: | |
| instance[key] = value | |
| if self.__hasattr(instance, "get_real_child_model"): | |
| klass_name = instance.get_real_child_model(data) | |
| if klass_name: | |
| instance = self.__deserialize(data, klass_name) | |
| return instance | |