| | import math |
| | import json |
| |
|
| | from jmespath import exceptions |
| | from jmespath.compat import string_type as STRING_TYPE |
| | from jmespath.compat import get_methods |
| |
|
| |
|
| | |
| | TYPES_MAP = { |
| | 'bool': 'boolean', |
| | 'list': 'array', |
| | 'dict': 'object', |
| | 'NoneType': 'null', |
| | 'unicode': 'string', |
| | 'str': 'string', |
| | 'float': 'number', |
| | 'int': 'number', |
| | 'long': 'number', |
| | 'OrderedDict': 'object', |
| | '_Projection': 'array', |
| | '_Expression': 'expref', |
| | } |
| |
|
| |
|
| | |
| | REVERSE_TYPES_MAP = { |
| | 'boolean': ('bool',), |
| | 'array': ('list', '_Projection'), |
| | 'object': ('dict', 'OrderedDict',), |
| | 'null': ('NoneType',), |
| | 'string': ('unicode', 'str'), |
| | 'number': ('float', 'int', 'long'), |
| | 'expref': ('_Expression',), |
| | } |
| |
|
| |
|
| | def signature(*arguments): |
| | def _record_signature(func): |
| | func.signature = arguments |
| | return func |
| | return _record_signature |
| |
|
| |
|
| | class FunctionRegistry(type): |
| | def __init__(cls, name, bases, attrs): |
| | cls._populate_function_table() |
| | super(FunctionRegistry, cls).__init__(name, bases, attrs) |
| |
|
| | def _populate_function_table(cls): |
| | function_table = {} |
| | |
| | |
| | |
| | for name, method in get_methods(cls): |
| | if not name.startswith('_func_'): |
| | continue |
| | signature = getattr(method, 'signature', None) |
| | if signature is not None: |
| | function_table[name[6:]] = { |
| | 'function': method, |
| | 'signature': signature, |
| | } |
| | cls.FUNCTION_TABLE = function_table |
| |
|
| |
|
| | class Functions(metaclass=FunctionRegistry): |
| |
|
| | FUNCTION_TABLE = { |
| | } |
| |
|
| | def call_function(self, function_name, resolved_args): |
| | try: |
| | spec = self.FUNCTION_TABLE[function_name] |
| | except KeyError: |
| | raise exceptions.UnknownFunctionError( |
| | "Unknown function: %s()" % function_name) |
| | function = spec['function'] |
| | signature = spec['signature'] |
| | self._validate_arguments(resolved_args, signature, function_name) |
| | return function(self, *resolved_args) |
| |
|
| | def _validate_arguments(self, args, signature, function_name): |
| | if signature and signature[-1].get('variadic'): |
| | if len(args) < len(signature): |
| | raise exceptions.VariadictArityError( |
| | len(signature), len(args), function_name) |
| | elif len(args) != len(signature): |
| | raise exceptions.ArityError( |
| | len(signature), len(args), function_name) |
| | return self._type_check(args, signature, function_name) |
| |
|
| | def _type_check(self, actual, signature, function_name): |
| | for i in range(len(signature)): |
| | allowed_types = signature[i]['types'] |
| | if allowed_types: |
| | self._type_check_single(actual[i], allowed_types, |
| | function_name) |
| |
|
| | def _type_check_single(self, current, types, function_name): |
| | |
| | |
| | |
| | allowed_types, allowed_subtypes = self._get_allowed_pytypes(types) |
| | |
| | |
| | |
| | |
| | actual_typename = type(current).__name__ |
| | if actual_typename not in allowed_types: |
| | raise exceptions.JMESPathTypeError( |
| | function_name, current, |
| | self._convert_to_jmespath_type(actual_typename), types) |
| | |
| | |
| | |
| | |
| | |
| | if allowed_subtypes: |
| | self._subtype_check(current, allowed_subtypes, |
| | types, function_name) |
| |
|
| | def _get_allowed_pytypes(self, types): |
| | allowed_types = [] |
| | allowed_subtypes = [] |
| | for t in types: |
| | type_ = t.split('-', 1) |
| | if len(type_) == 2: |
| | type_, subtype = type_ |
| | allowed_subtypes.append(REVERSE_TYPES_MAP[subtype]) |
| | else: |
| | type_ = type_[0] |
| | allowed_types.extend(REVERSE_TYPES_MAP[type_]) |
| | return allowed_types, allowed_subtypes |
| |
|
| | def _subtype_check(self, current, allowed_subtypes, types, function_name): |
| | if len(allowed_subtypes) == 1: |
| | |
| | |
| | allowed_subtypes = allowed_subtypes[0] |
| | for element in current: |
| | actual_typename = type(element).__name__ |
| | if actual_typename not in allowed_subtypes: |
| | raise exceptions.JMESPathTypeError( |
| | function_name, element, actual_typename, types) |
| | elif len(allowed_subtypes) > 1 and current: |
| | |
| | |
| | |
| | first = type(current[0]).__name__ |
| | for subtypes in allowed_subtypes: |
| | if first in subtypes: |
| | allowed = subtypes |
| | break |
| | else: |
| | raise exceptions.JMESPathTypeError( |
| | function_name, current[0], first, types) |
| | for element in current: |
| | actual_typename = type(element).__name__ |
| | if actual_typename not in allowed: |
| | raise exceptions.JMESPathTypeError( |
| | function_name, element, actual_typename, types) |
| |
|
| | @signature({'types': ['number']}) |
| | def _func_abs(self, arg): |
| | return abs(arg) |
| |
|
| | @signature({'types': ['array-number']}) |
| | def _func_avg(self, arg): |
| | if arg: |
| | return sum(arg) / float(len(arg)) |
| | else: |
| | return None |
| |
|
| | @signature({'types': [], 'variadic': True}) |
| | def _func_not_null(self, *arguments): |
| | for argument in arguments: |
| | if argument is not None: |
| | return argument |
| |
|
| | @signature({'types': []}) |
| | def _func_to_array(self, arg): |
| | if isinstance(arg, list): |
| | return arg |
| | else: |
| | return [arg] |
| |
|
| | @signature({'types': []}) |
| | def _func_to_string(self, arg): |
| | if isinstance(arg, STRING_TYPE): |
| | return arg |
| | else: |
| | return json.dumps(arg, separators=(',', ':'), |
| | default=str) |
| |
|
| | @signature({'types': []}) |
| | def _func_to_number(self, arg): |
| | if isinstance(arg, (list, dict, bool)): |
| | return None |
| | elif arg is None: |
| | return None |
| | elif isinstance(arg, (int, float)): |
| | return arg |
| | else: |
| | try: |
| | return int(arg) |
| | except ValueError: |
| | try: |
| | return float(arg) |
| | except ValueError: |
| | return None |
| |
|
| | @signature({'types': ['array', 'string']}, {'types': []}) |
| | def _func_contains(self, subject, search): |
| | return search in subject |
| |
|
| | @signature({'types': ['string', 'array', 'object']}) |
| | def _func_length(self, arg): |
| | return len(arg) |
| |
|
| | @signature({'types': ['string']}, {'types': ['string']}) |
| | def _func_ends_with(self, search, suffix): |
| | return search.endswith(suffix) |
| |
|
| | @signature({'types': ['string']}, {'types': ['string']}) |
| | def _func_starts_with(self, search, suffix): |
| | return search.startswith(suffix) |
| |
|
| | @signature({'types': ['array', 'string']}) |
| | def _func_reverse(self, arg): |
| | if isinstance(arg, STRING_TYPE): |
| | return arg[::-1] |
| | else: |
| | return list(reversed(arg)) |
| |
|
| | @signature({"types": ['number']}) |
| | def _func_ceil(self, arg): |
| | return math.ceil(arg) |
| |
|
| | @signature({"types": ['number']}) |
| | def _func_floor(self, arg): |
| | return math.floor(arg) |
| |
|
| | @signature({"types": ['string']}, {"types": ['array-string']}) |
| | def _func_join(self, separator, array): |
| | return separator.join(array) |
| |
|
| | @signature({'types': ['expref']}, {'types': ['array']}) |
| | def _func_map(self, expref, arg): |
| | result = [] |
| | for element in arg: |
| | result.append(expref.visit(expref.expression, element)) |
| | return result |
| |
|
| | @signature({"types": ['array-number', 'array-string']}) |
| | def _func_max(self, arg): |
| | if arg: |
| | return max(arg) |
| | else: |
| | return None |
| |
|
| | @signature({"types": ["object"], "variadic": True}) |
| | def _func_merge(self, *arguments): |
| | merged = {} |
| | for arg in arguments: |
| | merged.update(arg) |
| | return merged |
| |
|
| | @signature({"types": ['array-number', 'array-string']}) |
| | def _func_min(self, arg): |
| | if arg: |
| | return min(arg) |
| | else: |
| | return None |
| |
|
| | @signature({"types": ['array-string', 'array-number']}) |
| | def _func_sort(self, arg): |
| | return list(sorted(arg)) |
| |
|
| | @signature({"types": ['array-number']}) |
| | def _func_sum(self, arg): |
| | return sum(arg) |
| |
|
| | @signature({"types": ['object']}) |
| | def _func_keys(self, arg): |
| | |
| | |
| | return list(arg.keys()) |
| |
|
| | @signature({"types": ['object']}) |
| | def _func_values(self, arg): |
| | return list(arg.values()) |
| |
|
| | @signature({'types': []}) |
| | def _func_type(self, arg): |
| | if isinstance(arg, STRING_TYPE): |
| | return "string" |
| | elif isinstance(arg, bool): |
| | return "boolean" |
| | elif isinstance(arg, list): |
| | return "array" |
| | elif isinstance(arg, dict): |
| | return "object" |
| | elif isinstance(arg, (float, int)): |
| | return "number" |
| | elif arg is None: |
| | return "null" |
| |
|
| | @signature({'types': ['array']}, {'types': ['expref']}) |
| | def _func_sort_by(self, array, expref): |
| | if not array: |
| | return array |
| | |
| | |
| | |
| | |
| | |
| | |
| | required_type = self._convert_to_jmespath_type( |
| | type(expref.visit(expref.expression, array[0])).__name__) |
| | if required_type not in ['number', 'string']: |
| | raise exceptions.JMESPathTypeError( |
| | 'sort_by', array[0], required_type, ['string', 'number']) |
| | keyfunc = self._create_key_func(expref, |
| | [required_type], |
| | 'sort_by') |
| | return list(sorted(array, key=keyfunc)) |
| |
|
| | @signature({'types': ['array']}, {'types': ['expref']}) |
| | def _func_min_by(self, array, expref): |
| | keyfunc = self._create_key_func(expref, |
| | ['number', 'string'], |
| | 'min_by') |
| | if array: |
| | return min(array, key=keyfunc) |
| | else: |
| | return None |
| |
|
| | @signature({'types': ['array']}, {'types': ['expref']}) |
| | def _func_max_by(self, array, expref): |
| | keyfunc = self._create_key_func(expref, |
| | ['number', 'string'], |
| | 'max_by') |
| | if array: |
| | return max(array, key=keyfunc) |
| | else: |
| | return None |
| |
|
| | def _create_key_func(self, expref, allowed_types, function_name): |
| | def keyfunc(x): |
| | result = expref.visit(expref.expression, x) |
| | actual_typename = type(result).__name__ |
| | jmespath_type = self._convert_to_jmespath_type(actual_typename) |
| | |
| | if jmespath_type not in allowed_types: |
| | raise exceptions.JMESPathTypeError( |
| | function_name, result, jmespath_type, allowed_types) |
| | return result |
| | return keyfunc |
| |
|
| | def _convert_to_jmespath_type(self, pyobject): |
| | return TYPES_MAP.get(pyobject, 'unknown') |
| |
|