Spaces:
Sleeping
Sleeping
| """Base agent module for all specialized agents.""" | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional | |
| import re | |
| from openai import OpenAI | |
| from config import settings | |
logger = logging.getLogger(__name__)


class BaseAgent:
    """Base class for all agents in the system.

    An agent wraps an OpenAI-compatible chat client, a fixed system prompt
    and a per-agent directory used to persist JSON state files.

    Attributes:
        agent_id: Unique identifier; also the name of the agent's data folder.
        agent_name: Human-readable name used in log messages.
        system_prompt: System message sent with every request.
        client: OpenAI-compatible chat client.
        data_dir: Directory where this agent's state files are stored.
    """

    # Captures the contents of a Markdown code fence tagged ``json``.
    _JSON_FENCE_RE = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL)

    def __init__(self, agent_id: str, agent_name: str, system_prompt: str):
        """Initialize the base agent.

        Creates the chat client and makes sure the agent's data directory
        exists.

        Args:
            agent_id: Unique identifier for the agent.
            agent_name: Human-readable name of the agent.
            system_prompt: System prompt for the agent's behavior.
        """
        self.agent_id = agent_id
        self.agent_name = agent_name
        self.system_prompt = system_prompt

        # OpenAI-compatible client; the key and base URL are read from the
        # ``openrouter_*`` settings.
        self.client = OpenAI(
            api_key=settings.openrouter_api_key,
            base_url=settings.openrouter_base_url,
        )

        # Per-agent storage folder: <settings.data_dir>/<agent_id>.
        self.data_dir = Path(settings.data_dir) / agent_id
        self.data_dir.mkdir(parents=True, exist_ok=True)

        logger.info("Initialized agent: %s (%s)", agent_name, agent_id)

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Process inputs and generate outputs using the LLM.

        The inputs are serialized to JSON, sent as the user message together
        with the agent's system prompt, and the reply is converted to a
        dictionary by :meth:`_parse_output`.

        Args:
            inputs: Dictionary containing input data for the agent. Must be
                JSON-serializable.

        Returns:
            Dictionary containing the agent's outputs. When the reply is not
            a JSON object it is returned under the ``"output"`` key.

        Raises:
            Exception: Any error raised by the API call is logged and
                re-raised unchanged.
        """
        input_text = json.dumps(inputs, indent=2)
        logger.info("%s: Processing inputs", self.agent_name)

        try:
            response = self.client.chat.completions.create(
                model=settings.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": self.system_prompt,
                    },
                    {
                        "role": "user",
                        "content": f"Process the following inputs:\n\n{input_text}",
                    },
                ],
                temperature=0.7,
                max_tokens=4096,
            )
            output_data = self._parse_output(response.choices[0].message.content)
        except Exception as e:
            logger.error("%s: Error during processing - %s", self.agent_name, e)
            raise

        logger.info("%s: Processing completed", self.agent_name)
        return output_data

    @staticmethod
    def _parse_output(output_text: Optional[str]) -> Dict[str, Any]:
        """Turn the raw model reply into a dictionary.

        Strategies are tried in order:

        1. Parse the whole reply as JSON.
        2. Parse the contents of a Markdown fence tagged ``json``.
        3. Decode the JSON object that starts at the first ``{`` in the
           reply, ignoring whatever text follows it.
        4. Wrap the raw text as ``{"output": text}``.

        Args:
            output_text: Reply text; ``None`` is treated as an empty string.

        Returns:
            The parsed JSON object. Valid JSON that is not an object (list,
            number, string, ...) is wrapped as ``{"output": value}`` so the
            result is always a dictionary.
        """
        # A reply may carry no text at all; json.loads(None) would raise
        # TypeError, which is not a JSONDecodeError.
        text = output_text or ""

        def as_dict(value: Any) -> Dict[str, Any]:
            return value if isinstance(value, dict) else {"output": value}

        try:
            return as_dict(json.loads(text))
        except json.JSONDecodeError:
            pass

        fenced = BaseAgent._JSON_FENCE_RE.search(text)
        if fenced:
            try:
                return as_dict(json.loads(fenced.group(1)))
            except json.JSONDecodeError:
                # Fenced block is malformed: hand back the raw reply.
                return {"output": text}

        start = text.find("{")
        if start != -1:
            try:
                # raw_decode stops at the end of the first complete object,
                # so braces in trailing prose cannot break the parse.
                embedded, _ = json.JSONDecoder().raw_decode(text, start)
            except json.JSONDecodeError:
                return {"output": text}
            return as_dict(embedded)

        return {"output": text}

    def save_state(self, data: Dict[str, Any], filename: Optional[str] = None) -> str:
        """Save agent state and data to persistent JSON storage.

        The data is stored together with the agent id, agent name and an
        ISO-format timestamp.

        Args:
            data: Data to save. Must be JSON-serializable.
            filename: Optional custom filename. Defaults to
                ``<agent_id>_<timestamp>.json`` with ``:`` replaced by ``-``.

        Returns:
            Path to saved file.

        Raises:
            TypeError: If ``data`` cannot be serialized to JSON; no file is
                written in that case.
        """
        # One clock read so the default filename and the stored timestamp agree.
        now = datetime.now()
        if filename is None:
            stamp = now.isoformat().replace(":", "-")
            filename = f"{self.agent_id}_{stamp}.json"
        filepath = self.data_dir / filename

        state_data = {
            "agent_id": self.agent_id,
            "agent_name": self.agent_name,
            "timestamp": now.isoformat(),
            "data": data,
        }

        # Serialize before opening the file so a failure cannot leave a
        # truncated state file behind.
        payload = json.dumps(state_data, indent=2)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(payload)

        logger.info("%s: State saved to %s", self.agent_name, filepath)
        return str(filepath)

    def load_state(self, filename: str) -> Dict[str, Any]:
        """Load agent state from persistent storage.

        Args:
            filename: Filename to load, relative to the agent's data directory.

        Returns:
            The value stored under the ``"data"`` key, or an empty dictionary
            when that key is absent.

        Raises:
            FileNotFoundError: If the file does not exist.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        filepath = self.data_dir / filename
        with open(filepath, "r", encoding="utf-8") as f:
            state_data = json.load(f)

        logger.info("%s: State loaded from %s", self.agent_name, filepath)
        return state_data.get("data", {})