File size: 4,057 Bytes
aa15bce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Persistence helper for tracking recently processed Gmail message IDs."""

from __future__ import annotations

import json
import threading
from collections import deque
from pathlib import Path
from typing import Deque, Iterable, List, Optional, Set

from ...logging_config import logger


class GmailSeenStore:
    """Maintain a bounded set of Gmail message IDs backed by a JSON file.

    IDs are kept in recency order (oldest first). Once more than
    ``max_entries`` IDs are held, the oldest ones are evicted. Every mutation
    is written back to ``path`` as a JSON list of strings, and all access to
    the in-memory state goes through an internal ``threading.Lock``.

    Args:
        path: Location of the JSON file used for persistence. Parent
            directories are created on first write.
        max_entries: Maximum number of IDs retained. Values below zero are
            treated as zero (nothing is retained).
    """

    def __init__(self, path: Path, max_entries: int = 300) -> None:
        self._path = path
        # Clamp to zero: a negative limit would make the prune loop pop from
        # an already-empty deque and raise IndexError.
        self._max_entries = max(0, max_entries)
        self._lock = threading.Lock()
        # ``_entries`` preserves recency order; ``_index`` mirrors it for
        # constant-time membership checks.
        self._entries: Deque[str] = deque()
        self._index: Set[str] = set()
        self._load()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def has_entries(self) -> bool:
        """Return ``True`` when at least one message ID is stored."""
        with self._lock:
            return bool(self._entries)

    def is_seen(self, message_id: str) -> bool:
        """Return ``True`` if ``message_id`` (after stripping) is stored.

        Empty or whitespace-only IDs are never considered seen.
        """
        normalized = self._normalize(message_id)
        if not normalized:
            return False
        with self._lock:
            return normalized in self._index

    def mark_seen(self, message_ids: Iterable[str]) -> None:
        """Record ``message_ids`` as seen and persist the updated store.

        IDs that are already stored are moved to the most-recent position.
        Blank IDs are ignored; if nothing remains after normalization the
        call is a no-op and nothing is written to disk.

        Args:
            message_ids: IDs to record. A single bare string is treated as
                one ID rather than being iterated character by character.
        """
        if isinstance(message_ids, str):
            # A lone string is itself an iterable of characters; wrap it so
            # "abc" is stored as one ID instead of "a", "b", "c".
            message_ids = [message_ids]

        normalized_ids = [mid for mid in map(self._normalize, message_ids) if mid]
        if not normalized_ids:
            return

        with self._lock:
            for message_id in normalized_ids:
                if message_id in self._index:
                    # Refresh recency by removing and re-appending
                    try:
                        self._entries.remove(message_id)
                    except ValueError:  # pragma: no cover - defensive
                        pass
                else:
                    self._index.add(message_id)
                self._entries.append(message_id)

            self._prune_locked()
            self._persist_locked()

    def snapshot(self) -> List[str]:
        """Return a copy of the stored IDs, oldest first."""
        with self._lock:
            return list(self._entries)

    def clear(self) -> None:
        """Forget every stored ID and persist the now-empty store."""
        with self._lock:
            self._entries.clear()
            self._index.clear()
            self._persist_locked()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _normalize(self, message_id: Optional[str]) -> str:
        """Return ``message_id`` as a stripped string, or ``""`` if falsy."""
        if not message_id:
            return ""
        return str(message_id).strip()

    def _load(self) -> None:
        """Populate the in-memory state from the JSON file, if present.

        A missing file leaves the store empty. An unreadable file or a
        payload that is not a JSON list is logged and also leaves the store
        empty. Only the last ``max_entries`` items of the payload are
        considered; blanks and duplicates among them are skipped.
        """
        try:
            data = json.loads(self._path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "Failed to load Gmail seen-store; starting empty",
                extra={"path": str(self._path), "error": str(exc)},
            )
            return

        if not isinstance(data, list):
            logger.warning(
                "Gmail seen-store payload invalid; expected list",
                extra={"path": str(self._path)},
            )
            return

        # ``data[-0:]`` is the whole list, so a zero limit needs an explicit
        # guard to load nothing.
        recent = data[-self._max_entries :] if self._max_entries > 0 else []
        for raw_id in recent:
            normalized = self._normalize(raw_id)
            if normalized and normalized not in self._index:
                self._entries.append(normalized)
                self._index.add(normalized)

    def _prune_locked(self) -> None:
        """Evict the oldest IDs until the size limit holds (lock must be held)."""
        # The explicit emptiness check keeps this safe even if the limit were
        # ever set below zero after construction.
        while self._entries and len(self._entries) > self._max_entries:
            oldest = self._entries.popleft()
            self._index.discard(oldest)

    def _persist_locked(self) -> None:
        """Write the current IDs to the JSON file (lock must be held).

        The payload is written to a sibling ``<name>.tmp`` file and then
        renamed over the target, so a failure part-way through writing does
        not leave a truncated JSON file behind. Failures are logged and
        swallowed; persistence is best-effort.
        """
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            payload = json.dumps(list(self._entries))
            tmp_path = self._path.with_name(self._path.name + ".tmp")
            tmp_path.write_text(payload, encoding="utf-8")
            tmp_path.replace(self._path)
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "Failed to persist Gmail seen-store",
                extra={"path": str(self._path), "error": str(exc)},
            )


# Public API of this module: only the store class is exported by ``import *``.
__all__ = ["GmailSeenStore"]