|
|
| from typing import Any, Dict, Optional, Tuple
|
|
|
|
|
| class EntrySelector:
|
| """
|
| Base class for entry selectors
|
| """
|
|
|
| @staticmethod
|
| def from_string(spec: str) -> "EntrySelector":
|
| if spec == "*":
|
| return AllEntrySelector()
|
| return FieldEntrySelector(spec)
|
|
|
|
|
| class AllEntrySelector(EntrySelector):
|
| """
|
| Selector that accepts all entries
|
| """
|
|
|
| SPECIFIER = "*"
|
|
|
| def __call__(self, entry):
|
| return True
|
|
|
|
|
| class FieldEntrySelector(EntrySelector):
|
| """
|
| Selector that accepts only entries that match provided field
|
| specifier(s). Only a limited set of specifiers is supported for now:
|
| <specifiers>::=<specifier>[<comma><specifiers>]
|
| <specifier>::=<field_name>[<type_delim><type>]<equal><value_or_range>
|
| <field_name> is a valid identifier
|
| <type> ::= "int" | "str"
|
| <equal> ::= "="
|
| <comma> ::= ","
|
| <type_delim> ::= ":"
|
| <value_or_range> ::= <value> | <range>
|
| <range> ::= <value><range_delim><value>
|
| <range_delim> ::= "-"
|
| <value> is a string without spaces and special symbols
|
| (e.g. <comma>, <equal>, <type_delim>, <range_delim>)
|
| """
|
|
|
| _SPEC_DELIM = ","
|
| _TYPE_DELIM = ":"
|
| _RANGE_DELIM = "-"
|
| _EQUAL = "="
|
| _ERROR_PREFIX = "Invalid field selector specifier"
|
|
|
| class _FieldEntryValuePredicate:
|
| """
|
| Predicate that checks strict equality for the specified entry field
|
| """
|
|
|
| def __init__(self, name: str, typespec: Optional[str], value: str):
|
| import builtins
|
|
|
| self.name = name
|
| self.type = getattr(builtins, typespec) if typespec is not None else str
|
| self.value = value
|
|
|
| def __call__(self, entry):
|
| return entry[self.name] == self.type(self.value)
|
|
|
| class _FieldEntryRangePredicate:
|
| """
|
| Predicate that checks whether an entry field falls into the specified range
|
| """
|
|
|
| def __init__(self, name: str, typespec: Optional[str], vmin: str, vmax: str):
|
| import builtins
|
|
|
| self.name = name
|
| self.type = getattr(builtins, typespec) if typespec is not None else str
|
| self.vmin = vmin
|
| self.vmax = vmax
|
|
|
| def __call__(self, entry):
|
| return (entry[self.name] >= self.type(self.vmin)) and (
|
| entry[self.name] <= self.type(self.vmax)
|
| )
|
|
|
| def __init__(self, spec: str):
|
| self._predicates = self._parse_specifier_into_predicates(spec)
|
|
|
| def __call__(self, entry: Dict[str, Any]):
|
| for predicate in self._predicates:
|
| if not predicate(entry):
|
| return False
|
| return True
|
|
|
| def _parse_specifier_into_predicates(self, spec: str):
|
| predicates = []
|
| specs = spec.split(self._SPEC_DELIM)
|
| for subspec in specs:
|
| eq_idx = subspec.find(self._EQUAL)
|
| if eq_idx > 0:
|
| field_name_with_type = subspec[:eq_idx]
|
| field_name, field_type = self._parse_field_name_type(field_name_with_type)
|
| field_value_or_range = subspec[eq_idx + 1 :]
|
| if self._is_range_spec(field_value_or_range):
|
| vmin, vmax = self._get_range_spec(field_value_or_range)
|
| predicate = FieldEntrySelector._FieldEntryRangePredicate(
|
| field_name, field_type, vmin, vmax
|
| )
|
| else:
|
| predicate = FieldEntrySelector._FieldEntryValuePredicate(
|
| field_name, field_type, field_value_or_range
|
| )
|
| predicates.append(predicate)
|
| elif eq_idx == 0:
|
| self._parse_error(f'"{subspec}", field name is empty!')
|
| else:
|
| self._parse_error(f'"{subspec}", should have format ' "<field>=<value_or_range>!")
|
| return predicates
|
|
|
| def _parse_field_name_type(self, field_name_with_type: str) -> Tuple[str, Optional[str]]:
|
| type_delim_idx = field_name_with_type.find(self._TYPE_DELIM)
|
| if type_delim_idx > 0:
|
| field_name = field_name_with_type[:type_delim_idx]
|
| field_type = field_name_with_type[type_delim_idx + 1 :]
|
| elif type_delim_idx == 0:
|
| self._parse_error(f'"{field_name_with_type}", field name is empty!')
|
| else:
|
| field_name = field_name_with_type
|
| field_type = None
|
|
|
|
|
| return field_name, field_type
|
|
|
| def _is_range_spec(self, field_value_or_range):
|
| delim_idx = field_value_or_range.find(self._RANGE_DELIM)
|
| return delim_idx > 0
|
|
|
| def _get_range_spec(self, field_value_or_range):
|
| if self._is_range_spec(field_value_or_range):
|
| delim_idx = field_value_or_range.find(self._RANGE_DELIM)
|
| vmin = field_value_or_range[:delim_idx]
|
| vmax = field_value_or_range[delim_idx + 1 :]
|
| return vmin, vmax
|
| else:
|
| self._parse_error('"field_value_or_range", range of values expected!')
|
|
|
| def _parse_error(self, msg):
|
| raise ValueError(f"{self._ERROR_PREFIX}: {msg}")
|
|
|