"""Main LangGraph Agent System Implementation""" import os from typing import Dict, Any, TypedDict, Literal from langchain_core.messages import BaseMessage, HumanMessage from langgraph.graph import StateGraph, END # Import our agents and nodes from src.agents.plan_node import plan_node from src.agents.router_node import router_node, should_route_to_agent from src.agents.retrieval_agent import retrieval_agent from src.agents.execution_agent import execution_agent from src.agents.critic_agent import critic_agent from src.agents.verification_node import verification_node, should_retry from src.memory import memory_manager from src.tracing import ( get_langfuse_callback_handler, update_trace_metadata, trace_agent_execution, flush_langfuse, ) class AgentState(TypedDict): """State schema for the agent system""" # Core conversation messages: list[BaseMessage] # Planning and routing plan_complete: bool next_agent: str routing_decision: str routing_reason: str current_step: str # Agent responses agent_response: BaseMessage execution_result: str # Quality control critic_assessment: str quality_pass: bool quality_score: int verification_status: str # System management attempt_count: int final_answer: str def create_agent_graph() -> StateGraph: """Create the LangGraph agent system""" # Initialize the state graph workflow = StateGraph(AgentState) # Add nodes workflow.add_node("plan", plan_node) workflow.add_node("router", router_node) workflow.add_node("retrieval", retrieval_agent) workflow.add_node("execution", execution_agent) workflow.add_node("critic", critic_agent) workflow.add_node("verification", verification_node) # Add fallback node def fallback_node(state: Dict[str, Any]) -> Dict[str, Any]: """Simple fallback that returns a basic response""" print("Fallback Node: Providing basic response") messages = state.get("messages", []) user_query = None for msg in reversed(messages): if msg.type == "human": user_query = msg.content break fallback_answer = "I apologize, but I was unable to provide a satisfactory answer to your question." if user_query: fallback_answer += f" Your question was: {user_query}" return { **state, "final_answer": fallback_answer, "verification_status": "fallback", "current_step": "complete" } workflow.add_node("fallback", fallback_node) # Set entry point workflow.set_entry_point("plan") # Add edges workflow.add_edge("plan", "router") # Conditional routing from router to agents workflow.add_conditional_edges( "router", should_route_to_agent, { "retrieval": "retrieval", "execution": "execution", "critic": "critic" } ) # Route agent outputs through critic for quality evaluation before final verification workflow.add_edge("retrieval", "critic") workflow.add_edge("execution", "critic") # Critic (whether reached directly via routing or via other agents) proceeds to verification workflow.add_edge("critic", "verification") # Verification conditional logic def verification_next(state: Dict[str, Any]) -> Literal["router", "fallback", END]: """Determine next step after verification""" verification_status = state.get("verification_status", "") current_step = state.get("current_step", "") if current_step == "complete": return END elif verification_status == "failed" and state.get("attempt_count", 1) < 3: return "router" # Retry elif verification_status == "failed_max_attempts": return "fallback" else: return END workflow.add_conditional_edges( "verification", verification_next, { "router": "router", "fallback": "fallback", END: END } ) # Fallback ends the process workflow.add_edge("fallback", END) return workflow def run_agent_system(query: str, user_id: str = None, session_id: str = None) -> str: """ Run the complete agent system with a user query Args: query: The user question user_id: Optional user identifier for tracing session_id: Optional session identifier for tracing Returns: The final formatted answer """ print(f"Agent System: Processing query: {query[:100]}...") # Open a **root** Langfuse span so that everything inside is neatly grouped with trace_agent_execution(name="user-request", user_id=user_id, session_id=session_id): try: # Enrich the root span with metadata & tags update_trace_metadata( user_id=user_id, session_id=session_id, tags=["agent_system"], ) # Create the graph workflow = create_agent_graph() # Compile with checkpointing checkpointer = memory_manager.get_checkpointer() if checkpointer: app = workflow.compile(checkpointer=checkpointer) else: app = workflow.compile() # Prepare initial state initial_state = { "messages": [HumanMessage(content=query)], "plan_complete": False, "next_agent": "", "routing_decision": "", "routing_reason": "", "current_step": "planning", "agent_response": None, "execution_result": "", "critic_assessment": "", "quality_pass": True, "quality_score": 7, "verification_status": "", "attempt_count": 1, "final_answer": "", } # Configure execution – reuse *one* callback handler callback_handler = get_langfuse_callback_handler() config = { "configurable": {"thread_id": session_id or "default"}, } if callback_handler: config["callbacks"] = [callback_handler] # Run the graph print("Agent System: Executing workflow...") final_state = app.invoke(initial_state, config=config) # Extract final answer final_answer = final_state.get("final_answer", "No answer generated") # Store in memory if appropriate if memory_manager.should_ingest(query): memory_manager.ingest_qa_pair(query, final_answer) print(f"Agent System: Completed. Final answer: {final_answer[:100]}...") return final_answer except Exception as e: print(f"Agent System Error: {e}") return ( f"I apologize, but I encountered an error while processing your question: {e}" ) finally: # Ensure Langfuse spans are exported even in short-lived environments try: flush_langfuse() except Exception: pass # Export the main function __all__ = ["run_agent_system", "create_agent_graph", "AgentState"]