"""Idempotency for autonomous mode: don't create the same event twice. As more messages stream into a chat, re-running the agent over a rolling window re-surfaces events already captured. ``filter_new`` returns only events not seen before, keyed by normalized title + minute-rounded start. Durable JSON store mirrors ``server/impact.py`` (env path + lock; no DB — local-first). """ from __future__ import annotations import json import os import threading from pathlib import Path from dateutil import parser as dtparser from .schema import Event _lock = threading.Lock() def _path() -> Path: return Path(os.environ.get("DEDUP_PATH", "/tmp/agent_seen.json")) def event_key(ev: Event) -> str: """Normalized identity: lowercased title + start rounded to the minute. Conservative by design — if the model rewords a title between messages we may miss a dedup (a duplicate event), which is safer than dropping a real event. """ title = (ev.title or "").strip().lower() try: start = dtparser.isoparse(ev.start).replace(second=0, microsecond=0).isoformat() except (ValueError, TypeError): start = (ev.start or "").strip() return f"{title}|{start}" def _load() -> list[str]: try: return json.loads(_path().read_text()) except Exception: # noqa: BLE001 missing/corrupt -> start fresh return [] def filter_new(events: list[Event], record: bool = True) -> list[Event]: """Return only events whose key hasn't been recorded. ``record=False`` filters WITHOUT persisting — callers delivering the events somewhere fallible (e.g. a calendar push) should filter first and ``mark_seen`` only after delivery succeeds; otherwise a transient failure permanently swallows the events ("seen" but never delivered).""" with _lock: seen = set(_load()) fresh = [] for ev in events: k = event_key(ev) if k in seen: continue seen.add(k) fresh.append(ev) if fresh and record: _path().write_text(json.dumps(sorted(seen), indent=2)) return fresh def mark_seen(events: list[Event]) -> None: """Persist the keys of successfully delivered events.""" if not events: return with _lock: seen = set(_load()) seen.update(event_key(ev) for ev in events) _path().write_text(json.dumps(sorted(seen), indent=2)) def reset() -> None: """Drop the seen-set (test helper).""" with _lock: try: _path().unlink() except FileNotFoundError: pass