Spaces:
Running
Running
| """FastAPI router for data federation API endpoints. | |
| This module provides REST API endpoints for job creation, result retrieval, | |
| and schema access for the data federation system with plan caching support. | |
| """ | |
| from typing import Any, Dict, List, Optional, Union | |
| import json | |
| import orjson as _orjson | |
| import logging | |
| import asyncio | |
| import time | |
| import sys | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from fastapi import APIRouter, HTTPException, Depends, status, Query, Request | |
| from fastapi.responses import JSONResponse, ORJSONResponse | |
| from pydantic import BaseModel, Field | |
| import redis | |
| from minio import Minio | |
| # Ensure project root (parent of `backend`) is on sys.path when running modules directly | |
| project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) | |
| if project_root not in sys.path: | |
| sys.path.insert(0, project_root) | |
| # Import centralized storage configuration | |
| from backend.core.minio.config import get_minio_config, get_redis_config | |
| # Import JWT authentication dependencies | |
| from backend.core.auth import get_current_user, get_tenant_admin, AuthUser | |
| from backend.data_sources.jobs import ( | |
| JobRequest, JobResponse, JobResultResponse, PaginatedJobResultResponse, | |
| PaginationInfo, SchemaResponse, ErrorResponse, | |
| JobPayload, JobMetadata, JobStatus, generate_job_id, | |
| # New plan caching models | |
| NaturalLanguageJobRequest, NaturalLanguageJobPayload, JobResponseWithCache, | |
| PlanCacheResponse, CacheStatsResponse | |
| ) | |
| from backend.data_sources.federation_agent import FederationAgent | |
| try: | |
| from backend.data_sources.worker import Worker | |
| except ImportError: | |
| Worker = None # Celery not available (e.g., HF Spaces deployment) | |
| import backend.data_sources.tracing as tracing | |
| from backend.data_sources.tracing import traced_span, SpanType, add_trace_event, add_trace_metadata | |
| from backend.data_sources.plan_cache import init_plan_cache, get_plan_cache, check_plan_cache, cache_generated_plan | |
class DLQJobResponse(BaseModel):
    """Describe a single job that was moved to the Dead Letter Queue (DLQ)."""

    # Which job failed, and for which tenant.
    job_id: str = Field(default=..., description="Job identifier")
    tenant_id: str = Field(default=..., description="Tenant identifier")

    # Failure details recorded for the job.
    failed_at: datetime = Field(default=..., description="When the job was moved to DLQ")
    error_message: str = Field(default=..., description="Final error message")
    retry_count: int = Field(default=..., description="Number of retries attempted")

    # The payload the job was originally submitted with.
    payload: Dict[str, Any] = Field(default=..., description="Original job payload")
class DLQListResponse(BaseModel):
    """One page of Dead Letter Queue jobs together with paging metadata."""

    jobs: List[DLQJobResponse] = Field(default=..., description="List of DLQ jobs")
    total_count: int = Field(default=..., description="Total number of jobs in DLQ")

    # Paging position of this response.
    page: int = Field(default=..., description="Current page number")
    page_size: int = Field(default=..., description="Number of jobs per page")
class RawSQLRequest(BaseModel):
    """Payload for running a raw SQL statement against one data source."""

    source_name: str = Field(default=..., description="Target data source name")
    sql_query: str = Field(
        default=...,
        min_length=1,
        description="Raw SQL query to execute",
    )
    async_mode: bool = Field(
        default=False,
        description="If True, enqueue job and return job_id; if False, execute synchronously",
    )
    # Optional caller-supplied limits; both are range-checked by the model.
    max_rows: Optional[int] = Field(
        default=None,
        ge=1,
        le=100000,
        description="Maximum rows to return (server enforced)",
    )
    timeout_seconds: Optional[int] = Field(
        default=None,
        ge=1,
        le=300,
        description="Query timeout in seconds (max 5 minutes for sync mode)",
    )
class RawSQLSyncResponse(BaseModel):
    """Result envelope returned when raw SQL is executed synchronously."""

    status: str = Field(default="success", description="Execution status")
    results: List[Dict[str, Any]] = Field(default=..., description="Query results")
    rows_returned: int = Field(default=..., description="Number of rows in results")
    rows_limited: bool = Field(
        default=False,
        description="True if results were truncated due to max_rows limit",
    )
    execution_time_ms: float = Field(
        default=...,
        description="Query execution time in milliseconds",
    )
class RawSQLAsyncResponse(BaseModel):
    """Acknowledgement returned when raw SQL is queued for asynchronous execution."""

    status: str = Field(default="accepted", description="Job accepted status")
    # Identifiers the caller can use to follow up on the queued job.
    job_id: str = Field(default=..., description="Job identifier for tracking")
    tenant_id: str = Field(default=..., description="Tenant identifier")
    message: str = Field(default=..., description="Human-readable status message")
class SchemaSearchRequest(BaseModel):
    """Request model for schema keyword search."""

    # `min_length` is the Pydantic v2 spelling of the list-size constraint;
    # the v1 name `min_items` still works but emits a deprecation warning.
    keywords: List[str] = Field(..., min_length=1, description="Search keywords")
    source_names: Optional[List[str]] = Field(None, description="Filter by specific sources")
    include_samples: bool = Field(False, description="Include example rows in results")
    max_tables_per_source: int = Field(10, ge=1, le=50, description="Max tables per source")
    # Optional context fields, described as being for analytics and logging.
    original_question: Optional[str] = Field(
        None,
        description="Original user question for analytics and logging",
    )
    keyword_metadata: Optional[Dict[str, List[str]]] = Field(
        None,
        description="Breakdown of keyword sources: base, semantic, concepts",
    )
class SchemaMatch(BaseModel):
    """A single table returned by a schema search."""

    table_name: str
    # NOTE(review): presumably a relevance score for the match — scale/range
    # is not visible here; confirm against the search implementation.
    score: float
    matched_columns: List[str]
    source_name: str
class SchemaSearchResponse(BaseModel):
    """Envelope for the results of a schema search."""

    available_sources: List[str]
    matches: List[SchemaMatch]
    formatted_schema_string: str
    total_matches: int
    # Defaults to False when the producer does not set it.
    cache_hit: bool = False
| # =============================== | |
| # PHASE 1: TENANT MANAGEMENT MODELS | |
| # =============================== | |
class TenantSourceConfig(BaseModel):
    """Configuration for a single tenant data source."""

    source_name: str = Field(..., min_length=1, max_length=255, description="Unique name for this data source")
    source_type: str = Field(..., description="Type of connector (e.g., 'ibis')")
    config: Dict[str, Any] = Field(..., description="Connector-specific configuration")

    # Pydantic v2 style configuration. The nested `class Config` form is
    # deprecated in v2 and emits a warning at class-creation time; a plain
    # dict is accepted for `model_config`, so no extra import is needed.
    model_config = {
        "json_schema_extra": {
            "example": {
                "source_name": "my_postgres_db",
                "source_type": "ibis",
                "config": {
                    "uri": "postgresql://user:pass@host:5432/dbname",
                    "table_fetch_example_limit": 5,
                },
            }
        }
    }
class TenantSourceUpsertRequest(BaseModel):
    """Request to create or update tenant data sources."""

    # `min_length` is the Pydantic v2 spelling of the list-size constraint;
    # the v1 name `min_items` still works but emits a deprecation warning.
    sources: List[TenantSourceConfig] = Field(
        ...,
        min_length=1,
        description="List of data sources to configure",
    )
    validate_connection: bool = Field(
        True,
        description="Whether to validate connectivity before persisting",
    )
class TenantSourceListResponse(BaseModel):
    """Every data source configured for one tenant, with a count."""

    tenant_id: str
    sources: List[TenantSourceConfig]
    # Number of entries in `sources` (set by the producer, not computed here).
    count: int
class ConnectionProbeResult(BaseModel):
    """Result of connection validation probe."""

    source_name: str
    # The endpoints in this module also emit 'skipped' when the caller opts
    # out of validation, so the description lists all three values.
    status: str = Field(..., description="'success', 'failed', or 'skipped'")
    message: str
    # Diagnostics; each is only populated by the code paths that measured it.
    latency_ms: Optional[float] = None
    tables_found: Optional[int] = None
    error_details: Optional[str] = None
class TenantSourceUpsertResponse(BaseModel):
    """Outcome of a create/update operation on tenant data sources."""

    tenant_id: str
    sources: List[TenantSourceConfig]
    # One probe entry per source handled by the request.
    probe_results: List[ConnectionProbeResult]
    persisted: bool
class TenantSourcePatchRequest(BaseModel):
    """Partial update of one source: an optional rename and/or config changes."""

    new_source_name: Optional[str] = Field(
        default=None,
        description="New name for the source (dynamic renaming)",
    )
    config_updates: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Partial configuration updates to merge",
    )
class SourceListResponse(BaseModel):
    """Names of the data sources available to a tenant."""

    tenant_id: str
    available_sources: List[str] = Field(
        default=...,
        description="List of configured source names",
    )
    count: int = Field(default=..., description="Number of available sources")
class APIKeyValidationResult(BaseModel):
    """Outcome of validating an API key."""

    valid: bool
    # Both default to None; which one is populated is decided by the producer.
    tenant_id: Optional[str] = None
    error: Optional[str] = None
logger = logging.getLogger(__name__)

# Every data-source endpoint in this module hangs off this single router/prefix.
router = APIRouter(prefix="/api/v1/data-sources", tags=["data-sources"])

# Lazily created, process-wide clients (see get_redis_client / get_minio_client).
# These would normally be dependency injected.
_redis_client: Optional[redis.Redis] = None
_minio_client: Optional[Minio] = None

# --- Job / result storage -------------------------------------------------
JOB_TTL_SECONDS = 24 * 60 * 60  # 24 hours
RESULT_TTL_SECONDS = 60 * 60  # 1 hour
MAX_RESULT_SIZE_BYTES = 10 * 1024 * 1024  # 10MB

# --- Pagination -----------------------------------------------------------
DEFAULT_PAGE_SIZE = 100  # items per page when the caller does not specify
MAX_PAGE_SIZE = 1000  # upper bound on items per page

# --- Rate limiting (per tenant, per operation) ----------------------------
RATE_LIMIT_WINDOW_SECONDS = 60  # 1 minute window
RATE_LIMIT_MAX_REQUESTS = {
    "execute_sql": 30,  # SQL executions per window
    "schema_search": 60,  # schema searches per window
    "tenant_crud": 20,  # tenant CRUD operations per window
}

# --- SQL execution limits -------------------------------------------------
DEFAULT_MAX_ROWS = 10_000  # used when the request does not set max_rows
ABSOLUTE_MAX_ROWS = 100_000  # hard limit on rows returned
DEFAULT_SYNC_TIMEOUT = 30  # seconds, for synchronous queries
MAX_SYNC_TIMEOUT = 300  # seconds (5 minutes), for synchronous queries
MAX_SQL_LENGTH = 50_000  # maximum SQL query length
def check_rate_limit(
    redis_client: redis.Redis,
    tenant_id: str,
    operation: str,
    max_requests: Optional[int] = None
) -> bool:
    """
    Enforce the per-tenant, per-operation request limit.

    Uses a Redis counter (INCR) with an expiry of RATE_LIMIT_WINDOW_SECONDS to
    count requests in a fixed window.

    Args:
        redis_client: Redis connection
        tenant_id: Tenant identifier
        operation: Operation name (e.g., 'execute_sql', 'schema_search')
        max_requests: Override for the operation's limit; when None the value
            from RATE_LIMIT_MAX_REQUESTS is used (100 for unknown operations)

    Returns:
        True when the request is allowed. Also True when the Redis check
        itself fails (fail open). This function never returns False.

    Raises:
        HTTPException: 429 if rate limit exceeded
    """
    if max_requests is None:
        max_requests = RATE_LIMIT_MAX_REQUESTS.get(operation, 100)
    rate_key = f"rate_limit:{tenant_id}:{operation}"
    try:
        current_count = redis_client.incr(rate_key)
        # The first request in a window starts the expiry clock.
        if current_count == 1:
            redis_client.expire(rate_key, RATE_LIMIT_WINDOW_SECONDS)
        if current_count <= max_requests:
            return True
        # INCR and EXPIRE are two separate commands: if the EXPIRE above was
        # ever lost (crash/network error between the two), the counter would
        # never reset and the tenant would stay blocked forever. TTL == -1
        # means "key exists without expiry", so re-arm the window here.
        if redis_client.ttl(rate_key) == -1:
            redis_client.expire(rate_key, RATE_LIMIT_WINDOW_SECONDS)
    except Exception as e:
        # If rate limiting fails, log but allow request (fail open)
        logger.error(f"Rate limit check failed for tenant '{tenant_id}': {e}")
        return True

    logger.warning(
        f"Rate limit exceeded for tenant '{tenant_id}' on operation '{operation}': "
        f"{current_count}/{max_requests} in {RATE_LIMIT_WINDOW_SECONDS}s"
    )
    raise HTTPException(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        detail=f"Rate limit exceeded: {max_requests} requests per {RATE_LIMIT_WINDOW_SECONDS} seconds for operation '{operation}'. Please try again later."
    )
def paginate_results(results: List[Dict[str, Any]], page: int, page_size: int) -> tuple[List[Dict[str, Any]], PaginationInfo]:
    """
    Slice one page out of a full result list and describe the paging state.

    Args:
        results: Complete list of result records
        page: Page number (1-based)
        page_size: Number of items per page

    Returns:
        Tuple of (paginated_results, pagination_info). A page past the end
        yields an empty list with has_next=False.

    Raises:
        ValueError: If page or page_size is less than 1.
    """
    if page < 1:
        raise ValueError("Page number cannot be less than 1.")
    if page_size < 1:
        raise ValueError("Page size cannot be less than 1.")

    total_items = len(results)
    # Ceiling division; page_size >= 1 is guaranteed by the guard above, so
    # no zero-division fallback is needed.
    total_pages = (total_items + page_size - 1) // page_size

    start_idx = (page - 1) * page_size
    page_results = results[start_idx:start_idx + page_size]

    has_next = page < total_pages
    has_previous = page > 1
    pagination_info = PaginationInfo(
        page=page,
        page_size=page_size,
        total_items=total_items,
        total_pages=total_pages,
        has_next=has_next,
        has_previous=has_previous,
        next_page=page + 1 if has_next else None,
        previous_page=page - 1 if has_previous else None
    )
    return page_results, pagination_info
def estimate_result_size(results: List[Dict[str, Any]]) -> int:
    """
    Estimate the serialized size of results in bytes.

    The first (up to) 10 records are JSON-encoded, and their UTF-8 byte size
    is extrapolated to the full list with a 20% safety margin.

    Args:
        results: List of result records

    Returns:
        Estimated size in bytes (0 for an empty list)
    """
    if not results:
        return 0

    sample_size = min(10, len(results))
    sample = results[:sample_size]
    # default=str: result rows can carry values json cannot encode natively
    # (datetime, Decimal, bytes, ...). This is only a size estimate, so their
    # string form is a good enough stand-in and avoids a TypeError.
    sample_json = json.dumps(sample, default=str)
    sample_bytes = len(sample_json.encode('utf-8'))

    # Extrapolate to full results with safety margin
    estimated_size = (sample_bytes * len(results) // sample_size) * 1.2
    return int(estimated_size)
def get_redis_client() -> redis.Redis:
    """Return the shared Redis client, creating it on first use.

    On the first call the client is built from the centralized Redis
    configuration and then used to initialize the plan cache, the metrics
    collector and enhanced tracing. A failure in any of those three steps is
    logged as a warning and does not prevent the client from being returned.
    """
    global _redis_client
    if _redis_client is not None:
        return _redis_client

    # Use centralized Redis configuration
    _redis_client = redis.Redis(**get_redis_config())

    # Plan cache
    try:
        init_plan_cache(_redis_client, default_ttl_hours=24, enable_metrics=True)
    except Exception as e:
        logger.warning(f"Failed to initialize plan cache: {e}")
    else:
        logger.info("Plan cache initialized successfully")

    # Metrics collector (imported lazily, inside the guarded block)
    try:
        from backend.data_sources import metrics
        metrics.init_metrics_collector(_redis_client)
    except Exception as e:
        logger.warning(f"Failed to initialize metrics collector: {e}")
    else:
        logger.info("Metrics collector initialized successfully")

    # Enhanced tracing
    try:
        tracing.init_tracer(_redis_client, enable_storage=True)
    except Exception as e:
        logger.warning(f"Failed to initialize enhanced tracing: {e}")
    else:
        logger.info("Enhanced tracing initialized successfully")

    return _redis_client
def get_minio_client():
    """Return the shared MinIO client, or None when MinIO is switched off.

    MinIO counts as disabled when the MINIO_ENABLED environment variable
    equals "false" (case-insensitive). Otherwise the client is created once
    from the centralized MinIO configuration and reused on later calls.
    """
    global _minio_client
    enabled_flag = os.getenv("MINIO_ENABLED", "true")
    if enabled_flag.lower() == "false":
        return None
    if _minio_client is None:
        # Use centralized MinIO configuration
        _minio_client = Minio(**get_minio_config())
    return _minio_client
| # =========================== | |
| # Admin Authentication (DEPRECATED - Using JWT instead for new endpoints) | |
| # =========================== | |
| # =========================== | |
| # API Key Management Endpoints (DEPRECATED - Using JWT instead) | |
| # =========================== | |
| # NOTE: API Key endpoints below remain for backward compatibility | |
| # but new integrations should use JWT authentication via get_current_user | |
| # =========================== | |
| # Tenant Configuration Management | |
| # =========================== | |
def get_tenant_config(tenant_id: str, redis_client: redis.Redis) -> list:
    """Retrieve tenant configuration from Redis storage.

    Reads the JSON document stored under ``tenant:{tenant_id}:config`` and
    returns it. No default configuration is ever substituted.

    Args:
        tenant_id: Tenant identifier
        redis_client: Redis client for configuration retrieval

    Returns:
        list: Tenant configuration as list of source config dicts.
        Empty list if no configuration is stored, if the stored value is
        not a JSON list, or if reading/decoding fails.

    Design Note:
        This function never raises. As a consequence, callers CANNOT tell
        "no sources configured" apart from a Redis/decoding error: both
        yield an empty list (errors are only visible in the log). Callers
        that write the configuration back should keep this in mind.
    """
    try:
        config_key = f"tenant:{tenant_id}:config"
        config_json = redis_client.get(config_key)
        if not config_json:
            logger.info(f"No config found for tenant {tenant_id}")
            return []
        config = json.loads(config_json)
        # Honour the `-> list` contract: callers iterate the result and call
        # dict methods on each entry, so a non-list payload is treated as
        # corrupt instead of being handed on.
        if not isinstance(config, list):
            logger.error(
                f"Tenant config for {tenant_id} is not a list "
                f"(got {type(config).__name__}); ignoring it"
            )
            return []
        logger.info(f"Retrieved config for tenant {tenant_id} from Redis")
        return config
    except Exception as e:
        logger.error(f"Error retrieving tenant config from Redis: {e}")
        return []
def _validate_source_connection(
    source_config: TenantSourceConfig,
    redis_client: redis.Redis,
    minio_client: Minio
) -> ConnectionProbeResult:
    """Validate that a data source configuration is valid and connectable.

    Steps: check the source type and required config keys, build a
    single-source FederationAgent, connect, and fetch the schema as a
    connectivity probe. The connector is always disconnected afterwards.

    Args:
        source_config: Source configuration to probe
        redis_client: Redis client handed to the FederationAgent
        minio_client: MinIO client handed to the FederationAgent

    Returns:
        A ConnectionProbeResult with status "success" or "failed" and
        diagnostics. This function does not raise; unexpected errors are
        logged and reported as a failed probe.
    """
    start_time = time.time()
    try:
        # Validate source_type is supported
        if source_config.source_type not in ["ibis"]:
            return ConnectionProbeResult(
                source_name=source_config.source_name,
                status="failed",
                message=f"Unsupported source_type: {source_config.source_type}",
                error_details="Only 'ibis' source_type is currently supported"
            )
        # Validate required config fields
        if source_config.source_type == "ibis":
            if "uri" not in source_config.config:
                return ConnectionProbeResult(
                    source_name=source_config.source_name,
                    status="failed",
                    message="Missing required 'uri' in config",
                    error_details="Ibis connectors require a 'uri' field in config"
                )
        # Attempt to create a FederationAgent with this source only
        config_dict = {
            "source_name": source_config.source_name,
            "source_type": source_config.source_type,
            "config": source_config.config
        }
        agent = FederationAgent([config_dict], redis_client, minio_client)
        # Check if connector was initialized
        if source_config.source_name not in agent.connectors:
            return ConnectionProbeResult(
                source_name=source_config.source_name,
                status="failed",
                message="Failed to initialize connector",
                error_details="Connector initialization failed, check config and logs"
            )
        connector = agent.connectors[source_config.source_name]
        # Attempt to connect and get basic info
        connector.connect()
        try:
            # Fetching the schema doubles as the connectivity test.
            schema_str = connector.get_schema()
            # Count tables in schema (simple heuristic)
            tables_found = schema_str.count("Table:")
            latency_ms = (time.time() - start_time) * 1000
        except Exception as probe_error:
            return ConnectionProbeResult(
                source_name=source_config.source_name,
                status="failed",
                message="Connected but failed to probe schema",
                error_details=str(probe_error),
                latency_ms=round((time.time() - start_time) * 1000, 2)
            )
        finally:
            # Disconnect exactly once on every path. Cleanup is best-effort:
            # a disconnect error must not turn a successful probe into a
            # failure, nor hide the original probe error.
            try:
                connector.disconnect()
            except Exception as disconnect_error:
                logger.warning(
                    f"Failed to disconnect probe connector for "
                    f"{source_config.source_name}: {disconnect_error}"
                )
        return ConnectionProbeResult(
            source_name=source_config.source_name,
            status="success",
            message="Connection successful",
            latency_ms=round(latency_ms, 2),
            tables_found=tables_found
        )
    except Exception as e:
        logger.exception(f"Connection validation failed for {source_config.source_name}: {e}")
        return ConnectionProbeResult(
            source_name=source_config.source_name,
            status="failed",
            message="Connection validation error",
            error_details=str(e),
            latency_ms=round((time.time() - start_time) * 1000, 2)
        )
| # =============================== | |
| # PHASE 1: TENANT MANAGEMENT ENDPOINTS | |
| # =============================== | |
async def create_tenant_sources(
    request: TenantSourceUpsertRequest,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Create or update data sources for the authenticated user's tenant.

    The submitted sources are validated, optionally probed for connectivity,
    and then MERGED into the tenant's stored configuration: a source with an
    existing name is replaced, other stored sources are kept. All sources must
    pass validation before anything is persisted. After persisting, the
    tenant's schema cache entries are invalidated.

    Authentication: Requires valid JWT token

    Args:
        request: Source configurations to create/update
        user: Authenticated user from JWT
        redis_client: Redis client (configuration store and cache)
        minio_client: MinIO client passed through to the connection probe

    Returns:
        TenantSourceUpsertResponse with the submitted sources and their
        probe results

    Raises:
        HTTPException: 400 for validation errors, 502 for connection
            failures, 500 for unexpected errors
    """
    tenant_id = user.tenant_id
    try:
        # Input validation
        if not tenant_id or not tenant_id.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tenant_id cannot be empty"
            )
        # Validate no duplicate source names
        source_names = [s.source_name for s in request.sources]
        if len(source_names) != len(set(source_names)):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Duplicate source_name values are not allowed"
            )
        # Probe connections if requested
        probe_results = []
        if request.validate_connection:
            logger.info(f"Validating {len(request.sources)} source(s) for tenant {tenant_id}")
            for source_config in request.sources:
                probe_result = _validate_source_connection(
                    source_config,
                    redis_client,
                    minio_client
                )
                probe_results.append(probe_result)
                # Fail fast if validation requested and connection fails
                if probe_result.status == "failed":
                    logger.warning(f"Connection validation failed for {source_config.source_name}: {probe_result.message}")
                    raise HTTPException(
                        status_code=status.HTTP_502_BAD_GATEWAY,
                        detail=f"Connection validation failed for source '{source_config.source_name}': {probe_result.message}. Details: {probe_result.error_details}"
                    )
        else:
            # Skip validation, create placeholder results
            probe_results = [
                ConnectionProbeResult(
                    source_name=source_config.source_name,
                    status="skipped",
                    message="Connection validation was not requested"
                )
                for source_config in request.sources
            ]
        # All validations passed, persist to Redis.
        # Merge with existing config to support adding sources incrementally.
        # NOTE(review): get_tenant_config also returns [] when the Redis read
        # fails, so a failed read followed by a successful write would drop
        # the previously stored sources — confirm whether a stricter read is
        # wanted here.
        config_key = f"tenant:{tenant_id}:config"
        existing_config = get_tenant_config(tenant_id, redis_client)
        # Build a dict of existing sources by name for easy lookup
        existing_sources_dict = {src['source_name']: src for src in existing_config}
        # Add or replace sources from request
        for source in request.sources:
            existing_sources_dict[source.source_name] = source.model_dump()
        # Save merged config
        config_data = list(existing_sources_dict.values())
        redis_client.set(config_key, json.dumps(config_data))
        # Invalidate schema cache for this tenant. SCAN iterates the keyspace
        # incrementally; KEYS walks it in one blocking call and stalls the
        # Redis server on large databases.
        cache_pattern = f"schema:{tenant_id}:*"
        cache_keys = list(redis_client.scan_iter(match=cache_pattern))
        if cache_keys:
            redis_client.delete(*cache_keys)
            logger.info(f"Invalidated {len(cache_keys)} schema cache entries for tenant {tenant_id}")
        logger.info(f"Successfully persisted {len(request.sources)} source(s) for tenant {tenant_id}")
        # Metrics are best-effort: the configuration is already persisted at
        # this point, so a metrics failure must not surface as a 500.
        try:
            from backend.data_sources import metrics
            source_types = [s.source_type for s in request.sources]
            metrics.record_tenant_update(tenant_id, len(request.sources), source_types)
            # Record connection validation metrics
            for probe in probe_results:
                if probe.status != "skipped":
                    metrics.record_connection_validation(
                        tenant_id,
                        probe.source_name,
                        probe.status == "success"
                    )
        except Exception as metrics_error:
            logger.warning(f"Failed to record tenant update metrics for tenant {tenant_id}: {metrics_error}")
        return TenantSourceUpsertResponse(
            tenant_id=tenant_id,
            sources=request.sources,
            probe_results=probe_results,
            persisted=True
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to create tenant sources for {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create tenant sources: {str(e)}"
        )
async def list_tenant_sources(
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Return every data source configured for the caller's tenant.

    Stored entries that fail TenantSourceConfig validation are skipped with a
    warning rather than failing the whole request.

    Authentication: Requires valid JWT token

    Args:
        user: Authenticated user from JWT
        redis_client: Redis client used to read the tenant configuration

    Returns:
        TenantSourceListResponse (empty source list if none configured)

    Raises:
        HTTPException: 400 if the tenant id is empty, 500 on unexpected errors
    """
    tenant_id = user.tenant_id
    try:
        if not (tenant_id and tenant_id.strip()):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tenant_id cannot be empty"
            )

        # Run each stored entry through the Pydantic model; keep the valid ones.
        sources = []
        for raw_entry in get_tenant_config(tenant_id, redis_client):
            try:
                sources.append(TenantSourceConfig(**raw_entry))
            except Exception as e:
                logger.warning(f"Skipping invalid source config for tenant {tenant_id}: {e}")

        return TenantSourceListResponse(
            tenant_id=tenant_id,
            sources=sources,
            count=len(sources)
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to list tenant sources for {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list tenant sources: {str(e)}"
        )
async def update_tenant_source(
    source_name: str,
    source_config: TenantSourceConfig,
    validate_connection: bool = Query(True, description="Whether to validate connectivity before persisting"),
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Update an existing data source configuration for the authenticated user's tenant.

    The stored source is located by a case-insensitive name match and is
    replaced wholesale by ``source_config``.

    Authentication: Requires valid JWT token

    Args:
        source_name: Name of the source to update (must match source_config.source_name)
        source_config: New source configuration
        validate_connection: Whether to validate the connection before persisting
        user: Authenticated user from JWT
        redis_client: Redis client (configuration store and cache)
        minio_client: MinIO client passed through to the connection probe

    Returns:
        Updated configuration with probe results

    Raises:
        HTTPException: 400 for validation errors, 404 if source not found,
            502 for connection failures, 500 for unexpected errors
    """
    tenant_id = user.tenant_id
    try:
        # Input validation
        if not tenant_id or not tenant_id.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tenant_id cannot be empty"
            )
        if source_name != source_config.source_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"URL parameter source_name '{source_name}' must match body source_name '{source_config.source_name}'"
            )
        # Get existing config
        tenant_config = get_tenant_config(tenant_id, redis_client)
        # Find the source to update (case-insensitive)
        source_index = None
        actual_source_name = None
        for idx, config in enumerate(tenant_config):
            if config.get("source_name", "").lower() == source_name.lower():
                source_index = idx
                actual_source_name = config.get("source_name")
                break
        if source_index is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Source '{source_name}' not found for tenant '{tenant_id}'"
            )
        # Validate connection if requested
        if validate_connection:
            probe_result = _validate_source_connection(
                source_config,
                redis_client,
                minio_client
            )
            if probe_result.status == "failed":
                logger.warning(f"Connection validation failed for {source_name}: {probe_result.message}")
                raise HTTPException(
                    status_code=status.HTTP_502_BAD_GATEWAY,
                    detail=f"Connection validation failed: {probe_result.message}. Details: {probe_result.error_details}"
                )
        else:
            probe_result = ConnectionProbeResult(
                source_name=source_name,
                status="skipped",
                message="Connection validation was not requested"
            )
        # Update the config
        tenant_config[source_index] = source_config.model_dump()
        # Persist to Redis
        config_key = f"tenant:{tenant_id}:config"
        redis_client.set(config_key, json.dumps(tenant_config))
        # Invalidate schema cache for this source. The lookup above is
        # case-insensitive, so the stored name may differ in case from the
        # URL name; drop the cache entry under both spellings so no stale
        # schema survives under the old name.
        stale_cache_keys = [f"schema:{tenant_id}:{source_name}"]
        if actual_source_name != source_name:
            stale_cache_keys.append(f"schema:{tenant_id}:{actual_source_name}")
        redis_client.delete(*stale_cache_keys)
        logger.info(f"Successfully updated source '{source_name}' for tenant {tenant_id}")
        return TenantSourceUpsertResponse(
            tenant_id=tenant_id,
            sources=[source_config],
            probe_results=[probe_result],
            persisted=True
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to update tenant source {source_name} for {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update tenant source: {str(e)}"
        )
async def patch_tenant_source(
    source_name: str,
    request: TenantSourcePatchRequest,
    validate_connection: bool = Query(True, description="Whether to validate connectivity before persisting"),
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """
    Partially update an existing data source. Supports dynamic renaming and merging new config keys
    without sending the entire config object.

    The stored source is located by a case-insensitive name match. After the
    update is persisted, the schema and instructions cache entries for the
    URL name, the stored name and the new name are all invalidated.

    Authentication: Requires valid JWT token

    Args:
        source_name: Name of the source to patch
        request: Optional new name and/or config keys to merge
        validate_connection: Whether to probe the merged config before persisting
        user: Authenticated user from JWT
        redis_client: Redis client (configuration store and cache)
        minio_client: MinIO client passed through to the connection probe

    Returns:
        TenantSourceUpsertResponse holding the updated source and its probe result

    Raises:
        HTTPException: 404 if the source is not found, 400 if the new name
            collides with another source, 502 for connection failures,
            500 for unexpected errors
    """
    tenant_id = user.tenant_id
    try:
        tenant_config = get_tenant_config(tenant_id, redis_client)
        # Find the source (case-insensitive)
        source_index = None
        for idx, config in enumerate(tenant_config):
            if config.get("source_name", "").lower() == source_name.lower():
                source_index = idx
                break
        if source_index is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Source '{source_name}' not found for tenant '{tenant_id}'"
            )
        current_config = tenant_config[source_index]
        stored_name = current_config["source_name"]
        # Apply updates
        new_name = request.new_source_name or stored_name
        # If renaming, ensure the new name doesn't conflict
        if request.new_source_name and request.new_source_name.lower() != source_name.lower():
            if any(c.get("source_name", "").lower() == request.new_source_name.lower() for c in tenant_config):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"A source named '{request.new_source_name}' already exists."
                )
        # Merge into a copy so the stored entry is only replaced once the
        # merged result has passed model validation (and the probe).
        merged_config_details = dict(current_config.get("config", {}))
        if request.config_updates:
            merged_config_details.update(request.config_updates)
        updated_source_obj = TenantSourceConfig(
            source_name=new_name,
            source_type=current_config.get("source_type", "ibis"),
            config=merged_config_details
        )
        # Connection probe if requested
        if validate_connection:
            probe_result = _validate_source_connection(updated_source_obj, redis_client, minio_client)
            if probe_result.status == "failed":
                raise HTTPException(
                    status_code=status.HTTP_502_BAD_GATEWAY,
                    detail=f"Connection validation failed: {probe_result.message}"
                )
        else:
            probe_result = ConnectionProbeResult(source_name=new_name, status="skipped", message="Not requested")
        # Store back
        tenant_config[source_index] = updated_source_obj.model_dump()
        redis_client.set(f"tenant:{tenant_id}:config", json.dumps(tenant_config))
        # Cascade invalidate caches. The URL name may differ in case from the
        # stored name (case-insensitive match above), so invalidate under
        # every spelling involved: URL name, stored name and new name.
        for stale_name in {source_name, stored_name, new_name}:
            redis_client.delete(f"schema:{tenant_id}:{stale_name}")
            redis_client.delete(f"instructions:{tenant_id}:{stale_name}")
        return TenantSourceUpsertResponse(
            tenant_id=tenant_id,
            sources=[updated_source_obj],
            probe_results=[probe_result],
            persisted=True
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to patch source {source_name}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
async def delete_tenant_source(
    source_name: str,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Remove a data source from the authenticated user's tenant configuration.

    The source is matched case-insensitively. The tenant config key is rewritten
    (or deleted when no sources remain) and the schema/instructions caches of the
    removed source are invalidated.

    Authentication: Requires valid JWT token

    Args:
        source_name: Name of the source to remove
        user: Authenticated user from JWT
        redis_client: Redis client holding tenant config and caches

    Returns:
        Remaining sources after deletion

    Raises:
        HTTPException: 400 if the tenant id is blank, 404 if tenant or source
            not found, 500 on any unexpected error
    """
    tenant_id = user.tenant_id
    try:
        # Input validation
        if not tenant_id or not tenant_id.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tenant_id cannot be empty"
            )

        # Get existing config
        tenant_config = get_tenant_config(tenant_id, redis_client)
        if not tenant_config:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No configuration found for tenant '{tenant_id}'"
            )

        # Split the config into removed / kept entries (case-insensitive match),
        # remembering the stored spelling of every removed name.
        target = source_name.lower()
        removed_names = []
        new_config = []
        for entry in tenant_config:
            stored_name = entry.get("source_name", "")
            if stored_name.lower() == target:
                removed_names.append(stored_name)
            else:
                new_config.append(entry)

        if not removed_names:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Source '{source_name}' not found for tenant '{tenant_id}'"
            )

        # Persist updated config
        config_key = f"tenant:{tenant_id}:config"
        if new_config:
            redis_client.set(config_key, json.dumps(new_config))
        else:
            # Delete the key if no sources remain
            redis_client.delete(config_key)

        # Invalidate caches (Cascade Deletes). Cache keys are written with the
        # stored spelling, which may differ in case from the request, so drop both.
        for name in dict.fromkeys([source_name, *removed_names]):
            if name:
                redis_client.delete(f"schema:{tenant_id}:{name}")
                redis_client.delete(f"instructions:{tenant_id}:{name}")

        logger.info(f"Successfully deleted source '{source_name}' for tenant {tenant_id}")

        # Convert remaining configs to Pydantic models, skipping invalid entries
        remaining_sources = []
        for entry in new_config:
            try:
                remaining_sources.append(TenantSourceConfig(**entry))
            except Exception as e:
                logger.warning(f"Skipping invalid source config: {e}")

        return TenantSourceListResponse(
            tenant_id=tenant_id,
            sources=remaining_sources,
            count=len(remaining_sources)
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to delete tenant source {source_name} for {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete tenant source: {str(e)}"
        )
| # =============================== | |
| # PHASE 2: SOURCE DISCOVERY ENDPOINT | |
| # =============================== | |
async def list_data_sources(
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Return the names of the data sources configured for the caller's tenant.

    Entries of the tenant configuration that carry no (or an empty)
    ``source_name`` are left out. A tenant without any configured source yields
    an empty list.

    Authentication: Requires valid JWT token

    Returns:
        SourceListResponse with the tenant id, the source names and their count

    Raises:
        HTTPException: 400 if the tenant id is blank, 500 on unexpected errors
    """
    tenant_id = user.tenant_id
    try:
        # Reject blank / whitespace-only tenant ids
        if not (tenant_id and tenant_id.strip()):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tenant_id cannot be empty"
            )

        # Load the tenant configuration and keep only non-empty names
        tenant_config = get_tenant_config(tenant_id, redis_client)
        candidate_names = (entry.get("source_name") for entry in tenant_config)
        available_sources = [name for name in candidate_names if name]
        total = len(available_sources)

        logger.info(f"Listed {total} source(s) for tenant {tenant_id}")
        return SourceListResponse(
            tenant_id=tenant_id,
            available_sources=available_sources,
            count=total
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to list data sources for tenant {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list data sources: {str(e)}"
        )
| # =============================== | |
| # EXISTING DATA SOURCES ENDPOINTS | |
| # =============================== | |
async def create_job(
    job_request: JobRequest,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Create and enqueue a new federated query job.

    Checks that the payload's tenant matches the authenticated tenant, verifies
    the tenant has data sources configured, stores job metadata and payload in
    Redis with a TTL of ``JOB_TTL_SECONDS``, and enqueues the job through
    ``process_federated_job.delay``.

    Authentication: Requires valid JWT token. Job tenant_id must match authenticated tenant.

    Args:
        job_request: Request wrapping the job payload.
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client used for tenant config and job bookkeeping.
        minio_client: MinIO client (not used by this endpoint itself).

    Returns:
        JobResponse carrying the new job id with status PENDING.

    Raises:
        HTTPException: 403 on tenant mismatch, 400 if the tenant has no data
            sources configured, 500 on any unexpected error.
    """
    tenant_id = user.tenant_id
    payload = job_request.payload

    # Verify tenant_id matches
    if payload.tenant_id != tenant_id:
        logger.warning(f"Tenant mismatch: JWT={tenant_id}, request={payload.tenant_id}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Tenant ID in request must match your authenticated tenant"
        )

    # Start enhanced trace for the API request
    tracer = tracing.get_tracer()
    trace_context = None
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="create_federated_job",
            span_type=SpanType.HTTP_REQUEST,
            tenant_id=tenant_id
        )

    try:
        with traced_span("generate_job_id", SpanType.HTTP_REQUEST):
            # Generate unique job ID
            job_id = generate_job_id()
            add_trace_metadata(job_id=job_id)

        with traced_span("validate_tenant_config", SpanType.CACHE_OPERATION):
            # Validate tenant and get configuration
            tenant_config = get_tenant_config(payload.tenant_id, redis_client)
            if not tenant_config:
                logger.warning(f"No data sources configured for tenant {payload.tenant_id}")
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"No data sources configured for tenant '{payload.tenant_id}'. Please configure data sources using the tenant management API."
                )
            add_trace_metadata(sources_configured=len(tenant_config))

        with traced_span("store_job_metadata", SpanType.CACHE_OPERATION):
            # Store initial job metadata
            metadata = JobMetadata(
                job_id=job_id,
                tenant_id=payload.tenant_id,
                status=JobStatus.PENDING,
                trace_id=trace_context.trace_id if tracer else f"job-{job_id}"
            )
            redis_client.setex(
                f"job:{job_id}:metadata",
                JOB_TTL_SECONDS,
                metadata.model_dump_json()
            )

        with traced_span("store_job_payload", SpanType.CACHE_OPERATION):
            # Store job payload (for retry logic)
            redis_client.setex(
                f"job:{job_id}:payload",
                JOB_TTL_SECONDS,
                payload.model_dump_json()
            )

        with traced_span("enqueue_job", SpanType.EXTERNAL_API):
            # Enqueue job to Celery with retry support
            from .worker import process_federated_job
            process_federated_job.delay(
                job_id=job_id,
                job_payload=payload.model_dump(),
                tenant_id=payload.tenant_id
            )
            # Emit the event only once the enqueue call has returned, so the
            # trace never claims a job was queued when .delay() raised.
            add_trace_event("job_enqueued", level="INFO", job_id=job_id)

        add_trace_event("job_created_successfully", level="INFO", job_id=job_id)
        logger.info(f"Created job {job_id} for tenant {payload.tenant_id}")
        if tracer:
            tracer.finish_span("success", job_id=job_id)

        return JobResponse(
            job_id=job_id,
            status=JobStatus.PENDING,
            message="Job created and queued for processing"
        )
    except HTTPException as http_exc:
        # Close the span opened above; otherwise it stays unfinished whenever
        # the request is rejected with an HTTP error.
        if tracer:
            tracer.finish_span("error", str(http_exc.detail))
        raise
    except Exception as e:
        logger.exception(f"Failed to create job: {e}")
        add_trace_event("job_creation_failed", level="ERROR", error=str(e))
        if tracer:
            tracer.finish_span("error", str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create job: {str(e)}"
        )
async def get_job_results(
    job_id: str,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Retrieve results for a specific job.

    Reads the job metadata from Redis and, for completed jobs, the result payload
    referenced by ``metadata.result_location``. List results whose estimated size
    exceeds ``MAX_RESULT_SIZE_BYTES`` are replaced by a hint pointing at the
    paginated endpoint /results/{job_id}/paginated. ``minio://`` locations
    currently yield a placeholder instead of the stored data.

    Authentication: Requires valid JWT token. User must own the job (same tenant).

    Args:
        job_id: Identifier of the job to look up.
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client holding job metadata and small results.
        minio_client: MinIO client (not used yet; MinIO results are a placeholder).

    Returns:
        JobResultResponse with status, metadata and, when available, results or
        the job's error message.

    Raises:
        HTTPException: 404 if the job is unknown, 403 if it belongs to another
            tenant, 500 on any unexpected error.
    """
    try:
        # Get job metadata
        metadata_json = redis_client.get(f"job:{job_id}:metadata")
        if not metadata_json:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found"
            )
        metadata = JobMetadata.model_validate_json(metadata_json)

        # Verify tenant ownership
        if metadata.tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized access to job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot access jobs from other tenants"
            )

        # Prepare response
        response = JobResultResponse(
            job_id=job_id,
            status=metadata.status,
            metadata=metadata
        )

        # Get results if job is completed
        location = metadata.result_location
        if metadata.status == JobStatus.COMPLETED and location:
            if location.startswith("redis://"):
                # Strip only the scheme prefix; str.replace would also remove
                # any later "redis://" occurring inside the key itself.
                results_json = redis_client.get(location.removeprefix("redis://"))
                if results_json:
                    results_data = json.loads(results_json)
                    if not isinstance(results_data, list):
                        response.results = results_data
                    else:
                        # Check if results are too large for non-paginated response
                        total_items = len(results_data)
                        estimated_size = estimate_result_size(results_data)
                        if estimated_size > MAX_RESULT_SIZE_BYTES:
                            response.results = {
                                "message": f"Result set too large ({total_items} items). Use /results/{job_id}/paginated endpoint.",
                                "total_items": total_items,
                                "estimated_size_bytes": estimated_size
                            }
                        else:
                            response.results = {"data": results_data, "total_items": total_items}
            elif location.startswith("minio://"):
                # In production, read from MinIO
                # For now, return a placeholder
                response.results = {"message": "Large results stored in MinIO", "path": location}

        # Include error message if job failed
        if metadata.status == JobStatus.FAILED:
            response.error_message = metadata.error_message

        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get job results for {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve job results: {str(e)}"
        )
def _single_item_pagination(page: int, page_size: int) -> PaginationInfo:
    """Build pagination info for a result set that holds exactly one item."""
    past_first_page = page != 1
    return PaginationInfo(
        page=page,
        page_size=page_size,
        total_items=1,
        total_pages=1,
        has_next=False,
        has_previous=past_first_page,
        next_page=None,
        previous_page=1 if past_first_page else None
    )


def _empty_pagination(page: int, page_size: int) -> PaginationInfo:
    """Build pagination info for a request that has no results at all."""
    return PaginationInfo(
        page=page,
        page_size=page_size,
        total_items=0,
        total_pages=0,
        has_next=False,
        has_previous=False,
        next_page=None,
        previous_page=None
    )


async def get_job_results_paginated(
    job_id: str,
    page: int = Query(1, ge=1, description="Page number starting from 1"),
    page_size: int = Query(DEFAULT_PAGE_SIZE, ge=1, le=MAX_PAGE_SIZE, description="Number of items per page"),
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Retrieve paginated results for a specific job.

    List results stored in Redis are sliced with ``paginate_results``. A non-list
    result, and the placeholder returned for ``minio://`` locations, are treated
    as a one-item result set that lives on page 1. Whenever no result payload can
    be read (job not completed, no result location, payload missing from Redis,
    unknown location scheme) the response carries an empty list with zeroed
    pagination info.

    Authentication: Requires valid JWT token. User must own the job (same tenant).

    Args:
        job_id: Identifier of the job to look up.
        page: 1-based page number.
        page_size: Number of items per page.
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client holding job metadata and small results.
        minio_client: MinIO client (not used yet; MinIO pagination is a TODO).

    Returns:
        PaginatedJobResultResponse with the page of results and pagination info.

    Raises:
        HTTPException: 404 if the job is unknown, 403 if it belongs to another
            tenant, 500 on any unexpected error.
    """
    try:
        # Get job metadata
        metadata_json = redis_client.get(f"job:{job_id}:metadata")
        if not metadata_json:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found"
            )
        metadata = JobMetadata.model_validate_json(metadata_json)

        # Verify tenant ownership
        if metadata.tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized access to job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot access jobs from other tenants"
            )

        # Prepare response. Start from "no results" so every path below returns
        # a well-formed page, including a completed job whose payload is gone.
        response = PaginatedJobResultResponse(
            job_id=job_id,
            status=metadata.status,
            metadata=metadata
        )
        response.results = []
        response.pagination = _empty_pagination(page, page_size)

        # Get results if job is completed
        location = metadata.result_location
        if metadata.status == JobStatus.COMPLETED and location:
            if location.startswith("redis://"):
                # Strip only the scheme prefix; str.replace would also remove
                # any later "redis://" occurring inside the key itself.
                results_json = redis_client.get(location.removeprefix("redis://"))
                if results_json:
                    results_data = json.loads(results_json)
                    if isinstance(results_data, list):
                        # Apply pagination
                        page_results, pagination_info = paginate_results(results_data, page, page_size)
                        response.results = page_results
                        response.pagination = pagination_info
                    else:
                        # Non-list results (single values, objects) are wrapped
                        # in a list on page 1; later pages are empty.
                        response.results = [results_data] if page == 1 else []
                        response.pagination = _single_item_pagination(page, page_size)
            elif location.startswith("minio://"):
                # TODO: Implement MinIO pagination in future
                # For now, return placeholder with pagination info
                placeholder = {"message": "Large results stored in MinIO", "path": location}
                response.results = [placeholder] if page == 1 else []
                response.pagination = _single_item_pagination(page, page_size)

        # Include error message if job failed
        if metadata.status == JobStatus.FAILED:
            response.error_message = metadata.error_message

        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get paginated job results for {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve paginated job results: {str(e)}"
        )
async def get_source_schema(
    source_name: str,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Retrieve schema information for a specific data source.

    The schema is served from the Redis key ``schema:{tenant_id}:{source_name}``
    when present; otherwise it is fetched through a ``FederationAgent`` connector
    and cached for one hour. The source name is matched exactly (case-sensitive).

    Phase 8 Note: No default configs or fallbacks - explicit 404 errors only.

    Args:
        source_name: Name of the configured data source.
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client holding tenant config and the schema cache.
        minio_client: MinIO client passed to the FederationAgent.

    Returns:
        SchemaResponse with the schema and the time it was last refreshed.

    Raises:
        HTTPException: 404 if the tenant has no sources or the source is unknown,
            500 if the connector cannot be initialised or on unexpected errors.
    """
    tenant_id = user.tenant_id
    try:
        # Get tenant configuration
        tenant_config = get_tenant_config(tenant_id, redis_client)

        # Explicit check: no sources configured at all
        if not tenant_config:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No data sources configured for tenant '{tenant_id}'"
            )

        # Find the requested source. .get() keeps a malformed entry without a
        # name from turning this lookup into a KeyError / 500.
        source_config = next(
            (entry for entry in tenant_config if entry.get("source_name") == source_name),
            None,
        )
        if not source_config:
            known_sources = ', '.join(
                entry["source_name"] for entry in tenant_config if entry.get("source_name")
            )
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Data source '{source_name}' not found for tenant '{tenant_id}'. Available sources: {known_sources}"
            )

        from backend.data_sources import metrics

        # Check cache first
        cache_key = f"schema:{tenant_id}:{source_name}"
        cached_schema = redis_client.get(cache_key)
        if cached_schema:
            schema_data = json.loads(cached_schema)
            # Record cache hit
            metrics.record_schema_cache_access(hit=True, tenant_id=tenant_id, source_name=source_name)
            return SchemaResponse(
                source_name=source_name,
                schema_data=schema_data["schema"],
                last_updated=datetime.fromisoformat(schema_data["last_updated"])
            )

        # Record cache miss
        metrics.record_schema_cache_access(hit=False, tenant_id=tenant_id, source_name=source_name)

        # Create FederationAgent to get schema
        agent = FederationAgent([source_config], redis_client, minio_client)

        # Get schema from the specific connector
        if source_name not in agent.connectors:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to initialize connector for source '{source_name}'"
            )
        schema_str = agent.connectors[source_name].get_schema()

        # Take one timestamp so the cached entry and this response agree on
        # last_updated. Kept naive-UTC to match entries already in the cache.
        refreshed_at = datetime.utcnow()

        # Cache the schema for 1 hour
        redis_client.setex(
            cache_key,
            3600,
            json.dumps({"schema": schema_str, "last_updated": refreshed_at.isoformat()})
        )

        return SchemaResponse(
            source_name=source_name,
            schema_data=schema_str,
            last_updated=refreshed_at
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get schema for {source_name}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve schema: {str(e)}"
        )
class InstructionsResponse(BaseModel):
    """Payload describing the usage instructions of one data source."""

    # Which data source the instructions belong to.
    source_name: str = Field(
        ...,
        description="Name of the data source",
    )
    # Free-text guidance on SQL dialect and usage.
    instructions: str = Field(
        ...,
        description="SQL dialect and usage instructions for this source",
    )
    # Connector kind backing the source.
    connector_type: str = Field(
        ...,
        description="Type of connector (e.g., 'ibis', 'custom')",
    )
async def get_source_instructions(
    source_name: str,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Get SQL dialect instructions and usage guidelines for a specific data source.

    This endpoint returns source-specific instructions including:
    - SQL syntax rules (e.g., DATE_FORMAT patterns, aggregate functions)
    - Supported features and limitations
    - Connection-specific best practices

    These instructions are passed to the LLM agent for proper SQL generation.
    They are served from the Redis key ``instructions:{tenant_id}:{source_name}``
    when present; otherwise they come from the connector's
    ``get_llm_instructions()`` and are cached for 24 hours. The source name is
    matched exactly (case-sensitive).

    Args:
        source_name: Name of the configured data source.
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client holding tenant config and the cache.
        minio_client: MinIO client passed to the FederationAgent.

    Returns:
        InstructionsResponse with the instructions text and connector type.

    Raises:
        HTTPException: 404 if the tenant has no sources or the source is unknown,
            500 if the connector cannot be initialised or on unexpected errors.
    """
    tenant_id = user.tenant_id
    try:
        # Get tenant configuration
        tenant_config = get_tenant_config(tenant_id, redis_client)

        # Explicit check: no sources configured at all
        if not tenant_config:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No data sources configured for tenant '{tenant_id}'"
            )

        # Find the requested source. .get() keeps a malformed entry without a
        # name from turning this lookup into a KeyError / 500.
        source_config = next(
            (entry for entry in tenant_config if entry.get("source_name") == source_name),
            None,
        )
        if not source_config:
            known_sources = ', '.join(
                entry["source_name"] for entry in tenant_config if entry.get("source_name")
            )
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Data source '{source_name}' not found for tenant '{tenant_id}'. Available sources: {known_sources}"
            )

        # Check cache first
        cache_key = f"instructions:{tenant_id}:{source_name}"
        cached_instructions = redis_client.get(cache_key)
        if cached_instructions:
            cached_data = json.loads(cached_instructions)
            return InstructionsResponse(
                source_name=source_name,
                instructions=cached_data["instructions"],
                connector_type=cached_data.get("connector_type", "unknown")
            )

        # Create FederationAgent to get instructions from connector
        agent = FederationAgent([source_config], redis_client, minio_client)

        # Get instructions from the connector
        if source_name not in agent.connectors:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to initialize connector for source '{source_name}'"
            )
        instructions_str = agent.connectors[source_name].get_llm_instructions()
        connector_type = source_config.get("source_type", "ibis")

        # Cache the instructions for 24 hours
        redis_client.setex(
            cache_key,
            86400,
            json.dumps({"instructions": instructions_str, "connector_type": connector_type})
        )

        return InstructionsResponse(
            source_name=source_name,
            instructions=instructions_str,
            connector_type=connector_type
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get instructions for {source_name}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve instructions: {str(e)}"
        )
def _rank_source_schema(schema_search, request, source_name, schema_str):
    """Rank one source's schema against the request keywords.

    Returns a ``(section, matches)`` pair: the formatted markdown section for the
    source (``None`` when nothing ranked) and the list of ``SchemaMatch`` rows.
    """
    flat_schema = schema_search.flatten_schema(schema_str)
    index = schema_search.build_keyword_index(flat_schema)
    rel_graph = schema_search.build_relationship_graph(flat_schema)
    ranked = schema_search.rank_tables_for_keywords(
        request.keywords,
        index,
        rel_graph,
        flat_schema=flat_schema
    )
    if not ranked:
        return None, []

    formatted = schema_search.format_search_results(
        ranked,
        flat_schema,
        max_tables=request.max_tables_per_source,
        include_sample_rows=request.include_samples
    )
    section = f"## Source: {source_name}\n{formatted['formatted_string']}"
    matches = [
        SchemaMatch(
            table_name=match_meta['table_name'],
            score=match_meta['score'],
            matched_columns=match_meta['matched_columns'],
            source_name=source_name
        )
        for match_meta in formatted['table_matches']
    ]
    return section, matches


async def search_schemas(
    request: SchemaSearchRequest,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """Search schemas by keywords across authenticated tenant's data sources.

    Each source's schema is taken from the Redis cache when available; schemas of
    uncached sources are fetched concurrently (``connector.get_schema`` runs in a
    worker thread) and cached for one hour on a best-effort basis. Sources whose
    schema cannot be fetched are logged and skipped. Tables are ranked per source
    and the combined matches are returned sorted by descending score.

    Authentication: Requires valid JWT token. Search limited to authenticated tenant's sources.

    Args:
        request: Search parameters (keywords, optional source filter, limits).
        user: Authenticated user from JWT; supplies the tenant id.
        redis_client: Redis client holding tenant config and the schema cache.
        minio_client: MinIO client passed to the FederationAgent.

    Returns:
        SchemaSearchResponse with the searched sources, ranked matches and a
        formatted schema string. ``cache_hit`` is true when at least one source
        was served from the cache.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    # Bound before the try block so the error handler can always reference it.
    tenant_id = user.tenant_id
    try:
        from backend.data_sources import schema_search

        # Log metadata for quality analytics
        if request.original_question:
            logger.info(f"Schema search for question: '{request.original_question[:100]}...'")
        if request.keyword_metadata:
            logger.info(f"Keyword breakdown - Base: {len(request.keyword_metadata.get('base', []))}, "
                        f"Semantic: {len(request.keyword_metadata.get('semantic', []))}, "
                        f"Concepts: {len(request.keyword_metadata.get('concepts', []))}")

        # Get tenant config, ignoring malformed entries that carry no name
        tenant_config = [
            conf for conf in get_tenant_config(tenant_id, redis_client)
            if conf.get('source_name')
        ]

        # Filter sources if specified
        if request.source_names:
            tenant_config = [
                conf for conf in tenant_config
                if conf['source_name'] in request.source_names
            ]

        if not tenant_config:
            return SchemaSearchResponse(
                available_sources=[],
                matches=[],
                formatted_schema_string="No data sources configured for this tenant.",
                total_matches=0
            )

        available_sources = [conf['source_name'] for conf in tenant_config]
        all_matches = []
        formatted_parts = []
        cached_any = False
        fetch_map = {}  # source_name -> source_config, for sources missing from the cache

        for source_config in tenant_config:
            source_name = source_config['source_name']
            cached_schema = redis_client.get(f"schema:{tenant_id}:{source_name}")
            if not cached_schema:
                # will fetch this source concurrently
                fetch_map[source_name] = source_config
                continue

            cached_any = True
            # The cached value is normally a JSON document; fall back to treating
            # the raw value as the schema text when it is not.
            try:
                schema_str = json.loads(cached_schema)['schema']
            except (ValueError, KeyError, TypeError):
                schema_str = cached_schema if isinstance(cached_schema, str) else cached_schema.decode('utf-8')

            section, matches = _rank_source_schema(schema_search, request, source_name, schema_str)
            if section is not None:
                formatted_parts.append(section)
            all_matches.extend(matches)

        async def _fetch_schema_for(source_name: str, source_config: Dict[str, Any]):
            """Fetch one source's schema; return (source_name, schema_str, error)."""
            try:
                agent = FederationAgent([source_config], redis_client, minio_client)
                if source_name not in agent.connectors:
                    return (source_name, None, "connector_init_failed")
                connector = agent.connectors[source_name]
                schema_str = await asyncio.to_thread(connector.get_schema)
                return (source_name, schema_str, None)
            except Exception as e:
                # Never report an empty error string: the caller uses the error
                # slot to decide whether the fetch succeeded.
                return (source_name, None, str(e) or type(e).__name__)

        # Fire all fetches concurrently
        if fetch_map:
            results = await asyncio.gather(
                *(_fetch_schema_for(name, cfg) for name, cfg in fetch_map.items()),
                return_exceptions=False
            )

            # Process fetched schemas
            for source_name, schema_str, err in results:
                if err is not None or schema_str is None:
                    logger.warning(f"Skipping source {source_name}: {err}")
                    continue

                # Cache the fetched schema (store JSON string)
                try:
                    redis_client.setex(
                        f"schema:{tenant_id}:{source_name}",
                        3600,
                        json.dumps({'schema': schema_str, 'last_updated': datetime.utcnow().isoformat()})
                    )
                except Exception:
                    # Best-effort caching, do not fail the whole search
                    logger.debug(f"Failed to cache schema for {source_name}")

                section, matches = _rank_source_schema(schema_search, request, source_name, schema_str)
                if section is not None:
                    formatted_parts.append(section)
                all_matches.extend(matches)

        # Sort all matches by score
        all_matches.sort(key=lambda m: m.score, reverse=True)

        return SchemaSearchResponse(
            available_sources=available_sources,
            matches=all_matches,
            formatted_schema_string='\n\n'.join(formatted_parts) if formatted_parts else "No matches found for the provided keywords.",
            total_matches=len(all_matches),
            # True when any source came from the cache, not just the last one seen
            cache_hit=cached_any
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Schema search failed for tenant {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Schema search failed: {str(e)}"
        )
async def execute_raw_sql_endpoint(
    request: RawSQLRequest,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
):
    """
    Executes a raw SQL query either synchronously or asynchronously.

    **Sync Mode (async_mode=False, default):**
    - Executes immediately and returns results
    - Subject to timeout limits (max 5 minutes)
    - Best for quick queries with small result sets

    **Async Mode (async_mode=True):**
    - Enqueues job and returns job_id
    - No timeout limits (runs until complete or fails)
    - Best for long-running queries or large result sets
    - Use /jobs/{job_id} to check status and retrieve results

    **Rate Limiting:** 30 requests per tenant per minute

    **Row Limits:**
    - Default: 10,000 rows
    - Maximum: 100,000 rows
    - Results exceeding max_rows are truncated

    **Authentication:** Requires valid JWT token
    """
    tenant_id = user.tenant_id

    # Rate limiting
    check_rate_limit(redis_client, tenant_id, "execute_sql")

    # SQL length validation
    if len(request.sql_query) > MAX_SQL_LENGTH:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"SQL query exceeds maximum length of {MAX_SQL_LENGTH} characters."
        )

    # Validate and apply max_rows
    max_rows = request.max_rows if request.max_rows is not None else DEFAULT_MAX_ROWS
    if max_rows < 0:
        # A negative limit would turn the truncation slice below into
        # results[:-n], silently dropping rows from the end of the result set.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="max_rows cannot be negative"
        )
    if max_rows > ABSOLUTE_MAX_ROWS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"max_rows cannot exceed {ABSOLUTE_MAX_ROWS}"
        )

    # Validate timeout for sync mode
    if not request.async_mode:
        # NOTE(review): the timeout is validated here but is not passed to the
        # query execution below — confirm whether it is enforced elsewhere.
        timeout = request.timeout_seconds if request.timeout_seconds is not None else DEFAULT_SYNC_TIMEOUT
        if timeout > MAX_SYNC_TIMEOUT:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Sync mode timeout cannot exceed {MAX_SYNC_TIMEOUT} seconds. Use async_mode=True for longer queries."
            )

    logger.info(
        f"Received {'async' if request.async_mode else 'sync'} raw SQL request "
        f"for tenant '{tenant_id}' on source '{request.source_name}'"
    )

    # ASYNC MODE: Enqueue job and return immediately
    if request.async_mode:
        try:
            from backend.data_sources.worker import process_raw_sql_job
        except ImportError as import_err:
            # The worker module is optional in some deployments (see the guarded
            # Worker import at the top of this file); report that cleanly
            # instead of leaking an unhandled ImportError.
            logger.error(f"Async raw SQL requested but worker is unavailable: {import_err}")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Async SQL execution is not available in this deployment."
            ) from import_err

        # Generate job ID
        job_id = generate_job_id()

        # Start trace
        tracer = tracing.get_tracer()
        trace_context = None
        if tracer:
            trace_context = tracer.start_trace(
                operation_name="execute_raw_sql_async",
                span_type=SpanType.HTTP_REQUEST,
                tenant_id=tenant_id,
                source_name=request.source_name,
                job_id=job_id
            )
            add_trace_metadata(mode="async", sql_length=len(request.sql_query))

        try:
            # Store initial job metadata. Guard on trace_context (not tracer):
            # a tracer may exist while start_trace() yielded no context.
            metadata = JobMetadata(
                job_id=job_id,
                tenant_id=tenant_id,
                status=JobStatus.PENDING,
                trace_id=trace_context.trace_id if trace_context else f"sql-job-{job_id}"
            )
            redis_client.setex(
                f"job:{job_id}:metadata",
                JOB_TTL_SECONDS,
                metadata.model_dump_json()
            )

            # Enqueue Celery task; the task id doubles as the job id
            process_raw_sql_job.apply_async(
                args=[job_id, tenant_id, request.source_name, request.sql_query, max_rows],
                task_id=job_id
            )
            logger.info(f"Enqueued raw SQL job {job_id} for tenant '{tenant_id}'")
            add_trace_event("sql_job_enqueued", level="INFO", job_id=job_id)

            if tracer and trace_context:
                tracer.finish_span("success", job_id=job_id)

            return RawSQLAsyncResponse(
                status="accepted",
                job_id=job_id,
                tenant_id=tenant_id,
                message=f"SQL query job enqueued successfully. Use GET /jobs/{job_id} to check status."
            )
        except Exception as e:
            logger.exception(f"Failed to enqueue raw SQL job: {e}")
            if tracer and trace_context:
                tracer.finish_span("error", str(e))
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to enqueue job: {str(e)}"
            )

    # SYNC MODE: Execute immediately and return results
    tracer = tracing.get_tracer()
    trace_context = None
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="execute_raw_sql_sync",
            span_type=SpanType.HTTP_REQUEST,
            tenant_id=tenant_id,
            source_name=request.source_name
        )
        add_trace_metadata(mode="sync", sql_length=len(request.sql_query))

    try:
        # 1. Get tenant config securely and find the specific source config
        with traced_span("get_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config_list = get_tenant_config(tenant_id, redis_client)

            # Explicit check: no sources configured at all
            if not tenant_config_list:
                logger.warning(f"No data sources configured for tenant '{tenant_id}'")
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"No data sources configured for tenant '{tenant_id}'. Please configure sources via /api/v1/data-sources/my-tenant/sources endpoint."
                )

            source_config = next((conf for conf in tenant_config_list if conf.get("source_name") == request.source_name), None)
            if not source_config:
                available_sources = [c.get("source_name") for c in tenant_config_list if c.get("source_name")]
                logger.warning(f"Data source '{request.source_name}' not found for tenant '{tenant_id}'.")
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Data source '{request.source_name}' not found for tenant '{tenant_id}'. Available sources: {', '.join(available_sources)}"
                )
            add_trace_metadata(source_type=source_config.get("source_type", "unknown"))

        # 2. Instantiate FederationAgent with *only* the target source's config
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent([source_config], redis_client, minio_client)
            if request.source_name not in agent.connectors:
                logger.error(f"FederationAgent failed to initialize connector for '{request.source_name}'.")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to initialize connector internally."
                )

        # 3. Execute the raw query synchronously
        with traced_span("execute_raw_query", SpanType.DATABASE_QUERY):
            start_time = time.time()
            try:
                results = agent.execute_raw_query(request.source_name, request.sql_query)
                duration = time.time() - start_time
                duration_ms = duration * 1000

                # Apply max_rows limit (truncation happens after the full fetch)
                rows_limited = False
                original_row_count = len(results)
                if len(results) > max_rows:
                    results = results[:max_rows]
                    rows_limited = True
                    logger.info(f"Results limited to {max_rows} rows (originally {original_row_count} rows)")

                logger.info(f"Raw SQL executed successfully in {duration:.3f}s, returned {len(results)} rows.")
                add_trace_metadata(
                    query_duration_ms=duration_ms,
                    result_rows=len(results),
                    rows_limited=rows_limited
                )
                add_trace_event(
                    "raw_sql_execution_success",
                    level="INFO",
                    duration=f"{duration:.3f}s",
                    rows=len(results)
                )

                # Record SQL execution metrics
                from backend.data_sources import metrics
                metrics.record_sql_execution(
                    tenant_id=tenant_id,
                    async_mode=False,
                    duration=duration,
                    rows_returned=len(results),
                    rows_limited=rows_limited
                )
            # --- Specific Error Handling ---
            # Each handler finishes the trace span itself and maps the failure
            # to an HTTP status; the resulting HTTPException then passes
            # through the outer handler below.
            except NotImplementedError as nie:
                logger.warning(f"Raw SQL not supported for source '{request.source_name}'.")
                add_trace_event("raw_sql_not_supported", level="WARN", error=str(nie))
                if tracer and trace_context:
                    tracer.finish_span("error", f"Not Implemented: {str(nie)}")
                raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail=str(nie))
            except ConnectionError as ce:
                logger.error(f"Connection failed for source '{request.source_name}': {ce}")
                add_trace_event("db_connection_error", level="ERROR", error=str(ce))
                if tracer and trace_context:
                    tracer.finish_span("error", f"Connection Error: {str(ce)}")
                raise HTTPException(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    detail=f"Database connection failed: {ce}"
                )
            except ValueError as ve:
                logger.error(f"Configuration or Value error: {ve}")
                add_trace_event("configuration_error", level="ERROR", error=str(ve))
                if tracer and trace_context:
                    tracer.finish_span("error", f"Value Error: {str(ve)}")
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ve))
            except RuntimeError as rte:
                error_detail = f"Error during SQL execution: {rte}"
                logger.error(f"Raw SQL Execution Failed: {error_detail}", exc_info=False)
                add_trace_event("raw_sql_execution_error", level="ERROR", error=str(rte))
                if tracer and trace_context:
                    tracer.finish_span("error", error_detail)
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_detail)
            except Exception as e:
                error_detail = f"Unexpected error during SQL execution: {e}"
                logger.exception(f"Raw SQL Execution Unexpected Error: {error_detail}")
                add_trace_event("raw_sql_unexpected_error", level="CRITICAL", error=error_detail)
                if tracer and trace_context:
                    tracer.finish_span("error", error_detail)
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=error_detail
                )

        # 4. Return results successfully
        response_payload = RawSQLSyncResponse(
            status="success",
            results=results,
            rows_returned=len(results),
            rows_limited=rows_limited,
            execution_time_ms=duration_ms
        )
        if tracer and trace_context:
            tracer.finish_span("success", result_rows=len(results))
        return response_payload

    # --- Exception Handling for Outer Scope (Setup, Config) ---
    except HTTPException as he:
        logger.error(f"HTTPException during raw SQL setup for tenant '{tenant_id}': {he.detail}")
        try:
            # Skip if an inner handler already finished the span
            if tracer and trace_context and not getattr(trace_context, 'is_finished', False):
                tracer.finish_span("error", f"Setup Error: {he.detail}")
        except Exception as finish_err:
            logger.debug(f"Could not finish trace span: {finish_err}")
        # Bare raise keeps the original traceback intact
        raise
    except Exception as e:
        logger.exception(f"Unexpected error handling raw SQL request for tenant '{tenant_id}': {e}")
        error_detail = f"An unexpected server error occurred during request setup: {e}"
        try:
            if tracer and trace_context and not getattr(trace_context, 'is_finished', False):
                tracer.finish_span("error", error_detail)
        except Exception as finish_err:
            logger.debug(f"Could not finish trace span: {finish_err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=error_detail
        )
async def delete_job(
    job_id: str,
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Cancel a job owned by the caller's tenant and raise its cancel flag.

    The job metadata is rewritten with CANCELLED status and a completion
    timestamp, and a ``job:{job_id}:cancel`` key is stored for workers to poll.

    Raises:
        HTTPException: 404 if the job metadata is missing, 403 if the job
            belongs to another tenant, 400 if the job is neither pending nor
            running, 500 on any other failure.
    """
    metadata_key = f"job:{job_id}:metadata"
    cancellable_states = (JobStatus.PENDING, JobStatus.RUNNING)

    try:
        # The job must exist before anything else is checked
        raw_metadata = redis_client.get(metadata_key)
        if not raw_metadata:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found"
            )

        job_meta = JobMetadata.model_validate_json(raw_metadata)

        # Tenant isolation: callers may only cancel their own jobs
        if job_meta.tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized attempt to cancel job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot cancel jobs from other tenants"
            )

        # Finished jobs cannot be cancelled
        if job_meta.status not in cancellable_states:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Cannot cancel job in status: {job_meta.status}"
            )

        # Flag first, then persist the new status
        redis_client.setex(f"job:{job_id}:cancel", JOB_TTL_SECONDS, "true")

        job_meta.status = JobStatus.CANCELLED
        job_meta.completed_at = datetime.utcnow()
        serialized_meta = job_meta.model_dump_json()
        redis_client.setex(metadata_key, JOB_TTL_SECONDS, serialized_meta)

        logger.info(f"Job {job_id} cancelled")
        return {"message": f"Job {job_id} has been cancelled"}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to cancel job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel job: {str(e)}"
        )
async def list_dlq_jobs(
    page: int = Query(1, ge=1, description="Page number starting from 1"),
    page_size: int = Query(20, ge=1, le=100, description="Number of jobs per page"),
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client)
) -> DLQListResponse:
    """
    List jobs in the Dead Letter Queue with pagination for the authenticated tenant.

    Keys matching ``dlq:job:*:tenant:{tenant_id}`` are collected, sorted, and
    sliced to the requested page. Entries that cannot be parsed are logged and
    skipped, so a page may contain fewer than ``page_size`` jobs even though
    ``total_count`` counts every matching key.

    Raises:
        HTTPException: 500 if the keys cannot be listed or read.
    """
    tenant_id = user.tenant_id
    try:
        # Get all DLQ job keys for this tenant. SCAN walks the keyspace
        # incrementally instead of blocking Redis the way KEYS does; it may
        # yield duplicates, hence the set. Sorting makes page boundaries
        # stable across requests (SCAN/KEYS order is unspecified).
        pattern = f"dlq:job:*:tenant:{tenant_id}"
        dlq_keys = sorted(set(redis_client.scan_iter(match=pattern)))
        total_count = len(dlq_keys)

        # Calculate pagination
        start_idx = (page - 1) * page_size
        page_keys = dlq_keys[start_idx:start_idx + page_size]

        jobs = []
        for key in page_keys:
            try:
                dlq_data = redis_client.get(key)
                if not dlq_data:
                    # Key expired or was removed between SCAN and GET
                    continue
                job_data = json.loads(dlq_data)
                jobs.append(
                    DLQJobResponse(
                        job_id=job_data["job_id"],
                        tenant_id=job_data["tenant_id"],
                        failed_at=datetime.fromisoformat(job_data["failed_at"]),
                        error_message=job_data["error_message"],
                        retry_count=job_data["retry_count"],
                        payload=job_data["payload"]
                    )
                )
            except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
                # TypeError covers a non-string failed_at or a non-dict
                # document; one bad record must not fail the whole listing.
                logger.warning(f"Failed to parse DLQ job data from {key}: {e}")
                continue

        return DLQListResponse(
            jobs=jobs,
            total_count=total_count,
            page=page,
            page_size=page_size
        )
    except Exception as e:
        logger.exception(f"Failed to list DLQ jobs: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list DLQ jobs: {str(e)}"
        )
async def get_dlq_job(
    job_id: str,
    redis_client: redis.Redis = Depends(get_redis_client),
    user: AuthUser = Depends(get_current_user)
) -> DLQJobResponse:
    """
    Get detailed information about a specific job in the Dead Letter Queue.

    The entry is only returned when its stored ``tenant_id`` matches the
    authenticated caller's tenant.

    Raises:
        HTTPException: 404 if no entry exists for ``job_id``, 403 if the entry
            belongs to another tenant, 500 if the stored data is malformed or
            the lookup fails.
    """
    try:
        # NOTE(review): list_dlq_jobs/clear_dlq match keys shaped like
        # "dlq:job:*:tenant:{tenant_id}", while this lookup uses
        # "dlq:job:{job_id}" — confirm which key layout the worker writes.
        dlq_key = f"dlq:job:{job_id}"
        dlq_data = redis_client.get(dlq_key)
        if not dlq_data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found in Dead Letter Queue"
            )

        job_data = json.loads(dlq_data)

        # Tenant isolation: the payload may contain another tenant's query data
        if job_data["tenant_id"] != user.tenant_id:
            logger.warning(f"Unauthorized attempt to read DLQ job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot access DLQ jobs from other tenants"
            )

        return DLQJobResponse(
            job_id=job_data["job_id"],
            tenant_id=job_data["tenant_id"],
            failed_at=datetime.fromisoformat(job_data["failed_at"]),
            error_message=job_data["error_message"],
            retry_count=job_data["retry_count"],
            payload=job_data["payload"]
        )
    except HTTPException:
        raise
    except (json.JSONDecodeError, KeyError, ValueError) as e:
        logger.exception(f"Failed to parse DLQ job data for {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Invalid DLQ job data: {str(e)}"
        )
    except Exception as e:
        logger.exception(f"Failed to get DLQ job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get DLQ job: {str(e)}"
        )
async def retry_dlq_job(
    job_id: str,
    redis_client: redis.Redis = Depends(get_redis_client),
    user: AuthUser = Depends(get_current_user)
) -> Dict[str, str]:
    """
    Retry a job from the Dead Letter Queue.

    Fresh PENDING metadata and the stored payload are written under the
    ``job:{job_id}:*`` keys, the job is handed to ``process_federated_job``,
    and the DLQ entry is then deleted. Only jobs whose stored ``tenant_id``
    matches the authenticated caller's tenant may be retried.

    Raises:
        HTTPException: 404 if no DLQ entry exists for ``job_id``, 403 if the
            entry belongs to another tenant, 500 on any other failure.
    """
    try:
        dlq_key = f"dlq:job:{job_id}"
        dlq_data = redis_client.get(dlq_key)
        if not dlq_data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found in Dead Letter Queue"
            )

        job_data = json.loads(dlq_data)

        # Read every required field before writing anything, so a malformed
        # entry cannot leave half-created job keys behind.
        job_tenant_id = job_data["tenant_id"]
        job_payload = job_data["payload"]

        # Tenant isolation: callers may only re-run their own jobs
        if job_tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized attempt to retry DLQ job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot retry DLQ jobs from other tenants"
            )

        # Resolve the task before touching Redis so an import failure does not
        # leave orphaned PENDING metadata. The package path matches the other
        # worker imports in this file; the bare module name is kept as a
        # fallback for environments that relied on it.
        try:
            from backend.data_sources.worker import process_federated_job
        except ImportError:
            from worker import process_federated_job

        # Recreate job metadata with pending status
        metadata = JobMetadata(
            job_id=job_id,
            tenant_id=job_tenant_id,
            status=JobStatus.PENDING,
            created_at=datetime.utcnow()
        )

        # Store job metadata
        redis_client.setex(
            f"job:{job_id}:metadata",
            JOB_TTL_SECONDS,
            metadata.model_dump_json()
        )

        # Store job payload
        redis_client.setex(
            f"job:{job_id}:payload",
            JOB_TTL_SECONDS,
            json.dumps(job_payload)
        )

        # Re-enqueue the job to Celery
        process_federated_job.delay(
            job_id=job_id,
            job_payload=job_payload,
            tenant_id=job_tenant_id
        )

        # Remove from DLQ only after the job was successfully enqueued
        redis_client.delete(dlq_key)

        logger.info(f"Retried DLQ job {job_id}")
        return {"message": f"Job {job_id} has been retried from DLQ"}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to retry DLQ job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retry DLQ job: {str(e)}"
        )
async def delete_dlq_job(
    job_id: str,
    redis_client: redis.Redis = Depends(get_redis_client),
    user: AuthUser = Depends(get_current_user)
) -> Dict[str, str]:
    """
    Permanently delete a job from the Dead Letter Queue.

    This action cannot be undone. The entry is only deleted when its stored
    ``tenant_id`` matches the authenticated caller's tenant.

    Raises:
        HTTPException: 404 if no DLQ entry exists for ``job_id``, 403 if the
            entry belongs to another tenant, 500 on any other failure
            (including an entry whose stored data cannot be parsed).
    """
    try:
        dlq_key = f"dlq:job:{job_id}"
        dlq_data = redis_client.get(dlq_key)
        if dlq_data is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found in Dead Letter Queue"
            )

        # Tenant isolation: callers may only delete their own DLQ entries
        owner_tenant_id = json.loads(dlq_data).get("tenant_id")
        if owner_tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized attempt to delete DLQ job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete DLQ jobs from other tenants"
            )

        redis_client.delete(dlq_key)
        logger.info(f"Deleted DLQ job {job_id}")
        return {"message": f"Job {job_id} has been permanently deleted from DLQ"}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to delete DLQ job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete DLQ job: {str(e)}"
        )
async def clear_dlq(
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
) -> Dict[str, str]:
    """
    Clear all jobs from the Dead Letter Queue for the authenticated tenant.

    This action cannot be undone and is strictly scoped to the tenant specified
    in the JWT of the authenticated user.

    Raises:
        HTTPException: 500 if the keys cannot be listed or deleted.
    """
    try:
        tenant_id = user.tenant_id
        pattern = f"dlq:job:*:tenant:{tenant_id}"

        # SCAN walks the keyspace incrementally instead of blocking Redis the
        # way KEYS does; it may yield duplicates, hence the set.
        dlq_keys = list(set(redis_client.scan_iter(match=pattern)))
        if not dlq_keys:
            return {"message": "No jobs found in DLQ"}

        deleted_count = redis_client.delete(*dlq_keys)
        logger.info(f"Cleared {deleted_count} jobs from DLQ for tenant {tenant_id}")
        return {
            "message": f"Cleared {deleted_count} jobs from DLQ for tenant {tenant_id}"
        }
    except Exception as e:
        logger.exception(f"Failed to clear DLQ: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to clear DLQ: {str(e)}"
        )
async def cancel_job(
    job_id: str,
    redis_client: redis.Redis = Depends(get_redis_client),
    user: AuthUser = Depends(get_current_user),
) -> Dict[str, str]:
    """
    Cancel a running or pending job.

    A pending job is marked CANCELLED immediately. For any other unfinished
    job only the ``job:{job_id}:cancel`` flag is set, which the worker is
    expected to pick up.

    Args:
        job_id: The ID of the job to cancel
        redis_client: Redis connection holding the job keys
        user: Authenticated caller; must belong to the job's tenant

    Returns:
        Dict containing cancellation status and message

    Raises:
        HTTPException: 404 if the job is not found, 403 if it belongs to
            another tenant, 500 if cancellation fails
    """
    try:
        # Check if job exists
        metadata_key = f"job:{job_id}:metadata"
        metadata_json = redis_client.get(metadata_key)
        if not metadata_json:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Job {job_id} not found"
            )

        # Parse job metadata
        metadata = JobMetadata.model_validate_json(metadata_json)

        # Tenant isolation, matching the ownership check in delete_job
        if metadata.tenant_id != user.tenant_id:
            logger.warning(f"Unauthorized attempt to cancel job {job_id} by tenant {user.tenant_id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot cancel jobs from other tenants"
            )

        # Check if job can be cancelled
        if metadata.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            return {
                "status": "already_finished",
                "message": f"Job {job_id} is already {metadata.status.value} and cannot be cancelled"
            }

        # Set cancellation flag
        cancel_key = f"job:{job_id}:cancel"
        redis_client.set(cancel_key, "true", ex=3600)  # Expire after 1 hour

        # Update job status to cancelled if it's not running
        if metadata.status == JobStatus.PENDING:
            metadata.status = JobStatus.CANCELLED
            # SETEX rather than plain SET: a plain SET would discard the key's
            # existing TTL and leave the metadata in Redis indefinitely.
            redis_client.setex(metadata_key, JOB_TTL_SECONDS, metadata.model_dump_json())
            logger.info(f"Job {job_id} cancelled (was pending)")
            return {
                "status": "cancelled",
                "message": f"Job {job_id} has been cancelled"
            }

        # Job is running, worker will pick up the cancellation flag
        logger.info(f"Cancellation requested for running job {job_id}")
        return {
            "status": "cancellation_requested",
            "message": f"Cancellation requested for job {job_id}. The job will be cancelled when the worker checks the flag."
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to cancel job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel job: {str(e)}"
        )
| # =============================== | |
| # SIMPLE METRICS ENDPOINT | |
| # =============================== | |
async def get_metrics() -> Dict[str, Any]:
    """Return the metrics summary with a ``system`` metadata section attached.

    The summary comes from ``metrics.get_metrics_summary()``; this endpoint
    only adds a fixed ``system`` entry describing how the metrics are
    collected.

    Raises:
        HTTPException: 500 if the summary cannot be produced.
    """
    system_info = {
        "metrics_version": "simple_redis_v1",
        "collection_method": "redis_based",
        "description": "Lightweight metrics without Prometheus/OpenTelemetry overhead"
    }
    try:
        # Imported lazily, as elsewhere in this module
        from backend.data_sources import metrics as metrics_module

        summary = metrics_module.get_metrics_summary()
        summary["system"] = system_info
        return summary
    except Exception as e:
        logger.exception(f"Failed to get metrics: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get metrics: {str(e)}"
        )
async def cleanup_old_metrics(days_to_keep: int = Query(7, description="Days of metrics data to keep")) -> Dict[str, Any]:
    """Clean up old metrics data to prevent Redis bloat.

    Args:
        days_to_keep: Retention window passed to the collector's
            ``cleanup_old_metrics``.

    Returns:
        Dict with the status, the number of deleted records, the retention
        window, and a human-readable message.

    Raises:
        HTTPException: 503 if the metrics collector is not initialized,
            500 if the cleanup itself fails.
    """
    try:
        from backend.data_sources import metrics

        collector = metrics.get_metrics_collector()
        if not collector:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Metrics collector not initialized"
            )

        deleted_count = collector.cleanup_old_metrics(days_to_keep)
        return {
            "status": "success",
            "deleted_records": deleted_count,
            "days_kept": days_to_keep,
            "message": f"Cleaned up {deleted_count} old metric records, keeping {days_to_keep} days of data"
        }
    except HTTPException:
        # Let the 503 above through; without this clause the generic handler
        # below would re-wrap it as a 500.
        raise
    except Exception as e:
        logger.exception(f"Failed to cleanup metrics: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cleanup metrics: {str(e)}"
        )
| # =============================== | |
| # PLAN CACHING ENDPOINTS | |
| # =============================== | |
async def get_cache_statistics(
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Report plan-cache statistics as a ``CacheStatsResponse``.

    Values are read from the dict returned by the plan cache's
    ``get_cache_stats()``: cached-plan count and estimated memory from
    ``redis_stats``, hit rate from ``cache_metrics``, and estimated savings
    from ``cost_optimization``.

    Raises:
        HTTPException: 503 if no plan cache is available, 500 on any other
            failure.
    """
    try:
        plan_cache = get_plan_cache()
        if not plan_cache:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Plan cache not available"
            )

        cache_stats = plan_cache.get_cache_stats()

        # Pull each figure out by name before assembling the response
        total_plans = cache_stats["redis_stats"]["total_cached_plans"]
        hit_rate = cache_stats["cache_metrics"]["hit_rate"]
        savings_usd = cache_stats["cost_optimization"]["estimated_savings_usd"]
        memory_mb = cache_stats["redis_stats"]["estimated_memory_mb"]

        return CacheStatsResponse(
            total_cached_plans=total_plans,
            cache_hit_rate=hit_rate,
            total_cost_savings=savings_usd,
            memory_usage_mb=memory_mb,
            top_tenants=[]  # Not populated yet
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get cache statistics: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get cache statistics: {str(e)}"
        )
async def invalidate_tenant_cache(
    tenant_id: str,
    redis_client: redis.Redis = Depends(get_redis_client)
):
    """Drop every cached plan belonging to ``tenant_id``.

    Delegates to the plan cache's ``invalidate_tenant_cache`` and reports how
    many entries were removed.

    Raises:
        HTTPException: 503 if no plan cache is available, 500 on any other
            failure.
    """
    # NOTE(review): tenant_id comes from the request and no user dependency is
    # declared in this signature — confirm access control is applied elsewhere
    # (e.g. at router level).
    try:
        plan_cache = get_plan_cache()
        if not plan_cache:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Plan cache not available"
            )

        removed = plan_cache.invalidate_tenant_cache(tenant_id)
        summary = f"Invalidated {removed} cached plans for tenant {tenant_id}"
        logger.info(summary)

        return {
            "status": "success",
            "message": summary,
            "invalidated_count": removed
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to invalidate cache for tenant {tenant_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to invalidate cache: {str(e)}"
        )
async def get_schema_incremental(
    source_name: str,
    cached_hash: Optional[str] = Query(None, description="Hash of cached schema for incremental updates"),
    user: AuthUser = Depends(get_current_user),
    redis_client: redis.Redis = Depends(get_redis_client),
    minio_client: Minio = Depends(get_minio_client)
) -> Dict[str, Any]:
    """Fetch a source's schema through the connector's incremental API.

    The connector's ``get_schema_incremental(cached_hash)`` result is returned
    after ``source_name``, ``tenant_id``, a UTC ``timestamp``, and
    ``incremental_supported`` are added to it.

    Raises:
        HTTPException: 404 if the tenant has no sources or ``source_name`` is
            not among them, 400 if the source type is not ``"ibis"``, 500 on
            any other failure.
    """
    tenant_id = user.tenant_id
    try:
        tenant_config = get_tenant_config(tenant_id, redis_client)

        # Nothing configured for this tenant at all
        if not tenant_config:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No sources configured for tenant '{tenant_id}'"
            )

        # First config entry whose name matches, or None
        source_config = next(
            (entry for entry in tenant_config if entry.get("source_name") == source_name),
            None,
        )
        if not source_config:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Source '{source_name}' not found for tenant '{tenant_id}'"
            )

        # NOTE(review): this agent is constructed but not used afterwards —
        # confirm whether its construction is needed for side effects.
        agent = FederationAgent(tenant_config, redis_client, minio_client)

        # NOTE(review): execute_raw_sql_endpoint reads "source_type" from the
        # same config entries, while this reads "type" — confirm the key.
        source_type = source_config.get("type", "ibis")

        # Package-relative import of the only connector supported here
        from .connectors.ibis_connector import IbisConnector

        if source_type != "ibis":
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Incremental schema updates not yet supported for source type: {source_type}"
            )
        connector = IbisConnector(source_config, redis_client, minio_client)

        connector.connect()
        try:
            schema_result = connector.get_schema_incremental(cached_hash)

            # Attach request metadata to the connector's result
            response_metadata = {
                "source_name": source_name,
                "tenant_id": tenant_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "incremental_supported": True
            }
            schema_result.update(response_metadata)
            return schema_result
        finally:
            # Always release the connection, even if the schema call fails
            connector.disconnect()
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Failed to get incremental schema for {source_name}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get incremental schema: {str(e)}"
        )