"""Celery worker for processing federated data query jobs.

This module implements both a simple ``Worker`` class for testing and a full
Celery-based distributed worker system for production use. The Celery tasks
classify failures as permanent (not retried) or transient (retried with
exponential backoff), record unrecoverable jobs in a Redis-backed dead letter
queue, and emit tracing events and metrics.
"""
import orjson as json
import time
import logging
import traceback
import sys
import os
import io
import importlib
from typing import Dict, Any, List, Optional, Callable
from datetime import datetime, timezone
from urllib.parse import quote

import redis
from minio import Minio
from celery import Celery
from celery.exceptions import Retry, WorkerLostError
from celery.signals import worker_ready, worker_shutdown, worker_process_init
from kombu.exceptions import OperationalError as KombuOperationalError
from redis.exceptions import ConnectionError as RedisConnectionError, TimeoutError as RedisTimeoutError

# Add backend directory to path for proper imports
backend_path = os.path.join(os.path.dirname(__file__), "..")
if backend_path not in sys.path:
    sys.path.insert(0, backend_path)

# Import centralized storage configuration
from backend.core.minio.config import get_redis_config, get_minio_config
from backend.data_sources.federation_agent import FederationAgent
from backend.data_sources.jobs import JobStatus, JobMetadata
from backend.data_sources import metrics
from backend.data_sources import tracing
from backend.data_sources.tracing import traced_span, SpanType, add_trace_event, add_trace_metadata

# Configure logging for workers
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [Worker:%(process)d] - %(message)s'
)
logger = logging.getLogger(__name__)

# Per-process globals, populated by ``init_worker_process`` when a Celery
# worker process starts. They remain None if initialization failed; the tasks
# check ``_redis_pool`` and raise NetworkError so the job is retried.
_redis_pool: Optional[redis.ConnectionPool] = None
_minio_client: Optional[Minio] = None


def get_redis_url(db: int = 0) -> str:
    """Build a ``redis://`` URL for database *db* from the centralized Redis config.

    The password, when configured, is percent-encoded so that characters such
    as ``@``, ``/`` or ``:`` do not corrupt the URL (redis-py and kombu both
    percent-decode the password when parsing).
    """
    redis_config = get_redis_config()
    host = redis_config['host']
    port = redis_config['port']
    password = redis_config.get('password')
    if password:
        return f"redis://:{quote(str(password), safe='')}@{host}:{port}/{db}"
    return f"redis://{host}:{port}/{db}"


@worker_process_init.connect
def init_worker_process(**kwargs):
    """Initialize the shared Redis connection pool and MinIO client once per worker process.

    Failures are logged, not raised: the globals stay None and each task
    reports the missing pool as a (retryable) NetworkError.
    """
    global _redis_pool, _minio_client
    try:
        logger.info("Initializing worker connections (Redis pool + MinIO client)")
        _redis_pool = redis.ConnectionPool.from_url(get_redis_url(0))

        # MinIO config is a dict with endpoint/access_key/secret_key/secure.
        mconf = get_minio_config()
        _minio_client = Minio(
            endpoint=mconf.get('endpoint'),
            access_key=mconf.get('access_key'),
            secret_key=mconf.get('secret_key'),
            secure=mconf.get('secure', False),
        )
        logger.info("Worker connections initialized.")
    except Exception as e:
        logger.exception(f"Failed initializing worker connections: {e}")


# Create Celery app with Redis as broker and backend
redis_url = get_redis_url(0)
celery_app = Celery(
    'data_sources_worker',
    broker=redis_url,
    backend=redis_url
)

# Celery configuration for production resilience
celery_app.conf.update(
    # Task routing and execution
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,

    # Worker configuration
    worker_prefetch_multiplier=1,  # Process one job at a time for memory efficiency
    task_acks_late=True,  # Acknowledge task only after completion
    worker_max_tasks_per_child=100,  # Restart worker after 100 tasks to prevent memory leaks

    # Network resilience
    broker_connection_retry_on_startup=True,
    broker_connection_retry=True,
    broker_connection_max_retries=10,

    # Result backend configuration
    result_backend_transport_options={
        'retry_on_timeout': True,
        'retry_on_error': [RedisConnectionError, RedisTimeoutError],
        'max_retries': 3,
    },

    # Task time limits
    task_soft_time_limit=300,  # 5 minutes soft limit
    task_time_limit=600,  # 10 minutes hard limit

    # Re-queue (instead of acknowledging) a task whose worker process died
    # mid-execution, so it is redelivered. This is NOT a dead letter queue;
    # the application-level DLQ is written by ``send_to_dlq``.
    task_reject_on_worker_lost=True,
)

# Constants for raw SQL job results
MAX_RESULT_SIZE_BYTES = 10 * 1024 * 1024  # 10MB: larger raw SQL results go to MinIO
RESULT_TTL_SECONDS = 3600  # 1 hour

# Federated-job results smaller than this are stored inline in Redis.
INLINE_RESULT_LIMIT_BYTES = 1024 * 1024  # 1MB
# TTL for job metadata and inline federated-job results in Redis.
JOB_DATA_TTL_SECONDS = 86400  # 24 hours
# Dead letter queue list key and its TTL.
DLQ_KEY = "dlq:failed_jobs"
DLQ_TTL_SECONDS = 86400 * 7  # 7 days

# Module paths tried, in order, when resolving helpers from the API module, so
# the worker runs under different PYTHONPATH/package layouts.
_API_MODULE_CANDIDATES = (
    'backend.data_sources.api',
    'data_sources.api',
    'api',
)


class NetworkError(Exception):
    """Base class for network-related errors that should trigger retries."""
    pass


class TransientError(Exception):
    """Base class for transient errors that should be retried."""
    pass


class PermanentError(Exception):
    """Base class for permanent errors that should not be retried."""
    pass


class JobCancelledError(TransientError):
    """Raised when a job's cancellation flag is observed during plan execution.

    Subclasses TransientError for backward compatibility with callers that
    already catch it, but the tasks handle it first and mark the job
    CANCELLED instead of scheduling a retry.
    """
    pass


# NOTE: network retry/backoff lives at the task level (Celery's
# ``self.retry(...)``). Helper functions raise exceptions directly so the
# task decides whether and when to retry.


class Worker:
    """Simple synchronous worker used for testing and basic job processing."""

    def __init__(self, tenant_config, redis_client=None, minio_client=None):
        self.agent = FederationAgent(tenant_config, redis_client=redis_client, minio_client=minio_client)

    def process_job(self, job_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the ``'plan'`` (list of steps) in *job_payload* and return the agent's results.

        Raises:
            ValueError: if *job_payload* has no ``'plan'`` key.
        """
        plan = job_payload.get("plan")
        if plan is None:
            raise ValueError("job payload must contain 'plan'")
        logger.info("Processing job with %d steps", len(plan))
        return self.agent.execute_federated_plan(plan)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_federated_job(self, job_id: str, job_payload: Dict[str, Any], tenant_id: str):
    """Celery task that executes a federated query plan and stores its results.

    Steps: check the cancellation flag, mark the job RUNNING, load the tenant
    configuration, run ``job_payload["plan"]`` through a FederationAgent,
    store the results (Redis or MinIO) and write completion metadata.

    Error handling:
        * JobCancelledError -> job marked CANCELLED, not retried.
        * PermanentError -> job marked FAILED and sent to the DLQ.
        * NetworkError / TransientError / Redis connection or timeout errors
          -> retried with exponential backoff (60s, 120s, 240s); after
          ``max_retries`` the job is marked FAILED and sent to the DLQ.
        * Any other exception -> job marked FAILED and sent to the DLQ.

    Args:
        job_id: Unique identifier for the job.
        job_payload: Job payload; must contain a ``'plan'`` list of steps.
        tenant_id: Tenant identifier for multi-tenancy.

    Returns:
        A dict whose ``"status"`` is ``"completed"``, ``"cancelled"`` or
        ``"failed"``. Failures are reported in the return value rather than
        raised, except when a retry is scheduled.
    """
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_federated_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id
        )
        trace_id = trace_context.trace_id
    else:
        # Fallback to legacy trace_id
        trace_id = f"job-{job_id}-{int(time.time())}"

    logger.info(f"Starting job {job_id} for tenant {tenant_id}")

    # Initialized inside the try so the except branches can tell whether
    # Redis is available; status/DLQ helpers log (not raise) if it is None.
    redis_client = None
    minio_client = None

    try:
        add_trace_event("job_processing_started", level="INFO", job_id=job_id, tenant_id=tenant_id)

        # Initialize clients from process-global pool/client
        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client

        # Check for cancellation flag before doing any work
        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"Job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id, tenant_id=tenant_id)
                add_trace_event("job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}

        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id, tenant_id=tenant_id)
        metrics.record_job_start(job_id, tenant_id)

        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")
            add_trace_metadata(sources_count=len(tenant_config))

        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent(tenant_config, redis_client, minio_client)
            add_trace_metadata(agent_initialized=True)

        # A payload without a plan can never succeed; fail fast instead of
        # surfacing an opaque KeyError as an "unexpected" error.
        plan = job_payload.get("plan")
        if plan is None:
            raise PermanentError("job payload must contain 'plan'")

        with traced_span("execute_federated_plan", SpanType.DATABASE_QUERY):
            add_trace_metadata(plan_steps=len(plan))
            results = execute_plan_with_cancellation_check(
                agent, plan, job_id, redis_client, trace_id
            )
            add_trace_event("plan_execution_completed", level="INFO",
                            result_count=len(results) if isinstance(results, list) else 1)

        # Store results with size-based routing
        with traced_span("store_results", SpanType.FILE_OPERATION):
            result_location = store_results_with_fallback(
                job_id, results, redis_client, minio_client, trace_id
            )
            add_trace_metadata(result_location=result_location)

        completion_metadata = {
            "status": JobStatus.COMPLETED.value,
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result_location": result_location,
            "trace_id": trace_id
        }
        update_job_metadata(job_id, completion_metadata, redis_client)

        add_trace_event("job_completed_successfully", level="INFO", result_location=result_location)
        logger.info(f"Job {job_id} completed successfully")

        if tracer:
            tracer.finish_span("success", result_location=result_location)

        return {"status": "completed", "result_location": result_location}

    except JobCancelledError as e:
        # Must precede the TransientError clause: JobCancelledError subclasses it.
        logger.info(f"Job {job_id} was cancelled during execution: {e}")
        update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id, tenant_id=tenant_id)
        add_trace_event("job_cancelled", level="WARN")
        if tracer:
            tracer.finish_span("cancelled")
        return {"status": "cancelled", "message": "Job was cancelled"}

    except PermanentError as e:
        # Permanent errors should not be retried
        logger.error(f"Permanent error in job {job_id}: {e}")
        add_trace_event("permanent_error_occurred", level="ERROR", error=str(e))
        update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e), tenant_id=tenant_id)
        send_to_dlq(job_id, job_payload, str(e), "permanent_error", redis_client,
                    retry_count=self.request.retries)
        if tracer:
            tracer.finish_span("error", str(e))
        return {"status": "failed", "error": str(e)}

    except (NetworkError, TransientError, RedisConnectionError, RedisTimeoutError) as e:
        # Transient errors should be retried
        retry_count = self.request.retries + 1
        logger.warning(f"Transient error in job {job_id}, attempt {retry_count}: {e}")
        add_trace_event("transient_error_occurred", level="WARN",
                        error=str(e), retry_count=retry_count, max_retries=self.max_retries)

        if self.request.retries >= self.max_retries:
            logger.error(f"Job {job_id} failed after {self.max_retries} retries")
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e), tenant_id=tenant_id)
            send_to_dlq(job_id, job_payload, str(e), "max_retries_exceeded", redis_client,
                        retry_count=self.request.retries)
            if tracer:
                tracer.finish_span("error", f"Max retries exceeded: {str(e)}")
            return {"status": "failed", "error": str(e)}

        # Exponential backoff: 60s, 120s, 240s
        retry_delay = 60 * (2 ** self.request.retries)
        logger.info(f"Retrying job {job_id} in {retry_delay} seconds")
        add_trace_event("job_retry_scheduled", level="INFO", retry_delay=retry_delay, retry_count=retry_count)
        if tracer:
            tracer.add_metadata(retry_delay=retry_delay, will_retry=True)
            tracer.finish_span("error", f"Transient error, will retry: {str(e)}")
        raise self.retry(countdown=retry_delay, exc=e)

    except Exception as e:
        # Unexpected errors: record failure AND send to the DLQ before returning
        # (previously the DLQ call sat after the return and never ran).
        logger.exception(f"Unexpected error in job {job_id}: {e}")
        add_trace_event("unexpected_error_occurred", level="ERROR", error=str(e))
        update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e), tenant_id=tenant_id)
        send_to_dlq(job_id, job_payload, str(e), "unexpected_error", redis_client,
                    retry_count=self.request.retries)
        if tracer:
            tracer.finish_span("error", f"Unexpected error: {str(e)}")
        return {"status": "failed", "error": f"Unexpected error: {e}"}


def _resolve_api_function(name: str) -> Callable[..., Any]:
    """Return attribute *name* from the first importable API module that defines it.

    Only the import/lookup is guarded; the resolved function is called by the
    caller, so errors it raises (e.g. Redis connection errors) propagate
    unchanged instead of being mistaken for import failures.

    Raises:
        ImportError: if no candidate module can be imported and exposes *name*.
    """
    last_err = None
    for mod_name in _API_MODULE_CANDIDATES:
        try:
            module = importlib.import_module(mod_name)
        except Exception as e:
            # Any import-time failure: try the next package layout.
            last_err = e
            continue
        if hasattr(module, name):
            return getattr(module, name)
    raise ImportError(
        f"Could not import {name} from any candidate modules: {list(_API_MODULE_CANDIDATES)}. "
        f"Last error: {last_err}"
    )


def get_redis_client_with_retry():
    """Return a Redis client from the API module's ``get_redis_client``.

    Despite the name (kept for backward compatibility), no retry is performed
    here; retries happen at the Celery task level.
    """
    return _resolve_api_function('get_redis_client')()


def get_minio_client_with_retry():
    """Return a MinIO client from the API module's ``get_minio_client``.

    Despite the name (kept for backward compatibility), no retry is performed
    here; retries happen at the Celery task level.
    """
    return _resolve_api_function('get_minio_client')()


def get_tenant_config_with_retry(tenant_id: str, redis_client):
    """Return the tenant configuration via the API module's ``get_tenant_config``.

    Despite the name (kept for backward compatibility), no retry is performed
    here; errors from ``get_tenant_config`` propagate so the task can classify
    and retry them.
    """
    return _resolve_api_function('get_tenant_config')(tenant_id, redis_client)


def check_job_cancelled(job_id: str, redis_client) -> bool:
    """Return True if Redis key ``job:<job_id>:cancel`` holds ``"true"`` (case-insensitive for str).

    Redis connection/timeout errors are logged and treated as "not cancelled".
    """
    try:
        cancel_flag = redis_client.get(f"job:{job_id}:cancel")
        # Redis may return bytes or str depending on client config
        if cancel_flag is None:
            return False
        if isinstance(cancel_flag, (bytes, bytearray)):
            return cancel_flag == b"true"
        return str(cancel_flag).lower() == "true"
    except (RedisConnectionError, RedisTimeoutError) as e:
        logger.warning(f"Could not check cancellation flag for job {job_id}: {e}")
        return False  # Assume not cancelled if we can't check


def execute_plan_with_cancellation_check(agent, plan: List[Dict], job_id: str, redis_client, trace_id: str):
    """Execute *plan* with *agent* after checking the job's cancellation flag.

    The agent runs the whole plan in a single call, so cancellation can only
    be observed before execution starts.

    Raises:
        JobCancelledError: if the job was cancelled before execution.
        NetworkError: if the agent failed with what looks like a network issue
            (connection/timeout exception types, or a message mentioning
            connection/network/timeout/unreachable).
        PermanentError: for any other agent failure.
    """
    logger.info(f"[{trace_id}] Executing plan with {len(plan)} steps")

    if check_job_cancelled(job_id, redis_client):
        logger.info(f"[{trace_id}] Job {job_id} cancelled before plan execution")
        raise JobCancelledError("Job was cancelled during execution")

    for i, step in enumerate(plan):
        logger.debug(f"[{trace_id}] Executing step {i + 1}: {step.get('source', 'unknown')}")

    try:
        return agent.execute_federated_plan(plan)
    except (PermanentError, TransientError, NetworkError):
        # Already classified by the agent; don't re-wrap.
        raise
    except Exception as e:
        logger.error(f"[{trace_id}] Federation agent error: {e}")
        is_network_type = isinstance(
            e, (ConnectionError, TimeoutError, RedisConnectionError, RedisTimeoutError)
        )
        is_network_msg = any(
            term in str(e).lower() for term in ('connection', 'network', 'timeout', 'unreachable')
        )
        if is_network_type or is_network_msg:
            raise NetworkError(f"Network error during plan execution: {e}") from e
        raise PermanentError(f"Plan execution failed: {e}") from e


def _to_json_bytes(obj: Any) -> bytes:
    """Serialize *obj* to UTF-8 JSON bytes (``orjson.dumps`` already returns bytes)."""
    dumped = json.dumps(obj)
    if isinstance(dumped, str):
        dumped = dumped.encode('utf-8')
    return dumped


def _put_json_object(minio_client, bucket_name: str, object_name: str, payload: bytes) -> None:
    """Upload *payload* to MinIO as an ``application/json`` object."""
    minio_client.put_object(
        bucket_name=bucket_name,
        object_name=object_name,
        data=io.BytesIO(payload),
        length=len(payload),
        content_type="application/json",
    )


def store_results_with_fallback(job_id: str, results: Any, redis_client, minio_client, trace_id: str) -> str:
    """Store *results* in Redis or MinIO depending on serialized size.

    Results smaller than INLINE_RESULT_LIMIT_BYTES are written to Redis key
    ``job:<job_id>:results`` with a JOB_DATA_TTL_SECONDS TTL; larger results
    are uploaded to the ``job-results`` MinIO bucket. If the Redis write fails
    with a connection/timeout error, the results are written to MinIO instead.

    Returns:
        ``"redis://<key>"`` or ``"minio://<object path>"``.

    Raises:
        PermanentError: if *results* cannot be serialized to JSON (retrying
            would not help).
        TransientError: if storage fails, including the MinIO fallback.
    """
    try:
        results_bytes = _to_json_bytes(results)
    except TypeError as e:
        # orjson.JSONEncodeError (and stdlib json errors here) subclass TypeError.
        raise PermanentError(f"Results are not JSON-serializable: {e}") from e

    size_bytes = len(results_bytes)
    minio_path = f"job-results/{job_id}/results.json"

    try:
        if size_bytes < INLINE_RESULT_LIMIT_BYTES:
            redis_key = f"job:{job_id}:results"
            # The client is not configured with decode_responses; storing a
            # str is still fine (redis-py encodes it as UTF-8).
            redis_client.setex(redis_key, JOB_DATA_TTL_SECONDS, results_bytes.decode('utf-8'))
            logger.info(f"[{trace_id}] Stored {size_bytes} bytes in Redis for job {job_id}")
            return f"redis://{redis_key}"

        _put_json_object(minio_client, "job-results", minio_path, results_bytes)
        logger.info(f"[{trace_id}] Stored {size_bytes} bytes in MinIO for job {job_id}")
        return f"minio://{minio_path}"

    except (RedisConnectionError, RedisTimeoutError) as e:
        logger.warning(f"[{trace_id}] Redis storage failed, falling back to MinIO: {e}")
        try:
            _put_json_object(minio_client, "job-results", minio_path, results_bytes)
        except Exception as fallback_err:
            logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {fallback_err}")
            raise TransientError(f"Result storage failed: {fallback_err}") from fallback_err
        return f"minio://{minio_path}"

    except Exception as e:
        logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {e}")
        raise TransientError(f"Result storage failed: {e}") from e


def update_job_status(job_id: str, status: JobStatus, redis_client, trace_id: str,
                      error_msg: Optional[str] = None, tenant_id: Optional[str] = None):
    """Update the job's status in its Redis metadata; failures are logged, not raised.

    For final statuses (COMPLETED/FAILED/CANCELLED) a job-completion metric is
    also recorded.

    Args:
        job_id: Job identifier.
        status: New status.
        redis_client: Redis client (if None, the failure is logged).
        trace_id: Trace identifier stored alongside the status.
        error_msg: Optional error message stored as ``error_message``.
        tenant_id: Optional tenant; stored in metadata and used for metrics
            (``"unknown"`` when not given).
    """
    try:
        update_data = {
            "status": status.value,
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "trace_id": trace_id
        }
        if error_msg:
            update_data["error_message"] = error_msg
        if tenant_id:
            update_data["tenant_id"] = tenant_id

        update_job_metadata(job_id, update_data, redis_client)
        logger.info(f"[{trace_id}] Updated job {job_id} status to {status.value}")

        if status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED):
            metrics.record_job_completion(job_id, tenant_id or "unknown", status.value.lower(), error_msg)

    except Exception as e:
        logger.error(f"[{trace_id}] Failed to update job status for {job_id}: {e}")


def update_job_metadata(job_id: str, updates: Dict[str, Any], redis_client):
    """Merge *updates* into the JSON metadata at ``job:<job_id>:metadata`` and reset its TTL.

    NOTE: this is a non-atomic read-modify-write; concurrent writers for the
    same job may overwrite each other's fields.
    """
    metadata_key = f"job:{job_id}:metadata"

    existing = redis_client.get(metadata_key)
    if existing:
        # orjson.loads accepts both bytes and str.
        metadata = json.loads(existing)
    else:
        metadata = {"job_id": job_id}

    metadata.update(updates)

    # Store back to Redis as str with TTL
    redis_client.setex(metadata_key, JOB_DATA_TTL_SECONDS, _to_json_bytes(metadata).decode('utf-8'))


def send_to_dlq(job_id: str, job_payload: Dict[str, Any], error: str, failure_type: str,
                redis_client, retry_count: int = 3):
    """Push a failed job onto the Redis dead letter queue list; failures are logged, not raised.

    Args:
        job_id: Job identifier.
        job_payload: Payload needed to inspect/replay the job.
        error: Error message.
        failure_type: Short failure category (e.g. ``"permanent_error"``).
        redis_client: Redis client (if None, the failure is logged).
        retry_count: Number of retries performed before giving up (defaults
            to 3 for backward compatibility).
    """
    try:
        dlq_entry = {
            "job_id": job_id,
            "payload": job_payload,
            "error": error,
            "failure_type": failure_type,
            "failed_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": retry_count
        }
        redis_client.lpush(DLQ_KEY, _to_json_bytes(dlq_entry).decode('utf-8'))
        redis_client.expire(DLQ_KEY, DLQ_TTL_SECONDS)  # Keep DLQ for 7 days

        logger.info(f"Sent job {job_id} to DLQ with failure type: {failure_type}")
    except Exception as e:
        logger.error(f"Failed to send job {job_id} to DLQ: {e}")


@celery_app.task(bind=True, max_retries=2, default_retry_delay=30)
def process_raw_sql_job(self, job_id: str, tenant_id: str, source_name: str, sql_query: str, max_rows: Optional[int] = None):
    """Celery task that runs a raw SQL query against one tenant data source.

    Steps: check the cancellation flag, mark the job RUNNING, load the
    tenant's config for *source_name*, execute *sql_query*, apply *max_rows*,
    and store the serialized result in Redis (``job:<id>:result``) or, above
    MAX_RESULT_SIZE_BYTES, in MinIO (location at ``job:<id>:result_location``).

    Error handling:
        * PermanentError -> job marked FAILED, sent to the DLQ, re-raised.
        * NetworkError / TransientError / Redis or builtin connection and
          timeout errors -> retried with backoff (30s, 60s); after
          ``max_retries`` the job is marked FAILED, sent to the DLQ and the
          error re-raised.
        * Any other exception -> job marked FAILED, sent to the DLQ, re-raised.

    Args:
        job_id: Unique identifier for the job.
        tenant_id: Tenant identifier.
        source_name: Target data source name.
        sql_query: Raw SQL query to execute.
        max_rows: Optional maximum rows to return (falsy means no limit).
    """
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_raw_sql_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id,
            source_name=source_name
        )
        trace_id = trace_context.trace_id
    else:
        trace_id = f"sql-job-{job_id}-{int(time.time())}"

    logger.info(f"Starting raw SQL job {job_id} for tenant {tenant_id} on source {source_name}")

    redis_client = None
    minio_client = None
    # Recorded in the DLQ so a failed query can be inspected or replayed.
    dlq_payload = {
        "tenant_id": tenant_id,
        "source_name": source_name,
        "sql_query": sql_query,
        "max_rows": max_rows,
    }

    try:
        add_trace_event("sql_job_processing_started", level="INFO",
                        job_id=job_id, tenant_id=tenant_id, source_name=source_name)

        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client

        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"SQL job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id, tenant_id=tenant_id)
                add_trace_event("sql_job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}

        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id, tenant_id=tenant_id)
        metrics.record_job_start(job_id, tenant_id)

        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")

            source_config = next(
                (conf for conf in tenant_config if conf.get("source_name") == source_name), None
            )
            if not source_config:
                raise PermanentError(f"Data source '{source_name}' not found for tenant '{tenant_id}'")

            add_trace_metadata(source_type=source_config.get("source_type", "unknown"))

        # Initialize federation agent with only the target source
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent([source_config], redis_client, minio_client)
            if source_name not in agent.connectors:
                raise PermanentError(f"Failed to initialize connector for '{source_name}'")
            add_trace_metadata(agent_initialized=True)

        with traced_span("execute_raw_sql", SpanType.DATABASE_QUERY):
            start_time = time.time()
            results = agent.execute_raw_query(source_name, sql_query)
            duration = time.time() - start_time

            add_trace_metadata(query_duration_ms=duration * 1000, result_rows=len(results))
            logger.info(f"Raw SQL executed successfully in {duration:.3f}s, returned {len(results)} rows")

        rows_limited = False
        if max_rows and len(results) > max_rows:
            # Capture the count before truncating so logs/traces report it correctly.
            original_row_count = len(results)
            results = results[:max_rows]
            rows_limited = True
            logger.info(f"Results limited to {max_rows} rows (originally {original_row_count} rows)")
            add_trace_metadata(rows_limited=True, original_row_count=original_row_count)

        with traced_span("store_results", SpanType.CACHE_OPERATION):
            serialized_bytes = _to_json_bytes({
                "status": "success",
                "results": results,
                "rows_returned": len(results),
                "rows_limited": rows_limited,
                "execution_time_ms": duration * 1000
            })
            results_size = len(serialized_bytes)

            if results_size > MAX_RESULT_SIZE_BYTES:
                logger.info(f"Storing large result ({results_size} bytes) in MinIO")
                result_key = f"results/{tenant_id}/{job_id}.json"
                _put_json_object(minio_client, "query-results", result_key, serialized_bytes)
                redis_client.setex(
                    f"job:{job_id}:result_location",
                    RESULT_TTL_SECONDS,
                    f"minio:{result_key}"
                )
                add_trace_metadata(storage="minio", result_size_bytes=results_size)
            else:
                redis_client.setex(
                    f"job:{job_id}:result",
                    RESULT_TTL_SECONDS,
                    serialized_bytes.decode('utf-8')
                )
                add_trace_metadata(storage="redis", result_size_bytes=results_size)

        # update_job_status records the completion metric for final statuses;
        # recording it again here would double-count.
        update_job_status(job_id, JobStatus.COMPLETED, redis_client, trace_id, tenant_id=tenant_id)
        add_trace_event("sql_job_completed", level="INFO",
                        duration=f"{duration:.3f}s", rows=len(results))

        metrics.record_sql_execution(
            tenant_id=tenant_id,
            async_mode=True,
            duration=duration,
            rows_returned=len(results),
            rows_limited=rows_limited
        )

        if tracer:
            tracer.finish_span("success", result_rows=len(results))

        logger.info(f"Raw SQL job {job_id} completed successfully")
        return {"status": "completed", "job_id": job_id, "rows": len(results)}

    except PermanentError as e:
        error_msg = str(e)
        logger.error(f"Permanent error in SQL job {job_id}: {e}")
        if redis_client:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg, tenant_id=tenant_id)
            send_to_dlq(job_id, dlq_payload, error_msg, "permanent_error", redis_client,
                        retry_count=self.request.retries)
        add_trace_event("sql_job_permanent_error", level="ERROR", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "permanent_error")
        raise  # Don't retry permanent errors

    except (NetworkError, TransientError, RedisConnectionError, RedisTimeoutError,
            ConnectionError, TimeoutError) as e:
        error_msg = str(e)
        logger.warning(f"Retryable error in SQL job {job_id}: {e}")
        add_trace_event("sql_job_retryable_error", level="WARN",
                        error=error_msg, retry_count=self.request.retries)

        # Check explicitly: self.retry(exc=e) re-raises ``e`` (not
        # MaxRetriesExceededError) once retries are exhausted.
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for SQL job {job_id}")
            if redis_client:
                update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg, tenant_id=tenant_id)
                send_to_dlq(job_id, dlq_payload, error_msg, "max_retries", redis_client,
                            retry_count=self.request.retries)
            metrics.record_job_failure(job_id, tenant_id, "max_retries")
            if tracer:
                tracer.finish_span("error", "Max retries exceeded")
            raise

        # Job is not marked FAILED here: it is about to be retried.
        if tracer:
            tracer.finish_span("error", f"Transient error, will retry: {error_msg}")
        raise self.retry(exc=e, countdown=30 * (2 ** self.request.retries))

    except Exception as e:
        error_msg = str(e)
        logger.exception(f"Unexpected error in SQL job {job_id}: {e}")
        if redis_client:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg, tenant_id=tenant_id)
            send_to_dlq(job_id, dlq_payload, error_msg, "unexpected_error", redis_client,
                        retry_count=self.request.retries)
        add_trace_event("sql_job_unexpected_error", level="CRITICAL", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "unexpected_error")
        raise


# Celery signal handlers for worker lifecycle management
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log that the worker is ready to receive tasks."""
    logger.info(f"Worker {sender} is ready and connected to broker")


@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Log that the worker is shutting down."""
    logger.info(f"Worker {sender} is shutting down")


if __name__ == "__main__":
    # Start the Celery worker
    celery_app.start()