Spaces:
Running
Running
| import pickle | |
| from abc import abstractmethod, ABCMeta | |
| from collections import deque | |
| from threading import Lock | |
| from typing import TypeVar, Iterable, List, Tuple, Union | |
| from .time_ctl import BaseTime | |
_Tp = TypeVar('_Tp')


class RangedData(metaclass=ABCMeta):
    """
    Overview:
        A container that keeps timestamped data for a sliding window of ``expire`` time units.
        Entries whose timestamp falls behind ``now - expire`` are removed from the live queue; the most
        recently removed entry is retained as the "last item", so the value that was in effect at the
        start of the window is still available. The current time is supplied by subclasses through
        ``_get_time``. Public operations are serialized with an internal lock.
    Interfaces:
        ``__init__``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``, ``_get_time``.
    """

    def __init__(self, expire: float, use_pickle: bool = False):
        """
        Overview:
            Initialize the RangedData object.
        Arguments:
            - expire (:obj:`float`): Length of the time window; must be an int or float greater than 0.
            - use_pickle (:obj:`bool`): If True, items are stored as ``pickle.dumps`` bytes and \
                deserialized on every read, so readers receive independent copies.
        Raises:
            - ValueError: If ``expire`` is not greater than 0.
            - TypeError: If ``expire`` is neither int nor float.
        """
        self.__expire = expire
        self.__use_pickle = use_pickle
        self.__check_expire()

        # Payload storage: the queue only carries (time, id) pairs, payloads live in this id -> data map.
        self.__data_max_id = 0
        self.__data_items = {}
        self.__data_lock = Lock()

        # ``__last_item`` is the (time, id) pair most recently pushed out of the window (or None).
        self.__last_item = None
        self.__queue = deque()
        self.__lock = Lock()

    def __check_expire(self):
        """
        Overview:
            Validate the configured expire time.
        Raises:
            - ValueError: If the expire time is not greater than 0.
            - TypeError: If the expire time is neither int nor float.
        """
        if not isinstance(self.__expire, (int, float)):
            raise TypeError(
                'Expire should be int or float, but {actual} found.'.format(actual=type(self.__expire).__name__)
            )
        if self.__expire <= 0:
            raise ValueError(
                "Expire should be greater than 0, but {actual} found.".format(actual=repr(self.__expire))
            )

    def __registry_data_item(self, data: _Tp) -> int:
        """
        Overview:
            Store a payload and allocate a new id for it.
        Arguments:
            - data (:obj:`_Tp`): The data item.
        Returns:
            - data_id (:obj:`int`): The id under which the payload was stored.
        """
        with self.__data_lock:
            self.__data_max_id += 1
            stored = pickle.dumps(data) if self.__use_pickle else data
            self.__data_items[self.__data_max_id] = stored
            return self.__data_max_id

    def __get_data_item(self, data_id: int) -> _Tp:
        """
        Overview:
            Fetch the payload stored under ``data_id``.
        Arguments:
            - data_id (:obj:`int`): The data id.
        Returns:
            - data (:obj:`_Tp`): The stored object, or a freshly unpickled copy when pickling is enabled.
        """
        with self.__data_lock:
            stored = self.__data_items[data_id]
            # Only bytes produced by ``__registry_data_item`` are ever unpickled here.
            return pickle.loads(stored) if self.__use_pickle else stored

    def __remove_data_item(self, data_id: int):
        """
        Overview:
            Drop the payload stored under ``data_id``.
        Arguments:
            - data_id (:obj:`int`): The data id.
        """
        with self.__data_lock:
            del self.__data_items[data_id]

    def __check_time(self, time_: float):
        """
        Overview:
            Ensure that ``time_`` is not earlier than the newest entry in the live queue.
        Arguments:
            - time_ (:obj:`float`): The time of the entry about to be appended.
        Raises:
            - ValueError: If ``time_`` is smaller than the time of the newest queued entry.
        """
        if self.__queue:
            newest_time, _ = self.__queue[-1]
            if time_ < newest_time:
                raise ValueError(
                    "Time {time} invalid for descending from last time {last_time}".format(
                        time=repr(time_), last_time=repr(newest_time)
                    )
                )

    def __append_item(self, time_: float, data: _Tp):
        """
        Overview:
            Register the payload and push its ``(time, id)`` pair to the tail of the queue.
        Arguments:
            - time_ (:obj:`float`): The time.
            - data (:obj:`_Tp`): The data item.
        """
        self.__queue.append((time_, self.__registry_data_item(data)))

    def __flush_history(self, now: float):
        """
        Overview:
            Move every queued entry older than ``now - expire`` out of the live queue. Only the most
            recently expired entry is retained (as the last item); the payload of the previously
            retained one is released.
        Arguments:
            - now (:obj:`float`): The clock reading that defines the window for this operation.
        """
        limit_time = now - self.__expire
        while self.__queue:
            head_time, _ = self.__queue[0]  # peek; the deque is only modified for expired entries
            if head_time >= limit_time:
                break
            expired = self.__queue.popleft()
            if self.__last_item is not None:
                self.__remove_data_item(self.__last_item[1])
            self.__last_item = expired

    def __append(self, time_: float, data: _Tp):
        """
        Overview:
            Validate the time and append the data.
        Arguments:
            - time_ (:obj:`float`): The time.
            - data (:obj:`_Tp`): The data item.
        Raises:
            - ValueError: If ``time_`` is earlier than the newest queued entry.
        """
        self.__check_time(time_)
        self.__append_item(time_, data)

    def __current(self) -> _Tp:
        """
        Overview:
            Get the newest data: the tail of the queue, or the last expired item if the queue is empty.
        Returns:
            - data (:obj:`_Tp`): The newest data.
        Raises:
            - ValueError: If nothing has ever been stored.
        """
        if self.__queue:
            _, newest_id = self.__queue[-1]
            return self.__get_data_item(newest_id)
        if self.__last_item is not None:
            _, last_id = self.__last_item
            return self.__get_data_item(last_id)
        raise ValueError("This range is empty.")

    def __history(self, now: float) -> List[Tuple[Union[int, float], _Tp]]:
        """
        Overview:
            Build the history of the window ending at ``now``.
        Arguments:
            - now (:obj:`float`): The clock reading that defines the window; must be the same reading \
                that was used to flush, so that no reported time is older than ``now - expire``.
        Returns:
            - history (:obj:`list`): ``(time, data)`` pairs in ascending time order.
        """
        limit_time = now - self.__expire
        records = []
        newest_time, newest_id = None, None

        if self.__last_item is not None:
            newest_time, newest_id = self.__last_item
            # The expired item is reported at the window start, not at its original (older) time.
            records.append((max(newest_time, limit_time), self.__get_data_item(newest_id)))

        for item_time, item_id in self.__queue:
            newest_time, newest_id = item_time, item_id
            records.append((item_time, self.__get_data_item(item_id)))

        # Close the series at ``now`` by repeating the newest value, unless it was recorded exactly at ``now``.
        if newest_time is not None and newest_time < now:
            records.append((now, self.__get_data_item(newest_id)))

        return records

    def append(self, data: _Tp):
        """
        Overview:
            Append one data item, timestamped with the current time.
        Arguments:
            - data (:obj:`_Tp`): The data item.
        Returns:
            - self: This object, to allow chained calls.
        Raises:
            - ValueError: If the current time is earlier than the newest queued entry.
        """
        with self.__lock:
            # One clock reading per operation keeps the flush cutoff and the timestamp consistent.
            now = self._get_time()
            self.__flush_history(now)
            self.__append(now, data)
            return self

    def extend(self, iter_: Iterable[_Tp]):
        """
        Overview:
            Append every item of ``iter_``; all of them receive the same timestamp.
        Arguments:
            - iter_ (:obj:`Iterable[_Tp]`): The data items.
        Returns:
            - self: This object, to allow chained calls.
        Raises:
            - ValueError: If the current time is earlier than the newest queued entry.
        """
        with self.__lock:
            now = self._get_time()
            self.__flush_history(now)
            for item in iter_:
                self.__append(now, item)
            return self

    def current(self) -> _Tp:
        """
        Overview:
            Get the newest data.
        Returns:
            - data (:obj:`_Tp`): The newest data.
        Raises:
            - ValueError: If nothing has ever been stored.
        """
        with self.__lock:
            self.__flush_history(self._get_time())
            return self.__current()

    def history(self) -> List[Tuple[Union[int, float], _Tp]]:
        """
        Overview:
            Get the history of the current window.
        Returns:
            - history (:obj:`List[Tuple[Union[int, float], _Tp]]`): ``(time, data)`` pairs in ascending \
                time order; empty if nothing has ever been stored.
        """
        with self.__lock:
            now = self._get_time()
            self.__flush_history(now)
            return self.__history(now)

    def expire(self) -> float:
        """
        Overview:
            Get the expire time. Expired entries are flushed as a side effect.
        Returns:
            - expire (:obj:`float`): The expire time passed to ``__init__``.
        """
        with self.__lock:
            self.__flush_history(self._get_time())
            return self.__expire

    def __bool__(self):
        """
        Overview:
            Check whether any data is available, either in the live queue or as the last expired item.
        Returns:
            - non_empty (:obj:`bool`): False only if nothing has ever been stored.
        """
        with self.__lock:
            self.__flush_history(self._get_time())
            return bool(self.__queue or self.__last_item)

    @abstractmethod
    def _get_time(self) -> float:
        """
        Overview:
            Get the current time. Subclasses must implement this; being abstract, a subclass that
            does not override it cannot be instantiated.
        Returns:
            - time (:obj:`float`): The current time, in the same unit as ``expire``.
        """
        raise NotImplementedError
class TimeRangedData(RangedData):
    """
    Overview:
        A ``RangedData`` whose current time is read from a ``BaseTime`` object.
    Interfaces:
        ``__init__``, ``_get_time``, ``time``, ``append``, ``extend``, ``current``, ``history``, ``expire``, \
        ``__bool__``.
    """

    def __init__(self, time_: BaseTime, expire: float, use_pickle: bool = False):
        """
        Overview:
            Initialize the TimeRangedData object.
        Arguments:
            - time_ (:obj:`BaseTime`): The time source; its ``time()`` method is called to get the current time.
            - expire (:obj:`float`): The expire time, forwarded to ``RangedData.__init__``.
            - use_pickle (:obj:`bool`): Whether to store items pickled, forwarded to \
                ``RangedData.__init__``. Defaults to False, which matches the previous behavior.
        """
        RangedData.__init__(self, expire, use_pickle)
        self.__time = time_

    def _get_time(self) -> float:
        """
        Overview:
            Get the current time from the time source.
        Returns:
            - time (:obj:`float`): The value returned by ``time()`` of the object given at construction.
        """
        return self.__time.time()

    def time(self) -> BaseTime:
        """
        Overview:
            Get the time source.
        Returns:
            - time (:obj:`BaseTime`): The object passed as ``time_`` at construction.
        """
        return self.__time