Spaces:
Runtime error
Runtime error
| # agent/agent.py | |
| """ | |
| Operations Agent | |
| This module provides the main agent interface for the operations system. | |
| It integrates the LangGraph workflow with OMIRL data extraction capabilities | |
| and provides a clean API for processing user requests. | |
| Purpose: | |
| - Provide main agent interface for the operations system | |
| - Integrate LangGraph workflow execution | |
| - Handle request preprocessing and response formatting | |
| - Support conversation context and session management | |
| - Expose OMIRL and other operational tools | |
| Dependencies: | |
| - graph.py: LangGraph workflow definitions | |
| - state.py: Agent state management | |
| - registry.py: Tool registry and discovery | |
| Used by: | |
| - app/main.py: Web application interface | |
| - Direct usage: CLI tools and testing | |
| - API endpoints: RESTful agent services | |
| """ | |
| import asyncio | |
| import logging | |
| import sys | |
| import os | |
| from typing import Dict, Any, List, Optional, Union | |
| from datetime import datetime | |
| import uuid | |
| from .state import AgentState, create_initial_state, validate_state | |
| from .graph import get_default_workflow, create_simple_workflow, print_workflow_summary | |
| from .registry import get_tool_registry, print_registry_summary | |
# Static lookup tables mirroring the geography / sensor configuration.
# They are inlined here so this module does not need the config loader.
VALID_PROVINCES = [
    "GENOVA",
    "SAVONA",
    "IMPERIA",
    "LA SPEZIA",
]

SUPPORTED_SENSORS = [
    "Precipitazione",
    "Temperatura",
    "Livelli Idrometrici",
    "Vento",
    "Umidità dell'aria",
    "Eliofanie",
    "Radiazione solare",
    "Bagnatura Fogliare",
    "Pressione Atmosferica",
    "Tensione Batteria",
    "Stato del Mare",
    "Neve",
]
class ColoredFormatter(logging.Formatter):
    """Compact, ANSI-colored log formatter intended for development consoles.

    Output shape: ``<color>[HH:MM:SS] <message><reset>``. The color is chosen
    from the record's level name; unknown level names get no color prefix.
    """

    # ANSI escape sequences keyed by logging level name
    COLORS = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[35m',  # Magenta
    }
    RESET = '\033[0m'

    def format(self, record):
        """Render *record* as a single colored line.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The colored string, with the formatted traceback appended on
            following lines when the record carries ``exc_info``.
        """
        color = self.COLORS.get(record.levelname, '')
        # Use the time the record was created rather than the time it happens
        # to be formatted, so delayed/queued handlers still show the event time.
        timestamp = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
        message = record.getMessage()
        # Keep tracebacks from logger.exception(...) / exc_info=True visible.
        if record.exc_info:
            message = f"{message}\n{self.formatException(record.exc_info)}"
        return f"{color}[{timestamp}] {message}{self.RESET}"
def setup_agent_logging(session_id: str, dev_mode: bool = True) -> logging.Logger:
    """Return the logger dedicated to one agent session, configuring it once.

    The logger is named ``operations_agent.<first 8 chars of session_id>``.
    If it already has a handler it is returned untouched, so repeated calls
    for the same session never stack handlers.

    Args:
        session_id: Session identifier; only its first 8 characters are used.
        dev_mode: When True, log at DEBUG to stdout with ``ColoredFormatter``;
            otherwise log at INFO through a default ``StreamHandler`` with a
            timestamp/name/level format.

    Returns:
        The configured ``logging.Logger``.
    """
    session_logger = logging.getLogger(f"operations_agent.{session_id[:8]}")

    if not session_logger.handlers:
        if dev_mode:
            session_logger.setLevel(logging.DEBUG)
            stream_handler = logging.StreamHandler(sys.stdout)
            formatter = ColoredFormatter()
        else:
            session_logger.setLevel(logging.INFO)
            stream_handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
        stream_handler.setFormatter(formatter)
        session_logger.addHandler(stream_handler)
        # Do not bubble up to ancestor loggers, which would print each line twice
        session_logger.propagate = False

    return session_logger
class AgentError(Exception):
    """Root of the agent's exception hierarchy."""


class ValidationError(AgentError):
    """Signals that caller-supplied input did not pass validation."""


class WorkflowError(AgentError):
    """Signals that the workflow could not be executed successfully."""


# NOTE: within this module this name shadows the builtin ``TimeoutError``.
class TimeoutError(AgentError):
    """Signals that an operation exceeded its time budget."""
class OperationsAgent:
    """
    Main operations agent with OMIRL integration

    This class provides the primary interface for the operations agent,
    handling request processing, workflow execution, and response formatting.

    Attributes:
        workflow: Compiled LangGraph workflow (None until initialized)
        registry: Tool registry (None until initialized)
        session_id: Unique session identifier
        created_at: ISO-8601 timestamp taken when the agent was constructed
        conversation_history: History of requests and responses
        default_language: Default language for responses (Italian)
    """

    def __init__(
        self,
        workflow_type: str = "default",
        session_id: Optional[str] = None,
        lazy_init: bool = False,
        dev_mode: Optional[bool] = None,
    ):
        """
        Initialize the operations agent

        Args:
            workflow_type: Type of workflow to use ("default", "simple", "debug")
            session_id: Optional session ID for conversation tracking; a random
                UUID4 string is generated when omitted
            lazy_init: If True, delay workflow creation until first request
            dev_mode: If True, use development logging with colors. When None,
                it is derived from the ENVIRONMENT variable (default
                "development"): "development", "dev" and "local" enable it.
        """
        self.session_id = session_id or str(uuid.uuid4())
        self.conversation_history = []
        self.default_language = "it"
        self.workflow_type = workflow_type
        self.lazy_init = lazy_init
        self.workflow = None
        self.registry = None
        # Captured once so get_session_info() reports a stable creation time
        self.created_at = datetime.now().isoformat()

        # Auto-detect dev mode if not specified
        if dev_mode is None:
            environment = os.getenv('ENVIRONMENT', 'development').lower()
            dev_mode = environment in ['development', 'dev', 'local']

        # Setup logging
        self.logger = setup_agent_logging(self.session_id, dev_mode)
        self.logger.info(f"🚀 Initializing Operations Agent (session: {self.session_id[:8]})")

        if not lazy_init:
            self._initialize_workflow()

    def _validate_request_input(
        self,
        user_message: str,
        user_id: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Validate input parameters for request processing

        Args:
            user_message: User's input message (non-blank string, at most
                10,000 characters)
            user_id: Optional user identifier (string when given)
            context: Optional additional context (dict when given)

        Raises:
            ValidationError: If validation fails
        """
        if not user_message:
            raise ValidationError("Il messaggio utente è richiesto")
        if not isinstance(user_message, str):
            raise ValidationError("Il messaggio utente deve essere una stringa")
        if not user_message.strip():
            raise ValidationError("Il messaggio utente non può essere vuoto")
        if len(user_message) > 10000:  # Reasonable limit
            raise ValidationError("Messaggio utente troppo lungo (massimo 10.000 caratteri)")
        if user_id is not None and not isinstance(user_id, str):
            raise ValidationError("L'ID utente deve essere una stringa")
        if context is not None and not isinstance(context, dict):
            raise ValidationError("Il contesto deve essere un dizionario")

    def _initialize_workflow(self):
        """
        Initialize the workflow and tool registry (called lazily if needed)

        Each of the two pieces is only created when it is still missing, so a
        call that failed half-way (workflow built, registry not) can be retried
        instead of leaving the registry unset for the life of the agent.

        Raises:
            Exception: Whatever the workflow or registry factory raised; the
                error is logged and re-raised unchanged.
        """
        if self.workflow is not None and self.registry is not None:
            return  # Already initialized

        try:
            if self.workflow is None:
                self.logger.info(f"🔧 Creating workflow: {self.workflow_type}")
                # Initialize workflow based on type
                if self.workflow_type == "simple":
                    self.workflow = create_simple_workflow()
                    self.logger.info("🤖 Simple workflow compiled successfully")
                elif self.workflow_type == "debug":
                    self.workflow = create_simple_workflow()  # Use simple for debug
                    self.logger.debug("🐛 Debug workflow initialized")
                else:
                    self.workflow = get_default_workflow()
                    self.logger.info("⚡ Default workflow compiled successfully")

            if self.registry is None:
                # Initialize tool registry
                self.registry = get_tool_registry()
                self.logger.info(f"🔧 Registry initialized with {len(self.registry.tools)} tools")

            self.logger.info("✅ Operations agent ready")
        except Exception as e:
            self.logger.error(f"💥 Workflow initialization failed: {e}")
            raise

    async def process_request(
        self,
        user_message: str,
        user_id: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        timeout: float = 60.0,  # Default timeout in seconds
    ) -> Dict[str, Any]:
        """
        Process a user request through the agent workflow

        This is the main entry point for processing user requests.
        It executes the full workflow and returns formatted results.
        Failures are not propagated: they are logged and converted into an
        error response dictionary (see _format_error_response).

        Args:
            user_message: User's input message
            user_id: Optional user identifier
            context: Optional additional context
            timeout: Maximum execution time in seconds

        Returns:
            Dict containing:
                - response: Formatted agent response
                - artifacts: List of generated files
                - sources: List of data sources
                - metadata: Request processing metadata (includes session_id)
                - tool_results: Per-tool result summaries
            On failure, metadata["success"] is False and metadata["error_type"]
            is "ValidationError", "TimeoutError", "WorkflowError" or
            "InternalError".
        """
        try:
            # Validate input
            self._validate_request_input(user_message, user_id, context)

            # Initialize workflow if not already done (lazy initialization)
            if self.workflow is None:
                self._initialize_workflow()

            preview = user_message[:80] + ("..." if len(user_message) > 80 else "")
            self.logger.info(f"🚀 Processing: '{preview}'")

            # Create initial state and attach session / user info
            initial_state = create_initial_state(user_message)
            initial_state["metadata"]["session_id"] = self.session_id
            initial_state["metadata"]["user_id"] = user_id
            initial_state["metadata"]["request_context"] = context or {}

            # Validate initial state
            if not validate_state(initial_state):
                raise WorkflowError("Invalid initial state created")

            # Execute workflow with timeout
            self.logger.info("⚡ Executing workflow...")
            start_time = datetime.now()
            try:
                final_state = await asyncio.wait_for(
                    self.workflow.ainvoke(initial_state),
                    timeout=timeout,
                )
            except asyncio.TimeoutError as exc:
                # Translate into the module's own TimeoutError, keeping the cause
                raise TimeoutError(f"Workflow execution timed out after {timeout}s") from exc

            execution_time = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"✅ Workflow completed in {execution_time:.2f}s")

            # Log tool execution details
            tool_results = final_state.get("tool_results") or []
            if tool_results:
                tools_executed = [r.tool_name for r in tool_results]
                self.logger.info(f"🔧 Tools executed: {', '.join(tools_executed)}")

            # Extract results
            response_data = self._format_response(final_state)

            # Update conversation history
            self._update_conversation_history(user_message, response_data["response"])
            self.logger.info(f"📤 Response generated ({len(response_data['response'])} chars)")
            return response_data

        except (ValidationError, TimeoutError, WorkflowError) as e:
            # Known agent errors
            self.logger.warning(f"⚠️ Agent error: {str(e)}")
            return self._format_error_response(str(e), user_message, error_type=type(e).__name__)
        except Exception as e:
            # Unexpected errors: log details, but return only a generic message
            self.logger.error(f"❌ Unexpected error: {str(e)}")
            # Add debug stack trace in development
            if self.logger.level <= logging.DEBUG:
                import traceback
                self.logger.debug(f"🔍 Stack trace:\n{traceback.format_exc()}")
            return self._format_error_response(
                "Si è verificato un errore interno. Riprova più tardi.",
                user_message,
                error_type="InternalError"
            )

    async def process_omirl_request(
        self,
        sensor_type: Optional[str] = None,
        provincia: Optional[str] = None,
        comune: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Direct OMIRL data extraction (convenience method)

        Builds an Italian natural-language request from the given filters and
        sends it through process_request.

        Args:
            sensor_type: Type of sensor
            provincia: Province filter; must match VALID_PROVINCES
                (case-insensitive)
            comune: Municipality filter
            **kwargs: Accepted for call compatibility; currently ignored

        Returns:
            Formatted response with OMIRL data, or an error response with
            error_type "ValidationError" when a parameter is invalid and
            "WorkflowError" for any other failure raised here.
        """
        try:
            # Type checks run before any string method is called, so a
            # non-string value is reported as a validation error rather than
            # surfacing as an AttributeError from .upper()/.lower().
            if provincia and not isinstance(provincia, str):
                raise ValidationError("La provincia deve essere una stringa")
            if provincia and provincia.upper() not in VALID_PROVINCES:
                valid_provinces_str = ", ".join(VALID_PROVINCES)
                raise ValidationError(f"Provincia non valida: {provincia}. Usa: {valid_provinces_str}")
            if sensor_type and not isinstance(sensor_type, str):
                raise ValidationError("Il tipo sensore deve essere una stringa")
            if comune and not isinstance(comune, str):
                raise ValidationError("Il comune deve essere una stringa")

            # Create a formatted request message
            filter_parts = []
            if sensor_type:
                filter_parts.append(f"sensori {sensor_type.lower()}")
            if provincia:
                filter_parts.append(f"in provincia di {provincia}")
            if comune:
                filter_parts.append(f"nel comune di {comune}")
            request_message = "Estrai dati stazioni meteo OMIRL " + " ".join(filter_parts)

            self.logger.info(f"🌊 Direct OMIRL request: {request_message}")

            # Process through normal workflow
            return await self.process_request(request_message)

        except ValidationError as e:
            self.logger.warning(f"⚠️ OMIRL validation error: {str(e)}")
            return self._format_error_response(str(e), "OMIRL request", error_type="ValidationError")
        except Exception as e:
            self.logger.error(f"❌ OMIRL request failed: {str(e)}")
            return self._format_error_response(
                f"OMIRL request processing failed: {str(e)}",
                "OMIRL request",
                error_type="WorkflowError"
            )

    def get_available_tools(self) -> List[Dict[str, Any]]:
        """
        Get list of available tools with descriptions

        On a lazily created agent the registry does not exist yet, so it is
        initialized on first use here.

        Returns:
            List of dictionaries with the keys name, description, category,
            async and tags, one per registered tool
        """
        if self.registry is None:
            self._initialize_workflow()

        return [
            {
                "name": tool_name,
                "description": tool_info["description"],
                "category": tool_info["category"],
                "async": tool_info["async"],
                "tags": tool_info["tags"],
            }
            for tool_name, tool_info in self.registry.tools.items()
        ]

    def get_omirl_capabilities(self) -> Dict[str, Any]:
        """
        Get information about OMIRL capabilities

        Returns:
            Dict with the supported sensors and provinces, the data source URL,
            a description and the list of capabilities
        """
        return {
            "supported_sensors": SUPPORTED_SENSORS,
            "supported_provinces": VALID_PROVINCES,
            "data_source": "https://omirl.regione.liguria.it/#/sensorstable",
            "description": "Osservatorio Meteorologico e Idropluviometrico Regionale Liguria",
            "capabilities": [
                "Estrazione dati stazioni meteorologiche",
                "Filtraggio per tipo sensore",
                "Filtraggio geografico (provincia, comune)",
                "Accesso dati in tempo reale",
                "Generazione file JSON"
            ]
        }

    def get_conversation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Get conversation history for this session

        Args:
            limit: Maximum number of most recent exchanges to return; zero or
                a negative value returns the whole history

        Returns:
            A new list of conversation exchanges (oldest first); mutating it
            does not affect the agent's stored history
        """
        if limit > 0:
            return self.conversation_history[-limit:]
        return list(self.conversation_history)

    def clear_conversation_history(self):
        """Clear conversation history for this session"""
        self.conversation_history = []
        self.logger.info("🧹 Conversation history cleared")

    def get_session_info(self) -> Dict[str, Any]:
        """
        Get information about the current session

        This is a read-only snapshot: it never triggers workflow
        initialization, so a lazily created agent reports 0 tools until its
        registry exists.

        Returns:
            Session information dictionary
        """
        tool_count = len(self.registry.tools) if self.registry is not None else 0
        return {
            "session_id": self.session_id,
            "conversation_length": len(self.conversation_history),
            "available_tools": tool_count,
            "default_language": self.default_language,
            "created_at": self.created_at
        }

    def _format_response(self, final_state: "AgentState") -> Dict[str, Any]:
        """
        Format the final workflow state into a response

        Args:
            final_state: Final state from workflow execution. "agent_response"
                and "processing_status" are required; "tool_results",
                "errors", "routing_result", "llm_usage" and "omirl_data" are
                treated as optional.

        Returns:
            Formatted response dictionary
        """
        tool_results = final_state.get("tool_results") or []

        # Collect artifacts and sources from tool results, dropping duplicates
        # while keeping first-seen order so the output is deterministic.
        unique_artifacts = list(dict.fromkeys(
            artifact for result in tool_results for artifact in result.artifacts
        ))
        unique_sources = list(dict.fromkeys(
            source for result in tool_results for source in result.sources
        ))

        # Create response
        response = {
            "response": final_state["agent_response"],
            "artifacts": unique_artifacts,
            "sources": unique_sources,
            "metadata": {
                "session_id": self.session_id,
                "processing_status": final_state["processing_status"],
                "tools_executed": [r.tool_name for r in tool_results],
                # Despite the key name this is the completion timestamp
                "execution_time": datetime.now().isoformat(),
                "routing_result": final_state.get("routing_result", "unknown"),
                "success": not (final_state.get("errors") or [])
            },
            "tool_results": [
                {
                    "tool": result.tool_name,
                    "success": result.success,
                    "summary": result.summary_text,
                    "artifacts": result.artifacts,
                    "warnings": result.warnings
                }
                for result in tool_results
            ],
            "llm_usage": final_state.get("llm_usage", {})  # Include LLM usage tracking
        }

        # Add OMIRL-specific metadata if available
        omirl_data = final_state.get("omirl_data")
        if omirl_data:
            response["metadata"]["omirl_data"] = omirl_data

        return response

    def _format_error_response(self, error_message: str, user_message: str, error_type: str = "Error") -> Dict[str, Any]:
        """
        Format an error response

        Args:
            error_message: Error message
            user_message: Original user message (not included in the output)
            error_type: Type of error for categorization; selects the Italian
                message template

        Returns:
            Formatted error response with the same top-level keys as a
            successful response and metadata["success"] set to False
        """
        # Italian error message templates
        if error_type == "ValidationError":
            response_text = f"⚠️ **Errore di Validazione**\n\n{error_message}\n\nVerifica che la richiesta sia corretta e riprova."
        elif error_type == "TimeoutError":
            response_text = "⏱️ **Timeout**\n\nLa richiesta ha impiegato troppo tempo. Riprova con una richiesta più semplice."
        elif error_type == "WorkflowError":
            response_text = f"🔧 **Errore del Workflow**\n\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."
        else:
            response_text = f"⚠️ **Errore di Sistema**\n\nSi è verificato un errore durante l'elaborazione della richiesta:\n{error_message}\n\nRiprova o contatta l'assistenza tecnica."

        return {
            "response": response_text,
            "artifacts": [],
            "sources": [],
            "metadata": {
                "session_id": self.session_id,
                "processing_status": "error",
                "error": error_message,
                "error_type": error_type,
                "success": False,
                # Present (empty) so callers can read it on success and error alike
                "tools_executed": [],
                "execution_time": datetime.now().isoformat()
            },
            "tool_results": []
        }

    def _update_conversation_history(self, user_message: str, agent_response: str):
        """
        Update conversation history with new exchange

        Responses longer than 500 characters are stored truncated with a
        trailing "...", and only the 50 most recent exchanges are kept.

        Args:
            user_message: User's message
            agent_response: Agent's response
        """
        if len(agent_response) > 500:
            stored_response = agent_response[:500] + "..."  # Truncate long responses
        else:
            stored_response = agent_response

        self.conversation_history.append({
            "timestamp": datetime.now().isoformat(),
            "user_message": user_message,
            "agent_response": stored_response
        })

        # Keep only last 50 exchanges to prevent memory bloat
        if len(self.conversation_history) > 50:
            self.conversation_history = self.conversation_history[-50:]
# Factory helpers that build agents in common configurations
def create_operations_agent(workflow_type: str = "default") -> OperationsAgent:
    """
    Build an OperationsAgent for the requested workflow.

    Args:
        workflow_type: Workflow to run ("default", "simple", "debug")

    Returns:
        A new OperationsAgent; every other constructor option keeps its default
    """
    agent = OperationsAgent(workflow_type=workflow_type)
    return agent
def create_debug_agent() -> OperationsAgent:
    """
    Build an agent using the "debug" workflow type and print the registry summary.

    Returns:
        The newly created OperationsAgent
    """
    debug_agent = OperationsAgent(workflow_type="debug")
    # Diagnostic output goes out only after the agent was constructed
    print_registry_summary()
    return debug_agent
# Agent testing and validation
async def test_agent_basic():
    """
    Smoke-test the agent by sending a few sample requests and printing results.

    Creates an agent with the "simple" workflow, sends three Italian sample
    requests one after another and prints the first 100 characters of each
    response together with the tools that ran.
    """
    print("🧪 Testing basic agent functionality...")
    agent = create_operations_agent("simple")
    test_requests = [
        "aiuto",
        "mostra stazioni meteo liguria",
        "estrai sensori precipitazione genova"
    ]
    for request in test_requests:
        print(f"\n📝 Testing: '{request}'")
        response = await agent.process_request(request)
        print(f"✅ Response: {response['response'][:100]}...")
        # Error responses carry no "tools_executed" entry in their metadata,
        # so read it defensively instead of raising KeyError mid-run.
        tools_executed = response["metadata"].get("tools_executed", [])
        print(f" Tools: {tools_executed}")
if __name__ == "__main__":
    # Script entry point: build a debug agent and dump what it reports
    agent = create_debug_agent()

    session_info = agent.get_session_info()
    print(f"\n📊 Agent Info: {session_info}")

    omirl_capabilities = agent.get_omirl_capabilities()
    print(f"🌊 OMIRL Capabilities: {omirl_capabilities}")

    # The async smoke test is left disabled; uncomment to run it
    # asyncio.run(test_agent_basic())