Spaces:
Sleeping
Sleeping
# services/agents/base_agent.py
"""
Base class for all utility agents with logging and CrewAI integration.
"""
import os
import hashlib
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Callable, Optional
from abc import ABC, abstractmethod

from crewai import Agent, Task, Crew

# Configure root logging once at import time; verbosity is driven by the
# AGENT_LOG_LEVEL environment variable (defaults to INFO).
logging.basicConfig(
    level=os.getenv("AGENT_LOG_LEVEL", "INFO"),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class BaseUtilityAgent(ABC):
    """
    Base class for all utility agents.

    Each agent:
    - Wraps one utility function from /utilities
    - Uses a CrewAI Agent with LiteLLM/Gemini
    - Logs all executions with structured metadata
    - Exposes a run(input: dict) -> dict interface
    """

    def __init__(
        self,
        name: str,
        role: str,
        goal: str,
        backstory: str,
        utility_function: Callable,
        model: Optional[str] = None
    ):
        """
        Initialize the agent.

        Args:
            name: Agent identifier (e.g., "extract_text")
            role: Agent's role description
            goal: Agent's primary goal
            backstory: Agent's backstory for context
            utility_function: The original utility function to wrap
            model: LLM model to use (defaults to env AGENT_MODEL)
        """
        self.name = name
        self.utility_function = utility_function
        self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp")
        self.logger = logging.getLogger(f"agent.{name}")

        # Create the CrewAI agent. Delegation is disabled on purpose: each
        # agent wraps exactly one utility (Phase 1 constraint).
        self.agent = Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            allow_delegation=False,  # CRITICAL: No delegation in Phase 1
            verbose=os.getenv("AGENT_LOG_LEVEL", "INFO") == "DEBUG",
            llm=self._create_llm()
        )

    def _create_llm(self) -> str:
        """
        Return the LLM identifier for CrewAI (compatible with CrewAI >=0.80.0).

        CrewAI >=0.80.0 accepts a plain model string such as
        "gemini/<model-name>" and initializes the LLM internally, reading
        GEMINI_API_KEY or GOOGLE_API_KEY from the environment.

        Raises:
            ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        """
        # Fail fast with a clear message rather than letting CrewAI fail
        # later with an opaque authentication error.
        if not os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
            raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment")
        return self.model

    def _hash_data(self, data: Any) -> str:
        """
        Create a short (16 hex chars) SHA256 hash of *data* for log correlation.

        Uses sort_keys so equal dicts hash identically regardless of insertion
        order; default=str makes non-JSON-serializable values loggable.
        """
        json_str = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(json_str.encode()).hexdigest()[:16]

    def _log_execution(
        self,
        input_data: Dict[str, Any],
        output_data: Dict[str, Any],
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ) -> None:
        """
        Log one agent execution with structured metadata.

        Args:
            input_data: The input dict passed to the utility (hashed, not stored).
            output_data: The result dict (hashed only on success).
            execution_time: Wall-clock duration in seconds.
            success: Whether the execution completed without raising.
            error: Error message when success is False.
        """
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "agent_name": self.name,
            "model_used": self.model,
            "input_hash": self._hash_data(input_data),
            "output_hash": self._hash_data(output_data) if success else None,
            "execution_time_ms": round(execution_time * 1000, 2),
            "success": success,
            "error": error
        }
        # Lazy %-formatting: the JSON dump is only built if the record is emitted.
        if success:
            self.logger.info("Agent execution: %s", json.dumps(log_entry))
        else:
            self.logger.error("Agent execution failed: %s", json.dumps(log_entry))

    @abstractmethod
    def _prepare_task_description(self, input_data: Dict[str, Any]) -> str:
        """
        Prepare the task description for the CrewAI agent.

        Must be implemented by each concrete agent to translate the input
        dict into a natural-language task. (Previously this was a bare
        ``pass`` that silently returned None, which later produced
        ``Task(description=None)``; @abstractmethod now enforces the
        contract at subclass definition time.)

        Args:
            input_data: Input dictionary from caller

        Returns:
            Task description string for the agent
        """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the agent with the given input.

        This is the MANDATORY interface contract.

        Args:
            input_data: Input dictionary specific to the utility, or a task
                message from MasterOrchestrator of the form
                {"description": "...", "input": {...}}.

        Returns:
            Dictionary with:
            - Original utility output fields
            - confidence: float (0-1)
            - agent_metadata: execution details

        Raises:
            RuntimeError: Wrapping any failure from the utility or the agent,
                with the original exception chained as __cause__.
        """
        start_time = datetime.now(timezone.utc)
        try:
            # Task messages from MasterOrchestrator arrive as
            # {"description": "...", "input": {...}}; unwrap them so the
            # utility only ever sees its own input dict.
            if "input" in input_data and "description" in input_data:
                actual_input = input_data["input"]
                task_description = input_data["description"]
            else:
                # Direct call (backward compatibility)
                actual_input = input_data
                task_description = None

            # Step 1: Call the original utility function first. The agent
            # never replaces the utility - correctness comes from the
            # deterministic utility, intelligence is layered on top.
            utility_result = self.utility_function(actual_input)

            # Step 2: Build a CrewAI task asking the agent to validate the
            # utility's result and produce a confidence assessment.
            if not task_description:
                task_description = self._prepare_task_description(actual_input)
            task = Task(
                description=task_description,
                agent=self.agent,
                expected_output="Validation summary and confidence score"
            )

            # Step 3: Execute the validation task.
            crew = Crew(
                agents=[self.agent],
                tasks=[task],
                verbose=False
            )
            agent_output = crew.kickoff()

            # Step 4: Merge the utility output with agent-derived metadata.
            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds()
            result = {
                **utility_result,  # Original utility output
                "confidence": self._extract_confidence(str(agent_output)),
                "agent_metadata": {
                    "agent_name": self.name,
                    "model": self.model,
                    "execution_time_ms": round(execution_time * 1000, 2),
                    "validation": str(agent_output)[:200]  # Truncated for brevity
                }
            }

            # Step 5: Log execution
            self._log_execution(actual_input, result, execution_time, True)
            return result

        except Exception as e:
            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds()
            error_msg = str(e)
            # input_data (not actual_input) is used here because the failure
            # may have occurred before the task-message unwrapping completed.
            self._log_execution(input_data, {}, execution_time, False, error_msg)
            # Re-raise with agent context, preserving the original cause.
            raise RuntimeError(f"Agent {self.name} failed: {error_msg}") from e

    def _extract_confidence(self, agent_output: str) -> float:
        """
        Extract a confidence score from the agent's free-text output.

        Default implementation looks for patterns like "confidence: 0.95".
        The parsed value is clamped to [0, 1] so malformed agent output
        (e.g. "confidence: 95") can never violate the documented 0-1
        contract. Subclasses can override for custom extraction.

        Returns:
            Parsed confidence clamped to [0, 1], or 0.9 when no parseable
            score is found (the utility itself already succeeded).
        """
        import re  # local import: keeps the module's top-level deps unchanged

        match = re.search(r'confidence[:\s]+([0-9.]+)', agent_output.lower())
        if match:
            try:
                # Clamp: the run() contract promises a confidence in [0, 1].
                return min(1.0, max(0.0, float(match.group(1))))
            except ValueError:
                # Pattern matched something unparseable like "0.9.5" - fall through.
                pass
        # Default to high confidence if the utility succeeded.
        return 0.9