ManimCat / src /queues /processors /video.processor.ts
Bin29's picture
Sync from main: b9996d0 feat: refine studio workspace and plot canvas ui
14ea677
/**
* Video Processor
* 任务处理器 - 主编排器
*/
import { videoQueue } from '../../config/bull'
import { storeJobResult } from '../../services/job-store'
import { clearJobCancelled } from '../../services/job-cancel-store'
import { createHistory } from '../../database'
import { JobCancelledError } from '../../utils/errors'
import { createLogger } from '../../utils/logger'
import type { VideoJobData } from '../../types'
import { runEditFlow, runGenerationFlow, runPreGeneratedFlow } from './video-processor-flows-static'
import { getRetryMeta, shouldDisableQueueRetry, storeProcessingStage } from './video-processor-utils'
import { getCurrentJobLogSummary, runWithJobLogContext } from '../../services/job-log-context'
const logger = createLogger('VideoProcessor')
function emitJobSummary(args: {
jobId: string
taskType: 'pre-generated' | 'ai-edit' | 'generation'
result: 'completed' | 'failed'
outputMode: string
timings?: Record<string, number>
renderPeakMemoryMB?: number
error?: string
attempt?: number
maxAttempts?: number
}): void {
const tokenSummary = getCurrentJobLogSummary()
logger.info('job_summary', {
_logType: 'job_summary',
jobId: args.jobId,
taskType: args.taskType,
result: args.result,
outputMode: args.outputMode,
attempt: args.attempt,
maxAttempts: args.maxAttempts,
timings: args.timings,
renderPeakMemoryMB: args.renderPeakMemoryMB,
error: args.error,
tokens: tokenSummary
? {
totals: tokenSummary.totals,
calls: tokenSummary.calls
}
: {
totals: {
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
measuredCalls: 0,
unmeasuredCalls: 0
},
calls: []
}
})
}
videoQueue.process(async (job) => {
const data = job.data as VideoJobData
const contextAttempt = typeof job.attemptsMade === 'number' ? job.attemptsMade + 1 : 1
return runWithJobLogContext(
{
jobId: data.jobId,
outputMode: data.outputMode || 'video',
attempts: contextAttempt
},
async () => {
const {
jobId,
concept,
quality,
outputMode = 'video',
preGeneratedCode,
editCode,
editInstructions,
promptOverrides,
referenceImages
} = data
logger.info('Processing video job', {
jobId,
concept,
outputMode,
quality,
hasPreGeneratedCode: !!preGeneratedCode,
hasEditRequest: !!editInstructions,
referenceImageCount: referenceImages?.length || 0
})
const timings: Record<string, number> = {}
const retryMeta = getRetryMeta(job)
const initialStage = preGeneratedCode ? 'rendering' : 'generating'
try {
await storeProcessingStage(jobId, initialStage, { attempt: retryMeta.currentAttempt })
if (preGeneratedCode) {
const result = await runPreGeneratedFlow({ job, data, promptOverrides, timings })
logger.info('Job completed (pre-generated code)', { jobId, timings })
emitJobSummary({
jobId,
taskType: 'pre-generated',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
}
if (editCode && editInstructions) {
const result = await runEditFlow({ job, data, promptOverrides, timings })
logger.info('Job completed', { jobId, source: 'ai-edit', timings })
emitJobSummary({
jobId,
taskType: 'ai-edit',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
}
const result = await runGenerationFlow({ job, data, promptOverrides, timings })
logger.info('Job completed', { jobId, source: 'generation', timings })
emitJobSummary({
jobId,
taskType: 'generation',
result: 'completed',
outputMode,
timings,
renderPeakMemoryMB: result.renderPeakMemoryMB,
attempt: retryMeta.currentAttempt,
maxAttempts: retryMeta.maxAttempts
})
return result
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
const cancelReason = error instanceof JobCancelledError ? error.details : undefined
const currentRetryMeta = getRetryMeta(job)
const disableQueueRetry = shouldDisableQueueRetry(errorMessage)
const willQueueRetry = !disableQueueRetry && currentRetryMeta.hasRemainingAttempts
if (disableQueueRetry) {
try {
job.discard()
logger.warn('Queue retry disabled for exhausted code retry', {
jobId,
error: errorMessage,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
} catch (discardError) {
logger.warn('Failed to discard job retry', { jobId, error: discardError })
}
}
if (willQueueRetry) {
logger.warn('Job attempt failed, Bull will retry', {
jobId,
error: errorMessage,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
throw error
}
logger.error('Job failed', {
jobId,
error: errorMessage,
timings,
currentAttempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
await storeJobResult(jobId, {
status: 'failed',
data: { error: errorMessage, cancelReason, outputMode }
})
await clearJobCancelled(jobId)
// 写入持久化历史记录(保存错误原因和提示词)
if (data.clientId) {
try {
await createHistory({
client_id: data.clientId,
prompt: concept,
code: null, // 失败时没有代码
output_mode: outputMode as 'video' | 'image',
quality: quality as 'low' | 'medium' | 'high',
status: 'failed',
error: errorMessage
})
} catch (histErr) {
logger.warn('Failed to write history record', { jobId, error: histErr })
}
}
emitJobSummary({
jobId,
taskType: editCode && editInstructions ? 'ai-edit' : preGeneratedCode ? 'pre-generated' : 'generation',
result: 'failed',
outputMode,
timings,
error: errorMessage,
attempt: currentRetryMeta.currentAttempt,
maxAttempts: currentRetryMeta.maxAttempts
})
throw error
}
}
)
})