File size: 4,446 Bytes
75d9b3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
News Intelligence Cache — Local JSON-based cache with URL dedup and TTL eviction.
Stores processed news items to avoid re-analyzing the same headlines.
"""
import hashlib
import json
import logging
import time
from pathlib import Path
from threading import Lock

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Default cache file: "data/news_cache.json" under the directory two levels
# above the directory that contains this file.
CACHE_PATH = Path(__file__).parent.parent.parent.joinpath("data", "news_cache.json")
# Maximum number of items kept in the cache before the oldest are evicted.
MAX_ITEMS = 500
# Age, in seconds, after which a cached item counts as expired (24 hours).
TTL_SECONDS = 24 * 3600


class NewsCache:
    """Thread-safe local JSON cache for processed news intelligence.

    Items are plain dicts keyed by a truncated SHA-256 hash of their URL.
    Each stored item is stamped with ``cached_at`` (``time.time()``) and
    ``url_hash``. Entries older than ``TTL_SECONDS`` are treated as absent
    by ``has``, ``get`` and ``get_all``; when more than ``MAX_ITEMS``
    entries are held, the oldest are evicted on the next write.
    """

    def __init__(self, path: Path = CACHE_PATH):
        """Open the cache stored at *path*, loading any existing content."""
        self._path = path
        self._lock = Lock()
        self._data: dict = self._load()

    def _load(self) -> dict:
        """Read the cache file, falling back to an empty cache.

        Returns:
            The parsed file content when it is a JSON object whose
            ``"items"`` value is an object; otherwise a fresh empty cache.
            Entries under ``"items"`` that are not objects are dropped,
            because every other method treats items as dicts.
        """
        if self._path.exists():
            try:
                with open(self._path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except (ValueError, OSError):
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError (file is not valid UTF-8).
                logger.warning("Corrupted cache file, starting fresh")
            else:
                items = data.get("items") if isinstance(data, dict) else None
                if isinstance(items, dict):
                    data["items"] = {
                        key: value
                        for key, value in items.items()
                        if isinstance(value, dict)
                    }
                    return data
                # Valid JSON but not the expected layout: using it would
                # fail later on self._data["items"].
                logger.warning("Unexpected cache file structure, starting fresh")
        return {"items": {}, "last_refresh": None}

    def _save(self) -> None:
        """Write the cache to disk. The caller must hold ``self._lock``.

        The JSON is written to a sibling ``*.tmp`` file that is then renamed
        over the real file, so a failure while serialising or writing does
        not leave a truncated cache file behind.

        Raises:
            OSError: if the directory or file cannot be written.
            TypeError: if an item holds a value ``json`` cannot serialise.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, ensure_ascii=False)
            tmp_path.replace(self._path)
        except Exception:
            # Do not leave a partial temp file lying around.
            tmp_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _url_hash(url: str) -> str:
        """Return the cache key for *url*: first 16 hex chars of its SHA-256."""
        return hashlib.sha256(url.encode()).hexdigest()[:16]

    def _fresh_item(self, key: str) -> dict | None:
        """Return the unexpired item stored under *key*, else ``None``.

        An expired entry is removed from the in-memory cache as a side
        effect (the file is not rewritten here). The caller must hold
        ``self._lock``.
        """
        items = self._data["items"]
        item = items.get(key)
        if item is None:
            return None
        if time.time() - item.get("cached_at", 0) > TTL_SECONDS:
            del items[key]
            return None
        return item

    def has(self, url: str) -> bool:
        """Check if a URL has already been processed and is not expired."""
        h = self._url_hash(url)
        with self._lock:
            return self._fresh_item(h) is not None

    def get(self, url: str) -> dict | None:
        """Get a cached item by URL.

        Returns:
            The stored item dict, or ``None`` when the URL is not cached or
            its entry is older than ``TTL_SECONDS`` (same rule as ``has``).
        """
        h = self._url_hash(url)
        with self._lock:
            return self._fresh_item(h)

    def put(self, url: str, item: dict) -> None:
        """Store a processed news item and persist the cache.

        *item* is stored by reference and gains ``cached_at`` and
        ``url_hash`` keys.
        """
        h = self._url_hash(url)
        with self._lock:
            item["cached_at"] = time.time()
            item["url_hash"] = h
            self._data["items"][h] = item
            self._evict()
            self._save()

    def put_batch(self, items: list[dict]) -> None:
        """Store multiple items at once (single write).

        Each item is keyed by its ``"url"`` value. All items share one
        ``cached_at`` timestamp, which is also recorded as ``last_refresh``.
        """
        with self._lock:
            now = time.time()
            for item in items:
                # A missing or None URL falls back to "" instead of raising
                # halfway through the batch.
                # NOTE(review): all such items hash to the same key and so
                # overwrite each other; confirm whether callers can send
                # items without a URL.
                url = item.get("url") or ""
                h = self._url_hash(url)
                item["cached_at"] = now
                item["url_hash"] = h
                self._data["items"][h] = item
            self._evict()
            self._data["last_refresh"] = now
            self._save()

    def get_all(self, concern_level: str | None = None) -> list[dict]:
        """Get all unexpired cached items, optionally filtered by concern level.

        Args:
            concern_level: when given (and non-empty), only items whose
                ``"concern_level"`` equals it are returned.

        Returns:
            Matching items sorted by their ``"published"`` value, newest
            first; items without one sort as the empty string.
        """
        with self._lock:
            now = time.time()
            items = []
            for item in self._data["items"].values():
                if now - item.get("cached_at", 0) > TTL_SECONDS:
                    continue
                if concern_level and item.get("concern_level") != concern_level:
                    continue
                items.append(item)
            # `or ""` also covers an explicit None, which cannot be compared
            # with strings during the sort.
            items.sort(key=lambda x: x.get("published") or "", reverse=True)
            return items

    def get_stats(self) -> dict:
        """Get summary counts per concern level.

        Returns:
            A dict with ``"total"``, one count per concern level seen
            (``"high"``, ``"medium"`` and ``"low"`` are always present;
            items without a level count as ``"low"``) and ``"last_refresh"``.
        """
        all_items = self.get_all()
        total = len(all_items)
        counts = {"total": total, "high": 0, "medium": 0, "low": 0}
        for item in all_items:
            level = item.get("concern_level", "low")
            counts[level] = counts.get(level, 0) + 1
        # Re-assert the reserved keys last so an item whose concern level
        # happens to be "total" or "last_refresh" cannot distort them.
        counts["total"] = total
        with self._lock:
            counts["last_refresh"] = self._data.get("last_refresh")
        return counts

    def _evict(self) -> None:
        """Remove oldest items if cache exceeds MAX_ITEMS.

        "Oldest" means smallest ``cached_at``. The caller must hold
        ``self._lock``.
        """
        items = self._data["items"]
        if len(items) <= MAX_ITEMS:
            return
        sorted_keys = sorted(items.keys(), key=lambda k: items[k].get("cached_at", 0))
        to_remove = len(items) - MAX_ITEMS
        for key in sorted_keys[:to_remove]:
            del items[key]

    def clear(self) -> None:
        """Clear all cached items and persist the empty cache."""
        with self._lock:
            self._data = {"items": {}, "last_refresh": None}
            self._save()