Spaces:
Running
Running
| import asyncio | |
| import json | |
| import time | |
| from unittest.mock import MagicMock, patch | |
| from fastapi.testclient import TestClient | |
| from jose import jwt | |
def _make_token() -> str:
    """Return an HS256-signed JWT for the ``test-user`` subject, expiring in one hour."""
    expires_at = int(time.time()) + 3600
    claims = {"sub": "test-user", "exp": expires_at}
    # NOTE(review): the secret presumably has to match the one the app is
    # configured with during tests; confirm against the test configuration.
    return jwt.encode(claims, "test-secret-32-chars-long-0000000", algorithm="HS256")
def _chat(client: TestClient, message: str) -> str:
    """POST ``message`` to ``/chat`` and return the raw response body.

    The request carries a freshly minted bearer token and a fixed session id.
    Fails the calling test (via ``assert``) unless the endpoint answers 200.
    """
    request_headers = {
        "Authorization": f"Bearer {_make_token()}",
        "Content-Type": "application/json",
    }
    request_body = {
        "message": message,
        "session_id": "a1b2c3d4-e5f6-4789-8abc-def012345678",
    }
    reply = client.post("/chat", json=request_body, headers=request_headers)
    assert reply.status_code == 200
    return reply.text
def test_stream_emits_early_status_and_heartbeat(monkeypatch):
    """Check the ``/chat`` response body when the pipeline is slow to produce output.

    The fake pipeline waits 0.12 s before yielding anything while the heartbeat
    interval is patched down to 0.05 s, so the body is expected to contain a
    status event, the "Starting response..." text, at least one ping event and
    the final answer text.
    """
    # Shrink the heartbeat interval so a keepalive fires within the short delay below.
    monkeypatch.setattr("app.api.chat._SSE_HEARTBEAT_SECONDS", 0.05)

    # Events the fake pipeline emits, in order, once its initial delay has elapsed.
    scripted_events = (
        ("custom", {"type": "status", "label": "Thinking..."}),
        ("custom", {"type": "token", "text": "Answer text."}),
        ("updates", {"generate": {"answer": "Answer text.", "sources": []}}),
    )

    async def delayed_astream(state, stream_mode=None):
        # Sleep for longer than two heartbeat intervals before producing output.
        await asyncio.sleep(0.12)
        for event in scripted_events:
            yield event

    mock_pipeline = MagicMock()
    mock_pipeline.astream = delayed_astream

    with patch("app.main.build_pipeline", return_value=mock_pipeline):
        with patch("app.main.QdrantClient"), patch("app.services.embedder.Embedder"), patch(
            "app.services.reranker.Reranker"
        ):
            # Imported while the patches are active, as in the original test.
            from app.main import create_app

            app = create_app()
            app.state.pipeline = mock_pipeline
            with TestClient(app, raise_server_exceptions=True) as client:
                body = _chat(client, "Tell me about TextOps")

    expected_fragments = (
        "event: status",
        "Starting response...",
        "event: ping",
        "Answer text.",
    )
    for fragment in expected_fragments:
        assert fragment in body