File size: 2,248 Bytes
1905805 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | from typing import Dict, Any, AsyncGenerator
import time
import logging
from .error import StartActiveError, CloseInactiveError, UsedInactiveError
class Operation:
    """Base class for a streaming operation identified by a type and an id.

    An operation must be started with :meth:`start` before it is used and
    can be shut down with :meth:`close`. Calling the instance feeds one input
    chunk through ``_parse_chunk`` and streams whatever ``_generate`` yields.

    Subclasses are expected to implement ``configure``, ``get_configuration``,
    ``_parse_chunk`` and ``_generate``; ``start`` and ``close`` may be
    overridden to add setup/teardown.
    """

    def __init__(self, op_type: str, op_id: str):
        """Store the operation's identity; the operation begins inactive."""
        self.op_type = op_type
        self.op_id = op_id
        self.active = False

    async def __call__(self, chunk_in: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream the chunks produced by ``_generate`` for ``chunk_in``.

        ``chunk_in`` is passed to ``_parse_chunk`` and the resulting dict is
        expanded as keyword arguments to ``_generate``. Each generated chunk
        is yielded as-is (it is NOT merged with ``chunk_in``).

        Because this is an async generator, nothing below runs until the
        caller starts iterating -- including the ``active`` check.

        Raises:
            UsedInactiveError: if the operation has not been started.
        """
        if not self.active:
            raise UsedInactiveError(self.op_type, self.op_id)

        start_time = time.perf_counter()
        kwargs = await self._parse_chunk(chunk_in)
        async for chunk_out in self._generate(**kwargs):
            # NOTE: merging with the input (``chunk_in | chunk_out``) is
            # currently disabled; only the generated chunk is emitted.
            yield chunk_out
        # Only reached when the stream is consumed to the end.
        end_time = time.perf_counter()
        logging.info("{} operation {} completed in {} ms".format(self.op_type, self.op_id, (end_time-start_time)*1000))

    ## TO BE OVERRIDDEN ####
    async def start(self) -> None:
        """Mark the operation active; override to add setup work.

        Raises:
            StartActiveError: if the operation is already active.
        """
        if self.active:
            raise StartActiveError(self.op_type, self.op_id)
        logging.info("Starting {} operation {}".format(self.op_type, self.op_id))
        self.active = True

    async def close(self) -> None:
        """Mark the operation inactive; override to release resources.

        Raises:
            CloseInactiveError: if the operation is not active.
        """
        if not self.active:
            raise CloseInactiveError(self.op_type, self.op_id)
        logging.info("Closing {} operation {}".format(self.op_type, self.op_id))
        self.active = False

    ## TO BE IMPLEMENTED ####
    async def configure(self, config_d: Dict[str, Any]):
        """Configure and validate operation-specific configuration."""
        raise NotImplementedError

    async def get_configuration(self) -> Dict[str, Any]:
        """Return the values of the configurable fields."""
        raise NotImplementedError

    async def _parse_chunk(self, chunk_in: Dict[str, Any]) -> Dict[str, Any]:
        """Extract from ``chunk_in`` the keyword arguments for ``_generate``."""
        raise NotImplementedError

    async def _generate(self, **kwargs) -> AsyncGenerator[Dict[str, Any], None]:
        """Generate an output stream of chunks.

        Raises:
            NotImplementedError: on first iteration, unless overridden.
        """
        raise NotImplementedError
        # Intentionally unreachable: the ``yield`` makes this an async
        # generator function, so ``async for`` over the un-overridden base
        # method raises NotImplementedError instead of a TypeError about a
        # coroutine lacking ``__aiter__``.
        yield {}  # pragma: no cover