operations / agent /nodes.py
jbbove's picture
Configured Ollaman and improved llm config functionality: now the default is ollama, the fallback is gemini 2.5 flash lite, and these can be changed modifying just on line in the yaml config file
c46d4a9
# agent/nodes.py
"""
Agent Workflow Nodes
This module contains the node functions that make up the LangGraph workflow
for the operations agent. Each node performs a specific step in processing
user requests, from LLM routing to tool execution to response generation.
Purpose:
- Define workflow nodes for the LangGraph agent
- Integrate LLM router for intelligent request routing
- Execute tools and process results
- Generate user-facing responses using configurable templates
- Manage error handling and recovery
Dependencies:
- state.py: Agent state definitions
- registry.py: Tool registry and discovery
- llm_router_node.py: LLM-based routing logic
- response_handler.py: Configurable response generation
- asyncio: For async tool execution
Used by:
- graph.py: Workflow graph construction
- agent.py: Agent execution pipeline
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from .state import AgentState, ToolCall, ToolResult, update_processing_status, add_tool_result, add_error
from .registry import get_tool_registry, get_tool_by_name, validate_tool_parameters
from .llm_router_node import llm_router_node
from .llm_client import LLMClient
from .response_handler import get_response_handler, ResponseContext
async def llm_routing_node(state: AgentState) -> AgentState:
"""
Route user requests using LLM-based intelligent routing
This node uses the LLM router to understand user intent and
directly plan tool execution based on natural language processing.
Args:
state: Current agent state
Returns:
Updated state with routing results and planned tools
"""
print(f"🧠 Starting LLM-based routing...")
state = update_processing_status(state, "llm_routing", "Processing request with LLM router")
try:
# Use the LLM router node to process the request
routed_state = await llm_router_node(state)
# The LLM router node has already:
# - Processed the user message
# - Validated geographic constraints
# - Planned appropriate tools
# - Updated processing status
print(f"✅ LLM routing complete")
print(f"📋 Processing status: {routed_state['processing_status']}")
print(f"🎯 Planned tools: {len(routed_state.get('planned_tools', []))}")
return routed_state
except Exception as e:
print(f"❌ LLM routing failed: {e}")
# Fallback to error state
state = add_error(state, f"LLM routing error: {str(e)}", "routing_error")
state = update_processing_status(state, "error", "LLM routing failed")
return state
async def tool_execution_node(state: AgentState) -> AgentState:
"""
Execute planned tools and collect results
This node executes all planned tools in the correct order
and collects their results for response generation.
Args:
state: Current agent state with planned tools
Returns:
Updated state with tool execution results
"""
print(f"⚡ Executing planned tools...")
state = update_processing_status(state, "executing_tools", "Running tool execution")
planned_tools = state["planned_tools"]
if not planned_tools:
print(f"ℹ️ No tools to execute")
return state
registry = get_tool_registry()
for tool_call in planned_tools:
tool_name = tool_call.tool_name
task = tool_call.task
parameters = tool_call.parameters
print(f"🔧 Executing {tool_name}" + (f" (task: {task})" if task else "") + "...")
# Get tool function
tool_function, tool_spec = get_tool_by_name(tool_name)
if not tool_function:
error_result = ToolResult(
tool_name=tool_name,
success=False,
summary_text=f"⚠️ Tool '{tool_name}' not available",
metadata={"error": "tool_not_found"}
)
state = add_tool_result(state, error_result)
state = add_error(state, f"Tool '{tool_name}' not found", "tool_error")
continue
# Prepare final parameters by including task if specified
final_parameters = parameters.copy()
if task:
final_parameters["task"] = task
# Validate parameters
is_valid, validation_errors = validate_tool_parameters(tool_name, final_parameters)
if not is_valid:
error_result = ToolResult(
tool_name=tool_name,
success=False,
summary_text=f"⚠️ Invalid parameters for {tool_name}: {', '.join(validation_errors)}",
metadata={"validation_errors": validation_errors}
)
state = add_tool_result(state, error_result)
state = add_error(state, f"Invalid parameters for {tool_name}", "validation_error")
continue
# Execute tool
try:
# Check if tool is async
tool_info = registry.get_tool_info(tool_name)
is_async = tool_info.get("async", False)
if is_async:
tool_output = await tool_function(**final_parameters)
else:
tool_output = tool_function(**final_parameters)
# Convert tool output to ToolResult
# Determine success based on presence of data/artifacts or explicit success field
success = tool_output.get("success", True)
if success is None: # If success is not explicitly set
# Consider successful if there are artifacts or no error message
success = bool(tool_output.get("artifacts")) or not any(
err_word in tool_output.get("summary_text", "").lower()
for err_word in ["errore", "error", "failed", "fallito"]
)
result = ToolResult(
tool_name=tool_name,
success=success,
summary_text=tool_output.get("summary_text", ""),
artifacts=tool_output.get("artifacts", []),
sources=tool_output.get("sources", []),
metadata=tool_output.get("metadata", {}),
warnings=tool_output.get("warnings", [])
)
state = add_tool_result(state, result)
print(f"✅ {tool_name} completed successfully")
except Exception as e:
error_result = ToolResult(
tool_name=tool_name,
success=False,
summary_text=f"⚠️ Error executing {tool_name}: {str(e)}",
metadata={
"error_type": type(e).__name__,
"error_details": str(e)
}
)
state = add_tool_result(state, error_result)
state = add_error(state, f"Tool execution error: {str(e)}", "execution_error")
print(f"❌ {tool_name} failed: {e}")
print(f"✅ Tool execution complete: {len(state['tool_results'])} results")
return state
async def llm_summarization_node(state: AgentState) -> AgentState:
"""
Generate intelligent insights from tool results using LLM
This node takes the raw task-specific formatted data and generates
higher-level insights, trends, and cross-task analysis using an LLM.
This is Phase 4 of the architecture: LLM-based intelligent summarization.
Args:
state: Current agent state with tool results
Returns:
Updated state with LLM-generated insights added to metadata
"""
print(f"🧠 Generating LLM-based insights...")
state = update_processing_status(state, "llm_summarization", "Analyzing data with LLM")
tool_results = state["tool_results"]
# Only process successful results that have substantive data
successful_results = [r for r in tool_results if r.success and r.summary_text]
if not successful_results:
print(f"ℹ️ No successful results to analyze")
return state
try:
# Initialize LLM client for summarization (using default from config)
llm_client = LLMClient(
temperature=0.3, # Slightly higher for more creative insights
max_tokens=800, # Allow for richer analysis
timeout=15
)
# Build summarization prompt
prompt_parts = [
"Analizza i seguenti dati meteorologici OMIRL e genera insights intelligenti.",
"Concentrati su: tendenze, valori anomali, confronti geografici, raccomandazioni operative.",
"Rispondi in italiano con bullet points chiari e concisi.\n",
"DATI DA ANALIZZARE:"
]
# Add each tool result's data
for i, result in enumerate(successful_results, 1):
prompt_parts.append(f"\n{i}. {result.tool_name.upper()}:")
prompt_parts.append(f" {result.summary_text}")
# Add key metadata for context
if result.metadata:
relevant_metadata = {
k: v for k, v in result.metadata.items()
if k in ['sensor_type', 'filters_applied', 'total_after_filtering', 'zona_allerta_records', 'province_records']
}
if relevant_metadata:
prompt_parts.append(f" Dettagli: {relevant_metadata}")
prompt_parts.append("\nGENERA INSIGHTS OPERATIVI:")
full_prompt = "\n".join(prompt_parts)
# Get LLM insights (with fallback)
try:
insights = await llm_client.generate_insights(full_prompt)
# Track which LLM was used for insights
llm_usage = llm_client.get_llm_usage_info()
if "llm_usage" not in state:
state["llm_usage"] = {}
state["llm_usage"]["summarization"] = llm_usage
print(f"🔧 LLM used for insights: {llm_usage['provider']}/{llm_usage['model']}")
if insights and len(insights.strip()) > 20: # Valid response
# Add insights to the first successful result's metadata
if successful_results:
# Find existing metadata or create new
original_result = successful_results[0]
enhanced_metadata = original_result.metadata.copy()
enhanced_metadata["llm_insights"] = insights
enhanced_metadata["insights_generated_at"] = datetime.now().isoformat()
# Create new enhanced result
enhanced_result = ToolResult(
tool_name=original_result.tool_name,
success=original_result.success,
summary_text=original_result.summary_text,
artifacts=original_result.artifacts,
sources=original_result.sources,
metadata=enhanced_metadata,
warnings=original_result.warnings
)
# Update the state with enhanced result using state functions
# Remove the original result first
new_results = [r for r in state["tool_results"] if r != original_result]
# Add the enhanced result
new_results.insert(0, enhanced_result)
# Update state
updated_state = dict(state)
updated_state["tool_results"] = new_results
state = updated_state
print(f"✅ LLM insights generated ({len(insights)} chars)")
else:
print(f"⚠️ LLM generated empty insights, skipping")
except Exception as llm_error:
print(f"⚠️ LLM summarization failed: {llm_error}")
# Don't fail the whole workflow - just skip insights
except Exception as e:
print(f"⚠️ Summarization node error: {e}")
# Don't break the workflow - summarization is optional enhancement
print(f"✅ LLM summarization complete")
return state
async def response_generation_node(state: AgentState) -> AgentState:
"""
Generate final response based on tool results and LLM router status
This node creates the final user-facing response using configurable
templates to eliminate hard-coded response strings.
Args:
state: Current agent state with tool results
Returns:
Updated state with generated response
"""
print(f"📝 Generating user response...")
# Capture processing status BEFORE updating it
original_processing_status = state.get("processing_status", "unknown")
state = update_processing_status(state, "generating_response", "Creating user response")
# Get response handler
response_handler = get_response_handler()
# Create response context
context = ResponseContext(
tool_results=state["tool_results"],
processing_status=original_processing_status, # Use original status
errors=state["errors"],
clarification_request=state.get("clarification_request"),
rejection_reason=state.get("rejection_reason")
)
# Generate appropriate response based on ORIGINAL processing status
processing_status = original_processing_status
# Check for successful tool execution first
has_successful_results = bool(state["tool_results"]) and any(result.success for result in state["tool_results"])
if has_successful_results:
# Successful tool execution
final_response = response_handler.generate_success_response(context)
elif processing_status in ["needs_clarification", "help_requested"]:
# Help or clarification requests
final_response = response_handler.generate_help_response(context)
elif processing_status == "rejected":
# Request rejected with explanation
final_response = response_handler.generate_rejection_response(context)
elif state["errors"] or (state["tool_results"] and any(not result.success for result in state["tool_results"])):
# Errors occurred
final_response = response_handler.generate_error_response(context)
else:
# Fallback for unrecognized requests - use template if available
unrecognized_config = response_handler.templates.get('error_responses', {}).get('unrecognized_request', {})
if isinstance(unrecognized_config, dict) and 'template' in unrecognized_config:
template = unrecognized_config['template']
fallback_message = unrecognized_config.get('fallback_message', 'Non ho capito cosa vuoi che faccia.')
suggestions_intro = unrecognized_config.get('suggestions_intro', 'Prova con:')
help_hint = unrecognized_config.get('help_hint', "'aiuto' per vedere tutte le funzioni")
final_response = template + fallback_message + "\n" + suggestions_intro + "\n"
final_response += "• 'Mostra precipitazioni a Genova'\n"
final_response += "• 'Temperatura in provincia di Savona'\n"
final_response += "• " + help_hint
else:
# Fallback when templates not available
final_response = "❓ **Richiesta Non Riconosciuta**\n"
final_response += "Non ho capito cosa vuoi che faccia.\n"
final_response += "Prova con:\n"
final_response += "• 'Mostra precipitazioni a Genova'\n"
final_response += "• 'Temperatura in provincia di Savona'\n"
final_response += "• 'aiuto' per vedere tutte le funzioni"
# Set final response
state["agent_response"] = final_response
state = update_processing_status(state, "completed", "Response generated successfully")
print(f"✅ Response generated ({len(final_response)} chars)")
return state
async def error_handling_node(state: AgentState) -> AgentState:
"""
Handle errors and generate appropriate error responses
This node is called when errors occur in the workflow
and generates user-friendly error messages using configurable templates.
Args:
state: Current agent state with errors
Returns:
Updated state with error response
"""
print(f"🚨 Handling errors...")
state = update_processing_status(state, "error_handling", "Processing errors")
errors = state["errors"]
if not errors:
# No errors to handle
return state
# Use response handler for error response generation
response_handler = get_response_handler()
context = ResponseContext(
tool_results=state["tool_results"],
processing_status="error",
errors=errors
)
error_response = response_handler.generate_error_response(context)
state["agent_response"] = error_response
state = update_processing_status(state, "completed_with_errors", "Error response generated")
print(f"✅ Error response generated")
return state
# Node utility functions
def should_execute_tools(state: AgentState) -> bool:
"""Check if there are tools to execute"""
return len(state.get("planned_tools", [])) > 0
def has_errors(state: AgentState) -> bool:
"""Check if there are errors in the state"""
return (len(state.get("errors", [])) > 0 or
any(not result.success for result in state.get("tool_results", [])) or
state.get("processing_status") == "error")
def is_help_request(state: AgentState) -> bool:
"""Check if this is a help request"""
return (state.get("processing_status") in ["needs_clarification", "help_requested"] or
"aiuto" in state.get("user_message", "").lower() or
"help" in state.get("user_message", "").lower())
def needs_tool_execution(state: AgentState) -> bool:
"""Check if tool execution is needed based on LLM router status"""
status = state.get("processing_status", "")
return (status == "approved" and
len(state.get("planned_tools", [])) > 0)