File size: 14,341 Bytes
8437d61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import time
from langgraph.types import Command
from langgraph.graph import END
from llm import llm_model
from pydantic import BaseModel,Field
from typing import List,Literal, Optional
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from Prompts import supervisor_prompt,PreprocessingPlanner_prompt,cleaner_prompt
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_tool_calling_agent
from Toolkit.Tools import python_cleaning_tool,eda_fact_sheet
from Guardrails.Preprocessing import StructuredPlanOutput
from Guardrails.cleaner import CleaningSummary  
from langchain.output_parsers import PydanticOutputParser,OutputFixingParser


class Router(BaseModel):
    """Structured routing decision emitted by the supervisor LLM."""

    # FIX: use the literal string "END", not the langgraph END variable.
    # typing.Literal requires literal values (Literal[END] is invalid per the
    # typing spec), and supervisor_node translates the string "END" into the
    # langgraph END sentinel via its `if goto == "END": goto = END` check —
    # with the old schema (END == "__end__") that branch could never fire on
    # a schema-valid parse.
    next: Literal["PreprocessingPlanner_node", "Cleaner_node", "END"] = Field(
        description="The next node to route to. Must be one of the available nodes."
    )
    # Short natural-language justification, surfaced in state.current_reasoning.
    reasoning: str = Field(description="A short reasoning for the decision made.")

class AgentStateModel(BaseModel):
    """Shared graph state passed between the supervisor and worker nodes.

    All fields are optional so the graph can start from an almost-empty state
    and nodes can update only the keys they own.
    """

    # Conversation history; worker nodes append named AIMessage reports here,
    # and the supervisor reads only the last entry when routing.
    messages: Optional[List] = None
    # Free-form user instructions forwarded verbatim to the planner.
    Instructions: Optional[str] = None
    # Planner output: [{"final_answer": {"plan": [...]}}] on success,
    # [{"error": ...}] after exhausted retries.
    Analysis: Optional[List[dict]] = None
    # Cleaner output, one entry per batch: [{"final_answer": ...}] on success,
    # [{"error": ...}] on failure.
    clean: Optional[List[dict]] = None
    # The cleaning plan split into batches (of up to 4 steps — see Cleaner_node).
    batched_plan: Optional[List[List[dict]]] = None 
    # Filesystem path of the dataset being analyzed/cleaned.
    Path: Optional[str] = None
    # Name of the node the supervisor routed to most recently.
    next: Optional[str] = None
    # Supervisor's stated reasoning for the latest routing decision.
    current_reasoning: Optional[str] = None   
    
class DataAnalystAgent:
    """Data-analysis workflow: a supervisor LLM routes between a preprocessing
    planner node and a cleaner node until the job is complete.

    All nodes share one LLM instance and report back to the supervisor via
    langgraph Command updates.
    """

    def __init__(self):
        # Single shared LLM reused by every node (routing, planning, cleaning,
        # and output fixing).
        self.llm_model = llm_model

    def supervisor_node(self, state: AgentStateModel) -> Command[Literal["PreprocessingPlanner_node", "Cleaner_node", END]]:
        """
        The central router of the workflow.

        It evaluates the current state and the last message to decide the next action.
        This node is designed to be highly token-efficient by creating a lean summary
        of the state instead of passing the full, verbose state objects to the LLM.

        Returns a Command routing to one of the worker nodes or END.
        """
        print("**************************below is my state right after entering****************************")
        print(state)
        print("************************** SUPERVISOR: EVALUATING STATE ****************************")

        # Lean summary of progress — avoids sending verbose state objects.
        state_summary = (
            f"Current Workflow Status:\n"
            f"- Analysis Plan Generated: {'Yes' if state.Analysis else 'No'}\n"
            f"- If Cleaning Plan Generated: {'Yes' if state.clean else 'No'}\n"
        )

        messages_for_llm = [
            SystemMessage(content=supervisor_prompt),
            HumanMessage(content=state_summary),
        ]

        if state.messages:
            # Attach only the most recent event, not the whole history.
            last_message = state.messages[-1]
            last_message_content = f"Last Event:\nThe last node to run was '{last_message.name}'. It reported the following:\n---\n{last_message.content}\n---"
            messages_for_llm.append(HumanMessage(content=last_message_content))
            print(f"--- Attaching last event from '{last_message.name}' ---")
        else:
            # Handle the very first run where there are no messages
            messages_for_llm.append(HumanMessage(content="Last Event: None. This is the first step of the workflow."))

        print("***********************Invoking LLM for routing decision************************")

        # Strict pydantic parse with an LLM-backed fixer: OutputFixingParser
        # re-asks the LLM to repair malformed output before an attempt fails.
        parser = PydanticOutputParser(pydantic_object=Router)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        chain = self.llm_model | fixing_parser

        # Retry the routing call, feeding the previous error back to the LLM.
        max_attempts = 3
        attempt = 0
        error_msg = None
        response = None

        while attempt < max_attempts:
            attempt += 1
            print(f"--- Attempt {attempt} ---")

            # Compose messages for this attempt (fresh copy each time so error
            # injections do not accumulate across attempts).
            messages_for_this_attempt = list(messages_for_llm)

            if error_msg:
                # Inject previous error info to let LLM know what failed
                messages_for_this_attempt.append(HumanMessage(content=f"Previous attempt failed due to: {error_msg}. Please follow the schema strictly: {Router.model_json_schema()}"))

            try:
                response = chain.invoke(messages_for_this_attempt)
                break

            except Exception as e:
                error_msg = str(e)
                print(f"--- Error on attempt {attempt}: {error_msg} ---")
                # If last attempt, will exit loop and propagate error

        if response is None:
            # All retries failed: terminate the graph.
            # BUG FIX: route to the langgraph END sentinel, not the plain
            # string "END" — no node named "END" exists in the graph, so the
            # string would make langgraph raise on an unknown node.
            fallback_msg = f"All {max_attempts} attempts failed. Last error: {error_msg}"
            print(f"--- Supervisor node failed ---\n{fallback_msg}")
            return Command(
                goto=END,
                update={
                    "next": END,
                    "current_reasoning": fallback_msg
                }
            )

        goto = response.next

        print("********************************this is my goto*************************")
        print(goto)

        print("********************************")
        print(response.reasoning)

        # The Router schema uses the literal string "END"; translate it into
        # the langgraph END sentinel before routing.
        if goto == "END":
            goto = END

        print("**************************below is my state****************************")
        print(state)

        return Command(goto=goto, update={'next': goto,
                                          'current_reasoning': response.reasoning}
                       )

    def PreprocessingPlanner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """
        Run a tool-calling agent that inspects the dataset via `eda_fact_sheet`
        and produces a structured preprocessing plan.

        On success the parsed plan is stored in state.Analysis and a summary is
        appended to state.messages; after exhausting retries an error entry is
        reported back to the supervisor instead.
        """
        print("*****************called PreprocessingPlanner node************")

        Instructions = state.Instructions

        # Strict parse + LLM-backed repair of the agent's final answer.
        parser = PydanticOutputParser(pydantic_object=StructuredPlanOutput)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)

        task_prompt = (
            f"Find the instructions given by the user here : {Instructions} and follow this {PreprocessingPlanner_prompt} to the letter.modify in this path:{state.Path}"
        )
        print(f"--- Sending this direct task to the agent ---\n{task_prompt}\n---------------------------------------------")

        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a DataFrame analyzer. Your primary tool is `eda_fact_sheet`. "
             "First, call the tool to get data insights. Then, based on the tool's output, "
             "provide a final answer formatted as a JSON object containing the preprocessing plan and summaries."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])

        Analyzer_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[eda_fact_sheet],
            prompt=system_prompt
        )

        agent_executor = AgentExecutor(
            agent=Analyzer_agent,
            tools=[eda_fact_sheet],
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True
        )

        # Retry loop: each failure appends the error plus the expected schema
        # to the prompt so the agent can self-correct on the next attempt.
        max_attempts = 3
        attempt = 0
        while attempt < max_attempts:
            attempt += 1
            try:
                result = agent_executor.invoke({
                    "input": task_prompt,
                })

                # Try parsing the final output
                final_output_string = result.get("output", "")
                parsed_output: StructuredPlanOutput = fixing_parser.parse(final_output_string)

                # Successfully parsed -> extract plan and summary
                plan_dict = {"plan": [step.model_dump() for step in parsed_output.plan]}
                summary_str = f"{parsed_output.summary}\n{parsed_output.details}"

                # Update state and return
                return Command(
                    update={
                        "messages": [
                            AIMessage(content=summary_str, name="PreprocessingPlanner_node")
                        ],
                        "Analysis": [{"final_answer": plan_dict}]
                    },
                    goto="supervisor",
                )

            except Exception as e:
                error_msg = (
                    f"Attempt {attempt} failed due to error: {str(e)}. "
                    f"Please strictly follow the schema: {StructuredPlanOutput.model_json_schema()}"
                )
                print(f"--- Runtime/Parsing error encountered ---\n{error_msg}")
                # Inject the error into the prompt for the next retry.
                task_prompt = f"The previous attempt failed with this error: {error_msg}. Please correct your tool usage and try again. Here is the original task:\n---\n{task_prompt}"

        # If all attempts fail, fall back to the supervisor with an error message.
        return Command(
            update={
                "messages": [
                    AIMessage(content="Error: The analysis agent failed to produce a valid preprocessing plan after multiple attempts.",
                              name="Analyzer_node_Error")
                ],
                "Analysis": [{"error": "Parsing failed after retries"}]
            },
            goto="supervisor",
        )

    def Cleaner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """
        Apply the preprocessing plan to the dataset in batches.

        Splits the plan produced by PreprocessingPlanner_node into batches of
        up to 4 steps, runs a tool-calling agent (python_cleaning_tool) on each
        batch with per-batch retries, and reports results or errors back to the
        supervisor.
        """
        print("***************** called cleaner node ************")
        Path = state.Path

        # Robustness: the planner may have stored an {"error": ...} entry (or
        # nothing at all) instead of a plan — report back instead of raising
        # a KeyError on 'final_answer'.
        if not state.Analysis or "final_answer" not in state.Analysis[0]:
            return Command(
                update={
                    "messages": [AIMessage(content="Error: Cleaner node was invoked without a valid preprocessing plan.",
                                           name="Cleaner_node_Error")],
                    "clean": [{"error": "No valid preprocessing plan available"}]
                },
                goto="supervisor"
            )

        cleaning_plan = state.Analysis[0]['final_answer']['plan']

        # Batch the determined plan into chunks of 4 steps to keep each agent
        # invocation small.
        batched_plan = [cleaning_plan[i:i + 4] for i in range(0, len(cleaning_plan), 4)]

        # --- Setup agent, parser, and prompt ---
        parser = PydanticOutputParser(pydantic_object=CleaningSummary)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        system_prompt = ChatPromptTemplate.from_messages([
            ("system", "Follow the instructions here : {cleaner_prompt} and in the input to the letter and make the necessary changes to the dataframe."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])
        Cleaner_agent = create_tool_calling_agent(llm=self.llm_model, tools=[python_cleaning_tool], prompt=system_prompt)
        agent_executor = AgentExecutor(
            agent=Cleaner_agent,
            tools=[python_cleaning_tool],
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True
        )

        all_batch_results = []
        final_clean_outputs = []

        # --- Iterate through each batch with retry logic ---
        for i, batch in enumerate(batched_plan, start=1):
            print(f"--- Starting processing for Batch {i} of {len(batched_plan)} ---")

            # Initial task prompt for the batch
            task_prompt = (f"Apply the following cleaning plan (batch {i} of {len(batched_plan)}) to the dataset at path: {Path}\nPlan details:\n{str(batch)}")

            max_attempts = 3
            attempt = 0
            batch_successful = False

            while attempt < max_attempts:
                attempt += 1
                print(f"--- Batch {i}, Attempt {attempt} ---")

                try:
                    # 1. Invoke the agent
                    result = agent_executor.invoke({
                        "input": task_prompt,
                        "cleaner_prompt": cleaner_prompt,
                    })

                    # 2. Try parsing the final output
                    final_output_string = result.get("output", "")
                    parsed_output: CleaningSummary = fixing_parser.parse(final_output_string)

                    # 3. If successful, store results and break the retry loop
                    all_batch_results.append(parsed_output)
                    final_clean_outputs.append({"final_answer": final_output_string})
                    print(f"--- Batch {i}, Attempt {attempt} successful ---")
                    batch_successful = True
                    # Pause between batches — presumably to respect LLM
                    # provider rate limits; TODO confirm.
                    time.sleep(5)
                    break  # Exit the while loop for this batch

                except Exception as e:
                    # 4. On failure, create an error message for the next attempt
                    error_msg = (
                        f"Attempt {attempt} for batch {i} failed due to error: {str(e)}. "
                        f"Please analyze the error and the plan, then try again. Ensure your final output strictly follows this schema: {CleaningSummary.model_json_schema()}"
                    )
                    print(f"--- Runtime/Parsing error encountered ---\n{error_msg}")

                    # Prepend the error to the prompt for the next retry
                    task_prompt = f"The previous attempt failed with this error: {error_msg}. Please correct your tool usage and try again. Here is the original task for this batch:\n---\n{task_prompt}"

            # 5. If all attempts for this batch fail, exit and report to supervisor
            if not batch_successful:
                error_message = f"Error: Cleaner agent failed on batch {i} after {max_attempts} attempts. Aborting cleaning process."
                return Command(
                    update={
                        "messages": [AIMessage(content=error_message, name="Cleaner_node_Error")],
                        "clean": [{"error": f"Failed on batch {i} after retries"}]
                    },
                    goto="supervisor"
                )

        # --- If all batches succeed, return the final successful result ---
        final_summary = "All cleaning batches completed successfully."
        update_dict = {
            "messages": [AIMessage(content=final_summary, name="cleaner_node")],
            "clean": final_clean_outputs,
            "batched_plan": batched_plan
        }

        return Command(update=update_dict, goto="supervisor")