"""Base agent module for all specialized agents.""" import json import logging from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional import re from openai import OpenAI from config import settings logger = logging.getLogger(__name__) class BaseAgent: """Base class for all agents in the system.""" def __init__(self, agent_id: str, agent_name: str, system_prompt: str): """Initialize the base agent. Args: agent_id: Unique identifier for the agent agent_name: Human-readable name of the agent system_prompt: System prompt for the agent's behavior """ self.agent_id = agent_id self.agent_name = agent_name self.system_prompt = system_prompt # Initialize DashScope client (OpenAI-compatible API) self.client = OpenAI( api_key=settings.openrouter_api_key, base_url=settings.openrouter_base_url, ) # Data storage self.data_dir = Path(settings.data_dir) / agent_id self.data_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Initialized agent: {agent_name} ({agent_id})") def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]: """Process inputs and generate outputs using the LLM. Args: inputs: Dictionary containing input data for the agent Returns: Dictionary containing the agent's outputs """ # Prepare the prompt input_text = json.dumps(inputs, indent=2) logger.info(f"{self.agent_name}: Processing inputs") try: # Call DashScope API via OpenAI client response = self.client.chat.completions.create( model=settings.model_name, messages=[ { "role": "system", "content": self.system_prompt, }, { "role": "user", "content": f"Process the following inputs:\n\n{input_text}", }, ], temperature=0.7, max_tokens=4096, ) output_text = response.choices[0].message.content # Parse output (assuming JSON format) try: # Attempt direct JSON parsing first output_data = json.loads(output_text) except json.JSONDecodeError: # If direct parsing fails, try to extract JSON from markdown or embedded text json_match = re.search(r'```json\s*(.*?)\s*```', output_text, re.DOTALL) if json_match: try: output_data = json.loads(json_match.group(1)) except json.JSONDecodeError: # Fallback if markdown-wrapped JSON is malformed output_data = {"output": output_text} else: # Try to find any JSON object embedded in the text json_obj_match = re.search(r'(\{.*\})', output_text, re.DOTALL) if json_obj_match: try: output_data = json.loads(json_obj_match.group(1)) except json.JSONDecodeError: # Fallback if embedded JSON is malformed output_data = {"output": output_text} else: # If no JSON found, wrap the entire output output_data = {"output": output_text} logger.info(f"{self.agent_name}: Processing completed") return output_data except Exception as e: logger.error(f"{self.agent_name}: Error during processing - {str(e)}") raise def save_state(self, data: Dict[str, Any], filename: Optional[str] = None) -> str: """Save agent state and data to persistent JSON storage. Args: data: Data to save filename: Optional custom filename (defaults to timestamp) Returns: Path to saved file """ if filename is None: timestamp = datetime.now().isoformat().replace(":", "-") filename = f"{self.agent_id}_{timestamp}.json" filepath = self.data_dir / filename # Add metadata state_data = { "agent_id": self.agent_id, "agent_name": self.agent_name, "timestamp": datetime.now().isoformat(), "data": data, } with open(filepath, "w") as f: json.dump(state_data, f, indent=2) logger.info(f"{self.agent_name}: State saved to {filepath}") return str(filepath) def load_state(self, filename: str) -> Dict[str, Any]: """Load agent state from persistent storage. Args: filename: Filename to load Returns: Loaded data """ filepath = self.data_dir / filename with open(filepath, "r") as f: state_data = json.load(f) logger.info(f"{self.agent_name}: State loaded from {filepath}") return state_data.get("data", {})