""" News Intelligence Cache — Local JSON-based cache with URL dedup and TTL eviction. Stores processed news items to avoid re-analyzing the same headlines. """ import hashlib import json import logging import time from pathlib import Path from threading import Lock logger = logging.getLogger(__name__) CACHE_PATH = Path(__file__).parent.parent.parent / "data" / "news_cache.json" MAX_ITEMS = 500 TTL_SECONDS = 24 * 60 * 60 # 24 hours class NewsCache: """Thread-safe local JSON cache for processed news intelligence.""" def __init__(self, path: Path = CACHE_PATH): self._path = path self._lock = Lock() self._data: dict = self._load() def _load(self) -> dict: if self._path.exists(): try: with open(self._path, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, IOError): logger.warning("Corrupted cache file, starting fresh") return {"items": {}, "last_refresh": None} def _save(self): self._path.parent.mkdir(parents=True, exist_ok=True) with open(self._path, "w", encoding="utf-8") as f: json.dump(self._data, f, indent=2, ensure_ascii=False) @staticmethod def _url_hash(url: str) -> str: return hashlib.sha256(url.encode()).hexdigest()[:16] def has(self, url: str) -> bool: """Check if a URL has already been processed.""" h = self._url_hash(url) with self._lock: if h not in self._data["items"]: return False item = self._data["items"][h] # Check TTL if time.time() - item.get("cached_at", 0) > TTL_SECONDS: del self._data["items"][h] return False return True def get(self, url: str) -> dict | None: """Get a cached item by URL.""" h = self._url_hash(url) with self._lock: return self._data["items"].get(h) def put(self, url: str, item: dict): """Store a processed news item.""" h = self._url_hash(url) with self._lock: item["cached_at"] = time.time() item["url_hash"] = h self._data["items"][h] = item self._evict() self._save() def put_batch(self, items: list[dict]): """Store multiple items at once (single write).""" with self._lock: now = time.time() for item in items: url = item.get("url", "") h = self._url_hash(url) item["cached_at"] = now item["url_hash"] = h self._data["items"][h] = item self._evict() self._data["last_refresh"] = now self._save() def get_all(self, concern_level: str | None = None) -> list[dict]: """Get all cached items, optionally filtered by concern level.""" with self._lock: now = time.time() items = [] for item in self._data["items"].values(): if now - item.get("cached_at", 0) > TTL_SECONDS: continue if concern_level and item.get("concern_level") != concern_level: continue items.append(item) # Sort by published time (newest first) items.sort(key=lambda x: x.get("published", ""), reverse=True) return items def get_stats(self) -> dict: """Get summary counts per concern level.""" all_items = self.get_all() counts = {"total": len(all_items), "high": 0, "medium": 0, "low": 0} for item in all_items: level = item.get("concern_level", "low") counts[level] = counts.get(level, 0) + 1 counts["last_refresh"] = self._data.get("last_refresh") return counts def _evict(self): """Remove oldest items if cache exceeds MAX_ITEMS.""" items = self._data["items"] if len(items) <= MAX_ITEMS: return sorted_keys = sorted(items.keys(), key=lambda k: items[k].get("cached_at", 0)) to_remove = len(items) - MAX_ITEMS for key in sorted_keys[:to_remove]: del items[key] def clear(self): """Clear all cached items.""" with self._lock: self._data = {"items": {}, "last_refresh": None} self._save()