import os from ..core.module import BaseModule from typing import Optional, Literal, Dict, Any, List from pydantic import Field, BaseModel import json from dotenv import load_dotenv import time from ..models import OpenAILLM, OpenAILLMConfig, BaseLLM from ..models.model_configs import LLMConfig from ..prompts.workflow.workflow_editor import WORKFLOW_EDITOR_PROMPT from ..core.logging import logger load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") class MockLLMConfig(LLMConfig): """Mock LLM configuration for testing purposes""" llm_type: str = "MockLLM" model: str = "mock-model" class MockLLM(BaseLLM): """Mock LLM implementation for testing purposes that passes pydantic type validation""" def __init__(self, config: MockLLMConfig = None, **kwargs): if config is None: config = MockLLMConfig( llm_type="MockLLM", model="mock-model", output_response=True ) super().__init__(config, **kwargs) def init_model(self): """Initialize the mock model (no-op)""" pass def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]: """Mock implementation of formulate_messages""" result = [] for prompt in prompts: messages = [] if system_messages: for sys_msg in system_messages: messages.append({"role": "system", "content": sys_msg}) messages.append({"role": "user", "content": prompt}) result.append(messages) return result def single_generate(self, messages: List[dict], **kwargs) -> str: """Mock implementation that returns a simple JSON response""" return '{"nodes": [], "edges": []}' def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]: """Mock implementation for batch generation""" return [self.single_generate(messages, **kwargs) for messages in batch_messages] async def single_generate_async(self, messages: List[dict], **kwargs) -> str: """Mock async implementation""" return self.single_generate(messages, **kwargs) def default_llm_config(): """ Create default LLM configuration. Uses MockLLM in testing environments or when OPENAI_API_KEY is not available. """ # Check if we're in a testing environment or if API key is missing is_testing = ( os.getenv("PYTEST_CURRENT_TEST") is not None or # pytest environment os.getenv("CI") is not None or # CI environment OPENAI_API_KEY is None or # no API key OPENAI_API_KEY.strip() == "" # empty API key ) if is_testing: # Return MockLLM for testing environments mock_config = MockLLMConfig( llm_type="MockLLM", model="mock-model", output_response=True ) return MockLLM(mock_config) else: # Return real OpenAI LLM for production environments llm_config = OpenAILLMConfig( model="gpt-4o", openai_key=OPENAI_API_KEY, stream=True, output_response=True ) return OpenAILLM(llm_config) class WorkFlowEditorReturn(BaseModel): """ The return of the workflow editor. """ status: Literal["success", "failed", "exceeded_max_retries"] = Field( description="The status of the workflow editing operation" ) workflow_json: Dict[str, Any] | None = Field( description="The workflow JSON structure after editing" ) workflow_json_path: str | None = Field( description="The file path where the workflow JSON is saved" ) error_message: Optional[str] | None = Field( default=None, description="Error message if the operation failed" ) class WorkFlowEditor(BaseModule): """ This is a API oriented version of HITLOutsideConversationAgent, it can be used to edit the workflow json structure but in a interaction-free way. Attributes: save_dir (str): The directory to save the workflow json file. llm (BaseLLM): The LLM model to use for editing the workflow json file. max_retries (int): The maximum number of retries to edit the workflow json file. """ save_dir: str llm: Optional[BaseLLM] = Field(default=default_llm_config()) max_retries: Optional[int] = Field(default=3) def init_module(self): pass async def edit_workflow(self, file_path: str, instruction: str, new_file_path: Optional[str] = None): """ optimize or modify the workflow json file according to the instruction, using LLM's ability. Args: file_path (str): The path to the workflow json file or the file name in the save_dir. instruction (str): The instruction to edit the workflow json file. new_file_path (Optional[str]): The path to the new workflow json file. Returns: new_json_path (str): The path to the new workflow json file. """ if new_file_path is None: new_file_path = "new_json_for__" + os.path.split(file_path)[-1] + "__" + time.strftime("%Y%m%d_%H%M%S") + ".json" new_file_path = os.path.join(self.save_dir, new_file_path) else: # check if new_file_path is a file name or a path path_split = os.path.split(new_file_path) if not path_split[0]: new_file_path = os.path.join(self.save_dir, new_file_path) else: if os.path.exists(path_split[0]) and path_split[1][:-5] == ".json": new_file_path = new_file_path else: raise FileNotFoundError(f"The directory {path_split[0]} does not exist or the file name is not a json file name.") # load the workflow json file with open(file_path, "r") as f: workflow_json = json.load(f) optimization_prompt = WORKFLOW_EDITOR_PROMPT.format( current_workflow_json=json.dumps(workflow_json, indent=2, ensure_ascii=False), user_advice=instruction ) messages = [ {"role": "system", "content": "You are a helpful assistant that can optimize the workflow json structure."}, {"role": "user", "content": optimization_prompt} ] try: response = await self.llm.single_generate_async(messages=messages, response_format={"type": "json_object"}) # try to parse the LLM response optimized_json = json.loads(response) except Exception as e: logger.error(f"LLM optimization failed: {e}") optimized_json = None if not optimized_json: return WorkFlowEditorReturn( status="failed", workflow_json=None, workflow_json_path=None, error_message="LLM optimization failed" ) # check workflow json structure try: from ..workflow.workflow import WorkFlow from ..workflow.workflow_graph import WorkFlowGraph # create the workflow graph from the json graph = WorkFlowGraph.from_dict(optimized_json) # create the workflow instance workflow = WorkFlow(graph=graph, llm=self.llm) except Exception as e: logger.error(f"Workflow json structure check failed: {e}") return WorkFlowEditorReturn( status="failed", workflow_json=None, workflow_json_path=None, error_message="Workflow json structure check failed" ) del workflow # save the workflow json file with open(new_file_path, "w") as f: json.dump(optimized_json, f, indent=2, ensure_ascii=False) return WorkFlowEditorReturn( status="success", workflow_json=optimized_json, workflow_json_path=new_file_path, error_message=None )