Janus-backend / backend /app /services /cache_manager.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Intelligent Cache Manager for MiroOrg v2.
Caches answers for generic queries to avoid redundant pipeline runs.
Only learns from specific/domain queries to keep memory clean.
"""
import json
import os
import time
import hashlib
from pathlib import Path
from typing import Dict, List, Any, Optional
from app.services.query_classifier import QueryType
CACHE_DIR = Path(__file__).parent.parent / "data" / "cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# TTL in hours
GENERIC_TTL_HOURS = 720 # 30 days
SPECIFIC_TTL_HOURS = 168 # 7 days
HYBRID_TTL_HOURS = 336 # 14 days
def _query_hash(query: str) -> str:
"""Generate a stable hash for a query string."""
return hashlib.md5(query.lower().strip().encode()).hexdigest()
class CacheEntry:
"""Single cache entry with TTL."""
def __init__(
self,
query: str,
answer: str,
query_type: QueryType,
domain: str,
ttl_hours: int,
model_insights: List[str] = None,
metadata: Dict = None,
):
self.query = query
self.answer = answer
self.query_type = query_type.value
self.domain = domain
self.created_at = time.time()
self.ttl_seconds = ttl_hours * 3600
self.hit_count = 0
self.model_insights = model_insights or []
self.metadata = metadata or {}
def is_expired(self) -> bool:
return (time.time() - self.created_at) > self.ttl_seconds
def to_dict(self) -> Dict:
return {
"query": self.query,
"answer": self.answer,
"query_type": self.query_type,
"domain": self.domain,
"created_at": self.created_at,
"ttl_seconds": self.ttl_seconds,
"hit_count": self.hit_count,
"model_insights": self.model_insights,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict) -> "CacheEntry":
entry = cls(
query=data["query"],
answer=data["answer"],
query_type=QueryType(data["query_type"]),
domain=data["domain"],
ttl_hours=data["ttl_seconds"] // 3600,
model_insights=data.get("model_insights", []),
metadata=data.get("metadata", {}),
)
entry.created_at = data["created_at"]
entry.hit_count = data.get("hit_count", 0)
return entry
class IntelligentCacheManager:
"""Manages intelligent caching with TTL and hit tracking."""
def __init__(self, cache_dir: Path = CACHE_DIR):
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_path(self, query_hash: str) -> Path:
return self.cache_dir / f"{query_hash}.json"
def get(self, query: str) -> Optional[Dict]:
"""
Get cached answer for a query.
Returns:
Dict with answer, cached=True, cache_age_hours if found
None if not found or expired
"""
h = _query_hash(query)
path = self._get_cache_path(h)
if not path.exists():
return None
try:
with open(path) as f:
data = json.load(f)
entry = CacheEntry.from_dict(data)
if entry.is_expired():
path.unlink(missing_ok=True)
return None
# Increment hit count
entry.hit_count += 1
with open(path, "w") as f:
json.dump(entry.to_dict(), f, indent=2)
cache_age_hours = (time.time() - entry.created_at) / 3600
return {
"answer": entry.answer,
"cached": True,
"cache_age_hours": cache_age_hours,
"hit_count": entry.hit_count,
"query_type": entry.query_type,
"domain": entry.domain,
"model_insights": entry.model_insights,
}
except Exception:
return None
def put(
self,
query: str,
answer: str,
query_type: QueryType,
domain: str,
ttl_hours: int = None,
model_insights: List[str] = None,
metadata: Dict = None,
) -> None:
"""Store an answer in the cache."""
if ttl_hours is None:
if query_type == QueryType.GENERIC:
ttl_hours = GENERIC_TTL_HOURS
elif query_type == QueryType.SPECIFIC:
ttl_hours = SPECIFIC_TTL_HOURS
else:
ttl_hours = HYBRID_TTL_HOURS
entry = CacheEntry(
query=query,
answer=answer,
query_type=query_type,
domain=domain,
ttl_hours=ttl_hours,
model_insights=model_insights or [],
metadata=metadata or {},
)
h = _query_hash(query)
path = self._get_cache_path(h)
with open(path, "w") as f:
json.dump(entry.to_dict(), f, indent=2)
def cleanup_expired(self) -> int:
"""Remove expired cache entries. Returns count removed."""
count = 0
for path in self.cache_dir.glob("*.json"):
try:
with open(path) as f:
data = json.load(f)
entry = CacheEntry.from_dict(data)
if entry.is_expired():
path.unlink(missing_ok=True)
count += 1
except Exception:
path.unlink(missing_ok=True)
count += 1
return count
def get_stats(self) -> Dict:
"""Get cache statistics."""
total = 0
generic = 0
specific = 0
hybrid = 0
expired = 0
total_hits = 0
for path in self.cache_dir.glob("*.json"):
try:
with open(path) as f:
data = json.load(f)
entry = CacheEntry.from_dict(data)
total += 1
total_hits += entry.hit_count
if entry.is_expired():
expired += 1
elif entry.query_type == "generic":
generic += 1
elif entry.query_type == "specific":
specific += 1
else:
hybrid += 1
except Exception:
pass
# Calculate cache size
cache_size_bytes = sum(
p.stat().st_size for p in self.cache_dir.glob("*.json") if p.exists()
)
cache_size_mb = cache_size_bytes / (1024 * 1024)
return {
"total_entries": total,
"generic_entries": generic,
"specific_entries": specific,
"hybrid_entries": hybrid,
"expired_entries": expired,
"total_hits": total_hits,
"cache_size_mb": round(cache_size_mb, 2),
"avg_hits_per_entry": round(total_hits / total, 1) if total > 0 else 0,
}