OffGridSchedula / server /dedup.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
2.63 kB
"""Idempotency for autonomous mode: don't create the same event twice.
As more messages stream into a chat, re-running the agent over a rolling window
re-surfaces events already captured. ``filter_new`` returns only events not seen
before, keyed by normalized title + minute-rounded start. Durable JSON store
mirrors ``server/impact.py`` (env path + lock; no DB — local-first).
"""
from __future__ import annotations
import json
import os
import threading
from pathlib import Path
from dateutil import parser as dtparser
from .schema import Event
# Serializes the load -> modify -> write cycle on the JSON store between threads
# of this process. NOTE(review): a threading.Lock does not coordinate separate
# processes that share the same DEDUP_PATH.
_lock = threading.Lock()
def _path() -> Path:
return Path(os.environ.get("DEDUP_PATH", "/tmp/agent_seen.json"))
def event_key(ev: Event) -> str:
    """Build the dedup identity of *ev* as ``"<title>|<start>"``.

    The title is stripped and lowercased. The start is parsed with
    ``dateutil``'s ISO-8601 parser and truncated to the minute (seconds and
    microseconds zeroed) before being re-serialised with ``isoformat()``; if it
    cannot be parsed, the raw value is used, only stripped.

    Conservative by design — if the model rewords a title between messages we
    may miss a dedup (a duplicate event), which is safer than dropping a real
    event.

    NOTE(review): a UTC offset in the input is preserved, not normalized, so the
    same instant written with different offsets yields different keys.
    """
    normalized_title = (ev.title or "").strip().lower()
    raw_start = ev.start
    try:
        parsed = dtparser.isoparse(raw_start)
        to_minute = parsed.replace(second=0, microsecond=0)
        start_part = to_minute.isoformat()
    except (ValueError, TypeError):
        # Unparseable (or missing) start: fall back to the literal text.
        start_part = (raw_start or "").strip()
    return f"{normalized_title}|{start_part}"
def _load() -> list[str]:
    """Read the persisted seen-keys, or ``[]`` when they are unavailable.

    Best-effort by design: a missing, unreadable, or corrupt file means
    "start fresh" rather than an error. A file that parses as JSON but is not a
    list (e.g. ``null`` or ``{}`` after a manual edit) is treated the same way,
    instead of handing callers a value that breaks ``set(_load())``. Non-string
    entries are dropped, since every key ``event_key`` produces is a ``str``.
    """
    try:
        data = json.loads(_path().read_text())
    except Exception:  # noqa: BLE001 missing/corrupt -> start fresh
        return []
    if not isinstance(data, list):
        return []
    return [key for key in data if isinstance(key, str)]
def _save(seen: set[str]) -> None:
    """Atomically persist *seen* as a sorted JSON list at ``_path()``.

    Writes a sibling temp file and ``os.replace``s it over the target, so a
    crash mid-write never leaves a truncated store. That matters here because
    ``_load`` treats an unreadable file as an empty seen-set: a torn write would
    silently forget every recorded event and let them all be created again.
    Callers must hold ``_lock``.
    """
    path = _path()
    # DEDUP_PATH may point into a directory that doesn't exist yet.
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory => same filesystem, so os.replace is an atomic rename.
    # The pid suffix keeps processes apart; threads are serialized by _lock.
    tmp = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(json.dumps(sorted(seen), indent=2))
        os.replace(tmp, path)
    except OSError:
        # Don't leave a half-written temp file behind.
        try:
            tmp.unlink()
        except FileNotFoundError:
            pass
        raise


def filter_new(events: list[Event], record: bool = True) -> list[Event]:
    """Return only events whose key hasn't been recorded.

    Events are compared by ``event_key``. Repeats within ``events`` itself are
    dropped too (first occurrence wins), and input order is preserved.

    ``record=False`` filters WITHOUT persisting — callers delivering the events
    somewhere fallible (e.g. a calendar push) should filter first and
    ``mark_seen`` only after delivery succeeds; otherwise a transient failure
    permanently swallows the events ("seen" but never delivered).
    """
    with _lock:
        seen = set(_load())
        fresh = []
        for ev in events:
            k = event_key(ev)
            if k in seen:
                continue
            # Adding here also dedups repeats inside this same batch.
            seen.add(k)
            fresh.append(ev)
        if fresh and record:
            _save(seen)
        return fresh


def mark_seen(events: list[Event]) -> None:
    """Persist the keys of successfully delivered events.

    Merges their ``event_key``s into the stored seen-set. An empty list is a
    no-op and does not touch the store.
    """
    if not events:
        return
    with _lock:
        seen = set(_load())
        seen.update(event_key(ev) for ev in events)
        _save(seen)
def reset() -> None:
    """Forget everything recorded so far by deleting the store file (test helper).

    A store that doesn't exist yet is not an error.
    """
    with _lock:
        store = _path()
        try:
            store.unlink()
        except FileNotFoundError:
            return