Spaces:
Running
Running
| """Integration tests for WebSocket connection lifecycle. | |
| [Task]: T079 | |
| [From]: specs/004-ai-chatbot/tasks.md | |
| These tests verify the WebSocket connection management: | |
| - Connection establishment with JWT authentication | |
| - Message broadcasting to multiple connections | |
| - Connection cleanup on disconnect | |
| - Authentication failure handling | |
| """ | |
| import asyncio | |
| import uuid | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| from fastapi import WebSocket | |
| from httpx_ws import aconnect_wsjson | |
| from core.database import get_db | |
| from core.security import create_access_token | |
| from main import app | |
| # Test database session dependency override | |
| def db_session(): | |
| """Mock database session for testing.""" | |
| yield MagicMock() | |
| def test_user(): | |
| """Create a test user ID.""" | |
| return str(uuid.uuid4()) | |
| def test_jwt(test_user): | |
| """Create a test JWT token.""" | |
| return create_access_token({"sub": test_user}) | |
| def client(db_session): | |
| """Create a test client with database override.""" | |
| app.dependency_overrides[get_db] = lambda: db_session | |
| with TestClient(app) as test_client: | |
| yield test_client | |
| app.dependency_overrides.clear() | |
| class TestWebSocketConnection: | |
| """Tests for WebSocket connection lifecycle.""" | |
| def test_websocket_connection_established(self, client: TestClient, test_user, test_jwt): | |
| """Test that WebSocket connection can be established with valid JWT. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as websocket: | |
| # Receive connection_established event | |
| data = websocket.receive_json() | |
| assert data["event_type"] == "connection_established" | |
| assert data["message"] == "Connected to real-time updates" | |
| assert data["user_id"] == test_user | |
| def test_websocket_connection_invalid_jwt(self, client: TestClient, test_user): | |
| """Test that WebSocket connection fails with invalid JWT. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| invalid_token = "invalid.jwt.token" | |
| with pytest.raises(Exception) as exc_info: | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={invalid_token}" | |
| ) as websocket: | |
| websocket.receive_json() | |
| # Connection should be closed | |
| assert exc_info.value is not None | |
| def test_websocket_connection_user_mismatch(self, client: TestClient, test_user, test_jwt): | |
| """Test that WebSocket connection fails when user_id doesn't match JWT. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| wrong_user = str(uuid.uuid4()) | |
| with pytest.raises(Exception): | |
| with client.websocket_connect( | |
| f"/ws/{wrong_user}/chat?token={test_jwt}" | |
| ) as websocket: | |
| websocket.receive_json() | |
| def test_websocket_disconnection_cleanup(self, client: TestClient, test_user, test_jwt): | |
| """Test that WebSocket connection is properly cleaned up on disconnect. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| from websockets.manager import manager | |
| # Clear any existing connections | |
| manager.active_connections.clear() | |
| # Connect first client | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as websocket1: | |
| # Verify connection is tracked | |
| assert test_user in manager.active_connections | |
| assert len(manager.active_connections[test_user]) == 1 | |
| # Disconnect | |
| pass | |
| # Verify connection was cleaned up | |
| # (Note: In real scenario, there might be a delay) | |
| assert test_user not in manager.active_connections or len( | |
| manager.active_connections.get(test_user, []) | |
| ) == 0 | |
| class TestWebSocketMultipleConnections: | |
| """Tests for multiple WebSocket connections per user.""" | |
| def test_multiple_connections_same_user(self, client: TestClient, test_user, test_jwt): | |
| """Test that multiple WebSocket connections can be established for the same user. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| This simulates multiple browser tabs. | |
| """ | |
| from websockets.manager import manager | |
| manager.active_connections.clear() | |
| # Connect first client | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as websocket1: | |
| websocket1.receive_json() | |
| # Connect second client | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as websocket2: | |
| websocket2.receive_json() | |
| # Both connections should be tracked | |
| assert test_user in manager.active_connections | |
| assert len(manager.active_connections[test_user]) == 2 | |
| def test_broadcast_to_multiple_connections(self, client: TestClient, test_user, test_jwt): | |
| """Test that broadcasts reach all connections for a user. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| from websockets.manager import manager | |
| manager.active_connections.clear() | |
| received_messages = [] | |
| def receive_messages(websocket, target_list): | |
| try: | |
| # Skip connection_established | |
| websocket.receive_json() | |
| # Receive subsequent messages | |
| while True: | |
| data = websocket.receive_json(timeout=0.5) | |
| target_list.append(data) | |
| except: | |
| pass | |
| # Connect two clients | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as ws1: | |
| ws1.receive_json() # connection_established | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as ws2: | |
| ws2.receive_json() # connection_established | |
| # Broadcast a test event | |
| test_event = { | |
| "event_type": "test_event", | |
| "message": "Test broadcast", | |
| } | |
| asyncio.run(manager.broadcast(test_user, test_event)) | |
| # Both clients should receive the event | |
| # (Note: In synchronous test, this is tricky - just verify broadcast doesn't error) | |
| assert test_user in manager.active_connections | |
| class TestWebSocketMessageHandling: | |
| """Tests for WebSocket message handling.""" | |
| def test_websocket_receives_json_messages(self, client: TestClient, test_user, test_jwt): | |
| """Test that WebSocket can receive and parse JSON messages. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| """ | |
| with client.websocket_connect( | |
| f"/ws/{test_user}/chat?token={test_jwt}" | |
| ) as websocket: | |
| # Receive connection_established | |
| data = websocket.receive_json() | |
| assert isinstance(data, dict) | |
| assert "event_type" in data | |
| assert "message" in data | |