sirus / backend /data_sources /tests /test_api_integration.py
ranilmukesh's picture
Deploy SiRUS SQL Agent backend
b8277c4
"""
Integration tests for data_sources API endpoints.
Tests cover:
- Job creation and execution flow
- Schema retrieval and validation
- Raw SQL execution
- Error handling and edge cases
- Multi-tenant isolation
- Authentication and authorization
"""
import pytest
import json
from fastapi.testclient import TestClient
from unittest.mock import Mock, patch, MagicMock
import sys
import os
from datetime import datetime
# When this file is executed directly the repository root is not on the
# import path, so `import backend...` would fail. Climb three directories
# from this file (tests -> data_sources -> backend -> project root) and put
# that directory at the front of sys.path if it is not already listed.
project_root = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
if project_root not in sys.path:
    sys.path.insert(0, project_root)
# Import FastAPI app
from backend.data_sources.main import app
# Import models and utilities for testing
from backend.core.auth import AuthUser, get_current_user
from backend.data_sources.jobs import JobStatus
from backend.data_sources.api import get_tenant_config
# Create one module-level test client; every test in this file issues its
# requests through it.
# NOTE(review): the client is not entered as a context manager, so the app's
# startup/shutdown (lifespan) handlers presumably never run here - confirm
# that is intended.
client = TestClient(app)
# ============================================================================
# Test Fixtures and Setup
# ============================================================================
@pytest.fixture
def mock_redis():
    """Provide a plain ``Mock`` object standing in for a Redis client."""
    redis_double = Mock()
    return redis_double
@pytest.fixture
def mock_minio():
    """Provide a plain ``Mock`` object standing in for a MinIO client."""
    minio_double = Mock()
    return minio_double
@pytest.fixture(autouse=True)
def setup_test_dependencies():
    """Install FastAPI dependency overrides around every test.

    Before the test runs, ``get_current_user`` and ``get_tenant_config`` are
    replaced in ``app.dependency_overrides`` with the stubs defined below;
    after the test both entries are removed again.
    """

    def mock_get_current_user():
        # Fixed identity: user "test_user" belonging to tenant "test_tenant".
        claims = {
            "id": "test_user",
            "user_metadata": {
                "tenant_id": "test_tenant",
                "role": "user",
            },
        }
        return AuthUser(claims)

    def mock_get_tenant_config(tenant_id, redis_client):
        # Only "test_tenant" has a data source; any other tenant gets none.
        if tenant_id != "test_tenant":
            return []
        sample_source = {
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {
                "uri": "mysql+pymysql://root:test@localhost:3306/test_db",
            },
        }
        return [sample_source]

    overrides = {
        get_current_user: mock_get_current_user,
        get_tenant_config: mock_get_tenant_config,
    }
    app.dependency_overrides.update(overrides)

    yield

    # Teardown: drop exactly the overrides installed above.
    for dependency in overrides:
        app.dependency_overrides.pop(dependency, None)
# ============================================================================
# Health and Basic Endpoint Tests
# ============================================================================
class TestHealthEndpoints:
    """Smoke tests for the service-level ``/health`` and ``/`` endpoints."""

    def test_health_endpoint(self):
        """GET /health answers 200 with a healthy ``data-sources-api`` body."""
        resp = client.get("/health")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "healthy"
        assert "service" in body
        assert body["service"] == "data-sources-api"
        assert "timestamp" in body

    def test_root_endpoint(self):
        """GET / answers 200 with the API name, version, and endpoint list."""
        resp = client.get("/")
        assert resp.status_code == 200

        body = resp.json()
        for field in ("name", "version", "endpoints"):
            assert field in body
        assert body["name"] == "Data Sources API"
# ============================================================================
# Job Creation and Management Tests
# ============================================================================
class TestJobLifecycle:
    """Integration tests for job creation, result lookup, and validation."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.worker.process_federated_job.delay')
    def test_job_creation_and_execution_flow(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Posting a valid plan returns 201/pending and enqueues one Celery task.

        Only job creation is verified; the job is never actually processed
        because the Celery ``delay`` call is patched out.
        """
        # Redis and MinIO factories hand back inert MagicMock clients.
        mock_redis_instance = MagicMock()
        mock_redis_instance.set.return_value = True  # job storage "succeeds"
        mock_redis.return_value = mock_redis_instance
        mock_minio.return_value = MagicMock()
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"},
        }]

        job_request = {
            "payload": {
                "plan": [
                    {
                        "source": "sample_db",
                        "query": {
                            "operation": "table",
                            "name": "orders",
                        },
                    }
                ],
                "tenant_id": "test_tenant",
            }
        }

        response = client.post("/api/v1/data-sources/jobs", json=job_request)
        assert response.status_code == 201

        data = response.json()
        assert "job_id" in data
        assert data["status"] == "pending"
        assert "message" in data

        # The job must have been handed to the (patched) Celery task once.
        mock_celery.assert_called_once()

    @patch('backend.data_sources.api.get_redis_client')
    def test_job_not_found(self, mock_redis):
        """Requesting results for an unknown job id yields 404 with a detail."""
        mock_redis_instance = MagicMock()
        mock_redis_instance.get.return_value = None  # nothing stored for the id
        mock_redis.return_value = mock_redis_instance

        response = client.get("/api/v1/data-sources/results/nonexistent-job")
        assert response.status_code == 404
        assert "detail" in response.json()

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.worker.process_federated_job.delay')
    def test_job_creation_validation_errors(self, mock_celery, mock_minio, mock_redis):
        """Malformed job requests are rejected with HTTP 422.

        Each case is labelled so that a failure names the offending request
        instead of only reporting a bare status-code mismatch.
        """
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()

        invalid_requests = {
            "missing payload": {},
            "invalid AST operation": {
                "payload": {
                    "plan": [{
                        "source": "sample_db",
                        "query": {
                            "operation": "invalid_operation",
                            "name": "orders",
                        },
                    }],
                    "tenant_id": "test_tenant",
                }
            },
            "empty plan": {
                "payload": {
                    "plan": [],
                    "tenant_id": "test_tenant",
                }
            },
        }

        for label, body in invalid_requests.items():
            response = client.post("/api/v1/data-sources/jobs", json=body)
            assert response.status_code == 422, (
                f"{label}: expected 422, got {response.status_code}"
            )
# ============================================================================
# Schema Retrieval Tests
# ============================================================================
class TestSchemaRetrieval:
    """Integration tests for the per-source schema endpoint."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_retrieval_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """A configured source answers 200 with its schema as a JSON string."""
        # Redis client whose ``get`` finds nothing (no cached value).
        redis_stub = MagicMock()
        redis_stub.get.return_value = None
        mock_redis.return_value = redis_stub

        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        # FederationAgent() yields an agent exposing one connector whose
        # get_schema() returns a JSON document listing three tables.
        connector_stub = MagicMock()
        connector_stub.get_schema.return_value = '{"tables": ["orders", "customers", "products"]}'
        agent_stub = MagicMock()
        agent_stub.connectors = {"sample_db": connector_stub}
        mock_agent.return_value = agent_stub

        resp = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")
        assert resp.status_code == 200

        body = resp.json()
        assert body["source_name"] == "sample_db"
        assert "schema" in body
        assert "last_updated" in body

        # The schema field is itself JSON text; decode it before inspecting.
        parsed = json.loads(body["schema"])
        assert "tables" in parsed
        assert "orders" in parsed["tables"]

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    def test_schema_retrieval_unknown_source(self, mock_config, mock_minio, mock_redis):
        """A source missing from the tenant config answers 404 with a detail."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = []  # tenant has no sources at all

        resp = client.get("/api/v1/data-sources/schema/unknown_db?tenant_id=test_tenant")
        assert resp.status_code == 404
        assert "detail" in resp.json()

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_caching(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Schema retrieval succeeds when Redis reports a cache miss.

        NOTE(review): only one request is made and no Redis interaction is
        asserted, so caching itself is not actually verified here.
        """
        redis_stub = MagicMock()
        redis_stub.get.return_value = None  # cache miss on the first call
        mock_redis.return_value = redis_stub

        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        connector_stub = MagicMock()
        connector_stub.get_schema.return_value = '{"tables": ["orders"]}'
        agent_stub = MagicMock()
        agent_stub.connectors = {"sample_db": connector_stub}
        mock_agent.return_value = agent_stub

        first = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=test_tenant")
        assert first.status_code == 200

        body = first.json()
        assert body["source_name"] == "sample_db"
        assert "schema" in body
# ============================================================================
# Raw SQL Execution Tests
# ============================================================================
class TestRawSQLExecution:
    """Integration tests for the ``execute-raw-sql`` endpoint."""

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_raw_sql_execution_sync_success(self, mock_agent, mock_config, mock_minio, mock_redis):
        """Synchronous mode returns the agent's rows directly in the response."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        # The agent returns two canned rows and advertises one connector.
        canned_rows = [
            {"id": 1, "name": "John Doe", "email": "john@example.com"},
            {"id": 2, "name": "Jane Smith", "email": "jane@example.com"}
        ]
        agent_stub = MagicMock()
        agent_stub.execute_raw_query.return_value = canned_rows
        agent_stub.connectors = {"sample_db": MagicMock()}
        mock_agent.return_value = agent_stub

        payload = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT id, name, email FROM users LIMIT 10",
            "async_mode": False,
            "max_rows": 100
        }
        resp = client.post("/api/v1/data-sources/execute-raw-sql", json=payload)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert len(body["results"]) == 2
        assert body["rows_returned"] == 2
        assert "execution_time_ms" in body

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.worker.process_raw_sql_job.apply_async')
    def test_raw_sql_execution_async_success(self, mock_celery, mock_config, mock_minio, mock_redis):
        """Asynchronous mode answers ``accepted`` and queues exactly one task."""
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        payload = {
            "tenant_id": "test_tenant",
            "source_name": "sample_db",
            "sql_query": "SELECT * FROM large_table",
            "async_mode": True
        }
        resp = client.post("/api/v1/data-sources/execute-raw-sql", json=payload)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "accepted"
        assert "job_id" in body
        assert body["tenant_id"] == "test_tenant"

        # The patched Celery apply_async must have been invoked once.
        mock_celery.assert_called_once()

    def test_raw_sql_validation_errors(self):
        """Requests lacking a source name or with empty SQL are rejected (422)."""
        endpoint = "/api/v1/data-sources/execute-raw-sql"

        # No source_name supplied.
        without_source = client.post(endpoint, json={
            "sql_query": "SELECT * FROM users"
        })
        assert without_source.status_code == 422

        # sql_query present but empty.
        blank_query = client.post(endpoint, json={
            "source_name": "sample_db",
            "sql_query": ""
        })
        assert blank_query.status_code == 422
# ============================================================================
# Schema Search Tests
# ============================================================================
class TestSchemaSearch:
    """Integration tests for the schema keyword-search endpoint."""

    @patch('backend.data_sources.schema_search.format_search_results')
    @patch('backend.data_sources.schema_search.rank_tables_for_keywords')
    @patch('backend.data_sources.schema_search.build_relationship_graph')
    @patch('backend.data_sources.schema_search.build_keyword_index')
    @patch('backend.data_sources.schema_search.flatten_schema')
    @patch('backend.data_sources.connectors.ibis_connector.IbisConnector')
    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    @patch('backend.data_sources.api.FederationAgent')
    def test_schema_search_basic(self, mock_agent, mock_config, mock_minio, mock_redis,
                                 mock_ibis_connector, mock_flatten, mock_build_index,
                                 mock_build_graph, mock_rank_tables, mock_format_results):
        """A one-keyword search answers 200 with the expected response keys.

        Every stage of the search pipeline is patched with canned data, so
        only the endpoint's wiring and response shape are exercised.
        """
        # Redis client whose ``get`` finds nothing (no cached value).
        redis_stub = MagicMock()
        redis_stub.get.return_value = None
        mock_redis.return_value = redis_stub

        mock_config.return_value = [{
            "source_name": "sample_db",
            "source_type": "ibis",
            "config": {"uri": "mysql+pymysql://root:test@localhost:3306/test_db"}
        }]

        # Canned outputs for each patched schema_search helper.
        orders_match = {
            "table_name": "orders",
            "score": 0.8,
            "matched_columns": ["customer_id"],
            "source_name": "sample_db",
        }
        mock_flatten.return_value = {"sample_db": {"orders": ["id", "customer_id", "amount"]}}
        mock_build_index.return_value = {"customer": ["orders.customer_id"]}
        mock_build_graph.return_value = {}
        mock_rank_tables.return_value = [dict(orders_match)]
        mock_format_results.return_value = {
            "table_matches": [dict(orders_match)],
            "formatted_string": "orders table with customer_id column"
        }

        resp = client.post(
            "/api/v1/data-sources/schema/search",
            json={"keywords": ["customer"]},
        )
        assert resp.status_code == 200

        body = resp.json()
        for key in ("available_sources", "matches", "formatted_schema_string"):
            assert key in body
        assert body["total_matches"] >= 0

    def test_schema_search_validation_errors(self):
        """Missing or empty ``keywords`` is rejected with HTTP 422."""
        endpoint = "/api/v1/data-sources/schema/search"

        # keywords field absent.
        no_keywords = client.post(endpoint, json={})
        assert no_keywords.status_code == 422

        # keywords present but empty.
        empty_keywords = client.post(endpoint, json={
            "keywords": []
        })
        assert empty_keywords.status_code == 422
# ============================================================================
# Error Handling and Edge Cases
# ============================================================================
class TestErrorHandling:
    """Tests for routing errors, malformed bodies, and tenant scoping."""

    def test_404_not_found(self):
        """An unregistered route answers 404."""
        response = client.get("/api/v1/data-sources/nonexistent")
        assert response.status_code == 404

    def test_method_not_allowed(self):
        """GET on the jobs route (exercised with POST elsewhere) answers 405."""
        response = client.get("/api/v1/data-sources/jobs")
        assert response.status_code == 405

    def test_invalid_json(self):
        """A syntactically invalid JSON body is rejected with 422."""
        # Raw text bodies go through ``content=``; passing a str via
        # ``data=`` is deprecated on the httpx-based TestClient.
        response = client.post(
            "/api/v1/data-sources/jobs",
            content="invalid json {",
            headers={"Content-Type": "application/json"}
        )
        assert response.status_code == 422

    @patch('backend.data_sources.api.get_redis_client')
    @patch('backend.data_sources.api.get_minio_client')
    @patch('backend.data_sources.api.get_tenant_config')
    def test_tenant_isolation(self, mock_config, mock_minio, mock_redis):
        """Requesting a schema under ``other_tenant`` answers 404.

        NOTE(review): the tenant-config mock returns an empty list for every
        tenant, so this checks the "no sources configured" path rather than
        proving one tenant cannot see another tenant's sources.
        """
        mock_redis.return_value = MagicMock()
        mock_minio.return_value = MagicMock()
        mock_config.return_value = []  # no sources configured

        response = client.get("/api/v1/data-sources/schema/sample_db?tenant_id=other_tenant")
        assert response.status_code == 404
# ============================================================================
# Documentation and API Discovery
# ============================================================================
class TestDocumentation:
    """Tests that the generated API documentation endpoints are served."""

    def test_openapi_schema(self):
        """GET /openapi.json answers 200 with a versioned spec and its paths."""
        resp = client.get("/openapi.json")
        assert resp.status_code == 200

        spec = resp.json()
        assert "openapi" in spec or "swagger" in spec
        assert "paths" in spec

    def test_docs_endpoint(self):
        """GET /docs (Swagger UI) answers 200 with an HTML page."""
        resp = client.get("/docs")
        assert resp.status_code == 200
        content_type = resp.headers.get("content-type", "")
        assert "text/html" in content_type

    def test_redoc_endpoint(self):
        """GET /redoc (ReDoc) answers 200 with an HTML page."""
        resp = client.get("/redoc")
        assert resp.status_code == 200
        content_type = resp.headers.get("content-type", "")
        assert "text/html" in content_type
# ============================================================================
# CORS Tests
# ============================================================================
class TestCORS:
    """Tests for CORS handling."""

    def test_cors_headers_on_api_endpoints(self):
        """A CORS preflight (OPTIONS) on the jobs route answers 200 or 204.

        Only the status code is checked; no response header is inspected.
        """
        preflight_headers = {
            "Origin": "http://localhost:3000",
            "Access-Control-Request-Method": "POST",
            "Access-Control-Request-Headers": "Content-Type,Authorization"
        }
        resp = client.options("/api/v1/data-sources/jobs", headers=preflight_headers)
        assert resp.status_code in (200, 204)

    def test_cors_headers_on_health_endpoint(self):
        """GET /health answers 200.

        No CORS header is asserted here because the TestClient may not
        include CORS headers in the response.
        """
        resp = client.get("/health")
        assert resp.status_code == 200
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly fails
    # the calling shell or CI step whenever a test fails.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))