"""
Memory Tool - ChromaDB-based vector memory for agent collaboration.
Provides persistent memory storage allowing agents to save and retrieve
context, enabling knowledge sharing across the research workflow.
"""
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from crewai.tools import BaseTool
from pydantic import Field

# ChromaDB is an optional dependency: when it is missing the names below are
# set to None and MemoryTool reports memory as unavailable instead of failing.
try:
    import chromadb
    from chromadb.config import Settings as ChromaSettings
except ImportError:
    chromadb = None
    ChromaSettings = None

from src.config.settings import get_settings
from src.tools.base import ToolError
logger = logging.getLogger(__name__)
class MemoryTool(BaseTool):
    """
    Tool for storing and retrieving research context using ChromaDB.

    Enables agents to:
    - Save findings for team reference
    - Retrieve relevant context from previous research
    - Share information across the research workflow

    When ChromaDB is not installed or fails to initialize, the tool can still
    be constructed; ``_run`` then answers every command with an "ERROR: ..."
    string instead of raising.
    """

    name: str = "memory_tool"
    description: str = (
        "Store and retrieve research context from team memory. "
        "Use 'save:<category>:<content>' to save information. "
        "Use 'retrieve:<query>' to search for relevant stored information. "
        "Categories: 'news', 'metrics', 'analysis', 'general'. "
        "Example save: 'save:news:Apple announced new iPhone with AI features' "
        "Example retrieve: 'retrieve:Apple recent announcements'"
    )

    # ChromaDB client and collection handles. Both stay None when ChromaDB is
    # unavailable or initialization failed.
    _client: Optional[Any] = None
    _collection: Optional[Any] = None

    def __init__(self, **kwargs):
        """Initialize the memory tool with ChromaDB connection.

        Args:
            **kwargs: Forwarded unchanged to the base tool constructor.
        """
        super().__init__(**kwargs)
        self._initialize_db()

    def _initialize_db(self) -> None:
        """Initialize ChromaDB client and collection.

        Best-effort: any failure is logged and leaves ``_client`` and
        ``_collection`` set to None rather than propagating.
        """
        if chromadb is None:
            logger.warning("ChromaDB not installed. Memory features disabled.")
            return

        try:
            settings = get_settings()

            # Create persistent client
            self._client = chromadb.PersistentClient(
                path=str(settings.chroma_path),
                settings=ChromaSettings(
                    anonymized_telemetry=False,
                    allow_reset=True
                )
            )

            # Get or create collection
            self._collection = self._client.get_or_create_collection(
                name=settings.chroma_collection_name,
                metadata={"description": "FinResearch AI agent memory"}
            )
            logger.info(f"ChromaDB initialized at {settings.chroma_path}")
        except Exception as e:
            logger.error(f"Failed to initialize ChromaDB: {e}")
            self._client = None
            self._collection = None

    def _run(self, command: str) -> str:
        """
        Execute a memory operation.

        Args:
            command: Memory command in format 'operation:args'
                - 'save:<category>:<content>'
                - 'retrieve:<query>'
                - 'list' - show recent entries
                - 'clear' - clear all memory

        Returns:
            Operation result or error message. Failures are reported as
            strings starting with "ERROR:".
        """
        if self._collection is None:
            return "ERROR: Memory system not available. ChromaDB may not be installed."

        # Coerce None / non-string input so it falls through to the usage
        # message below instead of raising AttributeError on .strip().
        command = str(command or "").strip()

        try:
            if command.startswith("save:"):
                return self._save(command[len("save:"):])
            if command.startswith("retrieve:"):
                return self._retrieve(command[len("retrieve:"):])
            if command == "list":
                return self._list_recent()
            if command == "clear":
                return self._clear()
            return (
                "ERROR: Unknown command. Use:\n"
                "  - save:<category>:<content>\n"
                "  - retrieve:<query>\n"
                "  - list\n"
                "  - clear"
            )
        except Exception as e:
            logger.exception(f"Memory operation failed: {command}")
            return f"ERROR: Memory operation failed: {type(e).__name__}"

    def _save(self, content: str) -> str:
        """Save content to memory.

        Args:
            content: The '<category>:<content>' part of a save command.
                Categories outside news/metrics/analysis/general are stored
                under 'general'.

        Returns:
            An "[OK] ..." confirmation containing the first 100 characters of
            the saved text, or an "ERROR: ..." message when the separator is
            missing or the text is empty.
        """
        category, separator, text = content.partition(":")
        if not separator:
            return "ERROR: Save format is 'save:<category>:<content>'"

        category = category.strip().lower()
        text = text.strip()
        if not text:
            return "ERROR: Cannot save empty content"

        if category not in ('news', 'metrics', 'analysis', 'general'):
            category = 'general'

        # One timestamp feeds both the ID and the metadata so they always
        # agree. The random suffix keeps IDs unique even when two saves in the
        # same category land on the same clock tick.
        now = datetime.now()
        doc_id = (
            f"{category}_{now.strftime('%Y%m%d_%H%M%S_%f')}"
            f"_{uuid.uuid4().hex[:8]}"
        )

        # Store in ChromaDB
        self._collection.add(
            documents=[text],
            metadatas=[{
                "category": category,
                "timestamp": now.isoformat(),
                "source": "agent"
            }],
            ids=[doc_id]
        )
        logger.debug(f"Saved to memory: {doc_id}")
        return f"[OK] Saved to memory [{category}]: {text[:100]}{'...' if len(text) > 100 else ''}"

    def _query(self, query: str, n_results: int) -> Dict[str, Any]:
        """Run a similarity search capped at the number of stored entries.

        Args:
            query: Text to search for.
            n_results: Maximum number of matches wanted.

        Returns:
            The raw ChromaDB query result, or an empty dict when the
            collection holds no entries.
        """
        stored = self._collection.count()
        if stored == 0:
            return {}
        # Never request more neighbours than exist (and at least one).
        # NOTE(review): some ChromaDB releases appear to warn or error when
        # n_results exceeds the collection size - confirm for pinned version.
        return self._collection.query(
            query_texts=[query],
            n_results=max(1, min(n_results, stored))
        )

    def _retrieve(self, query: str, n_results: int = 5) -> str:
        """Retrieve relevant content from memory.

        Args:
            query: Free-text search query; surrounding whitespace is ignored.
            n_results: Maximum number of matches to report (default 5).

        Returns:
            A formatted multi-line report of the matches, a "No relevant
            memories found" notice, or an "ERROR: ..." message for a blank
            query.
        """
        query = query.strip()
        if not query:
            return "ERROR: Empty query provided"

        results = self._query(query, n_results)
        documents = (results.get('documents') or [[]])[0] or []
        if not documents:
            return f"No relevant memories found for: {query}"

        # Pad metadata so every document is reported even when the result
        # carries fewer (or no) metadata rows.
        metadatas = list((results.get('metadatas') or [[]])[0] or [])
        metadatas += [None] * (len(documents) - len(metadatas))

        lines = [
            f"MEMORY RETRIEVAL: {query}",
            "=" * 50,
            ""
        ]
        for i, (doc, meta) in enumerate(zip(documents, metadatas), 1):
            # Entries may carry no metadata at all; treat that as "unknown",
            # matching how _list_recent handles it.
            meta = meta or {}
            category = meta.get('category', 'unknown')
            timestamp = meta.get('timestamp', 'unknown')
            lines.extend([
                f"[{i}] Category: {category}",
                f"    Saved: {timestamp}",
                f"    Content: {doc}",
                ""
            ])
        return "\n".join(lines)

    def _list_recent(self, limit: int = 10) -> str:
        """List recent memory entries, newest first.

        Args:
            limit: Maximum number of entries to show (default 10).

        Returns:
            A formatted listing with an 80-character preview per entry, or
            "Memory is empty." when nothing is stored.
        """
        # The whole collection is fetched and ordered here by the 'timestamp'
        # metadata written in _save, because a plain get(limit=...) returns
        # entries in storage order rather than the most recent ones.
        results = self._collection.get(include=['documents', 'metadatas'])
        documents = results['documents'] or []
        if not documents:
            return "Memory is empty."

        metadatas = results.get('metadatas') or [None] * len(documents)
        # ISO-8601 timestamps sort chronologically as strings; entries without
        # a timestamp get '' and therefore end up last.
        newest_first = sorted(
            zip(documents, metadatas),
            key=lambda entry: str((entry[1] or {}).get('timestamp', '')),
            reverse=True
        )

        lines = [
            "RECENT MEMORY ENTRIES",
            "=" * 50,
            ""
        ]
        for i, (doc, meta) in enumerate(newest_first[:max(limit, 0)], 1):
            category = meta.get('category', 'unknown') if meta else 'unknown'
            doc = doc or ""
            lines.append(f"[{i}] [{category}] {doc[:80]}{'...' if len(doc) > 80 else ''}")
        return "\n".join(lines)

    def _clear(self) -> str:
        """Clear all memory entries.

        Deletes the configured collection and creates it again empty.

        Returns:
            A confirmation message.
        """
        settings = get_settings()
        collection_name = settings.chroma_collection_name

        self._client.delete_collection(collection_name)
        # Drop the stale handle first: if re-creation fails, the tool then
        # reports memory as unavailable instead of using a deleted collection.
        self._collection = None
        self._collection = self._client.create_collection(
            name=collection_name,
            metadata={"description": "FinResearch AI agent memory"}
        )
        logger.info("Memory cleared")
        return "Memory cleared successfully."

    # Convenience methods for direct Python access (not via LLM)

    def save_context(self, category: str, content: str) -> bool:
        """Direct Python method to save context.

        Args:
            category: Category to file the content under.
            content: Text to store.

        Returns:
            True when the save command reported "[OK]", False otherwise.
        """
        result = self._run(f"save:{category}:{content}")
        return result.startswith("[OK]")

    def get_context(self, query: str, n_results: int = 5) -> List[str]:
        """Direct Python method to retrieve context.

        Args:
            query: Free-text search query.
            n_results: Maximum number of documents to return (default 5).

        Returns:
            Matching documents, or an empty list when memory is unavailable,
            the query is blank, nothing matches, or the lookup fails.
        """
        if self._collection is None or not query or not query.strip():
            return []

        # Mirror save_context/_run: log failures and return an empty result
        # instead of letting storage errors escape to the caller.
        try:
            results = self._query(query.strip(), n_results)
        except Exception:
            logger.exception("Memory context lookup failed")
            return []
        return list((results.get('documents') or [[]])[0] or [])

    @classmethod
    def reset_all(cls) -> bool:
        """
        Class method to reset all memory.

        Creates a temporary instance and clears the collection.
        Useful for CLI operations and testing.

        Returns:
            True if reset successful, False otherwise
        """
        try:
            instance = cls()
            if instance._collection is None:
                logger.warning("Memory not available for reset")
                return False
            instance._clear()
            return True
        except Exception as e:
            logger.exception(f"Failed to reset memory: {e}")
            return False