| import { createLoggerWithContext } from "@midday/logger"; |
| import * as Sentry from "@sentry/bun"; |
| import type { Job } from "bullmq"; |
| import type { ZodSchema } from "zod"; |
| import { |
| classifyError, |
| getMaxRetries, |
| getRetryDelay, |
| isNonRetryableError, |
| NonRetryableError, |
| } from "../utils/error-classification"; |
|
|
| |
| |
| |
/**
 * Serializes a value for log / Sentry metadata without ever throwing.
 *
 * `JSON.stringify` throws a `TypeError` on circular structures and BigInt values. When that
 * happens inside an error-reporting path, the serialization failure would replace the error that
 * is actually being reported, so failures are turned into a placeholder string instead.
 *
 * @param value - Any value; typically a job payload.
 * @returns The JSON text, or a placeholder describing why serialization failed.
 */
function safeStringify(value: unknown): string {
  try {
    return JSON.stringify(value);
  } catch (error) {
    const reason = error instanceof Error ? error.message : "Unknown error";
    return `[unserializable: ${reason}]`;
  }
}

/**
 * Base class for job processors.
 *
 * Subclasses implement {@link BaseProcessor.process}. {@link BaseProcessor.handle} wraps it with
 * payload validation, an idempotency hook, progress reporting, a result serializability check,
 * structured logging, error classification and Sentry reporting.
 *
 * @typeParam TData - Shape of `job.data` for this processor.
 */
export abstract class BaseProcessor<TData = unknown> {
  /** Logger whose context is `worker:<concrete class name>`. */
  protected logger: ReturnType<typeof createLoggerWithContext>;

  /** Named progress percentages for use with {@link BaseProcessor.updateProgress}. */
  protected readonly ProgressMilestones = {
    STARTED: 0,
    VALIDATED: 5,
    FETCHED: 10,
    PROCESSING: 25,
    HALFWAY: 50,
    NEARLY_DONE: 75,
    FINALIZING: 90,
    COMPLETED: 100,
  } as const;

  constructor() {
    this.logger = createLoggerWithContext(`worker:${this.constructor.name}`);
  }

  /**
   * Schema used to validate `job.data` before processing.
   *
   * @returns A Zod schema, or `null` (the default) to skip validation.
   */
  protected getPayloadSchema(): ZodSchema<TData> | null {
    return null;
  }

  /**
   * Performs the actual work for a job. Invoked by {@link BaseProcessor.handle} after validation
   * and the idempotency check have passed.
   *
   * @param job - The job to process; `job.data` has already been replaced by the validated payload.
   * @returns The job result. `handle` rejects results that `JSON.stringify` cannot serialize.
   */
  abstract process(job: Job<TData>): Promise<unknown>;

  /**
   * Validates `job.data` against {@link BaseProcessor.getPayloadSchema}.
   *
   * @param job - The job whose payload is validated.
   * @returns The parsed payload, or `job.data` unchanged when no schema is defined.
   * @throws NonRetryableError with category `"validation"` when the schema rejects the payload.
   */
  protected validatePayload(job: Job<TData>): TData {
    const schema = this.getPayloadSchema();
    if (!schema) {
      return job.data;
    }

    try {
      return schema.parse(job.data);
    } catch (error) {
      const reason = error instanceof Error ? error.message : "Unknown error";
      this.logger.error("Payload validation failed", {
        jobId: job.id,
        jobName: job.name,
        error: reason,
        // Must not throw here, otherwise the validation error below would never be raised.
        payload: safeStringify(job.data),
      });
      throw new NonRetryableError(
        `Invalid job payload: ${reason}`,
        error,
        "validation",
      );
    }
  }

  /**
   * Runs a job end to end: validate, idempotency check, process, verify the result, and on
   * failure classify, log, report and rethrow.
   *
   * @param job - The job to run.
   * @returns The value returned by `process`, or `{ skipped: true, reason: "idempotency" }` when
   *   {@link BaseProcessor.shouldProcess} returns `false`.
   * @throws The original error when it is retryable or already a non-retryable error; otherwise a
   *   `NonRetryableError` wrapping it. Also fails the job when the result is not JSON-serializable.
   */
  async handle(job: Job<TData>): Promise<unknown> {
    const startTime = Date.now();

    this.logger.info("Processing job", {
      jobId: job.id,
      jobName: job.name,
      attempt: job.attemptsMade + 1,
      maxAttempts: job.opts.attempts,
    });

    try {
      // Replace the raw payload with the parsed one so process() sees the schema's output.
      job.data = this.validatePayload(job);

      if (!(await this.shouldProcess(job))) {
        this.logger.info("Skipping job due to idempotency check", {
          jobId: job.id,
          jobName: job.name,
          idempotencyKey: this.getIdempotencyKey(job),
        });
        return { skipped: true, reason: "idempotency" };
      }

      // NOTE(review): the reason for gating the initial progress update on `removeOnComplete`
      // is not visible in this file — confirm before changing.
      if (job.opts.removeOnComplete !== false) {
        await this.updateProgress(
          job,
          this.ProgressMilestones.STARTED,
          "Job started",
        );
      }

      const result = await this.process(job);
      const duration = Date.now() - startTime;

      // Verify the result BEFORE announcing completion, so a single attempt is never logged as
      // both "Job completed" and "Job failed".
      if (result !== undefined && result !== null) {
        try {
          const serialized = JSON.stringify(result);
          const sizeInMB = new Blob([serialized]).size / (1024 * 1024);
          if (sizeInMB > 100) {
            this.logger.warn("Large job result detected", {
              jobId: job.id,
              jobName: job.name,
              sizeMB: sizeInMB.toFixed(2),
            });
          }
        } catch (error) {
          const reason = error instanceof Error ? error.message : "Unknown error";
          this.logger.error("Result is not JSON-serializable", {
            jobId: job.id,
            jobName: job.name,
            error: reason,
            resultType: typeof result,
            resultKeys:
              result && typeof result === "object"
                ? Object.keys(result)
                : undefined,
          });
          // Thrown inside the outer try, so it goes through the failure handling below.
          throw new Error(`Job result is not JSON-serializable: ${reason}`);
        }
      }

      try {
        this.logger.info("Job completed", {
          jobId: job.id,
          jobName: job.name,
          duration: `${duration}ms`,
          hasResult: result !== undefined,
          resultType: typeof result,
        });
      } catch (_logError) {
        // Deliberately ignored: a logging failure must not fail a job that already succeeded.
      }

      return result;
    } catch (error) {
      const duration = Date.now() - startTime;
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const errorStack = error instanceof Error ? error.stack : undefined;
      const classified = classifyError(error);

      // An explicit non-retryable error overrides whatever the classifier says.
      const isNonRetryable = isNonRetryableError(error);
      const shouldRetry = classified.retryable && !isNonRetryable;
      const attempt = job.attemptsMade + 1;
      // Falls back to 3 attempts only when the job options carry no `attempts` value at all.
      const remainingAttempts = (job.opts.attempts ?? 3) - attempt;
      const isFinalAttempt = remainingAttempts <= 0;

      this.logger.error("Job failed", {
        jobId: job.id,
        jobName: job.name,
        attempt,
        maxAttempts: job.opts.attempts,
        remainingAttempts,
        duration: `${duration}ms`,
        error: errorMessage,
        errorCategory: classified.category,
        retryable: classified.retryable,
        isNonRetryable,
        shouldRetry,
        suggestedRetryDelay: getRetryDelay(error),
        suggestedMaxRetries: getMaxRetries(error),
        stack: errorStack,
      });

      // Report to Sentry only when this failure is terminal: either it will not be retried or
      // no attempts are left. Intermediate retryable failures are logged above only.
      if (!shouldRetry || isFinalAttempt) {
        Sentry.captureException(error, {
          tags: {
            jobName: job.name,
            errorCategory: classified.category,
            retryable: String(shouldRetry),
            finalAttempt: String(isFinalAttempt),
          },
          extra: {
            jobId: job.id,
            attempt,
            maxAttempts: job.opts.attempts,
            remainingAttempts,
            duration: `${duration}ms`,
            // Must not throw here, otherwise the original error would be lost.
            payload: safeStringify(job.data),
          },
        });
      }

      // Informational only: nothing is removed from the queue here. The non-retryable error is
      // rethrown unchanged at the end of this handler.
      if (isNonRetryable && !isFinalAttempt) {
        this.logger.info(
          "Marking job as non-retryable, skipping remaining attempts",
          {
            jobId: job.id,
            jobName: job.name,
            category: classified.category,
          },
        );
      }

      // The classifier says "do not retry" but the error is not yet marked as such: wrap it so
      // the non-retryable marker travels with the thrown error.
      if (!shouldRetry && !isNonRetryable) {
        throw new NonRetryableError(errorMessage, error, classified.category);
      }

      throw error;
    }
  }

  /**
   * Best-effort progress update; never throws.
   *
   * @param job - The job whose progress is updated.
   * @param progress - Percentage; clamped to the range 0–100 (NaN is treated as 0).
   * @param message - Optional text included in the debug log only.
   */
  protected async updateProgress(
    job: Job<TData>,
    progress: number,
    message?: string,
  ): Promise<void> {
    // Math.min/Math.max propagate NaN, so normalise it explicitly before clamping.
    const clampedProgress = Number.isNaN(progress)
      ? 0
      : Math.max(0, Math.min(100, progress));

    try {
      // Guarded because the job object may not expose updateProgress (presumably mocked or
      // partial job objects — confirm against callers).
      if (typeof job.updateProgress === "function") {
        await job.updateProgress(clampedProgress);
        this.logger.debug("Progress updated", {
          jobId: job.id,
          progress: `${clampedProgress}%`,
          message,
        });
      } else {
        this.logger.debug("Progress update skipped (method not available)", {
          jobId: job.id,
          progress: `${clampedProgress}%`,
          message,
        });
      }
    } catch (error) {
      // Progress is advisory: log and carry on rather than failing the job.
      this.logger.warn("Failed to update job progress", {
        jobId: job.id,
        progress: clampedProgress,
        error: error instanceof Error ? error.message : "Unknown error",
      });
    }
  }

  /**
   * Idempotency hook consulted by {@link BaseProcessor.handle} before `process` runs.
   *
   * @param _job - The job about to be processed.
   * @returns `true` to process the job (the default); `false` makes `handle` skip it.
   */
  protected async shouldProcess(_job: Job<TData>): Promise<boolean> {
    return true;
  }

  /**
   * Key that identifies a job for idempotency purposes; included in the skip log.
   *
   * @param job - The job to derive the key from.
   * @returns `job.id` by default, or `null` when the job has no id.
   */
  protected getIdempotencyKey(job: Job<TData>): string | null {
    return job.id ?? null;
  }
}
|
|