Spaces:
Configuration error
Configuration error
File size: 4,057 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
"""Persistence helper for tracking recently processed Gmail message IDs."""
from __future__ import annotations
import json
import threading
from collections import deque
from pathlib import Path
from typing import Deque, Iterable, List, Optional, Set
from ...logging_config import logger
class GmailSeenStore:
    """Maintain a bounded set of Gmail message IDs backed by a JSON file.

    IDs are held in recency order (oldest first). Once more than
    ``max_entries`` IDs are present, the oldest ones are evicted. Every
    mutation is written back to ``path`` as a JSON list; load and persist
    failures are logged as warnings instead of being raised.

    Every public method acquires the instance's internal lock before
    touching the in-memory state.
    """

    def __init__(self, path: Path, max_entries: int = 300) -> None:
        """Create the store and load any IDs already persisted at ``path``.

        Args:
            path: Location of the JSON file holding the list of IDs.
            max_entries: Maximum number of IDs to retain. Values below zero
                are treated as zero (nothing is retained).
        """
        self._path = path
        # A negative bound would make pruning pop from an already-empty
        # deque (IndexError), so clamp it to "retain nothing".
        self._max_entries = max(0, max_entries)
        self._lock = threading.Lock()
        self._entries: Deque[str] = deque()  # recency order, oldest first
        self._index: Set[str] = set()  # same IDs, for O(1) membership tests
        self._load()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def has_entries(self) -> bool:
        """Return True when at least one ID is currently stored."""
        with self._lock:
            return bool(self._entries)

    def is_seen(self, message_id: str) -> bool:
        """Return True when the normalized ``message_id`` is in the store.

        Empty or whitespace-only IDs are never considered seen.
        """
        normalized = self._normalize(message_id)
        if not normalized:
            return False
        with self._lock:
            return normalized in self._index

    def mark_seen(self, message_ids: Iterable[str]) -> None:
        """Record ``message_ids`` as seen, then prune and persist.

        IDs that are already stored are moved to the most-recent position.
        IDs that normalize to an empty string are ignored; if nothing
        remains after normalization the store and its file are untouched.
        """
        normalized_ids = [
            mid for mid in (self._normalize(raw) for raw in message_ids) if mid
        ]
        if not normalized_ids:
            return
        with self._lock:
            for message_id in normalized_ids:
                if message_id in self._index:
                    # Refresh recency by removing and re-appending
                    try:
                        self._entries.remove(message_id)
                    except ValueError:  # pragma: no cover - defensive
                        pass
                else:
                    self._index.add(message_id)
                self._entries.append(message_id)
            self._prune_locked()
            self._persist_locked()

    def snapshot(self) -> List[str]:
        """Return a copy of the stored IDs, oldest first."""
        with self._lock:
            return list(self._entries)

    def clear(self) -> None:
        """Remove every stored ID and persist the empty list."""
        with self._lock:
            self._entries.clear()
            self._index.clear()
            self._persist_locked()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _normalize(self, message_id: Optional[str]) -> str:
        """Return ``message_id`` as a stripped string, or "" when falsy."""
        if not message_id:
            return ""
        return str(message_id).strip()

    def _load(self) -> None:
        """Populate the in-memory state from the JSON file, if it exists.

        A missing file is not an error. An unreadable file or a payload that
        is not a JSON list is logged and leaves the store empty. At most
        ``max_entries`` unique IDs are loaded, favouring the newest ones.
        """
        try:
            data = json.loads(self._path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "Failed to load Gmail seen-store; starting empty",
                extra={"path": str(self._path), "error": str(exc)},
            )
            return
        if not isinstance(data, list):
            logger.warning(
                "Gmail seen-store payload invalid; expected list",
                extra={"path": str(self._path)},
            )
            return

        # Walk newest-to-oldest instead of slicing ``data[-max_entries:]``:
        # that slice becomes ``data[0:]`` (the whole list) when the bound is
        # zero. Walking backwards also makes the bound count *unique* IDs
        # and keeps a repeated ID at its most recent position.
        newest_first: List[str] = []
        for raw_id in reversed(data):
            if len(newest_first) >= self._max_entries:
                break
            normalized = self._normalize(raw_id)
            if normalized and normalized not in self._index:
                self._index.add(normalized)
                newest_first.append(normalized)
        # Restore oldest-first order for the deque.
        self._entries.extend(reversed(newest_first))

    def _prune_locked(self) -> None:
        """Evict the oldest IDs until the bound holds. Caller holds the lock."""
        while len(self._entries) > self._max_entries:
            oldest = self._entries.popleft()
            self._index.discard(oldest)

    def _persist_locked(self) -> None:
        """Write the current IDs to the JSON file. Caller holds the lock.

        Persistence is best-effort: any failure is logged as a warning and
        the in-memory state is kept as is.
        """
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            payload = list(self._entries)
            self._path.write_text(json.dumps(payload), encoding="utf-8")
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "Failed to persist Gmail seen-store",
                extra={"path": str(self._path), "error": str(exc)},
            )
# Names exported by a wildcard import of this module.
__all__ = ["GmailSeenStore"]
|