| | """Main LangGraph Agent System Implementation""" |
| | import os |
| | from typing import Dict, Any, TypedDict, Literal |
| | from langchain_core.messages import BaseMessage, HumanMessage |
| | from langgraph.graph import StateGraph, END |
| |
|
| | |
| | from src.agents.plan_node import plan_node |
| | from src.agents.router_node import router_node, should_route_to_agent |
| | from src.agents.retrieval_agent import retrieval_agent |
| | from src.agents.execution_agent import execution_agent |
| | from src.agents.critic_agent import critic_agent |
| | from src.agents.verification_node import verification_node, should_retry |
| | from src.memory import memory_manager |
| | from src.tracing import ( |
| | get_langfuse_callback_handler, |
| | update_trace_metadata, |
| | trace_agent_execution, |
| | flush_langfuse, |
| | ) |
| |
|
| |
|
| | class AgentState(TypedDict): |
| | """State schema for the agent system""" |
| | |
| | messages: list[BaseMessage] |
| | |
| | |
| | plan_complete: bool |
| | next_agent: str |
| | routing_decision: str |
| | routing_reason: str |
| | current_step: str |
| | |
| | |
| | agent_response: BaseMessage |
| | execution_result: str |
| | |
| | |
| | critic_assessment: str |
| | quality_pass: bool |
| | quality_score: int |
| | verification_status: str |
| | |
| | |
| | attempt_count: int |
| | final_answer: str |
| |
|
| |
|
| | def create_agent_graph() -> StateGraph: |
| | """Create the LangGraph agent system""" |
| | |
| | |
| | workflow = StateGraph(AgentState) |
| | |
| | |
| | workflow.add_node("plan", plan_node) |
| | workflow.add_node("router", router_node) |
| | workflow.add_node("retrieval", retrieval_agent) |
| | workflow.add_node("execution", execution_agent) |
| | workflow.add_node("critic", critic_agent) |
| | workflow.add_node("verification", verification_node) |
| | |
| | |
| | def fallback_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| | """Simple fallback that returns a basic response""" |
| | print("Fallback Node: Providing basic response") |
| | |
| | messages = state.get("messages", []) |
| | user_query = None |
| | |
| | for msg in reversed(messages): |
| | if msg.type == "human": |
| | user_query = msg.content |
| | break |
| | |
| | fallback_answer = "I apologize, but I was unable to provide a satisfactory answer to your question." |
| | if user_query: |
| | fallback_answer += f" Your question was: {user_query}" |
| | |
| | return { |
| | **state, |
| | "final_answer": fallback_answer, |
| | "verification_status": "fallback", |
| | "current_step": "complete" |
| | } |
| | |
| | workflow.add_node("fallback", fallback_node) |
| | |
| | |
| | workflow.set_entry_point("plan") |
| | |
| | |
| | workflow.add_edge("plan", "router") |
| | |
| | |
| | workflow.add_conditional_edges( |
| | "router", |
| | should_route_to_agent, |
| | { |
| | "retrieval": "retrieval", |
| | "execution": "execution", |
| | "critic": "critic" |
| | } |
| | ) |
| | |
| | |
| | workflow.add_edge("retrieval", "critic") |
| | workflow.add_edge("execution", "critic") |
| | |
| | workflow.add_edge("critic", "verification") |
| | |
| | |
| | def verification_next(state: Dict[str, Any]) -> Literal["router", "fallback", END]: |
| | """Determine next step after verification""" |
| | verification_status = state.get("verification_status", "") |
| | current_step = state.get("current_step", "") |
| | |
| | if current_step == "complete": |
| | return END |
| | elif verification_status == "failed" and state.get("attempt_count", 1) < 3: |
| | return "router" |
| | elif verification_status == "failed_max_attempts": |
| | return "fallback" |
| | else: |
| | return END |
| | |
| | workflow.add_conditional_edges( |
| | "verification", |
| | verification_next, |
| | { |
| | "router": "router", |
| | "fallback": "fallback", |
| | END: END |
| | } |
| | ) |
| | |
| | |
| | workflow.add_edge("fallback", END) |
| | |
| | return workflow |
| |
|
| |
|
| | def run_agent_system(query: str, user_id: str = None, session_id: str = None) -> str: |
| | """ |
| | Run the complete agent system with a user query |
| | |
| | Args: |
| | query: The user question |
| | user_id: Optional user identifier for tracing |
| | session_id: Optional session identifier for tracing |
| | |
| | Returns: |
| | The final formatted answer |
| | """ |
| | print(f"Agent System: Processing query: {query[:100]}...") |
| |
|
| | |
| | with trace_agent_execution(name="user-request", user_id=user_id, session_id=session_id): |
| | try: |
| | |
| | update_trace_metadata( |
| | user_id=user_id, |
| | session_id=session_id, |
| | tags=["agent_system"], |
| | ) |
| |
|
| | |
| | workflow = create_agent_graph() |
| |
|
| | |
| | checkpointer = memory_manager.get_checkpointer() |
| | if checkpointer: |
| | app = workflow.compile(checkpointer=checkpointer) |
| | else: |
| | app = workflow.compile() |
| |
|
| | |
| | initial_state = { |
| | "messages": [HumanMessage(content=query)], |
| | "plan_complete": False, |
| | "next_agent": "", |
| | "routing_decision": "", |
| | "routing_reason": "", |
| | "current_step": "planning", |
| | "agent_response": None, |
| | "execution_result": "", |
| | "critic_assessment": "", |
| | "quality_pass": True, |
| | "quality_score": 7, |
| | "verification_status": "", |
| | "attempt_count": 1, |
| | "final_answer": "", |
| | } |
| |
|
| | |
| | callback_handler = get_langfuse_callback_handler() |
| | config = { |
| | "configurable": {"thread_id": session_id or "default"}, |
| | } |
| | if callback_handler: |
| | config["callbacks"] = [callback_handler] |
| |
|
| | |
| | print("Agent System: Executing workflow...") |
| | final_state = app.invoke(initial_state, config=config) |
| |
|
| | |
| | final_answer = final_state.get("final_answer", "No answer generated") |
| |
|
| | |
| | if memory_manager.should_ingest(query): |
| | memory_manager.ingest_qa_pair(query, final_answer) |
| |
|
| | print(f"Agent System: Completed. Final answer: {final_answer[:100]}...") |
| | return final_answer |
| | except Exception as e: |
| | print(f"Agent System Error: {e}") |
| | return ( |
| | f"I apologize, but I encountered an error while processing your question: {e}" |
| | ) |
| | finally: |
| | |
| | try: |
| | flush_langfuse() |
| | except Exception: |
| | pass |
| |
|
| |
|
| | |
| | __all__ = ["run_agent_system", "create_agent_graph", "AgentState"] |