File size: 4,498 Bytes
f974658
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from typing import Dict, Any, List, Literal, TypedDict, Annotated, Union
from langchain.schema import Document
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from agents.router_agent import RouterAgent
from agents.weather_agent import WeatherAgent
from agents.rag_agent import RAGAgent
from utils.evaluation import LangSmithEvaluator


class WorkflowState(BaseModel):
    """Shared state object passed between the nodes of the workflow graph.

    Only ``query`` is required; every other field has an empty default and
    is filled in by the node responsible for it.
    """

    # Input supplied by the caller.
    query: str = Field(
        description="The user's original query",
    )
    # Routing decision.
    action: str = Field(
        default="",
        description="The action to take: 'weather' or 'document'",
    )
    # Output of the document branch.
    context: List[Dict[str, Any]] = Field(
        default=[],
        description="Retrieved context (for document queries)",
    )
    # Outputs of the weather branch.
    weather_data: Dict[str, Any] = Field(
        default={},
        description="Weather data (for weather queries)",
    )
    city: str = Field(
        default="",
        description="City for weather queries",
    )
    # Final answer and its evaluation record.
    response: str = Field(
        default="",
        description="The final response to the user",
    )
    evaluation: Dict[str, Any] = Field(
        default={},
        description="Evaluation results",
    )

class LangGraphWorkflow:
    """LangGraph workflow for the AI pipeline.

    The compiled graph starts at the ``router`` node, branches to either the
    ``weather`` or the ``document`` node depending on ``state.action``, and
    then always runs ``evaluate`` before finishing.
    """

    # Actions the router may select. Each action is also the name of the
    # graph node that handles it, so this tuple drives both the validation in
    # ``route`` and the conditional-edge mapping in ``build_workflow``.
    _ACTIONS = ("weather", "document")

    def __init__(self):
        """Create the agents and the evaluator, then build the graph."""
        self.router_agent = RouterAgent()
        self.weather_agent = WeatherAgent()
        self.rag_agent = RAGAgent()
        # NOTE(review): the evaluator is constructed here but no method of
        # this class calls it yet; see ``evaluate_response``.
        self.evaluator = LangSmithEvaluator()

        # Build the workflow graph
        self.workflow = self.build_workflow()

    def route(self, state: WorkflowState) -> WorkflowState:
        """Ask the router agent which branch should handle the query.

        The router's answer is normalised (surrounding whitespace removed,
        lower-cased) before it is stored, so values such as ``"Weather\\n"``
        still select the ``weather`` branch.

        Args:
            state: Current workflow state; only ``state.query`` is read.

        Returns:
            A copy of ``state`` with ``action`` set to the chosen action.

        Raises:
            ValueError: If the router returns something that is not one of
                the supported actions. Failing here gives a clear message
                instead of an error from the graph's branch lookup.
        """
        raw_action = self.router_agent.route_query(state.query)
        if isinstance(raw_action, str):
            action = raw_action.strip().lower()
        else:
            action = raw_action

        if action not in self._ACTIONS:
            raise ValueError(
                f"Router returned unsupported action {raw_action!r}; "
                f"expected one of {self._ACTIONS}"
            )
        return state.copy(update={"action": action})

    def process_weather(self, state: WorkflowState) -> WorkflowState:
        """Process weather-related queries.

        Args:
            state: Current workflow state; only ``state.query`` is read.

        Returns:
            A copy of ``state`` with ``city``, ``weather_data`` and
            ``response`` taken from the weather agent's result, which is
            indexed by those same three keys.
        """
        weather_response = self.weather_agent.get_weather_response(state.query)
        return state.copy(update={
            "city": weather_response["city"],
            "weather_data": weather_response["weather_data"],
            "response": weather_response["response"],
        })

    def process_document(self, state: WorkflowState) -> WorkflowState:
        """Process document-related queries.

        Args:
            state: Current workflow state; only ``state.query`` is read.

        Returns:
            A copy of ``state`` with ``context`` and ``response`` taken from
            the RAG agent's result, which is indexed by those same two keys.
        """
        rag_response = self.rag_agent.get_rag_response(state.query)
        return state.copy(update={
            "context": rag_response["context"],
            "response": rag_response["response"],
        })

    def evaluate_response(self, state: WorkflowState) -> WorkflowState:
        """Attach a basic evaluation record to the state.

        The metrics are placeholders computed locally; the LangSmith
        evaluator is not invoked here.

        Args:
            state: Current workflow state after one of the branches has run.

        Returns:
            A copy of ``state`` with ``evaluation`` populated.
        """
        # Higher confidence when either branch actually produced supporting
        # data (retrieved context or weather data).
        has_supporting_data = bool(state.context or state.weather_data)
        evaluation = {
            "query": state.query,
            "response": state.response,
            "action": state.action,
            # Additional metrics would come from LangSmith in a real implementation
            "confidence": 0.95 if has_supporting_data else 0.7,
            "latency": 1.2,  # Example metric (hard-coded, not measured)
        }

        return state.copy(update={"evaluation": evaluation})

    def build_workflow(self) -> Any:
        """Build and compile the LangGraph workflow.

        Returns:
            The object returned by ``StateGraph.compile()`` (the compiled
            graph, not the ``StateGraph`` builder); ``invoke`` calls its
            ``invoke`` method.
        """
        workflow = StateGraph(WorkflowState)

        # Register nodes: node name -> bound method implementing it.
        workflow.add_node("router", self.route)
        workflow.add_node("weather", self.process_weather)
        workflow.add_node("document", self.process_document)
        workflow.add_node("evaluate", self.evaluate_response)

        # Conditional edges out of the router, keyed on state.action. Every
        # supported action maps to the node with the same name.
        workflow.add_conditional_edges(
            "router",
            lambda state: state.action,
            {action: action for action in self._ACTIONS},
        )

        # Both branches converge on evaluation, which ends the run.
        workflow.add_edge("weather", "evaluate")
        workflow.add_edge("document", "evaluate")
        workflow.add_edge("evaluate", END)

        # Set entry point
        workflow.set_entry_point("router")

        return workflow.compile()

    def invoke(self, query: str) -> Dict[str, Any]:
        """Run the compiled workflow for a single query.

        Args:
            query: The user's query text.

        Returns:
            Whatever the compiled graph's ``invoke`` returns for an initial
            state containing only ``query``.
        """
        initial_state = WorkflowState(query=query)
        return self.workflow.invoke(initial_state)