| """ |
| Tests for API endpoints and WebSocket functionality. |
| """ |
|
|
| import pytest |
| import json |
| import base64 |
| from unittest.mock import Mock, patch, MagicMock |
| from fastapi.testclient import TestClient |
| import numpy as np |
| import cv2 |
|
|
| from api.api_server import app, ProcessingRequest, ProcessingResponse |
| from api.websocket import WebSocketHandler, WSMessage, MessageType |
|
|
|
|
class TestAPIEndpoints:
    """REST endpoint tests driven through FastAPI's ``TestClient``."""

    @pytest.fixture
    def client(self):
        """Return a ``TestClient`` wrapping the application under test."""
        return TestClient(app)

    @pytest.fixture
    def auth_headers(self):
        """Return request headers carrying a placeholder bearer token."""
        return {"Authorization": "Bearer test-token"}

    def test_root_endpoint(self, client):
        """``GET /`` answers 200 and reports the API name."""
        resp = client.get("/")
        assert resp.status_code == 200

        body = resp.json()
        assert "name" in body
        assert body["name"] == "BackgroundFX Pro API"

    def test_health_check(self, client):
        """``GET /health`` answers 200 with a healthy status and a services entry."""
        resp = client.get("/health")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "healthy"
        assert "services" in body

    @patch("api.api_server.verify_token")
    def test_process_image_endpoint(self, mock_verify, client, auth_headers, sample_image):
        """Uploading a JPEG to the image endpoint yields a job in ``processing`` state."""
        mock_verify.return_value = "test-user"

        # JPEG-encode the fixture image in memory for the multipart upload.
        _ok, jpeg = cv2.imencode(".jpg", sample_image)
        upload = {"file": ("test.jpg", jpeg.tobytes(), "image/jpeg")}
        form_fields = {"background": "blur", "quality": "high"}

        # The processing task is patched out for the duration of the request.
        with patch("api.api_server.process_image_task"):
            resp = client.post(
                "/api/v1/process/image",
                headers=auth_headers,
                files=upload,
                data=form_fields,
            )

        assert resp.status_code == 200
        body = resp.json()
        assert "job_id" in body
        assert body["status"] == "processing"

    @patch("api.api_server.verify_token")
    def test_process_video_endpoint(self, mock_verify, client, auth_headers, sample_video):
        """Uploading an MP4 to the video endpoint yields a job id."""
        mock_verify.return_value = "test-user"

        # Read the fixture file fully so the handle is closed before the request.
        with open(sample_video, "rb") as video_file:
            upload = {"file": ("test.mp4", video_file.read(), "video/mp4")}
        form_fields = {"background": "office", "quality": "medium"}

        with patch("api.api_server.process_video_task"):
            resp = client.post(
                "/api/v1/process/video",
                headers=auth_headers,
                files=upload,
                data=form_fields,
            )

        assert resp.status_code == 200
        body = resp.json()
        assert "job_id" in body

    @patch("api.api_server.verify_token")
    def test_batch_processing_endpoint(self, mock_verify, client, auth_headers):
        """Posting a two-item batch request yields a job id."""
        mock_verify.return_value = "test-user"

        first_item = {"id": "1", "input_path": "/tmp/img1.jpg", "output_path": "/tmp/out1.jpg"}
        second_item = {"id": "2", "input_path": "/tmp/img2.jpg", "output_path": "/tmp/out2.jpg"}
        payload = {
            "items": [first_item, second_item],
            "parallel": True,
            "priority": "normal",
        }

        with patch("api.api_server.process_batch_task"):
            resp = client.post("/api/v1/batch", headers=auth_headers, json=payload)

        assert resp.status_code == 200
        body = resp.json()
        assert "job_id" in body

    @patch("api.api_server.verify_token")
    def test_job_status_endpoint(self, mock_verify, client, auth_headers):
        """The job endpoint echoes the id and status returned by the job manager."""
        mock_verify.return_value = "test-user"

        job_id = "test-job-123"

        # Stub the job manager lookup so the endpoint sees a finished job.
        with patch.object(app.state.job_manager, "get_job") as lookup:
            lookup.return_value = ProcessingResponse(
                job_id=job_id,
                status="completed",
                progress=1.0,
            )
            resp = client.get(f"/api/v1/job/{job_id}", headers=auth_headers)

        assert resp.status_code == 200
        body = resp.json()
        assert body["job_id"] == job_id
        assert body["status"] == "completed"

    @patch("api.api_server.verify_token")
    def test_streaming_endpoints(self, mock_verify, client, auth_headers):
        """Starting a stream reports ``streaming``; stopping it answers 200."""
        mock_verify.return_value = "test-user"

        payload = {
            "source": "0",
            "stream_type": "webcam",
            "output_format": "hls",
        }

        # Start: the video processor's start hook is stubbed to report success.
        with patch.object(app.state.video_processor, "start_stream_processing") as start_hook:
            start_hook.return_value = True
            start_resp = client.post(
                "/api/v1/stream/start",
                headers=auth_headers,
                json=payload,
            )

        assert start_resp.status_code == 200
        assert start_resp.json()["status"] == "streaming"

        # Stop: the stop hook is stubbed out as well.
        with patch.object(app.state.video_processor, "stop_stream_processing"):
            stop_resp = client.get("/api/v1/stream/stop", headers=auth_headers)

        assert stop_resp.status_code == 200
|
|
|
|
class TestWebSocket:
    """Test WebSocket functionality.

    The test methods are plain synchronous functions, so any coroutine they
    need is driven explicitly with ``asyncio.run``; a coroutine that is only
    defined and never awaited does not execute its assertions.
    """

    @pytest.fixture
    def ws_handler(self):
        """Create WebSocket handler."""
        return WebSocketHandler()

    def test_websocket_connection(self, ws_handler, mock_websocket):
        """Test that handling a connection accepts the websocket."""
        import asyncio

        async def connect():
            await ws_handler.handle_connection(mock_websocket)

        # NOTE(review): assumes the ``mock_websocket`` fixture exposes awaitable
        # methods and that ``handle_connection`` returns once the mock has
        # nothing more to deliver — confirm against conftest. The timeout keeps
        # a handler that waits forever from stalling the run.
        asyncio.run(asyncio.wait_for(connect(), timeout=5))

        # No ``or True`` here: the assertion must be able to fail.
        assert mock_websocket.accept.called

    def test_message_parsing(self, ws_handler):
        """Test WebSocket message parsing."""
        message_data = {
            "type": "process_frame",
            "data": {"frame": "base64_data"}
        }

        message = WSMessage.from_dict(message_data)

        assert message.type == MessageType.PROCESS_FRAME
        assert message.data["frame"] == "base64_data"

    def test_frame_encoding_decoding(self, ws_handler, sample_image):
        """Test frame encoding and decoding."""
        # Encode the fixture image as base64 JPEG text.
        _, buffer = cv2.imencode('.jpg', sample_image)
        encoded = base64.b64encode(buffer).decode('utf-8')

        # Decode it back through the handler's frame processor.
        decoded = ws_handler.frame_processor._decode_frame(encoded)

        assert decoded is not None
        assert decoded.shape == sample_image.shape

    def test_session_management(self, ws_handler):
        """Test that adding a session records the supplied client id."""
        import asyncio
        from unittest.mock import AsyncMock

        # AsyncMock rather than MagicMock so that any websocket method the
        # session manager awaits is awaitable.
        mock_ws = AsyncMock()

        async def add_session():
            session = await ws_handler.session_manager.add_session(mock_ws, "test-client")
            assert session.client_id == "test-client"

        assert ws_handler.session_manager is not None
        # Actually run the coroutine; previously it was defined but never awaited.
        asyncio.run(add_session())

    def test_message_routing(self, ws_handler):
        """Test that messages carry a known type and serialise to a dict."""
        messages = [
            WSMessage(type=MessageType.PING, data={}),
            WSMessage(type=MessageType.UPDATE_CONFIG, data={"quality": "high"}),
            WSMessage(type=MessageType.START_STREAM, data={"source": 0})
        ]

        for msg in messages:
            assert msg.type in MessageType
            assert isinstance(msg.to_dict(), dict)

    def test_statistics_tracking(self, ws_handler):
        """Test that the statistics report contains the expected keys."""
        stats = ws_handler.get_statistics()

        assert "uptime" in stats
        assert "total_connections" in stats
        assert "active_connections" in stats
        assert "total_frames_processed" in stats
|
|
|
|
class TestAPIIntegration:
    """Integration tests for API.

    NOTE(review): the ``client`` fixture is not defined in this class; it is
    presumably supplied by a shared conftest — confirm.
    """

    @pytest.mark.integration
    def test_full_image_processing_flow(self, client, sample_image, temp_dir):
        """Test complete image processing flow: submit an image, then query its job."""
        # Send the same kind of bearer header as the other authenticated
        # requests in this module; token verification itself is patched below.
        auth_headers = {"Authorization": "Bearer test-token"}

        with patch('api.api_server.verify_token', return_value="test-user"):
            # Submit a JPEG-encoded copy of the fixture image.
            _, buffer = cv2.imencode('.jpg', sample_image)
            files = {"file": ("test.jpg", buffer.tobytes(), "image/jpeg")}

            response = client.post(
                "/api/v1/process/image",
                headers=auth_headers,
                files=files,
                data={"background": "blur", "quality": "low"}
            )

            assert response.status_code == 200
            job_data = response.json()
            job_id = job_data["job_id"]

            # Look the job up again by the id that was just returned.
            response = client.get(f"/api/v1/job/{job_id}", headers=auth_headers)

            # Either a known job (200) or an unknown one (404) is accepted.
            assert response.status_code in [200, 404]

    @pytest.mark.integration
    @pytest.mark.slow
    def test_concurrent_requests(self, client):
        """Test handling concurrent requests: ten parallel health checks all succeed."""
        import concurrent.futures

        def make_request():
            response = client.get("/health")
            return response.status_code

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(make_request) for _ in range(10)]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]

        assert all(status == 200 for status in results)

    @pytest.mark.integration
    def test_error_handling(self, client):
        """Test API error handling for an unknown route, missing auth and a bad upload."""
        # Unknown route.
        response = client.get("/api/v1/invalid")
        assert response.status_code == 404

        # Request sent without any Authorization header.
        response = client.get("/api/v1/stats")
        assert response.status_code in [401, 422]

        # Plain-text upload sent to the image endpoint.
        with patch('api.api_server.verify_token', return_value="test-user"):
            files = {"file": ("test.txt", b"text content", "text/plain")}
            response = client.post(
                "/api/v1/process/image",
                files=files,
                headers={"Authorization": "Bearer test"}
            )
            assert response.status_code == 400
|
|
|
|
class TestAPIPerformance:
    """Latency checks for the API.

    NOTE(review): the ``client`` and ``performance_timer`` fixtures are not
    defined in this class; presumably they come from a shared conftest — confirm.
    """

    @pytest.mark.slow
    def test_response_time(self, client, performance_timer):
        """``/`` and ``/health`` each answer 200 in under 0.1 on the timer."""
        for path in ("/", "/health"):
            # Only the request itself sits inside the timed region.
            with performance_timer as timer:
                resp = client.get(path)

            assert resp.status_code == 200
            assert timer.elapsed < 0.1

    @pytest.mark.slow
    def test_file_upload_performance(self, client, performance_timer):
        """Uploading a 1024x1024 random JPEG answers 200 in under 2.0 on the timer."""
        # Random RGB noise, JPEG-encoded in memory.
        pixels = np.random.randint(0, 255, (1024, 1024, 3), dtype=np.uint8)
        _ok, jpeg = cv2.imencode(".jpg", pixels)

        # Token verification and the processing task are both patched out.
        with patch("api.api_server.verify_token", return_value="test-user"), \
                patch("api.api_server.process_image_task"):
            # The byte conversion stays inside the timed region, as before.
            with performance_timer as timer:
                resp = client.post(
                    "/api/v1/process/image",
                    files={"file": ("large.jpg", jpeg.tobytes(), "image/jpeg")},
                    headers={"Authorization": "Bearer test"},
                )

        assert resp.status_code == 200
        assert timer.elapsed < 2.0