| from typing import Dict, Any | |
| from aiflows.base_flows import CircularFlow | |
| from aiflows.utils import logging | |
| from abc import ABC, abstractmethod | |
| logging.set_verbosity_debug() | |
| log = logging.get_logger(__name__) | |
| class ContentWriterFlow(CircularFlow, ABC): | |
| def _on_reach_max_round(self): | |
| """ | |
| should update flow state dictionary about the output variables and status. | |
| """ | |
| pass | |
| def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: | |
| """ | |
| 1. Writing content to file; | |
| 2. Finish and early exit. | |
| """ | |
| pass | |