Spaces:
Runtime error
Runtime error
# agent/state.py
"""
Agent State Management

This module defines the state structure for the LangGraph operations agent.
The state represents all data that flows between nodes in the agent workflow,
including user inputs, tool results, and response generation.

Purpose:
- Define typed state structure for LangGraph workflows
- Ensure consistent data flow between agent nodes
- Support tool execution results and metadata
- Handle conversation context and error states

Dependencies:
- typing: For type annotations and contracts
- dataclasses: For structured data representation

Used by:
- graph.py: Workflow definition and state transitions
- nodes.py: Node functions for state processing
- agent.py: Main agent interface
"""

from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict, Union

@dataclass
class ToolResult:
    """
    Standardized result structure for tool execution.

    This class encapsulates the output from any tool execution,
    providing a consistent interface for the agent to handle
    results from different tools (OMIRL, alerts, etc.).

    The ``@dataclass`` decorator is required: it generates ``__init__``
    and turns the ``field(default_factory=...)`` declarations below into
    real per-instance defaults. Without it the class cannot be
    instantiated with arguments.

    Attributes:
        tool_name: Name of the executed tool
        success: Whether the tool execution was successful
        summary_text: Human-readable summary in Italian
        artifacts: List of file paths to generated artifacts
        sources: List of data source URLs
        metadata: Additional metadata and execution details
        warnings: List of non-fatal warnings
        execution_time: ISO-8601 timestamp taken when the instance is
            created (local, naive ``datetime.now()``)
    """

    tool_name: str
    success: bool
    summary_text: str
    # Mutable defaults go through default_factory so that instances never
    # share the same list/dict object.
    artifacts: List[str] = field(default_factory=list)
    sources: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    warnings: List[str] = field(default_factory=list)
    # Evaluated per instance, not once at class-definition time.
    execution_time: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )


@dataclass
class ToolCall:
    """
    Represents a planned tool execution.

    This class stores the information needed to execute a tool,
    including the tool name, specific task, and parameters to pass.
    This provides clear separation between tool selection and task
    selection.

    The ``@dataclass`` decorator is required: it generates ``__init__``
    and makes ``field(default_factory=dict)`` a real per-instance default.

    Attributes:
        tool_name: Name of the tool to execute (e.g., "omirl_tool")
        task: Specific task within a tool that offers several tasks
            (e.g., OMIRL: "valori_stazioni", "massimi_precipitazione";
            RAG: "retrieve procedures", "retrieve plan")
        parameters: Dictionary of parameters to pass to the tool
        reason: Optional explanation for why this tool is being called
    """

    tool_name: str
    task: Optional[str] = None
    # default_factory gives every call its own dict.
    parameters: Dict[str, Any] = field(default_factory=dict)
    reason: Optional[str] = None


class AgentState(TypedDict):
    """
    State dictionary shared by every node of the LangGraph operations agent.

    Each node of the workflow may read and write these keys.

    Fields:
        user_message: Original user input message
        routing_result: LLM router output with tool decisions and validation
        planned_tools: Tool calls to execute, derived from routing
        tool_results: Results from executed tools
        agent_response: Final formatted response to the user
        conversation_history: Previous messages in the conversation
        current_operation: Currently executing operation, if any
        processing_status: Current status of request processing
        errors: Error messages encountered so far
        omirl_data: Cached OMIRL data for reuse within the conversation
        metadata: Additional metadata about the request processing
        llm_usage: LLM usage tracking (for visibility)
    """

    # --- user input and routing ---
    user_message: str
    routing_result: Dict[str, Any]  # LLM router output

    # --- tool planning and execution ---
    planned_tools: List[ToolCall]
    tool_results: List[ToolResult]

    # --- response generation ---
    agent_response: str

    # --- conversation management ---
    conversation_history: List[Dict[str, str]]

    # --- operation tracking ---
    current_operation: Optional[str]
    processing_status: str

    # --- error handling ---
    errors: List[str]

    # --- domain-specific data caching ---
    omirl_data: Optional[Dict[str, Any]]

    # --- additional metadata ---
    metadata: Dict[str, Any]

    # --- LLM usage tracking (for visibility) ---
    llm_usage: Dict[str, Any]


# Default state factory
def create_initial_state(user_message: str) -> AgentState:
    """
    Build the starting state for a new request.

    Args:
        user_message: User's input message

    Returns:
        A new AgentState holding the user message, with every other key
        set to an empty or neutral default. Containers are created fresh
        on each call.
    """
    # AgentState is a TypedDict, i.e. a plain dict at runtime, so a dict
    # literal with the same keys is equivalent to calling AgentState(...).
    fresh_state: AgentState = {
        "user_message": user_message,
        "routing_result": {},  # NEW: Will be populated by LLM router
        "planned_tools": [],
        "tool_results": [],
        "agent_response": "",
        "conversation_history": [],
        "current_operation": None,
        "processing_status": "initialized",
        "errors": [],
        "omirl_data": None,
        "metadata": {},
        "llm_usage": {},  # Track LLM provider/model usage
    }
    return fresh_state


# State validation functions
def validate_state(state: AgentState) -> bool:
    """
    Validate that the agent state has the required fields.

    Only the presence of the keys is checked, not the type or content of
    their values.

    Args:
        state: AgentState to validate

    Returns:
        True if every required key is present, False otherwise. Anything
        that is not a mapping (e.g. None) is reported as invalid instead
        of raising TypeError.
    """
    if not isinstance(state, Mapping):
        return False

    required_keys = (
        "user_message",
        "planned_tools",
        "tool_results",
        "agent_response",
        "processing_status",
    )
    # Loop variable is named "key" so it does not shadow dataclasses.field,
    # which this module imports.
    return all(key in state for key in required_keys)


def update_processing_status(
    state: AgentState, status: str, details: Optional[str] = None
) -> AgentState:
    """
    Update the processing status in state.

    The state is modified in place and also returned.

    Args:
        state: Current agent state
        status: New processing status
        details: Optional details about the status change. An entry is
            appended to ``state["metadata"]["status_history"]`` only when
            this is non-empty.

    Returns:
        Updated state (the same object that was passed in)
    """
    state["processing_status"] = status

    if details:
        # setdefault tolerates a state without a "metadata" key, which
        # validate_state accepts; plain indexing would raise KeyError.
        metadata = state.setdefault("metadata", {})
        history = metadata.setdefault("status_history", [])
        history.append({
            "status": status,
            "details": details,
            "timestamp": datetime.now().isoformat(),
        })

    return state


def add_tool_result(state: AgentState, result: ToolResult) -> AgentState:
    """
    Record a tool execution result in the state.

    The result is appended to ``state["tool_results"]``. When it comes
    from a successful "omirl_tool" run, ``state["omirl_data"]`` is also
    replaced with a summary of that run. The state is modified in place
    and also returned.

    Args:
        state: Current agent state
        result: Tool execution result to add

    Returns:
        Updated state (the same object that was passed in)
    """
    state["tool_results"].append(result)

    # Only successful OMIRL runs refresh the cache.
    if result.tool_name != "omirl_tool" or not result.success:
        return state

    # The cache entry references the result's own artifacts/metadata
    # objects; they are not copied.
    cache_entry = {
        "last_extraction": result.execution_time,
        "summary": result.summary_text,
        "artifacts": result.artifacts,
        "metadata": result.metadata,
    }
    state["omirl_data"] = cache_entry
    return state


def add_error(
    state: AgentState, error_message: str, error_type: str = "general"
) -> AgentState:
    """
    Add an error message to state.

    The message is appended to ``state["errors"]`` and a detailed record
    (message, type, timestamp) is appended to
    ``state["metadata"]["error_details"]``. The state is modified in place
    and also returned.

    Args:
        state: Current agent state
        error_message: Error message to add
        error_type: Type/category of error

    Returns:
        Updated state (the same object that was passed in)
    """
    # setdefault keeps error reporting from raising KeyError on a state
    # that lacks "errors" or "metadata"; validate_state does not require
    # either key.
    state.setdefault("errors", []).append(error_message)

    metadata = state.setdefault("metadata", {})
    metadata.setdefault("error_details", []).append({
        "message": error_message,
        "type": error_type,
        "timestamp": datetime.now().isoformat(),
    })

    return state