| """ |
| News Intelligence Cache — Local JSON-based cache with URL dedup and TTL eviction. |
| Stores processed news items to avoid re-analyzing the same headlines. |
| """ |
| import hashlib |
| import json |
| import logging |
| import time |
| from pathlib import Path |
| from threading import Lock |
|
|
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# The cache file is data/news_cache.json under the directory two levels above
# the directory that contains this module.
CACHE_PATH = Path(__file__).parent.parent.parent.joinpath("data", "news_cache.json")

# Maximum number of entries kept; the oldest are dropped first when exceeded.
MAX_ITEMS = 500

# Entries older than this many seconds (24 hours) are considered expired.
TTL_SECONDS = 24 * 3600
|
|
|
|
class NewsCache:
    """Thread-safe local JSON cache for processed news intelligence.

    Items are keyed by a truncated SHA-256 digest of their URL and stamped with
    a ``cached_at`` timestamp when stored. Entries older than ``TTL_SECONDS``
    are treated as absent by every read method, and on every write the cache
    drops expired entries and is trimmed to ``MAX_ITEMS`` (oldest first).

    All public methods take ``self._lock``. The lock is not re-entrant, so
    private helpers documented as "caller must hold the lock" never acquire it.
    """

    def __init__(self, path: Path = CACHE_PATH):
        """Create a cache backed by *path*, loading existing content if any."""
        self._path = path
        self._lock = Lock()
        self._data: dict = self._load()

    @staticmethod
    def _empty() -> dict:
        """Return a fresh, empty cache structure."""
        return {"items": {}, "last_refresh": None}

    def _load(self) -> dict:
        """Read the cache file, falling back to an empty cache when unusable.

        A missing file is the normal first-run case and is silent. Unreadable,
        undecodable, unparseable or structurally wrong content is logged and
        replaced by an empty cache instead of failing later on first access.
        """
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return self._empty()
        except (ValueError, OSError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Corrupted cache file, starting fresh")
            return self._empty()

        if not isinstance(data, dict) or not isinstance(data.get("items"), dict):
            logger.warning("Corrupted cache file, starting fresh")
            return self._empty()

        # Every reader calls .get() on entries, so keep only dict-shaped ones.
        data["items"] = {
            key: item for key, item in data["items"].items() if isinstance(item, dict)
        }
        data.setdefault("last_refresh", None)
        return data

    def _save(self) -> None:
        """Persist the cache to disk. Caller must hold the lock.

        The JSON is written to a sibling temporary file and then moved over the
        real file, so a serialization error or a crash mid-write cannot leave a
        truncated cache behind.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, ensure_ascii=False)
            tmp_path.replace(self._path)
        except Exception:
            tmp_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _url_hash(url: str) -> str:
        """Return the 16-hex-character cache key for *url*."""
        return hashlib.sha256(url.encode()).hexdigest()[:16]

    @staticmethod
    def _is_expired(item: dict, now: float) -> bool:
        """Return True when *item* is older than ``TTL_SECONDS`` at time *now*."""
        return now - item.get("cached_at", 0) > TTL_SECONDS

    def _live_item(self, key: str, now: float) -> dict | None:
        """Return the unexpired entry under *key*, or None.

        An expired entry is removed from memory as a side effect. Caller must
        hold the lock.
        """
        item = self._data["items"].get(key)
        if item is None:
            return None
        if self._is_expired(item, now):
            del self._data["items"][key]
            return None
        return item

    def has(self, url: str) -> bool:
        """Check if a URL has already been processed and is still fresh."""
        key = self._url_hash(url)
        with self._lock:
            return self._live_item(key, time.time()) is not None

    def get(self, url: str) -> dict | None:
        """Get a cached item by URL, or None if it is missing or expired."""
        key = self._url_hash(url)
        with self._lock:
            return self._live_item(key, time.time())

    def put(self, url: str, item: dict) -> None:
        """Store a processed news item and write the cache to disk.

        *item* is stored by reference and gains ``cached_at`` and ``url_hash``
        keys.
        """
        key = self._url_hash(url)
        with self._lock:
            now = time.time()
            item["cached_at"] = now
            item["url_hash"] = key
            self._data["items"][key] = item
            self._evict(now)
            self._save()

    def put_batch(self, items: list[dict]) -> None:
        """Store multiple items at once (single write).

        Each item is keyed by its ``"url"`` value. Items without a URL are
        skipped: they would all share one key, overwrite each other, and never
        be retrievable through ``has``/``get``. ``last_refresh`` is updated even
        when *items* is empty.
        """
        with self._lock:
            now = time.time()
            skipped = 0
            for item in items:
                url = item.get("url")
                if not url:
                    skipped += 1
                    continue
                key = self._url_hash(url)
                item["cached_at"] = now
                item["url_hash"] = key
                self._data["items"][key] = item
            if skipped:
                logger.warning("Skipped %d item(s) without a URL", skipped)
            self._evict(now)
            self._data["last_refresh"] = now
            self._save()

    def get_all(self, concern_level: str | None = None) -> list[dict]:
        """Get all unexpired items, optionally filtered by concern level.

        The result is sorted by the ``published`` value, newest first; items
        with a missing or None ``published`` sort last.
        """
        with self._lock:
            now = time.time()
            items = [
                item
                for item in self._data["items"].values()
                if not self._is_expired(item, now)
                and (not concern_level or item.get("concern_level") == concern_level)
            ]
        # `or ""` keeps an explicit None from being compared against strings.
        items.sort(key=lambda entry: entry.get("published") or "", reverse=True)
        return items

    def get_stats(self) -> dict:
        """Get summary counts per concern level plus the last refresh time.

        Items with a missing or empty ``concern_level`` are counted as "low".
        """
        all_items = self.get_all()
        counts = {"total": len(all_items), "high": 0, "medium": 0, "low": 0}
        for item in all_items:
            level = item.get("concern_level") or "low"
            counts[level] = counts.get(level, 0) + 1
        # get_all() has already released the lock; take it again for this read.
        with self._lock:
            counts["last_refresh"] = self._data.get("last_refresh")
        return counts

    def _evict(self, now: float | None = None) -> None:
        """Drop expired items, then the oldest ones beyond ``MAX_ITEMS``.

        Caller must hold the lock. *now* defaults to the current time.
        """
        if now is None:
            now = time.time()
        items = self._data["items"]

        expired_keys = [key for key, item in items.items() if self._is_expired(item, now)]
        for key in expired_keys:
            del items[key]

        overflow = len(items) - MAX_ITEMS
        if overflow <= 0:
            return
        oldest_first = sorted(items, key=lambda k: items[k].get("cached_at", 0))
        for key in oldest_first[:overflow]:
            del items[key]

    def clear(self) -> None:
        """Clear all cached items and persist the empty cache."""
        with self._lock:
            self._data = self._empty()
            self._save()
|
|