Spaces:
Running
Running
| """ | |
| Comprehensive End-to-End API Test Suite for Data Sources API | |
| ============================================================== | |
| This test file covers ALL API endpoints from tenant registration through SQL execution. | |
| It uses only the `requests` library to perform real HTTP calls against the running API. | |
| TEST COVERAGE: | |
| -------------- | |
| 1. Tenant Management (API Keys) | |
| - Create API keys | |
| - List API keys | |
| - Revoke API keys | |
| 2. Data Source Configuration | |
| - Add tenant data sources (POST) | |
| - List tenant data sources (GET) | |
| - Update data sources (PUT) | |
| - Delete data sources (DELETE) | |
| 3. Schema Discovery | |
| - List available sources | |
| - Get source schema | |
| - Search schema with keywords | |
| 4. SQL Execution | |
| - Synchronous SQL execution | |
| - Asynchronous job submission | |
| - Job result retrieval | |
| - Paginated results | |
| 5. Job Management | |
| - Job cancellation | |
| - Job deletion | |
| - Dead Letter Queue (DLQ) operations | |
| 6. Metrics & Monitoring | |
| - Get metrics summary | |
| - Cleanup old metrics | |
| 7. Plan Caching (Natural Language) | |
| - Create NL job with caching | |
| - Cache statistics | |
| PREREQUISITES: | |
| -------------- | |
| 1. API server must be running: `python -m uvicorn backend.data_sources.main:app --reload` | |
| 2. Redis must be running: `redis-server` | |
| 3. Set SIRUS_ADMIN_API_KEY environment variable for admin operations | |
| USAGE: | |
| ------ | |
| # Run all tests | |
| python test_api_e2e.py | |
| # Run with custom configuration | |
| python test_api_e2e.py --api-url http://localhost:8000 --admin-key YOUR_KEY --verbose | |
| # Skip cleanup (useful for debugging) | |
| python test_api_e2e.py --no-cleanup | |
| # Run specific test category | |
| python test_api_e2e.py --category tenant-mgmt | |
| """ | |
| import sys | |
| import argparse | |
| import json | |
| import time | |
| import logging | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field | |
| import requests | |
| # ============================================================================ | |
| # CONFIGURATION & SETUP | |
| # ============================================================================ | |
class Colors:
    """Namespace of ANSI SGR escape sequences for coloured console output.

    Use as ``f"{Colors.GREEN}text{Colors.ENDC}"``; ``ENDC`` resets styling
    back to the terminal default.
    """

    # Foreground colours (bright variants).
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Reset all attributes.
    ENDC = '\033[0m'
@dataclass
class TestResult:
    """Outcome of running a single E2E test case.

    Instances are created with keyword arguments
    (``TestResult(name=..., passed=..., duration=..., ...)``), so the class
    must be a dataclass: without the decorator there is no generated
    ``__init__`` and that call raises ``TypeError``.
    """

    # Tell pytest this is not a test class despite the "Test" prefix.
    __test__ = False

    # Human-readable test name passed to the runner.
    name: str
    # True when the test callable returned without raising.
    passed: bool
    # Elapsed seconds spent running the test.
    duration: float
    # Short status label, e.g. "✓ PASSED", "✗ FAILED" or "✗ ERROR".
    message: str = ""
    # Failure/error text; None for a passing test.
    error: Optional[str] = None
    # Optional response payload to attach for diagnostics.
    response_data: Optional[Dict] = None
@dataclass
class TestContext:
    """Mutable state shared across every test in one E2E run.

    Holds the run configuration, identifiers produced by earlier tests
    (tenant API key, created sources and jobs) and the accumulated results.
    It must be a dataclass: the ``field(default_factory=...)`` defaults only
    take effect under ``@dataclass``. Without it they remain plain ``Field``
    objects and the class cannot be constructed from keyword arguments.
    """

    # Tell pytest this is not a test class despite the "Test" prefix.
    __test__ = False

    # Configuration
    api_url: str
    admin_api_key: str
    verbose: bool

    # Test data
    tenant_id: str = "test_e2e_tenant"
    api_key: Optional[str] = None
    api_key_id: Optional[str] = None

    # Created resources (for cleanup); each instance gets its own lists.
    created_api_keys: List[str] = field(default_factory=list)
    created_sources: List[str] = field(default_factory=list)
    created_jobs: List[str] = field(default_factory=list)

    # Test results. The string forward reference keeps this class independent
    # of definition order.
    results: List["TestResult"] = field(default_factory=list)
    # Per-category statistics (presumably keyed by category name — confirm).
    category_stats: Dict[str, Dict] = field(default_factory=dict)
| class E2ETestRunner: | |
| """Main test runner for end-to-end API tests.""" | |
def __init__(self, context: TestContext):
    """Attach the runner to *context* and prepare HTTP and logging state.

    One ``requests.Session`` is reused for all requests. ``TENANT_HEADER``
    starts empty; ``test_01_create_api_key`` fills it in once a tenant key
    has been issued.
    """
    self.ctx = context
    self.session = requests.Session()
    # _setup_logger reads self.ctx.verbose, so ctx must already be assigned.
    self.logger = self._setup_logger()

    # Auth header dicts for admin-scoped and tenant-scoped endpoints.
    admin_key = self.ctx.admin_api_key
    self.ADMIN_HEADER = {"X-Sirus-Admin-Key": admin_key}
    self.TENANT_HEADER = {}
def _setup_logger(self) -> logging.Logger:
    """Return the shared "E2ETestRunner" logger, configured for this run.

    The level is DEBUG when ``self.ctx.verbose`` is truthy, otherwise INFO.
    ``logging.getLogger`` returns the same object for the same name, so a
    stream handler is attached only if the logger has none yet.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger("E2ETestRunner")
    logger.setLevel(logging.DEBUG if self.ctx.verbose else logging.INFO)
    # Guard against stacking handlers: each extra handler would print every
    # message again whenever another runner instance is constructed.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
    return logger
def _make_request(
    self,
    method: str,
    endpoint: str,
    headers: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
    params: Optional[Dict] = None,
    expect_status: int = 200
) -> Tuple[bool, Optional[Dict], Optional[str]]:
    """Send one HTTP request to the API and normalise the outcome.

    Args:
        method: HTTP verb such as "GET" or "POST".
        endpoint: Path appended to ``self.ctx.api_url``.
        headers: Request headers (e.g. admin or tenant auth); may be None.
        json_data: Body to send as JSON.
        params: Query-string parameters.
        expect_status: Status code that counts as success.

    Returns:
        ``(success, data, error)``. When the response has the expected
        status, ``data`` is the parsed JSON body. For a non-JSON body it is
        ``None`` on a 204 and ``{"text": body}`` otherwise. On an unexpected
        status, a timeout, a connection error or any other exception,
        ``success`` is False, ``data`` is None and ``error`` describes the
        problem; nothing is raised.
    """
    url = f"{self.ctx.api_url}{endpoint}"
    try:
        if self.ctx.verbose:
            self.logger.debug(f"{method} {url}")
            if json_data:
                self.logger.debug(f"Body: {json.dumps(json_data, indent=2)}")
        response = self.session.request(
            method=method,
            url=url,
            headers=headers,
            json=json_data,
            params=params,
            timeout=30
        )
        if self.ctx.verbose:
            self.logger.debug(f"Status: {response.status_code}")
            self.logger.debug(f"Response: {response.text[:500]}")

        if response.status_code != expect_status:
            return False, None, f"Expected {expect_status}, got {response.status_code}: {response.text}"

        try:
            data = response.json()
        except ValueError:
            # Not JSON (requests' JSON decode error subclasses ValueError).
            # Catching only this, not a bare except, lets KeyboardInterrupt /
            # SystemExit propagate. A 204 No Content legitimately has no body.
            if response.status_code == 204:
                return True, None, None
            return True, {"text": response.text}, None
        return True, data, None
    except requests.exceptions.Timeout:
        return False, None, "Request timeout"
    except requests.exceptions.ConnectionError:
        return False, None, f"Connection error - is the API running at {self.ctx.api_url}?"
    except Exception as e:
        return False, None, f"Request failed: {str(e)}"
def _run_test(self, test_name: str, test_func) -> TestResult:
    """Run one test callable, print its outcome, and record a TestResult.

    Args:
        test_name: Label used in console output and on the result.
        test_func: Zero-argument callable; it passes by returning normally.

    Returns:
        The ``TestResult``, which is also appended to ``self.ctx.results``.
        ``AssertionError`` is recorded as "✗ FAILED" and any other
        ``Exception`` as "✗ ERROR". Neither propagates, so later tests
        still run.
    """
    print(f"\n{Colors.CYAN}▶ Running: {test_name}{Colors.ENDC}")
    # perf_counter is monotonic; time.time() follows the wall clock and can
    # jump (NTP sync, manual changes), producing wrong or negative durations.
    start_time = time.perf_counter()
    try:
        test_func()
    except AssertionError as e:
        duration = time.perf_counter() - start_time
        result = TestResult(
            name=test_name,
            passed=False,
            duration=duration,
            message="✗ FAILED",
            error=str(e)
        )
        print(f"{Colors.RED} ✗ FAILED in {duration:.2f}s: {e}{Colors.ENDC}")
    except Exception as e:
        duration = time.perf_counter() - start_time
        result = TestResult(
            name=test_name,
            passed=False,
            duration=duration,
            message="✗ ERROR",
            error=f"Unexpected error: {e}"
        )
        print(f"{Colors.RED} ✗ ERROR in {duration:.2f}s: {e}{Colors.ENDC}")
    else:
        duration = time.perf_counter() - start_time
        result = TestResult(
            name=test_name,
            passed=True,
            duration=duration,
            message="✓ PASSED"
        )
        print(f"{Colors.GREEN} ✓ PASSED in {duration:.2f}s{Colors.ENDC}")
    self.ctx.results.append(result)
    return result
| # ======================================================================== | |
| # CATEGORY 1: TENANT MANAGEMENT (API KEYS) | |
| # ======================================================================== | |
def test_01_create_api_key(self):
    """Issue a tenant API key through the admin endpoint and remember it.

    On success the raw key and its id are stored on ``self.ctx`` (and the id
    queued for cleanup). ``self.TENANT_HEADER`` is then set so later
    tenant-scoped calls can authenticate.
    """
    payload = {
        "description": "E2E Test API Key",
        "expires_in_days": 30
    }
    ok, body, err = self._make_request(
        method="POST",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/api-keys",
        headers=self.ADMIN_HEADER,
        json_data=payload,
        expect_status=201
    )
    assert ok, f"Failed to create API key: {err}"
    assert body is not None, "No response data"
    assert "api_key" in body, "Missing api_key in response"
    assert "key_info" in body, "Missing key_info in response"
    info = body["key_info"]
    assert "key_id" in info, "Missing key_id in key_info"
    raw_key = body["api_key"]
    assert len(raw_key) == 64, "API key should be 64 characters"

    # Keep the key for the rest of the run and remember it for cleanup.
    key_id = info["key_id"]
    self.ctx.api_key = raw_key
    self.ctx.api_key_id = key_id
    self.ctx.created_api_keys.append(key_id)
    # From here on, tenant-scoped endpoints authenticate with this header.
    self.TENANT_HEADER = {"X-Sirus-Api-Key": self.ctx.api_key}
    self.logger.info(f"Created API key: {key_id[:8]}... (expires: {info.get('expires_at')})")
def test_02_list_api_keys(self):
    """List the tenant's API keys and confirm the key from test_01 is included."""
    ok, body, err = self._make_request(
        method="GET",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/api-keys",
        headers=self.ADMIN_HEADER
    )
    assert ok, f"Failed to list API keys: {err}"
    assert body is not None, "No response data"
    assert "keys" in body, "Missing keys array in response"
    assert "total_count" in body, "Missing total_count in response"
    keys = body["keys"]
    assert isinstance(keys, list), "keys should be a list"

    listed_ids = [entry["key_id"] for entry in keys]
    assert self.ctx.api_key_id in listed_ids, "Created key not found in list"
    self.logger.info(f"Listed {body['total_count']} API keys")
def test_03_create_api_key_without_admin_auth(self):
    """Creating a key with no admin header must be rejected with HTTP 401."""
    # Deliberately send no headers, so the request carries no admin credential.
    rejected, _body, _err = self._make_request(
        method="POST",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/api-keys",
        json_data={"description": "Unauthorized attempt"},
        expect_status=401
    )
    assert rejected, "Should return 401 for unauthorized request"
    self.logger.info("Correctly rejected unauthorized API key creation")
| # ======================================================================== | |
| # CATEGORY 2: DATA SOURCE CONFIGURATION | |
| # ======================================================================== | |
def test_04_add_data_source(self):
    """Register an in-memory DuckDB source named "test_duckdb" for the tenant."""
    duckdb_source = {
        "source_name": "test_duckdb",
        "source_type": "ibis",
        "config": {
            "uri": "duckdb:///:memory:",
            "table_fetch_example_limit": 3
        }
    }
    request_body = {
        "sources": [duckdb_source],
        # Connection validation is skipped for the in-memory DuckDB URI.
        "validate_connection": False
    }
    ok, body, err = self._make_request(
        method="POST",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/sources",
        headers=self.TENANT_HEADER,
        json_data=request_body,
        expect_status=201
    )
    assert ok, f"Failed to add data source: {err}"
    assert body is not None, "No response data"
    assert body.get("persisted") is True, "Data source was not persisted"
    assert len(body.get("sources", [])) == 1, "Should have 1 source in response"

    self.ctx.created_sources.append("test_duckdb")
    self.logger.info("Added data source: test_duckdb")
def test_05_list_data_sources(self):
    """List the tenant's configured sources and check test_duckdb is among them."""
    ok, body, err = self._make_request(
        method="GET",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/sources",
        headers=self.TENANT_HEADER
    )
    assert ok, f"Failed to list data sources: {err}"
    assert body is not None, "No response data"
    assert "sources" in body, "Missing sources array"
    sources = body["sources"]
    assert len(sources) > 0, "Should have at least one source"

    names = [src["source_name"] for src in sources]
    assert "test_duckdb" in names, "test_duckdb not found in sources"
    self.logger.info(f"Listed {len(sources)} data sources")
def test_06_update_data_source(self):
    """Replace test_duckdb's configuration via PUT (example limit 3 -> 5)."""
    replacement = {
        "source_name": "test_duckdb",
        "source_type": "ibis",
        "config": {
            "uri": "duckdb:///:memory:",
            "table_fetch_example_limit": 5
        }
    }
    # For updates, connection validation is turned off via the query string.
    ok, body, err = self._make_request(
        method="PUT",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/sources/test_duckdb",
        headers=self.TENANT_HEADER,
        json_data=replacement,
        params={"validate_connection": False}
    )
    assert ok, f"Failed to update data source: {err}"
    assert body is not None, "No response data"
    assert body.get("persisted") is True, "Data source update was not persisted"
    assert len(body.get("sources", [])) == 1, "Should have 1 source in response"
    self.logger.info("Updated data source: test_duckdb")
def test_07_add_second_data_source(self):
    """Add a real PostgreSQL source ("test_postgres") with connection validation.

    The connection URI comes from the ``TEST_POSTGRES_URI`` environment
    variable. It must not be hard-coded here: a URI in source embeds the
    database password and publishes it with the code. Any password that was
    previously committed should be treated as compromised and rotated.

    Raises:
        AssertionError: if ``TEST_POSTGRES_URI`` is unset/empty or the API
            does not accept the source with HTTP 201.
    """
    import os  # local import: the module-level imports do not include os

    postgres_uri = os.environ.get("TEST_POSTGRES_URI")
    assert postgres_uri, (
        "TEST_POSTGRES_URI is not set; export a PostgreSQL connection URI "
        "(e.g. postgresql://user:password@host:5432/dbname?sslmode=require) "
        "to run the PostgreSQL-backed tests"
    )

    headers = self.TENANT_HEADER
    source_config = {
        "sources": [
            {
                "source_name": "test_postgres",
                "source_type": "ibis",
                "config": {
                    "uri": postgres_uri,
                    "table_fetch_example_limit": 3
                }
            }
        ],
        "validate_connection": True  # Validate PostgreSQL connection
    }
    success, data, error = self._make_request(
        method="POST",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/sources",
        headers=headers,
        json_data=source_config,
        expect_status=201
    )
    assert success, f"Failed to add PostgreSQL source: {error}"
    self.ctx.created_sources.append("test_postgres")
    self.logger.info("Added PostgreSQL data source: test_postgres")
| # ======================================================================== | |
| # CATEGORY 3: SCHEMA DISCOVERY | |
| # ======================================================================== | |
def test_08_list_available_sources(self):
    """Check /list reports both sources registered earlier in the run."""
    ok, body, err = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/list",
        headers=self.TENANT_HEADER,
        params={"tenant_id": self.ctx.tenant_id}
    )
    assert ok, f"Failed to list sources: {err}"
    assert body is not None, "No response data"
    assert "available_sources" in body, "Missing available_sources array"
    available = body["available_sources"]
    assert len(available) >= 2, "Should have at least 2 sources"
    for expected in ("test_duckdb", "test_postgres"):
        assert expected in available, f"{expected} not found"
    self.logger.info(f"Listed {len(available)} available sources")
def test_09_get_source_schema(self):
    """Fetch test_postgres's schema and check its first entry has a tables key."""
    ok, body, err = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/schema/test_postgres",
        headers=self.TENANT_HEADER,
        params={"tenant_id": self.ctx.tenant_id}
    )
    assert ok, f"Failed to get schema: {err}"
    assert body is not None, "No response data"
    assert "schema" in body, "Missing schema in response"

    # The schema may arrive as a JSON-encoded string; decode it if so.
    raw_schema = body["schema"]
    schema = json.loads(raw_schema) if isinstance(raw_schema, str) else raw_schema
    assert isinstance(schema, list), "Schema should be a list"
    assert len(schema) > 0, "Schema should not be empty"
    first_entry = schema[0]
    assert "tables" in first_entry, "Schema should contain tables"
    self.logger.info(f"Retrieved schema with {len(first_entry.get('tables', []))} tables")
def test_10_search_schema_with_keywords(self):
    """POST a keyword schema search and check the response has a match list."""
    query = {
        "tenant_id": self.ctx.tenant_id,
        "keywords": ["customer", "order", "product"],
        "max_tables_per_source": 10
    }
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/schema/search",
        headers=self.TENANT_HEADER,
        json_data=query
    )
    assert ok, f"Failed to search schema: {err}"
    assert body is not None, "No response data"
    assert "matches" in body, "Missing matches"
    matches = body["matches"]
    assert isinstance(matches, list), "matches should be a list"
    self.logger.info(f"Found {len(matches)} matching tables")
| # ======================================================================== | |
| # CATEGORY 4: SQL EXECUTION (SYNCHRONOUS) | |
| # ======================================================================== | |
def test_11_execute_sql_sync_simple_query(self):
    """Run a constant SELECT synchronously and check the first row's value."""
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/execute-raw-sql",
        headers=self.TENANT_HEADER,
        json_data={
            "tenant_id": self.ctx.tenant_id,
            "source_name": "test_postgres",
            "sql_query": "SELECT 1 as test_value, 'hello' as test_string",
            "async_mode": False
        }
    )
    assert ok, f"Failed to execute SQL: {err}"
    assert body is not None, "No response data"
    assert "results" in body, "Missing results in response"
    rows = body["results"]
    assert len(rows) > 0, "Result should not be empty"
    assert rows[0]["test_value"] == 1, "test_value should be 1"
    self.logger.info("Successfully executed simple SQL query")
def test_12_execute_sql_sync_table_query(self):
    """Select up to 5 rows from a table discovered through the schema endpoint.

    If no table name can be extracted from the schema response, a constant
    SELECT is run instead. An empty result set is accepted.
    """
    headers = self.TENANT_HEADER

    schema_ok, schema_body, _schema_err = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/schema/test_postgres",
        headers=headers,
        params={"tenant_id": self.ctx.tenant_id}
    )

    # Pick the first table's name, accepting either "name" or "table_name".
    table_name = None
    if schema_ok and schema_body and "schema" in schema_body:
        schema = schema_body["schema"]
        if isinstance(schema, str):
            schema = json.loads(schema)
        has_tables = isinstance(schema, list) and len(schema) > 0 and "tables" in schema[0]
        tables = schema[0]["tables"] if has_tables else None
        if tables:
            first_table = tables[0]
            table_name = first_table.get("name") or first_table.get("table_name")

    if table_name:
        sql_query = f"SELECT * FROM {table_name} LIMIT 5"
    else:
        self.logger.warning("Could not determine table name from schema, using simple query")
        sql_query = "SELECT 1 as id, 'test' as name"

    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/execute-raw-sql",
        headers=headers,
        json_data={
            "tenant_id": self.ctx.tenant_id,
            "source_name": "test_postgres",
            "sql_query": sql_query,
            "async_mode": False
        }
    )
    assert ok, f"Failed to execute table query: {err}"
    assert body is not None, "No response data"
    assert "results" in body, "Missing results"
    rows = body["results"]
    assert isinstance(rows, list), "Results should be a list"

    label = table_name or 'query'
    if rows:
        self.logger.info(f"Retrieved {len(rows)} rows from {label}")
    else:
        self.logger.info(f"Query returned no rows (table {label} might be empty)")
def test_13_execute_sql_sync_invalid_sql(self):
    """Querying a nonexistent table must be answered with HTTP 400."""
    bad_query = {
        "tenant_id": self.ctx.tenant_id,
        "source_name": "test_postgres",
        "sql_query": "SELECT * FROM nonexistent_table_12345",
        "async_mode": False
    }
    got_400, _body, _err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/execute-raw-sql",
        headers=self.TENANT_HEADER,
        json_data=bad_query,
        expect_status=400
    )
    assert got_400, "Should return 400 for invalid SQL"
    self.logger.info("Correctly handled invalid SQL query")
def test_14_execute_sql_blocked_dangerous_query(self):
    """A destructive statement (DROP TABLE) must be refused with HTTP 400."""
    drop_request = {
        "tenant_id": self.ctx.tenant_id,
        "source_name": "test_postgres",
        "sql_query": "DROP TABLE customers",
        "async_mode": False
    }
    blocked, _body, _err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/execute-raw-sql",
        headers=self.TENANT_HEADER,
        json_data=drop_request,
        expect_status=400
    )
    assert blocked, "Should return 400 for dangerous SQL"
    self.logger.info("Correctly blocked DROP statement")
| # ======================================================================== | |
| # CATEGORY 5: SQL EXECUTION (ASYNCHRONOUS) | |
| # ======================================================================== | |
def test_15_execute_sql_async_job_submission(self):
    """Submit a constant query with async_mode=True and record its job id.

    Sleeps 2 seconds afterwards to give the job time to run before the
    result-retrieval tests.
    """
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/execute-raw-sql",
        headers=self.TENANT_HEADER,
        json_data={
            "tenant_id": self.ctx.tenant_id,
            "source_name": "test_postgres",
            "sql_query": "SELECT 1 as id, 'async_test' as test_type, NOW() as timestamp LIMIT 10",
            "async_mode": True
        }
    )
    assert ok, f"Failed to submit async job: {err}"
    assert body is not None, "No response data"
    assert "job_id" in body, "Missing job_id"
    accepted_states = ["accepted", "pending", "running", "completed"]
    assert body.get("status") in accepted_states, "Invalid job status"

    new_job = body["job_id"]
    self.ctx.created_jobs.append(new_job)
    self.logger.info(f"Submitted async job: {new_job}")
    time.sleep(2)
def test_16_get_job_results(self):
    """Retrieve the first async job's results, polling until it settles.

    The results endpoint is re-requested every 0.5 s, for at most about
    15 s, while the job reports an in-flight status ("accepted", "pending"
    or "running"). Relying only on the fixed sleep after submission made the
    "completed" check timing-dependent. A completed job must include
    ``results``. Any other final status is logged as a warning rather than
    failing the test.

    Raises:
        AssertionError: if no job was created earlier, a results request
            fails, or a completed job has no ``results``.
    """
    if not self.ctx.created_jobs:
        raise AssertionError("No jobs created in previous tests")
    job_id = self.ctx.created_jobs[0]
    headers = self.TENANT_HEADER
    params = {"tenant_id": self.ctx.tenant_id}

    # Not-yet-finished statuses: those test_15 accepts on submission, minus
    # "completed".
    in_flight = {"accepted", "pending", "running"}
    poll_interval_s = 0.5
    deadline = time.monotonic() + 15.0

    while True:
        success, data, error = self._make_request(
            method="GET",
            endpoint=f"/api/v1/data-sources/results/{job_id}",
            headers=headers,
            params=params
        )
        assert success, f"Failed to get job results: {error}"
        assert data is not None, "No response data"
        assert "status" in data, "Missing status"
        if data["status"] not in in_flight or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval_s)

    if data["status"] == "completed":
        assert "results" in data, "Missing results for completed job"
        self.logger.info(f"Job {job_id} completed with {len(data.get('results', []))} rows")
    else:
        self.logger.warning(f"Job {job_id} status: {data['status']}")
def test_17_get_paginated_results(self):
    """Request page 1 (page_size 5) of the first job's results.

    Shape checks apply only when the job reports status "completed";
    otherwise a successful HTTP response is enough.
    """
    if not self.ctx.created_jobs:
        raise AssertionError("No jobs created")
    job_id = self.ctx.created_jobs[0]
    page_query = {
        "tenant_id": self.ctx.tenant_id,
        "page": 1,
        "page_size": 5
    }
    ok, body, err = self._make_request(
        method="GET",
        endpoint=f"/api/v1/data-sources/results/{job_id}/paginated",
        headers=self.TENANT_HEADER,
        params=page_query
    )
    assert ok, f"Failed to get paginated results: {err}"
    assert body is not None, "No response data"
    if body.get("status") != "completed":
        return

    assert "results" in body, "Missing results"
    assert "pagination" in body, "Missing pagination info"
    page_info = body["pagination"]
    assert "page" in page_info, "Missing page number"
    assert "page_size" in page_info, "Missing page_size"
    self.logger.info(f"Retrieved page {page_info['page']} with {len(body['results'])} rows")
| # ======================================================================== | |
| # CATEGORY 6: JOB MANAGEMENT | |
| # ======================================================================== | |
def test_18_submit_job_via_jobs_endpoint(self):
    """Submit a one-step plan (a "table" operation) through POST /jobs.

    The table is the first one listed in test_postgres's schema. It falls
    back to "action_items" when the schema can't be read or yields no name.
    Sleeps 1 second after a successful submission.
    """
    headers = self.TENANT_HEADER
    table_name = "action_items"

    schema_ok, schema_body, _ = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/schema/test_postgres",
        headers=headers,
        params={"tenant_id": self.ctx.tenant_id}
    )
    if schema_ok and schema_body and "schema" in schema_body:
        schema = schema_body["schema"]
        if isinstance(schema, str):
            schema = json.loads(schema)
        has_tables = isinstance(schema, list) and len(schema) > 0 and "tables" in schema[0]
        tables = schema[0]["tables"] if has_tables else None
        if tables:
            first_table = tables[0]
            table_name = (
                first_table.get("name")
                or first_table.get("table_name")
                or "action_items"
            )

    plan_step = {
        "source": "test_postgres",
        "query": {
            "operation": "table",
            "name": table_name
        }
    }
    job_request = {
        "payload": {
            "tenant_id": self.ctx.tenant_id,
            "plan": [plan_step],
            "priority": 5,
            "timeout_seconds": 300
        }
    }
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/jobs",
        headers=headers,
        json_data=job_request,
        expect_status=201
    )
    assert ok, f"Failed to submit job: {err}"
    assert body is not None, "No response data"
    assert "job_id" in body, "Missing job_id"

    new_job = body["job_id"]
    self.ctx.created_jobs.append(new_job)
    self.logger.info(f"Submitted job via /jobs: {new_job} (table: {table_name})")
    time.sleep(1)
def test_19_cancel_job(self):
    """Try to cancel the most recently created job.

    Skipped with a warning unless at least two jobs exist. The outcome is
    only logged: a job that has already finished may not be cancellable, so
    a failed cancel does not fail the test.
    """
    jobs = self.ctx.created_jobs
    if len(jobs) < 2:
        self.logger.warning("Not enough jobs to test cancellation")
        return

    target = jobs[-1]
    cancelled, _body, _err = self._make_request(
        method="POST",
        endpoint=f"/api/v1/data-sources/jobs/{target}/cancel",
        headers=self.TENANT_HEADER,
        params={"tenant_id": self.ctx.tenant_id}
    )
    if cancelled:
        message = f"Cancelled job: {target}"
    else:
        message = f"Job {target} might already be completed"
    self.logger.info(message)
def test_20_delete_job(self):
    """Delete the most recently created job and stop tracking it.

    Raises AssertionError when no jobs exist. A failed delete (for example,
    the job no longer exists or sits in the DLQ) is logged as a warning
    rather than failing the test.
    """
    if not self.ctx.created_jobs:
        raise AssertionError("No jobs to delete")

    target = self.ctx.created_jobs[-1]
    deleted, _body, err = self._make_request(
        method="DELETE",
        endpoint=f"/api/v1/data-sources/jobs/{target}",
        headers=self.TENANT_HEADER,
        params={"tenant_id": self.ctx.tenant_id}
    )
    if not deleted:
        self.logger.warning(f"Failed to delete job {target}: {err}")
        return
    self.logger.info(f"Deleted job: {target}")
    self.ctx.created_jobs.remove(target)
| # ======================================================================== | |
| # CATEGORY 7: DEAD LETTER QUEUE (DLQ) | |
| # ======================================================================== | |
def test_21_list_dlq_jobs(self):
    """List the tenant's dead-letter-queue jobs and check the response shape."""
    ok, body, err = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/dlq",
        headers=self.TENANT_HEADER,
        params={"tenant_id": self.ctx.tenant_id}
    )
    assert ok, f"Failed to list DLQ jobs: {err}"
    assert body is not None, "No response data"
    assert "jobs" in body, "Missing jobs array"
    self.logger.info(f"DLQ contains {len(body['jobs'])} jobs")
def test_22_get_dlq_job_details(self):
    """Fetch details for the first DLQ job; skip quietly when the DLQ is empty."""
    headers = self.TENANT_HEADER
    tenant_params = {"tenant_id": self.ctx.tenant_id}

    listed, listing, _ = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/dlq",
        headers=headers,
        params=tenant_params
    )
    if not listed or not listing.get("jobs"):
        self.logger.info("No DLQ jobs to test - skipping")
        return

    dlq_job_id = listing["jobs"][0]["job_id"]
    ok, details, err = self._make_request(
        method="GET",
        endpoint=f"/api/v1/data-sources/dlq/{dlq_job_id}",
        headers=headers,
        params=tenant_params
    )
    assert ok, f"Failed to get DLQ job details: {err}"
    assert details is not None, "No response data"
    assert "job_id" in details, "Missing job_id"
    self.logger.info(f"Retrieved DLQ job details: {dlq_job_id}")
| # ======================================================================== | |
| # CATEGORY 8: METRICS & MONITORING | |
| # ======================================================================== | |
def test_23_get_metrics_summary(self):
    """Check the metrics summary exposes jobs, connections and api_keys sections.

    The request is sent without any auth headers.
    """
    ok, body, err = self._make_request(
        method="GET",
        endpoint="/api/v1/data-sources/metrics"
    )
    assert ok, f"Failed to get metrics: {err}"
    assert body is not None, "No response data"
    assert "jobs" in body, "Missing jobs metrics"
    assert "connections" in body, "Missing connections metrics"
    assert "api_keys" in body, "Missing api_keys metrics"

    job_stats = body["jobs"]
    assert "total" in job_stats, "Missing total in jobs metrics"
    assert "completed" in job_stats, "Missing completed count"

    key_stats = body["api_keys"]
    self.logger.info(f"Metrics - Total jobs: {job_stats.get('total', 0)}")
    self.logger.info(f"Metrics - API keys created: {key_stats.get('keys_created', 0)}")
def test_24_cleanup_old_metrics(self):
    """Call the metrics cleanup endpoint with days_to_keep=7 and expect success.

    The request is sent without any auth headers.
    """
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/metrics/cleanup",
        params={"days_to_keep": 7}
    )
    assert ok, f"Failed to cleanup metrics: {err}"
    assert body is not None, "No response data"
    assert "status" in body, "Missing status"
    assert body["status"] == "success", "Cleanup should succeed"

    removed = body.get('deleted_records', 0)
    self.logger.info(f"Cleaned up {removed} old metric records")
| # ======================================================================== | |
| # CATEGORY 9: PLAN CACHING (NATURAL LANGUAGE) | |
| # ======================================================================== | |
def test_25_create_natural_language_job(self):
    """Submit a natural-language query job with use_cache enabled.

    Only the presence of a job id is asserted. Whether the response carries
    cache information is logged, not checked.
    """
    nl_payload = {
        "tenant_id": self.ctx.tenant_id,
        "user_query": "Show me a list of test data with numbers from 1 to 5",
        "use_cache": True,
        "priority": 5,
        "timeout_seconds": 300
    }
    ok, body, err = self._make_request(
        method="POST",
        endpoint="/api/v1/data-sources/jobs/natural-language",
        headers=self.TENANT_HEADER,
        json_data={"payload": nl_payload},
        expect_status=201
    )
    assert ok, f"Failed to create NL job: {err}"
    assert body is not None, "No response data"
    assert "job_id" in body, "Missing job_id"

    new_job = body["job_id"]
    self.ctx.created_jobs.append(new_job)
    has_cache_info = any(key in body for key in ("cache_info", "cache_status"))
    self.logger.info(f"Created NL job: {new_job} (has cache info: {has_cache_info})")
| # ======================================================================== | |
| # CLEANUP OPERATIONS | |
| # ======================================================================== | |
def test_99_delete_data_source(self):
    """Best-effort deletion of the first data source recorded in this run.

    Returns immediately when ``self.ctx.created_sources`` is empty. On success
    the source name is removed from that list. A failed request is only
    logged as a warning rather than failing the test.
    """
    if not self.ctx.created_sources:
        return

    target = self.ctx.created_sources[0]
    tenant_headers = self.TENANT_HEADER
    url = f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/sources/{target}"
    deleted, _payload, err = self._make_request(
        method="DELETE",
        endpoint=url,
        headers=tenant_headers,
    )

    if not deleted:
        self.logger.warning(f"Failed to delete source {target}: {err}")
        return

    self.logger.info(f"Deleted data source: {target}")
    self.ctx.created_sources.remove(target)
def test_100_revoke_api_key(self):
    """Best-effort revocation of the API key created for this run.

    Returns immediately when ``self.ctx.api_key_id`` is unset. The DELETE is
    sent with the admin headers. On success the key id is dropped from
    ``self.ctx.created_api_keys`` if it is tracked there. On failure only a
    warning is logged.
    """
    key_id = self.ctx.api_key_id
    if not key_id:
        return

    headers = self.ADMIN_HEADER
    success, _data, error = self._make_request(
        method="DELETE",
        endpoint=f"/api/v1/data-sources/tenants/{self.ctx.tenant_id}/api-keys/{key_id}",
        headers=headers
    )

    if not success:
        self.logger.warning(f"Failed to revoke API key: {error}")
        return

    self.logger.info(f"Revoked API key: {key_id}")
    # list.remove() raises ValueError for an id that is not tracked. Don't let
    # resource bookkeeping turn a successful revocation into a test failure.
    if key_id in self.ctx.created_api_keys:
        self.ctx.created_api_keys.remove(key_id)
| # ======================================================================== | |
| # TEST RUNNER | |
| # ======================================================================== | |
def run_all_tests(self):
    """Run every numbered ``test_*`` method, grouped into report categories.

    Test methods are discovered with ``dir(self)``: callable attributes whose
    names start with ``test_``. They are ordered by the number after
    ``test_``, executed through ``self._run_test``, and tallied per category
    into ``self.ctx.category_stats``. Categories with no tests are skipped.
    Finishes by calling ``self._print_summary()``.
    """
    print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.HEADER} Data Sources API - End-to-End Test Suite{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}\n")
    print(f"{Colors.CYAN}API URL: {self.ctx.api_url}{Colors.ENDC}")
    print(f"{Colors.CYAN}Tenant ID: {self.ctx.tenant_id}{Colors.ENDC}\n")

    def number_of(name):
        # "test_23_get_metrics_summary" -> 23
        return int(name.split('_')[1])

    # Collect all test methods and sort them by their *numeric* prefix.
    # A plain name sort is lexicographic ("test_100_..." < "test_99_..."),
    # which revoked the API key before the data source was deleted.
    test_methods = sorted(
        (
            (name, getattr(self, name))
            for name in dir(self)
            if name.startswith('test_') and callable(getattr(self, name))
        ),
        key=lambda item: (number_of(item[0]), item[0]),
    )

    # (category label, first test number, last test number or None = "and up").
    # The ranges are contiguous, so no numbered test is silently left out of
    # the run. Anything between plan caching and cleanup lands in "Other".
    category_layout = (
        ("Tenant Management (API Keys)", 0, 3),
        ("Data Source Configuration", 4, 7),
        ("Schema Discovery", 8, 10),
        ("SQL Execution (Sync)", 11, 14),
        ("SQL Execution (Async)", 15, 17),
        ("Job Management", 18, 20),
        ("Dead Letter Queue", 21, 22),
        ("Metrics & Monitoring", 23, 24),
        ("Plan Caching", 25, 25),
        ("Other", 26, 98),
        ("Cleanup", 99, None),
    )
    categories = {
        label: [
            (name, func)
            for name, func in test_methods
            if first <= number_of(name) and (last is None or number_of(name) <= last)
        ]
        for label, first, last in category_layout
    }

    # Run tests by category
    for category_name, tests in categories.items():
        if not tests:
            continue
        print(f"\n{Colors.BOLD}{Colors.BLUE}{'─'*70}{Colors.ENDC}")
        print(f"{Colors.BOLD}{Colors.BLUE} {category_name}{Colors.ENDC}")
        print(f"{Colors.BOLD}{Colors.BLUE}{'─'*70}{Colors.ENDC}")

        category_results = [self._run_test(test_name, test_func) for test_name, test_func in tests]

        # Store category stats
        self.ctx.category_stats[category_name] = {
            "passed": sum(1 for r in category_results if r.passed),
            "failed": sum(1 for r in category_results if not r.passed),
            "total": len(category_results),
            "duration": sum(r.duration for r in category_results)
        }

    self._print_summary()
def _print_summary(self):
    """Print overall, per-category and created-resource summaries, then exit.

    Reads ``self.ctx.results``, ``self.ctx.category_stats`` and the
    ``created_*`` lists on ``self.ctx``. Always ends the process via
    ``sys.exit``:

    - 0 when results were recorded and none failed.
    - 1 when any test failed.
    - 1 when no results were recorded at all.
    """
    print(f"\n\n{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.HEADER} Test Execution Summary{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}\n")

    results = self.ctx.results
    failed_tests = [r for r in results if not r.passed]
    total_tests = len(results)
    total_failed = len(failed_tests)
    total_passed = total_tests - total_failed
    total_duration = sum(r.duration for r in results)
    # With no recorded results the rate is undefined. Report 0.0 instead of
    # raising ZeroDivisionError halfway through the summary.
    success_rate = total_passed / total_tests * 100 if total_tests else 0.0

    print(f"{Colors.BOLD}Overall Results:{Colors.ENDC}")
    print(f" Total Tests: {total_tests}")
    print(f" {Colors.GREEN}✓ Passed: {total_passed}{Colors.ENDC}")
    print(f" {Colors.RED}✗ Failed: {total_failed}{Colors.ENDC}")
    print(f" Total Duration: {total_duration:.2f}s")
    print(f" Success Rate: {success_rate:.1f}%\n")

    print(f"{Colors.BOLD}Category Breakdown:{Colors.ENDC}")
    for category, stats in self.ctx.category_stats.items():
        status_color = Colors.GREEN if stats["failed"] == 0 else Colors.YELLOW
        print(f" {status_color}{category}:{Colors.ENDC}")
        print(f" {stats['passed']}/{stats['total']} passed ({stats['duration']:.2f}s)")

    # Show failed tests
    if failed_tests:
        print(f"\n{Colors.BOLD}{Colors.RED}Failed Tests:{Colors.ENDC}")
        for result in failed_tests:
            print(f" {Colors.RED}✗ {result.name}{Colors.ENDC}")
            if result.error:
                print(f" Error: {result.error}")

    # Show created resources
    print(f"\n{Colors.BOLD}Created Resources:{Colors.ENDC}")
    print(f" API Keys: {len(self.ctx.created_api_keys)}")
    print(f" Data Sources: {len(self.ctx.created_sources)}")
    print(f" Jobs: {len(self.ctx.created_jobs)}")
    print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}\n")

    # Exit with proper code. An empty run is reported explicitly and treated as a
    # failure rather than "All tests passed!".
    if total_tests == 0:
        print(f"{Colors.YELLOW}{Colors.BOLD}No tests were run.{Colors.ENDC}\n")
        sys.exit(1)
    if total_failed > 0:
        sys.exit(1)
    print(f"{Colors.GREEN}{Colors.BOLD}✓ All tests passed!{Colors.ENDC}\n")
    sys.exit(0)
| # ============================================================================ | |
| # MAIN ENTRY POINT | |
| # ============================================================================ | |
def main():
    """Parse CLI options, build the shared test context and run the suite.

    Exit status:
    - Set by the summary step: 0 when every test passed, 1 otherwise.
    - 130 when interrupted with Ctrl-C.
    - 1 on any other unexpected exception (its traceback is printed).
    """
    import os  # local import: only needed to read the admin key from the environment

    parser = argparse.ArgumentParser(
        description="End-to-End API Test Suite for Data Sources API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Run all tests with default settings
python test_api_e2e.py
# Run with custom API URL and admin key
python test_api_e2e.py --api-url http://localhost:8000 --admin-key YOUR_KEY
# Run with verbose output
python test_api_e2e.py --verbose
# Skip cleanup (useful for debugging)
python test_api_e2e.py --no-cleanup
"""
    )
    parser.add_argument(
        "--api-url",
        default="http://localhost:8000",
        help="Base URL of the API server (default: http://localhost:8000)"
    )
    parser.add_argument(
        "--admin-key",
        # The module docstring tells users to export SIRUS_ADMIN_API_KEY, so honor it.
        # NOTE(review): the hard-coded fallback is a credential committed to source.
        # It is kept only for backward compatibility; consider removing it.
        default=os.environ.get("SIRUS_ADMIN_API_KEY", "phobosq1"),
        help="Admin API key for authentication (default: $SIRUS_ADMIN_API_KEY, else a built-in development key)"
    )
    parser.add_argument(
        "--tenant-id",
        default="test_e2e_tenant",
        help="Tenant ID to use for testing (default: test_e2e_tenant)"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose output with detailed request/response logging"
    )
    parser.add_argument(
        "--no-cleanup",
        action="store_true",
        help="Skip cleanup operations (keep created resources)"
    )
    args = parser.parse_args()

    # Create test context
    context = TestContext(
        api_url=args.api_url,
        admin_api_key=args.admin_key,
        verbose=args.verbose,
        tenant_id=args.tenant_id
    )

    # Create and run test suite
    runner = E2ETestRunner(context)

    if args.no_cleanup:
        # --no-cleanup used to be parsed and then ignored. run_all_tests() only
        # collects *callable* ``test_*`` attributes, and cleanup tests are those
        # numbered 99 and above. Shadowing them with None on this instance drops
        # them from the run and leaves the created resources in place.
        for attr_name in dir(runner):
            parts = attr_name.split("_")
            if (
                attr_name.startswith("test_")
                and len(parts) > 1
                and parts[1].isdigit()
                and int(parts[1]) >= 99
            ):
                setattr(runner, attr_name, None)
        print(f"{Colors.YELLOW}--no-cleanup: cleanup tests will be skipped{Colors.ENDC}")

    try:
        runner.run_all_tests()
    except KeyboardInterrupt:
        print(f"\n\n{Colors.YELLOW}Test execution interrupted by user{Colors.ENDC}\n")
        sys.exit(130)
    except Exception as e:
        print(f"\n\n{Colors.RED}Fatal error: {e}{Colors.ENDC}\n")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()