"""Streaming test for the ``/chat`` endpoint: early status and heartbeat events."""

import asyncio
import json
import time
from unittest.mock import MagicMock, patch

from fastapi.testclient import TestClient
from jose import jwt


def _make_token() -> str:
    """Return an HS256-signed JWT for ``test-user`` that expires in one hour."""
    expires_at = int(time.time()) + 3600
    claims = {"sub": "test-user", "exp": expires_at}
    return jwt.encode(claims, "test-secret-32-chars-long-0000000", algorithm="HS256")


def _chat(client: TestClient, message: str) -> str:
    """POST *message* to ``/chat`` with a bearer token and return the raw body.

    Fails the calling test if the response status is not 200.
    """
    request_headers = {
        "Authorization": f"Bearer {_make_token()}",
        "Content-Type": "application/json",
    }
    request_payload = {
        "message": message,
        "session_id": "a1b2c3d4-e5f6-4789-8abc-def012345678",
    }
    reply = client.post("/chat", json=request_payload, headers=request_headers)
    assert reply.status_code == 200
    return reply.text


def test_stream_emits_early_status_and_heartbeat(monkeypatch):
    """Check the response body for status, ping and answer-token content.

    The pipeline is replaced by a mock whose ``astream`` pauses before
    yielding anything, and the heartbeat interval is patched to a value
    shorter than that pause.
    """
    # A short heartbeat interval lets the keepalive show up within the test's
    # brief run time.
    monkeypatch.setattr("app.api.chat._SSE_HEARTBEAT_SECONDS", 0.05)

    async def slow_astream(state, stream_mode=None):
        # Pause for longer than two patched heartbeat intervals before
        # producing the first event.
        await asyncio.sleep(0.12)
        yield ("custom", {"type": "status", "label": "Thinking..."})
        yield ("custom", {"type": "token", "text": "Answer text."})
        yield ("updates", {"generate": {"answer": "Answer text.", "sources": []}})

    mock_pipeline = MagicMock()
    mock_pipeline.astream = slow_astream

    with patch("app.main.build_pipeline", return_value=mock_pipeline), \
            patch("app.main.QdrantClient"), \
            patch("app.services.embedder.Embedder"), \
            patch("app.services.reranker.Reranker"):
        # Imported while the patches are active, as in the original layout.
        from app.main import create_app

        app = create_app()
        app.state.pipeline = mock_pipeline

        with TestClient(app, raise_server_exceptions=True) as client:
            body = _chat(client, "Tell me about TextOps")

    expected_fragments = (
        "event: status",
        "Starting response...",
        "event: ping",
        "Answer text.",
    )
    for fragment in expected_fragments:
        assert fragment in body