File size: 7,558 Bytes
fe36046
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""Main LangGraph Agent System Implementation"""
import os
from typing import Dict, Any, Optional, TypedDict, Literal

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END

# Import our agents and nodes
from src.agents.plan_node import plan_node
from src.agents.router_node import router_node, should_route_to_agent
from src.agents.retrieval_agent import retrieval_agent
from src.agents.execution_agent import execution_agent
from src.agents.critic_agent import critic_agent
from src.agents.verification_node import verification_node, should_retry
from src.memory import memory_manager
from src.tracing import (
    get_langfuse_callback_handler,
    update_trace_metadata,
    trace_agent_execution,
    flush_langfuse,
)


class AgentState(TypedDict):
    """State schema shared by every node in the agent graph.

    Each node receives the current state dict and returns a (partial or
    full) state dict; the keys below are the full set the graph reads and
    writes (see ``create_agent_graph`` / ``run_agent_system`` in this file).
    """
    # Core conversation
    # Running chat history; seeded with a single HumanMessage in
    # run_agent_system's initial_state.
    messages: list[BaseMessage]
    
    # Planning and routing
    plan_complete: bool        # set by the plan node once planning is done
    next_agent: str            # agent name chosen by the router
    routing_decision: str      # router's raw decision value
    routing_reason: str        # router's human-readable justification
    current_step: str          # workflow phase marker; "complete" ends the graph
    
    # Agent responses
    agent_response: BaseMessage  # last agent's message (None in initial state)
    execution_result: str        # raw output from the execution agent
    
    # Quality control
    critic_assessment: str     # critic's textual evaluation
    quality_pass: bool         # critic verdict; initial_state defaults to True
    quality_score: int         # critic score; initial_state defaults to 7
    verification_status: str   # "", "failed", "failed_max_attempts", or "fallback"
    
    # System management
    attempt_count: int         # retry counter, starts at 1 (see verification_next)
    final_answer: str          # answer surfaced to the caller by run_agent_system


def create_agent_graph() -> StateGraph:
    """Create the LangGraph agent system.

    Wires the workflow:
        plan -> router -> {retrieval | execution | critic}
        retrieval/execution -> critic -> verification
        verification -> END | router (retry) | fallback -> END

    Returns:
        The uncompiled StateGraph over ``AgentState``; the caller compiles
        it (optionally with a checkpointer) before invoking.
    """
    
    # Initialize the state graph
    workflow = StateGraph(AgentState)
    
    # Add nodes (all agent callables are imported from src.agents)
    workflow.add_node("plan", plan_node)
    workflow.add_node("router", router_node)
    workflow.add_node("retrieval", retrieval_agent)
    workflow.add_node("execution", execution_agent)
    workflow.add_node("critic", critic_agent)
    workflow.add_node("verification", verification_node)
    
    # Add fallback node
    def fallback_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Last-resort node: apologize, echoing the original user question.

        Reached only when verification reports "failed_max_attempts".
        Marks the state complete so the graph terminates.
        """
        print("Fallback Node: Providing basic response")
        
        messages = state.get("messages", [])
        user_query = None
        
        # Most recent human message is the query we failed to answer.
        for msg in reversed(messages):
            if msg.type == "human":
                user_query = msg.content
                break
        
        fallback_answer = "I apologize, but I was unable to provide a satisfactory answer to your question."
        if user_query:
            fallback_answer += f" Your question was: {user_query}"
        
        return {
            **state,
            "final_answer": fallback_answer,
            "verification_status": "fallback",
            "current_step": "complete"
        }
    
    workflow.add_node("fallback", fallback_node)
    
    # Set entry point
    workflow.set_entry_point("plan")
    
    # Add edges
    workflow.add_edge("plan", "router")
    
    # Conditional routing from router to agents
    workflow.add_conditional_edges(
        "router",
        should_route_to_agent,
        {
            "retrieval": "retrieval",
            "execution": "execution", 
            "critic": "critic"
        }
    )
    
    # Route agent outputs through critic for quality evaluation before final verification
    workflow.add_edge("retrieval", "critic")
    workflow.add_edge("execution", "critic")
    # Critic (whether reached directly via routing or via other agents) proceeds to verification
    workflow.add_edge("critic", "verification")
    
    # Verification conditional logic.
    # NOTE: the return annotation is `str`, not Literal[...]: the original
    # `Literal["router", "fallback", END]` was invalid per PEP 586 because
    # END is a runtime constant, not a literal value.
    def verification_next(state: Dict[str, Any]) -> str:
        """Determine next step after verification.

        Returns "router" to retry (while attempts remain), "fallback" once
        retries are exhausted, or END when the run is complete/passed.
        """
        verification_status = state.get("verification_status", "")
        current_step = state.get("current_step", "")
        
        if current_step == "complete":
            return END
        elif verification_status == "failed" and state.get("attempt_count", 1) < 3:
            return "router"  # Retry
        elif verification_status == "failed_max_attempts":
            return "fallback"
        else:
            return END
    
    workflow.add_conditional_edges(
        "verification",
        verification_next,
        {
            "router": "router",
            "fallback": "fallback",
            END: END
        }
    )
    
    # Fallback ends the process
    workflow.add_edge("fallback", END)
    
    return workflow


def run_agent_system(query: str, user_id: Optional[str] = None, session_id: Optional[str] = None) -> str:
    """
    Run the complete agent system with a user query.

    Builds and compiles the graph from ``create_agent_graph``, executes it
    inside a root Langfuse span, optionally stores the Q&A pair in memory,
    and always flushes tracing on exit.

    Args:
        query: The user question
        user_id: Optional user identifier for tracing
        session_id: Optional session identifier for tracing; also used as
            the checkpointer thread_id (falls back to "default")

    Returns:
        The final formatted answer, or an apology string on error (this
        function does not raise — all exceptions are caught at this boundary).
    """
    print(f"Agent System: Processing query: {query[:100]}...")

    # Open a **root** Langfuse span so that everything inside is neatly grouped
    with trace_agent_execution(name="user-request", user_id=user_id, session_id=session_id):
        try:
            # Enrich the root span with metadata & tags
            update_trace_metadata(
                user_id=user_id,
                session_id=session_id,
                tags=["agent_system"],
            )

            # Create the graph
            workflow = create_agent_graph()

            # Compile with checkpointing when the memory manager provides one
            checkpointer = memory_manager.get_checkpointer()
            if checkpointer:
                app = workflow.compile(checkpointer=checkpointer)
            else:
                app = workflow.compile()

            # Prepare initial state (must cover every AgentState key)
            initial_state = {
                "messages": [HumanMessage(content=query)],
                "plan_complete": False,
                "next_agent": "",
                "routing_decision": "",
                "routing_reason": "",
                "current_step": "planning",
                "agent_response": None,
                "execution_result": "",
                "critic_assessment": "",
                "quality_pass": True,
                "quality_score": 7,
                "verification_status": "",
                "attempt_count": 1,
                "final_answer": "",
            }

            # Configure execution – reuse *one* callback handler
            callback_handler = get_langfuse_callback_handler()
            config = {
                "configurable": {"thread_id": session_id or "default"},
            }
            if callback_handler:
                config["callbacks"] = [callback_handler]

            # Run the graph
            print("Agent System: Executing workflow...")
            final_state = app.invoke(initial_state, config=config)

            # Extract final answer
            final_answer = final_state.get("final_answer", "No answer generated")

            # Store in memory if appropriate (memory_manager decides)
            if memory_manager.should_ingest(query):
                memory_manager.ingest_qa_pair(query, final_answer)

            print(f"Agent System: Completed. Final answer: {final_answer[:100]}...")
            return final_answer
        except Exception as e:
            # Top-level boundary: report the error to the user instead of raising.
            print(f"Agent System Error: {e}")
            return (
                f"I apologize, but I encountered an error while processing your question: {e}"
            )
        finally:
            # Ensure Langfuse spans are exported even in short-lived environments
            try:
                flush_langfuse()
            except Exception:
                pass

# Public API: the entry point, the graph factory, and the state schema
__all__ = ["run_agent_system", "create_agent_graph", "AgentState"]