|
|
|
|
|
|
|
|
from typing import List, Dict, Callable, Union |
|
|
from enum import Enum |
|
|
import datetime |
|
|
|
|
|
class Aggregation(Enum):
    """Kinds of aggregation that a ``Stat`` is parameterized by.

    ``Stat.__init__`` accepts a list of these members, and ``Stat.get``
    returns a mapping keyed by them.
    """

    # Member values are elided with ``...`` (type-stub style); the concrete
    # values are not visible in this file.
    VALUE = ...
    MEAN = ...
    COUNT = ...
    SUM = ...
    MAX = ...
    MIN = ...
|
|
|
|
|
class Stat:
    """Named statistic configured with a list of ``Aggregation`` kinds.

    Only the interface is declared here; method bodies are elided.
    """

    # Name given to this statistic.
    name: str
    # NOTE(review): presumably the number of values currently held — confirm
    # against the implementation.
    count: int

    def __init__(
        self,
        name: str,
        aggregations: List[Aggregation],
        window_size: int,
        max_samples: int = -1,
    ) -> None:
        """Create a statistic.

        Args:
            name: Name of the statistic.
            aggregations: Aggregation kinds requested for this statistic.
            window_size: Size of the aggregation window.
                NOTE(review): unit is not visible here — confirm.
            max_samples: Defaults to ``-1``.
                NOTE(review): presumably ``-1`` means "no limit" — confirm.
        """
        ...

    def add(self, v: float) -> None:
        """Add the value ``v`` to this statistic."""
        ...

    def get(self) -> Dict[Aggregation, float]:
        """Return a mapping from ``Aggregation`` member to a float value."""
        ...
|
|
|
|
|
class Event:
    """Named, timestamped record carrying a flat dictionary of scalar data.

    Only the interface is declared here; the constructor body is elided.
    """

    # Name of the event.
    name: str
    # Timestamp attached to the event.
    # NOTE(review): whether this must be timezone-aware is not visible here.
    timestamp: datetime.datetime
    # Payload: string keys mapped to int, float, bool or str values.
    data: Dict[str, Union[int, float, bool, str]]

    def __init__(
        self,
        name: str,
        timestamp: datetime.datetime,
        data: Dict[str, Union[int, float, bool, str]],
    ) -> None:
        """Create an event.

        Args:
            name: Name of the event.
            timestamp: Timestamp to attach to the event.
            data: Payload of string keys mapped to int, float, bool or str
                values.
        """
        ...
|
|
|
|
|
def log_event(e: Event) -> None:
    """Log the event ``e``.

    NOTE(review): presumably the event is delivered to handlers registered
    via ``register_event_handler`` — confirm against the implementation.
    """
    ...
|
|
|
|
|
class EventHandlerHandle:
    """Handle returned by ``register_event_handler``.

    It is the argument expected by ``unregister_event_handler``; no members
    are declared here.
    """

    ...
|
|
|
|
|
def register_event_handler(handler: Callable[[Event], None]) -> EventHandlerHandle:
    """Register ``handler``, a callable that takes an ``Event`` and returns None.

    Returns:
        An ``EventHandlerHandle``, the type accepted by
        ``unregister_event_handler``.
    """
    ...
|
|
def unregister_event_handler(handle: EventHandlerHandle) -> None:
    """Unregister the event handler identified by ``handle``.

    Args:
        handle: An ``EventHandlerHandle``, the type returned by
            ``register_event_handler``.
    """
    ...
|
|
|