sirus / backend /data_sources /tests /test_api.py
ranilmukesh's picture
Deploy SiRUS SQL Agent backend
b8277c4
"""Simplified API tests for the data federation endpoints.
This module tests the core API functionality with proper mocking.
"""
import json
import pytest
import sys
import os
from unittest.mock import MagicMock, patch
from datetime import datetime
from fastapi.testclient import TestClient
from fastapi import FastAPI
# Add parent to path for imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from backend.data_sources.api import router, get_redis_client, get_minio_client
from backend.core.auth import AuthUser, get_current_user
from backend.data_sources.jobs import JobStatus
# Create test app
app = FastAPI()
app.include_router(router)
# Mock get_current_user to always return a valid AuthUser
def mock_get_current_user():
return AuthUser({
"id": "test_user",
"user_metadata": {
"tenant_id": "test_tenant",
"role": "user"
}
})
app.dependency_overrides[get_current_user] = mock_get_current_user
# Mock get_tenant_config to always return a valid data source for test_tenant
def mock_get_tenant_config(tenant_id, redis_client):
if tenant_id == "test_tenant":
return [{
"source_name": "sample_db",
"source_type": "ibis",
"config": {
"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"
}
}]
return []
import backend.data_sources.api as api_module
app.dependency_overrides[api_module.get_tenant_config] = mock_get_tenant_config
client = TestClient(app)
class TestJobSchemas:
"""Tests for Pydantic schema validation."""
def test_valid_job_schema(self):
"""Test that a valid job request passes validation."""
job_request = {
"payload": {
"plan": [
{
"source": "test_db",
"query": {
"operation": "table",
"name": "orders"
}
}
],
"tenant_id": "test_tenant"
}
}
# This should not raise validation errors
with patch('backend.data_sources.api.get_tenant_config') as mock_config, \
patch('backend.data_sources.worker.process_federated_job.delay') as mock_celery:
mock_config.return_value = [{"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}]
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 201
mock_celery.assert_called_once()
def test_invalid_ast_operation(self):
"""Test that invalid AST operations are rejected."""
job_request = {
"payload": {
"plan": [
{
"source": "test_db",
"query": {
"operation": "delete_all", # Invalid operation
"name": "orders"
}
}
],
"tenant_id": "test_tenant"
}
}
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 422
def test_table_operation_missing_name(self):
"""Test that table operations require a name field."""
job_request = {
"payload": {
"plan": [
{
"source": "test_db",
"query": {
"operation": "table"
# Missing 'name' field
}
}
],
"tenant_id": "test_tenant"
}
}
# This should fail at Pydantic validation level (before processing)
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 422
def test_filter_operation_requires_predicate(self):
"""Test that filter operations require a predicate."""
job_request = {
"payload": {
"plan": [
{
"source": "test_db",
"query": {
"operation": "filter",
"source": {"operation": "table", "name": "orders"}
# Missing 'predicate' field
}
}
],
"tenant_id": "test_tenant"
}
}
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 422
def test_valid_filter_operation(self):
"""Test that valid filter operations pass validation."""
job_request = {
"payload": {
"plan": [
{
"source": "test_db",
"query": {
"operation": "filter",
"source": {"operation": "table", "name": "orders"},
"predicate": {
"column": "amount",
"operation": "gt",
"value": 100
}
}
}
],
"tenant_id": "test_tenant"
}
}
with patch('backend.data_sources.api.get_tenant_config') as mock_config, \
patch('backend.data_sources.worker.process_federated_job.delay') as mock_celery:
mock_config.return_value = [{"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}]
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 201
mock_celery.assert_called_once()
def test_empty_plan_rejected(self):
"""Test that empty plans are rejected."""
job_request = {
"payload": {
"plan": [], # Empty plan
"tenant_id": "test_tenant"
}
}
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 422
class TestJobEndpoints:
"""Tests for job-related endpoints with proper mocking."""
@patch('backend.data_sources.api.get_redis_client')
@patch('backend.data_sources.api.get_minio_client')
@patch('backend.data_sources.api.get_tenant_config')
@patch('backend.data_sources.worker.process_federated_job.delay')
def test_create_job_success(self, mock_celery, mock_config, mock_minio, mock_redis):
"""Test successful job creation."""
# Setup mocks
mock_redis.return_value = MagicMock()
mock_minio.return_value = MagicMock()
mock_config.return_value = [{"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}]
job_request = {
"payload": {
"plan": [{"source": "test_db", "query": {"operation": "table", "name": "orders"}}],
"tenant_id": "test_tenant"
}
}
response = client.post("/api/v1/data-sources/jobs", json=job_request)
assert response.status_code == 201
data = response.json()
assert "job_id" in data
assert data["status"] == "pending"
mock_celery.assert_called_once()
def test_get_job_results(self):
"""Test getting job results."""
mock_redis_instance = MagicMock()
# Mock completed job
metadata = {
"job_id": "test-job-123",
"tenant_id": "test_tenant",
"status": "completed",
"created_at": datetime.utcnow().isoformat(),
"completed_at": datetime.utcnow().isoformat(),
"result_location": "redis://job:test-job-123:results"
}
results = {"step_1": [{"id": 1, "amount": 100}]}
mock_redis_instance.get.side_effect = [
json.dumps(metadata),
json.dumps(results)
]
# Override the dependency
app.dependency_overrides[get_redis_client] = lambda: mock_redis_instance
try:
response = client.get("/api/v1/data-sources/results/test-job-123")
assert response.status_code == 200
data = response.json()
assert data["job_id"] == "test-job-123"
assert data["status"] == "completed"
assert "results" in data
finally:
# Clean up dependency override
app.dependency_overrides.pop(get_redis_client, None)
@patch('backend.data_sources.api.get_redis_client')
def test_get_job_not_found(self, mock_redis):
"""Test getting results for non-existent job."""
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None
response = client.get("/api/v1/data-sources/results/nonexistent")
assert response.status_code == 404
@patch('backend.data_sources.api.get_redis_client')
@patch('backend.data_sources.api.get_minio_client')
@patch('backend.data_sources.api.get_tenant_config')
@patch('backend.data_sources.api.FederationAgent')
def test_get_schema(self, mock_agent, mock_config, mock_minio, mock_redis):
"""Test getting schema for a data source."""
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None # No cache
mock_config.return_value = [{"source_name": "test_db", "source_type": "ibis", "config": {"uri": "duckdb:///:memory:"}}]
mock_connector = MagicMock()
mock_connector.get_schema.return_value = '{"tables": ["orders"]}'
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"test_db": mock_connector}
mock_agent.return_value = mock_agent_instance
response = client.get("/api/v1/data-sources/schema/test_db?tenant_id=test_tenant")
assert response.status_code == 200
data = response.json()
assert data["source_name"] == "test_db"
assert "tables" in data["schema"]
# ============================================================================
# Test Schema Search Endpoint (Section 3.7 - Hybrid Agent)
# ============================================================================
class TestSchemaSearchEndpoint:
"""Tests for the /schema/search endpoint with metadata support."""
@patch('backend.data_sources.schema_search.format_search_results')
@patch('backend.data_sources.schema_search.rank_tables_for_keywords')
@patch('backend.data_sources.schema_search.build_relationship_graph')
@patch('backend.data_sources.schema_search.build_keyword_index')
@patch('backend.data_sources.schema_search.flatten_schema')
@patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
@patch('data_sources.api.get_redis_client')
@patch('data_sources.api.get_minio_client')
@patch('data_sources.api.get_tenant_config')
@patch('data_sources.api.FederationAgent')
def test_schema_search_basic(self, mock_agent, mock_config, mock_minio, mock_redis,
mock_ibis_connector, mock_flatten, mock_build_index,
mock_build_graph, mock_rank_tables, mock_format_results):
"""Test basic schema search with keywords only."""
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None # No cache
mock_config.return_value = [
{"source_name": "sample_db", "source_type": "ibis", "config": {"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"}}
]
# Mock schema search module functions
mock_flatten.return_value = {
'schema_name': 'public',
'tables': [{'table_name': 'customers', 'fields': [{'name': 'id', 'type': 'int'}]}]
}
mock_build_index.return_value = {'customer': {'customers': {'id'}}}
mock_build_graph.return_value = {}
mock_rank_tables.return_value = [
('customers', 15.0, {'id'})
]
mock_format_results.return_value = {
'formatted_string': 'Table: customers',
'table_matches': [{'table_name': 'customers', 'score': 15.0, 'matched_columns': ['id']}],
'total_matches': 1
}
# Mock connector
mock_connector_instance = MagicMock()
mock_connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
mock_ibis_connector.return_value = mock_connector_instance
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"sample_db": mock_connector_instance}
mock_agent.return_value = mock_agent_instance
# Make request
request_data = {
"tenant_id": "test_tenant",
"keywords": ["customer", "revenue"]
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 200
data = response.json()
assert "available_sources" in data
assert "matches" in data
assert "formatted_schema_string" in data
assert "total_matches" in data
@patch('backend.data_sources.schema_search.format_search_results')
@patch('backend.data_sources.schema_search.rank_tables_for_keywords')
@patch('backend.data_sources.schema_search.build_relationship_graph')
@patch('backend.data_sources.schema_search.build_keyword_index')
@patch('backend.data_sources.schema_search.flatten_schema')
@patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
@patch('data_sources.api.get_redis_client')
@patch('data_sources.api.get_minio_client')
@patch('data_sources.api.get_tenant_config')
@patch('data_sources.api.FederationAgent')
def test_schema_search_with_original_question(self, mock_agent, mock_config, mock_minio,
mock_redis, mock_ibis_connector, mock_flatten,
mock_build_index, mock_build_graph,
mock_rank_tables, mock_format_results, caplog):
"""Test schema search with original_question metadata (NEW FIELD)."""
import logging
caplog.set_level(logging.INFO)
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None
mock_config.return_value = [
{"source_name": "sample_db", "source_type": "ibis", "config": {"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"}}
]
# Mock schema search functions
mock_flatten.return_value = {'schema_name': 'public', 'tables': []}
mock_build_index.return_value = {}
mock_build_graph.return_value = {}
mock_rank_tables.return_value = []
mock_format_results.return_value = {
'formatted_string': '',
'table_matches': [],
'total_matches': 0
}
# Mock connector
mock_connector_instance = MagicMock()
mock_connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
mock_ibis_connector.return_value = mock_connector_instance
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"sample_db": mock_connector_instance}
mock_agent.return_value = mock_agent_instance
# Make request with original_question
request_data = {
"tenant_id": "test_tenant",
"keywords": ["revenue", "premium", "customers"],
"original_question": "What was revenue from premium customers last quarter?"
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 200
# Verify original_question was logged
assert any("Schema search for question" in record.message for record in caplog.records)
assert any("What was revenue from premium customers" in record.message for record in caplog.records)
@patch('backend.data_sources.schema_search.format_search_results')
@patch('backend.data_sources.schema_search.rank_tables_for_keywords')
@patch('backend.data_sources.schema_search.build_relationship_graph')
@patch('backend.data_sources.schema_search.build_keyword_index')
@patch('backend.data_sources.schema_search.flatten_schema')
@patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
@patch('data_sources.api.get_redis_client')
@patch('data_sources.api.get_minio_client')
@patch('data_sources.api.get_tenant_config')
@patch('data_sources.api.FederationAgent')
def test_schema_search_with_keyword_metadata(self, mock_agent, mock_config, mock_minio,
mock_redis, mock_ibis_connector, mock_flatten,
mock_build_index, mock_build_graph,
mock_rank_tables, mock_format_results, caplog):
"""Test schema search with keyword_metadata breakdown (NEW FIELD)."""
import logging
caplog.set_level(logging.INFO)
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None
mock_config.return_value = [
{"source_name": "sample_db", "source_type": "ibis", "config": {"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"}}
]
# Mock schema search functions
mock_flatten.return_value = {'schema_name': 'public', 'tables': []}
mock_build_index.return_value = {}
mock_build_graph.return_value = {}
mock_rank_tables.return_value = []
mock_format_results.return_value = {
'formatted_string': '',
'table_matches': [],
'total_matches': 0
}
# Mock connector
mock_connector_instance = MagicMock()
mock_connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
mock_ibis_connector.return_value = mock_connector_instance
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"sample_db": mock_connector_instance}
mock_agent.return_value = mock_agent_instance
# Make request with keyword_metadata
request_data = {
"tenant_id": "test_tenant",
"keywords": ["premium", "revenue", "customers", "financial metrics"],
"keyword_metadata": {
"base": ["revenue", "customers"],
"semantic": ["financial metrics"],
"concepts": ["premium"]
}
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 200
# Verify keyword_metadata was logged
assert any("Keyword breakdown" in record.message for record in caplog.records)
assert any("Base: 2" in record.message for record in caplog.records)
assert any("Semantic: 1" in record.message for record in caplog.records)
assert any("Concepts: 1" in record.message for record in caplog.records)
@patch('backend.data_sources.schema_search.format_search_results')
@patch('backend.data_sources.schema_search.rank_tables_for_keywords')
@patch('backend.data_sources.schema_search.build_relationship_graph')
@patch('backend.data_sources.schema_search.build_keyword_index')
@patch('backend.data_sources.schema_search.flatten_schema')
@patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
@patch('data_sources.api.get_redis_client')
@patch('data_sources.api.get_minio_client')
@patch('data_sources.api.get_tenant_config')
@patch('data_sources.api.FederationAgent')
def test_schema_search_with_all_metadata(self, mock_agent, mock_config, mock_minio,
mock_redis, mock_ibis_connector, mock_flatten,
mock_build_index, mock_build_graph,
mock_rank_tables, mock_format_results, caplog):
"""Test schema search with both original_question and keyword_metadata."""
import logging
caplog.set_level(logging.INFO)
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None
mock_config.return_value = [
{"source_name": "sample_db", "source_type": "ibis", "config": {"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"}}
]
# Mock schema search functions
mock_flatten.return_value = {'schema_name': 'public', 'tables': []}
mock_build_index.return_value = {}
mock_build_graph.return_value = {}
mock_rank_tables.return_value = []
mock_format_results.return_value = {
'formatted_string': '',
'table_matches': [],
'total_matches': 0
}
# Mock connector
mock_connector_instance = MagicMock()
mock_connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
mock_ibis_connector.return_value = mock_connector_instance
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"sample_db": mock_connector_instance}
mock_agent.return_value = mock_agent_instance
# Make request with all metadata
request_data = {
"tenant_id": "test_tenant",
"keywords": ["sales", "monthly", "trend"],
"original_question": "Show monthly sales trends",
"keyword_metadata": {
"base": ["sales", "monthly", "trend"],
"semantic": ["time series"],
"concepts": []
}
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 200
# Verify both pieces of metadata were logged
log_messages = [record.message for record in caplog.records]
assert any("Show monthly sales trends" in msg for msg in log_messages)
assert any("Keyword breakdown" in msg for msg in log_messages)
@patch('backend.data_sources.schema_search.format_search_results')
@patch('backend.data_sources.schema_search.rank_tables_for_keywords')
@patch('backend.data_sources.schema_search.build_relationship_graph')
@patch('backend.data_sources.schema_search.build_keyword_index')
@patch('backend.data_sources.schema_search.flatten_schema')
@patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
@patch('data_sources.api.get_redis_client')
@patch('data_sources.api.get_minio_client')
@patch('data_sources.api.get_tenant_config')
@patch('data_sources.api.FederationAgent')
def test_schema_search_backward_compatible(self, mock_agent, mock_config, mock_minio,
mock_redis, mock_ibis_connector, mock_flatten,
mock_build_index, mock_build_graph,
mock_rank_tables, mock_format_results):
"""Test that schema search is backward compatible (metadata fields optional)."""
# Setup mocks
mock_redis_instance = MagicMock()
mock_redis.return_value = mock_redis_instance
mock_redis_instance.get.return_value = None
mock_config.return_value = [
{"source_name": "sample_db", "source_type": "ibis", "config": {"uri": "mysql+pymysql://root:bwgadmin%402023@65.0.127.253:3306/bookwedgo"}}
]
# Mock schema search functions
mock_flatten.return_value = {'schema_name': 'public', 'tables': []}
mock_build_index.return_value = {}
mock_build_graph.return_value = {}
mock_rank_tables.return_value = []
mock_format_results.return_value = {
'formatted_string': '',
'table_matches': [],
'total_matches': 0
}
# Mock connector
mock_connector_instance = MagicMock()
mock_connector_instance.get_schema.return_value = '[{"schema_name": "public", "tables": []}]'
mock_ibis_connector.return_value = mock_connector_instance
mock_agent_instance = MagicMock()
mock_agent_instance.connectors = {"sample_db": mock_connector_instance}
mock_agent.return_value = mock_agent_instance
# Make request WITHOUT optional metadata fields (old API behavior)
request_data = {
"tenant_id": "test_tenant",
"keywords": ["customer"]
# NO original_question
# NO keyword_metadata
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
# Should still work (backward compatible)
assert response.status_code == 200
data = response.json()
assert "available_sources" in data
assert "matches" in data
def test_schema_search_missing_tenant_id(self):
"""Test that missing tenant_id returns validation error."""
request_data = {
# Missing tenant_id
"keywords": ["customer"]
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 422 # Validation error
def test_schema_search_missing_keywords(self):
"""Test that missing keywords returns validation error."""
request_data = {
"tenant_id": "tenant-123"
# Missing keywords
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 422 # Validation error
def test_schema_search_empty_keywords(self):
"""Test that empty keywords list returns validation error."""
request_data = {
"tenant_id": "tenant-123",
"keywords": [] # Empty list
}
response = client.post("/api/v1/data-sources/schema/search", json=request_data)
assert response.status_code == 422 # Validation error
if __name__ == "__main__":
pytest.main([__file__, "-v"])