| |
| |
| |
| |
|
|
| import { videoQueue } from '../../config/bull' |
| import { storeJobResult } from '../../services/job-store' |
| import { clearJobCancelled } from '../../services/job-cancel-store' |
| import { createHistory } from '../../database' |
| import { JobCancelledError } from '../../utils/errors' |
| import { createLogger } from '../../utils/logger' |
| import type { VideoJobData } from '../../types' |
| import { runEditFlow, runGenerationFlow, runPreGeneratedFlow } from './video-processor-flows-static' |
| import { getRetryMeta, shouldDisableQueueRetry, storeProcessingStage } from './video-processor-utils' |
| import { getCurrentJobLogSummary, runWithJobLogContext } from '../../services/job-log-context' |
|
|
| const logger = createLogger('VideoProcessor') |
|
|
| function emitJobSummary(args: { |
| jobId: string |
| taskType: 'pre-generated' | 'ai-edit' | 'generation' |
| result: 'completed' | 'failed' |
| outputMode: string |
| timings?: Record<string, number> |
| renderPeakMemoryMB?: number |
| error?: string |
| attempt?: number |
| maxAttempts?: number |
| }): void { |
| const tokenSummary = getCurrentJobLogSummary() |
| logger.info('job_summary', { |
| _logType: 'job_summary', |
| jobId: args.jobId, |
| taskType: args.taskType, |
| result: args.result, |
| outputMode: args.outputMode, |
| attempt: args.attempt, |
| maxAttempts: args.maxAttempts, |
| timings: args.timings, |
| renderPeakMemoryMB: args.renderPeakMemoryMB, |
| error: args.error, |
| tokens: tokenSummary |
| ? { |
| totals: tokenSummary.totals, |
| calls: tokenSummary.calls |
| } |
| : { |
| totals: { |
| promptTokens: 0, |
| completionTokens: 0, |
| totalTokens: 0, |
| measuredCalls: 0, |
| unmeasuredCalls: 0 |
| }, |
| calls: [] |
| } |
| }) |
| } |
|
|
| videoQueue.process(async (job) => { |
| const data = job.data as VideoJobData |
| const contextAttempt = typeof job.attemptsMade === 'number' ? job.attemptsMade + 1 : 1 |
|
|
| return runWithJobLogContext( |
| { |
| jobId: data.jobId, |
| outputMode: data.outputMode || 'video', |
| attempts: contextAttempt |
| }, |
| async () => { |
| const { |
| jobId, |
| concept, |
| quality, |
| outputMode = 'video', |
| preGeneratedCode, |
| editCode, |
| editInstructions, |
| promptOverrides, |
| referenceImages |
| } = data |
|
|
| logger.info('Processing video job', { |
| jobId, |
| concept, |
| outputMode, |
| quality, |
| hasPreGeneratedCode: !!preGeneratedCode, |
| hasEditRequest: !!editInstructions, |
| referenceImageCount: referenceImages?.length || 0 |
| }) |
|
|
| const timings: Record<string, number> = {} |
| const retryMeta = getRetryMeta(job) |
| const initialStage = preGeneratedCode ? 'rendering' : 'generating' |
|
|
| try { |
| await storeProcessingStage(jobId, initialStage, { attempt: retryMeta.currentAttempt }) |
|
|
| if (preGeneratedCode) { |
| const result = await runPreGeneratedFlow({ job, data, promptOverrides, timings }) |
| logger.info('Job completed (pre-generated code)', { jobId, timings }) |
| emitJobSummary({ |
| jobId, |
| taskType: 'pre-generated', |
| result: 'completed', |
| outputMode, |
| timings, |
| renderPeakMemoryMB: result.renderPeakMemoryMB, |
| attempt: retryMeta.currentAttempt, |
| maxAttempts: retryMeta.maxAttempts |
| }) |
| return result |
| } |
|
|
| if (editCode && editInstructions) { |
| const result = await runEditFlow({ job, data, promptOverrides, timings }) |
| logger.info('Job completed', { jobId, source: 'ai-edit', timings }) |
| emitJobSummary({ |
| jobId, |
| taskType: 'ai-edit', |
| result: 'completed', |
| outputMode, |
| timings, |
| renderPeakMemoryMB: result.renderPeakMemoryMB, |
| attempt: retryMeta.currentAttempt, |
| maxAttempts: retryMeta.maxAttempts |
| }) |
| return result |
| } |
|
|
| const result = await runGenerationFlow({ job, data, promptOverrides, timings }) |
| logger.info('Job completed', { jobId, source: 'generation', timings }) |
| emitJobSummary({ |
| jobId, |
| taskType: 'generation', |
| result: 'completed', |
| outputMode, |
| timings, |
| renderPeakMemoryMB: result.renderPeakMemoryMB, |
| attempt: retryMeta.currentAttempt, |
| maxAttempts: retryMeta.maxAttempts |
| }) |
| return result |
| } catch (error) { |
| const errorMessage = error instanceof Error ? error.message : String(error) |
| const cancelReason = error instanceof JobCancelledError ? error.details : undefined |
| const currentRetryMeta = getRetryMeta(job) |
| const disableQueueRetry = shouldDisableQueueRetry(errorMessage) |
| const willQueueRetry = !disableQueueRetry && currentRetryMeta.hasRemainingAttempts |
|
|
| if (disableQueueRetry) { |
| try { |
| job.discard() |
| logger.warn('Queue retry disabled for exhausted code retry', { |
| jobId, |
| error: errorMessage, |
| currentAttempt: currentRetryMeta.currentAttempt, |
| maxAttempts: currentRetryMeta.maxAttempts |
| }) |
| } catch (discardError) { |
| logger.warn('Failed to discard job retry', { jobId, error: discardError }) |
| } |
| } |
|
|
| if (willQueueRetry) { |
| logger.warn('Job attempt failed, Bull will retry', { |
| jobId, |
| error: errorMessage, |
| currentAttempt: currentRetryMeta.currentAttempt, |
| maxAttempts: currentRetryMeta.maxAttempts |
| }) |
| throw error |
| } |
|
|
| logger.error('Job failed', { |
| jobId, |
| error: errorMessage, |
| timings, |
| currentAttempt: currentRetryMeta.currentAttempt, |
| maxAttempts: currentRetryMeta.maxAttempts |
| }) |
|
|
| await storeJobResult(jobId, { |
| status: 'failed', |
| data: { error: errorMessage, cancelReason, outputMode } |
| }) |
| await clearJobCancelled(jobId) |
|
|
| |
| if (data.clientId) { |
| try { |
| await createHistory({ |
| client_id: data.clientId, |
| prompt: concept, |
| code: null, |
| output_mode: outputMode as 'video' | 'image', |
| quality: quality as 'low' | 'medium' | 'high', |
| status: 'failed', |
| error: errorMessage |
| }) |
| } catch (histErr) { |
| logger.warn('Failed to write history record', { jobId, error: histErr }) |
| } |
| } |
|
|
| emitJobSummary({ |
| jobId, |
| taskType: editCode && editInstructions ? 'ai-edit' : preGeneratedCode ? 'pre-generated' : 'generation', |
| result: 'failed', |
| outputMode, |
| timings, |
| error: errorMessage, |
| attempt: currentRetryMeta.currentAttempt, |
| maxAttempts: currentRetryMeta.maxAttempts |
| }) |
|
|
| throw error |
| } |
| } |
| ) |
| }) |
|
|