Spaces:
Sleeping
Sleeping
| """TRIGGER_ON: autonomous mode fires on YOUR sent/accepted messages by default.""" | |
| import pytest | |
| import app as app_module | |
| from app import IngestBatch, IngestMessage, ingest | |
| TOKEN = "Bearer test-token" # matches conftest INGEST_TOKEN | |
def _m(text, is_from_me, chat="Fam"):
    """Build a single ``IngestMessage`` for use in an ingest batch.

    Args:
        text: Message body.
        is_from_me: Truthy for an outgoing message; the sender is then
            ``"me"``, otherwise ``"Mom"``.
        chat: Chat name the message belongs to (defaults to ``"Fam"``).

    Returns:
        An ``IngestMessage`` carrying a fixed timestamp so tests are
        deterministic.
    """
    if is_from_me:
        sender = "me"
    else:
        sender = "Mom"
    fields = {
        "chat": chat,
        "sender": sender,
        "text": text,
        "timestamp": "2026-06-07T10:00:00",
        "is_from_me": is_from_me,
    }
    return IngestMessage(**fields)
@pytest.fixture
def captured(monkeypatch):
    """Fixture: switch autonomous mode on and record ``_run_autonomous`` calls.

    The tests in this module request ``captured`` as a fixture argument, so
    the function must be registered with ``@pytest.fixture``; without the
    decorator pytest cannot resolve the argument.

    Args:
        monkeypatch: pytest's built-in fixture; the patches it applies to
            ``app_module`` are undone automatically after each test.

    Returns:
        A list that receives one ``set`` of chat names per call made to the
        patched ``app_module._run_autonomous``.
    """
    calls = []

    def _record(chats):
        # Store a set so assertions do not depend on iteration order.
        calls.append(set(chats))

    monkeypatch.setattr(app_module, "AUTONOMOUS", True)
    monkeypatch.setattr(app_module, "_run_autonomous", _record)
    return calls
def _ingest_and_flush(batch, authorization):
    """Call the ``ingest`` route directly, then run its queued background tasks.

    Autonomous work was moved off the request (so the collector's 30s POST
    never times out); the tests therefore have to execute the queued tasks
    themselves before asserting on their effects.

    Args:
        batch: The ``IngestBatch`` to submit.
        authorization: Value passed through as the ``authorization`` argument.

    Returns:
        Whatever ``ingest`` returned.
    """
    from fastapi import BackgroundTasks

    queue = BackgroundTasks()
    response = ingest(batch, queue, authorization=authorization)
    # Run every queued task synchronously, in the order it was added.
    for queued in queue.tasks:
        run, positional, named = queued.func, queued.args, queued.kwargs
        run(*positional, **named)
    return response
def test_outgoing_only_is_the_default(monkeypatch, captured):
    """With TRIGGER_ON="outgoing", only messages sent by me trigger autonomy."""
    monkeypatch.setattr(app_module, "TRIGGER_ON", "outgoing")

    # Incoming message from someone else: no autonomous action.
    incoming = IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)])
    _ingest_and_flush(incoming, authorization=TOKEN)
    assert captured == []

    # You reply/accept (outgoing): fires for that chat.
    outgoing = IngestBatch(messages=[_m("yes, 6pm works", is_from_me=True)])
    _ingest_and_flush(outgoing, authorization=TOKEN)
    assert captured[-1] == {"Fam"}
def test_any_mode_also_triggers_on_incoming(monkeypatch, captured):
    """With TRIGGER_ON="any", an incoming message also triggers autonomy."""
    monkeypatch.setattr(app_module, "TRIGGER_ON", "any")

    incoming = IngestBatch(messages=[_m("dinner 6pm?", is_from_me=False)])
    _ingest_and_flush(incoming, authorization=TOKEN)
    assert captured[-1] == {"Fam"}