""" LangGraph Multi-Agent System Implementation This module implements a multi-agent system using LangGraph with the following components: - LeadAgent: Orchestrates the workflow and makes decisions - ResearchAgent: Handles information gathering and research tasks - CodeAgent: Handles computational and code execution tasks - AnswerFormatter: Formats final answers according to GAIA requirements - Memory: Persistent storage for context and learning """ import os from typing import Dict, Any, TypedDict, Literal, Annotated, List from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage from langgraph.graph import StateGraph, START, END from langgraph.types import Command import operator from dotenv import load_dotenv # Import our observability module from observability import ( start_root_span, get_callback_handler, flush_traces, shutdown_observability ) # Load environment variables load_dotenv("env.local") class AgentState(TypedDict): """ State schema for the multi-agent system following LangGraph best practices. Treats every agent node as a pure function AgentState → Command. """ # Core conversation messages messages: Annotated[List[BaseMessage], operator.add] # Working draft and evidence draft_answer: str research_notes: Annotated[str, operator.add] # Use add for accumulation code_outputs: Annotated[str, operator.add] # Use add for accumulation # Loop control loop_counter: int max_iterations: int # Routing decisions next: Literal["research", "code", "formatter", "__end__"] # Final formatted answer final_answer: str # Metadata for tracing user_id: str session_id: str # Removed setup_tracing function - now handled by observability module def create_agent_graph(): """ Create the LangGraph workflow following the specified architecture: lead -> research -> code -> lead (loop) -> formatter -> END """ from agents.lead_agent import lead_agent from agents.research_agent import research_agent from agents.code_agent import code_agent from agents.answer_formatter import answer_formatter # Create the state graph workflow = StateGraph(AgentState) # Add nodes workflow.add_node("lead", lead_agent) workflow.add_node("research", research_agent) workflow.add_node("code", code_agent) workflow.add_node("formatter", answer_formatter) # Add edges workflow.add_edge(START, "lead") # Conditional edges from lead agent based on routing decisions def route_from_lead(state: AgentState) -> str: """Route from lead agent based on the 'next' field""" # Check for termination conditions if (state.get("loop_counter", 0) >= state.get("max_iterations", 3) or state.get("final_answer")): return "__end__" return state.get("next", "research") workflow.add_conditional_edges( "lead", route_from_lead, { "research": "research", "code": "code", "formatter": "formatter", "__end__": END } ) # Both research and code agents return to lead agent for next decision workflow.add_edge("research", "lead") workflow.add_edge("code", "lead") workflow.add_edge("formatter", END) return workflow async def run_agent_system( query: str, user_id: str = "default_user", session_id: str = "default_session", max_iterations: int = 3 ) -> str: """ Main entry point for the agent system. Args: query: User question to answer user_id: User identifier for tracing session_id: Session identifier for tracing max_iterations: Maximum number of research/code loops Returns: Final formatted answer """ try: # Get the global callback handler callback_handler = get_callback_handler() # Create root span for the entire request with start_root_span( name="user-request", user_id=user_id, session_id=session_id, metadata={"query": query, "max_iterations": max_iterations} ) as root_span: # Create the workflow workflow = create_agent_graph() app = workflow.compile() # Initial state initial_state: AgentState = { "messages": [HumanMessage(content=query)], "draft_answer": "", "research_notes": "", "code_outputs": "", "loop_counter": 0, "max_iterations": max_iterations, "next": "research", # Start with research "final_answer": "", "user_id": user_id, "session_id": session_id } # Run the workflow with callback handler if available if callback_handler: final_state = await app.ainvoke( initial_state, config={"callbacks": [callback_handler]} ) else: print("Warning: Running without Langfuse tracing") final_state = await app.ainvoke(initial_state) # Update trace with output if span exists if root_span: root_span.update_trace(output={"final_answer": final_state["final_answer"]}) return final_state["final_answer"] except Exception as e: print(f"Error in agent system: {e}") return f"I apologize, but I encountered an error while processing your query: {str(e)}" finally: # Flush traces in background flush_traces(background=True) if __name__ == "__main__": import asyncio # Test the system async def test(): result = await run_agent_system( "What is the capital of Maharashtra?", user_id="test_user", session_id="test_session" ) print(f"Final Answer: {result}") asyncio.run(test())