# Residue from the file-viewer page this source was captured from:
# "Spaces: Configuration error"; file size 14,341 bytes; revision 8437d61.
import time
from langgraph.types import Command
from langgraph.graph import END
from llm import llm_model
from pydantic import BaseModel,Field
from typing import List,Literal, Optional
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from Prompts import supervisor_prompt,PreprocessingPlanner_prompt,cleaner_prompt
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_tool_calling_agent
from Toolkit.Tools import python_cleaning_tool,eda_fact_sheet
from Guardrails.Preprocessing import StructuredPlanOutput
from Guardrails.cleaner import CleaningSummary
from langchain.output_parsers import PydanticOutputParser,OutputFixingParser
# Structured routing decision the supervisor LLM is asked to produce.
# NOTE: documented with comments rather than a class docstring on purpose --
# a docstring would be emitted into Router.model_json_schema(), which
# supervisor_node embeds in its retry prompt.
class Router(BaseModel):
    # Destination of the next hop: one of the two worker nodes, or
    # langgraph's END constant to stop the workflow.
    next: Literal["PreprocessingPlanner_node", "Cleaner_node", END] = Field(
        description="The next node to route to. Must be one of the available nodes.",
    )
    # Free-text justification returned alongside the routing choice.
    reasoning: str = Field(
        description="A short reasoning for the decision made.",
    )
# Shared workflow state handed to every node of the graph. Every field is
# optional and defaults to None.
# NOTE: documented with comments rather than a class docstring so the
# generated pydantic schema stays exactly as it was.
class AgentStateModel(BaseModel):
    # Message history; supervisor_node reads `.name` and `.content` of the
    # last element.
    messages: Optional[List] = None

    # User instructions, interpolated into the planner's task prompt.
    Instructions: Optional[str] = None

    # Planner output: on success [{"final_answer": {"plan": [...]}}],
    # on failure [{"error": "..."}].
    Analysis: Optional[List[dict]] = None

    # Cleaner output: one {"final_answer": ...} dict per batch, or a single
    # {"error": "..."} dict when a batch failed.
    clean: Optional[List[dict]] = None

    # The cleaning plan split into batches, recorded by Cleaner_node.
    batched_plan: Optional[List[List[dict]]] = None

    # Dataset path, interpolated into the planner and cleaner task prompts.
    Path: Optional[str] = None

    # Last routing target chosen by the supervisor.
    next: Optional[str] = None

    # Reasoning (or failure message) recorded by the supervisor.
    current_reasoning: Optional[str] = None
class DataAnalystAgent:
    """Node implementations for a supervisor-driven data-cleaning workflow.

    The class exposes three graph nodes:

    * ``supervisor_node`` asks the LLM which node should run next.
    * ``PreprocessingPlanner_node`` runs a tool-calling agent with
      ``eda_fact_sheet`` and stores a structured preprocessing plan.
    * ``Cleaner_node`` applies that plan in batches through a tool-calling
      agent with ``python_cleaning_tool``.

    Both worker nodes always hand control back to ``"supervisor"``.
    """

    # Number of LLM/agent attempts before a node gives up.
    MAX_ATTEMPTS = 3
    # Number of plan steps sent to the cleaner agent per invocation.
    BATCH_SIZE = 4
    # Pause after each successful cleaning batch.
    # NOTE(review): presumably provider rate limiting -- confirm.
    BATCH_COOLDOWN_SECONDS = 5

    def __init__(self):
        self.llm_model = llm_model

    @staticmethod
    def _has_usable_results(entries) -> bool:
        """Return True when *entries* is non-empty and holds no error marker.

        The worker nodes store ``[{"error": ...}]`` in the state when they
        fail. Such a list is truthy, so a plain truthiness check would report
        a failed step as completed.
        """
        if not entries:
            return False
        return not any(
            isinstance(entry, dict) and "error" in entry for entry in entries
        )

    def supervisor_node(
        self, state: AgentStateModel
    ) -> Command[Literal["PreprocessingPlanner_node", "Cleaner_node", END]]:
        """Decide which node runs next.

        Only a short status summary and the last message are sent to the LLM,
        not the full state, to keep the prompt small. The LLM answer is parsed
        into a ``Router``; parsing is retried up to ``MAX_ATTEMPTS`` times with
        the previous error appended to the prompt.

        Args:
            state: Current workflow state.

        Returns:
            A ``Command`` routing to the chosen node and recording ``next`` and
            ``current_reasoning``. If every attempt fails, the command routes
            to ``END`` and stores the failure message as the reasoning.
        """
        print("**************************below is my state right after entering****************************")
        print(state)
        print("************************** SUPERVISOR: EVALUATING STATE ****************************")

        # Error placeholders written by failed worker runs must not count as
        # "generated", otherwise a failed plan looks finished to the router.
        analysis_status = 'Yes' if self._has_usable_results(state.Analysis) else 'No'
        cleaning_status = 'Yes' if self._has_usable_results(state.clean) else 'No'
        state_summary = (
            f"Current Workflow Status:\n"
            f"- Analysis Plan Generated: {analysis_status}\n"
            f"- If Cleaning Plan Generated: {cleaning_status}\n"
        )

        messages_for_llm = [
            SystemMessage(content=supervisor_prompt),
            HumanMessage(content=state_summary),
        ]
        if state.messages:
            # Assumes message objects exposing `.name` and `.content`
            # (the worker nodes append AIMessage instances).
            last_message = state.messages[-1]
            last_message_content = f"Last Event:\nThe last node to run was '{last_message.name}'. It reported the following:\n---\n{last_message.content}\n---"
            messages_for_llm.append(HumanMessage(content=last_message_content))
            print(f"--- Attaching last event from '{last_message.name}' ---")
        else:
            # Very first run: there is no previous event to report.
            messages_for_llm.append(HumanMessage(content="Last Event: None. This is the first step of the workflow."))

        print("***********************Invoking LLM for routing decision************************")
        parser = PydanticOutputParser(pydantic_object=Router)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        chain = self.llm_model | fixing_parser

        error_msg = None
        response = None
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            print(f"--- Attempt {attempt} ---")
            # Start from the base messages each time so error hints from
            # earlier attempts do not pile up.
            messages_for_this_attempt = list(messages_for_llm)
            if error_msg:
                messages_for_this_attempt.append(HumanMessage(content=f"Previous attempt failed due to: {error_msg}. Please follow the schema strictly: {Router.model_json_schema()}"))
            try:
                response = chain.invoke(messages_for_this_attempt)
                break
            except Exception as e:
                error_msg = str(e)
                print(f"--- Error on attempt {attempt}: {error_msg} ---")

        if response is None:
            fallback_msg = f"All {self.MAX_ATTEMPTS} attempts failed. Last error: {error_msg}"
            print(f"--- Supervisor node failed ---\n{fallback_msg}")
            # Use the END constant, not the literal string "END": the success
            # path below also maps "END" to END before routing.
            return Command(
                goto=END,
                update={
                    "next": END,
                    "current_reasoning": fallback_msg,
                },
            )

        goto = response.next
        print("********************************this is my goto*************************")
        print(goto)
        print("********************************")
        print(response.reasoning)
        if goto == "END":
            goto = END
        print("**************************below is my state****************************")
        print(state)
        return Command(
            goto=goto,
            update={
                'next': goto,
                'current_reasoning': response.reasoning,
            },
        )

    def PreprocessingPlanner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """Produce a structured preprocessing plan for the dataset.

        Runs a tool-calling agent equipped with ``eda_fact_sheet`` and parses
        its final output into a ``StructuredPlanOutput``. The run is retried up
        to ``MAX_ATTEMPTS`` times; each retry prompt contains the latest error
        and the original task only.

        Args:
            state: Current workflow state; ``Instructions`` and ``Path`` are
                interpolated into the task prompt.

        Returns:
            A ``Command`` back to ``"supervisor"``. On success ``Analysis`` is
            ``[{"final_answer": {"plan": [...]}}]`` and a summary message is
            appended; on failure ``Analysis`` is ``[{"error": ...}]`` with an
            error message.
        """
        print("*****************called PreprocessingPlanner node************")
        Instructions = state.Instructions
        parser = PydanticOutputParser(pydantic_object=StructuredPlanOutput)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)

        base_task_prompt = (
            f"Find the instructions given by the user here : {Instructions} and follow this {PreprocessingPlanner_prompt} to the letter.modify in this path:{state.Path}"
        )
        print(f"--- Sending this direct task to the agent ---\n{base_task_prompt}\n---------------------------------------------")

        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a DataFrame analyzer. Your primary tool is `eda_fact_sheet`. "
             "First, call the tool to get data insights. Then, based on the tool's output, "
             "provide a final answer formatted as a JSON object containing the preprocessing plan and summaries."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        Analyzer_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[eda_fact_sheet],
            prompt=system_prompt,
        )
        agent_executor = AgentExecutor(
            agent=Analyzer_agent,
            tools=[eda_fact_sheet],
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True,
        )

        task_prompt = base_task_prompt
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                result = agent_executor.invoke({
                    "input": task_prompt,
                })
                final_output_string = result.get("output", "")
                parsed_output: StructuredPlanOutput = fixing_parser.parse(final_output_string)

                plan_dict = {"plan": [step.model_dump() for step in parsed_output.plan]}
                summary_str = f"{parsed_output.summary}\n{parsed_output.details}"
                return Command(
                    update={
                        "messages": [
                            AIMessage(content=summary_str, name="PreprocessingPlanner_node")
                        ],
                        "Analysis": [{"final_answer": plan_dict}],
                    },
                    goto="supervisor",
                )
            except Exception as e:
                error_msg = (
                    f"Attempt {attempt} failed due to error: {str(e)}. "
                    f"Please strictly follow the schema: {StructuredPlanOutput.model_json_schema()}"
                )
                print(f"--- Runtime/Parsing error encountered ---\n{error_msg}")
                # Rebuild from the ORIGINAL task so the retry prompt carries
                # only the latest error instead of nesting every earlier
                # retry prompt (and its copy of the schema).
                task_prompt = f"The previous attempt failed with this error: {error_msg}. Please correct your tool usage and try again. Here is the original task:\n---\n{base_task_prompt}"

        # Every attempt failed: report the failure to the supervisor.
        return Command(
            update={
                "messages": [
                    AIMessage(content="Error: The analysis agent failed to produce a valid preprocessing plan after multiple attempts.",
                              name="Analyzer_node_Error")
                ],
                "Analysis": [{"error": "Parsing failed after retries"}],
            },
            goto="supervisor",
        )

    def Cleaner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """Apply the stored preprocessing plan to the dataset in batches.

        The plan is read from ``state.Analysis[0]['final_answer']['plan']`` and
        split into batches of ``BATCH_SIZE`` steps. Each batch is sent to a
        tool-calling agent equipped with ``python_cleaning_tool``; its final
        output must parse as a ``CleaningSummary``. A batch is retried up to
        ``MAX_ATTEMPTS`` times, and the first batch that exhausts its retries
        aborts the whole run.

        Args:
            state: Current workflow state; ``Analysis`` and ``Path`` are read.

        Returns:
            A ``Command`` back to ``"supervisor"``. On success ``clean`` holds
            one ``{"final_answer": <raw agent output>}`` per batch and
            ``batched_plan`` records the batches. On failure (no usable plan,
            or a batch exhausted its retries) ``clean`` is ``[{"error": ...}]``
            with an error message.
        """
        print("***************** called cleaner node ************")
        dataset_path = state.Path

        # The planner stores [{"error": ...}] when it fails, and Analysis is
        # None before it has run; report that instead of crashing here.
        try:
            cleaning_plan = state.Analysis[0]['final_answer']['plan']
        except (TypeError, IndexError, KeyError):
            missing_plan_message = "Error: Cleaner node was invoked without a valid preprocessing plan in the state."
            return Command(
                update={
                    "messages": [AIMessage(content=missing_plan_message, name="Cleaner_node_Error")],
                    "clean": [{"error": "No valid preprocessing plan available"}],
                },
                goto="supervisor",
            )

        batched_plan = [
            cleaning_plan[start:start + self.BATCH_SIZE]
            for start in range(0, len(cleaning_plan), self.BATCH_SIZE)
        ]

        parser = PydanticOutputParser(pydantic_object=CleaningSummary)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        system_prompt = ChatPromptTemplate.from_messages([
            ("system", "Follow the instructions here : {cleaner_prompt} and in the input to the letter and make the necessary changes to the dataframe."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        Cleaner_agent = create_tool_calling_agent(llm=self.llm_model, tools=[python_cleaning_tool], prompt=system_prompt)
        agent_executor = AgentExecutor(
            agent=Cleaner_agent,
            tools=[python_cleaning_tool],
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True,
        )

        final_clean_outputs = []
        for i, batch in enumerate(batched_plan, start=1):
            print(f"--- Starting processing for Batch {i} of {len(batched_plan)} ---")
            base_task_prompt = (f"Apply the following cleaning plan (batch {i} of {len(batched_plan)}) to the dataset at path: {dataset_path}\nPlan details:\n{str(batch)}")
            task_prompt = base_task_prompt

            for attempt in range(1, self.MAX_ATTEMPTS + 1):
                print(f"--- Batch {i}, Attempt {attempt} ---")
                try:
                    result = agent_executor.invoke({
                        "input": task_prompt,
                        "cleaner_prompt": cleaner_prompt,
                    })
                    final_output_string = result.get("output", "")
                    # Parsed only to validate the output against
                    # CleaningSummary; the raw string is what gets stored.
                    fixing_parser.parse(final_output_string)
                except Exception as e:
                    error_msg = (
                        f"Attempt {attempt} for batch {i} failed due to error: {str(e)}. "
                        f"Please analyze the error and the plan, then try again. Ensure your final output strictly follows this schema: {CleaningSummary.model_json_schema()}"
                    )
                    print(f"--- Runtime/Parsing error encountered ---\n{error_msg}")
                    # Rebuild from the ORIGINAL batch task so the retry prompt
                    # carries only the latest error, not every earlier one.
                    task_prompt = f"The previous attempt failed with this error: {error_msg}. Please correct your tool usage and try again. Here is the original task for this batch:\n---\n{base_task_prompt}"
                else:
                    final_clean_outputs.append({"final_answer": final_output_string})
                    print(f"--- Batch {i}, Attempt {attempt} successful ---")
                    time.sleep(self.BATCH_COOLDOWN_SECONDS)
                    break
            else:
                # No attempt succeeded for this batch: abort and report.
                error_message = f"Error: Cleaner agent failed on batch {i} after {self.MAX_ATTEMPTS} attempts. Aborting cleaning process."
                return Command(
                    update={
                        "messages": [AIMessage(content=error_message, name="Cleaner_node_Error")],
                        "clean": [{"error": f"Failed on batch {i} after retries"}],
                    },
                    goto="supervisor",
                )

        final_summary = "All cleaning batches completed successfully."
        update_dict = {
            "messages": [AIMessage(content=final_summary, name="cleaner_node")],
            "clean": final_clean_outputs,
            "batched_plan": batched_plan,
        }
        return Command(update=update_dict, goto="supervisor")
# End of file (stray table delimiter from the file-viewer page removed).