Spaces:
Running
Running
| """Integration tests for proxy batch APIs with compression. | |
| These tests verify that batch endpoints work correctly with real API calls | |
| and compression enabled, testing token savings tracking. | |
| Required environment variables: | |
| - OPENAI_API_KEY: For OpenAI /v1/batches endpoint | |
| - ANTHROPIC_API_KEY: For Anthropic /v1/messages/batches endpoint | |
| IMPORTANT: Batch API tests create real batch jobs which may incur costs. | |
| Use sparingly and clean up resources after testing. | |
| Run with: | |
| OPENAI_API_KEY=... ANTHROPIC_API_KEY=... pytest tests/test_proxy_batch_integration.py -v | |
| """ | |
| import json | |
| import os | |
| import pytest | |
| pytest.importorskip("fastapi") | |
| pytest.importorskip("httpx") | |
| from fastapi.testclient import TestClient | |
| from headroom.proxy.server import ProxyConfig, create_app | |
| # ============================================================================= | |
| # Fixtures | |
| # ============================================================================= | |
| def openai_batch_client(): | |
| """Create test client for OpenAI batch API with compression enabled.""" | |
| config = ProxyConfig( | |
| optimize=True, # Enable compression for batch | |
| cache_enabled=False, | |
| rate_limit_enabled=False, | |
| cost_tracking_enabled=False, | |
| ) | |
| app = create_app(config) | |
| with TestClient(app) as client: | |
| yield client | |
| def anthropic_batch_client(): | |
| """Create test client for Anthropic batch API with compression enabled.""" | |
| config = ProxyConfig( | |
| optimize=True, # Enable compression for batch | |
| cache_enabled=False, | |
| rate_limit_enabled=False, | |
| cost_tracking_enabled=False, | |
| ) | |
| app = create_app(config) | |
| with TestClient(app) as client: | |
| yield client | |
| def openai_api_key(): | |
| """Get OpenAI API key from environment.""" | |
| return os.environ.get("OPENAI_API_KEY") | |
| def anthropic_api_key(): | |
| """Get Anthropic API key from environment.""" | |
| return os.environ.get("ANTHROPIC_API_KEY") | |
| def create_large_messages(num_items: int = 50) -> list[dict]: | |
| """Create messages with large JSON data for compression testing.""" | |
| # Create a list of items that will be compressible | |
| items = [ | |
| { | |
| "id": i, | |
| "name": f"Item number {i}", | |
| "description": f"This is a detailed description for item {i}. It contains additional information.", | |
| "status": "active" if i % 2 == 0 else "inactive", | |
| "metadata": { | |
| "created_at": f"2024-01-{(i % 28) + 1:02d}", | |
| "updated_at": f"2024-06-{(i % 28) + 1:02d}", | |
| "tags": [f"tag{i % 5}", f"category{i % 3}"], | |
| }, | |
| } | |
| for i in range(num_items) | |
| ] | |
| large_json = json.dumps(items, indent=2) | |
| return [ | |
| {"role": "system", "content": "You are a helpful data analyst assistant."}, | |
| {"role": "user", "content": "I have some data I need you to analyze."}, | |
| {"role": "assistant", "content": f"I've received your data:\n\n{large_json}"}, | |
| {"role": "user", "content": "How many items have status 'active'?"}, | |
| ] | |
| # ============================================================================= | |
| # OpenAI Batch API Tests | |
| # ============================================================================= | |
| class TestOpenAIBatchCreate: | |
| """Test OpenAI /v1/batches create endpoint with compression.""" | |
| def test_batch_create_validation_missing_input_file(self, openai_batch_client, openai_api_key): | |
| """POST /v1/batches without input_file_id returns validation error.""" | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| json={ | |
| "endpoint": "/v1/chat/completions", | |
| "completion_window": "24h", | |
| }, | |
| ) | |
| assert response.status_code == 400 | |
| data = response.json() | |
| assert "error" in data | |
| assert "input_file_id" in data["error"]["message"].lower() | |
| def test_batch_create_validation_missing_endpoint(self, openai_batch_client, openai_api_key): | |
| """POST /v1/batches without endpoint returns validation error.""" | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| json={ | |
| "input_file_id": "file-abc123", | |
| "completion_window": "24h", | |
| }, | |
| ) | |
| assert response.status_code == 400 | |
| data = response.json() | |
| assert "error" in data | |
| assert "endpoint" in data["error"]["message"].lower() | |
| def test_batch_create_with_compression(self, openai_batch_client, openai_api_key): | |
| """Full batch creation flow with compression. | |
| This test: | |
| 1. Creates a JSONL file with compressible content | |
| 2. Uploads it to OpenAI | |
| 3. Creates a batch with compression enabled | |
| 4. Verifies compression stats are tracked | |
| 5. Cancels the batch to avoid costs | |
| """ | |
| # Step 1: Create JSONL content with compressible messages | |
| messages = create_large_messages(num_items=30) | |
| jsonl_lines = [ | |
| json.dumps( | |
| { | |
| "custom_id": f"request-{i}", | |
| "method": "POST", | |
| "url": "/v1/chat/completions", | |
| "body": { | |
| "model": "gpt-4o-mini", | |
| "messages": messages, | |
| "max_tokens": 100, | |
| }, | |
| } | |
| ) | |
| for i in range(3) # 3 requests in batch | |
| ] | |
| jsonl_content = "\n".join(jsonl_lines) | |
| # Step 2: Upload the JSONL file directly to OpenAI | |
| import httpx | |
| upload_response = httpx.post( | |
| "https://api.openai.com/v1/files", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| files={"file": ("batch_input.jsonl", jsonl_content.encode(), "application/jsonl")}, | |
| data={"purpose": "batch"}, | |
| ) | |
| assert upload_response.status_code == 200, f"File upload failed: {upload_response.text}" | |
| file_data = upload_response.json() | |
| input_file_id = file_data["id"] | |
| try: | |
| # Step 3: Create batch through proxy with compression | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| json={ | |
| "input_file_id": input_file_id, | |
| "endpoint": "/v1/chat/completions", | |
| "completion_window": "24h", | |
| "metadata": {"test": "compression_integration"}, | |
| }, | |
| ) | |
| assert response.status_code == 200, f"Batch creation failed: {response.text}" | |
| batch_data = response.json() | |
| # Verify batch was created | |
| assert "id" in batch_data | |
| assert batch_data["object"] == "batch" | |
| batch_id = batch_data["id"] | |
| # Verify compression stats in response headers | |
| if "x-headroom-tokens-saved" in response.headers: | |
| tokens_saved = int(response.headers["x-headroom-tokens-saved"]) | |
| assert tokens_saved >= 0 | |
| if "x-headroom-savings-percent" in response.headers: | |
| savings_percent = float(response.headers["x-headroom-savings-percent"]) | |
| assert 0 <= savings_percent <= 100 | |
| # Verify compression metadata was added | |
| metadata = batch_data.get("metadata", {}) | |
| if metadata.get("headroom_compressed") == "true": | |
| # Compression was applied | |
| assert "headroom_tokens_saved" in metadata | |
| assert "headroom_original_tokens" in metadata | |
| assert "headroom_compressed_tokens" in metadata | |
| tokens_saved = int(metadata["headroom_tokens_saved"]) | |
| assert tokens_saved >= 0 | |
| # Step 4: Cancel the batch to avoid costs | |
| cancel_response = openai_batch_client.post( | |
| f"/v1/batches/{batch_id}/cancel", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| ) | |
| # Cancel may succeed or fail if batch already completed/cancelled | |
| assert cancel_response.status_code in [200, 400] | |
| finally: | |
| # Cleanup: Delete the uploaded file | |
| httpx.delete( | |
| f"https://api.openai.com/v1/files/{input_file_id}", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| ) | |
| class TestOpenAIBatchList: | |
| """Test OpenAI /v1/batches list endpoint passthrough.""" | |
| def test_list_batches(self, openai_batch_client, openai_api_key): | |
| """GET /v1/batches returns list of batches.""" | |
| response = openai_batch_client.get( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| # Verify list response format | |
| assert "data" in data | |
| assert "object" in data | |
| assert data["object"] == "list" | |
| def test_list_batches_with_limit(self, openai_batch_client, openai_api_key): | |
| """GET /v1/batches with limit parameter.""" | |
| response = openai_batch_client.get( | |
| "/v1/batches?limit=5", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert len(data["data"]) <= 5 | |
| # ============================================================================= | |
| # Anthropic Batch API Tests | |
| # ============================================================================= | |
| class TestAnthropicBatchCreate: | |
| """Test Anthropic /v1/messages/batches create endpoint with compression.""" | |
| def test_batch_create_validation_missing_requests( | |
| self, anthropic_batch_client, anthropic_api_key | |
| ): | |
| """POST /v1/messages/batches without requests returns validation error.""" | |
| response = anthropic_batch_client.post( | |
| "/v1/messages/batches", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| json={}, | |
| ) | |
| assert response.status_code == 400 | |
| data = response.json() | |
| assert "error" in data | |
| def test_batch_create_validation_empty_requests( | |
| self, anthropic_batch_client, anthropic_api_key | |
| ): | |
| """POST /v1/messages/batches with empty requests list returns error.""" | |
| response = anthropic_batch_client.post( | |
| "/v1/messages/batches", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| json={"requests": []}, | |
| ) | |
| assert response.status_code == 400 | |
| data = response.json() | |
| assert "error" in data | |
| def test_batch_create_with_compression(self, anthropic_batch_client, anthropic_api_key): | |
| """Create Anthropic batch with compression. | |
| This test: | |
| 1. Creates a batch request with compressible messages | |
| 2. Verifies the batch is created successfully | |
| 3. Checks that compression stats are tracked | |
| 4. Cancels the batch to avoid costs | |
| """ | |
| # Create messages with compressible content | |
| messages = create_large_messages(num_items=25) | |
| # Create batch request in Anthropic format | |
| batch_requests = [ | |
| { | |
| "custom_id": f"req-{i}", | |
| "params": { | |
| "model": "claude-3-5-haiku-20241022", | |
| "max_tokens": 100, | |
| "messages": messages, | |
| }, | |
| } | |
| for i in range(2) # 2 requests in batch | |
| ] | |
| response = anthropic_batch_client.post( | |
| "/v1/messages/batches", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| "content-type": "application/json", | |
| }, | |
| json={"requests": batch_requests}, | |
| ) | |
| assert response.status_code == 200, f"Batch creation failed: {response.text}" | |
| batch_data = response.json() | |
| # Verify batch was created | |
| assert "id" in batch_data | |
| assert batch_data["type"] == "message_batch" | |
| batch_id = batch_data["id"] | |
| # Verify processing status | |
| assert "processing_status" in batch_data | |
| assert batch_data["processing_status"] in ["in_progress", "ended", "canceling"] | |
| # Check proxy stats for compression | |
| stats_response = anthropic_batch_client.get("/stats") | |
| stats = stats_response.json() | |
| # Batch requests should be tracked | |
| assert stats["requests"]["total"] >= 1 | |
| # Cancel the batch to avoid costs | |
| cancel_response = anthropic_batch_client.post( | |
| f"/v1/messages/batches/{batch_id}/cancel", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| ) | |
| # Cancel may succeed or return error if already processed | |
| assert cancel_response.status_code in [200, 400, 409] | |
| class TestAnthropicBatchList: | |
| """Test Anthropic /v1/messages/batches list endpoint passthrough.""" | |
| def test_list_batches(self, anthropic_batch_client, anthropic_api_key): | |
| """GET /v1/messages/batches returns list of batches.""" | |
| response = anthropic_batch_client.get( | |
| "/v1/messages/batches", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| # Verify list response format | |
| assert "data" in data | |
| def test_list_batches_with_limit(self, anthropic_batch_client, anthropic_api_key): | |
| """GET /v1/messages/batches with limit parameter.""" | |
| response = anthropic_batch_client.get( | |
| "/v1/messages/batches?limit=5", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert len(data.get("data", [])) <= 5 | |
| # ============================================================================= | |
| # Compression Verification Tests | |
| # ============================================================================= | |
| class TestBatchCompressionStats: | |
| """Test that batch compression stats are properly tracked.""" | |
| def test_stats_track_batch_requests(self, openai_batch_client, openai_api_key): | |
| """Verify batch requests update proxy stats correctly.""" | |
| # Get initial stats | |
| initial_stats = openai_batch_client.get("/stats").json() | |
| initial_requests = initial_stats["requests"]["total"] | |
| # Make a batch list request (passthrough) | |
| openai_batch_client.get( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| ) | |
| # Verify stats updated | |
| updated_stats = openai_batch_client.get("/stats").json() | |
| assert updated_stats["requests"]["total"] >= initial_requests | |
| class TestAnthropicBatchCompressionStats: | |
| """Test Anthropic batch compression stats tracking.""" | |
| def test_stats_track_anthropic_batch_requests(self, anthropic_batch_client, anthropic_api_key): | |
| """Verify Anthropic batch requests update proxy stats.""" | |
| # Get initial stats | |
| initial_stats = anthropic_batch_client.get("/stats").json() | |
| initial_requests = initial_stats["requests"]["total"] | |
| # Make a batch list request | |
| anthropic_batch_client.get( | |
| "/v1/messages/batches", | |
| headers={ | |
| "x-api-key": anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| ) | |
| # Verify stats updated | |
| updated_stats = anthropic_batch_client.get("/stats").json() | |
| assert updated_stats["requests"]["total"] >= initial_requests | |
| # ============================================================================= | |
| # Error Handling Tests | |
| # ============================================================================= | |
| class TestBatchErrorHandling: | |
| """Test error handling for batch endpoints.""" | |
| def test_openai_batch_invalid_file_id(self, openai_batch_client, openai_api_key): | |
| """Invalid file ID returns appropriate error.""" | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| headers={"Authorization": f"Bearer {openai_api_key}"}, | |
| json={ | |
| "input_file_id": "file-nonexistent12345", | |
| "endpoint": "/v1/chat/completions", | |
| "completion_window": "24h", | |
| }, | |
| ) | |
| # Should return error for non-existent file | |
| assert response.status_code in [400, 404] | |
| def test_openai_batch_missing_auth(self, openai_batch_client): | |
| """Missing authentication returns error (401 or 404 depending on routing).""" | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| json={ | |
| "input_file_id": "file-abc123", | |
| "endpoint": "/v1/chat/completions", | |
| }, | |
| ) | |
| # Proxy may return 404 (no route match) or 401 (auth error) | |
| assert response.status_code in [401, 404] | |
| def test_anthropic_batch_missing_auth(self, anthropic_batch_client): | |
| """Missing authentication returns error (401 or 400 depending on validation).""" | |
| response = anthropic_batch_client.post( | |
| "/v1/messages/batches", | |
| headers={ | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "message-batches-2024-09-24", | |
| }, | |
| json={"requests": []}, | |
| ) | |
| # Proxy may return 400 (validation) or 401 (auth error) | |
| assert response.status_code in [400, 401] | |
| def test_openai_batch_invalid_json(self, openai_batch_client, openai_api_key): | |
| """Invalid JSON body returns 400.""" | |
| response = openai_batch_client.post( | |
| "/v1/batches", | |
| headers={ | |
| "Authorization": f"Bearer {openai_api_key}", | |
| "Content-Type": "application/json", | |
| }, | |
| content=b"not valid json", | |
| ) | |
| assert response.status_code == 400 | |