| """ | |
| Memory management for quantitative analysis tasks. | |
| """ | |
| from typing import Dict, Any, List | |
| from experts.shared_memory import SharedMemory | |
| from experts.shared import shared | |
class QuantAnalystMemoryManager:
    """Memory manager specialized for quantitative analysis.

    Thin wrapper around ``shared.memory`` that tags every record it stores
    with the ``"quantitative_analysis"`` domain and restricts every lookup
    to that domain.
    """

    # Metadata values used both to tag stored records and to filter lookups.
    # Defined once so the write side and the read side cannot drift apart.
    _DOMAIN = "quantitative_analysis"
    _TYPE_ANALYSIS = "analysis_result"
    _TYPE_CONTEXT = "context"

    def __init__(self):
        """Bind this manager to ``shared.memory``."""
        self.shared_memory = shared.memory

    def _analysis_filter(self) -> Dict[str, str]:
        """Return a fresh filter dict matching only stored analysis results."""
        # A new dict per call, so a backend that mutates its filter argument
        # cannot affect later lookups.
        return {"domain": self._DOMAIN, "type": self._TYPE_ANALYSIS}

    def store_analysis(self, analysis: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        """Store quantitative analysis results with metadata.

        Args:
            analysis: The analysis payload, passed to the store as ``content``.
            metadata: Extra metadata to attach. ``None`` is treated as empty.
                Any ``"domain"`` or ``"type"`` key supplied here is
                overwritten by this manager's own values.
        """
        self.shared_memory.store(
            content=analysis,
            metadata={
                # Tolerate a missing metadata mapping instead of raising
                # TypeError on ``**None``.
                **(metadata or {}),
                # Listed last so they always win over caller-supplied keys.
                "domain": self._DOMAIN,
                "type": self._TYPE_ANALYSIS,
            },
        )

    def query(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Query relevant analysis results from memory.

        Args:
            task: The task description, forwarded as the ``query`` argument.

        Returns:
            Whatever ``shared_memory.query`` returns for records tagged as
            analysis results in the quantitative-analysis domain.
        """
        return self.shared_memory.query(
            query=task,
            filter=self._analysis_filter(),
        )

    def add_context(self, context: Dict[str, Any], tags: List[str]) -> None:
        """Add contextual information with tags.

        Args:
            context: The context payload, passed to the store as ``content``.
            tags: Tags stored under the ``"tags"`` metadata key.
        """
        self.shared_memory.store(
            content=context,
            metadata={
                "domain": self._DOMAIN,
                "type": self._TYPE_CONTEXT,
                "tags": tags,
            },
        )

    def get_similar_analyses(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve similar analysis results.

        Args:
            query: Text forwarded as the ``query`` argument.
            top_k: Maximum number of results requested from the backend.

        Returns:
            Whatever ``shared_memory.get_similar`` returns for the given
            query, restricted to analysis-result records.
        """
        return self.shared_memory.get_similar(
            query=query,
            # Same filter as ``query``: without the "type" constraint,
            # records written by ``add_context`` (type "context") would be
            # returned as if they were analyses.
            filter=self._analysis_filter(),
            top_k=top_k,
        )