multi-agent-system / base_agent.py
factorstudios's picture
Update base_agent.py
d3ef6c9 verified
Raw
History Blame Contribute Delete
5.46 kB
"""Base agent module for all specialized agents."""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import re
from openai import OpenAI
from config import settings
logger = logging.getLogger(__name__)
class BaseAgent:
    """Base class for all agents in the system.

    An agent bundles a system prompt, an OpenAI-compatible chat client and a
    per-agent directory (``<settings.data_dir>/<agent_id>``) in which state
    snapshots are stored as JSON files.
    """

    def __init__(self, agent_id: str, agent_name: str, system_prompt: str):
        """Initialize the base agent.

        Creates the chat client and makes sure the agent's data directory
        exists.

        Args:
            agent_id: Unique identifier for the agent; also the name of the
                agent's sub-directory under ``settings.data_dir``.
            agent_name: Human-readable name of the agent (used in log lines).
            system_prompt: System prompt for the agent's behavior.
        """
        self.agent_id = agent_id
        self.agent_name = agent_name
        self.system_prompt = system_prompt

        # OpenAI-compatible client; key and endpoint come from the
        # openrouter_* settings.
        self.client = OpenAI(
            api_key=settings.openrouter_api_key,
            base_url=settings.openrouter_base_url,
        )

        # Data storage: one directory per agent, created on demand.
        self.data_dir = Path(settings.data_dir) / agent_id
        self.data_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Initialized agent: {agent_name} ({agent_id})")

    @staticmethod
    def _parse_output(output_text: Optional[str]) -> Dict[str, Any]:
        """Turn raw model text into the agent's output structure.

        Candidates are tried in order; the first one that decodes wins:

        1. the whole text as JSON,
        2. the contents of the first ```` ```json ```` fenced block,
        3. the JSON object starting at the first ``{`` in the text
           (anything after that object is ignored).

        Args:
            output_text: Text returned by the model. ``None`` (a response
                without text content) is treated like an empty string.

        Returns:
            The decoded JSON value, or ``{"output": <raw text>}`` when no
            candidate decodes.
        """
        text = "" if output_text is None else output_text

        # 1. The whole response is JSON.
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        # 2. JSON wrapped in a markdown code fence.
        fenced = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
        if fenced:
            try:
                return json.loads(fenced.group(1))
            except json.JSONDecodeError:
                pass

        # 3. A JSON object embedded in surrounding prose. raw_decode stops at
        #    the end of the object, so braces in trailing text do not break
        #    the parse. Only the first "{" is tried: a truncated or malformed
        #    outer object must fall back to raw text rather than yield one of
        #    its inner fragments.
        start = text.find("{")
        if start != -1:
            try:
                embedded, _ = json.JSONDecoder().raw_decode(text, start)
                return embedded
            except json.JSONDecodeError:
                pass

        # No JSON found: wrap the entire output.
        return {"output": text}

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Process inputs and generate outputs using the LLM.

        The inputs are serialized to JSON and sent as the user message, with
        the agent's system prompt as the system message. The reply is decoded
        by ``_parse_output``.

        Args:
            inputs: Dictionary containing input data for the agent. Must be
                JSON-serializable.

        Returns:
            Dictionary containing the agent's outputs; ``{"output": <text>}``
            when the reply contains no decodable JSON.

        Raises:
            Exception: Any error raised by the API call or while reading the
                response is logged and re-raised unchanged.
        """
        # Prepare the prompt
        input_text = json.dumps(inputs, indent=2)
        logger.info(f"{self.agent_name}: Processing inputs")

        try:
            # Call the OpenAI-compatible chat completions endpoint.
            response = self.client.chat.completions.create(
                model=settings.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": self.system_prompt,
                    },
                    {
                        "role": "user",
                        "content": f"Process the following inputs:\n\n{input_text}",
                    },
                ],
                temperature=0.7,
                max_tokens=4096,
            )
            output_data = self._parse_output(response.choices[0].message.content)
            logger.info(f"{self.agent_name}: Processing completed")
            return output_data
        except Exception as e:
            logger.error(f"{self.agent_name}: Error during processing - {str(e)}")
            raise

    def save_state(self, data: Dict[str, Any], filename: Optional[str] = None) -> str:
        """Save agent state and data to persistent JSON storage.

        The data is wrapped with the agent id, agent name and a timestamp
        before being written into the agent's data directory.

        Args:
            data: Data to save. Must be JSON-serializable.
            filename: Optional custom filename (defaults to
                ``<agent_id>_<timestamp>.json``).

        Returns:
            Path to saved file

        Raises:
            TypeError: If ``data`` is not JSON-serializable. Nothing is
                written in that case.
        """
        # One instant for both the default filename and the metadata so the
        # two always agree.
        now = datetime.now()
        if filename is None:
            # ":" is not allowed in filenames on some platforms.
            timestamp = now.isoformat().replace(":", "-")
            filename = f"{self.agent_id}_{timestamp}.json"
        filepath = self.data_dir / filename

        # Add metadata
        state_data = {
            "agent_id": self.agent_id,
            "agent_name": self.agent_name,
            "timestamp": now.isoformat(),
            "data": data,
        }

        # Serialize before opening the file: a serialization error must not
        # leave a truncated file behind.
        payload = json.dumps(state_data, indent=2)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(payload)

        logger.info(f"{self.agent_name}: State saved to {filepath}")
        return str(filepath)

    def load_state(self, filename: str) -> Dict[str, Any]:
        """Load agent state from persistent storage.

        Args:
            filename: Filename to load, relative to the agent's data
                directory.

        Returns:
            The value stored under the ``"data"`` key of the file, or an
            empty dict when that key is absent.

        Raises:
            FileNotFoundError: If the file does not exist.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        filepath = self.data_dir / filename
        with open(filepath, "r", encoding="utf-8") as f:
            state_data = json.load(f)
        logger.info(f"{self.agent_name}: State loaded from {filepath}")
        return state_data.get("data", {})