"""
Integration tests for data_sources API endpoints.

Tests cover:
- Job creation and execution flow
- Schema retrieval and validation
- Raw SQL execution
- Error handling and edge cases
- Multi-tenant isolation
- Authentication and authorization
"""

import json
import os
import sys
from datetime import datetime
from unittest.mock import MagicMock, Mock, patch

import pytest
from fastapi.testclient import TestClient

# Ensure the project root (parent of `backend`) is on sys.path so
# `import backend...` works when running this test file directly.
project_root = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "..", "..")
)
if project_root not in sys.path:
    sys.path.insert(0, project_root)

# Project imports must come after the sys.path adjustment above.
# Import FastAPI app
from backend.data_sources.main import app

# Import models and utilities for testing
from backend.core.auth import AuthUser, get_current_user
from backend.data_sources.jobs import JobStatus
from backend.data_sources.api import get_tenant_config

# Create test client
client = TestClient(app)

# Connection URI shared by every mocked data-source configuration below.
SAMPLE_DB_URI = "mysql+pymysql://root:test@localhost:3306/test_db"


def _sample_source_config():
    """Return a fresh tenant config list containing only the ``sample_db`` source.

    A new list/dict is built on every call so that one test mutating the
    returned structure cannot leak state into another test.
    """
    return [{
        "source_name": "sample_db",
        "source_type": "ibis",
        "config": {"uri": SAMPLE_DB_URI},
    }]


# ============================================================================
# Test Fixtures and Setup
# ============================================================================

@pytest.fixture
def mock_redis():
    """Mock Redis client for testing."""
    return Mock()


@pytest.fixture
def mock_minio():
    """Mock MinIO client for testing."""
    return Mock()


@pytest.fixture(autouse=True)
def setup_test_dependencies():
    """Install dependency overrides on ``app`` for the duration of each test.

    Overrides ``get_current_user`` and ``get_tenant_config`` in
    ``app.dependency_overrides`` before the test runs and removes both
    entries afterwards.
    """

    # Mock get_current_user to always return a valid AuthUser
    def mock_get_current_user():
        return AuthUser({
            "id": "test_user",
            "user_metadata": {
                "tenant_id": "test_tenant",
                "role": "user",
            },
        })

    # Mock get_tenant_config to always return a valid data source for test_tenant
    def mock_get_tenant_config(tenant_id, redis_client):
        if tenant_id == "test_tenant":
            return _sample_source_config()
        return []

    # Apply dependency overrides using imported functions
    app.dependency_overrides[get_current_user] = mock_get_current_user
    app.dependency_overrides[get_tenant_config] = mock_get_tenant_config

    yield

    # Clean up overrides
    app.dependency_overrides.pop(get_current_user, None)
    app.dependency_overrides.pop(get_tenant_config, None)


# ============================================================================
# Health and Basic Endpoint Tests
# ============================================================================

class TestHealthEndpoints:
    """Tests for health check and basic endpoints."""

    def test_health_endpoint(self):
        """Test /health endpoint returns healthy status."""
        response = client.get("/health")

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "healthy"
        assert "service" in data
        assert data["service"] == "data-sources-api"
        assert "timestamp" in data

    def test_root_endpoint(self):
        """Test root / endpoint returns API info."""
        response = client.get("/")

        assert response.status_code == 200
        data = response.json()
        assert "name" in data
        assert "version" in data
        assert "endpoints" in data
        assert data["name"] == "Data Sources API"


# ============================================================================
# Job Creation and Management Tests
# ============================================================================

class TestJobLifecycle:
    """Integration tests for complete job lifecycle."""

    # NOTE: stacked @patch decorators inject mocks bottom-up, so the last
    # decorator corresponds to the first mock parameter.
    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.worker.process_federated_job.delay')
    def test_job_creation_and_execution_flow(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Test that a valid job request is accepted and queued exactly once."""
        # Setup mocks
        mock_redis_instance = MagicMock()
        mock_minio_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance
        mock_minio.return_value = mock_minio_instance
        mock_config.return_value = _sample_source_config()

        # Mock Redis operations for job storage
        mock_redis_instance.set.return_value = True

        # Job creation request
        job_request = {
            "payload": {
                "plan": [
                    {
                        "source": "sample_db",
                        "query": {
                            "operation": "table",
                            "name": "orders",
                        },
                    }
                ],
                "tenant_id": "test_tenant",
            }
        }

        # Create job
        response = client.post("/api/v1/data-sources/jobs", json=job_request)

        assert response.status_code == 201
        data = response.json()
        assert "job_id" in data
        assert data["status"] == "pending"
        assert "message" in data

        # Verify Celery task was called
        mock_celery.assert_called_once()

        # For this integration test, we verify job creation success
        # In a real scenario, the job would be processed asynchronously
        # and the status would change to completed

    @patch('backend.data_sources.api.get_redis_client')
    def test_job_not_found(self, mock_redis):
        """Test retrieving results for non-existent job."""
        mock_redis_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance
        mock_redis_instance.get.return_value = None

        response = client.get("/api/v1/data-sources/results/nonexistent-job")

        assert response.status_code == 404
        data = response.json()
        assert "detail" in data

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.worker.process_federated_job.delay')
    def test_job_creation_validation_errors(self, mock_celery, mock_minio, mock_redis):
        """Test job creation with various validation errors."""
        # Setup mocks
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()

        # Test missing payload
        response = client.post("/api/v1/data-sources/jobs", json={})
        assert response.status_code == 422

        # Test invalid AST operation
        invalid_job = {
            "payload": {
                "plan": [{
                    "source": "sample_db",
                    "query": {
                        "operation": "invalid_operation",
                        "name": "orders",
                    },
                }],
                "tenant_id": "test_tenant",
            }
        }
        response = client.post("/api/v1/data-sources/jobs", json=invalid_job)
        assert response.status_code == 422

        # Test empty plan
        empty_plan_job = {
            "payload": {
                "plan": [],
                "tenant_id": "test_tenant",
            }
        }
        response = client.post("/api/v1/data-sources/jobs", json=empty_plan_job)
        assert response.status_code == 422


# ============================================================================
# Schema Retrieval Tests
# ============================================================================

class TestSchemaRetrieval:
    """Integration tests for schema retrieval functionality."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_retrieval_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Test successful schema retrieval for a data source."""
        # Setup mocks
        mock_redis_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance
        mock_redis_instance.get.return_value = None  # No cache
        mock_config.return_value = _sample_source_config()

        # Mock FederationAgent and connector
        mock_connector = MagicMock()
        mock_connector.get_schema.return_value = '{"tables": ["orders", "customers", "products"]}'
        mock_agent_instance = MagicMock()
        mock_agent_instance.connectors = {"sample_db": mock_connector}
        mock_agent.return_value = mock_agent_instance

        # Get schema
        response = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")

        assert response.status_code == 200
        data = response.json()
        assert data["source_name"] == "sample_db"
        assert "schema" in data
        assert "last_updated" in data

        # Parse the schema data
        schema = json.loads(data["schema"])
        assert "tables" in schema
        assert "orders" in schema["tables"]

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    def test_schema_retrieval_unknown_source(self, mock_config, mock_minio, mock_redis):
        """Test schema retrieval for unknown data source."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()

        # Mock config to return empty list (no sources)
        mock_config.return_value = []

        response = client.get("/api/v1/data-sources/schema/unknown_db?tenant_id=test_tenant")

        assert response.status_code == 404
        data = response.json()
        assert "detail" in data

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_caching(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Exercise schema retrieval when the cache lookup returns nothing.

        Only the cache-miss path is covered: a single request is issued and
        no assertion is made about cache writes or subsequent cache hits.
        """
        mock_redis_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance

        # First call - cache miss
        mock_redis_instance.get.return_value = None
        mock_config.return_value = _sample_source_config()

        mock_connector = MagicMock()
        mock_connector.get_schema.return_value = '{"tables": ["orders"]}'
        mock_agent_instance = MagicMock()
        mock_agent_instance.connectors = {"sample_db": mock_connector}
        mock_agent.return_value = mock_agent_instance

        # First request
        response1 = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")
        assert response1.status_code == 200

        # Verify the response contains expected data
        data = response1.json()
        assert data["source_name"] == "sample_db"
        assert "schema" in data


# ============================================================================
# Raw SQL Execution Tests
# ============================================================================

class TestRawSQLExecution:
    """Integration tests for raw SQL execution endpoint."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_raw_sql_execution_sync_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Test successful synchronous raw SQL execution."""
        # Setup mocks
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = _sample_source_config()

        # Mock FederationAgent
        mock_agent_instance = MagicMock()
        mock_agent_instance.execute_raw_query.return_value = [
            {"id": 1, "name": "John Doe", "email": "john@example.com"},
            {"id": 2, "name": "Jane Smith", "email": "jane@example.com"},
        ]
        mock_agent_instance.connectors = {"sample_db": MagicMock()}  # Mock connectors dict
        mock_agent.return_value = mock_agent_instance

        # Execute raw SQL synchronously
        sql_request = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT id, name, email FROM users LIMIT 10",
            "async_mode": False,
            "max_rows": 100,
        }

        response = client.post("/api/v1/data-sources/execute-raw-sql", json=sql_request)

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "success"
        assert len(data["results"]) == 2
        assert data["rows_returned"] == 2
        assert "execution_time_ms" in data

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.worker.process_raw_sql_job.apply_async')
    def test_raw_sql_execution_async_success(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Test successful asynchronous raw SQL execution."""
        # Setup mocks
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = _sample_source_config()

        # Execute raw SQL asynchronously
        sql_request = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT * FROM large_table",
            "async_mode": True,
        }

        response = client.post("/api/v1/data-sources/execute-raw-sql", json=sql_request)

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "accepted"
        assert "job_id" in data
        assert data["tenant_id"] == "test_tenant"

        # Verify async task was queued
        mock_celery.assert_called_once()

    def test_raw_sql_validation_errors(self):
        """Test raw SQL execution with validation errors."""
        # Test missing source_name
        response = client.post("/api/v1/data-sources/execute-raw-sql", json={
            "sql_query": "SELECT * FROM users"
        })
        assert response.status_code == 422

        # Test empty SQL query
        response = client.post("/api/v1/data-sources/execute-raw-sql", json={
            "source_name": "sample_db",
            "sql_query": ""
        })
        assert response.status_code == 422


# ============================================================================
# Schema Search Tests
# ============================================================================

class TestSchemaSearch:
    """Integration tests for schema search functionality."""

    # Mock parameters are listed in reverse decorator order (bottom-up).
    @patch('backend.data_sources.schema_search.format_search_results')
    @patch('backend.data_sources.schema_search.rank_tables_for_keywords')
    @patch('backend.data_sources.schema_search.build_relationship_graph')
    @patch('backend.data_sources.schema_search.build_keyword_index')
    @patch('backend.data_sources.schema_search.flatten_schema')
    @patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_search_basic(
        self,
        mock_agent,
        mock_config,
        mock_minio,
        mock_redis,
        mock_ibis_connector,
        mock_flatten,
        mock_build_index,
        mock_build_graph,
        mock_rank_tables,
        mock_format_results,
    ):
        """Test basic schema search functionality."""
        # Setup mocks
        mock_redis_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance
        mock_redis_instance.get.return_value = None  # No cache
        mock_config.return_value = _sample_source_config()

        # Mock schema search components
        mock_flatten.return_value = {"sample_db": {"orders": ["id", "customer_id", "amount"]}}
        mock_build_index.return_value = {"customer": ["orders.customer_id"]}
        mock_build_graph.return_value = {}
        mock_rank_tables.return_value = [
            {
                "table_name": "orders",
                "score": 0.8,
                "matched_columns": ["customer_id"],
                "source_name": "sample_db",
            }
        ]
        mock_format_results.return_value = {
            "table_matches": [{
                "table_name": "orders",
                "score": 0.8,
                "matched_columns": ["customer_id"],
                "source_name": "sample_db",
            }],
            "formatted_string": "orders table with customer_id column",
        }

        # Perform schema search
        search_request = {
            "keywords": ["customer"]
        }

        response = client.post("/api/v1/data-sources/schema/search", json=search_request)

        assert response.status_code == 200
        data = response.json()
        assert "available_sources" in data
        assert "matches" in data
        assert "formatted_schema_string" in data
        assert data["total_matches"] >= 0

    def test_schema_search_validation_errors(self):
        """Test schema search with validation errors."""
        # Test missing keywords
        response = client.post("/api/v1/data-sources/schema/search", json={})
        assert response.status_code == 422

        # Test empty keywords
        response = client.post("/api/v1/data-sources/schema/search", json={
            "keywords": []
        })
        assert response.status_code == 422


# ============================================================================
# Error Handling and Edge Cases
# ============================================================================

class TestErrorHandling:
    """Tests for error handling and edge cases."""

    def test_404_not_found(self):
        """Test that non-existent routes return 404."""
        response = client.get("/api/v1/data-sources/nonexistent")
        assert response.status_code == 404

    def test_method_not_allowed(self):
        """Test that incorrect HTTP methods are rejected."""
        response = client.get("/api/v1/data-sources/jobs")
        assert response.status_code == 405

    def test_invalid_json(self):
        """Test that invalid JSON is rejected."""
        # NOTE(review): httpx-based TestClient versions expect raw bodies via
        # `content=`; `data=` is kept here for compatibility - confirm against
        # the pinned Starlette/FastAPI version.
        response = client.post(
            "/api/v1/data-sources/jobs",
            data="invalid json {",
            headers={"Content-Type": "application/json"}
        )
        assert response.status_code == 422

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    def test_tenant_isolation(self, mock_config, mock_minio, mock_redis):
        """Test that tenants are properly isolated."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()

        # Mock config to return no sources for different tenant
        mock_config.return_value = []

        # Try to access schema for a source that doesn't belong to test_tenant
        response = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=other_tenant")

        # Should fail because other_tenant has no sources configured
        assert response.status_code == 404


# ============================================================================
# Documentation and API Discovery
# ============================================================================

class TestDocumentation:
    """Tests for API documentation and discovery."""

    def test_openapi_schema(self):
        """Test that OpenAPI schema is available."""
        response = client.get("/openapi.json")

        assert response.status_code == 200
        data = response.json()
        assert "openapi" in data or "swagger" in data
        assert "paths" in data

    def test_docs_endpoint(self):
        """Test that /docs endpoint is available."""
        response = client.get("/docs")

        # Swagger UI page
        assert response.status_code == 200
        assert "text/html" in response.headers.get("content-type", "")

    def test_redoc_endpoint(self):
        """Test that /redoc endpoint is available."""
        response = client.get("/redoc")

        # ReDoc page
        assert response.status_code == 200
        assert "text/html" in response.headers.get("content-type", "")


# ============================================================================
# CORS Tests
# ============================================================================

class TestCORS:
    """Tests for CORS configuration."""

    def test_cors_headers_on_api_endpoints(self):
        """Test that a CORS preflight request to an API endpoint is accepted."""
        response = client.options(
            "/api/v1/data-sources/jobs",
            headers={
                "Origin": "http://localhost:3000",
                "Access-Control-Request-Method": "POST",
                "Access-Control-Request-Headers": "Content-Type,Authorization"
            }
        )

        # CORS preflight should be handled
        assert response.status_code in [200, 204]

    def test_cors_headers_on_health_endpoint(self):
        """Test that /health responds with 200; CORS headers are not asserted."""
        response = client.get("/health")

        assert response.status_code == 200
        # Note: TestClient may not include CORS headers in response


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])