ManimCat-show / src /queues /processors /video.processor.ts
Bin29's picture
docs+logging: add sample video prompt and restore prod summary mode with memory controls
b18cbb9
/**
* Video Processor
* 任务处理器 - 主编排器
*/
import { videoQueue } from '../../config/bull'
import { storeJobResult } from '../../services/job-store'
import { clearJobCancelled } from '../../services/job-cancel-store'
import { JobCancelledError } from '../../utils/errors'
import { createLogger } from '../../utils/logger'
import type { VideoJobData } from '../../types'
import { runEditFlow, runGenerationFlow, runPreGeneratedFlow } from './video-processor-flows'
import { getRetryMeta, shouldDisableQueueRetry } from './video-processor-utils'
const logger = createLogger('VideoProcessor')
function emitJobSummary(args: {
jobId: string
taskType: 'pre-generated' | 'ai-edit' | 'generation'
result: 'completed' | 'failed'
outputMode: string
timings?: Record<string, number>
renderPeakMemoryMB?: number
error?: string
attempt?: number
maxAttempts?: number
}): void {
logger.info('job_summary', {
_logType: 'job_summary',
jobId: args.jobId,
taskType: args.taskType,
result: args.result,
outputMode: args.outputMode,
attempt: args.attempt,
maxAttempts: args.maxAttempts,
timings: args.timings,
renderPeakMemoryMB: args.renderPeakMemoryMB,
error: args.error
})
}
videoQueue.process(async (job) => {
const data = job.data as VideoJobData
const retryMeta = getRetryMeta(job)
const {
jobId,
concept,
quality,
outputMode = 'video',
preGeneratedCode,
editCode,
editInstructions,
promptOverrides,
referenceImages
} = data
logger.info('Processing video job', {
jobId,
concept,
outputMode,
quality,
hasPreGeneratedCode: !!preGeneratedCode,
hasEditRequest: !!editInstructions,
referenceImageCount: referenceImages?.length || 0
})
const timings: Record<string, number> = {}
try {
if (preGeneratedCode) {
const result = await runPreGeneratedFlow({ job, data, promptOverrides, timings })
logger.info('Job completed (pre-generated code)', { jobId, timings })
emitJobSummary({
jobId,
taskType: 'pre-generated',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
}
if (editCode && editInstructions) {
const result = await runEditFlow({ job, data, promptOverrides, timings })
logger.info('Job completed', { jobId, source: 'ai-edit', timings })
emitJobSummary({
jobId,
taskType: 'ai-edit',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
}
const result = await runGenerationFlow({ job, data, promptOverrides, timings })
logger.info('Job completed', { jobId, source: 'generation', timings })
emitJobSummary({
jobId,
taskType: 'generation',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
const cancelReason = error instanceof JobCancelledError ? error.details : undefined
const currentRetryMeta = getRetryMeta(job)
const disableQueueRetry = shouldDisableQueueRetry(errorMessage)
const willQueueRetry = !disableQueueRetry && currentRetryMeta.hasRemainingAttempts
if (disableQueueRetry) {
try {
job.discard()
logger.warn('Queue retry disabled for exhausted code retry', {
jobId,
error: errorMessage,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
} catch (discardError) {
logger.warn('Failed to discard job retry', { jobId, error: discardError })
}
}
if (willQueueRetry) {
logger.warn('Job attempt failed, Bull will retry', {
jobId,
error: errorMessage,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
throw error
}
logger.error('Job failed', {
jobId,
error: errorMessage,
timings,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
await storeJobResult(jobId, {
status: 'failed',
data: { error: errorMessage, cancelReason, outputMode }
})
await clearJobCancelled(jobId)
emitJobSummary({
jobId,
taskType: editCode && editInstructions ? 'ai-edit' : preGeneratedCode ? 'pre-generated' : 'generation',
result: 'failed',
outputMode,
timings,
error: errorMessage,
attempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
throw error
}
})