| """Idempotency for autonomous mode: don't create the same event twice. |
| |
As more messages stream into a chat, re-running the agent over a rolling window
re-surfaces events already captured. ``filter_new`` returns only events not seen
before, keyed by normalized title + start truncated to the minute. Durable JSON store
| mirrors ``server/impact.py`` (env path + lock; no DB — local-first). |
| """ |
from __future__ import annotations

import json
import logging
import os
import threading
from pathlib import Path

from dateutil import parser as dtparser

from .schema import Event
|
|
# Serializes the load -> modify -> write cycle on the JSON store (and reset's
# unlink) across threads in this process. It is a threading.Lock, so it does
# NOT coordinate separate processes that share the same DEDUP_PATH.
_lock = threading.Lock()
|
|
|
|
def _path() -> Path:
    """Return the location of the seen-keys JSON store.

    Resolved from ``DEDUP_PATH`` on every call, so the location can be changed
    at runtime (e.g. by tests). Falls back to ``/tmp/agent_seen.json`` when the
    variable is unset *or empty*: an empty value would otherwise become
    ``Path("")`` (the current directory), and every write would then fail.
    """
    return Path(os.environ.get("DEDUP_PATH") or "/tmp/agent_seen.json")
|
|
|
|
def event_key(ev: Event) -> str:
    """Build the dedup identity for *ev* as ``"<title>|<start>"``.

    The title is whitespace-stripped and lowercased. The start is parsed as
    ISO 8601 and truncated (not rounded) to the minute. If it is missing or
    cannot be parsed, the stripped raw value is used as-is.

    Deliberately conservative: if the model rewords a title between messages
    the keys differ and a duplicate slips through, which is preferable to
    wrongly dropping a real event.
    """
    normalized_title = (ev.title or "").strip().lower()
    raw_start = ev.start
    try:
        parsed = dtparser.isoparse(raw_start)
    except (ValueError, TypeError):
        # None / non-ISO input: fall back to the literal text.
        normalized_start = (raw_start or "").strip()
    else:
        normalized_start = parsed.replace(second=0, microsecond=0).isoformat()
    return "|".join((normalized_title, normalized_start))
|
|
|
|
def _load() -> list[str]:
    """Read the persisted seen-keys from the JSON store.

    Returns an empty list when the store does not exist yet (the normal
    first-run case). An unreadable, non-JSON, or non-list store is also
    treated as empty, because dedup is best-effort and must not crash the
    agent. That case is logged, since the next write overwrites the file
    and the previous history is lost. Non-string entries are dropped: the
    writers call ``sorted()`` on the keys, which would fail on mixed types.
    """
    log = logging.getLogger(__name__)
    path = _path()
    try:
        # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
        data = json.loads(path.read_text())
    except FileNotFoundError:
        return []
    except (OSError, ValueError):
        log.warning("dedup store %s is unreadable or corrupt; treating as empty",
                    path, exc_info=True)
        return []
    if not isinstance(data, list):
        log.warning("dedup store %s does not contain a JSON list; treating as empty",
                    path)
        return []
    return [key for key in data if isinstance(key, str)]
|
|
|
|
def filter_new(events: list[Event], record: bool = True) -> list[Event]:
    """Return only events whose key hasn't been recorded.

    Repeats *within* ``events`` are collapsed too: only the first event with a
    given key is returned.

    ``record=False`` filters WITHOUT persisting. Callers delivering the events
    somewhere fallible (e.g. a calendar push) should filter first and call
    ``mark_seen`` only after delivery succeeds. Otherwise a transient failure
    permanently swallows the events ("seen" but never delivered).

    With ``record=True`` the updated seen-set is written via a temp file plus
    ``os.replace``. A process killed mid-write therefore cannot leave a
    truncated store behind, which would later load as empty and wipe the
    dedup history.
    """
    with _lock:
        seen = set(_load())
        fresh = []
        for ev in events:
            k = event_key(ev)
            if k in seen:
                continue
            seen.add(k)  # also dedups repeats inside this batch
            fresh.append(ev)
        if fresh and record:
            path = _path()
            # Sibling temp file (same filesystem) so the swap is a single rename.
            tmp = path.with_name(f"{path.name}.{os.getpid()}.tmp")
            tmp.write_text(json.dumps(sorted(seen), indent=2))
            os.replace(tmp, path)
        return fresh
|
|
|
|
def mark_seen(events: list[Event]) -> None:
    """Persist the keys of successfully delivered events.

    Pair with ``filter_new(events, record=False)``: call this only after
    delivery has succeeded. Does nothing for an empty list. The store is
    rewritten via a temp file plus ``os.replace``, so a process killed
    mid-write cannot leave a truncated file behind.
    """
    if not events:
        return
    with _lock:
        seen = set(_load())
        seen.update(event_key(ev) for ev in events)
        path = _path()
        # Sibling temp file (same filesystem) so the swap is a single rename.
        tmp = path.with_name(f"{path.name}.{os.getpid()}.tmp")
        tmp.write_text(json.dumps(sorted(seen), indent=2))
        os.replace(tmp, path)
|
|
|
|
def reset() -> None:
    """Forget every recorded key by deleting the store file (test helper).

    A store that does not exist is not an error.
    """
    with _lock:
        _path().unlink(missing_ok=True)
|
|