Spaces:
Running
Running
| """Celery worker for processing federated data query jobs. | |
| This module implements both a simple Worker class for testing and a full | |
| Celery-based distributed worker system for production use with proper | |
| error handling, network resilience, and observability. | |
| """ | |
| import orjson as json | |
| import time | |
| import logging | |
| import traceback | |
| import sys | |
| import os | |
| import io | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime, timezone | |
| import redis | |
| from minio import Minio | |
| from celery import Celery | |
| from celery.exceptions import Retry, WorkerLostError | |
| from celery.signals import worker_ready, worker_shutdown, worker_process_init | |
| from kombu.exceptions import OperationalError as KombuOperationalError | |
| from redis.exceptions import ConnectionError as RedisConnectionError, TimeoutError as RedisTimeoutError | |
# Put this module's parent directory on sys.path (once) so the project-local
# imports that follow can be resolved.
backend_path = os.path.join(os.path.dirname(__file__), "..")
if sys.path.count(backend_path) == 0:
    sys.path.insert(0, backend_path)
| # Import centralized storage configuration | |
| from backend.core.minio.config import get_redis_config, get_minio_config | |
| from backend.data_sources.federation_agent import FederationAgent | |
| from backend.data_sources.jobs import JobStatus, JobMetadata | |
| from backend.data_sources import metrics | |
| from backend.data_sources import tracing | |
| from backend.data_sources.tracing import traced_span, SpanType, add_trace_event, add_trace_metadata | |
# Root logging setup for worker processes; each record carries the process id.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - [Worker:%(process)d] - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Per-process shared handles. Both start as None and are assigned by
# init_worker_process(); the task functions read them.
_redis_pool: Optional[redis.ConnectionPool] = None
_minio_client: Optional[Minio] = None
@worker_process_init.connect
def init_worker_process(**kwargs):
    """Initialize the shared Redis connection pool and MinIO client.

    Connected to Celery's ``worker_process_init`` signal so it runs in each
    worker process. The task functions in this module read the resulting
    ``_redis_pool`` / ``_minio_client`` globals and raise ``NetworkError``
    when the pool is still ``None``.

    Any failure is logged (with traceback) and swallowed so that the worker
    process itself still starts.

    Args:
        **kwargs: Signal keyword arguments; unused.
    """
    global _redis_pool, _minio_client
    try:
        logger.info("Initializing worker connections (Redis pool + MinIO client)")
        # Reuse the module's URL builder so broker and pool share one config.
        _redis_pool = redis.ConnectionPool.from_url(get_redis_url(0))

        # The code below reads endpoint/access_key/secret_key/secure from the
        # MinIO config mapping.
        mconf = get_minio_config()
        _minio_client = Minio(
            endpoint=mconf.get('endpoint'),
            access_key=mconf.get('access_key'),
            secret_key=mconf.get('secret_key'),
            secure=mconf.get('secure', False),
        )
        logger.info("Worker connections initialized.")
    except Exception as e:
        logger.exception(f"Failed initializing worker connections: {e}")
# Build Redis URL from centralized configuration
def get_redis_url(db: int = 0) -> str:
    """Build a ``redis://`` URL from the centralized Redis configuration.

    Args:
        db: Redis database index placed in the URL path.

    Returns:
        ``redis://host:port/db``, or ``redis://:password@host:port/db`` when a
        non-empty password is configured.

    Raises:
        KeyError: If the configuration lacks ``host`` or ``port``.
    """
    from urllib.parse import quote

    redis_config = get_redis_config()
    host = redis_config['host']
    port = redis_config['port']
    password = redis_config.get('password')
    if password:
        # Percent-encode the password so characters such as '@', '/', ':' or
        # '%' cannot corrupt the URL structure.
        # NOTE(review): assumes the configured password is the raw secret,
        # not already percent-encoded -- confirm.
        return f"redis://:{quote(str(password), safe='')}@{host}:{port}/{db}"
    return f"redis://{host}:{port}/{db}"
# One Redis database (index 0) acts as both message broker and result backend.
redis_url = get_redis_url(0)
celery_app = Celery('data_sources_worker', broker=redis_url, backend=redis_url)
# Production settings for the Celery app, grouped by concern.
celery_app.conf.update(
    # Serialization: JSON for tasks and results, nothing else accepted
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    # Clock handling
    enable_utc=True,
    timezone='UTC',
    # Worker behaviour
    task_acks_late=True,  # acknowledge a task only after it has completed
    task_reject_on_worker_lost=True,  # part of the dead-letter handling setup
    worker_prefetch_multiplier=1,  # one job at a time, for memory efficiency
    worker_max_tasks_per_child=100,  # recycle after 100 tasks to limit memory leaks
    # Broker connection resilience
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
    # Result backend retry policy
    result_backend_transport_options={
        'max_retries': 3,
        'retry_on_error': [RedisConnectionError, RedisTimeoutError],
        'retry_on_timeout': True,
    },
    # Time limits in seconds: 5 minutes soft, 10 minutes hard
    task_soft_time_limit=300,
    task_time_limit=600,
)
# Limits applied when storing job results
MAX_RESULT_SIZE_BYTES = 10 * 1024 ** 2  # 10MB
RESULT_TTL_SECONDS = 60 * 60  # 1 hour
class NetworkError(Exception):
    """Network-related failure; the task functions treat it as retryable."""
class TransientError(Exception):
    """Temporary failure that is worth retrying."""
class PermanentError(Exception):
    """Failure that retrying cannot fix; jobs raising it are not retried."""
| # NOTE: network retry/backoff was intentionally removed in favor of task-level | |
| # retries using Celery's `self.retry(...)`. Helper functions below will raise | |
| # exceptions directly so the task can decide how to retry. | |
# Minimal in-process worker (no Celery involved), kept for tests
class Worker:
    """Runs a job's plan synchronously through a FederationAgent."""

    def __init__(self, tenant_config, redis_client=None, minio_client=None):
        self.agent = FederationAgent(
            tenant_config,
            redis_client=redis_client,
            minio_client=minio_client,
        )

    def process_job(self, job_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the plan stored under the payload's 'plan' key.

        Args:
            job_payload: Mapping that must contain a 'plan' entry (the steps).

        Returns:
            Whatever the agent's ``execute_federated_plan`` returns.

        Raises:
            ValueError: If the payload has no 'plan' (or it is ``None``).
        """
        steps = job_payload.get("plan")
        if steps is not None:
            logger.info("Processing job with %d steps", len(steps))
            return self.agent.execute_federated_plan(steps)
        raise ValueError("job payload must contain 'plan'")
# Celery task for distributed processing
# NOTE(review): registered as a bound task because the body relies on
# self.request / self.retry / self.max_retries; any extra registration options
# (explicit name, queue, ...) are not visible in this file -- confirm.
@celery_app.task(bind=True)
def process_federated_job(self, job_id: str, job_payload: Dict[str, Any], tenant_id: str):
    """
    Main Celery task for processing federated query jobs.

    This task handles:
    - Loading tenant configuration
    - Executing federated query plans
    - Storing results in Redis/MinIO
    - Network error handling with retries
    - Job cancellation checks

    Args:
        job_id: Unique identifier for the job
        job_payload: Validated job payload with query plan (read via ``job_payload["plan"]``)
        tenant_id: Tenant identifier for multi-tenancy

    Returns:
        A dict with a ``"status"`` of ``"cancelled"``, ``"completed"`` (plus
        ``"result_location"``) or ``"failed"`` (plus ``"error"``).

    Raises:
        celery.exceptions.Retry: Via ``self.retry`` when a transient error
            occurs and retries remain.
    """
    # Start enhanced trace for the job
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_federated_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id,
        )
        trace_id = trace_context.trace_id
    else:
        # Fallback to legacy trace_id
        trace_id = f"job-{job_id}-{int(time.time())}"

    logger.info(f"Starting job {job_id} for tenant {tenant_id}")
    redis_client = None
    minio_client = None

    try:
        add_trace_event("job_processing_started", level="INFO", job_id=job_id, tenant_id=tenant_id)

        # Initialize clients from process-global pool/client
        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client

        # Check for cancellation flag
        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"Job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id)
                add_trace_event("job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}

        # Update job status to running
        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id)

        # Record job start for metrics
        metrics.record_job_start(job_id, tenant_id)

        # Load tenant configuration
        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")
            add_trace_metadata(sources_count=len(tenant_config))

        # Initialize federation agent
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent(tenant_config, redis_client, minio_client)
            add_trace_metadata(agent_initialized=True)

        # Execute the federated plan with cancellation checks
        with traced_span("execute_federated_plan", SpanType.DATABASE_QUERY):
            add_trace_metadata(plan_steps=len(job_payload["plan"]))
            results = execute_plan_with_cancellation_check(
                agent, job_payload["plan"], job_id, redis_client, trace_id
            )
            add_trace_event(
                "plan_execution_completed",
                level="INFO",
                result_count=len(results) if isinstance(results, list) else 1,
            )

        # Store results with size-based routing
        with traced_span("store_results", SpanType.FILE_OPERATION):
            result_location = store_results_with_fallback(
                job_id, results, redis_client, minio_client, trace_id
            )
            add_trace_metadata(result_location=result_location)

        # Update job metadata with completion. The enum's value is stored so
        # the metadata does not depend on the serializer's enum support.
        completion_metadata = {
            "status": JobStatus.COMPLETED.value,
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result_location": result_location,
            "trace_id": trace_id,
        }
        update_job_metadata(job_id, completion_metadata, redis_client)

        add_trace_event("job_completed_successfully", level="INFO", result_location=result_location)
        logger.info(f"Job {job_id} completed successfully")
        if tracer:
            tracer.finish_span("success", result_location=result_location)
        return {"status": "completed", "result_location": result_location}

    except PermanentError as e:
        # Permanent errors should not be retried
        logger.error(f"Permanent error in job {job_id}: {e}")
        add_trace_event("permanent_error_occurred", level="ERROR", error=str(e))
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
            send_to_dlq(job_id, job_payload, str(e), "permanent_error", redis_client)
        if tracer:
            tracer.finish_span("error", str(e))
        return {"status": "failed", "error": str(e)}

    except (NetworkError, TransientError, RedisConnectionError, RedisTimeoutError) as e:
        # Transient errors should be retried
        retries_so_far = self.request.retries
        max_retries = self.max_retries
        retry_count = retries_so_far + 1
        logger.warning(f"Transient error in job {job_id}, attempt {retry_count}: {e}")
        add_trace_event(
            "transient_error_occurred",
            level="WARN",
            error=str(e),
            retry_count=retry_count,
            max_retries=max_retries,
        )

        # max_retries may be None (retry forever); only compare when it is set.
        if max_retries is not None and retries_so_far >= max_retries:
            logger.error(f"Job {job_id} failed after {max_retries} retries")
            if redis_client is not None:
                update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
                send_to_dlq(job_id, job_payload, str(e), "max_retries_exceeded", redis_client)
            if tracer:
                tracer.finish_span("error", f"Max retries exceeded: {str(e)}")
            return {"status": "failed", "error": str(e)}

        # Exponential backoff: 60s, 120s, 240s
        retry_delay = 60 * (2 ** retries_so_far)
        logger.info(f"Retrying job {job_id} in {retry_delay} seconds")
        add_trace_event("job_retry_scheduled", level="INFO",
                        retry_delay=retry_delay, retry_count=retry_count)
        if tracer:
            tracer.add_metadata(retry_delay=retry_delay, will_retry=True)
            tracer.finish_span("error", f"Transient error, will retry: {str(e)}")
        raise self.retry(countdown=retry_delay, exc=e)

    except Exception as e:
        # Unexpected errors: mark failed and dead-letter the job *before*
        # returning (previously the DLQ call sat after a return statement).
        logger.exception(f"Unexpected error in job {job_id}: {e}")
        add_trace_event("unexpected_error_occurred", level="ERROR", error=str(e))
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
            send_to_dlq(job_id, job_payload, str(e), "unexpected_error", redis_client)
        if tracer:
            tracer.finish_span("error", f"Unexpected error: {str(e)}")
        return {"status": "failed", "error": f"Unexpected error: {e}"}
# Helper functions with network error handling
def _resolve_api_callable(attr_name: str):
    """Return ``attr_name`` from the first importable candidate API module.

    Several import paths are tried to be resilient to different
    PYTHONPATH/package layouts (running as module, from the backend dir, or
    from the data_sources dir). Only the *import* is guarded: the caller
    invokes the returned callable itself, so runtime errors raised by that
    callable propagate unchanged instead of being reported as an ImportError.

    Raises:
        ImportError: If no candidate module can be imported or none of them
            defines ``attr_name``.
    """
    import importlib

    candidates = [
        'backend.data_sources.api',
        'data_sources.api',
        'api',
    ]
    last_err = None
    for mod in candidates:
        try:
            m = importlib.import_module(mod)
        except Exception as e:  # importing a module may fail in arbitrary ways
            last_err = e
            continue
        if hasattr(m, attr_name):
            return getattr(m, attr_name)
    # If none succeeded, raise a clear error
    raise ImportError(
        f"Could not import {attr_name} from any candidate modules: {candidates}. Last error: {last_err}"
    ) from last_err


def get_redis_client_with_retry():
    """Return the result of the API module's ``get_redis_client()``.

    Raises:
        ImportError: If ``get_redis_client`` cannot be located.
    """
    return _resolve_api_callable('get_redis_client')()


def get_minio_client_with_retry():
    """Return the result of the API module's ``get_minio_client()``.

    Raises:
        ImportError: If ``get_minio_client`` cannot be located.
    """
    return _resolve_api_callable('get_minio_client')()


def get_tenant_config_with_retry(tenant_id: str, redis_client):
    """Return the API module's ``get_tenant_config(tenant_id, redis_client)``.

    Errors raised by ``get_tenant_config`` itself (for example a Redis
    connection failure) propagate to the caller unchanged.

    Raises:
        ImportError: If ``get_tenant_config`` cannot be located.
    """
    return _resolve_api_callable('get_tenant_config')(tenant_id, redis_client)
def check_job_cancelled(job_id: str, redis_client) -> bool:
    """Check if job has been cancelled via Redis flag.

    Reads ``job:{job_id}:cancel`` and treats any casing of ``true`` as
    "cancelled", whether the client returns ``bytes`` or ``str``.

    Args:
        job_id: Job whose cancellation flag is read.
        redis_client: Client exposing ``get(key)``.

    Returns:
        ``True`` when the flag equals "true" (case-insensitive); ``False``
        when it is missing, has another value, or Redis cannot be reached.
    """
    try:
        cancel_flag = redis_client.get(f"job:{job_id}:cancel")
    except (RedisConnectionError, RedisTimeoutError) as e:
        logger.warning(f"Could not check cancellation flag for job {job_id}: {e}")
        return False  # Assume not cancelled if we can't check

    if cancel_flag is None:
        return False
    # Redis may return bytes or str depending on client config; normalize to
    # str so both representations are compared the same (case-insensitive) way.
    if isinstance(cancel_flag, (bytes, bytearray)):
        cancel_flag = bytes(cancel_flag).decode("utf-8", errors="replace")
    return str(cancel_flag).lower() == "true"
def execute_plan_with_cancellation_check(agent, plan: List[Dict], job_id: str, redis_client, trace_id: str):
    """Execute federated plan, checking the cancellation flag first.

    The flag is checked once per plan step *before* execution starts; the
    plan itself is then handed to ``agent.execute_federated_plan`` as a
    whole, so no checks happen while the agent is running.

    Args:
        agent: Object exposing ``execute_federated_plan(plan)``.
        plan: List of step dicts (each is read via ``step.get('source')``).
        job_id: Job identifier used for the cancellation flag.
        redis_client: Redis client passed to ``check_job_cancelled``.
        trace_id: Identifier prefixed to log messages.

    Returns:
        Whatever ``agent.execute_federated_plan`` returns.

    Raises:
        TransientError: If the job is found cancelled before execution.
            NOTE(review): callers that retry on TransientError will re-queue
            a cancelled job once before their own start-up check sees the
            flag -- confirm this is intended.
        NetworkError: If the agent fails with a message that mentions a
            connection/network/timeout/unreachable problem.
        PermanentError: For any other agent failure.
    """
    logger.info(f"[{trace_id}] Executing plan with {len(plan)} steps")
    for i, step in enumerate(plan):
        # Check for cancellation before each step
        if check_job_cancelled(job_id, redis_client):
            logger.info(f"[{trace_id}] Job {job_id} cancelled during step {i + 1}")
            raise TransientError("Job was cancelled during execution")
        logger.debug(f"[{trace_id}] Executing step {i + 1}: {step.get('source', 'unknown')}")

    # Execute the full plan via federation agent
    try:
        return agent.execute_federated_plan(plan)
    except (NetworkError, TransientError, PermanentError):
        # Already classified by this module's taxonomy: do not reclassify by
        # guessing from the message text.
        raise
    except Exception as e:
        logger.error(f"[{trace_id}] Federation agent error: {e}")
        # Check if it's a network/connection issue
        message = str(e).lower()
        if any(term in message for term in ['connection', 'network', 'timeout', 'unreachable']):
            raise NetworkError(f"Network error during plan execution: {e}") from e
        raise PermanentError(f"Plan execution failed: {e}") from e
def store_results_with_fallback(job_id: str, results: Any, redis_client, minio_client, trace_id: str) -> str:
    """Store results in Redis or MinIO based on size with fallback handling.

    Results smaller than 1MB go to Redis under ``job:{job_id}:results`` with
    a 24 hour TTL; larger ones go to the ``job-results`` MinIO bucket. If the
    Redis write fails with a connection/timeout error the same payload is
    written to MinIO instead.

    Args:
        job_id: Job identifier used to build the storage key/path.
        results: JSON-serializable results.
        redis_client: Client exposing ``setex(key, ttl, value)``.
        minio_client: Client exposing ``put_object(...)``.
        trace_id: Identifier prefixed to log messages.

    Returns:
        ``redis://<key>`` or ``minio://<object path>``.

    Raises:
        PermanentError: If ``results`` cannot be serialized (retrying the
            job would hit the same failure).
        TransientError: If storing fails, including a failed MinIO fallback.
    """
    # Serialize once up front. `orjson.dumps` returns bytes; a str-returning
    # serializer is tolerated as well.
    try:
        results_bytes = json.dumps(results)
    except (TypeError, ValueError) as e:
        logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {e}")
        raise PermanentError(f"Result serialization failed: {e}") from e
    if isinstance(results_bytes, str):
        results_bytes = results_bytes.encode('utf-8')
    size_bytes = len(results_bytes)
    minio_path = f"job-results/{job_id}/results.json"

    def _put_in_minio() -> str:
        """Upload the serialized payload to MinIO and return its location."""
        minio_client.put_object(
            bucket_name="job-results",
            object_name=minio_path,
            data=io.BytesIO(results_bytes),
            length=size_bytes,
            content_type="application/json",
        )
        return f"minio://{minio_path}"

    try:
        # Use Redis for small results (< 1MB), MinIO for large results
        if size_bytes < 1024 * 1024:  # 1MB threshold
            redis_key = f"job:{job_id}:results"
            try:
                # Stored as str, 24 hour TTL
                redis_client.setex(redis_key, 86400, results_bytes.decode('utf-8'))
            except (RedisConnectionError, RedisTimeoutError) as e:
                logger.warning(f"[{trace_id}] Redis storage failed, falling back to MinIO: {e}")
                # Reuse the already-serialized payload for the fallback; a
                # failure here is reported as TransientError by the outer handler.
                return _put_in_minio()
            logger.info(f"[{trace_id}] Stored {size_bytes} bytes in Redis for job {job_id}")
            return f"redis://{redis_key}"

        location = _put_in_minio()
        logger.info(f"[{trace_id}] Stored {size_bytes} bytes in MinIO for job {job_id}")
        return location
    except Exception as e:
        logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {e}")
        raise TransientError(f"Result storage failed: {e}") from e
def _lookup_job_tenant_id(job_id: str, redis_client) -> str:
    """Best-effort read of ``tenant_id`` from the stored job metadata.

    Returns ``"unknown"`` when the metadata is missing, unreadable, or has
    no tenant id.
    NOTE(review): assumes whoever creates ``job:{job_id}:metadata`` stores a
    ``tenant_id`` field -- confirm against the job submission code.
    """
    try:
        raw = redis_client.get(f"job:{job_id}:metadata")
        if not raw:
            return "unknown"
        if isinstance(raw, str):
            raw = raw.encode('utf-8')
        stored = json.loads(raw)
        if isinstance(stored, dict):
            return stored.get("tenant_id") or "unknown"
    except Exception as e:
        logger.debug(f"Could not read tenant_id for job {job_id}: {e}")
    return "unknown"


def update_job_status(job_id: str, status: JobStatus, redis_client, trace_id: str, error_msg: Optional[str] = None):
    """Update job status in Redis with error handling.

    Merges ``status``, ``updated_at`` and ``trace_id`` (plus ``error_message``
    when given) into the job metadata. For the final statuses COMPLETED,
    FAILED and CANCELLED a completion metric is recorded as well. Any failure
    is logged and swallowed, so this function never raises.

    Args:
        job_id: Job whose metadata is updated.
        status: New status; its ``.value`` is stored.
        redis_client: Redis client passed to ``update_job_metadata``.
        trace_id: Stored in the metadata and prefixed to log messages.
        error_msg: Optional error description stored as ``error_message``.
    """
    try:
        update_data = {
            "status": status.value,
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "trace_id": trace_id,
        }
        if error_msg:
            update_data["error_message"] = error_msg

        update_job_metadata(job_id, update_data, redis_client)
        logger.info(f"[{trace_id}] Updated job {job_id} status to {status.value}")

        # Record job completion metrics for final statuses
        if status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            # update_data never carries a tenant id, so read it from the
            # stored job metadata instead of always reporting "unknown".
            tenant_id = _lookup_job_tenant_id(job_id, redis_client)
            metrics.record_job_completion(job_id, tenant_id, status.value.lower(), error_msg)
    except Exception as e:
        logger.error(f"[{trace_id}] Failed to update job status for {job_id}: {e}")
def update_job_metadata(job_id: str, updates: Dict[str, Any], redis_client):
    """Merge ``updates`` into the job's metadata record in Redis.

    The record lives under ``job:{job_id}:metadata``; when it does not exist
    yet a new one containing only ``job_id`` is started. The merged record is
    written back as a str with a 24 hour TTL.

    Args:
        job_id: Job whose metadata is updated.
        updates: Fields to add or overwrite.
        redis_client: Client exposing ``get(key)`` and ``setex(key, ttl, value)``.
    """
    key = f"job:{job_id}:metadata"
    raw = redis_client.get(key)

    # The loader is always fed bytes; a str from the client is encoded first.
    if not raw:
        merged = {"job_id": job_id}
    elif isinstance(raw, (bytes, bytearray)):
        merged = json.loads(raw)
    else:
        merged = json.loads(raw.encode('utf-8'))

    merged.update(updates)

    payload = json.dumps(merged)
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode('utf-8')
    redis_client.setex(key, 86400, payload)
def send_to_dlq(job_id: str, job_payload: Dict[str, Any], error: str, failure_type: str, redis_client, retry_count: int = 3):
    """Send failed job to Dead Letter Queue.

    Pushes a JSON entry onto the ``dlq:failed_jobs`` Redis list and sets the
    list's expiry to 7 days. Failures are logged and swallowed, so this
    function never raises.

    Args:
        job_id: Identifier of the failed job.
        job_payload: Payload stored with the entry for later inspection.
        error: Error description.
        failure_type: Short failure category (e.g. "permanent_error").
        redis_client: Client exposing ``lpush`` and ``expire``.
        retry_count: Number of retries recorded in the entry. Defaults to 3
            (the previously hard-coded value); callers that dead-letter a job
            without exhausting retries can pass the real count.
    """
    try:
        dlq_entry = {
            "job_id": job_id,
            "payload": job_payload,
            "error": error,
            "failure_type": failure_type,
            "failed_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": retry_count,
        }
        dlq_blob = json.dumps(dlq_entry)
        if isinstance(dlq_blob, (bytes, bytearray)):
            dlq_blob = dlq_blob.decode('utf-8')
        redis_client.lpush("dlq:failed_jobs", dlq_blob)
        redis_client.expire("dlq:failed_jobs", 86400 * 7)  # Keep DLQ for 7 days
        logger.info(f"Sent job {job_id} to DLQ with failure type: {failure_type}")
    except Exception as e:
        logger.error(f"Failed to send job {job_id} to DLQ: {e}")
# NOTE(review): registered as a bound task because the body relies on
# self.request / self.retry / self.max_retries; any extra registration options
# (explicit name, queue, ...) are not visible in this file -- confirm.
@celery_app.task(bind=True)
def process_raw_sql_job(self, job_id: str, tenant_id: str, source_name: str, sql_query: str, max_rows: Optional[int] = None):
    """
    Celery task for processing raw SQL queries asynchronously.

    This task handles:
    - Loading tenant configuration for specific source
    - Executing raw SQL
    - Applying max_rows limit
    - Storing results in Redis/MinIO
    - Network error handling with retries
    - Job cancellation checks

    Args:
        job_id: Unique identifier for the job
        tenant_id: Tenant identifier
        source_name: Target data source name
        sql_query: Raw SQL query to execute
        max_rows: Optional maximum rows to return

    Returns:
        ``{"status": "cancelled", ...}`` when the job was cancelled before it
        started, otherwise ``{"status": "completed", "job_id": ..., "rows": ...}``.

    Raises:
        PermanentError: Re-raised after the job is marked failed.
        celery.exceptions.Retry: Via ``self.retry`` for retryable errors
            while retries remain.
        Exception: Any other error is re-raised after the job is marked failed.
    """
    # Start enhanced trace for the job
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_raw_sql_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id,
            source_name=source_name,
        )
        trace_id = trace_context.trace_id
    else:
        trace_id = f"sql-job-{job_id}-{int(time.time())}"

    logger.info(f"Starting raw SQL job {job_id} for tenant {tenant_id} on source {source_name}")
    redis_client = None
    minio_client = None
    # Payload recorded in the DLQ; send_to_dlq takes one payload dict, so the
    # tenant id travels inside it rather than as an extra positional argument.
    dlq_payload = {"tenant_id": tenant_id, "sql_query": sql_query, "source_name": source_name}

    try:
        add_trace_event("sql_job_processing_started", level="INFO", job_id=job_id, tenant_id=tenant_id, source_name=source_name)

        # Initialize clients from process-global pool/client
        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client

        # Check for cancellation flag
        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"SQL job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id)
                add_trace_event("sql_job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}

        # Update job status to running
        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id)

        # Record job start for metrics
        metrics.record_job_start(job_id, tenant_id)

        # Load tenant configuration
        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")

            # Find specific source config
            source_config = next((conf for conf in tenant_config if conf.get("source_name") == source_name), None)
            if not source_config:
                raise PermanentError(f"Data source '{source_name}' not found for tenant '{tenant_id}'")
            add_trace_metadata(source_type=source_config.get("source_type", "unknown"))

        # Initialize federation agent with only target source
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent([source_config], redis_client, minio_client)
            if source_name not in agent.connectors:
                raise PermanentError(f"Failed to initialize connector for '{source_name}'")
            add_trace_metadata(agent_initialized=True)

        # Execute the raw SQL query
        with traced_span("execute_raw_sql", SpanType.DATABASE_QUERY):
            start_time = time.time()
            results = agent.execute_raw_query(source_name, sql_query)
            duration = time.time() - start_time
            add_trace_metadata(query_duration_ms=duration * 1000, result_rows=len(results))
            logger.info(f"Raw SQL executed successfully in {duration:.3f}s, returned {len(results)} rows")

            # Apply max_rows limit if specified. The original count is taken
            # before truncating so the log and trace report the real size.
            rows_limited = False
            original_row_count = len(results)
            if max_rows and original_row_count > max_rows:
                results = results[:max_rows]
                rows_limited = True
                logger.info(f"Results limited to {max_rows} rows (originally {original_row_count} rows)")
                add_trace_metadata(rows_limited=True, original_row_count=original_row_count)

        # Store results
        with traced_span("store_results", SpanType.CACHE_OPERATION):
            serialized = json.dumps({
                "status": "success",
                "results": results,
                "rows_returned": len(results),
                "rows_limited": rows_limited,
                "execution_time_ms": duration * 1000,
            })
            # `orjson.dumps` returns bytes; tolerate a str-returning serializer.
            serialized_bytes = serialized.encode('utf-8') if isinstance(serialized, str) else serialized
            results_size = len(serialized_bytes)

            if results_size > MAX_RESULT_SIZE_BYTES:
                # Store in MinIO for large results
                if minio_client is None:
                    raise NetworkError("MinIO client not initialized in worker process")
                logger.info(f"Storing large result ({results_size} bytes) in MinIO")
                result_key = f"results/{tenant_id}/{job_id}.json"
                minio_client.put_object(
                    "query-results",
                    result_key,
                    io.BytesIO(serialized_bytes),
                    length=results_size,
                    content_type="application/json",
                )
                redis_client.setex(
                    f"job:{job_id}:result_location",
                    RESULT_TTL_SECONDS,
                    f"minio:{result_key}",
                )
                add_trace_metadata(storage="minio", result_size_bytes=results_size)
            else:
                # Store in Redis for smaller results (store a str)
                redis_client.setex(
                    f"job:{job_id}:result",
                    RESULT_TTL_SECONDS,
                    serialized_bytes.decode('utf-8'),
                )
                add_trace_metadata(storage="redis", result_size_bytes=results_size)

        # Update job status to completed
        update_job_status(job_id, JobStatus.COMPLETED, redis_client, trace_id)
        # NOTE(review): update_job_status already records a completion metric
        # for COMPLETED, and it passes a status string as the third argument
        # where this call passes the duration -- confirm the intended
        # signature of metrics.record_job_completion.
        metrics.record_job_completion(job_id, tenant_id, duration)
        add_trace_event("sql_job_completed", level="INFO", duration=f"{duration:.3f}s", rows=len(results))

        # Record SQL execution metrics
        metrics.record_sql_execution(
            tenant_id=tenant_id,
            async_mode=True,
            duration=duration,
            rows_returned=len(results),
            rows_limited=rows_limited,
        )

        if tracer:
            tracer.finish_span("success", result_rows=len(results))
        logger.info(f"Raw SQL job {job_id} completed successfully")
        return {"status": "completed", "job_id": job_id, "rows": len(results)}

    except PermanentError as e:
        logger.error(f"Permanent error in SQL job {job_id}: {e}")
        error_msg = str(e)
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg)
            send_to_dlq(job_id, dlq_payload, error_msg, "permanent_error", redis_client)
        add_trace_event("sql_job_permanent_error", level="ERROR", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "permanent_error")
        raise  # Don't retry permanent errors

    except (NetworkError, TransientError, RedisConnectionError, RedisTimeoutError,
            ConnectionError, TimeoutError) as e:
        # Retryable: this module's own transient error types and the Redis
        # client errors are included alongside the builtin connection errors.
        logger.warning(f"Retryable error in SQL job {job_id}: {e}")
        add_trace_event("sql_job_retryable_error", level="WARN", error=str(e), retry_count=self.request.retries)

        # Check exhaustion explicitly: when `exc=` is passed, self.retry
        # re-raises that exception instead of MaxRetriesExceededError once
        # the limit is hit, so catching the latter would never trigger.
        max_retries = self.max_retries
        if max_retries is not None and self.request.retries >= max_retries:
            logger.error(f"Max retries exceeded for SQL job {job_id}")
            if redis_client is not None:
                update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
                send_to_dlq(job_id, dlq_payload, str(e), "max_retries", redis_client)
            metrics.record_job_failure(job_id, tenant_id, "max_retries")
            if tracer:
                tracer.finish_span("error", "Max retries exceeded")
            raise

        # The job is only marked FAILED once retries are exhausted, so status
        # readers do not see a terminal state for a job about to run again.
        if tracer:
            tracer.finish_span("error", f"Transient error, will retry: {str(e)}")
        # Exponential backoff: 30s, 60s, 120s, ...
        raise self.retry(exc=e, countdown=30 * (2 ** self.request.retries))

    except Exception as e:
        logger.exception(f"Unexpected error in SQL job {job_id}: {e}")
        error_msg = str(e)
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg)
            send_to_dlq(job_id, dlq_payload, error_msg, "unexpected_error", redis_client)
        add_trace_event("sql_job_unexpected_error", level="CRITICAL", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "unexpected_error")
        raise
# Celery signal handlers for worker lifecycle management
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Handler for when worker is ready to receive tasks.

    Connected to Celery's ``worker_ready`` signal; without the connection the
    handler would never be invoked.

    Args:
        sender: Signal sender, included in the log message.
        **kwargs: Remaining signal keyword arguments; unused.
    """
    logger.info(f"Worker {sender} is ready and connected to broker")
@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Handler for when worker is shutting down.

    Connected to Celery's ``worker_shutdown`` signal; without the connection
    the handler would never be invoked.

    Args:
        sender: Signal sender, included in the log message.
        **kwargs: Remaining signal keyword arguments; unused.
    """
    logger.info(f"Worker {sender} is shutting down")
if __name__ == "__main__":
    # Script entry point: delegate to the Celery application's own launcher.
    celery_app.start()