| """ | |
| Intelligent Cache Manager for MiroOrg v2. | |
| Caches answers for generic queries to avoid redundant pipeline runs. | |
| Only learns from specific/domain queries to keep memory clean. | |
| """ | |
| import json | |
| import os | |
| import time | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| from app.services.query_classifier import QueryType | |
| CACHE_DIR = Path(__file__).parent.parent / "data" / "cache" | |
| CACHE_DIR.mkdir(parents=True, exist_ok=True) | |
| # TTL in hours | |
| GENERIC_TTL_HOURS = 720 # 30 days | |
| SPECIFIC_TTL_HOURS = 168 # 7 days | |
| HYBRID_TTL_HOURS = 336 # 14 days | |


def _query_hash(query: str) -> str:
    """Generate a stable hash for a query string."""
    return hashlib.md5(query.lower().strip().encode()).hexdigest()
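

# Sketch: because the hash lowercases and strips the query, superficially different
# strings map to the same cache key and therefore to the same JSON file. For example:
#   _query_hash("  What is X? ") == _query_hash("what is x?")  # True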


class CacheEntry:
    """Single cache entry with TTL."""

    def __init__(
        self,
        query: str,
        answer: str,
        query_type: QueryType,
        domain: str,
        ttl_hours: int,
        model_insights: Optional[List[str]] = None,
        metadata: Optional[Dict] = None,
    ):
        self.query = query
        self.answer = answer
        self.query_type = query_type.value
        self.domain = domain
        self.created_at = time.time()
        self.ttl_seconds = ttl_hours * 3600
        self.hit_count = 0
        self.model_insights = model_insights or []
        self.metadata = metadata or {}

    def is_expired(self) -> bool:
        return (time.time() - self.created_at) > self.ttl_seconds

    def to_dict(self) -> Dict:
        return {
            "query": self.query,
            "answer": self.answer,
            "query_type": self.query_type,
            "domain": self.domain,
            "created_at": self.created_at,
            "ttl_seconds": self.ttl_seconds,
            "hit_count": self.hit_count,
            "model_insights": self.model_insights,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "CacheEntry":
        entry = cls(
            query=data["query"],
            answer=data["answer"],
            query_type=QueryType(data["query_type"]),
            domain=data["domain"],
            ttl_hours=data["ttl_seconds"] // 3600,
            model_insights=data.get("model_insights", []),
            metadata=data.get("metadata", {}),
        )
        entry.created_at = data["created_at"]
        entry.hit_count = data.get("hit_count", 0)
        return entry
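

# Round-trip sketch: to_dict()/from_dict() are how entries are written to and read
# back from the per-query JSON files. QueryType.GENERIC is assumed to be a member of
# the enum in app.services.query_classifier; the "general" domain string is
# illustrative only.
#
#   entry = CacheEntry("what is caching?", "Caching stores ...", QueryType.GENERIC,
#                      "general", GENERIC_TTL_HOURS)
#   restored = CacheEntry.from_dict(entry.to_dict())
#   assert restored.query == entry.query and not restored.is_expired()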


class IntelligentCacheManager:
    """Manages intelligent caching with TTL and hit tracking."""

    def __init__(self, cache_dir: Path = CACHE_DIR):
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_path(self, query_hash: str) -> Path:
        return self.cache_dir / f"{query_hash}.json"

    def get(self, query: str) -> Optional[Dict]:
        """
        Get cached answer for a query.

        Returns:
            Dict with answer, cached=True, cache_age_hours if found.
            None if not found or expired.
        """
        h = _query_hash(query)
        path = self._get_cache_path(h)
        if not path.exists():
            return None
        try:
            with open(path) as f:
                data = json.load(f)
            entry = CacheEntry.from_dict(data)
            if entry.is_expired():
                path.unlink(missing_ok=True)
                return None
            # Increment hit count
            entry.hit_count += 1
            with open(path, "w") as f:
                json.dump(entry.to_dict(), f, indent=2)
            cache_age_hours = (time.time() - entry.created_at) / 3600
            return {
                "answer": entry.answer,
                "cached": True,
                "cache_age_hours": cache_age_hours,
                "hit_count": entry.hit_count,
                "query_type": entry.query_type,
                "domain": entry.domain,
                "model_insights": entry.model_insights,
            }
        except Exception:
            return None

    def put(
        self,
        query: str,
        answer: str,
        query_type: QueryType,
        domain: str,
        ttl_hours: Optional[int] = None,
        model_insights: Optional[List[str]] = None,
        metadata: Optional[Dict] = None,
    ) -> None:
        """Store an answer in the cache."""
        if ttl_hours is None:
            if query_type == QueryType.GENERIC:
                ttl_hours = GENERIC_TTL_HOURS
            elif query_type == QueryType.SPECIFIC:
                ttl_hours = SPECIFIC_TTL_HOURS
            else:
                ttl_hours = HYBRID_TTL_HOURS
        entry = CacheEntry(
            query=query,
            answer=answer,
            query_type=query_type,
            domain=domain,
            ttl_hours=ttl_hours,
            model_insights=model_insights or [],
            metadata=metadata or {},
        )
        h = _query_hash(query)
        path = self._get_cache_path(h)
        with open(path, "w") as f:
            json.dump(entry.to_dict(), f, indent=2)

    def cleanup_expired(self) -> int:
        """Remove expired cache entries. Returns count removed."""
        count = 0
        for path in self.cache_dir.glob("*.json"):
            try:
                with open(path) as f:
                    data = json.load(f)
                entry = CacheEntry.from_dict(data)
                if entry.is_expired():
                    path.unlink(missing_ok=True)
                    count += 1
            except Exception:
                # Unreadable or corrupt entries are removed as well
                path.unlink(missing_ok=True)
                count += 1
        return count

    def get_stats(self) -> Dict:
        """Get cache statistics."""
        total = 0
        generic = 0
        specific = 0
        hybrid = 0
        expired = 0
        total_hits = 0
        for path in self.cache_dir.glob("*.json"):
            try:
                with open(path) as f:
                    data = json.load(f)
                entry = CacheEntry.from_dict(data)
                total += 1
                total_hits += entry.hit_count
                if entry.is_expired():
                    expired += 1
                elif entry.query_type == "generic":
                    generic += 1
                elif entry.query_type == "specific":
                    specific += 1
                else:
                    hybrid += 1
            except Exception:
                pass
        # Calculate cache size
        cache_size_bytes = sum(
            p.stat().st_size for p in self.cache_dir.glob("*.json") if p.exists()
        )
        cache_size_mb = cache_size_bytes / (1024 * 1024)
        return {
            "total_entries": total,
            "generic_entries": generic,
            "specific_entries": specific,
            "hybrid_entries": hybrid,
            "expired_entries": expired,
            "total_hits": total_hits,
            "cache_size_mb": round(cache_size_mb, 2),
            "avg_hits_per_entry": round(total_hits / total, 1) if total > 0 else 0,
        }
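

# Minimal usage sketch (not part of the module's API surface): assumes
# QueryType.GENERIC exists in app.services.query_classifier; the query, answer, and
# domain values below are illustrative only.
if __name__ == "__main__":
    manager = IntelligentCacheManager()
    manager.put(
        query="What is a knowledge graph?",
        answer="A knowledge graph is ...",
        query_type=QueryType.GENERIC,
        domain="general",
    )
    # Case and surrounding whitespace are normalized away by _query_hash, so this hits.
    hit = manager.get("  what is a knowledge graph?")
    print("cached:", bool(hit and hit["cached"]))
    print(manager.get_stats())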