|
|
from typing import Dict, Any |
|
|
|
|
|
from flows.base_flows import SequentialFlow |
|
|
from flows.utils import logging |
|
|
from .PlanGeneratorAtomicFlow import PlanGeneratorAtomicFlow |
|
|
|
|
|
# Module-level side effect: switches the flows logging helper to its debug
# verbosity as soon as this module is imported.
# NOTE(review): presumably this affects the library-wide log level, not just
# this module — confirm that forcing debug output on import is intended.
logging.set_verbosity_debug()

# Logger for this module, obtained from the flows logging helper and keyed by
# the module's dotted name.
log = logging.get_logger(__name__)
|
|
|
|
|
class WritePlanFlow(SequentialFlow):
    """Sequential flow whose output payloads are checked for a ``"plan"`` entry.

    The only behavior defined here is the payload processor below; everything
    else is inherited from ``SequentialFlow``.
    """

    @SequentialFlow.output_msg_payload_processor
    def detect_finish_or_continue(
        self,
        output_payload: Dict[str, Any],
        src_flow: PlanGeneratorAtomicFlow,
    ) -> Dict[str, Any]:
        """Pass a payload through when it holds a plan, otherwise flag an early exit.

        Args:
            output_payload: Payload produced by the source flow.
            src_flow: The flow that produced the payload. It is not used in
                the body; presumably it is part of the signature expected by
                the ``output_msg_payload_processor`` decorator — TODO confirm.

        Returns:
            ``output_payload`` itself (same object) when it contains the key
            ``"plan"``. Otherwise a new dict with ``"EARLY_EXIT"`` set to
            ``True`` and the original payload stored under
            ``"plan_writer_output"``.
        """
        # Happy path: a plan was produced, so hand the payload on untouched.
        if "plan" in output_payload:
            return output_payload

        # No plan present: wrap the payload and raise the EARLY_EXIT flag
        # (presumably read by the enclosing flow to stop — verify in SequentialFlow).
        return {"EARLY_EXIT": True, "plan_writer_output": output_payload}