import json
import os
import time
from typing import Any, Dict, List, Literal, Optional

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from ..core.logging import logger
from ..core.module import BaseModule
from ..models import BaseLLM, OpenAILLM, OpenAILLMConfig
from ..models.model_configs import LLMConfig
from ..prompts.workflow.workflow_editor import WORKFLOW_EDITOR_PROMPT

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


class MockLLMConfig(LLMConfig):
    """Mock LLM configuration for testing purposes"""
    llm_type: str = "MockLLM"
    model: str = "mock-model"


class MockLLM(BaseLLM):
    """Mock LLM implementation for testing purposes that passes pydantic type validation"""
    
    def __init__(self, config: Optional[MockLLMConfig] = None, **kwargs):
        if config is None:
            config = MockLLMConfig(
                llm_type="MockLLM",
                model="mock-model",
                output_response=True
            )
        super().__init__(config, **kwargs)
    
    def init_model(self):
        """Initialize the mock model (no-op)"""
        pass
    
    def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]:
        """Mock implementation of formulate_messages"""
        result = []
        for prompt in prompts:
            messages = []
            if system_messages:
                for sys_msg in system_messages:
                    messages.append({"role": "system", "content": sys_msg})
            messages.append({"role": "user", "content": prompt})
            result.append(messages)
        return result
    
    def single_generate(self, messages: List[dict], **kwargs) -> str:
        """Mock implementation that returns a simple JSON response"""
        return '{"nodes": [], "edges": []}'
    
    def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]:
        """Mock implementation for batch generation"""
        return [self.single_generate(messages, **kwargs) for messages in batch_messages]
    
    async def single_generate_async(self, messages: List[dict], **kwargs) -> str:
        """Mock async implementation"""
        return self.single_generate(messages, **kwargs)
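
# Usage sketch (illustrative only): MockLLM mimics the BaseLLM interface so tests
# can run without a real model or API key.
#
#     llm = MockLLM()
#     msgs = llm.formulate_messages(["Add a retry node"], ["You edit workflows."])[0]
#     llm.single_generate(msgs)  # -> '{"nodes": [], "edges": []}'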


def default_llm_config():
    """
    Create default LLM configuration. Uses MockLLM in testing environments 
    or when OPENAI_API_KEY is not available.
    """
    # Check if we're in a testing environment or if API key is missing
    is_testing = (
        os.getenv("PYTEST_CURRENT_TEST") is not None or  # pytest environment
        os.getenv("CI") is not None or  # CI environment
        OPENAI_API_KEY is None or  # no API key
        OPENAI_API_KEY.strip() == ""  # empty API key
    )
    
    if is_testing:
        # Return MockLLM for testing environments
        mock_config = MockLLMConfig(
            llm_type="MockLLM",
            model="mock-model",
            output_response=True
        )
        return MockLLM(mock_config)
    else:
        # Return real OpenAI LLM for production environments
        llm_config = OpenAILLMConfig(
            model="gpt-4o", 
            openai_key=OPENAI_API_KEY, 
            stream=True, 
            output_response=True
        )
        return OpenAILLM(llm_config)
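
# Example (illustrative): tests can force the mock path by setting a CI-style
# environment variable before calling default_llm_config():
#
#     os.environ["CI"] = "1"
#     assert isinstance(default_llm_config(), MockLLM)
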

class WorkFlowEditorReturn(BaseModel):
    """
    The return value of the workflow editor.
    """

    status: Literal["success", "failed", "exceeded_max_retries"] = Field(
        description="The status of the workflow editing operation"
    )

    workflow_json: Optional[Dict[str, Any]] = Field(
        description="The workflow JSON structure after editing"
    )

    workflow_json_path: Optional[str] = Field(
        description="The file path where the workflow JSON is saved"
    )

    error_message: Optional[str] = Field(
        default=None,
        description="Error message if the operation failed"
    )
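
# Callers typically branch on the returned status (illustrative sketch; `editor`
# is a WorkFlowEditor instance and "flow.json" a hypothetical file):
#
#     result = await editor.edit_workflow("flow.json", "Rename node A to B")
#     if result.status == "success":
#         print(result.workflow_json_path)
#     else:
#         print(result.error_message)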

class WorkFlowEditor(BaseModule):
    """
    An API-oriented version of HITLOutsideConversationAgent: it edits the workflow
    JSON structure without any human interaction.

    Attributes:
        save_dir (str): The directory in which to save the workflow JSON file.
        llm (BaseLLM): The LLM used to edit the workflow JSON file.
        max_retries (int): The maximum number of retries when editing the workflow JSON file.
    """
    save_dir: str
    llm: Optional[BaseLLM] = Field(default_factory=default_llm_config)
    max_retries: Optional[int] = Field(default=3)

    def init_module(self):
        pass

    async def edit_workflow(self, file_path: str, instruction: str, new_file_path: Optional[str] = None):
        """
        Optimize or modify the workflow JSON file according to the instruction, using the LLM.

        Args:
            file_path (str): The path to the workflow JSON file, or a bare file name inside save_dir.
            instruction (str): The instruction describing how to edit the workflow JSON file.
            new_file_path (Optional[str]): The path for the edited workflow JSON file. If omitted,
                a timestamped file name is generated inside save_dir.
        Returns:
            WorkFlowEditorReturn: The status, the edited workflow JSON, its save path,
                and an error message if the operation failed.
        """
        if new_file_path is None:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            new_file_path = f"new_json_for__{base_name}__{time.strftime('%Y%m%d_%H%M%S')}.json"
            new_file_path = os.path.join(self.save_dir, new_file_path)
        else:
            # Distinguish a bare file name from a path that includes a directory.
            dir_name, file_name = os.path.split(new_file_path)
            if not dir_name:
                new_file_path = os.path.join(self.save_dir, new_file_path)
            elif not (os.path.exists(dir_name) and file_name.endswith(".json")):
                raise FileNotFoundError(
                    f"The directory {dir_name} does not exist or {file_name} is not a JSON file name."
                )

        # A bare input file name is resolved against save_dir, as documented above.
        if not os.path.split(file_path)[0]:
            file_path = os.path.join(self.save_dir, file_path)

        # load the workflow json file
        with open(file_path, "r", encoding="utf-8") as f:
            workflow_json = json.load(f)

        optimization_prompt = WORKFLOW_EDITOR_PROMPT.format(
            current_workflow_json=json.dumps(workflow_json, indent=2, ensure_ascii=False),
            user_advice=instruction
        )
        messages = [
            {"role": "system", "content": "You are a helpful assistant that can optimize the workflow json structure."},
            {"role": "user", "content": optimization_prompt}
        ]
        try:
            response = await self.llm.single_generate_async(messages=messages, response_format={"type": "json_object"})
            # try to parse the LLM response
            optimized_json = json.loads(response)
        except Exception as e:
            logger.error(f"LLM optimization failed: {e}")
            optimized_json = None

        if not optimized_json:
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="LLM optimization failed"
            )
        
        # check the workflow json structure by building real workflow objects from it
        try:
            # Local import: WorkFlow and WorkFlowGraph are only needed for this check.
            from ..workflow.workflow import WorkFlow
            from ..workflow.workflow_graph import WorkFlowGraph

            # create the workflow graph from the json
            graph = WorkFlowGraph.from_dict(optimized_json)

            # create the workflow instance
            workflow = WorkFlow(graph=graph, llm=self.llm)
        except Exception as e:
            logger.error(f"Workflow json structure check failed: {e}")
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message=f"Workflow json structure check failed: {e}"
            )
        # The workflow instance was only needed for validation.
        del workflow

        # save the workflow json file
        with open(new_file_path, "w", encoding="utf-8") as f:
            json.dump(optimized_json, f, indent=2, ensure_ascii=False)
        
        return WorkFlowEditorReturn(
            status="success",
            workflow_json=optimized_json,
            workflow_json_path=new_file_path,
            error_message=None
        )
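

if __name__ == "__main__":
    # Minimal end-to-end sketch (illustrative only): "examples/demo_workflow.json"
    # and "./workflows" are hypothetical paths, and MockLLM is used so that no
    # API key is needed.
    import asyncio

    os.makedirs("./workflows", exist_ok=True)
    editor = WorkFlowEditor(save_dir="./workflows", llm=MockLLM())
    result = asyncio.run(
        editor.edit_workflow(
            file_path="examples/demo_workflow.json",
            instruction="Add a summarization node after the retrieval step.",
        )
    )
    print(result.status, result.workflow_json_path, result.error_message)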