Spaces:
Sleeping
Sleeping
File size: 4,498 Bytes
f974658 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
from typing import Dict, Any, List, Literal, TypedDict, Annotated, Union
from langchain.schema import Document
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from agents.router_agent import RouterAgent
from agents.weather_agent import WeatherAgent
from agents.rag_agent import RAGAgent
from utils.evaluation import LangSmithEvaluator
class WorkflowState(BaseModel):
"""State for the workflow graph"""
query: str = Field(description="The user's original query")
action: str = Field(description="The action to take: 'weather' or 'document'", default="")
context: List[Dict[str, Any]] = Field(description="Retrieved context (for document queries)", default=[])
weather_data: Dict[str, Any] = Field(description="Weather data (for weather queries)", default={})
city: str = Field(description="City for weather queries", default="")
response: str = Field(description="The final response to the user", default="")
evaluation: Dict[str, Any] = Field(description="Evaluation results", default={})
class LangGraphWorkflow:
"""LangGraph workflow for the AI pipeline"""
def __init__(self):
self.router_agent = RouterAgent()
self.weather_agent = WeatherAgent()
self.rag_agent = RAGAgent()
self.evaluator = LangSmithEvaluator()
# Build the workflow graph
self.workflow = self.build_workflow()
def route(self, state: WorkflowState) -> WorkflowState:
"""Route the query to the appropriate agent"""
action = self.router_agent.route_query(state.query)
return state.copy(update={"action": action})
def process_weather(self, state: WorkflowState) -> WorkflowState:
"""Process weather-related queries"""
weather_response = self.weather_agent.get_weather_response(state.query)
return state.copy(update={
"city": weather_response["city"],
"weather_data": weather_response["weather_data"],
"response": weather_response["response"]
})
def process_document(self, state: WorkflowState) -> WorkflowState:
"""Process document-related queries"""
rag_response = self.rag_agent.get_rag_response(state.query)
return state.copy(update={
"context": rag_response["context"],
"response": rag_response["response"]
})
def evaluate_response(self, state: WorkflowState) -> WorkflowState:
"""Evaluate the response using LangSmith"""
# For simplicity, we're only evaluating basic metrics here
evaluation = {
"query": state.query,
"response": state.response,
"action": state.action,
# Additional metrics would come from LangSmith in a real implementation
"confidence": 0.95 if state.context or state.weather_data else 0.7,
"latency": 1.2, # Example metric
}
return state.copy(update={"evaluation": evaluation})
def build_workflow(self) -> StateGraph:
"""Build the LangGraph workflow"""
workflow = StateGraph(WorkflowState)
# Register nodes with names + actual methods
workflow.add_node("router", self.route) # Use callable (method) for logic
workflow.add_node("weather", self.process_weather) # Use callable
workflow.add_node("document", self.process_document) # Use callable
workflow.add_node("evaluate", self.evaluate_response) # Use callable
# Conditional edges — based on state.action
workflow.add_conditional_edges(
"router", # Source node
lambda state: state.action, # Condition function
{
"weather": "weather", # Condition -> Target node
"document": "document"
}
)
# Sequential steps
workflow.add_edge("weather", "evaluate") # Use node names
workflow.add_edge("document", "evaluate") # Use node names
workflow.add_edge("evaluate", END) # Use node name
# Set entry point
workflow.set_entry_point("router") # Use node name
return workflow.compile()
def invoke(self, query: str) -> Dict[str, Any]:
"""Invoke the workflow with a query"""
state = WorkflowState(query=query)
result = self.workflow.invoke(state)
return result
|