# operations/agent/agent.py
# Author: jbbove
# Commit c46d4a9: Configured Ollama and improved LLM config functionality:
# the default is now Ollama, the fallback is Gemini 2.5 Flash Lite, and these
# can be changed by modifying just one line in the YAML config file.
# agent/agent.py
"""
Operations Agent
This module provides the main agent interface for the operations system.
It integrates the LangGraph workflow with OMIRL data extraction capabilities
and provides a clean API for processing user requests.
Purpose:
- Provide main agent interface for the operations system
- Integrate LangGraph workflow execution
- Handle request preprocessing and response formatting
- Support conversation context and session management
- Expose OMIRL and other operational tools
Dependencies:
- graph.py: LangGraph workflow definitions
- state.py: Agent state management
- registry.py: Tool registry and discovery
Used by:
- app/main.py: Web application interface
- Direct usage: CLI tools and testing
- API endpoints: RESTful agent services
"""
import asyncio
import logging
import sys
import os
from typing import Dict, Any, List, Optional, Union
from datetime import datetime
import uuid
from .state import AgentState, create_initial_state, validate_state
from .graph import get_default_workflow, create_simple_workflow, print_workflow_summary
from .registry import get_tool_registry, print_registry_summary
# Simple constants from existing geography config (avoid complex config loading)
# Ligurian provinces accepted as OMIRL filters (uppercase form, as used upstream).
VALID_PROVINCES = ["GENOVA", "SAVONA", "IMPERIA", "LA SPEZIA"]
# Sensor categories available from OMIRL, as their Italian display labels.
SUPPORTED_SENSORS = [
    "Precipitazione", "Temperatura", "Livelli Idrometrici", "Vento",
    "Umidità dell'aria", "Eliofanie", "Radiazione solare", "Bagnatura Fogliare",
    "Pressione Atmosferica", "Tensione Batteria", "Stato del Mare", "Neve"
]
class ColoredFormatter(logging.Formatter):
    """Development log formatter: a single ANSI-colored, timestamped line."""

    # ANSI escape code per log level name; unknown levels get no color.
    COLORS = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[35m'   # Magenta
    }
    RESET = '\033[0m'

    def format(self, record):
        """Render the record as "[HH:MM:SS] message" wrapped in the level color."""
        prefix = self.COLORS.get(record.levelname, '')
        stamp = datetime.now().strftime('%H:%M:%S')
        return f"{prefix}[{stamp}] {record.getMessage()}{self.RESET}"
def setup_agent_logging(session_id: str, dev_mode: bool = True) -> logging.Logger:
    """
    Create (or fetch) the session-scoped logger for an agent.

    Args:
        session_id: Session identifier; its first 8 chars name the logger.
        dev_mode: True -> DEBUG level with colored stdout output,
                  False -> INFO level with a standard plain formatter.

    Returns:
        Configured logging.Logger; idempotent per session (no handler stacking).
    """
    logger = logging.getLogger(f"operations_agent.{session_id[:8]}")
    if logger.handlers:
        # This session's logger was already configured; reuse it untouched.
        return logger
    if dev_mode:
        logger.setLevel(logging.DEBUG)
        stream = logging.StreamHandler(sys.stdout)
        stream.setFormatter(ColoredFormatter())
    else:
        logger.setLevel(logging.INFO)
        stream = logging.StreamHandler()
        stream.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
    logger.addHandler(stream)
    # Stop records from also reaching the root logger (duplicate output).
    logger.propagate = False
    return logger
class AgentError(Exception):
    """Root of the agent exception hierarchy."""


class ValidationError(AgentError):
    """Input parameters failed validation."""


class WorkflowError(AgentError):
    """The workflow failed during execution."""


# NOTE: shadows the builtin TimeoutError within this module.
class TimeoutError(AgentError):
    """An operation exceeded its allotted time budget."""
class OperationsAgent:
"""
Main operations agent with OMIRL integration
This class provides the primary interface for the operations agent,
handling request processing, workflow execution, and response formatting.
Attributes:
workflow: Compiled LangGraph workflow
session_id: Unique session identifier
conversation_history: History of requests and responses
default_language: Default language for responses (Italian)
"""
def __init__(self, workflow_type: str = "default", session_id: str = None, lazy_init: bool = False, dev_mode: bool = None):
"""
Initialize the operations agent
Args:
workflow_type: Type of workflow to use ("default", "simple", "debug")
session_id: Optional session ID for conversation tracking
lazy_init: If True, delay workflow creation until first request
dev_mode: If True, use development logging with colors. Auto-detect if None.
"""
self.session_id = session_id or str(uuid.uuid4())
self.conversation_history = []
self.default_language = "it"
self.workflow_type = workflow_type
self.lazy_init = lazy_init
self.workflow = None
self.registry = None
# Auto-detect dev mode if not specified
if dev_mode is None:
dev_mode = os.getenv('ENVIRONMENT', 'development').lower() in ['development', 'dev', 'local']
# Setup logging
self.logger = setup_agent_logging(self.session_id, dev_mode)
self.logger.info(f"🚀 Initializing Operations Agent (session: {self.session_id[:8]})")
if not lazy_init:
self._initialize_workflow()
def _validate_request_input(self, user_message: str, user_id: str = None, context: Dict[str, Any] = None) -> None:
"""
Validate input parameters for request processing
Args:
user_message: User's input message
user_id: Optional user identifier
context: Optional additional context
Raises:
ValidationError: If validation fails
"""
if not user_message:
raise ValidationError("Il messaggio utente è richiesto")
if not isinstance(user_message, str):
raise ValidationError("Il messaggio utente deve essere una stringa")
if not user_message.strip():
raise ValidationError("Il messaggio utente non può essere vuoto")
if len(user_message) > 10000: # Reasonable limit
raise ValidationError("Messaggio utente troppo lungo (massimo 10.000 caratteri)")
if user_id is not None and not isinstance(user_id, str):
raise ValidationError("L'ID utente deve essere una stringa")
if context is not None and not isinstance(context, dict):
raise ValidationError("Il contesto deve essere un dizionario")
def _initialize_workflow(self):
"""Initialize the workflow and tools (called lazily if needed)"""
if self.workflow is not None:
return # Already initialized
self.logger.info(f"🔧 Creating workflow: {self.workflow_type}")
try:
# Initialize workflow based on type
if self.workflow_type == "simple":
self.workflow = create_simple_workflow()
self.logger.info("🤖 Simple workflow compiled successfully")
elif self.workflow_type == "debug":
self.workflow = create_simple_workflow() # Use simple for debug
self.logger.debug("🐛 Debug workflow initialized")
else:
self.workflow = get_default_workflow()
self.logger.info("⚡ Default workflow compiled successfully")
# Initialize tool registry
self.registry = get_tool_registry()
self.logger.info(f"🔧 Registry initialized with {len(self.registry.tools)} tools")
self.logger.info(f"✅ Operations agent ready")
except Exception as e:
self.logger.error(f"💥 Workflow initialization failed: {e}")
raise
async def process_request(
self,
user_message: str,
user_id: str = None,
context: Dict[str, Any] = None,
timeout: float = 60.0 # Default timeout in seconds
) -> Dict[str, Any]:
"""
Process a user request through the agent workflow
This is the main entry point for processing user requests.
It executes the full workflow and returns formatted results.
Args:
user_message: User's input message
user_id: Optional user identifier
context: Optional additional context
timeout: Maximum execution time in seconds
Returns:
Dict containing:
- response: Formatted agent response
- artifacts: List of generated files
- sources: List of data sources
- metadata: Request processing metadata
- session_id: Session identifier
Raises:
ValidationError: If input validation fails
TimeoutError: If execution exceeds timeout
WorkflowError: If workflow execution fails
"""
try:
# Validate input
self._validate_request_input(user_message, user_id, context)
# Initialize workflow if not already done (lazy initialization)
if self.workflow is None:
self._initialize_workflow()
self.logger.info(f"🚀 Processing: '{user_message[:80]}{'...' if len(user_message) > 80 else ''}'")
# Create initial state
initial_state = create_initial_state(user_message)
# Add session and user info
initial_state["metadata"]["session_id"] = self.session_id
initial_state["metadata"]["user_id"] = user_id
initial_state["metadata"]["request_context"] = context or {}
# Validate initial state
if not validate_state(initial_state):
raise WorkflowError("Invalid initial state created")
# Execute workflow with timeout
self.logger.info("⚡ Executing workflow...")
start_time = datetime.now()
try:
final_state = await asyncio.wait_for(
self.workflow.ainvoke(initial_state),
timeout=timeout
)
except asyncio.TimeoutError:
raise TimeoutError(f"Workflow execution timed out after {timeout}s")
execution_time = (datetime.now() - start_time).total_seconds()
self.logger.info(f"✅ Workflow completed in {execution_time:.2f}s")
# Log tool execution details
if final_state.get("tool_results"):
tools_executed = [r.tool_name for r in final_state["tool_results"]]
self.logger.info(f"🔧 Tools executed: {', '.join(tools_executed)}")
# Extract results
response_data = self._format_response(final_state)
# Update conversation history
self._update_conversation_history(user_message, response_data["response"])
self.logger.info(f"📤 Response generated ({len(response_data['response'])} chars)")
return response_data
except (ValidationError, TimeoutError, WorkflowError) as e:
# Known agent errors
self.logger.warning(f"⚠️ Agent error: {str(e)}")
return self._format_error_response(str(e), user_message, error_type=type(e).__name__)
except Exception as e:
# Unexpected errors
self.logger.error(f"❌ Unexpected error: {str(e)}")
# Add debug stack trace in development
if self.logger.level <= logging.DEBUG:
import traceback
self.logger.debug(f"🔍 Stack trace:\n{traceback.format_exc()}")
return self._format_error_response(
"Si è verificato un errore interno. Riprova più tardi.",
user_message,
error_type="InternalError"
)
async def process_omirl_request(
self,
sensor_type: str = None,
provincia: str = None,
comune: str = None,
**kwargs
) -> Dict[str, Any]:
"""
Direct OMIRL data extraction (convenience method)
This method provides a direct interface for OMIRL data extraction
without going through natural language processing.
Args:
sensor_type: Type of sensor (configurable via sensor types config)
provincia: Province filter (configurable via geography config)
comune: Municipality filter (configurable via geography config)
**kwargs: Additional parameters
Returns:
Formatted response with OMIRL data
Raises:
ValidationError: If parameters are invalid
"""
try:
# Validate OMIRL-specific parameters
if provincia and provincia.upper() not in VALID_PROVINCES:
valid_provinces_str = ", ".join(VALID_PROVINCES)
raise ValidationError(f"Provincia non valida: {provincia}. Usa: {valid_provinces_str}")
if sensor_type and not isinstance(sensor_type, str):
raise ValidationError("Il tipo sensore deve essere una stringa")
if comune and not isinstance(comune, str):
raise ValidationError("Il comune deve essere una stringa")
# Build filters
filters = {}
if sensor_type:
filters["tipo_sensore"] = sensor_type
if provincia:
filters["provincia"] = provincia.upper()
if comune:
filters["comune"] = comune.capitalize()
# Create a formatted request message
filter_parts = []
if sensor_type:
filter_parts.append(f"sensori {sensor_type.lower()}")
if provincia:
filter_parts.append(f"in provincia di {provincia}")
if comune:
filter_parts.append(f"nel comune di {comune}")
request_message = f"Estrai dati stazioni meteo OMIRL " + " ".join(filter_parts)
self.logger.info(f"🌊 Direct OMIRL request: {request_message}")
# Process through normal workflow
return await self.process_request(request_message)
except ValidationError as e:
self.logger.warning(f"⚠️ OMIRL validation error: {str(e)}")
return self._format_error_response(str(e), f"OMIRL request", error_type="ValidationError")
except Exception as e:
self.logger.error(f"❌ OMIRL request failed: {str(e)}")
return self._format_error_response(f"OMIRL request processing failed: {str(e)}", f"OMIRL request", error_type="WorkflowError")
def get_available_tools(self) -> List[Dict[str, Any]]:
"""
Get list of available tools with descriptions
Returns:
List of tool information dictionaries
"""
tools_info = []
for tool_name, tool_info in self.registry.tools.items():
tools_info.append({
"name": tool_name,
"description": tool_info["description"],
"category": tool_info["category"],
"async": tool_info["async"],
"tags": tool_info["tags"]
})
return tools_info
def get_omirl_capabilities(self) -> Dict[str, Any]:
"""
Get information about OMIRL capabilities
Returns:
Dict with OMIRL-specific information
"""
return {
"supported_sensors": SUPPORTED_SENSORS,
"supported_provinces": VALID_PROVINCES,
"data_source": "https://omirl.regione.liguria.it/#/sensorstable",
"description": "Osservatorio Meteorologico e Idropluviometrico Regionale Liguria",
"capabilities": [
"Estrazione dati stazioni meteorologiche",
"Filtraggio per tipo sensore",
"Filtraggio geografico (provincia, comune)",
"Accesso dati in tempo reale",
"Generazione file JSON"
]
}
def get_conversation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
Get conversation history for this session
Args:
limit: Maximum number of exchanges to return
Returns:
List of conversation exchanges
"""
return self.conversation_history[-limit:] if limit > 0 else self.conversation_history
def clear_conversation_history(self):
"""Clear conversation history for this session"""
self.conversation_history = []
self.logger.info(f"🧹 Conversation history cleared")
def get_session_info(self) -> Dict[str, Any]:
"""
Get information about the current session
Returns:
Session information dictionary
"""
return {
"session_id": self.session_id,
"conversation_length": len(self.conversation_history),
"available_tools": len(self.registry.tools),
"default_language": self.default_language,
"created_at": datetime.now().isoformat() # Approximate
}
def _format_response(self, final_state: AgentState) -> Dict[str, Any]:
"""
Format the final workflow state into a response
Args:
final_state: Final state from workflow execution
Returns:
Formatted response dictionary
"""
# Collect artifacts and sources from tool results
all_artifacts = []
all_sources = []
for result in final_state["tool_results"]:
all_artifacts.extend(result.artifacts)
all_sources.extend(result.sources)
# Remove duplicates
unique_artifacts = list(set(all_artifacts))
unique_sources = list(set(all_sources))
# Create response
response = {
"response": final_state["agent_response"],
"artifacts": unique_artifacts,
"sources": unique_sources,
"metadata": {
"session_id": self.session_id,
"processing_status": final_state["processing_status"],
"tools_executed": [r.tool_name for r in final_state["tool_results"]],
"execution_time": datetime.now().isoformat(),
"routing_result": final_state.get("routing_result", "unknown"),
"success": len(final_state["errors"]) == 0
},
"tool_results": [
{
"tool": result.tool_name,
"success": result.success,
"summary": result.summary_text,
"artifacts": result.artifacts,
"warnings": result.warnings
}
for result in final_state["tool_results"]
],
"llm_usage": final_state.get("llm_usage", {}) # Include LLM usage tracking
}
# Add OMIRL-specific metadata if available
if final_state["omirl_data"]:
response["metadata"]["omirl_data"] = final_state["omirl_data"]
return response
def _format_error_response(self, error_message: str, user_message: str, error_type: str = "Error") -> Dict[str, Any]:
"""
Format an error response
Args:
error_message: Error message
user_message: Original user message
error_type: Type of error for categorization
Returns:
Formatted error response
"""
# Italian error message templates
if error_type == "ValidationError":
response_text = f"⚠️ **Errore di Validazione**\n\n{error_message}\n\nVerifica che la richiesta sia corretta e riprova."
elif error_type == "TimeoutError":
response_text = f"⏱️ **Timeout**\n\nLa richiesta ha impiegato troppo tempo. Riprova con una richiesta più semplice."
elif error_type == "WorkflowError":
response_text = f"🔧 **Errore del Workflow**\n\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."
else:
response_text = f"⚠️ **Errore di Sistema**\n\nSi è verificato un errore durante l'elaborazione della richiesta:\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."
return {
"response": response_text,
"artifacts": [],
"sources": [],
"metadata": {
"session_id": self.session_id,
"processing_status": "error",
"error": error_message,
"error_type": error_type,
"success": False,
"execution_time": datetime.now().isoformat()
},
"tool_results": []
}
def _update_conversation_history(self, user_message: str, agent_response: str):
"""
Update conversation history with new exchange
Args:
user_message: User's message
agent_response: Agent's response
"""
exchange = {
"timestamp": datetime.now().isoformat(),
"user_message": user_message,
"agent_response": agent_response[:500] + "..." if len(agent_response) > 500 else agent_response # Truncate long responses
}
self.conversation_history.append(exchange)
# Keep only last 50 exchanges to prevent memory bloat
if len(self.conversation_history) > 50:
self.conversation_history = self.conversation_history[-50:]
# Factory functions for different agent configurations
def create_operations_agent(workflow_type: str = "default") -> OperationsAgent:
    """
    Build and return an operations agent for the given workflow type.

    Args:
        workflow_type: Workflow flavour to run ("default", "simple", "debug")

    Returns:
        A ready-to-use OperationsAgent instance
    """
    return OperationsAgent(workflow_type=workflow_type)
def create_debug_agent() -> OperationsAgent:
    """
    Build an agent wired for debugging and dump registry diagnostics.

    Returns:
        OperationsAgent configured with the "debug" workflow
    """
    debug_agent = OperationsAgent(workflow_type="debug")
    # Show which tools were registered, to aid debugging sessions.
    print_registry_summary()
    return debug_agent
# Agent testing and validation
async def test_agent_basic():
    """Smoke-test the agent against a few representative requests."""
    print("🧪 Testing basic agent functionality...")
    agent = create_operations_agent("simple")
    sample_requests = (
        "aiuto",
        "mostra stazioni meteo liguria",
        "estrai sensori precipitazione genova",
    )
    for message in sample_requests:
        print(f"\n📝 Testing: '{message}'")
        result = await agent.process_request(message)
        print(f"✅ Response: {result['response'][:100]}...")
        print(f" Tools: {result['metadata']['tools_executed']}")
if __name__ == "__main__":
    # Test agent creation and basic functionality
    agent = create_debug_agent()
    print(f"\n📊 Agent Info: {agent.get_session_info()}")
    print(f"🌊 OMIRL Capabilities: {agent.get_omirl_capabilities()}")
    # Run basic tests
    # asyncio.run(test_agent_basic())