SiddharthVenba's picture
Initial commit for HF Space
75d9b3c
Raw
History Blame Contribute Delete
4.45 kB
"""
News Intelligence Cache — Local JSON-based cache with URL dedup and TTL eviction.
Stores processed news items to avoid re-analyzing the same headlines.
"""
import hashlib
import json
import logging
import time
from pathlib import Path
from threading import Lock
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# On-disk cache file: two levels above this file's directory, under data/.
CACHE_PATH = Path(__file__).parent.parent.parent.joinpath("data", "news_cache.json")

# Upper bound on the number of items kept in the cache.
MAX_ITEMS = 500

# Lifetime of a cached entry in seconds (24 hours).
TTL_SECONDS = 60 * 60 * 24
class NewsCache:
    """Thread-safe local JSON cache for processed news intelligence.

    Items are keyed by a truncated SHA-256 hash of their URL. Every stored
    item is stamped with ``cached_at`` (epoch seconds) and ``url_hash``.
    Entries older than ``TTL_SECONDS`` are treated as absent by ``has``,
    ``get`` and ``get_all``; the cache is capped at ``MAX_ITEMS`` entries,
    dropping those with the oldest ``cached_at`` first.

    All public methods take ``self._lock``; the private helpers
    (``_live_item``, ``_evict``, ``_save``) expect the caller to hold it.
    The lock is not reentrant, so public methods must not call each other
    while holding it.
    """

    def __init__(self, path: Path = CACHE_PATH):
        """Open the cache stored at *path*, loading existing data if any."""
        self._path = path
        self._lock = Lock()
        self._data: dict = self._load()

    @staticmethod
    def _empty() -> dict:
        """Return a fresh, empty cache structure."""
        return {"items": {}, "last_refresh": None}

    def _load(self) -> dict:
        """Read the cache file, falling back to an empty cache.

        A missing file silently yields an empty cache. An unreadable,
        undecodable or structurally invalid file is logged and discarded.
        """
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return self._empty()
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.warning("Corrupted cache file, starting fresh")
            return self._empty()

        # Valid JSON is not necessarily a valid cache: every other method
        # assumes a dict with a dict under "items".
        if not isinstance(data, dict) or not isinstance(data.get("items"), dict):
            logger.warning("Corrupted cache file, starting fresh")
            return self._empty()

        items = {k: v for k, v in data["items"].items() if isinstance(v, dict)}
        dropped = len(data["items"]) - len(items)
        if dropped:
            logger.warning("Dropped %d malformed cache entries", dropped)
        data["items"] = items
        data.setdefault("last_refresh", None)
        return data

    def _save(self):
        """Persist the cache to disk. Caller must hold the lock.

        The data is written to a sibling temp file and then renamed over
        the real file, so a crash or serialization error mid-write cannot
        leave a truncated cache behind.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self._data, f, indent=2, ensure_ascii=False)
        tmp_path.replace(self._path)

    @staticmethod
    def _url_hash(url: str) -> str:
        """Return the 16-hex-character cache key for *url*."""
        return hashlib.sha256(url.encode()).hexdigest()[:16]

    @staticmethod
    def _is_expired(item: dict, now: float) -> bool:
        """Return True if *item* is older than ``TTL_SECONDS`` at time *now*.

        Items without a ``cached_at`` stamp count as cached at epoch 0.
        """
        return now - item.get("cached_at", 0) > TTL_SECONDS

    def _live_item(self, key: str) -> dict | None:
        """Return the unexpired item stored under *key*, or None.

        An expired item is removed from memory as a side effect (the
        removal reaches disk on the next save). Caller must hold the lock.
        """
        item = self._data["items"].get(key)
        if item is None:
            return None
        if self._is_expired(item, time.time()):
            del self._data["items"][key]
            return None
        return item

    def has(self, url: str) -> bool:
        """Check if a URL has already been processed and is still fresh."""
        key = self._url_hash(url)
        with self._lock:
            return self._live_item(key) is not None

    def get(self, url: str) -> dict | None:
        """Get a cached item by URL, or None if absent or expired."""
        key = self._url_hash(url)
        with self._lock:
            return self._live_item(key)

    def put(self, url: str, item: dict):
        """Store a processed news item and write the cache to disk.

        *item* is stamped in place with ``cached_at`` and ``url_hash`` and
        stored by reference (not copied).
        """
        key = self._url_hash(url)
        with self._lock:
            item["cached_at"] = time.time()
            item["url_hash"] = key
            self._data["items"][key] = item
            self._evict()
            self._save()

    def put_batch(self, items: list[dict]):
        """Store multiple items at once (single write).

        Each item's key is derived from its ``"url"`` entry. Items are
        stamped in place like in ``put``. ``last_refresh`` is updated even
        when *items* is empty.
        """
        with self._lock:
            now = time.time()
            for item in items:
                # NOTE(review): items with a missing/empty url all hash the
                # empty string and therefore overwrite one another; confirm
                # callers always supply a url.
                key = self._url_hash(item.get("url") or "")
                item["cached_at"] = now
                item["url_hash"] = key
                self._data["items"][key] = item
            self._evict()
            self._data["last_refresh"] = now
            self._save()

    def get_all(self, concern_level: str | None = None) -> list[dict]:
        """Get all unexpired items, optionally filtered by concern level.

        The result is sorted by the ``"published"`` value, newest first;
        items with a missing or ``None`` value sort as the empty string.
        """
        with self._lock:
            now = time.time()
            items = [
                item
                for item in self._data["items"].values()
                if not self._is_expired(item, now)
                and (not concern_level or item.get("concern_level") == concern_level)
            ]
            # `or ""` keeps a None value from being compared against str.
            items.sort(key=lambda x: x.get("published") or "", reverse=True)
            return items

    def get_stats(self) -> dict:
        """Get summary counts per concern level plus ``last_refresh``.

        Items without a ``concern_level`` are counted as ``"low"``; levels
        other than high/medium/low get their own key.
        """
        all_items = self.get_all()
        counts = {"total": len(all_items), "high": 0, "medium": 0, "low": 0}
        for item in all_items:
            level = item.get("concern_level", "low")
            counts[level] = counts.get(level, 0) + 1
        # Shared state is read under the lock, like everywhere else.
        with self._lock:
            counts["last_refresh"] = self._data.get("last_refresh")
        return counts

    def _evict(self):
        """Remove oldest items if cache exceeds MAX_ITEMS. Caller holds the lock."""
        items = self._data["items"]
        overflow = len(items) - MAX_ITEMS
        if overflow <= 0:
            return
        oldest_first = sorted(items, key=lambda k: items[k].get("cached_at", 0))
        for key in oldest_first[:overflow]:
            del items[key]

    def clear(self):
        """Clear all cached items and write the empty cache to disk."""
        with self._lock:
            self._data = self._empty()
            self._save()