Spaces:
Running
Running
| """ | |
| Integration tests for data_sources API endpoints. | |
| Tests cover: | |
| - Job creation and execution flow | |
| - Schema retrieval and validation | |
| - Raw SQL execution | |
| - Error handling and edge cases | |
| - Multi-tenant isolation | |
| - Authentication and authorization | |
| """ | |
| import pytest | |
| import json | |
| from fastapi.testclient import TestClient | |
| from unittest.mock import Mock, patch, MagicMock | |
| import sys | |
| import os | |
| from datetime import datetime | |
# Put the project root (three directories above this file, i.e. the parent
# of `backend`) at the front of sys.path so that `import backend...`
# resolves when this test file is executed directly.
project_root = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
if project_root not in sys.path:
    sys.path.insert(0, project_root)
| # Import FastAPI app | |
| from backend.data_sources.main import app | |
| # Import models and utilities for testing | |
| from backend.core.auth import AuthUser, get_current_user | |
| from backend.data_sources.jobs import JobStatus | |
| from backend.data_sources.api import get_tenant_config | |
# Module-level TestClient bound to the FastAPI app; the tests below issue
# their HTTP requests through it.
client = TestClient(app)
| # ============================================================================ | |
| # Test Fixtures and Setup | |
| # ============================================================================ | |
def mock_redis() -> Mock:
    """Build a stand-in for the Redis client.

    Returns:
        A fresh ``unittest.mock.Mock`` on every call.

    NOTE(review): presumably intended to be registered as a pytest fixture,
    but no decorator is visible here -- confirm against the original file.
    """
    redis_double = Mock()
    return redis_double
def mock_minio() -> Mock:
    """Build a stand-in for the MinIO client.

    Returns:
        A fresh ``unittest.mock.Mock`` on every call.

    NOTE(review): presumably intended to be registered as a pytest fixture,
    but no decorator is visible here -- confirm against the original file.
    """
    minio_double = Mock()
    return minio_double
@pytest.fixture(autouse=True)
def setup_test_dependencies():
    """Install FastAPI dependency overrides around every test in this module.

    This is a generator (setup, ``yield``, teardown) that nothing in this file
    calls or requests by name, so it is registered as an *autouse* pytest
    fixture; otherwise the overrides below would never be applied.

    While a test runs:
      * ``get_current_user`` resolves to a fixed ``AuthUser`` with id
        ``test_user``, tenant ``test_tenant`` and role ``user``.
      * ``get_tenant_config`` resolves to a single ``sample_db`` source for
        ``test_tenant`` and to an empty list for any other tenant.

    Both overrides are removed again when the test finishes.
    """

    def mock_get_current_user():
        """Return the fixed authenticated user used by the tests."""
        return AuthUser({
            "id": "test_user",
            "user_metadata": {
                "tenant_id": "test_tenant",
                "role": "user",
            },
        })

    def mock_get_tenant_config(tenant_id, redis_client):
        """Return the data sources for ``tenant_id``.

        ``redis_client`` is accepted but not used by this replacement.
        """
        if tenant_id == "test_tenant":
            return [{
                "source_name": "sample_db",
                "source_type": "ibis",
                "config": {
                    "uri": "mysql+pymysql://root:test@localhost:3306/test_db",
                },
            }]
        return []

    # Apply the overrides, keyed by the real dependency callables imported at
    # the top of the file.
    app.dependency_overrides[get_current_user] = mock_get_current_user
    app.dependency_overrides[get_tenant_config] = mock_get_tenant_config
    try:
        yield
    finally:
        # Always drop the overrides so they cannot leak into later tests.
        app.dependency_overrides.pop(get_current_user, None)
        app.dependency_overrides.pop(get_tenant_config, None)
| # ============================================================================ | |
| # Health and Basic Endpoint Tests | |
| # ============================================================================ | |
class TestHealthEndpoints:
    """Smoke tests for the liveness endpoint and the API root."""

    def test_health_endpoint(self):
        """GET /health answers 200 with a healthy ``data-sources-api`` payload."""
        resp = client.get("/health")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "healthy"
        assert "service" in body
        assert body["service"] == "data-sources-api"
        assert "timestamp" in body

    def test_root_endpoint(self):
        """GET / answers 200 with the API name, version and endpoint listing."""
        resp = client.get("/")
        assert resp.status_code == 200

        body = resp.json()
        for required_key in ("name", "version", "endpoints"):
            assert required_key in body
        assert body["name"] == "Data Sources API"
| # ============================================================================ | |
| # Job Creation and Management Tests | |
| # ============================================================================ | |
class TestJobLifecycle:
    """Integration tests for job creation, lookup and request validation.

    NOTE(review): the ``mock_*`` parameters of these tests are not defined in
    the visible part of this file; presumably they are injected by ``@patch``
    decorators -- confirm against the original source.
    """

    def test_job_creation_and_execution_flow(self, mock_celery, mock_config, mock_minio, mock_redis):
        """POSTing a valid plan creates a pending job and queues one task."""
        # Wire the patched factories to controllable instances.
        redis_instance = MagicMock()
        minio_instance = MagicMock()
        mock_redis.return_value = redis_instance
        mock_minio.return_value = minio_instance
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"},
        }]
        # Job storage writes to Redis should report success.
        redis_instance.set.return_value = True

        job_request = {
            "payload": {
                "plan": [
                    {
                        "source": "sample_db",
                        "query": {
                            "operation": "table",
                            "name": "orders",
                        },
                    }
                ],
                "tenant_id": "test_tenant",
            }
        }

        response = client.post("/api/v1/data-sources/jobs", json=job_request)
        assert response.status_code == 201

        data = response.json()
        assert "job_id" in data
        # The identifier must be non-empty (it was previously bound to an
        # unused local and never checked).
        assert data["job_id"]
        assert data["status"] == "pending"
        assert "message" in data

        # Exactly one background task must have been queued.
        mock_celery.assert_called_once()
        # Only creation is verified here: the job itself would be processed
        # asynchronously and later move to a completed status.

    def test_job_not_found(self, mock_redis):
        """Requesting results for an unknown job id yields 404 with a detail."""
        redis_instance = MagicMock()
        mock_redis.return_value = redis_instance
        redis_instance.get.return_value = None  # nothing stored for this id

        response = client.get("/api/v1/data-sources/results/nonexistent-job")
        assert response.status_code == 404
        assert "detail" in response.json()

    def test_job_creation_validation_errors(self, mock_celery, mock_minio, mock_redis):
        """Malformed job requests are rejected with 422."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()

        # Missing payload entirely.
        response = client.post("/api/v1/data-sources/jobs", json={})
        assert response.status_code == 422

        # Plan step with an operation name that is not a valid AST operation.
        invalid_job = {
            "payload": {
                "plan": [{
                    "source": "sample_db",
                    "query": {
                        "operation": "invalid_operation",
                        "name": "orders",
                    },
                }],
                "tenant_id": "test_tenant",
            }
        }
        # Plan with no steps at all.
        empty_plan_job = {
            "payload": {
                "plan": [],
                "tenant_id": "test_tenant",
            }
        }
        for bad_request in (invalid_job, empty_plan_job):
            response = client.post("/api/v1/data-sources/jobs", json=bad_request)
            assert response.status_code == 422
| # ============================================================================ | |
| # Schema Retrieval Tests | |
| # ============================================================================ | |
class TestSchemaRetrieval:
    """Integration tests for the per-source schema endpoint.

    NOTE(review): the ``mock_*`` parameters of these tests are not defined in
    the visible part of this file; presumably they are injected by ``@patch``
    decorators -- confirm against the original source.
    """

    @staticmethod
    def _sample_sources():
        """Return a fresh tenant config list containing only ``sample_db``."""
        return [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

    @staticmethod
    def _agent_serving(schema_json):
        """Return an agent double whose ``sample_db`` connector yields ``schema_json``."""
        connector = MagicMock()
        connector.get_schema.return_value = schema_json
        agent = MagicMock()
        agent.connectors = {"sample_db": connector}
        return agent

    def test_schema_retrieval_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """A known source returns 200 with its schema and a timestamp."""
        redis_instance = MagicMock()
        mock_redis.return_value = redis_instance
        redis_instance.get.return_value = None  # force a cache miss
        mock_config.return_value = self._sample_sources()
        mock_agent.return_value = self._agent_serving(
            '{"tables": ["orders", "customers", "products"]}'
        )

        resp = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")
        assert resp.status_code == 200

        body = resp.json()
        assert body["source_name"] == "sample_db"
        assert "schema" in body
        assert "last_updated" in body

        # The schema field is itself a JSON document.
        parsed = json.loads(body["schema"])
        assert "tables" in parsed
        assert "orders" in parsed["tables"]

    def test_schema_retrieval_unknown_source(self, mock_config, mock_minio, mock_redis):
        """A source missing from the tenant config yields 404 with a detail."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = []  # tenant has no sources at all

        resp = client.get("/api/v1/data-sources/schema/unknown_db?tenant_id=test_tenant")
        assert resp.status_code == 404
        assert "detail" in resp.json()

    def test_schema_caching(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Schema request succeeds when the cache lookup misses.

        NOTE(review): only the cache-miss path is exercised; no second
        request is made and no cache interaction is asserted.
        """
        redis_instance = MagicMock()
        mock_redis.return_value = redis_instance
        redis_instance.get.return_value = None  # first call: cache miss
        mock_config.return_value = self._sample_sources()
        mock_agent.return_value = self._agent_serving('{"tables": ["orders"]}')

        first = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")
        assert first.status_code == 200

        body = first.json()
        assert body["source_name"] == "sample_db"
        assert "schema" in body
| # ============================================================================ | |
| # Raw SQL Execution Tests | |
| # ============================================================================ | |
class TestRawSQLExecution:
    """Integration tests for the ``execute-raw-sql`` endpoint.

    NOTE(review): the ``mock_*`` parameters of these tests are not defined in
    the visible part of this file; presumably they are injected by ``@patch``
    decorators -- confirm against the original source.
    """

    def test_raw_sql_execution_sync_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """A synchronous query returns its rows together with execution stats."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        # Agent double that answers the raw query with two canned rows.
        canned_rows = [
            {"id": 1, "name": "John Doe", "email": "john@example.com"},
            {"id": 2, "name": "Jane Smith", "email": "jane@example.com"}
        ]
        agent = MagicMock()
        agent.execute_raw_query.return_value = canned_rows
        agent.connectors = {"sample_db": MagicMock()}
        mock_agent.return_value = agent

        sql_request = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT id, name, email FROM users LIMIT 10",
            "async_mode": False,
            "max_rows": 100
        }
        resp = client.post("/api/v1/data-sources/execute-raw-sql", json=sql_request)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert len(body["results"]) == 2
        assert body["rows_returned"] == 2
        assert "execution_time_ms" in body

    def test_raw_sql_execution_async_success(self, mock_celery, mock_config, mock_minio, mock_redis):
        """An asynchronous query is accepted and handed to a background task."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        sql_request = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT * FROM large_table",
            "async_mode": True
        }
        resp = client.post("/api/v1/data-sources/execute-raw-sql", json=sql_request)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "accepted"
        assert "job_id" in body
        assert body["tenant_id"] == "test_tenant"

        # Exactly one task must have been queued.
        mock_celery.assert_called_once()

    def test_raw_sql_validation_errors(self):
        """Requests lacking a source or carrying an empty query get 422."""
        endpoint = "/api/v1/data-sources/execute-raw-sql"

        # source_name omitted
        resp = client.post(endpoint, json={
            "sql_query": "SELECT * FROM users"
        })
        assert resp.status_code == 422

        # sql_query present but empty
        resp = client.post(endpoint, json={
            "source_name": "sample_db",
            "sql_query": ""
        })
        assert resp.status_code == 422
| # ============================================================================ | |
| # Schema Search Tests | |
| # ============================================================================ | |
class TestSchemaSearch:
    """Integration tests for the schema keyword-search endpoint.

    NOTE(review): the ``mock_*`` parameters of these tests are not defined in
    the visible part of this file; presumably they are injected by ``@patch``
    decorators -- confirm against the original source.
    """

    def test_schema_search_basic(self, mock_agent, mock_config, mock_minio, mock_redis,
                                 mock_ibis_connector, mock_flatten, mock_build_index,
                                 mock_build_graph, mock_rank_tables, mock_format_results):
        """A single-keyword search returns sources, matches and a formatted schema."""
        redis_instance = MagicMock()
        mock_redis.return_value = redis_instance
        redis_instance.get.return_value = None  # force a cache miss
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        # Canned output for each stage of the search pipeline.
        orders_match = {
            "table_name": "orders",
            "score": 0.8,
            "matched_columns": ["customer_id"],
            "source_name": "sample_db",
        }
        mock_flatten.return_value = {"sample_db": {"orders": ["id", "customer_id", "amount"]}}
        mock_build_index.return_value = {"customer": ["orders.customer_id"]}
        mock_build_graph.return_value = {}
        mock_rank_tables.return_value = [
            dict(orders_match, matched_columns=["customer_id"])
        ]
        mock_format_results.return_value = {
            "table_matches": [dict(orders_match, matched_columns=["customer_id"])],
            "formatted_string": "orders table with customer_id column"
        }

        search_request = {"keywords": ["customer"]}
        resp = client.post("/api/v1/data-sources/schema/search", json=search_request)
        assert resp.status_code == 200

        body = resp.json()
        for required_key in ("available_sources", "matches", "formatted_schema_string"):
            assert required_key in body
        assert body["total_matches"] >= 0

    def test_schema_search_validation_errors(self):
        """Searches with missing or empty keywords get 422."""
        endpoint = "/api/v1/data-sources/schema/search"

        # keywords omitted
        resp = client.post(endpoint, json={})
        assert resp.status_code == 422

        # keywords present but empty
        resp = client.post(endpoint, json={
            "keywords": []
        })
        assert resp.status_code == 422
| # ============================================================================ | |
| # Error Handling and Edge Cases | |
| # ============================================================================ | |
class TestErrorHandling:
    """Tests for routing errors, malformed bodies and cross-tenant access.

    NOTE(review): the ``mock_*`` parameters of ``test_tenant_isolation`` are
    not defined in the visible part of this file; presumably they are injected
    by ``@patch`` decorators -- confirm against the original source.
    """

    def test_404_not_found(self):
        """An unknown route under the API prefix answers 404."""
        resp = client.get("/api/v1/data-sources/nonexistent")
        assert resp.status_code == 404

    def test_method_not_allowed(self):
        """GET on the jobs collection answers 405."""
        resp = client.get("/api/v1/data-sources/jobs")
        assert resp.status_code == 405

    def test_invalid_json(self):
        """A body that is not parseable JSON answers 422."""
        json_headers = {"Content-Type": "application/json"}
        resp = client.post(
            "/api/v1/data-sources/jobs",
            data="invalid json {",
            headers=json_headers,
        )
        assert resp.status_code == 422

    def test_tenant_isolation(self, mock_config, mock_minio, mock_redis):
        """A tenant with no configured sources gets 404 for ``sample_db``."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        # The other tenant has no sources configured.
        mock_config.return_value = []

        # sample_db is requested on behalf of other_tenant, not test_tenant.
        resp = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=other_tenant")
        assert resp.status_code == 404
| # ============================================================================ | |
| # Documentation and API Discovery | |
| # ============================================================================ | |
class TestDocumentation:
    """Tests that the generated API documentation endpoints are served."""

    def test_openapi_schema(self):
        """GET /openapi.json answers 200 with a spec that lists paths."""
        resp = client.get("/openapi.json")
        assert resp.status_code == 200

        spec = resp.json()
        assert "openapi" in spec or "swagger" in spec
        assert "paths" in spec

    def test_docs_endpoint(self):
        """GET /docs (Swagger UI) answers 200 with an HTML page."""
        resp = client.get("/docs")
        assert resp.status_code == 200
        content_type = resp.headers.get("content-type", "")
        assert "text/html" in content_type

    def test_redoc_endpoint(self):
        """GET /redoc (ReDoc) answers 200 with an HTML page."""
        resp = client.get("/redoc")
        assert resp.status_code == 200
        content_type = resp.headers.get("content-type", "")
        assert "text/html" in content_type
| # ============================================================================ | |
| # CORS Tests | |
| # ============================================================================ | |
class TestCORS:
    """Tests around the app's CORS handling."""

    def test_cors_headers_on_api_endpoints(self):
        """A preflight OPTIONS request to the jobs endpoint is answered with 200 or 204."""
        preflight_headers = {
            "Origin": "http://localhost:3000",
            "Access-Control-Request-Method": "POST",
            "Access-Control-Request-Headers": "Content-Type,Authorization"
        }
        resp = client.options("/api/v1/data-sources/jobs", headers=preflight_headers)
        # Either status means the preflight was handled.
        assert resp.status_code in (200, 204)

    def test_cors_headers_on_health_endpoint(self):
        """GET /health answers 200.

        Only the status code is checked; the response's CORS headers are not
        inspected because TestClient may not include them.
        """
        resp = client.get("/health")
        assert resp.status_code == 200
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; raise it as SystemExit so
    # that running this file directly exits non-zero when a test fails
    # (the return value was previously discarded).
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))