Spaces:
Running
Running
| """Simplified API tests for the data federation endpoints. | |
| This module tests the core API functionality with proper mocking. | |
| """ | |
| import json | |
| import pytest | |
| import sys | |
| import os | |
| from unittest.mock import MagicMock, patch | |
| from datetime import datetime | |
| from fastapi.testclient import TestClient | |
| from fastapi import FastAPI | |
| # Add parent to path for imports | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) | |
| from backend.data_sources.api import router, get_redis_client, get_minio_client | |
| from backend.core.auth import AuthUser, get_current_user | |
| from backend.data_sources.jobs import JobStatus | |
# Create a dedicated FastAPI application for this test module and mount only
# the data-sources router on it, so requests go straight to the routes under test.
app = FastAPI()
app.include_router(router)
# Authentication stub: every request handled by the test app is attributed to
# one fixed user belonging to "test_tenant".
def mock_get_current_user():
    """Build the AuthUser that stands in for the authenticated caller."""
    user_metadata = {"tenant_id": "test_tenant", "role": "user"}
    claims = {"id": "test_user", "user_metadata": user_metadata}
    return AuthUser(claims)


# Register the stub in place of the real get_current_user dependency.
app.dependency_overrides[get_current_user] = mock_get_current_user
# Stand-in for get_tenant_config: only "test_tenant" has a configured source.
def mock_get_tenant_config(tenant_id, redis_client):
    """Return a canned data-source configuration for the test tenant.

    Args:
        tenant_id: Tenant identifier to look up.
        redis_client: Accepted for signature compatibility; not used.

    Returns:
        A one-element list describing the ``sample_db`` ibis source when
        ``tenant_id`` is ``"test_tenant"``, otherwise an empty list.
    """
    if tenant_id != "test_tenant":
        return []
    return [{
        "source_name": "sample_db",
        "source_type": "ibis",
        "config": {
            # Placeholder DSN only. Real hosts and passwords must never be
            # committed in test fixtures.
            "uri": "mysql+pymysql://user:password@localhost:3306/sample_db"
        }
    }]
# Import the API module under an alias so its get_tenant_config attribute can
# be used as the override key on the next line.
import backend.data_sources.api as api_module
# NOTE(review): dependency_overrides only replaces callables that FastAPI
# resolves as dependencies. Presumably get_tenant_config is wired that way;
# confirm, because tests in this module also patch it by dotted path.
app.dependency_overrides[api_module.get_tenant_config] = mock_get_tenant_config
# Single TestClient shared by every test in this module.
client = TestClient(app)
class TestJobSchemas:
    """Request-schema validation checks for the job submission endpoint.

    Each test posts a job request to the jobs route and asserts on the HTTP
    status: 201 for well-formed plans, 422 for plans expected to be rejected.
    """

    @staticmethod
    def _single_step_request(query):
        """Wrap ``query`` in a one-step plan against ``test_db`` for ``test_tenant``."""
        step = {"source": "test_db", "query": query}
        return {"payload": {"plan": [step], "tenant_id": "test_tenant"}}

    @staticmethod
    def _submit(job_request):
        """POST ``job_request`` to the jobs endpoint and return the response."""
        return client.post("/api/v1/data-sources/jobs", json=job_request)

    def _assert_accepted(self, job_request):
        """Submit with tenant config and Celery dispatch patched; expect 201 and one dispatch."""
        with patch('backend.data_sources.api.get_tenant_config') as mock_config, \
                patch('backend.data_sources.worker.process_federated_job.delay') as mock_celery:
            mock_config.return_value = [
                {"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}
            ]
            response = self._submit(job_request)
            assert response.status_code == 201
            mock_celery.assert_called_once()

    def test_valid_job_schema(self):
        """A plain table query is accepted and queued."""
        table_query = {"operation": "table", "name": "orders"}
        self._assert_accepted(self._single_step_request(table_query))

    def test_invalid_ast_operation(self):
        """An operation name outside the supported set yields 422."""
        bad_query = {"operation": "delete_all", "name": "orders"}
        response = self._submit(self._single_step_request(bad_query))
        assert response.status_code == 422

    def test_table_operation_missing_name(self):
        """A table operation without its 'name' field yields 422."""
        nameless_query = {"operation": "table"}
        response = self._submit(self._single_step_request(nameless_query))
        assert response.status_code == 422

    def test_filter_operation_requires_predicate(self):
        """A filter operation without a 'predicate' field yields 422."""
        filter_without_predicate = {
            "operation": "filter",
            "source": {"operation": "table", "name": "orders"},
        }
        response = self._submit(self._single_step_request(filter_without_predicate))
        assert response.status_code == 422

    def test_valid_filter_operation(self):
        """A filter with a source table and a complete predicate is accepted and queued."""
        filter_query = {
            "operation": "filter",
            "source": {"operation": "table", "name": "orders"},
            "predicate": {"column": "amount", "operation": "gt", "value": 100},
        }
        self._assert_accepted(self._single_step_request(filter_query))

    def test_empty_plan_rejected(self):
        """A payload whose plan list is empty yields 422."""
        empty_plan_request = {"payload": {"plan": [], "tenant_id": "test_tenant"}}
        response = self._submit(empty_plan_request)
        assert response.status_code == 422
class TestJobEndpoints:
    """Endpoint-level tests for job creation, result retrieval and schema lookup.

    NOTE(review): several methods take ``mock_*`` parameters that nothing in
    this class supplies. Presumably they are injected by patch decorators or
    fixtures not visible in this view; confirm before relying on these tests.
    """

    def test_create_job_success(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Posting a valid single-step job returns 201 with a pending job id."""
        # Configure the injected mocks.
        mock_config.return_value = [
            {"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}
        ]
        mock_minio.return_value = MagicMock()
        mock_redis.return_value = MagicMock()

        plan_step = {"source": "test_db", "query": {"operation": "table", "name": "orders"}}
        job_request = {"payload": {"plan": [plan_step], "tenant_id": "test_tenant"}}

        response = client.post("/api/v1/data-sources/jobs", json=job_request)

        assert response.status_code == 201
        body = response.json()
        assert "job_id" in body
        assert body["status"] == "pending"
        mock_celery.assert_called_once()

    def test_get_job_results(self):
        """Fetching results of a completed job returns its id, status and results."""
        # Metadata describing a completed job owned by the test tenant.
        job_metadata = {
            "job_id": "test-job-123",
            "tenant_id": "test_tenant",
            "status": "completed",
            "created_at": datetime.utcnow().isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "result_location": "redis://job:test-job-123:results"
        }
        job_results = {"step_1": [{"id": 1, "amount": 100}]}

        # Successive get() calls return the metadata first, then the results.
        fake_redis = MagicMock()
        fake_redis.get.side_effect = [json.dumps(job_metadata), json.dumps(job_results)]

        # Swap the Redis dependency for the duration of this test only.
        app.dependency_overrides[get_redis_client] = lambda: fake_redis
        try:
            response = client.get("/api/v1/data-sources/results/test-job-123")
            assert response.status_code == 200
            body = response.json()
            assert body["job_id"] == "test-job-123"
            assert body["status"] == "completed"
            assert "results" in body
        finally:
            # Always remove the override so other tests are unaffected.
            app.dependency_overrides.pop(get_redis_client, None)

    def test_get_job_not_found(self, mock_redis):
        """Requesting results for an unknown job id returns 404."""
        empty_redis = MagicMock()
        empty_redis.get.return_value = None
        mock_redis.return_value = empty_redis

        response = client.get("/api/v1/data-sources/results/nonexistent")

        assert response.status_code == 404

    def test_get_schema(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Schema lookup for a configured source returns its name and tables."""
        # Redis mock whose get() returns None, i.e. nothing cached.
        uncached_redis = MagicMock()
        uncached_redis.get.return_value = None
        mock_redis.return_value = uncached_redis

        mock_config.return_value = [
            {"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}
        ]

        # The agent mock exposes one connector whose schema is a JSON string.
        connector = MagicMock()
        connector.get_schema.return_value = '{"tables": ["orders"]}'
        agent = MagicMock()
        agent.connectors = {"test_db": connector}
        mock_agent.return_value = agent

        response = client.get("/api/v1/data-sources/schema/test_db?tenant_id=test_tenant")

        assert response.status_code == 200
        body = response.json()
        assert body["source_name"] == "test_db"
        assert "tables" in body["schema"]
| # ============================================================================ | |
| # Test Schema Search Endpoint (Section 3.7 - Hybrid Agent) | |
| # ============================================================================ | |
class TestSchemaSearchEndpoint:
    """Tests for the /schema/search endpoint with metadata support.

    NOTE(review): the ``mock_*`` parameters on the test methods are not
    supplied by anything in this class. Presumably they are injected by patch
    decorators or fixtures not visible in this view; confirm.
    """

    _SEARCH_URL = "/api/v1/data-sources/schema/search"

    # Placeholder DSN. The connector and agent are replaced by mocks in these
    # tests, so the URI is presumably never dialled. Real hosts and passwords
    # must never be committed in test fixtures.
    _SAMPLE_DB_URI = "mysql+pymysql://user:password@localhost:3306/sample_db"

    # Schema payload returned by the mocked connector's get_schema().
    _EMPTY_SCHEMA_JSON = '[{"schema_name": "public", "tables": []}]'

    @classmethod
    def _configure_mocks(cls, mock_agent, mock_config, mock_redis, mock_ibis_connector,
                         mock_flatten, mock_build_index, mock_build_graph,
                         mock_rank_tables, mock_format_results, *, with_match=False):
        """Apply the mock configuration shared by the schema-search tests.

        Args:
            mock_agent, mock_config, mock_redis, mock_ibis_connector,
            mock_flatten, mock_build_index, mock_build_graph,
            mock_rank_tables, mock_format_results: The injected mocks to set up.
            with_match: When true, the schema-search helpers report a single
                ``customers`` table match; otherwise they report no matches.
        """
        # Redis mock whose get() returns None, i.e. nothing cached.
        redis_instance = MagicMock()
        redis_instance.get.return_value = None
        mock_redis.return_value = redis_instance

        mock_config.return_value = [
            {"source_name": "sample_db", "source_type": "ibis", "config": {"uri": cls._SAMPLE_DB_URI}}
        ]

        # Schema-search helper functions.
        if with_match:
            mock_flatten.return_value = {
                'schema_name': 'public',
                'tables': [{'table_name': 'customers', 'fields': [{'name': 'id', 'type': 'int'}]}]
            }
            mock_build_index.return_value = {'customer': {'customers': {'id'}}}
            mock_rank_tables.return_value = [('customers', 15.0, {'id'})]
            mock_format_results.return_value = {
                'formatted_string': 'Table: customers',
                'table_matches': [{'table_name': 'customers', 'score': 15.0, 'matched_columns': ['id']}],
                'total_matches': 1
            }
        else:
            mock_flatten.return_value = {'schema_name': 'public', 'tables': []}
            mock_build_index.return_value = {}
            mock_rank_tables.return_value = []
            mock_format_results.return_value = {
                'formatted_string': '',
                'table_matches': [],
                'total_matches': 0
            }
        mock_build_graph.return_value = {}

        # Connector and agent mocks.
        connector_instance = MagicMock()
        connector_instance.get_schema.return_value = cls._EMPTY_SCHEMA_JSON
        mock_ibis_connector.return_value = connector_instance
        agent_instance = MagicMock()
        agent_instance.connectors = {"sample_db": connector_instance}
        mock_agent.return_value = agent_instance

    def test_schema_search_basic(self, mock_agent, mock_config, mock_minio, mock_redis,
                                 mock_ibis_connector, mock_flatten, mock_build_index,
                                 mock_build_graph, mock_rank_tables, mock_format_results):
        """Test basic schema search with keywords only."""
        self._configure_mocks(mock_agent, mock_config, mock_redis, mock_ibis_connector,
                              mock_flatten, mock_build_index, mock_build_graph,
                              mock_rank_tables, mock_format_results, with_match=True)
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["customer", "revenue"]
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 200
        data = response.json()
        assert "available_sources" in data
        assert "matches" in data
        assert "formatted_schema_string" in data
        assert "total_matches" in data

    def test_schema_search_with_original_question(self, mock_agent, mock_config, mock_minio,
                                                  mock_redis, mock_ibis_connector, mock_flatten,
                                                  mock_build_index, mock_build_graph,
                                                  mock_rank_tables, mock_format_results, caplog):
        """Test schema search with original_question metadata (NEW FIELD)."""
        import logging
        caplog.set_level(logging.INFO)
        self._configure_mocks(mock_agent, mock_config, mock_redis, mock_ibis_connector,
                              mock_flatten, mock_build_index, mock_build_graph,
                              mock_rank_tables, mock_format_results)
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["revenue", "premium", "customers"],
            "original_question": "What was revenue from premium customers last quarter?"
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 200
        # Verify original_question was logged.
        assert any("Schema search for question" in record.message for record in caplog.records)
        assert any("What was revenue from premium customers" in record.message for record in caplog.records)

    def test_schema_search_with_keyword_metadata(self, mock_agent, mock_config, mock_minio,
                                                 mock_redis, mock_ibis_connector, mock_flatten,
                                                 mock_build_index, mock_build_graph,
                                                 mock_rank_tables, mock_format_results, caplog):
        """Test schema search with keyword_metadata breakdown (NEW FIELD)."""
        import logging
        caplog.set_level(logging.INFO)
        self._configure_mocks(mock_agent, mock_config, mock_redis, mock_ibis_connector,
                              mock_flatten, mock_build_index, mock_build_graph,
                              mock_rank_tables, mock_format_results)
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["premium", "revenue", "customers", "financial metrics"],
            "keyword_metadata": {
                "base": ["revenue", "customers"],
                "semantic": ["financial metrics"],
                "concepts": ["premium"]
            }
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 200
        # Verify keyword_metadata was logged with per-category counts.
        assert any("Keyword breakdown" in record.message for record in caplog.records)
        assert any("Base: 2" in record.message for record in caplog.records)
        assert any("Semantic: 1" in record.message for record in caplog.records)
        assert any("Concepts: 1" in record.message for record in caplog.records)

    def test_schema_search_with_all_metadata(self, mock_agent, mock_config, mock_minio,
                                             mock_redis, mock_ibis_connector, mock_flatten,
                                             mock_build_index, mock_build_graph,
                                             mock_rank_tables, mock_format_results, caplog):
        """Test schema search with both original_question and keyword_metadata."""
        import logging
        caplog.set_level(logging.INFO)
        self._configure_mocks(mock_agent, mock_config, mock_redis, mock_ibis_connector,
                              mock_flatten, mock_build_index, mock_build_graph,
                              mock_rank_tables, mock_format_results)
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["sales", "monthly", "trend"],
            "original_question": "Show monthly sales trends",
            "keyword_metadata": {
                "base": ["sales", "monthly", "trend"],
                "semantic": ["time series"],
                "concepts": []
            }
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 200
        # Verify both pieces of metadata were logged.
        log_messages = [record.message for record in caplog.records]
        assert any("Show monthly sales trends" in msg for msg in log_messages)
        assert any("Keyword breakdown" in msg for msg in log_messages)

    def test_schema_search_backward_compatible(self, mock_agent, mock_config, mock_minio,
                                               mock_redis, mock_ibis_connector, mock_flatten,
                                               mock_build_index, mock_build_graph,
                                               mock_rank_tables, mock_format_results):
        """Test that schema search is backward compatible (metadata fields optional)."""
        self._configure_mocks(mock_agent, mock_config, mock_redis, mock_ibis_connector,
                              mock_flatten, mock_build_index, mock_build_graph,
                              mock_rank_tables, mock_format_results)
        # Request WITHOUT original_question or keyword_metadata (old API shape).
        request_data = {
            "tenant_id": "test_tenant",
            "keywords": ["customer"]
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        # Should still work (backward compatible).
        assert response.status_code == 200
        data = response.json()
        assert "available_sources" in data
        assert "matches" in data

    def test_schema_search_missing_tenant_id(self):
        """Test that missing tenant_id returns validation error."""
        request_data = {
            "keywords": ["customer"]
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 422  # Validation error

    def test_schema_search_missing_keywords(self):
        """Test that missing keywords returns validation error."""
        request_data = {
            "tenant_id": "tenant-123"
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 422  # Validation error

    def test_schema_search_empty_keywords(self):
        """Test that empty keywords list returns validation error."""
        request_data = {
            "tenant_id": "tenant-123",
            "keywords": []
        }
        response = client.post(self._SEARCH_URL, json=request_data)
        assert response.status_code == 422  # Validation error
if __name__ == "__main__":
    # Run this file's tests verbosely and hand pytest's exit status back to the
    # shell, so a failing run is not reported as success when executed directly.
    raise SystemExit(pytest.main([__file__, "-v"]))