"""Base agent module for all specialized agents."""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import re
from openai import OpenAI
from config import settings
logger = logging.getLogger(__name__)


class BaseAgent:
    """Base class for all agents in the system.

    Bundles an OpenAI-compatible chat client, a fixed system prompt, and a
    per-agent directory (``<settings.data_dir>/<agent_id>``) that is used to
    persist state as JSON files.
    """

    # Compiled once at class creation; both are consumed by ``_parse_output``.
    _FENCED_JSON_RE = re.compile(r'```json\s*(.*?)\s*```', re.DOTALL)
    _EMBEDDED_OBJECT_RE = re.compile(r'(\{.*\})', re.DOTALL)

    def __init__(self, agent_id: str, agent_name: str, system_prompt: str):
        """Initialize the base agent.

        Creates the chat client and makes sure the agent's data directory
        exists.

        Args:
            agent_id: Unique identifier for the agent; also the name of the
                agent's sub-directory under ``settings.data_dir``.
            agent_name: Human-readable name of the agent (used in log lines).
            system_prompt: System prompt for the agent's behavior.
        """
        self.agent_id = agent_id
        self.agent_name = agent_name
        self.system_prompt = system_prompt

        # OpenAI-compatible client; the API key and base URL are read from
        # the ``openrouter_*`` settings.
        self.client = OpenAI(
            api_key=settings.openrouter_api_key,
            base_url=settings.openrouter_base_url,
        )

        # Per-agent storage directory, created on first use.
        self.data_dir = Path(settings.data_dir) / agent_id
        self.data_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Initialized agent: {agent_name} ({agent_id})")

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Process inputs and generate outputs using the LLM.

        The inputs are serialized to JSON, sent as the user message alongside
        the agent's system prompt, and the reply is parsed into a dictionary
        by ``_parse_output``.

        Args:
            inputs: Dictionary containing input data for the agent. Must be
                JSON-serializable.

        Returns:
            Dictionary containing the agent's outputs. Replies that are not a
            JSON object are wrapped as ``{"output": ...}``.

        Raises:
            TypeError: If ``inputs`` cannot be serialized to JSON.
            Exception: Any error raised while calling the API or reading the
                response is logged and re-raised unchanged.
        """
        input_text = json.dumps(inputs, indent=2)
        logger.info(f"{self.agent_name}: Processing inputs")

        try:
            response = self.client.chat.completions.create(
                model=settings.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": self.system_prompt,
                    },
                    {
                        "role": "user",
                        "content": f"Process the following inputs:\n\n{input_text}",
                    },
                ],
                temperature=0.7,
                max_tokens=4096,
            )
            output_text = response.choices[0].message.content
            if output_text is None:
                # A reply can carry no text at all; treat it as empty output
                # instead of failing inside json.loads with a TypeError.
                logger.warning(f"{self.agent_name}: Model returned no text content")

            output_data = self._parse_output(output_text)
            logger.info(f"{self.agent_name}: Processing completed")
            return output_data
        except Exception as e:
            logger.error(f"{self.agent_name}: Error during processing - {str(e)}")
            raise

    @classmethod
    def _parse_output(cls, output_text: Optional[str]) -> Dict[str, Any]:
        """Turn raw model text into a dictionary.

        Attempts, in order:

        1. Parse the whole text as JSON.
        2. Parse the contents of the first fenced ``json`` code block.
        3. Only when no such fence exists, parse the span from the first
           ``{`` to the last ``}``.

        Args:
            output_text: Raw reply text; ``None`` is treated as ``""``.

        Returns:
            The parsed JSON object when one is found. A successfully parsed
            non-object value (list, number, string, ...) is wrapped as
            ``{"output": value}``. When nothing parses, the original text is
            wrapped as ``{"output": text}``.
        """
        text = "" if output_text is None else output_text

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # Match objects are always truthy, so ``or`` gives the fenced
            # block priority and falls back to the embedded object only
            # when there is no fence at all.
            match = cls._FENCED_JSON_RE.search(text) or cls._EMBEDDED_OBJECT_RE.search(text)
            if match is None:
                return {"output": text}
            try:
                parsed = json.loads(match.group(1))
            except json.JSONDecodeError:
                # The extracted candidate is malformed: keep the raw text.
                return {"output": text}

        # Callers are promised a dict; wrap any other JSON value.
        if isinstance(parsed, dict):
            return parsed
        return {"output": parsed}

    def save_state(self, data: Dict[str, Any], filename: Optional[str] = None) -> str:
        """Save agent state and data to persistent JSON storage.

        The data is stored under the ``"data"`` key next to the agent id,
        agent name, and a timestamp.

        Args:
            data: Data to save. Must be JSON-serializable.
            filename: Optional custom filename inside the agent's data
                directory (defaults to ``<agent_id>_<timestamp>.json``).

        Returns:
            Path to saved file

        Raises:
            TypeError: If ``data`` cannot be serialized to JSON. No file is
                created or truncated in that case.
        """
        # One instant for both the filename and the stored metadata so the
        # two can never disagree.
        timestamp = datetime.now().isoformat()

        if filename is None:
            # ':' is not allowed in filenames on some platforms.
            safe_stamp = timestamp.replace(":", "-")
            filename = f"{self.agent_id}_{safe_stamp}.json"

        filepath = self.data_dir / filename

        state_data = {
            "agent_id": self.agent_id,
            "agent_name": self.agent_name,
            "timestamp": timestamp,
            "data": data,
        }

        # Serialize before opening the file: a serialization failure must not
        # leave a truncated, unreadable state file behind.
        payload = json.dumps(state_data, indent=2)
        filepath.write_text(payload, encoding="utf-8")

        logger.info(f"{self.agent_name}: State saved to {filepath}")
        return str(filepath)

    def load_state(self, filename: str) -> Dict[str, Any]:
        """Load agent state from persistent storage.

        Args:
            filename: Filename to load, relative to the agent's data
                directory.

        Returns:
            The value stored under the ``"data"`` key, or an empty dict when
            that key is absent.

        Raises:
            FileNotFoundError: If the file does not exist.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        filepath = self.data_dir / filename

        with open(filepath, "r", encoding="utf-8") as f:
            state_data = json.load(f)

        logger.info(f"{self.agent_name}: State loaded from {filepath}")
        return state_data.get("data", {})
|