# agent/agent.py
"""
Operations Agent

This module provides the main agent interface for the operations system.
It integrates the LangGraph workflow with OMIRL data extraction capabilities
and provides a clean API for processing user requests.

Purpose:
- Provide main agent interface for the operations system
- Integrate LangGraph workflow execution
- Handle request preprocessing and response formatting
- Support conversation context and session management
- Expose OMIRL and other operational tools

Dependencies:
- graph.py: LangGraph workflow definitions
- state.py: Agent state management
- registry.py: Tool registry and discovery

Used by:
- app/main.py: Web application interface
- Direct usage: CLI tools and testing
- API endpoints: RESTful agent services
"""

import asyncio
import logging
import sys
import os
from typing import Dict, Any, List, Optional, Union
from datetime import datetime
import uuid

from .state import AgentState, create_initial_state, validate_state
from .graph import get_default_workflow, create_simple_workflow, print_workflow_summary
from .registry import get_tool_registry, print_registry_summary

# Simple constants from existing geography config (avoid complex config loading)
VALID_PROVINCES = ["GENOVA", "SAVONA", "IMPERIA", "LA SPEZIA"]
SUPPORTED_SENSORS = [
    "Precipitazione", "Temperatura", "Livelli Idrometrici", "Vento",
    "Umidità dell'aria", "Eliofanie", "Radiazione solare",
    "Bagnatura Fogliare", "Pressione Atmosferica", "Tensione Batteria",
    "Stato del Mare", "Neve"
]


class ColoredFormatter(logging.Formatter):
    """Custom formatter with colors and emojis for development"""

    # ANSI escape codes per log level; unknown levels fall back to no color.
    COLORS = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[35m'   # Magenta
    }
    RESET = '\033[0m'

    def format(self, record):
        """Render a record as '[HH:MM:SS] message' wrapped in the level's color."""
        color = self.COLORS.get(record.levelname, '')
        reset = self.RESET
        # Simple format for development: wall-clock time only, no level name.
        timestamp = datetime.now().strftime('%H:%M:%S')
        return f"{color}[{timestamp}] {record.getMessage()}{reset}"


def setup_agent_logging(session_id: str, dev_mode: bool = True) -> logging.Logger:
    """
    Setup logging for agent with session-specific logger

    Args:
        session_id: Session identifier; first 8 chars namespace the logger
        dev_mode: If True, DEBUG level with colored stdout output;
                  otherwise INFO level with a standard plain formatter

    Returns:
        Configured (or previously configured) logger instance
    """
    logger_name = f"operations_agent.{session_id[:8]}"
    logger = logging.getLogger(logger_name)

    # Avoid duplicate handlers when the same session logger is requested twice
    if logger.handlers:
        return logger

    if dev_mode:
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(ColoredFormatter())
    else:
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))

    logger.addHandler(handler)
    logger.propagate = False  # Prevent duplicate logs via the root logger
    return logger


class AgentError(Exception):
    """Base exception for agent errors"""
    pass


class ValidationError(AgentError):
    """Raised when input validation fails"""
    pass


class WorkflowError(AgentError):
    """Raised when workflow execution fails"""
    pass


# NOTE(review): this shadows the builtin TimeoutError inside this module.
# Kept as-is because callers may catch agent.TimeoutError via AgentError;
# renaming would break the public interface.
class TimeoutError(AgentError):
    """Raised when operations timeout"""
    pass


class OperationsAgent:
    """
    Main operations agent with OMIRL integration

    This class provides the primary interface for the operations agent,
    handling request processing, workflow execution, and response formatting.

    Attributes:
        workflow: Compiled LangGraph workflow
        session_id: Unique session identifier
        conversation_history: History of requests and responses
        default_language: Default language for responses (Italian)
    """

    def __init__(self, workflow_type: str = "default", session_id: str = None,
                 lazy_init: bool = False, dev_mode: bool = None):
        """
        Initialize the operations agent

        Args:
            workflow_type: Type of workflow to use ("default", "simple", "debug")
            session_id: Optional session ID for conversation tracking
            lazy_init: If True, delay workflow creation until first request
            dev_mode: If True, use development logging with colors. Auto-detect if None.
        """
        self.session_id = session_id or str(uuid.uuid4())
        self.conversation_history = []
        self.default_language = "it"
        self.workflow_type = workflow_type
        self.lazy_init = lazy_init
        self.workflow = None
        self.registry = None

        # Auto-detect dev mode if not specified
        if dev_mode is None:
            dev_mode = os.getenv('ENVIRONMENT', 'development').lower() in ['development', 'dev', 'local']

        # Setup logging
        self.logger = setup_agent_logging(self.session_id, dev_mode)
        self.logger.info(f"🚀 Initializing Operations Agent (session: {self.session_id[:8]})")

        if not lazy_init:
            self._initialize_workflow()

    def _validate_request_input(self, user_message: str, user_id: str = None,
                                context: Dict[str, Any] = None) -> None:
        """
        Validate input parameters for request processing

        Args:
            user_message: User's input message
            user_id: Optional user identifier
            context: Optional additional context

        Raises:
            ValidationError: If validation fails
        """
        if not user_message:
            raise ValidationError("Il messaggio utente è richiesto")
        if not isinstance(user_message, str):
            raise ValidationError("Il messaggio utente deve essere una stringa")
        if not user_message.strip():
            raise ValidationError("Il messaggio utente non può essere vuoto")
        if len(user_message) > 10000:  # Reasonable limit
            raise ValidationError("Messaggio utente troppo lungo (massimo 10.000 caratteri)")
        if user_id is not None and not isinstance(user_id, str):
            raise ValidationError("L'ID utente deve essere una stringa")
        if context is not None and not isinstance(context, dict):
            raise ValidationError("Il contesto deve essere un dizionario")

    def _initialize_workflow(self):
        """Initialize the workflow and tools (called lazily if needed)"""
        if self.workflow is not None:
            return  # Already initialized

        self.logger.info(f"🔧 Creating workflow: {self.workflow_type}")
        try:
            # Initialize workflow based on type
            if self.workflow_type == "simple":
                self.workflow = create_simple_workflow()
                self.logger.info("🤖 Simple workflow compiled successfully")
            elif self.workflow_type == "debug":
                self.workflow = create_simple_workflow()  # Use simple for debug
                self.logger.debug("🐛 Debug workflow initialized")
            else:
                self.workflow = get_default_workflow()
                self.logger.info("⚡ Default workflow compiled successfully")

            # Initialize tool registry
            self.registry = get_tool_registry()
            self.logger.info(f"🔧 Registry initialized with {len(self.registry.tools)} tools")
            self.logger.info("✅ Operations agent ready")
        except Exception as e:
            self.logger.error(f"💥 Workflow initialization failed: {e}")
            raise

    async def process_request(
        self,
        user_message: str,
        user_id: str = None,
        context: Dict[str, Any] = None,
        timeout: float = 60.0  # Default timeout in seconds
    ) -> Dict[str, Any]:
        """
        Process a user request through the agent workflow

        This is the main entry point for processing user requests.
        It executes the full workflow and returns formatted results.

        Args:
            user_message: User's input message
            user_id: Optional user identifier
            context: Optional additional context
            timeout: Maximum execution time in seconds

        Returns:
            Dict containing:
            - response: Formatted agent response
            - artifacts: List of generated files
            - sources: List of data sources
            - metadata: Request processing metadata
            - session_id: Session identifier

        Raises:
            ValidationError: If input validation fails
            TimeoutError: If execution exceeds timeout
            WorkflowError: If workflow execution fails
        """
        try:
            # Validate input
            self._validate_request_input(user_message, user_id, context)

            # Initialize workflow if not already done (lazy initialization)
            if self.workflow is None:
                self._initialize_workflow()

            self.logger.info(f"🚀 Processing: '{user_message[:80]}{'...' if len(user_message) > 80 else ''}'")

            # Create initial state
            initial_state = create_initial_state(user_message)

            # Add session and user info
            initial_state["metadata"]["session_id"] = self.session_id
            initial_state["metadata"]["user_id"] = user_id
            initial_state["metadata"]["request_context"] = context or {}

            # Validate initial state
            if not validate_state(initial_state):
                raise WorkflowError("Invalid initial state created")

            # Execute workflow with timeout
            self.logger.info("⚡ Executing workflow...")
            start_time = datetime.now()

            try:
                final_state = await asyncio.wait_for(
                    self.workflow.ainvoke(initial_state),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                raise TimeoutError(f"Workflow execution timed out after {timeout}s")

            execution_time = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"✅ Workflow completed in {execution_time:.2f}s")

            # Log tool execution details
            if final_state.get("tool_results"):
                tools_executed = [r.tool_name for r in final_state["tool_results"]]
                self.logger.info(f"🔧 Tools executed: {', '.join(tools_executed)}")

            # Extract results
            response_data = self._format_response(final_state)

            # Update conversation history
            self._update_conversation_history(user_message, response_data["response"])

            self.logger.info(f"📤 Response generated ({len(response_data['response'])} chars)")
            return response_data

        except (ValidationError, TimeoutError, WorkflowError) as e:
            # Known agent errors
            self.logger.warning(f"⚠️ Agent error: {str(e)}")
            return self._format_error_response(str(e), user_message, error_type=type(e).__name__)
        except Exception as e:
            # Unexpected errors
            self.logger.error(f"❌ Unexpected error: {str(e)}")
            # Add debug stack trace in development
            if self.logger.level <= logging.DEBUG:
                import traceback
                self.logger.debug(f"🔍 Stack trace:\n{traceback.format_exc()}")
            return self._format_error_response(
                "Si è verificato un errore interno. Riprova più tardi.",
                user_message,
                error_type="InternalError"
            )

    async def process_omirl_request(
        self,
        sensor_type: str = None,
        provincia: str = None,
        comune: str = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Direct OMIRL data extraction (convenience method)

        This method provides a direct interface for OMIRL data extraction
        without going through natural language processing.

        Args:
            sensor_type: Type of sensor (configurable via sensor types config)
            provincia: Province filter (configurable via geography config)
            comune: Municipality filter (configurable via geography config)
            **kwargs: Additional parameters

        Returns:
            Formatted response with OMIRL data

        Raises:
            ValidationError: If parameters are invalid
        """
        try:
            # Validate OMIRL-specific parameters
            if provincia and provincia.upper() not in VALID_PROVINCES:
                valid_provinces_str = ", ".join(VALID_PROVINCES)
                raise ValidationError(f"Provincia non valida: {provincia}. Usa: {valid_provinces_str}")
            if sensor_type and not isinstance(sensor_type, str):
                raise ValidationError("Il tipo sensore deve essere una stringa")
            if comune and not isinstance(comune, str):
                raise ValidationError("Il comune deve essere una stringa")

            # Create a formatted request message for the NL workflow
            filter_parts = []
            if sensor_type:
                filter_parts.append(f"sensori {sensor_type.lower()}")
            if provincia:
                filter_parts.append(f"in provincia di {provincia}")
            if comune:
                filter_parts.append(f"nel comune di {comune}")
            request_message = "Estrai dati stazioni meteo OMIRL " + " ".join(filter_parts)

            self.logger.info(f"🌊 Direct OMIRL request: {request_message}")

            # Process through normal workflow
            return await self.process_request(request_message)

        except ValidationError as e:
            self.logger.warning(f"⚠️ OMIRL validation error: {str(e)}")
            return self._format_error_response(str(e), "OMIRL request",
                                               error_type="ValidationError")
        except Exception as e:
            self.logger.error(f"❌ OMIRL request failed: {str(e)}")
            return self._format_error_response(f"OMIRL request processing failed: {str(e)}",
                                               "OMIRL request",
                                               error_type="WorkflowError")

    def get_available_tools(self) -> List[Dict[str, Any]]:
        """
        Get list of available tools with descriptions

        Returns:
            List of tool information dictionaries
        """
        # With lazy_init the registry may not exist yet; create it on demand
        # instead of raising AttributeError on self.registry.tools.
        if self.registry is None:
            self._initialize_workflow()

        tools_info = []
        for tool_name, tool_info in self.registry.tools.items():
            tools_info.append({
                "name": tool_name,
                "description": tool_info["description"],
                "category": tool_info["category"],
                "async": tool_info["async"],
                "tags": tool_info["tags"]
            })
        return tools_info

    def get_omirl_capabilities(self) -> Dict[str, Any]:
        """
        Get information about OMIRL capabilities

        Returns:
            Dict with OMIRL-specific information
        """
        return {
            "supported_sensors": SUPPORTED_SENSORS,
            "supported_provinces": VALID_PROVINCES,
            "data_source": "https://omirl.regione.liguria.it/#/sensorstable",
            "description": "Osservatorio Meteorologico e Idropluviometrico Regionale Liguria",
            "capabilities": [
                "Estrazione dati stazioni meteorologiche",
                "Filtraggio per tipo sensore",
                "Filtraggio geografico (provincia, comune)",
                "Accesso dati in tempo reale",
                "Generazione file JSON"
            ]
        }

    def get_conversation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Get conversation history for this session

        Args:
            limit: Maximum number of exchanges to return

        Returns:
            List of conversation exchanges
        """
        return self.conversation_history[-limit:] if limit > 0 else self.conversation_history

    def clear_conversation_history(self):
        """Clear conversation history for this session"""
        self.conversation_history = []
        self.logger.info("🧹 Conversation history cleared")

    def get_session_info(self) -> Dict[str, Any]:
        """
        Get information about the current session

        Returns:
            Session information dictionary
        """
        return {
            "session_id": self.session_id,
            "conversation_length": len(self.conversation_history),
            # Registry may still be None under lazy_init; report 0 instead of crashing
            "available_tools": len(self.registry.tools) if self.registry is not None else 0,
            "default_language": self.default_language,
            "created_at": datetime.now().isoformat()  # Approximate
        }

    def _format_response(self, final_state: AgentState) -> Dict[str, Any]:
        """
        Format the final workflow state into a response

        Args:
            final_state: Final state from workflow execution

        Returns:
            Formatted response dictionary
        """
        # Collect artifacts and sources from tool results
        all_artifacts = []
        all_sources = []
        for result in final_state["tool_results"]:
            all_artifacts.extend(result.artifacts)
            all_sources.extend(result.sources)

        # Remove duplicates
        unique_artifacts = list(set(all_artifacts))
        unique_sources = list(set(all_sources))

        # Create response
        response = {
            "response": final_state["agent_response"],
            "artifacts": unique_artifacts,
            "sources": unique_sources,
            "metadata": {
                "session_id": self.session_id,
                "processing_status": final_state["processing_status"],
                "tools_executed": [r.tool_name for r in final_state["tool_results"]],
                "execution_time": datetime.now().isoformat(),
                "routing_result": final_state.get("routing_result", "unknown"),
                "success": len(final_state["errors"]) == 0
            },
            "tool_results": [
                {
                    "tool": result.tool_name,
                    "success": result.success,
                    "summary": result.summary_text,
                    "artifacts": result.artifacts,
                    "warnings": result.warnings
                }
                for result in final_state["tool_results"]
            ],
            "llm_usage": final_state.get("llm_usage", {})  # Include LLM usage tracking
        }

        # Add OMIRL-specific metadata if available (use .get: the key may be
        # absent from the state, consistent with routing_result/llm_usage above)
        if final_state.get("omirl_data"):
            response["metadata"]["omirl_data"] = final_state["omirl_data"]

        return response

    def _format_error_response(self, error_message: str, user_message: str,
                               error_type: str = "Error") -> Dict[str, Any]:
        """
        Format an error response

        Args:
            error_message: Error message
            user_message: Original user message
            error_type: Type of error for categorization

        Returns:
            Formatted error response
        """
        # Italian error message templates
        if error_type == "ValidationError":
            response_text = f"⚠️ **Errore di Validazione**\n\n{error_message}\n\nVerifica che la richiesta sia corretta e riprova."
        elif error_type == "TimeoutError":
            response_text = "⏱️ **Timeout**\n\nLa richiesta ha impiegato troppo tempo. Riprova con una richiesta più semplice."
        elif error_type == "WorkflowError":
            response_text = f"🔧 **Errore del Workflow**\n\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."
        else:
            response_text = f"⚠️ **Errore di Sistema**\n\nSi è verificato un errore durante l'elaborazione della richiesta:\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."

        return {
            "response": response_text,
            "artifacts": [],
            "sources": [],
            "metadata": {
                "session_id": self.session_id,
                "processing_status": "error",
                "error": error_message,
                "error_type": error_type,
                "success": False,
                "execution_time": datetime.now().isoformat()
            },
            "tool_results": []
        }

    def _update_conversation_history(self, user_message: str, agent_response: str):
        """
        Update conversation history with new exchange

        Args:
            user_message: User's message
            agent_response: Agent's response
        """
        exchange = {
            "timestamp": datetime.now().isoformat(),
            "user_message": user_message,
            # Truncate long responses to keep history small
            "agent_response": agent_response[:500] + "..." if len(agent_response) > 500 else agent_response
        }
        self.conversation_history.append(exchange)

        # Keep only last 50 exchanges to prevent memory bloat
        if len(self.conversation_history) > 50:
            self.conversation_history = self.conversation_history[-50:]


# Factory functions for different agent configurations

def create_operations_agent(workflow_type: str = "default") -> OperationsAgent:
    """
    Create an operations agent with specified configuration

    Args:
        workflow_type: Type of workflow ("default", "simple", "debug")

    Returns:
        Configured OperationsAgent instance
    """
    return OperationsAgent(workflow_type=workflow_type)


def create_debug_agent() -> OperationsAgent:
    """
    Create an agent with debug configuration

    Returns:
        OperationsAgent with debug settings
    """
    agent = OperationsAgent(workflow_type="debug")
    # Print debug information
    print_registry_summary()
    return agent


# Agent testing and validation

async def test_agent_basic():
    """Test basic agent functionality"""
    print("🧪 Testing basic agent functionality...")
    agent = create_operations_agent("simple")
    test_requests = [
        "aiuto",
        "mostra stazioni meteo liguria",
        "estrai sensori precipitazione genova"
    ]
    for request in test_requests:
        print(f"\n📝 Testing: '{request}'")
        response = await agent.process_request(request)
        print(f"✅ Response: {response['response'][:100]}...")
        print(f"   Tools: {response['metadata']['tools_executed']}")


if __name__ == "__main__":
    # Test agent creation and basic functionality
    agent = create_debug_agent()
    print(f"\n📊 Agent Info: {agent.get_session_info()}")
    print(f"🌊 OMIRL Capabilities: {agent.get_omirl_capabilities()}")
    # Run basic tests
    # asyncio.run(test_agent_basic())