# agent/nodes.py
"""
Agent Workflow Nodes

This module contains the node functions that make up the LangGraph workflow
for the operations agent. Each node performs a specific step in processing
user requests, from LLM routing to tool execution to response generation.

Purpose:
- Define workflow nodes for the LangGraph agent
- Integrate LLM router for intelligent request routing
- Execute tools and process results
- Generate user-facing responses using configurable templates
- Manage error handling and recovery

Dependencies:
- state.py: Agent state definitions
- registry.py: Tool registry and discovery
- llm_router_node.py: LLM-based routing logic
- response_handler.py: Configurable response generation
- asyncio: For async tool execution

Used by:
- graph.py: Workflow graph construction
- agent.py: Agent execution pipeline
"""
import asyncio
import inspect
from datetime import datetime
from typing import Dict, Any, List, Optional

from .state import AgentState, ToolCall, ToolResult, update_processing_status, add_tool_result, add_error
from .registry import get_tool_registry, get_tool_by_name, validate_tool_parameters
from .llm_router_node import llm_router_node
from .llm_client import LLMClient
from .response_handler import get_response_handler, ResponseContext
async def llm_routing_node(state: AgentState) -> AgentState:
    """
    Route the user's request through the LLM-based router.

    Delegates the heavy lifting to ``llm_router_node``, which processes the
    user message, validates geographic constraints, plans the appropriate
    tools, and updates the processing status. Any failure is captured in the
    state (error entry + "error" status) instead of being propagated.

    Args:
        state: Current agent state.

    Returns:
        The routed state on success, or the original state annotated with
        the routing error on failure.
    """
    print("🧠 Starting LLM-based routing...")
    state = update_processing_status(state, "llm_routing", "Processing request with LLM router")
    try:
        result_state = await llm_router_node(state)
        print("✅ LLM routing complete")
        print(f"📋 Processing status: {result_state['processing_status']}")
        print(f"🎯 Planned tools: {len(result_state.get('planned_tools', []))}")
        return result_state
    except Exception as exc:
        # Routing failures are recorded in the state rather than raised,
        # so the graph can still produce a user-facing error response.
        print(f"❌ LLM routing failed: {exc}")
        state = add_error(state, f"LLM routing error: {str(exc)}", "routing_error")
        state = update_processing_status(state, "error", "LLM routing failed")
        return state
async def tool_execution_node(state: AgentState) -> AgentState:
    """
    Execute all planned tools and collect their results.

    Iterates over ``state["planned_tools"]`` in order. For each tool call it:
    resolves the tool in the registry, merges the optional ``task`` into the
    call parameters, validates them, invokes the tool (awaiting coroutine
    tools), and appends a ``ToolResult`` to the state. A failing tool is
    recorded as an error result and does not stop the remaining tools.

    Args:
        state: Current agent state with planned tools.

    Returns:
        Updated state with one ToolResult per planned tool.
    """
    print("⚡ Executing planned tools...")
    state = update_processing_status(state, "executing_tools", "Running tool execution")
    planned_tools = state["planned_tools"]
    if not planned_tools:
        print("ℹ️ No tools to execute")
        return state
    registry = get_tool_registry()
    for tool_call in planned_tools:
        tool_name = tool_call.tool_name
        task = tool_call.task
        parameters = tool_call.parameters
        print(f"🔧 Executing {tool_name}" + (f" (task: {task})" if task else "") + "...")
        # Resolve the tool implementation; record an error result if unknown.
        tool_function, tool_spec = get_tool_by_name(tool_name)
        if not tool_function:
            error_result = ToolResult(
                tool_name=tool_name,
                success=False,
                summary_text=f"⚠️ Tool '{tool_name}' not available",
                metadata={"error": "tool_not_found"}
            )
            state = add_tool_result(state, error_result)
            state = add_error(state, f"Tool '{tool_name}' not found", "tool_error")
            continue
        # Copy so the planned ToolCall's parameter dict is never mutated.
        final_parameters = parameters.copy()
        if task:
            final_parameters["task"] = task
        # Validate parameters before calling the tool.
        is_valid, validation_errors = validate_tool_parameters(tool_name, final_parameters)
        if not is_valid:
            error_result = ToolResult(
                tool_name=tool_name,
                success=False,
                summary_text=f"⚠️ Invalid parameters for {tool_name}: {', '.join(validation_errors)}",
                metadata={"validation_errors": validation_errors}
            )
            state = add_tool_result(state, error_result)
            state = add_error(state, f"Invalid parameters for {tool_name}", "validation_error")
            continue
        try:
            # Prefer the registry's "async" metadata, but fall back to runtime
            # detection so coroutine tools missing the flag are still awaited
            # instead of yielding a bare (unawaited) coroutine object.
            tool_info = registry.get_tool_info(tool_name)
            is_async = tool_info.get("async", False) or inspect.iscoroutinefunction(tool_function)
            if is_async:
                tool_output = await tool_function(**final_parameters)
            else:
                tool_output = tool_function(**final_parameters)
            # Determine success. BUG FIX: the original read
            # tool_output.get("success", True), so the `is None` fallback below
            # could never fire when the key was absent and every such tool was
            # unconditionally marked successful. Use no default so the
            # heuristic actually runs for tools that omit the key.
            success = tool_output.get("success")
            if success is None:
                # Heuristic: successful if artifacts exist or the summary
                # contains no (Italian/English) error keywords.
                success = bool(tool_output.get("artifacts")) or not any(
                    err_word in tool_output.get("summary_text", "").lower()
                    for err_word in ["errore", "error", "failed", "fallito"]
                )
            result = ToolResult(
                tool_name=tool_name,
                success=success,
                summary_text=tool_output.get("summary_text", ""),
                artifacts=tool_output.get("artifacts", []),
                sources=tool_output.get("sources", []),
                metadata=tool_output.get("metadata", {}),
                warnings=tool_output.get("warnings", [])
            )
            state = add_tool_result(state, result)
            # BUG FIX: the original always printed "completed successfully",
            # even when the result was judged a failure.
            if success:
                print(f"✅ {tool_name} completed successfully")
            else:
                print(f"⚠️ {tool_name} completed with errors")
        except Exception as e:
            error_result = ToolResult(
                tool_name=tool_name,
                success=False,
                summary_text=f"⚠️ Error executing {tool_name}: {str(e)}",
                metadata={
                    "error_type": type(e).__name__,
                    "error_details": str(e)
                }
            )
            state = add_tool_result(state, error_result)
            state = add_error(state, f"Tool execution error: {str(e)}", "execution_error")
            print(f"❌ {tool_name} failed: {e}")
    print(f"✅ Tool execution complete: {len(state['tool_results'])} results")
    return state
async def llm_summarization_node(state: AgentState) -> AgentState:
    """
    Generate intelligent insights from tool results using an LLM.

    Takes the raw task-specific formatted data from successful tool results
    and asks an LLM (prompted in Italian, for OMIRL meteorological data) for
    higher-level insights, trends, and cross-task analysis. Valid insights
    are attached to the first successful result's metadata under
    ``llm_insights``. This step is a best-effort enhancement: every failure
    path is swallowed with a warning so the workflow never breaks here.

    Args:
        state: Current agent state with tool results.

    Returns:
        Updated state; on success the enhanced result is moved to the front
        of ``tool_results``, otherwise the state is returned unchanged.
    """
    print(f"🧠 Generating LLM-based insights...")
    state = update_processing_status(state, "llm_summarization", "Analyzing data with LLM")
    tool_results = state["tool_results"]
    # Only process successful results that carry substantive summary text.
    successful_results = [r for r in tool_results if r.success and r.summary_text]
    if not successful_results:
        print(f"ℹ️ No successful results to analyze")
        return state
    try:
        # Initialize the LLM client for summarization (provider/model come
        # from the project's default config).
        llm_client = LLMClient(
            temperature=0.3,  # Slightly higher for more creative insights
            max_tokens=800,   # Allow for richer analysis
            timeout=15
        )
        # Build the summarization prompt (instructions + numbered data blocks).
        prompt_parts = [
            "Analizza i seguenti dati meteorologici OMIRL e genera insights intelligenti.",
            "Concentrati su: tendenze, valori anomali, confronti geografici, raccomandazioni operative.",
            "Rispondi in italiano con bullet points chiari e concisi.\n",
            "DATI DA ANALIZZARE:"
        ]
        # Append each tool result's summary, plus a whitelisted subset of its
        # metadata for extra context.
        for i, result in enumerate(successful_results, 1):
            prompt_parts.append(f"\n{i}. {result.tool_name.upper()}:")
            prompt_parts.append(f" {result.summary_text}")
            if result.metadata:
                relevant_metadata = {
                    k: v for k, v in result.metadata.items()
                    if k in ['sensor_type', 'filters_applied', 'total_after_filtering', 'zona_allerta_records', 'province_records']
                }
                if relevant_metadata:
                    prompt_parts.append(f" Dettagli: {relevant_metadata}")
        prompt_parts.append("\nGENERA INSIGHTS OPERATIVI:")
        full_prompt = "\n".join(prompt_parts)
        # Inner try: isolate LLM-call failures from prompt-building failures.
        try:
            insights = await llm_client.generate_insights(full_prompt)
            # Record which provider/model produced the insights for telemetry.
            llm_usage = llm_client.get_llm_usage_info()
            if "llm_usage" not in state:
                state["llm_usage"] = {}
            state["llm_usage"]["summarization"] = llm_usage
            print(f"🔧 LLM used for insights: {llm_usage['provider']}/{llm_usage['model']}")
            # Treat very short replies (<= 20 chars) as empty/degenerate.
            if insights and len(insights.strip()) > 20:
                if successful_results:
                    # Rebuild the first successful result with the insights
                    # merged into a copy of its metadata (ToolResult is
                    # treated as immutable here).
                    original_result = successful_results[0]
                    enhanced_metadata = original_result.metadata.copy()
                    enhanced_metadata["llm_insights"] = insights
                    enhanced_metadata["insights_generated_at"] = datetime.now().isoformat()
                    enhanced_result = ToolResult(
                        tool_name=original_result.tool_name,
                        success=original_result.success,
                        summary_text=original_result.summary_text,
                        artifacts=original_result.artifacts,
                        sources=original_result.sources,
                        metadata=enhanced_metadata,
                        warnings=original_result.warnings
                    )
                    # Swap the original result for the enhanced one.
                    # NOTE(review): equality-based removal could drop more
                    # than one entry if ToolResult defines value equality and
                    # duplicates exist, and insert(0, ...) reorders the
                    # results — confirm downstream consumers tolerate this.
                    new_results = [r for r in state["tool_results"] if r != original_result]
                    new_results.insert(0, enhanced_result)
                    # Shallow-copy the state mapping before replacing the list.
                    updated_state = dict(state)
                    updated_state["tool_results"] = new_results
                    state = updated_state
                    print(f"✅ LLM insights generated ({len(insights)} chars)")
            else:
                print(f"⚠️ LLM generated empty insights, skipping")
        except Exception as llm_error:
            # Don't fail the whole workflow - just skip insights.
            print(f"⚠️ LLM summarization failed: {llm_error}")
    except Exception as e:
        # Don't break the workflow - summarization is an optional enhancement.
        print(f"⚠️ Summarization node error: {e}")
    print(f"✅ LLM summarization complete")
    return state
async def response_generation_node(state: AgentState) -> AgentState:
    """
    Build the final user-facing response from tool results and routing status.

    Selects a response strategy in priority order: success (any successful
    tool result), help/clarification, rejection, error, and finally an
    "unrecognized request" fallback driven by the configured templates.

    Args:
        state: Current agent state with tool results.

    Returns:
        Updated state with ``agent_response`` set and status "completed".
    """
    print("📝 Generating user response...")
    # Snapshot the routing status BEFORE overwriting it below — branching
    # must reflect what the router decided, not "generating_response".
    status_before_update = state.get("processing_status", "unknown")
    state = update_processing_status(state, "generating_response", "Creating user response")
    handler = get_response_handler()
    ctx = ResponseContext(
        tool_results=state["tool_results"],
        processing_status=status_before_update,
        errors=state["errors"],
        clarification_request=state.get("clarification_request"),
        rejection_reason=state.get("rejection_reason")
    )
    results = state["tool_results"]
    any_success = bool(results) and any(r.success for r in results)
    any_failure = bool(results) and any(not r.success for r in results)
    if any_success:
        # At least one tool succeeded.
        final_response = handler.generate_success_response(ctx)
    elif status_before_update in ("needs_clarification", "help_requested"):
        # Help or clarification requests.
        final_response = handler.generate_help_response(ctx)
    elif status_before_update == "rejected":
        # Request rejected with explanation.
        final_response = handler.generate_rejection_response(ctx)
    elif state["errors"] or any_failure:
        # Errors occurred somewhere in the workflow.
        final_response = handler.generate_error_response(ctx)
    else:
        # Unrecognized request: use the configured template when available.
        unrecognized = handler.templates.get('error_responses', {}).get('unrecognized_request', {})
        if isinstance(unrecognized, dict) and 'template' in unrecognized:
            template = unrecognized['template']
            fallback_message = unrecognized.get('fallback_message', 'Non ho capito cosa vuoi che faccia.')
            suggestions_intro = unrecognized.get('suggestions_intro', 'Prova con:')
            help_hint = unrecognized.get('help_hint', "'aiuto' per vedere tutte le funzioni")
            suggestion_block = "\n".join([
                "• 'Mostra precipitazioni a Genova'",
                "• 'Temperatura in provincia di Savona'",
                "• " + help_hint,
            ])
            final_response = template + fallback_message + "\n" + suggestions_intro + "\n" + suggestion_block
        else:
            # Hard-coded fallback when templates are not available.
            final_response = "\n".join([
                "❓ **Richiesta Non Riconosciuta**",
                "Non ho capito cosa vuoi che faccia.",
                "Prova con:",
                "• 'Mostra precipitazioni a Genova'",
                "• 'Temperatura in provincia di Savona'",
                "• 'aiuto' per vedere tutte le funzioni",
            ])
    state["agent_response"] = final_response
    state = update_processing_status(state, "completed", "Response generated successfully")
    print(f"✅ Response generated ({len(final_response)} chars)")
    return state
async def error_handling_node(state: AgentState) -> AgentState:
    """
    Produce a user-friendly error response for accumulated workflow errors.

    Invoked when the workflow hits an error path; renders the recorded
    errors through the configurable response handler. If no errors were
    recorded, the state passes through untouched.

    Args:
        state: Current agent state with errors.

    Returns:
        Updated state with ``agent_response`` set (when errors exist) and
        status "completed_with_errors".
    """
    print("🚨 Handling errors...")
    state = update_processing_status(state, "error_handling", "Processing errors")
    recorded_errors = state["errors"]
    if not recorded_errors:
        # Nothing to report.
        return state
    handler = get_response_handler()
    error_context = ResponseContext(
        tool_results=state["tool_results"],
        processing_status="error",
        errors=recorded_errors
    )
    state["agent_response"] = handler.generate_error_response(error_context)
    state = update_processing_status(state, "completed_with_errors", "Error response generated")
    print("✅ Error response generated")
    return state
| # Node utility functions | |
def should_execute_tools(state: AgentState) -> bool:
    """
    Return True when the state has at least one planned tool.

    Uses truthiness instead of the non-idiomatic ``len(...) > 0`` (PEP 8),
    which also treats an explicit ``planned_tools: None`` as "nothing to
    execute" rather than raising TypeError.
    """
    return bool(state.get("planned_tools"))
def has_errors(state: AgentState) -> bool:
    """
    Report whether the workflow has hit any error condition.

    True when error messages were recorded, when any tool result is marked
    unsuccessful, or when the processing status is "error".
    """
    if len(state.get("errors", [])) > 0:
        return True
    if any(not result.success for result in state.get("tool_results", [])):
        return True
    return state.get("processing_status") == "error"
def is_help_request(state: AgentState) -> bool:
    """
    Detect whether the user is asking for help or clarification.

    True when the router flagged the state as needing clarification/help,
    or when the user message contains "aiuto" or "help" (case-insensitive).
    """
    if state.get("processing_status") in ("needs_clarification", "help_requested"):
        return True
    message = state.get("user_message", "").lower()
    return "aiuto" in message or "help" in message
def needs_tool_execution(state: AgentState) -> bool:
    """
    Decide whether the workflow should run the tool-execution node.

    Requires both an "approved" routing status and a non-empty list of
    planned tools.
    """
    if state.get("processing_status", "") != "approved":
        return False
    return len(state.get("planned_tools", [])) > 0