hollywoodfrancis's picture
Upload 16 files
b3e9d26 verified
"""
Memory management for quantitative analysis tasks.
"""
from typing import Dict, Any, List
from experts.shared_memory import SharedMemory
from experts.shared import shared
class QuantAnalystMemoryManager:
    """Memory manager specialized for quantitative analysis.

    Thin facade over a memory backend: every entry written through it is
    stamped with ``"domain": "quantitative_analysis"`` and every read passes
    a filter carrying that tag.  The backend only has to provide ``store``,
    ``query`` and ``get_similar`` accepting the keyword arguments used below.
    """

    # Tag carried by everything this manager writes.
    _DOMAIN = "quantitative_analysis"
    # Tag set identifying stored analysis results (as opposed to context).
    _ANALYSIS_TAGS = {"domain": _DOMAIN, "type": "analysis_result"}

    def __init__(self, shared_memory: "SharedMemory | None" = None) -> None:
        """Bind the manager to a memory backend.

        Args:
            shared_memory: Backend to use.  When omitted, the module-level
                ``shared.memory`` is used, exactly as before; passing one
                explicitly allows isolated instances (e.g. in tests).
        """
        # ``is None`` rather than truthiness: a backend object may be falsy.
        self.shared_memory = shared.memory if shared_memory is None else shared_memory

    def store_analysis(self, analysis: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        """Store quantitative analysis results with metadata.

        Args:
            analysis: Result payload, handed to the backend as ``content``.
            metadata: Caller-supplied metadata; ``None`` is treated as empty.
                Any ``"domain"`` / ``"type"`` keys it contains are overridden
                by this manager's own tags.
        """
        self.shared_memory.store(
            content=analysis,
            # Manager tags are unpacked last so they win over caller keys.
            metadata={**(metadata or {}), **self._ANALYSIS_TAGS},
        )

    def query(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Query relevant analysis results from memory.

        Args:
            task: Task description, handed to the backend as ``query``.

        Returns:
            Whatever the backend's ``query`` returns for the analysis-result
            filter.
        """
        return self.shared_memory.query(
            query=task,
            # Fresh copy so the backend cannot mutate the class-level tags.
            filter=dict(self._ANALYSIS_TAGS),
        )

    def add_context(self, context: Dict[str, Any], tags: List[str]) -> None:
        """Add contextual information with tags.

        Args:
            context: Context payload, handed to the backend as ``content``.
            tags: Labels stored under the ``"tags"`` metadata key.
        """
        self.shared_memory.store(
            content=context,
            metadata={
                "domain": self._DOMAIN,
                "type": "context",
                "tags": tags,
            },
        )

    def get_similar_analyses(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve similar analysis results.

        Args:
            query: Text to match against stored entries.
            top_k: Upper bound forwarded to the backend as ``top_k``.

        Returns:
            Whatever the backend's ``get_similar`` returns for the
            analysis-result filter.
        """
        return self.shared_memory.get_similar(
            query=query,
            # Filter on type as well as domain: ``add_context`` writes
            # "context" entries into the same domain, and those are not
            # analyses.  This matches the filter used by ``query``.
            filter=dict(self._ANALYSIS_TAGS),
            top_k=top_k,
        )