Spaces:
Configuration error
Configuration error
File size: 25,290 Bytes
8437d61 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 | import time
from langgraph.types import Command
from langgraph.graph import END
from llm import llm_model
from pydantic import BaseModel,Field
from typing import List,Literal, Optional
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from Prompts import supervisor_prompt,PreprocessingPlanner_prompt,cleaner_prompt,Reporter_prompt,VISUALIZATION_PROMPT
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_tool_calling_agent
from Toolkit.Tools import python_repl_ast,eda_fact_sheet
from Guardrails.Preprocessing import StructuredPlanOutput
from Guardrails.cleaner import CleaningSummary
from Guardrails.report import BusinessReport
from Guardrails.visualizer import VisualizationReport
from langchain.output_parsers import PydanticOutputParser,OutputFixingParser
class Router(BaseModel):
    # Structured routing decision returned by the supervisor LLM.
    # "END" is accepted as an alias for LangGraph's END constant: supervisor_node
    # maps the string "END" to END before routing, but without this alias pydantic
    # rejected "END" during parsing, so that mapping could never be reached and the
    # answer was bounced through the fixing parser instead.
    next: Literal[
        "PreprocessingPlanner_node",
        "Cleaner_node",
        "Reporter_node",
        "visualizer_node",
        "END",
        END,
    ] = Field(description="The next node to route to. Must be one of the available nodes.")
    # Free-text justification; supervisor_node stores it as state.current_reasoning.
    reasoning: str = Field(description="A short reasoning for the decision made.")
class AgentStateModel(BaseModel):
    # Shared workflow state read and updated by the DataAnalystAgent nodes.
    # Every slot is optional and starts empty; nodes fill slots through Command(update=...).

    # Events emitted by the nodes; the supervisor inspects only the last one.
    messages: Optional[List] = Field(default=None)
    # Instructions provided by the user for the analysis.
    Instructions: Optional[str] = Field(default=None)
    # Planner output: [{"final_answer": {"plan": [...]}}] on success, [{"error": ...}] on failure.
    Analysis: Optional[List[dict]] = Field(default=None)
    # Cleaner output: one {"final_answer": <agent text>} per batch, or [{"error": ...}].
    clean: Optional[List[dict]] = Field(default=None)
    # Reporter output: [{"final_answer": <BusinessReport dict>}] or [{"error": ...}].
    Report: Optional[List[dict]] = Field(default=None)
    # Visualizer output: [{"final_answer": <VisualizationReport dict>}] or [{"error": ...}].
    Visualizations: Optional[List[dict]] = Field(default=None)
    # Location of the CSV file the agents work on.
    Path: Optional[str] = Field(default=None)
    # Routing target most recently chosen by the supervisor.
    next: Optional[str] = Field(default=None)
    # The supervisor's reasoning for that routing choice.
    current_reasoning: Optional[str] = Field(default=None)
class DataAnalystAgent:
    """LangGraph node implementations for an LLM-driven CSV analysis workflow.

    ``supervisor_node`` asks the LLM which worker to run next. The workers
    (``PreprocessingPlanner_node``, ``Cleaner_node``, ``Reporter_node`` and
    ``visualizer_node``) each drive a LangChain tool-calling agent, parse the
    agent's final answer into a Pydantic schema and hand control back to
    ``"supervisor"`` through a ``Command`` that updates ``AgentStateModel``.

    Worker failures are recorded in the worker's state slot as
    ``[{"error": ...}]`` so the supervisor can tell a failed step apart from
    a completed one.
    """

    # Attempts per LLM/agent call before a node gives up and reports an error.
    _MAX_ATTEMPTS = 3
    # Number of preprocessing-plan steps handed to the cleaner agent per invocation.
    _CLEANER_BATCH_SIZE = 4
    # Pause between cleaner batches (presumably to respect LLM rate limits - confirm).
    _CLEANER_BATCH_PAUSE_SECONDS = 10

    def __init__(self):
        self.llm_model = llm_model

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _status(entries):
        """Summarise a state slot for the supervisor as "No", "Yes" or "Failed".

        A slot holding ``[{"error": ...}]`` is non-empty but is a failure, not a
        generated artifact. Reporting it as "Yes" let the supervisor route on to
        nodes such as ``Cleaner_node``, which then crashed on the missing result.
        """
        if not entries:
            return "No"
        first = entries[0]
        if isinstance(first, dict) and "error" in first:
            return "Failed"
        return "Yes"

    @staticmethod
    def _failure(node_name, message, slot, reason):
        """Build the ``Command`` a worker returns when it fails.

        Reports *message* as an ``AIMessage`` named *node_name*, stores
        ``[{"error": reason}]`` in state slot *slot*, and routes to the supervisor.
        """
        return Command(
            update={
                "messages": [AIMessage(content=message, name=node_name)],
                slot: [{"error": reason}],
            },
            goto="supervisor",
        )

    @staticmethod
    def _fact_sheet(df_path):
        """Run the ``eda_fact_sheet`` tool on the CSV at *df_path* and return its output as text."""
        # BaseTool.run()/invoke() take the tool input as one positional argument; the
        # previous ``eda_fact_sheet.run(path=df_path)`` supplied none and raised TypeError.
        # NOTE(review): assumes the tool's parameter is named ``path`` (as that keyword
        # call implied) - confirm against Toolkit.Tools.
        return str(eda_fact_sheet.invoke({"path": df_path}))

    def _condense_profile(self, profile, system_text):
        """Ask the LLM to rewrite a verbose data *profile* as a short summary.

        *system_text* is the summariser's system instruction. It is used as a
        prompt template, so it must not contain literal ``{`` or ``}``.
        """
        condensation_prompt = ChatPromptTemplate.from_messages([
            ("system", system_text),
            ("human", "Please summarize this data profile:\n\n{profile}"),
        ])
        summarizer_chain = condensation_prompt | self.llm_model
        return summarizer_chain.invoke({"profile": profile}).content

    def _invoke_until_parsed(self, agent_executor, base_prompt, fixing_parser, schema, label, extra_inputs=None):
        """Invoke *agent_executor* until its final answer parses, up to ``_MAX_ATTEMPTS`` times.

        Each retry sends *base_prompt* prefixed with only the most recent error.
        Previously every retry wrapped the whole previous prompt, so prompts grew
        with each attempt.

        Args:
            agent_executor: The ``AgentExecutor`` to run.
            base_prompt: The original task text, sent as the ``input`` variable.
            fixing_parser: Parser applied to the executor's ``output`` string.
            schema: Pydantic model whose JSON schema is quoted back on failure.
            label: Short tag used in the diagnostic prints.
            extra_inputs: Additional prompt variables passed on every call.

        Returns:
            ``(parsed_output, raw_output, attempt)`` on success, or ``None`` when
            every attempt raised.
        """
        inputs = dict(extra_inputs or {})
        task_prompt = base_prompt
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                result = agent_executor.invoke({"input": task_prompt, **inputs})
                raw_output = result.get("output", "")
                return fixing_parser.parse(raw_output), raw_output, attempt
            except Exception as e:
                error_msg = (
                    f"Attempt {attempt} failed with error: {e}. "
                    "You MUST provide a final answer that is a valid JSON object. "
                    f"Please strictly follow this schema: {schema.model_json_schema()}"
                )
                print(f"--- Runtime/Parsing error encountered ({label}) ---\n{error_msg}")
                task_prompt = (
                    f"The previous attempt failed with this error: {error_msg}\n"
                    "Please correct your approach and try again. "
                    f"Here is the original task:\n---\n{base_prompt}"
                )
        return None

    # -------------------------------------------------------------------- nodes

    def supervisor_node(self, state: AgentStateModel) -> Command[Literal["PreprocessingPlanner_node", "Cleaner_node", "Reporter_node", "visualizer_node", END]]:
        """
        The central router of the workflow.

        Sends the LLM a compact status summary plus the last message (rather
        than the full state objects) and parses its reply into ``Router``. The
        call is retried up to ``_MAX_ATTEMPTS`` times, with the previous error
        fed back each time. If every attempt fails, the workflow is routed to
        ``END``.
        """
        print("**************************below is my state right after entering****************************")
        print(state)
        print("************************** SUPERVISOR: EVALUATING STATE ****************************")
        state_summary = (
            "Current Workflow Status:\n"
            f"- Analysis Plan Generated: {self._status(state.Analysis)}\n"
            f"- Cleaning Plan Generated: {self._status(state.clean)}\n"
            f"- Report Generated: {self._status(state.Report)}\n"
            f"- Visualizations Generated: {self._status(state.Visualizations)}\n"
        )
        messages_for_llm = [
            SystemMessage(content=supervisor_prompt),
            HumanMessage(content=state_summary),
        ]
        if state.messages:
            last_message = state.messages[-1]
            # ``messages`` is an untyped List, so read name/content defensively.
            last_name = getattr(last_message, "name", None)
            last_content = getattr(last_message, "content", last_message)
            messages_for_llm.append(HumanMessage(
                content=f"Last Event:\nThe last node to run was '{last_name}'. It reported the following:\n---\n{last_content}\n---"
            ))
            print(f"--- Attaching last event from '{last_name}' ---")
        else:
            # Very first run: nothing has happened yet.
            messages_for_llm.append(HumanMessage(content="Last Event: None. This is the first step of the workflow."))

        print("***********************Invoking LLM for routing decision************************")
        parser = PydanticOutputParser(pydantic_object=Router)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        chain = self.llm_model | fixing_parser

        response = None
        error_msg = None
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            print(f"--- Attempt {attempt} ---")
            messages_for_this_attempt = list(messages_for_llm)
            if error_msg:
                # Tell the LLM what went wrong last time.
                messages_for_this_attempt.append(HumanMessage(
                    content=f"Previous attempt failed due to: {error_msg}. Please follow the schema strictly: {Router.model_json_schema()}"
                ))
            try:
                response = chain.invoke(messages_for_this_attempt)
                break
            except Exception as e:
                error_msg = str(e)
                print(f"--- Error on attempt {attempt}: {error_msg} ---")

        if response is None:
            fallback_msg = f"All {self._MAX_ATTEMPTS} attempts failed. Last error: {error_msg}"
            print(f"--- Supervisor node failed ---\n{fallback_msg}")
            # "END" is not a node name; LangGraph's terminal target is the END constant.
            return Command(
                goto=END,
                update={
                    "next": END,
                    "current_reasoning": fallback_msg,
                },
            )

        goto = response.next
        print("********************************this is my goto*************************")
        print(goto)
        print("********************************")
        print(response.reasoning)
        if goto == "END":
            goto = END
        print("**************************below is my state****************************")
        print(state)
        return Command(goto=goto, update={"next": goto, "current_reasoning": response.reasoning})

    def PreprocessingPlanner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """Build a preprocessing plan for the dataset at ``state.Path``.

        An ``eda_fact_sheet`` agent receives the user's ``Instructions`` and the
        ``PreprocessingPlanner_prompt``; its answer is parsed as
        ``StructuredPlanOutput``. On success ``Analysis`` becomes
        ``[{"final_answer": {"plan": [...]}}]``; on failure ``[{"error": ...}]``.
        """
        print("*****************called PreprocessingPlanner node************")
        parser = PydanticOutputParser(pydantic_object=StructuredPlanOutput)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        task_prompt = (
            f"Find the instructions given by the user here : {state.Instructions} and follow this {PreprocessingPlanner_prompt} to the letter.modify in this path:{state.Path}"
        )
        print(f"--- Sending this direct task to the agent ---\n{task_prompt}\n---------------------------------------------")
        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a DataFrame analyzer. Your primary tool is `eda_fact_sheet`. "
             "First, call the tool to get data insights. Then, based on the tool's output, "
             "provide a final answer formatted as a JSON object containing the preprocessing plan and summaries."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        analyzer_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[eda_fact_sheet],
            prompt=system_prompt,
        )
        agent_executor = AgentExecutor(
            agent=analyzer_agent,
            tools=[eda_fact_sheet],
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True,
        )

        outcome = self._invoke_until_parsed(
            agent_executor, task_prompt, fixing_parser, StructuredPlanOutput, "PreprocessingPlanner"
        )
        if outcome is None:
            return self._failure(
                "Analyzer_node_Error",
                "Error: The analysis agent failed to produce a valid preprocessing plan after multiple attempts.",
                "Analysis",
                "Parsing failed after retries",
            )

        parsed_output = outcome[0]
        plan_dict = {"plan": [step.model_dump() for step in parsed_output.plan]}
        summary_str = f"{parsed_output.summary}\n{parsed_output.details}"
        return Command(
            update={
                "messages": [AIMessage(content=summary_str, name="PreprocessingPlanner_node")],
                "Analysis": [{"final_answer": plan_dict}],
            },
            goto="supervisor",
        )

    def Cleaner_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """Apply the stored preprocessing plan to the CSV at ``state.Path`` in batches.

        The plan steps are sent to a ``python_repl_ast`` agent in batches of
        ``_CLEANER_BATCH_SIZE``. Each batch's answer must parse as
        ``CleaningSummary``. If any batch still fails after all retries, the
        node stops and records ``[{"error": ...}]`` in ``clean``. Otherwise
        ``clean`` holds one ``{"final_answer": <agent text>}`` per batch.
        """
        print("*****************called cleaner node************")
        df_path = state.Path

        # Without a successful planner result there is nothing to apply. This used to
        # crash with KeyError/TypeError/IndexError instead of reporting back.
        try:
            preprocessing_plan = list(state.Analysis[0]['final_answer']['plan'])
        except (TypeError, IndexError, KeyError):
            return self._failure(
                "Cleaner_node_Error",
                "Error: No valid preprocessing plan is available; the planner must succeed before cleaning.",
                "clean",
                "No preprocessing plan available",
            )

        if not preprocessing_plan:
            # Record the outcome explicitly. An empty ``clean`` would otherwise look
            # like "not run yet" to the supervisor, which could route back here again.
            note = "The preprocessing plan contains no cleaning steps; nothing was changed."
            return Command(
                update={
                    "messages": [AIMessage(content=note, name="cleaner_node")],
                    "clean": [{"final_answer": note}],
                },
                goto="supervisor",
            )

        size = self._CLEANER_BATCH_SIZE
        batched_plan = [preprocessing_plan[start:start + size] for start in range(0, len(preprocessing_plan), size)]

        parser = PydanticOutputParser(pydantic_object=CleaningSummary)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "Follow the instructions here : {cleaner_prompt} and in the input to the letter and make the necessary changes to the dataframe."),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        cleaner_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[python_repl_ast],
            prompt=system_prompt,
        )
        agent_executor = AgentExecutor(
            agent=cleaner_agent,
            tools=[python_repl_ast],
            verbose=True,
        )

        all_batch_results = []
        final_clean_outputs = []
        for i, batch in enumerate(batched_plan, start=1):
            task_prompt = (
                f"Apply the following cleaning plan (batch {i} of {len(batched_plan)}) to the dataset at path: {df_path}\n"
                f"Plan details:\n{str(batch)}"
            )
            print(f"--- Sending task for Batch {i} to the agent ---\n{task_prompt}\n---------------------------------------------")
            outcome = self._invoke_until_parsed(
                agent_executor,
                task_prompt,
                fixing_parser,
                CleaningSummary,
                f"Cleaner batch {i}",
                extra_inputs={"cleaner_prompt": cleaner_prompt, "agent_scratchpad": []},
            )
            if outcome is None:
                # One failed batch aborts the whole node.
                return self._failure(
                    "Cleaner_node_Error",
                    f"Error: The cleaner agent failed to process batch {i} after {self._MAX_ATTEMPTS} attempts.",
                    "clean",
                    f"Processing failed at batch {i}",
                )
            parsed_output, raw_output, attempt = outcome
            all_batch_results.append(parsed_output)
            final_clean_outputs.append({"final_answer": raw_output})
            print(f"--- Batch {i} successful on attempt {attempt} ---")
            if i < len(batched_plan):
                # Pause only between batches; sleeping after the last one just delays the graph.
                time.sleep(self._CLEANER_BATCH_PAUSE_SECONDS)

        final_summary = "All cleaning batches completed successfully.\n\n" + "".join(
            f"--- Batch {idx} Summary ---\nSummary: {summary.summary}\nDetails: {summary.details}\n\n"
            for idx, summary in enumerate(all_batch_results, start=1)
        )
        return Command(
            update={
                "messages": [AIMessage(content=final_summary.strip(), name="cleaner_node")],
                "clean": final_clean_outputs,
            },
            goto="supervisor",
        )

    def Reporter_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """Produce a business report about the dataset at ``state.Path``.

        Profiles the CSV once with ``eda_fact_sheet`` and condenses the profile
        with the LLM. A ``python_repl_ast`` agent then analyses the data and
        answers in the ``BusinessReport`` schema. On success ``Report`` is
        ``[{"final_answer": <report dict>}]``; on failure ``[{"error": ...}]``.
        """
        print("*****************called Reporter node************")
        Instructions = state.Instructions
        df_path = state.Path

        # Reconnaissance runs once, outside the agent, and is condensed before use.
        print("--- Reporter: Performing initial data reconnaissance with eda_fact_sheet ---")
        try:
            recon_result_str = self._fact_sheet(df_path)
        except Exception as e:
            return self._failure("Reporter_node_Error", f"Error during initial data recon: {e}", "Report", "Initial data recon failed")
        print(f"--- Reporter: Condensing {len(recon_result_str)} characters of context... ---")
        try:
            condensed_summary = self._condense_profile(
                recon_result_str,
                "You are a data analysis assistant. Summarize the following verbose JSON data profile into a concise, human-readable format for another AI agent to use. Focus on column names, data types, and key stats.",
            )
        except Exception as e:
            return self._failure("Reporter_node_Error", f"Error while condensing the data profile: {e}", "Report", "Data profile condensation failed")
        print("--- Reporter: Condensed Summary Created ---")

        parser = PydanticOutputParser(pydantic_object=BusinessReport)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        task_prompt = f"User Instructions: {Instructions}\n\nHere is a condensed summary of the dataset: \n---\n{condensed_summary}\n---\n\nNow, follow your main instructions: {Reporter_prompt}"
        print("--- Reporter: Sending condensed task to the main analysis agent ---")
        print(f"--- Sending this direct task to the agent ---\n{task_prompt}\n---------------------------------------------")
        # NOTE(review): the system prompt mentions eda_fact_sheet, but only python_repl_ast
        # is registered below, so any eda_fact_sheet call by the agent will be rejected.
        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a Business Intelligence consultant. You have access to `eda_fact_sheet(df_path)` for initial recon and `python_repl_ast(query)` for deep-dive analysis. "
             "The CSV is at this path: {df_path}. "
             "Your mission is to autonomously analyze the data and produce a strategic business report. "),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        reporter_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[python_repl_ast],
            prompt=system_prompt,
        )
        agent_executor = AgentExecutor(
            agent=reporter_agent,
            tools=[python_repl_ast],
            verbose=True,
        )

        outcome = self._invoke_until_parsed(
            agent_executor, task_prompt, fixing_parser, BusinessReport, "Reporter",
            extra_inputs={"df_path": df_path},
        )
        if outcome is None:
            return self._failure(
                "Reporter_node_Error",
                "Error: The reporter agent failed to produce a valid business report after multiple attempts.",
                "Report",
                "Parsing failed after all retries",
            )

        parsed_output = outcome[0]
        summary_str = f"{parsed_output.subject}\n\n{parsed_output.executive_summary}"
        return Command(
            update={
                "messages": [AIMessage(content=summary_str, name="Reporter_node")],
                "Report": [{"final_answer": parsed_output.model_dump()}],
            },
            goto="supervisor",
        )

    def visualizer_node(self, state: AgentStateModel) -> Command[Literal['supervisor']]:
        """
        Direct an agent to create the business-focused visualizations described by
        ``VISUALIZATION_PROMPT`` for the CSV at ``state.Path``.

        Profiles the CSV once with ``eda_fact_sheet`` and condenses the profile.
        A ``python_repl_ast`` agent then produces the plots and answers in the
        ``VisualizationReport`` schema, with retries on parse failure. On success
        ``Visualizations`` is ``[{"final_answer": <report dict>}]``; on failure
        ``[{"error": ...}]``.
        """
        print("***************** called Visualizer node ************")
        df_path = state.Path

        print("--- Visualizer: Performing initial data reconnaissance... ---")
        try:
            recon_result_str = self._fact_sheet(df_path)
        except Exception as e:
            return self._failure("Visualizer_node_Error", f"Error during initial data recon: {e}", "Visualizations", "Initial data recon failed")
        print(f"--- Visualizer: Condensing {len(recon_result_str)} characters of context... ---")
        try:
            condensed_summary = self._condense_profile(
                recon_result_str,
                "You are a data analysis assistant. Summarize the following verbose JSON data profile into a concise, human-readable format for another AI agent to use for creating visualizations.",
            )
        except Exception as e:
            return self._failure("Visualizer_node_Error", f"Error while condensing the data profile: {e}", "Visualizations", "Data profile condensation failed")
        print("--- Visualizer: Condensed Summary Created ---")

        parser = PydanticOutputParser(pydantic_object=VisualizationReport)
        fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=self.llm_model)
        task_prompt = f"Based on this data summary:\n---\n{condensed_summary}\n---\n\nNow, follow your main instructions to create visualizations: {VISUALIZATION_PROMPT}"
        print("--- Visualizer: Sending condensed task to the plotting agent ---")
        print(f"--- Sending this direct task to the agent ---\n{task_prompt}\n---------------------------------------------")
        # NOTE(review): as in Reporter_node, eda_fact_sheet is advertised but not registered.
        system_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a Data Visualization Specialist. You have access to `eda_fact_sheet(df_path)` for initial recon and `python_repl_ast(query)` for generating and saving plots. "
             "The CSV is at this path: {df_path}. "
             "Your mission is to autonomously analyze the data and produce a series of 10 visualizations as instructed. "),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        visualizer_agent = create_tool_calling_agent(
            llm=self.llm_model,
            tools=[python_repl_ast],
            prompt=system_prompt,
        )
        agent_executor = AgentExecutor(
            agent=visualizer_agent,
            tools=[python_repl_ast],
            verbose=True,
        )

        outcome = self._invoke_until_parsed(
            agent_executor, task_prompt, fixing_parser, VisualizationReport, "Visualizer",
            extra_inputs={"df_path": df_path},
        )
        if outcome is None:
            return self._failure(
                "Visualizer_node_Error",
                "Error: The visualizer agent failed to produce a valid report after multiple attempts.",
                "Visualizations",
                "Parsing failed after all retries",
            )

        parsed_output = outcome[0]
        summary_str = f"Successfully generated a report with {len(parsed_output.visualizations)} visualizations."
        return Command(
            update={
                "messages": [AIMessage(content=summary_str, name="Visualizer_node")],
                "Visualizations": [{"final_answer": parsed_output.model_dump()}],
            },
            goto="supervisor",
        )
# Sample inputs kept from development: an example user instruction and the example
# dataset path. They were previously bare string expressions with no runtime effect;
# naming them lets a driver script import them. Not referenced elsewhere in this module.
EXAMPLE_INSTRUCTIONS = "Data is about sales,provide the data overview along with the preprocessing steps needed to perform EDA"
EXAMPLE_DATA_PATH = r"D:\Code Assistant\superstore sales.csv"