File size: 6,238 Bytes
f844f16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
LangGraph Multi-Agent System Implementation

This module implements a multi-agent system using LangGraph with the following components:
- LeadAgent: Orchestrates the workflow and makes decisions
- ResearchAgent: Handles information gathering and research tasks  
- CodeAgent: Handles computational and code execution tasks
- AnswerFormatter: Formats final answers according to GAIA requirements
- Memory: Persistent storage for context and learning
"""

import os
from typing import Dict, Any, TypedDict, Literal, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
import operator
from dotenv import load_dotenv

# Import our observability module
from observability import (
    start_root_span, 
    get_callback_handler, 
    flush_traces, 
    shutdown_observability
)

# Load environment variables
load_dotenv("env.local")

class AgentState(TypedDict):
    """
    State schema for the multi-agent system following LangGraph best practices.
    Treats every agent node as a pure function AgentState → Command.

    Fields annotated with ``operator.add`` are reducers: when multiple nodes
    return partial updates, LangGraph concatenates them instead of overwriting.
    """
    # Core conversation messages; list-append reducer so each node can emit
    # new messages without clobbering the history.
    messages: Annotated[List[BaseMessage], operator.add]
    
    # Working draft and evidence
    draft_answer: str
    research_notes: Annotated[str, operator.add]  # Use add for accumulation
    code_outputs: Annotated[str, operator.add]   # Use add for accumulation
    
    # Loop control: lead agent loops until loop_counter >= max_iterations
    # (see route_from_lead in create_agent_graph).
    loop_counter: int
    max_iterations: int
    
    # Routing decision written by the lead agent; "__end__" terminates the graph.
    next: Literal["research", "code", "formatter", "__end__"]
    
    # Final formatted answer (set by the formatter node; non-empty value also
    # acts as a termination signal in the router).
    final_answer: str
    
    # Metadata for tracing (propagated to the Langfuse root span)
    user_id: str
    session_id: str

# Removed setup_tracing function - now handled by observability module


def create_agent_graph():
    """
    Build the LangGraph workflow wiring the agents together.

    Topology: lead -> (research | code) -> lead (loop) -> formatter -> END.
    The lead agent is the sole decision point; research and code always hand
    control back to it, while the formatter terminates the run.

    Returns:
        An uncompiled StateGraph over AgentState; callers compile() it.
    """
    # Imported lazily to avoid circular imports at module load time.
    from agents.lead_agent import lead_agent
    from agents.research_agent import research_agent
    from agents.code_agent import code_agent
    from agents.answer_formatter import answer_formatter

    def route_from_lead(state: AgentState) -> str:
        """Pick the next node after the lead agent runs."""
        # Stop when the iteration budget is spent or an answer already exists.
        budget_spent = state.get("loop_counter", 0) >= state.get("max_iterations", 3)
        if budget_spent or state.get("final_answer"):
            return "__end__"
        # Otherwise follow the lead agent's routing decision.
        return state.get("next", "research")

    workflow = StateGraph(AgentState)

    # Register all agent nodes (order matches the original wiring).
    for node_name, node_fn in (
        ("lead", lead_agent),
        ("research", research_agent),
        ("code", code_agent),
        ("formatter", answer_formatter),
    ):
        workflow.add_node(node_name, node_fn)

    workflow.add_edge(START, "lead")
    workflow.add_conditional_edges(
        "lead",
        route_from_lead,
        {
            "research": "research",
            "code": "code",
            "formatter": "formatter",
            "__end__": END,
        },
    )

    # Worker agents return to the lead for the next decision; the formatter
    # is terminal.
    workflow.add_edge("research", "lead")
    workflow.add_edge("code", "lead")
    workflow.add_edge("formatter", END)

    return workflow


async def run_agent_system(
    query: str, 
    user_id: str = "default_user",
    session_id: str = "default_session",
    max_iterations: int = 3
) -> str:
    """
    Main entry point for the agent system.

    Builds the workflow, runs it under a Langfuse root span, and returns the
    formatted answer. Any exception is caught at this boundary and converted
    into an apologetic message rather than propagated to the caller.

    Args:
        query: User question to answer.
        user_id: User identifier for tracing.
        session_id: Session identifier for tracing.
        max_iterations: Maximum number of research/code loops.

    Returns:
        Final formatted answer, or an error message on failure.
    """
    try:
        # Global Langfuse callback handler (may be None if tracing is off).
        callback_handler = get_callback_handler()

        # One root span covers the entire request for trace grouping.
        with start_root_span(
            name="user-request",
            user_id=user_id,
            session_id=session_id,
            metadata={"query": query, "max_iterations": max_iterations}
        ) as root_span:
            app = create_agent_graph().compile()

            # Seed the graph state; the lead agent starts by routing to research.
            initial_state: AgentState = {
                "messages": [HumanMessage(content=query)],
                "draft_answer": "",
                "research_notes": "",
                "code_outputs": "",
                "loop_counter": 0,
                "max_iterations": max_iterations,
                "next": "research",  # Start with research
                "final_answer": "",
                "user_id": user_id,
                "session_id": session_id
            }

            if callback_handler:
                final_state = await app.ainvoke(
                    initial_state,
                    config={"callbacks": [callback_handler]}
                )
            else:
                print("Warning: Running without Langfuse tracing")
                final_state = await app.ainvoke(initial_state)

            answer = final_state["final_answer"]

            # Attach the result to the trace when a span was created.
            if root_span:
                root_span.update_trace(output={"final_answer": answer})

            return answer

    except Exception as e:
        # Top-level boundary: report and degrade gracefully instead of raising.
        print(f"Error in agent system: {e}")
        return f"I apologize, but I encountered an error while processing your query: {str(e)}"

    finally:
        # Flush traces in background so the response isn't delayed.
        flush_traces(background=True)


if __name__ == "__main__":
    import asyncio

    # Smoke-test the system with a simple factual query.
    async def test():
        answer = await run_agent_system(
            "What is the capital of Maharashtra?",
            user_id="test_user",
            session_id="test_session",
        )
        print(f"Final Answer: {answer}")

    asyncio.run(test())