File size: 7,558 Bytes
fe36046 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
"""Main LangGraph Agent System Implementation"""
import os
from typing import Any, Dict, Literal, Optional, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END

# Import our agents and nodes
from src.agents.plan_node import plan_node
from src.agents.router_node import router_node, should_route_to_agent
from src.agents.retrieval_agent import retrieval_agent
from src.agents.execution_agent import execution_agent
from src.agents.critic_agent import critic_agent
from src.agents.verification_node import verification_node, should_retry
from src.memory import memory_manager
from src.tracing import (
    get_langfuse_callback_handler,
    update_trace_metadata,
    trace_agent_execution,
    flush_langfuse,
)
class AgentState(TypedDict):
    """State schema passed between the nodes of the agent graph.

    ``run_agent_system`` seeds every key before invoking the graph; the
    comments below note those seed values and the values this module itself
    reads or writes. Values produced by the individual agent nodes are defined
    in their own modules.
    """

    # Core conversation
    # Seeded with a single HumanMessage holding the user's query.
    messages: list[BaseMessage]

    # Planning and routing
    plan_complete: bool  # seeded as False
    next_agent: str  # seeded as ""
    routing_decision: str  # seeded as ""
    routing_reason: str  # seeded as ""
    # Seeded as "planning"; the value "complete" ends the graph after
    # verification and is also what the fallback node writes.
    current_step: str

    # Agent responses
    # Seeded as None, hence Optional: no agent has answered yet at start-up.
    agent_response: Optional[BaseMessage]
    execution_result: str  # seeded as ""

    # Quality control
    critic_assessment: str  # seeded as ""
    quality_pass: bool  # seeded as True
    quality_score: int  # seeded as 7
    # "failed" requests a retry, "failed_max_attempts" routes to the fallback
    # node, and the fallback node itself writes "fallback". Seeded as "".
    verification_status: str

    # System management
    attempt_count: int  # seeded as 1; compared against the retry budget
    final_answer: str  # seeded as ""; the text returned to the caller
def create_agent_graph(max_attempts: int = 3) -> StateGraph:
    """Build the (uncompiled) LangGraph workflow of the agent system.

    Topology::

        plan -> router -> retrieval | execution -> critic -> verification
                router -> critic ------------------------> verification
        verification -> router (retry) | fallback | END
        fallback -> END

    Args:
        max_attempts: Attempt budget for one query. While verification
            reports ``"failed"`` and ``attempt_count`` is below this value,
            the graph loops back to the router; once the budget is spent the
            fallback node answers instead. Defaults to 3, the limit that was
            previously hard-coded.

    Returns:
        The assembled ``StateGraph``. Compiling it (optionally with a
        checkpointer) is left to the caller.
    """
    workflow = StateGraph(AgentState)

    # Worker nodes
    workflow.add_node("plan", plan_node)
    workflow.add_node("router", router_node)
    workflow.add_node("retrieval", retrieval_agent)
    workflow.add_node("execution", execution_agent)
    workflow.add_node("critic", critic_agent)
    workflow.add_node("verification", verification_node)

    def fallback_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Produce an apology as the final answer, echoing the user's question.

        The most recent message whose ``type`` is ``"human"`` is quoted back
        when one exists. The returned state is marked as complete with
        ``verification_status`` set to ``"fallback"``.
        """
        print("Fallback Node: Providing basic response")

        # Latest human message wins; None when the history holds none.
        user_query = next(
            (
                msg.content
                for msg in reversed(state.get("messages", []))
                if msg.type == "human"
            ),
            None,
        )

        fallback_answer = "I apologize, but I was unable to provide a satisfactory answer to your question."
        if user_query:
            fallback_answer += f" Your question was: {user_query}"

        return {
            **state,
            "final_answer": fallback_answer,
            "verification_status": "fallback",
            "current_step": "complete",
        }

    workflow.add_node("fallback", fallback_node)

    # Entry point and the unconditional first hop
    workflow.set_entry_point("plan")
    workflow.add_edge("plan", "router")

    # Conditional routing from router to agents
    workflow.add_conditional_edges(
        "router",
        should_route_to_agent,
        {
            "retrieval": "retrieval",
            "execution": "execution",
            "critic": "critic",
        },
    )

    # Agent outputs are evaluated by the critic before final verification.
    workflow.add_edge("retrieval", "critic")
    workflow.add_edge("execution", "critic")

    # The critic (reached directly from the router or via another agent)
    # always hands over to verification.
    workflow.add_edge("critic", "verification")

    # Annotated as plain ``str``: ``Literal[...]`` only accepts literal
    # values, so the ``END`` constant cannot be listed there. The path map
    # passed to ``add_conditional_edges`` below declares the destinations.
    def verification_next(state: Dict[str, Any]) -> str:
        """Pick the node that follows verification.

        Returns ``END`` when the step is already ``"complete"`` or the status
        is anything other than a failure, ``"router"`` for a failure with
        attempts left, and ``"fallback"`` once no attempts remain.
        """
        if state.get("current_step", "") == "complete":
            return END

        status = state.get("verification_status", "")
        if status == "failed_max_attempts":
            return "fallback"
        if status == "failed":
            if state.get("attempt_count", 1) < max_attempts:
                return "router"  # Retry
            # A failure with the budget spent must not end the run as if it
            # had succeeded; let the fallback node produce the answer.
            return "fallback"
        return END

    workflow.add_conditional_edges(
        "verification",
        verification_next,
        {
            "router": "router",
            "fallback": "fallback",
            END: END,
        },
    )

    # Fallback ends the process
    workflow.add_edge("fallback", END)

    return workflow
def run_agent_system(
    query: str,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
) -> str:
    """Run the complete agent system for one user query.

    Builds and compiles the graph, seeds the initial state, invokes the
    workflow inside a root tracing span and returns the resulting answer.

    Args:
        query: The user question.
        user_id: Optional user identifier for tracing.
        session_id: Optional session identifier for tracing; also used as the
            graph's ``thread_id``.

    Returns:
        The final answer from the graph state, ``"No answer generated"`` when
        the graph left it empty, or an apology containing the error text when
        any exception is raised while processing.
    """
    print(f"Agent System: Processing query: {query[:100]}...")

    try:
        # Open a **root** Langfuse span so that everything inside is neatly grouped
        with trace_agent_execution(name="user-request", user_id=user_id, session_id=session_id):
            try:
                # Enrich the root span with metadata & tags
                update_trace_metadata(
                    user_id=user_id,
                    session_id=session_id,
                    tags=["agent_system"],
                )

                # Create the graph and compile it, with checkpointing if available
                workflow = create_agent_graph()
                checkpointer = memory_manager.get_checkpointer()
                if checkpointer:
                    app = workflow.compile(checkpointer=checkpointer)
                else:
                    app = workflow.compile()

                # Prepare initial state
                initial_state = {
                    "messages": [HumanMessage(content=query)],
                    "plan_complete": False,
                    "next_agent": "",
                    "routing_decision": "",
                    "routing_reason": "",
                    "current_step": "planning",
                    "agent_response": None,
                    "execution_result": "",
                    "critic_assessment": "",
                    "quality_pass": True,
                    "quality_score": 7,
                    "verification_status": "",
                    "attempt_count": 1,
                    "final_answer": "",
                }

                # Configure execution – reuse *one* callback handler
                # NOTE(review): requests without a session_id all share the
                # "default" thread_id – confirm this is intended when a
                # checkpointer is active.
                config = {
                    "configurable": {"thread_id": session_id or "default"},
                }
                callback_handler = get_langfuse_callback_handler()
                if callback_handler:
                    config["callbacks"] = [callback_handler]

                # Run the graph
                print("Agent System: Executing workflow...")
                final_state = app.invoke(initial_state, config=config)

                # "final_answer" is always present (seeded with ""), so a
                # dict default would never apply; fall back on emptiness.
                raw_answer = final_state.get("final_answer")
                final_answer = raw_answer or "No answer generated"

                # Store in memory only when a real answer was produced:
                # placeholders and fallback apologies are not worth recalling.
                answered = bool(raw_answer) and (
                    final_state.get("verification_status") != "fallback"
                )
                if answered and memory_manager.should_ingest(query):
                    memory_manager.ingest_qa_pair(query, final_answer)

                print(f"Agent System: Completed. Final answer: {final_answer[:100]}...")
                return final_answer
            except Exception as e:
                # Top-level boundary: report the failure to the caller as text.
                print(f"Agent System Error: {e}")
                return (
                    f"I apologize, but I encountered an error while processing your question: {e}"
                )
    finally:
        # Flush only after the ``with`` block has exited, so the root span is
        # presumably closed and exported too (assumes the context manager
        # ends the span on exit – TODO confirm in src.tracing). This matters
        # in short-lived environments.
        try:
            flush_langfuse()
        except Exception as flush_error:
            # Best effort: a tracing failure must never mask the answer.
            print(f"Agent System Warning: could not flush Langfuse: {flush_error}")
# Public API of this module
__all__ = ["run_agent_system", "create_agent_graph", "AgentState"]