| from typing import Dict, Any, AsyncGenerator |
| import time |
| import logging |
|
|
| from .error import StartActiveError, CloseInactiveError, UsedInactiveError |
|
|
class Operation:
    """Base class for a streaming operation identified by a type and an id.

    An instance must be started with ``start()`` before it is used and can be
    shut down with ``close()``. Calling the instance with an input chunk
    yields output chunks produced by ``_generate``, which is fed the keyword
    arguments returned by ``_parse_chunk``.

    Subclasses are expected to override ``configure``, ``get_configuration``,
    ``_parse_chunk`` and ``_generate``; the base versions raise
    ``NotImplementedError``.
    """

    def __init__(self, op_type: str, op_id: str):
        """Store the operation's identifiers; the operation starts inactive."""
        self.op_type = op_type
        self.op_id = op_id

        # Toggled by start()/close(); checked before the operation is used.
        self.active = False

    async def __call__(self, chunk_in: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """Generate a stream of chunks similar to chunk_in but augmented with new data.

        Because this is an async generator, the activity check and all other
        work happen when iteration begins, not when the instance is called.

        Raises:
            UsedInactiveError: if the operation has not been started.
        """
        if not self.active:
            raise UsedInactiveError(self.op_type, self.op_id)
        start_time = time.perf_counter()

        kwargs = await self._parse_chunk(chunk_in)

        async for chunk_out in self._generate(**kwargs):
            yield chunk_out

        # Only reached when the stream is fully consumed; an early exit by the
        # consumer or an exception from _generate skips the completion log.
        end_time = time.perf_counter()
        logging.info(
            "%s operation %s completed in %s ms",
            self.op_type,
            self.op_id,
            (end_time - start_time) * 1000,
        )

    async def start(self) -> None:
        """General setup needed to start generating.

        Raises:
            StartActiveError: if the operation is already active.
        """
        if self.active:
            raise StartActiveError(self.op_type, self.op_id)
        logging.info("Starting %s operation %s", self.op_type, self.op_id)
        self.active = True

    async def close(self) -> None:
        """Clean up resources before unloading.

        Raises:
            CloseInactiveError: if the operation is not active.
        """
        if not self.active:
            raise CloseInactiveError(self.op_type, self.op_id)
        logging.info("Closing %s operation %s", self.op_type, self.op_id)
        self.active = False

    async def configure(self, config_d: Dict[str, Any]):
        """Configure and validate operation-specific configuration."""
        raise NotImplementedError

    async def get_configuration(self) -> Dict[str, Any]:
        """Return the values of configurable fields."""
        raise NotImplementedError

    async def _parse_chunk(self, chunk_in: Dict[str, Any]) -> Dict[str, Any]:
        """Extract information from the input chunk for use in _generate.

        The returned dict is unpacked as keyword arguments to ``_generate``.
        """
        raise NotImplementedError

    async def _generate(self, **kwargs) -> AsyncGenerator[Dict[str, Any], None]:
        """Generate an output stream from the parsed keyword arguments."""
        raise NotImplementedError
        # Unreachable on purpose: the presence of a ``yield`` makes this an
        # async generator function, so iterating the un-overridden hook with
        # ``async for`` raises NotImplementedError rather than a TypeError
        # about a coroutine lacking ``__aiter__``.
        yield {}