OffGridSchedula / tests /test_trigger_direction.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
1.94 kB
"""TRIGGER_ON: autonomous mode fires on YOUR sent/accepted messages by default."""
import pytest
import app as app_module
from app import IngestBatch, IngestMessage, ingest
TOKEN = "Bearer test-token" # matches conftest INGEST_TOKEN
def _m(text, is_from_me, chat="Fam"):
return IngestMessage(
chat=chat,
sender=("me" if is_from_me else "Mom"),
text=text,
timestamp="2026-06-07T10:00:00",
is_from_me=is_from_me,
)
@pytest.fixture
def captured(monkeypatch):
calls = []
monkeypatch.setattr(app_module, "AUTONOMOUS", True)
monkeypatch.setattr(app_module, "_run_autonomous", lambda chats: calls.append(set(chats)))
return calls
def _ingest_and_flush(batch, authorization):
"""Direct route call + run queued background tasks (autonomous work moved
off the request so the collector's 30s POST never times out)."""
from fastapi import BackgroundTasks
bt = BackgroundTasks()
out = ingest(batch, bt, authorization=authorization)
for task in bt.tasks:
task.func(*task.args, **task.kwargs)
return out
def test_outgoing_only_is_the_default(monkeypatch, captured):
monkeypatch.setattr(app_module, "TRIGGER_ON", "outgoing")
# Incoming message from someone else: no autonomous action.
_ingest_and_flush(IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)]), authorization=TOKEN)
assert captured == []
# You reply/accept (outgoing): fires for that chat.
_ingest_and_flush(IngestBatch(messages=[_m("yes, 6pm works", is_from_me=True)]), authorization=TOKEN)
assert captured[-1] == {"Fam"}
def test_any_mode_also_triggers_on_incoming(monkeypatch, captured):
monkeypatch.setattr(app_module, "TRIGGER_ON", "any")
_ingest_and_flush(IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)]), authorization=TOKEN)
assert captured[-1] == {"Fam"}