"""Simplified API tests for the data federation endpoints.

This module tests the core API functionality with proper mocking. All
collaborators (tenant config, Celery, Redis, connectors, schema-search
helpers) are replaced with mocks; no test in this module should need a
real database, so every configured URI is an in-memory placeholder.
"""

import json
import logging
import os
import sys
from contextlib import ExitStack, contextmanager
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

# Add parent to path for imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from backend.data_sources.api import router, get_redis_client, get_minio_client  # noqa: E402
from backend.core.auth import AuthUser, get_current_user  # noqa: E402
from backend.data_sources.jobs import JobStatus  # noqa: E402
import backend.data_sources.api as api_module  # noqa: E402

JOBS_URL = "/api/v1/data-sources/jobs"
RESULTS_URL = "/api/v1/data-sources/results"
SCHEMA_SEARCH_URL = "/api/v1/data-sources/schema/search"

# Placeholder connection string. Connectors are mocked in every test, so
# nothing ever dials this URI; never put real credentials in test fixtures.
IN_MEMORY_URI = "duckdb:///:memory:"

# Create test app
app = FastAPI()
app.include_router(router)


def _source_config(source_name):
    """Return a fresh single-source tenant config list for *source_name*."""
    return [{
        "source_name": source_name,
        "source_type": "ibis",
        "config": {"uri": IN_MEMORY_URI},
    }]


def _job_request(query, source="test_db", tenant_id="test_tenant"):
    """Build a one-step job request body wrapping the given query AST."""
    return {
        "payload": {
            "plan": [{"source": source, "query": query}],
            "tenant_id": tenant_id,
        }
    }


@contextmanager
def _override_redis(redis_instance):
    """Make the app resolve ``get_redis_client`` to *redis_instance*.

    FastAPI keeps a reference to the original dependency callable, so
    ``unittest.mock.patch`` on the module attribute does not reach it;
    ``dependency_overrides`` is the supported hook. The override is always
    removed on exit so tests do not leak state into each other.
    """
    app.dependency_overrides[get_redis_client] = lambda: redis_instance
    try:
        yield redis_instance
    finally:
        app.dependency_overrides.pop(get_redis_client, None)


# Mock get_current_user to always return a valid AuthUser
def mock_get_current_user():
    """Return a fixed authenticated user belonging to ``test_tenant``."""
    return AuthUser({
        "id": "test_user",
        "user_metadata": {
            "tenant_id": "test_tenant",
            "role": "user"
        }
    })


app.dependency_overrides[get_current_user] = mock_get_current_user


# Mock get_tenant_config to always return a valid data source for test_tenant
def mock_get_tenant_config(tenant_id, redis_client):
    """Return the ``sample_db`` source for ``test_tenant``, else no sources."""
    if tenant_id == "test_tenant":
        return _source_config("sample_db")
    return []


app.dependency_overrides[api_module.get_tenant_config] = mock_get_tenant_config

client = TestClient(app)


class TestJobSchemas:
    """Tests for Pydantic schema validation."""

    def test_valid_job_schema(self):
        """Test that a valid job request passes validation."""
        job_request = _job_request({"operation": "table", "name": "orders"})

        # This should not raise validation errors
        with patch('backend.data_sources.api.get_tenant_config') as mock_config, \
                patch('backend.data_sources.worker.process_federated_job.delay') as mock_celery:
            mock_config.return_value = _source_config("test_db")

            response = client.post(JOBS_URL, json=job_request)

            assert response.status_code == 201
            mock_celery.assert_called_once()

    def test_invalid_ast_operation(self):
        """Test that invalid AST operations are rejected."""
        job_request = _job_request({
            "operation": "delete_all",  # Invalid operation
            "name": "orders"
        })

        response = client.post(JOBS_URL, json=job_request)
        assert response.status_code == 422

    def test_table_operation_missing_name(self):
        """Test that table operations require a name field."""
        job_request = _job_request({
            "operation": "table"
            # Missing 'name' field
        })

        # This should fail at Pydantic validation level (before processing)
        response = client.post(JOBS_URL, json=job_request)
        assert response.status_code == 422

    def test_filter_operation_requires_predicate(self):
        """Test that filter operations require a predicate."""
        job_request = _job_request({
            "operation": "filter",
            "source": {"operation": "table", "name": "orders"}
            # Missing 'predicate' field
        })

        response = client.post(JOBS_URL, json=job_request)
        assert response.status_code == 422

    def test_valid_filter_operation(self):
        """Test that valid filter operations pass validation."""
        job_request = _job_request({
            "operation": "filter",
            "source": {"operation": "table", "name": "orders"},
            "predicate": {
                "column": "amount",
                "operation": "gt",
                "value": 100
            }
        })

        with patch('backend.data_sources.api.get_tenant_config') as mock_config, \
                patch('backend.data_sources.worker.process_federated_job.delay') as mock_celery:
            mock_config.return_value = _source_config("test_db")

            response = client.post(JOBS_URL, json=job_request)

            assert response.status_code == 201
            mock_celery.assert_called_once()

    def test_empty_plan_rejected(self):
        """Test that empty plans are rejected."""
        job_request = {
            "payload": {
                "plan": [],  # Empty plan
                "tenant_id": "test_tenant"
            }
        }

        response = client.post(JOBS_URL, json=job_request)
        assert response.status_code == 422


class TestJobEndpoints:
    """Tests for job-related endpoints with proper mocking."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.worker.process_federated_job.delay')
    def test_create_job_success(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Test successful job creation."""
        # Setup mocks
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = _source_config("test_db")

        job_request = _job_request({"operation": "table", "name": "orders"})

        response = client.post(JOBS_URL, json=job_request)

        assert response.status_code == 201
        data = response.json()
        assert "job_id" in data
        assert data["status"] == "pending"
        mock_celery.assert_called_once()

    def test_get_job_results(self):
        """Test getting job results."""
        mock_redis_instance = MagicMock()

        # Mock completed job
        metadata = {
            "job_id": "test-job-123",
            "tenant_id": "test_tenant",
            "status": "completed",
            "created_at": datetime.utcnow().isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "result_location": "redis://job:test-job-123:results"
        }
        results = {"step_1": [{"id": 1, "amount": 100}]}

        # First read returns the job metadata, second the stored results.
        mock_redis_instance.get.side_effect = [
            json.dumps(metadata),
            json.dumps(results)
        ]

        with _override_redis(mock_redis_instance):
            response = client.get(f"{RESULTS_URL}/test-job-123")

            assert response.status_code == 200
            data = response.json()
            assert data["job_id"] == "test-job-123"
            assert data["status"] == "completed"
            assert "results" in data

    def test_get_job_not_found(self):
        """Test getting results for non-existent job."""
        mock_redis_instance = MagicMock()
        mock_redis_instance.get.return_value = None

        # Use the dependency override (as test_get_job_results does) so the
        # endpoint really receives this mock instead of a live Redis client.
        with _override_redis(mock_redis_instance):
            response = client.get(f"{RESULTS_URL}/nonexistent")

            assert response.status_code == 404

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_get_schema(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Test getting schema for a data source."""
        # Setup mocks
        mock_redis_instance = MagicMock()
        mock_redis.return_value = mock_redis_instance
        mock_redis_instance.get.return_value = None  # No cache

        mock_config.return_value = _source_config("test_db")

        mock_connector = MagicMock()
        mock_connector.get_schema.return_value = '{"tables": ["orders"]}'

        mock_agent_instance = MagicMock()
        mock_agent_instance.connectors = {"test_db": mock_connector}
        mock_agent.return_value = mock_agent_instance

        response = client.get("/api/v1/data-sources/schema/test_db?tenant_id=test_tenant")

        assert response.status_code == 200
        data = response.json()
        assert data["source_name"] == "test_db"
        assert "tables" in data["schema"]


# ============================================================================
# Test Schema Search Endpoint (Section 3.7 - Hybrid Agent)
# ============================================================================

# Attribute name on the fixture namespace -> dotted patch target. The API
# names must be patched on ``backend.data_sources.api`` because that is the
# module object the router under test was imported from.
_SCHEMA_SEARCH_PATCH_TARGETS = {
    "format_results": "backend.data_sources.schema_search.format_search_results",
    "rank_tables": "backend.data_sources.schema_search.rank_tables_for_keywords",
    "build_graph": "backend.data_sources.schema_search.build_relationship_graph",
    "build_index": "backend.data_sources.schema_search.build_keyword_index",
    "flatten": "backend.data_sources.schema_search.flatten_schema",
    "ibis_connector": "backend.data_sources.connectors.ibis_connector.IbisConnector",
    "redis": "backend.data_sources.api.get_redis_client",
    "minio": "backend.data_sources.api.get_minio_client",
    "config": "backend.data_sources.api.get_tenant_config",
    "agent": "backend.data_sources.api.FederationAgent",
}


@pytest.fixture
def schema_search_mocks():
    """Patch every collaborator of the schema-search endpoint.

    Yields a namespace with one mock per key of
    ``_SCHEMA_SEARCH_PATCH_TARGETS``. Defaults describe an empty search
    (no cache hit, one ``sample_db`` source, no matching tables); a test
    that needs different data overrides the relevant ``return_value``.
    All patches are undone when the test finishes.
    """
    with ExitStack() as stack:
        mocks = SimpleNamespace(**{
            name: stack.enter_context(patch(target))
            for name, target in _SCHEMA_SEARCH_PATCH_TARGETS.items()
        })

        redis_instance = MagicMock()
        redis_instance.get.return_value = None  # No cache
        mocks.redis.return_value = redis_instance

        mocks.config.return_value = _source_config("sample_db")

        # Mock schema search functions
        mocks.flatten.return_value = {'schema_name': 'public', 'tables': []}
        mocks.build_index.return_value = {}
        mocks.build_graph.return_value = {}
        mocks.rank_tables.return_value = []
        mocks.format_results.return_value = {
            'formatted_string': '',
            'table_matches': [],
            'total_matches': 0
        }

        # Mock connector
        connector_instance = MagicMock()
        connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
        mocks.ibis_connector.return_value = connector_instance

        agent_instance = MagicMock()
        agent_instance.connectors = {"sample_db": connector_instance}
        mocks.agent.return_value = agent_instance

        yield mocks


class TestSchemaSearchEndpoint:
    """Tests for the /schema/search endpoint with metadata support."""

    def test_schema_search_basic(self, schema_search_mocks):
        """Test basic schema search with keywords only."""
        mocks = schema_search_mocks

        # Replace the empty defaults with a single matching table.
        mocks.flatten.return_value = {
            'schema_name': 'public',
            'tables': [{'table_name': 'customers', 'fields': [{'name': 'id', 'type': 'int'}]}]
        }
        mocks.build_index.return_value = {'customer': {'customers': {'id'}}}
        mocks.rank_tables.return_value = [
            ('customers', 15.0, {'id'})
        ]
        mocks.format_results.return_value = {
            'formatted_string': 'Table: customers',
            'table_matches': [{'table_name': 'customers', 'score': 15.0, 'matched_columns': ['id']}],
            'total_matches': 1
        }

        # Make request
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["customer", "revenue"]
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 200
        data = response.json()
        assert "available_sources" in data
        assert "matches" in data
        assert "formatted_schema_string" in data
        assert "total_matches" in data

    def test_schema_search_with_original_question(self, schema_search_mocks, caplog):
        """Test schema search with original_question metadata (NEW FIELD)."""
        caplog.set_level(logging.INFO)

        # Make request with original_question
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["revenue", "premium", "customers"],
            "original_question": "What was revenue from premium customers last quarter?"
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 200

        # Verify original_question was logged
        assert any("Schema search for question" in record.message for record in caplog.records)
        assert any("What was revenue from premium customers" in record.message for record in caplog.records)

    def test_schema_search_with_keyword_metadata(self, schema_search_mocks, caplog):
        """Test schema search with keyword_metadata breakdown (NEW FIELD)."""
        caplog.set_level(logging.INFO)

        # Make request with keyword_metadata
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["premium", "revenue", "customers", "financial metrics"],
            "keyword_metadata": {
                "base": ["revenue", "customers"],
                "semantic": ["financial metrics"],
                "concepts": ["premium"]
            }
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 200

        # Verify keyword_metadata was logged
        assert any("Keyword breakdown" in record.message for record in caplog.records)
        assert any("Base: 2" in record.message for record in caplog.records)
        assert any("Semantic: 1" in record.message for record in caplog.records)
        assert any("Concepts: 1" in record.message for record in caplog.records)

    def test_schema_search_with_all_metadata(self, schema_search_mocks, caplog):
        """Test schema search with both original_question and keyword_metadata."""
        caplog.set_level(logging.INFO)

        # Make request with all metadata
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["sales", "monthly", "trend"],
            "original_question": "Show monthly sales trends",
            "keyword_metadata": {
                "base": ["sales", "monthly", "trend"],
                "semantic": ["time series"],
                "concepts": []
            }
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 200

        # Verify both pieces of metadata were logged
        log_messages = [record.message for record in caplog.records]
        assert any("Show monthly sales trends" in msg for msg in log_messages)
        assert any("Keyword breakdown" in msg for msg in log_messages)

    def test_schema_search_backward_compatible(self, schema_search_mocks):
        """Test that schema search is backward compatible (metadata fields optional)."""
        # Make request WITHOUT optional metadata fields (old API behavior)
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["customer"]
            # NO original_question
            # NO keyword_metadata
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        # Should still work (backward compatible)
        assert response.status_code == 200
        data = response.json()
        assert "available_sources" in data
        assert "matches" in data

    def test_schema_search_missing_tenant_id(self):
        """Test that missing tenant_id returns validation error."""
        request_data = {
            # Missing tenant_id
            "keywords": ["customer"]
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 422  # Validation error

    def test_schema_search_missing_keywords(self):
        """Test that missing keywords returns validation error."""
        request_data = {
            "tenant_id": "tenant-123"
            # Missing keywords
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 422  # Validation error

    def test_schema_search_empty_keywords(self):
        """Test that empty keywords list returns validation error."""
        request_data = {
            "tenant_id": "tenant-123",
            "keywords": []  # Empty list
        }

        response = client.post(SCHEMA_SEARCH_URL, json=request_data)

        assert response.status_code == 422  # Validation error


if __name__ == "__main__":
    pytest.main([__file__, "-v"])