import json
from copy import deepcopy
from typing import Any, Dict

from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow


class PlanGeneratorAtomicFlow(ChatAtomicFlow):
    """Ask the chat backend for a step-by-step plan that finishes a given goal.

    The goal text is extended with a format hint that asks the model to answer
    with a JSON object holding a ``"plan"`` entry. The reply is parsed with
    ``json.loads``; if parsing fails, the model is queried one more time with
    an explicit reminder to answer in JSON.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``ChatAtomicFlow`` and define the format hint."""
        super().__init__(**kwargs)
        # Appended to the goal before every query. The example must itself be
        # strict JSON (no trailing comma): a model that mirrors it verbatim
        # would otherwise produce output that json.loads rejects.
        self.hint_for_model = """
        Make sure your response is in the following format:
        Response Format:
        {
            "plan": "A step-by-step plan to finish the given goal"
        }
        """

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration object.

        Args:
            config: Flow configuration. It is deep-copied, so the caller's
                object is not modified by this method.

        Returns:
            A new instance of ``cls``.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up prompts ~~~
        # NOTE(review): _set_up_prompts / _set_up_backend are not defined in
        # this file; presumably inherited from ChatAtomicFlow and returning
        # dicts of constructor kwargs -- confirm against the base class.
        kwargs.update(cls._set_up_prompts(flow_config))

        # ~~~ Set up backend ~~~
        kwargs.update(cls._set_up_backend(flow_config))

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def _update_prompts_and_input(self, input_data: Dict[str, Any]):
        """Append the response-format hint to ``input_data['goal']`` in place.

        Does nothing when ``input_data`` has no ``'goal'`` key.
        """
        if 'goal' in input_data:
            input_data['goal'] += self.hint_for_model

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Query the model for a plan and return its parsed JSON reply.

        Args:
            input_data: Input for the flow. When it contains a ``'goal'``
                entry, the format hint is appended to that text before the
                model is queried. The caller's dict is left untouched.

        Returns:
            The result of ``json.loads`` on the model's reply (from the first
            attempt if it parses, otherwise from the single retry).

        Raises:
            json.decoder.JSONDecodeError: If the retry reply cannot be parsed
                either.
        """
        # Work on a shallow copy so the hint is not appended to the caller's
        # dict (which would accumulate if the same dict were reused).
        input_data = input_data.copy()
        self._update_prompts_and_input(input_data)

        api_output = super().run(input_data)["api_output"].strip()
        try:
            return json.loads(api_output)
        except json.decoder.JSONDecodeError:
            # One retry with an explicit reminder. ``.get`` keeps this branch
            # from raising KeyError (and hiding the parse failure) when the
            # input carries no 'goal' entry.
            new_input_data = input_data.copy()
            new_input_data['goal'] = new_input_data.get('goal', '') + (
                "The previous respond cannot be parsed with json.loads. "
                "Make sure your next response is in JSON format."
            )
            new_api_output = super().run(new_input_data)["api_output"].strip()
            # A second failure is deliberately left to propagate to the caller.
            return json.loads(new_api_output)