"""
Memory management for quantitative analysis tasks.

Defines ``QuantAnalystMemoryManager``, a thin wrapper around the shared
memory object exposed as ``shared.memory``. Every write is stamped with
quantitative-analysis metadata and every read passes a matching filter.
"""

from typing import Dict, Any, List

from experts.shared_memory import SharedMemory
from experts.shared import shared


class QuantAnalystMemoryManager:
    """Memory facade for the quantitative-analysis domain.

    All methods delegate to ``self.shared_memory``; this class only adds
    the ``"domain"`` / ``"type"`` / ``"tags"`` metadata on writes and the
    corresponding filter dictionaries on reads.
    """

    def __init__(self):
        """Attach the manager to the ``shared.memory`` object."""
        self.shared_memory = shared.memory

    def store_analysis(self, analysis: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        """Persist an analysis result together with its metadata.

        Args:
            analysis: The analysis payload, stored as the entry's content.
            metadata: Caller-supplied metadata for the entry.

        The fixed ``"domain"`` and ``"type"`` keys are written after the
        caller's entries, so they override same-named keys in ``metadata``.
        The caller's dictionary itself is not modified.
        """
        stamped_metadata = {
            **metadata,
            "domain": "quantitative_analysis",
            "type": "analysis_result",
        }
        self.shared_memory.store(content=analysis, metadata=stamped_metadata)

    def query(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Look up stored analysis results relevant to ``task``.

        Args:
            task: The task description, forwarded unchanged as the query.

        Returns:
            Whatever ``shared_memory.query`` returns for the given task,
            using a filter on this domain and the ``"analysis_result"`` type.
        """
        result_filter = {
            "domain": "quantitative_analysis",
            "type": "analysis_result",
        }
        return self.shared_memory.query(query=task, filter=result_filter)

    def add_context(self, context: Dict[str, Any], tags: List[str]) -> None:
        """Store a piece of contextual information labelled with ``tags``.

        Args:
            context: The context payload, stored as the entry's content.
            tags: Tags recorded under the ``"tags"`` metadata key.
        """
        context_metadata = {
            "domain": "quantitative_analysis",
            "type": "context",
            "tags": tags,
        }
        self.shared_memory.store(content=context, metadata=context_metadata)

    def get_similar_analyses(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Fetch up to ``top_k`` entries similar to ``query``.

        Args:
            query: The text to compare against, forwarded unchanged.
            top_k: Forwarded as ``top_k`` to the shared memory; defaults to 5.

        Returns:
            Whatever ``shared_memory.get_similar`` returns.

        NOTE(review): the filter names only the domain (no ``"type"`` key),
        so entries written by ``add_context`` may also be eligible --
        confirm against the shared memory's filter semantics.
        """
        domain_filter = {"domain": "quantitative_analysis"}
        return self.shared_memory.get_similar(
            query=query,
            filter=domain_filter,
            top_k=top_k,
        )