| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import copy |
| import logging |
| from collections import deque, namedtuple |
|
|
| from botocore.compat import accepts_kwargs |
| from botocore.utils import EVENT_ALIASES |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| _NodeList = namedtuple('NodeList', ['first', 'middle', 'last']) |
| _FIRST = 0 |
| _MIDDLE = 1 |
| _LAST = 2 |
|
|
|
|
| class NodeList(_NodeList): |
| def __copy__(self): |
| first_copy = copy.copy(self.first) |
| middle_copy = copy.copy(self.middle) |
| last_copy = copy.copy(self.last) |
| copied = NodeList(first_copy, middle_copy, last_copy) |
| return copied |
|
|
|
|
| def first_non_none_response(responses, default=None): |
| """Find first non None response in a list of tuples. |
| |
| This function can be used to find the first non None response from |
| handlers connected to an event. This is useful if you are interested |
| in the returned responses from event handlers. Example usage:: |
| |
| print(first_non_none_response([(func1, None), (func2, 'foo'), |
| (func3, 'bar')])) |
| # This will print 'foo' |
| |
| :type responses: list of tuples |
| :param responses: The responses from the ``EventHooks.emit`` method. |
| This is a list of tuples, and each tuple is |
| (handler, handler_response). |
| |
| :param default: If no non-None responses are found, then this default |
| value will be returned. |
| |
| :return: The first non-None response in the list of tuples. |
| |
| """ |
| for response in responses: |
| if response[1] is not None: |
| return response[1] |
| return default |
|
|
|
|
| class BaseEventHooks: |
| def emit(self, event_name, **kwargs): |
| """Call all handlers subscribed to an event. |
| |
| :type event_name: str |
| :param event_name: The name of the event to emit. |
| |
| :type **kwargs: dict |
| :param **kwargs: Arbitrary kwargs to pass through to the |
| subscribed handlers. The ``event_name`` will be injected |
| into the kwargs so it's not necessary to add this to **kwargs. |
| |
| :rtype: list of tuples |
| :return: A list of ``(handler_func, handler_func_return_value)`` |
| |
| """ |
| return [] |
|
|
| def register( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| """Register an event handler for a given event. |
| |
| If a ``unique_id`` is given, the handler will not be registered |
| if a handler with the ``unique_id`` has already been registered. |
| |
| Handlers are called in the order they have been registered. |
| Note handlers can also be registered with ``register_first()`` |
| and ``register_last()``. All handlers registered with |
| ``register_first()`` are called before handlers registered |
| with ``register()`` which are called before handlers registered |
| with ``register_last()``. |
| |
| """ |
| self._verify_and_register( |
| event_name, |
| handler, |
| unique_id, |
| register_method=self._register, |
| unique_id_uses_count=unique_id_uses_count, |
| ) |
|
|
| def register_first( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| """Register an event handler to be called first for an event. |
| |
| All event handlers registered with ``register_first()`` will |
| be called before handlers registered with ``register()`` and |
| ``register_last()``. |
| |
| """ |
| self._verify_and_register( |
| event_name, |
| handler, |
| unique_id, |
| register_method=self._register_first, |
| unique_id_uses_count=unique_id_uses_count, |
| ) |
|
|
| def register_last( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| """Register an event handler to be called last for an event. |
| |
| All event handlers registered with ``register_last()`` will be called |
| after handlers registered with ``register_first()`` and ``register()``. |
| |
| """ |
| self._verify_and_register( |
| event_name, |
| handler, |
| unique_id, |
| register_method=self._register_last, |
| unique_id_uses_count=unique_id_uses_count, |
| ) |
|
|
| def _verify_and_register( |
| self, |
| event_name, |
| handler, |
| unique_id, |
| register_method, |
| unique_id_uses_count, |
| ): |
| self._verify_is_callable(handler) |
| self._verify_accept_kwargs(handler) |
| register_method(event_name, handler, unique_id, unique_id_uses_count) |
|
|
| def unregister( |
| self, |
| event_name, |
| handler=None, |
| unique_id=None, |
| unique_id_uses_count=False, |
| ): |
| """Unregister an event handler for a given event. |
| |
| If no ``unique_id`` was given during registration, then the |
| first instance of the event handler is removed (if the event |
| handler has been registered multiple times). |
| |
| """ |
| pass |
|
|
| def _verify_is_callable(self, func): |
| if not callable(func): |
| raise ValueError(f"Event handler {func} must be callable.") |
|
|
| def _verify_accept_kwargs(self, func): |
| """Verifies a callable accepts kwargs |
| |
| :type func: callable |
| :param func: A callable object. |
| |
| :returns: True, if ``func`` accepts kwargs, otherwise False. |
| |
| """ |
| try: |
| if not accepts_kwargs(func): |
| raise ValueError( |
| f"Event handler {func} must accept keyword " |
| f"arguments (**kwargs)" |
| ) |
| except TypeError: |
| return False |
|
|
|
|
| class HierarchicalEmitter(BaseEventHooks): |
| def __init__(self): |
| |
| |
| |
| self._lookup_cache = {} |
| self._handlers = _PrefixTrie() |
| |
| |
| self._unique_id_handlers = {} |
|
|
| def _emit(self, event_name, kwargs, stop_on_response=False): |
| """ |
| Emit an event with optional keyword arguments. |
| |
| :type event_name: string |
| :param event_name: Name of the event |
| :type kwargs: dict |
| :param kwargs: Arguments to be passed to the handler functions. |
| :type stop_on_response: boolean |
| :param stop_on_response: Whether to stop on the first non-None |
| response. If False, then all handlers |
| will be called. This is especially useful |
| to handlers which mutate data and then |
| want to stop propagation of the event. |
| :rtype: list |
| :return: List of (handler, response) tuples from all processed |
| handlers. |
| """ |
| responses = [] |
| |
| |
| handlers_to_call = self._lookup_cache.get(event_name) |
| if handlers_to_call is None: |
| handlers_to_call = self._handlers.prefix_search(event_name) |
| self._lookup_cache[event_name] = handlers_to_call |
| elif not handlers_to_call: |
| |
| |
| |
| return [] |
| kwargs['event_name'] = event_name |
| responses = [] |
| for handler in handlers_to_call: |
| logger.debug('Event %s: calling handler %s', event_name, handler) |
| response = handler(**kwargs) |
| responses.append((handler, response)) |
| if stop_on_response and response is not None: |
| return responses |
| return responses |
|
|
| def emit(self, event_name, **kwargs): |
| """ |
| Emit an event by name with arguments passed as keyword args. |
| |
| >>> responses = emitter.emit( |
| ... 'my-event.service.operation', arg1='one', arg2='two') |
| |
| :rtype: list |
| :return: List of (handler, response) tuples from all processed |
| handlers. |
| """ |
| return self._emit(event_name, kwargs) |
|
|
| def emit_until_response(self, event_name, **kwargs): |
| """ |
| Emit an event by name with arguments passed as keyword args, |
| until the first non-``None`` response is received. This |
| method prevents subsequent handlers from being invoked. |
| |
| >>> handler, response = emitter.emit_until_response( |
| 'my-event.service.operation', arg1='one', arg2='two') |
| |
| :rtype: tuple |
| :return: The first (handler, response) tuple where the response |
| is not ``None``, otherwise (``None``, ``None``). |
| """ |
| responses = self._emit(event_name, kwargs, stop_on_response=True) |
| if responses: |
| return responses[-1] |
| else: |
| return (None, None) |
|
|
| def _register( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| self._register_section( |
| event_name, |
| handler, |
| unique_id, |
| unique_id_uses_count, |
| section=_MIDDLE, |
| ) |
|
|
| def _register_first( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| self._register_section( |
| event_name, |
| handler, |
| unique_id, |
| unique_id_uses_count, |
| section=_FIRST, |
| ) |
|
|
| def _register_last( |
| self, event_name, handler, unique_id, unique_id_uses_count=False |
| ): |
| self._register_section( |
| event_name, handler, unique_id, unique_id_uses_count, section=_LAST |
| ) |
|
|
| def _register_section( |
| self, event_name, handler, unique_id, unique_id_uses_count, section |
| ): |
| if unique_id is not None: |
| if unique_id in self._unique_id_handlers: |
| |
| |
| count = self._unique_id_handlers[unique_id].get('count', None) |
| if unique_id_uses_count: |
| if not count: |
| raise ValueError( |
| f"Initial registration of unique id {unique_id} was " |
| "specified to use a counter. Subsequent register " |
| "calls to unique id must specify use of a counter " |
| "as well." |
| ) |
| else: |
| self._unique_id_handlers[unique_id]['count'] += 1 |
| else: |
| if count: |
| raise ValueError( |
| f"Initial registration of unique id {unique_id} was " |
| "specified to not use a counter. Subsequent " |
| "register calls to unique id must specify not to " |
| "use a counter as well." |
| ) |
| return |
| else: |
| |
| |
| |
| self._handlers.append_item( |
| event_name, handler, section=section |
| ) |
| unique_id_handler_item = {'handler': handler} |
| if unique_id_uses_count: |
| unique_id_handler_item['count'] = 1 |
| self._unique_id_handlers[unique_id] = unique_id_handler_item |
| else: |
| self._handlers.append_item(event_name, handler, section=section) |
| |
| |
| self._lookup_cache = {} |
|
|
| def unregister( |
| self, |
| event_name, |
| handler=None, |
| unique_id=None, |
| unique_id_uses_count=False, |
| ): |
| if unique_id is not None: |
| try: |
| count = self._unique_id_handlers[unique_id].get('count', None) |
| except KeyError: |
| |
| |
| return |
| if unique_id_uses_count: |
| if count is None: |
| raise ValueError( |
| f"Initial registration of unique id {unique_id} was specified to " |
| "use a counter. Subsequent unregister calls to unique " |
| "id must specify use of a counter as well." |
| ) |
| elif count == 1: |
| handler = self._unique_id_handlers.pop(unique_id)[ |
| 'handler' |
| ] |
| else: |
| self._unique_id_handlers[unique_id]['count'] -= 1 |
| return |
| else: |
| if count: |
| raise ValueError( |
| f"Initial registration of unique id {unique_id} was specified " |
| "to not use a counter. Subsequent unregister calls " |
| "to unique id must specify not to use a counter as " |
| "well." |
| ) |
| handler = self._unique_id_handlers.pop(unique_id)['handler'] |
| try: |
| self._handlers.remove_item(event_name, handler) |
| self._lookup_cache = {} |
| except ValueError: |
| pass |
|
|
| def __copy__(self): |
| new_instance = self.__class__() |
| new_state = self.__dict__.copy() |
| new_state['_handlers'] = copy.copy(self._handlers) |
| new_state['_unique_id_handlers'] = copy.copy(self._unique_id_handlers) |
| new_instance.__dict__ = new_state |
| return new_instance |
|
|
|
|
| class EventAliaser(BaseEventHooks): |
| def __init__(self, event_emitter, event_aliases=None): |
| self._event_aliases = event_aliases |
| if event_aliases is None: |
| self._event_aliases = EVENT_ALIASES |
| self._alias_name_cache = {} |
| self._emitter = event_emitter |
|
|
| def emit(self, event_name, **kwargs): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.emit(aliased_event_name, **kwargs) |
|
|
| def emit_until_response(self, event_name, **kwargs): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.emit_until_response(aliased_event_name, **kwargs) |
|
|
| def register( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.register( |
| aliased_event_name, handler, unique_id, unique_id_uses_count |
| ) |
|
|
| def register_first( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.register_first( |
| aliased_event_name, handler, unique_id, unique_id_uses_count |
| ) |
|
|
| def register_last( |
| self, event_name, handler, unique_id=None, unique_id_uses_count=False |
| ): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.register_last( |
| aliased_event_name, handler, unique_id, unique_id_uses_count |
| ) |
|
|
| def unregister( |
| self, |
| event_name, |
| handler=None, |
| unique_id=None, |
| unique_id_uses_count=False, |
| ): |
| aliased_event_name = self._alias_event_name(event_name) |
| return self._emitter.unregister( |
| aliased_event_name, handler, unique_id, unique_id_uses_count |
| ) |
|
|
| def _alias_event_name(self, event_name): |
| if event_name in self._alias_name_cache: |
| return self._alias_name_cache[event_name] |
|
|
| for old_part, new_part in self._event_aliases.items(): |
| |
| |
| |
| |
| |
| event_parts = event_name.split('.') |
| if '.' not in old_part: |
| try: |
| |
| |
| event_parts[event_parts.index(old_part)] = new_part |
| except ValueError: |
| continue |
|
|
| |
| |
| elif old_part in event_name: |
| old_parts = old_part.split('.') |
| self._replace_subsection(event_parts, old_parts, new_part) |
| else: |
| continue |
|
|
| new_name = '.'.join(event_parts) |
| logger.debug( |
| "Changing event name from %s to %s", event_name, new_name |
| ) |
| self._alias_name_cache[event_name] = new_name |
| return new_name |
|
|
| self._alias_name_cache[event_name] = event_name |
| return event_name |
|
|
| def _replace_subsection(self, sections, old_parts, new_part): |
| for i in range(len(sections)): |
| if ( |
| sections[i] == old_parts[0] |
| and sections[i : i + len(old_parts)] == old_parts |
| ): |
| sections[i : i + len(old_parts)] = [new_part] |
| return |
|
|
| def __copy__(self): |
| return self.__class__( |
| copy.copy(self._emitter), copy.copy(self._event_aliases) |
| ) |
|
|
|
|
| class _PrefixTrie: |
| """Specialized prefix trie that handles wildcards. |
| |
| The prefixes in this case are based on dot separated |
| names so 'foo.bar.baz' is:: |
| |
| foo -> bar -> baz |
| |
| Wildcard support just means that having a key such as 'foo.bar.*.baz' will |
| be matched with a call to ``get_items(key='foo.bar.ANYTHING.baz')``. |
| |
| You can think of this prefix trie as the equivalent as defaultdict(list), |
| except that it can do prefix searches: |
| |
| foo.bar.baz -> A |
| foo.bar -> B |
| foo -> C |
| |
| Calling ``get_items('foo.bar.baz')`` will return [A + B + C], from |
| most specific to least specific. |
| |
| """ |
|
|
| def __init__(self): |
| |
| |
| |
| |
| |
| self._root = {'chunk': None, 'children': {}, 'values': None} |
|
|
| def append_item(self, key, value, section=_MIDDLE): |
| """Add an item to a key. |
| |
| If a value is already associated with that key, the new |
| value is appended to the list for the key. |
| """ |
| key_parts = key.split('.') |
| current = self._root |
| for part in key_parts: |
| if part not in current['children']: |
| new_child = {'chunk': part, 'values': None, 'children': {}} |
| current['children'][part] = new_child |
| current = new_child |
| else: |
| current = current['children'][part] |
| if current['values'] is None: |
| current['values'] = NodeList([], [], []) |
| current['values'][section].append(value) |
|
|
| def prefix_search(self, key): |
| """Collect all items that are prefixes of key. |
| |
| Prefix in this case are delineated by '.' characters so |
| 'foo.bar.baz' is a 3 chunk sequence of 3 "prefixes" ( |
| "foo", "bar", and "baz"). |
| |
| """ |
| collected = deque() |
| key_parts = key.split('.') |
| current = self._root |
| self._get_items(current, key_parts, collected, 0) |
| return collected |
|
|
| def _get_items(self, starting_node, key_parts, collected, starting_index): |
| stack = [(starting_node, starting_index)] |
| key_parts_len = len(key_parts) |
| |
| |
| |
| |
| while stack: |
| current_node, index = stack.pop() |
| if current_node['values']: |
| |
| |
| |
| |
| |
| |
| |
| node_list = current_node['values'] |
| complete_order = ( |
| node_list.first + node_list.middle + node_list.last |
| ) |
| collected.extendleft(reversed(complete_order)) |
| if not index == key_parts_len: |
| children = current_node['children'] |
| directs = children.get(key_parts[index]) |
| wildcard = children.get('*') |
| next_index = index + 1 |
| if wildcard is not None: |
| stack.append((wildcard, next_index)) |
| if directs is not None: |
| stack.append((directs, next_index)) |
|
|
| def remove_item(self, key, value): |
| """Remove an item associated with a key. |
| |
| If the value is not associated with the key a ``ValueError`` |
| will be raised. If the key does not exist in the trie, a |
| ``ValueError`` will be raised. |
| |
| """ |
| key_parts = key.split('.') |
| current = self._root |
| self._remove_item(current, key_parts, value, index=0) |
|
|
| def _remove_item(self, current_node, key_parts, value, index): |
| if current_node is None: |
| return |
| elif index < len(key_parts): |
| next_node = current_node['children'].get(key_parts[index]) |
| if next_node is not None: |
| self._remove_item(next_node, key_parts, value, index + 1) |
| if index == len(key_parts) - 1: |
| node_list = next_node['values'] |
| if value in node_list.first: |
| node_list.first.remove(value) |
| elif value in node_list.middle: |
| node_list.middle.remove(value) |
| elif value in node_list.last: |
| node_list.last.remove(value) |
| if not next_node['children'] and not next_node['values']: |
| |
| |
| |
| |
| del current_node['children'][key_parts[index]] |
| else: |
| raise ValueError(f"key is not in trie: {'.'.join(key_parts)}") |
|
|
| def __copy__(self): |
| |
| |
| |
| |
| new_copy = self.__class__() |
| copied_attrs = self._recursive_copy(self.__dict__) |
| new_copy.__dict__ = copied_attrs |
| return new_copy |
|
|
| def _recursive_copy(self, node): |
| |
| |
| |
| copied_node = {} |
| for key, value in node.items(): |
| if isinstance(value, NodeList): |
| copied_node[key] = copy.copy(value) |
| elif isinstance(value, dict): |
| copied_node[key] = self._recursive_copy(value) |
| else: |
| copied_node[key] = value |
| return copied_node |
|
|