from typing import Dict, Any from aiflows.base_flows import CircularFlow from aiflows.utils import logging from abc import ABC, abstractmethod logging.set_verbosity_debug() log = logging.get_logger(__name__) class ContentWriterFlow(CircularFlow, ABC): @abstractmethod def _on_reach_max_round(self): """ should update flow state dictionary about the output variables and status. """ pass @abstractmethod @CircularFlow.output_msg_payload_processor def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: """ 1. Writing content to file; 2. Finish and early exit. """ pass