"""TRIGGER_ON: autonomous mode fires on YOUR sent/accepted messages by default.""" import pytest import app as app_module from app import IngestBatch, IngestMessage, ingest TOKEN = "Bearer test-token" # matches conftest INGEST_TOKEN def _m(text, is_from_me, chat="Fam"): return IngestMessage( chat=chat, sender=("me" if is_from_me else "Mom"), text=text, timestamp="2026-06-07T10:00:00", is_from_me=is_from_me, ) @pytest.fixture def captured(monkeypatch): calls = [] monkeypatch.setattr(app_module, "AUTONOMOUS", True) monkeypatch.setattr(app_module, "_run_autonomous", lambda chats: calls.append(set(chats))) return calls def _ingest_and_flush(batch, authorization): """Direct route call + run queued background tasks (autonomous work moved off the request so the collector's 30s POST never times out).""" from fastapi import BackgroundTasks bt = BackgroundTasks() out = ingest(batch, bt, authorization=authorization) for task in bt.tasks: task.func(*task.args, **task.kwargs) return out def test_outgoing_only_is_the_default(monkeypatch, captured): monkeypatch.setattr(app_module, "TRIGGER_ON", "outgoing") # Incoming message from someone else: no autonomous action. _ingest_and_flush(IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)]), authorization=TOKEN) assert captured == [] # You reply/accept (outgoing): fires for that chat. _ingest_and_flush(IngestBatch(messages=[_m("yes, 6pm works", is_from_me=True)]), authorization=TOKEN) assert captured[-1] == {"Fam"} def test_any_mode_also_triggers_on_incoming(monkeypatch, captured): monkeypatch.setattr(app_module, "TRIGGER_ON", "any") _ingest_and_flush(IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)]), authorization=TOKEN) assert captured[-1] == {"Fam"}