# sirus / backend /data_sources /worker.py
# ranilmukesh's picture
# Deploy SiRUS SQL Agent backend
# b8277c4
"""Celery worker for processing federated data query jobs.
This module implements both a simple Worker class for testing and a full
Celery-based distributed worker system for production use with proper
error handling, network resilience, and observability.
"""
import orjson as json
import time
import logging
import traceback
import sys
import os
import io
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
import redis
from minio import Minio
from celery import Celery
from celery.exceptions import Retry, WorkerLostError
from celery.signals import worker_ready, worker_shutdown, worker_process_init
from kombu.exceptions import OperationalError as KombuOperationalError
from redis.exceptions import ConnectionError as RedisConnectionError, TimeoutError as RedisTimeoutError
# Add backend directory to path for proper imports.
# ".." is resolved relative to the directory holding this file, and the entry
# is prepended only once. This must run before the project imports below.
# NOTE(review): those imports use the `backend.` package prefix, which needs
# the directory *containing* `backend/` on sys.path -- confirm this entry is
# what actually makes them resolvable.
backend_path = os.path.join(os.path.dirname(__file__), "..")
if backend_path not in sys.path:
    sys.path.insert(0, backend_path)
# Import centralized storage configuration
from backend.core.minio.config import get_redis_config, get_minio_config
from backend.data_sources.federation_agent import FederationAgent
from backend.data_sources.jobs import JobStatus, JobMetadata
from backend.data_sources import metrics
from backend.data_sources import tracing
from backend.data_sources.tracing import traced_span, SpanType, add_trace_event, add_trace_metadata
# Configure logging for workers.
# The format embeds the emitting process id (%(process)d) so that log lines
# coming from different worker processes can be told apart.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [Worker:%(process)d] - %(message)s'
)
logger = logging.getLogger(__name__)
# Globals initialized once per worker process by Celery signal
# (see init_worker_process). Both stay None until that handler has run; the
# task functions in this module raise NetworkError while the Redis pool is
# still None.
_redis_pool: Optional[redis.ConnectionPool] = None
_minio_client: Optional[Minio] = None
@worker_process_init.connect
def init_worker_process(**kwargs):
    """Initialize shared Redis connection pool and MinIO client once per worker process.

    Connected to Celery's ``worker_process_init`` signal. The results are
    stored in the module globals ``_redis_pool`` and ``_minio_client``.

    Failures are logged and swallowed rather than raised, so a process may be
    left with only the Redis pool set (MinIO construction failed) or with
    neither set. The task functions check ``_redis_pool`` themselves.

    Args:
        **kwargs: Signal arguments supplied by Celery; unused.
    """
    global _redis_pool, _minio_client
    try:
        logger.info("Initializing worker connections (Redis pool + MinIO client)")
        # Use the existing helper to build a redis URL
        _redis_pool = redis.ConnectionPool.from_url(get_redis_url(0))
        # MinIO config returns dict with endpoint/access_key/secret_key/secure
        mconf = get_minio_config()
        # Minio constructor expects endpoint (host:port), access_key, secret_key, secure.
        # Each value is read once; the previous `x or x` form repeated the
        # same lookup and could never yield a different result.
        _minio_client = Minio(
            endpoint=mconf.get('endpoint'),
            access_key=mconf.get('access_key'),
            secret_key=mconf.get('secret_key'),
            secure=mconf.get('secure', False)
        )
        logger.info("Worker connections initialized.")
    except Exception as e:
        # Deliberately best-effort: tasks report a missing pool themselves.
        logger.exception(f"Failed initializing worker connections: {e}")
# Build Redis URL from centralized configuration
def get_redis_url(db: int = 0) -> str:
    """Build a ``redis://`` URL from the centralized Redis configuration.

    The password, when configured, is percent-encoded so that reserved
    characters such as ``@``, ``:``, ``/`` or ``#`` cannot be mistaken for URL
    delimiters.
    NOTE(review): this assumes the configured password is stored in plain
    (not already percent-encoded) form -- confirm against the config source.

    Args:
        db: Redis database number placed in the URL path.

    Returns:
        ``redis://:<password>@<host>:<port>/<db>`` when a password is
        configured, otherwise ``redis://<host>:<port>/<db>``.

    Raises:
        KeyError: If the configuration lacks ``host`` or ``port``.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    redis_config = get_redis_config()
    host = redis_config['host']
    port = redis_config['port']
    password = redis_config.get('password')
    if password:
        encoded_password = quote(str(password), safe='')
        return f"redis://:{encoded_password}@{host}:{port}/{db}"
    return f"redis://{host}:{port}/{db}"
# Create Celery app with Redis as broker and backend.
# The same URL (database 0) serves as both message broker and result backend.
# This runs at import time, so get_redis_config() must be usable whenever this
# module is imported.
redis_url = get_redis_url(0)
celery_app = Celery(
    'data_sources_worker',
    broker=redis_url,
    backend=redis_url
)
# Celery configuration for production resilience
celery_app.conf.update(
    # Task routing and execution: JSON only, so task arguments (job payloads)
    # have to be JSON-serializable.
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # Worker configuration
    worker_prefetch_multiplier=1,  # Process one job at a time for memory efficiency
    task_acks_late=True,  # Acknowledge task only after completion
    worker_max_tasks_per_child=100,  # Restart worker after 100 tasks to prevent memory leaks
    # Network resilience
    broker_connection_retry_on_startup=True,
    broker_connection_retry=True,
    broker_connection_max_retries=10,
    # Result backend configuration
    # NOTE(review): confirm that the Redis result backend honours these
    # particular keys; they look like redis-py client options.
    result_backend_transport_options={
        'retry_on_timeout': True,
        'retry_on_error': [RedisConnectionError, RedisTimeoutError],
        'max_retries': 3,
    },
    # Task time limits
    task_soft_time_limit=300,  # 5 minutes soft limit
    task_time_limit=600,  # 10 minutes hard limit
    # Dead Letter Queue configuration
    # NOTE(review): per Celery's documentation this setting re-queues the
    # message of a task whose worker process was lost (it pairs with
    # task_acks_late). It does not by itself feed the "dlq:failed_jobs" Redis
    # list written by send_to_dlq -- confirm the label above is intended.
    task_reject_on_worker_lost=True,
)
# Constants for job management
# NOTE(review): within this file only process_raw_sql_job reads these;
# store_results_with_fallback hard-codes its own 1 MB Redis/MinIO threshold
# and a 24 hour TTL -- confirm the difference is intentional.
MAX_RESULT_SIZE_BYTES = 10 * 1024 * 1024  # 10MB
RESULT_TTL_SECONDS = 3600  # 1 hour
class NetworkError(Exception):
    """Network-related failure; process_federated_job answers it with a retry."""
class TransientError(Exception):
    """Temporary failure that is worth retrying."""
class PermanentError(Exception):
    """Failure that a retry cannot fix; the job tasks fail the job without retrying."""
# NOTE: network retry/backoff was intentionally removed in favor of task-level
# retries using Celery's `self.retry(...)`. Helper functions below will raise
# exceptions directly so the task can decide how to retry.
# Simple Worker class for testing (existing functionality)
class Worker:
    """Minimal synchronous worker for tests and basic job processing.

    Wraps a single FederationAgent built from the given tenant configuration.
    """

    def __init__(self, tenant_config, redis_client=None, minio_client=None):
        """Build the FederationAgent used by :meth:`process_job`."""
        self.agent = FederationAgent(
            tenant_config,
            redis_client=redis_client,
            minio_client=minio_client,
        )

    def process_job(self, job_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Run the plan found under the payload's ``'plan'`` key.

        Args:
            job_payload: Mapping that must contain ``'plan'`` (a list of steps).

        Returns:
            Whatever the agent's ``execute_federated_plan`` returns for the plan.

        Raises:
            ValueError: If ``'plan'`` is absent or ``None``.
        """
        steps = job_payload.get("plan")
        if steps is None:
            raise ValueError("job payload must contain 'plan'")
        step_count = len(steps)
        logger.info("Processing job with %d steps", step_count)
        outcome = self.agent.execute_federated_plan(steps)
        return outcome
# Celery task for distributed processing
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_federated_job(self, job_id: str, job_payload: Dict[str, Any], tenant_id: str):
    """
    Main Celery task for processing federated query jobs.

    This task handles:
    - Loading tenant configuration
    - Executing federated query plans
    - Storing results in Redis/MinIO
    - Network error handling with retries
    - Job cancellation checks

    Error handling:
    - ``PermanentError``: job marked FAILED, sent to the DLQ, no retry.
    - ``NetworkError`` / ``TransientError`` / redis-py connection or timeout
      errors: retried with exponential backoff (60s, 120s, 240s); once
      ``max_retries`` is reached the job is marked FAILED and sent to the DLQ.
    - Anything else: job marked FAILED and sent to the DLQ.

    Note that a cancellation detected by
    ``execute_plan_with_cancellation_check`` arrives here as a
    ``TransientError`` and is therefore handled through the retry path.

    Args:
        job_id: Unique identifier for the job
        job_payload: Validated job payload with query plan (must hold ``"plan"``)
        tenant_id: Tenant identifier for multi-tenancy

    Returns:
        A dict with ``"status"`` of ``"completed"`` (plus ``"result_location"``),
        ``"cancelled"`` (plus ``"message"``) or ``"failed"`` (plus ``"error"``).

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` to schedule a retry.
    """
    # Start enhanced trace for the job
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_federated_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id
        )
        trace_id = trace_context.trace_id
    else:
        # Fallback to legacy trace_id
        trace_id = f"job-{job_id}-{int(time.time())}"
    logger.info(f"Starting job {job_id} for tenant {tenant_id}")
    redis_client = None
    minio_client = None
    try:
        add_trace_event("job_processing_started", level="INFO", job_id=job_id, tenant_id=tenant_id)
        # Initialize clients from process-global pool/client
        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client
        # Check for cancellation flag
        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"Job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id)
                add_trace_event("job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}
        # Update job status to running
        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id)
        # Record job start for metrics
        metrics.record_job_start(job_id, tenant_id)
        # Load tenant configuration
        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")
            add_trace_metadata(sources_count=len(tenant_config))
        # Initialize federation agent
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent(tenant_config, redis_client, minio_client)
            add_trace_metadata(agent_initialized=True)
        # Execute the federated plan with periodic cancellation checks
        with traced_span("execute_federated_plan", SpanType.DATABASE_QUERY):
            add_trace_metadata(plan_steps=len(job_payload["plan"]))
            results = execute_plan_with_cancellation_check(
                agent, job_payload["plan"], job_id, redis_client, trace_id
            )
            add_trace_event("plan_execution_completed", level="INFO",
                            result_count=len(results) if isinstance(results, list) else 1)
        # Store results with size-based routing
        with traced_span("store_results", SpanType.FILE_OPERATION):
            result_location = store_results_with_fallback(
                job_id, results, redis_client, minio_client, trace_id
            )
            add_trace_metadata(result_location=result_location)
        # Update job metadata with completion
        completion_metadata = {
            "status": JobStatus.COMPLETED,
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result_location": result_location,
            "trace_id": trace_id
        }
        update_job_metadata(job_id, completion_metadata, redis_client)
        add_trace_event("job_completed_successfully", level="INFO", result_location=result_location)
        logger.info(f"Job {job_id} completed successfully")
        if tracer:
            tracer.finish_span("success", result_location=result_location)
        return {"status": "completed", "result_location": result_location}
    except PermanentError as e:
        # Permanent errors should not be retried
        logger.error(f"Permanent error in job {job_id}: {e}")
        add_trace_event("permanent_error_occurred", level="ERROR", error=str(e))
        update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
        send_to_dlq(job_id, job_payload, str(e), "permanent_error", redis_client)
        if tracer:
            tracer.finish_span("error", str(e))
        return {"status": "failed", "error": str(e)}
    except (NetworkError, TransientError, RedisConnectionError, RedisTimeoutError) as e:
        # Transient errors should be retried
        retry_count = self.request.retries + 1
        logger.warning(f"Transient error in job {job_id}, attempt {retry_count}: {e}")
        add_trace_event("transient_error_occurred", level="WARN",
                        error=str(e), retry_count=retry_count, max_retries=self.max_retries)
        if self.request.retries >= self.max_retries:
            logger.error(f"Job {job_id} failed after {self.max_retries} retries")
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
            send_to_dlq(job_id, job_payload, str(e), "max_retries_exceeded", redis_client)
            if tracer:
                tracer.finish_span("error", f"Max retries exceeded: {str(e)}")
            return {"status": "failed", "error": str(e)}
        # Exponential backoff: 60s, 120s, 240s
        retry_delay = 60 * (2 ** self.request.retries)
        logger.info(f"Retrying job {job_id} in {retry_delay} seconds")
        add_trace_event("job_retry_scheduled", level="INFO",
                        retry_delay=retry_delay, retry_count=retry_count)
        if tracer:
            tracer.add_metadata(retry_delay=retry_delay, will_retry=True)
            tracer.finish_span("error", f"Transient error, will retry: {str(e)}")
        raise self.retry(countdown=retry_delay, exc=e)
    except Exception as e:
        # Unexpected errors
        logger.exception(f"Unexpected error in job {job_id}: {e}")
        add_trace_event("unexpected_error_occurred", level="ERROR", error=str(e))
        update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
        # The DLQ write has to happen before returning; it used to sit after
        # the return statement and was never reached.
        send_to_dlq(job_id, job_payload, str(e), "unexpected_error", redis_client)
        if tracer:
            tracer.finish_span("error", f"Unexpected error: {str(e)}")
        return {"status": "failed", "error": f"Unexpected error: {e}"}
# Helper functions with network error handling
def _call_api_helper(helper_name: str, *args):
    """Find ``helper_name`` in the first importable API module and call it.

    Several module paths are tried to be resilient to different
    PYTHONPATH/package layouts (running as module, from the backend dir, or
    from the data_sources dir).

    Only *import* failures move on to the next candidate. An exception raised
    by the helper itself propagates unchanged, so that e.g. a Redis connection
    error keeps its type instead of being re-reported as an ImportError.

    Raises:
        ImportError: If no candidate module could be imported or none of the
            importable ones defines ``helper_name``.
    """
    import importlib

    candidates = [
        'backend.data_sources.api',
        'data_sources.api',
        'api',
    ]
    last_err = None
    for mod in candidates:
        try:
            m = importlib.import_module(mod)
        except Exception as e:
            # Module-level code can fail with more than ImportError.
            last_err = e
            continue
        if hasattr(m, helper_name):
            return getattr(m, helper_name)(*args)
    # If none succeeded, raise a clear error
    raise ImportError(f"Could not import {helper_name} from any candidate modules: {candidates}. Last error: {last_err}")


def get_redis_client_with_retry():
    """Return the Redis client produced by the API module's ``get_redis_client``.

    Despite the name, no retry is performed here; retries happen at task level.

    Raises:
        ImportError: If ``get_redis_client`` cannot be located.
    """
    return _call_api_helper('get_redis_client')


def get_minio_client_with_retry():
    """Return the MinIO client produced by the API module's ``get_minio_client``.

    Despite the name, no retry is performed here; retries happen at task level.

    Raises:
        ImportError: If ``get_minio_client`` cannot be located.
    """
    return _call_api_helper('get_minio_client')


def get_tenant_config_with_retry(tenant_id: str, redis_client):
    """Return the tenant configuration from the API module's ``get_tenant_config``.

    Despite the name, no retry is performed here; retries happen at task level.

    Args:
        tenant_id: Tenant whose configuration is requested.
        redis_client: Redis client forwarded to ``get_tenant_config``.

    Raises:
        ImportError: If ``get_tenant_config`` cannot be located.
        Exception: Whatever ``get_tenant_config`` itself raises.
    """
    return _call_api_helper('get_tenant_config', tenant_id, redis_client)
def check_job_cancelled(job_id: str, redis_client) -> bool:
    """Check if job has been cancelled via Redis flag.

    Reads ``job:{job_id}:cancel`` and treats the value ``true`` (compared
    case-insensitively, whether Redis returns bytes or str) as "cancelled".

    Args:
        job_id: Job whose cancellation flag is read.
        redis_client: Client exposing ``get``.

    Returns:
        True when the flag is set to ``true``; False when it is missing, holds
        another value, or Redis could not be reached.
    """
    try:
        cancel_flag = redis_client.get(f"job:{job_id}:cancel")
    except (RedisConnectionError, RedisTimeoutError) as e:
        logger.warning(f"Could not check cancellation flag for job {job_id}: {e}")
        return False  # Assume not cancelled if we can't check
    if cancel_flag is None:
        return False
    # Redis may return bytes or str depending on client config; normalise to
    # str so both forms go through the same case-insensitive comparison.
    if isinstance(cancel_flag, (bytes, bytearray)):
        cancel_flag = bytes(cancel_flag).decode('utf-8', errors='replace')
    return str(cancel_flag).lower() == "true"
def execute_plan_with_cancellation_check(agent, plan: List[Dict], job_id: str, redis_client, trace_id: str):
    """Execute federated plan after checking the job's cancellation flag.

    The flag is checked once per plan step *before* execution starts; the plan
    itself is then handed to the agent in a single call, so no check happens
    while the agent is running.

    Args:
        agent: Object exposing ``execute_federated_plan(plan)``.
        plan: List of step dicts; ``step.get('source')`` is used for logging.
        job_id: Job whose cancellation flag is checked.
        redis_client: Redis client used for the cancellation check.
        trace_id: Identifier prefixed to the log lines.

    Returns:
        Whatever ``agent.execute_federated_plan(plan)`` returns.

    Raises:
        TransientError: If the cancellation flag is set.
        NetworkError: If execution fails with a connection/timeout exception
            type, or with a message mentioning a network problem.
        PermanentError: For any other execution failure.
    """
    logger.info(f"[{trace_id}] Executing plan with {len(plan)} steps")
    for i, step in enumerate(plan):
        # Check for cancellation before each step
        if check_job_cancelled(job_id, redis_client):
            logger.info(f"[{trace_id}] Job {job_id} cancelled during step {i + 1}")
            raise TransientError("Job was cancelled during execution")
        logger.debug(f"[{trace_id}] Executing step {i + 1}: {step.get('source', 'unknown')}")
    # Execute the full plan via federation agent
    try:
        return agent.execute_federated_plan(plan)
    except Exception as e:
        logger.error(f"[{trace_id}] Federation agent error: {e}")
        # Classify by exception type first: messages such as "timed out" or
        # "Broken pipe" contain none of the keywords below.
        network_types = (ConnectionError, TimeoutError, RedisConnectionError, RedisTimeoutError)
        message = str(e).lower()
        network_terms = ('connection', 'network', 'timeout', 'unreachable')
        if isinstance(e, network_types) or any(term in message for term in network_terms):
            raise NetworkError(f"Network error during plan execution: {e}") from e
        raise PermanentError(f"Plan execution failed: {e}") from e
def _put_results_in_minio(minio_client, job_id: str, payload: bytes) -> str:
    """Upload ``payload`` to the ``job-results`` bucket and return its ``minio://`` location."""
    minio_path = f"job-results/{job_id}/results.json"
    minio_client.put_object(
        bucket_name="job-results",
        object_name=minio_path,
        data=io.BytesIO(payload),
        length=len(payload),
        content_type="application/json"
    )
    return f"minio://{minio_path}"


def store_results_with_fallback(job_id: str, results: Any, redis_client, minio_client, trace_id: str) -> str:
    """Store results in Redis or MinIO based on size with fallback handling.

    Results are serialized to JSON once. Payloads under 1 MB go to Redis under
    ``job:{job_id}:results`` with a 24 hour TTL; larger ones go to MinIO. If
    the Redis write fails with a connection or timeout error, the same bytes
    are uploaded to MinIO instead.

    Args:
        job_id: Job the results belong to.
        results: JSON-serializable result object.
        redis_client: Client exposing ``setex``.
        minio_client: Client exposing ``put_object``.
        trace_id: Identifier prefixed to the log lines.

    Returns:
        ``redis://job:{job_id}:results`` or
        ``minio://job-results/{job_id}/results.json``.

    Raises:
        PermanentError: If ``results`` cannot be serialized; serializing again
            on a retry would fail the same way.
        TransientError: If storing fails, including a failed MinIO fallback.
    """
    # Serialize results to estimate size. `orjson.dumps` returns bytes.
    try:
        results_bytes = json.dumps(results)
    except (TypeError, ValueError) as e:
        logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {e}")
        raise PermanentError(f"Results are not JSON-serializable: {e}") from e
    if isinstance(results_bytes, str):
        results_bytes = results_bytes.encode('utf-8')
    size_bytes = len(results_bytes)
    try:
        # Use Redis for small results (< 1MB), MinIO for large results
        if size_bytes < 1024 * 1024:  # 1MB threshold
            redis_key = f"job:{job_id}:results"
            try:
                # Stored as str
                redis_client.setex(redis_key, 86400, results_bytes.decode('utf-8'))  # 24 hour TTL
            except (RedisConnectionError, RedisTimeoutError) as e:
                logger.warning(f"[{trace_id}] Redis storage failed, falling back to MinIO: {e}")
                # Fallback to MinIO if Redis fails; reuse the serialized bytes.
                return _put_results_in_minio(minio_client, job_id, results_bytes)
            logger.info(f"[{trace_id}] Stored {size_bytes} bytes in Redis for job {job_id}")
            return f"redis://{redis_key}"
        # Store in MinIO for large results
        location = _put_results_in_minio(minio_client, job_id, results_bytes)
        logger.info(f"[{trace_id}] Stored {size_bytes} bytes in MinIO for job {job_id}")
        return location
    except Exception as e:
        # Also covers a failing fallback upload, which used to escape unclassified.
        logger.error(f"[{trace_id}] Failed to store results for job {job_id}: {e}")
        raise TransientError(f"Result storage failed: {e}") from e
def _read_job_tenant_id(job_id: str, redis_client) -> str:
    """Return the ``tenant_id`` stored in the job's Redis metadata, or ``"unknown"``.

    Best effort: a missing key, unparsable metadata or a Redis error all yield
    ``"unknown"`` so that metrics recording is never blocked by this lookup.
    """
    try:
        raw = redis_client.get(f"job:{job_id}:metadata")
        if not raw:
            return "unknown"
        if isinstance(raw, str):
            raw = raw.encode('utf-8')
        stored = json.loads(raw)
    except (ValueError, TypeError, RedisConnectionError, RedisTimeoutError):
        return "unknown"
    tenant_id = stored.get("tenant_id") if isinstance(stored, dict) else None
    return str(tenant_id) if tenant_id else "unknown"


def update_job_status(job_id: str, status: JobStatus, redis_client, trace_id: str, error_msg: Optional[str] = None):
    """Update job status in Redis with error handling.

    Merges ``status``, ``updated_at``, ``trace_id`` and, when given,
    ``error_message`` into the job's metadata. For the final statuses
    COMPLETED, FAILED and CANCELLED a job-completion metric is recorded too.

    The metric's tenant is read from the stored job metadata.
    NOTE(review): this presumes the metadata written at submission time
    carries a ``tenant_id`` field; when it does not, ``"unknown"`` is used.

    Any exception is logged and swallowed: a failed status update must not
    fail the job itself.

    Args:
        job_id: Job to update.
        status: New status; its ``.value`` is what gets stored.
        redis_client: Redis client holding the job metadata.
        trace_id: Trace identifier stored with the update and used in logs.
        error_msg: Optional error text stored as ``error_message``.
    """
    try:
        update_data = {
            "status": status.value,
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "trace_id": trace_id
        }
        if error_msg:
            update_data["error_message"] = error_msg
        update_job_metadata(job_id, update_data, redis_client)
        logger.info(f"[{trace_id}] Updated job {job_id} status to {status.value}")
        # Record job completion metrics for final statuses
        if status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            # `update_data` never holds a tenant id, so look it up in the
            # stored metadata instead of always reporting "unknown".
            tenant_id = _read_job_tenant_id(job_id, redis_client)
            metrics.record_job_completion(job_id, tenant_id, status.value.lower(), error_msg)
    except Exception as e:
        logger.error(f"[{trace_id}] Failed to update job status for {job_id}: {e}")
def update_job_metadata(job_id: str, updates: Dict[str, Any], redis_client):
    """Merge ``updates`` into the job's metadata document in Redis.

    The document lives under ``job:{job_id}:metadata``. When nothing is stored
    yet, a fresh ``{"job_id": job_id}`` document is the starting point. The
    merged document is written back as a str with a 24 hour TTL.

    Args:
        job_id: Job whose metadata is updated.
        updates: Keys to add or overwrite.
        redis_client: Client exposing ``get`` and ``setex``.
    """
    metadata_key = f"job:{job_id}:metadata"
    stored = redis_client.get(metadata_key)
    if not stored:
        merged = {"job_id": job_id}
    else:
        # The stored value may come back as str or bytes; parse it as bytes.
        raw = stored if isinstance(stored, (bytes, bytearray)) else stored.encode('utf-8')
        merged = json.loads(raw)
    merged.update(updates)
    serialized = json.dumps(merged)
    if isinstance(serialized, (bytes, bytearray)):
        serialized = serialized.decode('utf-8')
    redis_client.setex(metadata_key, 86400, serialized)
def send_to_dlq(job_id: str, job_payload: Dict[str, Any], error: str, failure_type: str, redis_client):
    """Push a failed job onto the ``dlq:failed_jobs`` Redis list.

    The entry is JSON-encoded and pushed with ``lpush``; afterwards the list's
    expiry is set to 7 days. Any exception is logged and swallowed.

    NOTE(review): ``retry_count`` is always written as 3, whatever
    ``failure_type`` is -- confirm consumers do not rely on it.

    Args:
        job_id: Identifier of the failed job.
        job_payload: Payload stored with the entry.
        error: Error text.
        failure_type: Short label for the kind of failure.
        redis_client: Client exposing ``lpush`` and ``expire``.
    """
    queue_key = "dlq:failed_jobs"
    keep_seconds = 86400 * 7  # Keep DLQ for 7 days
    try:
        record = {
            "job_id": job_id,
            "payload": job_payload,
            "error": error,
            "failure_type": failure_type,
            "failed_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": 3,
        }
        encoded = json.dumps(record)
        if isinstance(encoded, (bytes, bytearray)):
            encoded = encoded.decode('utf-8')
        redis_client.lpush(queue_key, encoded)
        redis_client.expire(queue_key, keep_seconds)
        logger.info(f"Sent job {job_id} to DLQ with failure type: {failure_type}")
    except Exception as e:
        logger.error(f"Failed to send job {job_id} to DLQ: {e}")
@celery_app.task(bind=True, max_retries=2, default_retry_delay=30)
def process_raw_sql_job(self, job_id: str, tenant_id: str, source_name: str, sql_query: str, max_rows: Optional[int] = None):
    """
    Celery task for processing raw SQL queries asynchronously.

    This task handles:
    - Loading tenant configuration for specific source
    - Executing raw SQL with timeout
    - Applying max_rows limit
    - Storing results in Redis/MinIO
    - Network error handling with retries
    - Job cancellation checks

    Error handling:
    - ``PermanentError``: job marked FAILED, sent to the DLQ, re-raised.
    - ``NetworkError`` / ``TransientError`` / builtin and redis-py connection
      or timeout errors: retried with exponential backoff (30s, 60s). Only
      when ``max_retries`` is reached is the job marked FAILED, sent to the
      DLQ and the error re-raised.
    - Anything else: job marked FAILED, sent to the DLQ, re-raised.

    Args:
        job_id: Unique identifier for the job
        tenant_id: Tenant identifier
        source_name: Target data source name
        sql_query: Raw SQL query to execute
        max_rows: Optional maximum rows to return

    Returns:
        ``{"status": "completed", "job_id": ..., "rows": ...}`` on success or
        ``{"status": "cancelled", "message": ...}`` when cancelled up front.

    Raises:
        PermanentError: For configuration or connector problems.
        celery.exceptions.Retry: Raised via ``self.retry`` to schedule a retry.
        Exception: Any other failure, after it has been recorded.
    """
    # Start enhanced trace for the job
    tracer = tracing.get_tracer()
    if tracer:
        trace_context = tracer.start_trace(
            operation_name="process_raw_sql_job",
            span_type=SpanType.BACKGROUND_JOB,
            tenant_id=tenant_id,
            job_id=job_id,
            source_name=source_name
        )
        trace_id = trace_context.trace_id
    else:
        trace_id = f"sql-job-{job_id}-{int(time.time())}"
    logger.info(f"Starting raw SQL job {job_id} for tenant {tenant_id} on source {source_name}")
    redis_client = None
    minio_client = None
    # send_to_dlq takes a single payload argument, so the tenant id travels
    # inside the payload rather than as an extra positional argument.
    dlq_payload = {"tenant_id": tenant_id, "sql_query": sql_query, "source_name": source_name}
    try:
        add_trace_event("sql_job_processing_started", level="INFO", job_id=job_id, tenant_id=tenant_id, source_name=source_name)
        # Initialize clients from process-global pool/client
        with traced_span("initialize_clients", SpanType.EXTERNAL_API):
            if _redis_pool is None:
                raise NetworkError("Redis pool not initialized in worker process")
            redis_client = redis.Redis(connection_pool=_redis_pool)
            minio_client = _minio_client
        # Check for cancellation flag
        with traced_span("check_cancellation", SpanType.CACHE_OPERATION):
            if check_job_cancelled(job_id, redis_client):
                logger.info(f"SQL job {job_id} was cancelled")
                update_job_status(job_id, JobStatus.CANCELLED, redis_client, trace_id)
                add_trace_event("sql_job_cancelled", level="WARN")
                if tracer:
                    tracer.finish_span("cancelled")
                return {"status": "cancelled", "message": "Job was cancelled"}
        # Update job status to running
        update_job_status(job_id, JobStatus.RUNNING, redis_client, trace_id)
        # Record job start for metrics
        metrics.record_job_start(job_id, tenant_id)
        # Load tenant configuration
        with traced_span("load_tenant_config", SpanType.CACHE_OPERATION):
            tenant_config = get_tenant_config_with_retry(tenant_id, redis_client)
            if not tenant_config:
                raise PermanentError(f"No configuration found for tenant {tenant_id}")
            # Find specific source config
            source_config = next((conf for conf in tenant_config if conf.get("source_name") == source_name), None)
            if not source_config:
                raise PermanentError(f"Data source '{source_name}' not found for tenant '{tenant_id}'")
            add_trace_metadata(source_type=source_config.get("source_type", "unknown"))
        # Initialize federation agent with only target source
        with traced_span("initialize_federation_agent", SpanType.BACKGROUND_JOB):
            agent = FederationAgent([source_config], redis_client, minio_client)
            if source_name not in agent.connectors:
                raise PermanentError(f"Failed to initialize connector for '{source_name}'")
            add_trace_metadata(agent_initialized=True)
        # Execute the raw SQL query
        with traced_span("execute_raw_sql", SpanType.DATABASE_QUERY):
            start_time = time.time()
            results = agent.execute_raw_query(source_name, sql_query)
            duration = time.time() - start_time
            add_trace_metadata(query_duration_ms=duration * 1000, result_rows=len(results))
            logger.info(f"Raw SQL executed successfully in {duration:.3f}s, returned {len(results)} rows")
        # Apply max_rows limit if specified. The original count is captured
        # before slicing so the log line and trace metadata report it correctly.
        original_row_count = len(results)
        rows_limited = False
        if max_rows and original_row_count > max_rows:
            results = results[:max_rows]
            rows_limited = True
            logger.info(f"Results limited to {max_rows} rows (originally {original_row_count} rows)")
            add_trace_metadata(rows_limited=True, original_row_count=original_row_count)
        # Store results
        with traced_span("store_results", SpanType.CACHE_OPERATION):
            serialized = json.dumps({
                "status": "success",
                "results": results,
                "rows_returned": len(results),
                "rows_limited": rows_limited,
                "execution_time_ms": duration * 1000
            })
            if isinstance(serialized, str):
                serialized_bytes = serialized.encode('utf-8')
            else:
                serialized_bytes = serialized
            results_size = len(serialized_bytes)
            if results_size > MAX_RESULT_SIZE_BYTES:
                # Store in MinIO for large results
                logger.info(f"Storing large result ({results_size} bytes) in MinIO")
                result_key = f"results/{tenant_id}/{job_id}.json"
                minio_client.put_object(
                    "query-results",
                    result_key,
                    io.BytesIO(serialized_bytes),
                    length=results_size,
                    content_type="application/json"
                )
                redis_client.setex(
                    f"job:{job_id}:result_location",
                    RESULT_TTL_SECONDS,
                    f"minio:{result_key}"
                )
                add_trace_metadata(storage="minio", result_size_bytes=results_size)
            else:
                # Store in Redis for smaller results (store a str)
                redis_client.setex(
                    f"job:{job_id}:result",
                    RESULT_TTL_SECONDS,
                    serialized_bytes.decode('utf-8')
                )
                add_trace_metadata(storage="redis", result_size_bytes=results_size)
        # Update job status to completed
        update_job_status(job_id, JobStatus.COMPLETED, redis_client, trace_id)
        # NOTE(review): update_job_status already records a completion metric
        # and passes a status string as the third argument, whereas this call
        # passes the duration -- confirm against metrics.record_job_completion.
        metrics.record_job_completion(job_id, tenant_id, duration)
        add_trace_event("sql_job_completed", level="INFO", duration=f"{duration:.3f}s", rows=len(results))
        # Record SQL execution metrics
        metrics.record_sql_execution(
            tenant_id=tenant_id,
            async_mode=True,
            duration=duration,
            rows_returned=len(results),
            rows_limited=rows_limited
        )
        if tracer:
            tracer.finish_span("success", result_rows=len(results))
        logger.info(f"Raw SQL job {job_id} completed successfully")
        return {"status": "completed", "job_id": job_id, "rows": len(results)}
    except PermanentError as e:
        logger.error(f"Permanent error in SQL job {job_id}: {e}")
        error_msg = str(e)
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg)
            send_to_dlq(job_id, dlq_payload, error_msg, "permanent_error", redis_client)
        add_trace_event("sql_job_permanent_error", level="ERROR", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "permanent_error")
        raise  # Don't retry permanent errors
    except (NetworkError, TransientError, ConnectionError, TimeoutError,
            RedisConnectionError, RedisTimeoutError) as e:
        # redis-py's ConnectionError/TimeoutError do not derive from the
        # builtins of the same name, so they are listed explicitly, as are
        # this module's own retryable error classes.
        logger.warning(f"Retryable error in SQL job {job_id}: {e}")
        add_trace_event("sql_job_retryable_error", level="WARN", error=str(e), retry_count=self.request.retries)
        # Checked explicitly: with `exc=` set, self.retry() re-raises the
        # original exception once retries run out, so an
        # `except MaxRetriesExceededError` around it would never fire.
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for SQL job {job_id}")
            if redis_client is not None:
                update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, str(e))
                send_to_dlq(job_id, dlq_payload, str(e), "max_retries", redis_client)
            metrics.record_job_failure(job_id, tenant_id, "max_retries")
            if tracer:
                tracer.finish_span("error", "Max retries exceeded")
            raise
        # The job is not marked FAILED while a retry is still pending.
        if tracer:
            tracer.finish_span("error", f"Retryable error, will retry: {e}")
        raise self.retry(exc=e, countdown=30 * (2 ** self.request.retries))
    except Exception as e:
        logger.exception(f"Unexpected error in SQL job {job_id}: {e}")
        error_msg = str(e)
        if redis_client is not None:
            update_job_status(job_id, JobStatus.FAILED, redis_client, trace_id, error_msg)
            send_to_dlq(job_id, dlq_payload, error_msg, "unexpected_error", redis_client)
        add_trace_event("sql_job_unexpected_error", level="CRITICAL", error=error_msg)
        if tracer:
            tracer.finish_span("error", error_msg)
        metrics.record_job_failure(job_id, tenant_id, "unexpected_error")
        raise
# Celery signal handlers for worker lifecycle management
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log that the worker is up; connected to Celery's ``worker_ready`` signal.

    Args:
        sender: The object that sent the signal; only used in the log line.
        **kwargs: Remaining signal arguments; unused.
    """
    ready_message = f"Worker {sender} is ready and connected to broker"
    logger.info(ready_message)
@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Log that the worker is stopping; connected to Celery's ``worker_shutdown`` signal.

    Args:
        sender: The object that sent the signal; only used in the log line.
        **kwargs: Remaining signal arguments; unused.
    """
    shutdown_message = f"Worker {sender} is shutting down"
    logger.info(shutdown_message)
if __name__ == "__main__":
    # Start the Celery worker
    # NOTE(review): Celery.start() presumably dispatches on the command-line
    # arguments of this process -- confirm a `worker` sub-command is supplied
    # when the module is run directly.
    celery_app.start()