File size: 5,094 Bytes
32aacff
 
 
 
 
 
0b3a3cc
 
 
 
 
32aacff
 
0b3a3cc
e8394ba
 
b18cbb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0b3a3cc
 
b18cbb9
0b3a3cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32aacff
 
 
0b3a3cc
32aacff
0b3a3cc
32aacff
 
 
 
b18cbb9
 
 
 
 
 
 
 
 
 
32aacff
 
0b3a3cc
32aacff
 
 
b18cbb9
 
 
 
 
 
 
 
 
 
32aacff
 
0b3a3cc
32aacff
 
b18cbb9
 
 
 
 
 
 
 
 
 
32aacff
e8394ba
 
 
b18cbb9
1ee8b88
b18cbb9
e8394ba
1ee8b88
e8394ba
 
1ee8b88
 
 
b18cbb9
 
1ee8b88
e8394ba
 
 
 
 
1ee8b88
 
 
 
b18cbb9
 
1ee8b88
 
 
 
 
 
 
 
b18cbb9
 
1ee8b88
32aacff
 
 
f6cb378
32aacff
 
b18cbb9
 
 
 
 
 
 
 
 
 
32aacff
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
 * Video Processor
 * Job processor - main orchestrator
 */

import { videoQueue } from '../../config/bull'
import { storeJobResult } from '../../services/job-store'
import { clearJobCancelled } from '../../services/job-cancel-store'
import { JobCancelledError } from '../../utils/errors'
import { createLogger } from '../../utils/logger'
import type { VideoJobData } from '../../types'
import { runEditFlow, runGenerationFlow, runPreGeneratedFlow } from './video-processor-flows'
import { getRetryMeta, shouldDisableQueueRetry } from './video-processor-utils'

const logger = createLogger('VideoProcessor')

/** Fields carried by a single `job_summary` log record. */
type JobSummaryArgs = {
  jobId: string
  taskType: 'pre-generated' | 'ai-edit' | 'generation'
  result: 'completed' | 'failed'
  outputMode: string
  timings?: Record<string, number>
  renderPeakMemoryMB?: number
  error?: string
  attempt?: number
  maxAttempts?: number
}

/**
 * Writes one structured `job_summary` record at info level.
 *
 * The record is tagged with `_logType: 'job_summary'` and carries every field
 * of `args`; optional fields that were not supplied are passed through as
 * `undefined`.
 *
 * @param args - Outcome details of the job attempt being summarised.
 */
function emitJobSummary(args: JobSummaryArgs): void {
  const {
    jobId,
    taskType,
    result,
    outputMode,
    attempt,
    maxAttempts,
    timings,
    renderPeakMemoryMB,
    error
  } = args

  // Key order is kept stable so the emitted record layout does not change.
  const summary = {
    _logType: 'job_summary',
    jobId,
    taskType,
    result,
    outputMode,
    attempt,
    maxAttempts,
    timings,
    renderPeakMemoryMB,
    error
  }

  logger.info('job_summary', summary)
}
/**
 * Queue worker entry point: runs exactly one flow per job and reports the outcome.
 *
 * Flow selection (first match wins):
 *   1. `preGeneratedCode` is set                    -> runPreGeneratedFlow
 *   2. `editCode` and `editInstructions` are set    -> runEditFlow
 *   3. otherwise                                    -> runGenerationFlow
 *
 * The task type is resolved once, before the flow runs, so the success summary
 * and the failure summary always label the job identically.
 *
 * On error the handler rethrows in every case. If a queue retry is still
 * expected it only logs a warning first; otherwise it stores a `failed` result,
 * calls `clearJobCancelled`, and emits a failure summary before rethrowing.
 */
videoQueue.process(async (job) => {
  const data = job.data as VideoJobData
  const retryMeta = getRetryMeta(job)
  const {
    jobId,
    concept,
    quality,
    outputMode = 'video',
    preGeneratedCode,
    editCode,
    editInstructions,
    promptOverrides,
    referenceImages
  } = data

  // Same precedence as the dispatch below. Previously the failure path checked
  // the edit fields first, so a job carrying both pre-generated code and an
  // edit request ran the pre-generated flow but was reported as 'ai-edit'.
  const taskType: 'pre-generated' | 'ai-edit' | 'generation' = preGeneratedCode
    ? 'pre-generated'
    : editCode && editInstructions
      ? 'ai-edit'
      : 'generation'

  logger.info('Processing video job', {
    jobId,
    concept,
    outputMode,
    quality,
    hasPreGeneratedCode: !!preGeneratedCode,
    hasEditRequest: !!editInstructions,
    referenceImageCount: referenceImages?.length ?? 0
  })

  // Shared with the flow, which is handed this object; it is included in both
  // the success and the failure logs.
  const timings: Record<string, number> = {}

  try {
    const flowArgs = { job, data, promptOverrides, timings }
    const result =
      taskType === 'pre-generated'
        ? await runPreGeneratedFlow(flowArgs)
        : taskType === 'ai-edit'
          ? await runEditFlow(flowArgs)
          : await runGenerationFlow(flowArgs)

    // Completion messages are kept exactly as before for log consumers.
    if (taskType === 'pre-generated') {
      logger.info('Job completed (pre-generated code)', { jobId, timings })
    } else {
      logger.info('Job completed', { jobId, source: taskType, timings })
    }

    emitJobSummary({
      jobId,
      taskType,
      result: 'completed',
      outputMode,
      timings,
      renderPeakMemoryMB: result.renderPeakMemoryMB,
      attempt: retryMeta.currentAttempt,
      maxAttempts: retryMeta.maxAttempts
    })
    return result
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error)
    // Only cancellation errors carry a reason; everything else stores undefined.
    const cancelReason = error instanceof JobCancelledError ? error.details : undefined
    const currentRetryMeta = getRetryMeta(job)
    const disableQueueRetry = shouldDisableQueueRetry(errorMessage)
    const willQueueRetry = !disableQueueRetry && currentRetryMeta.hasRemainingAttempts

    if (disableQueueRetry) {
      try {
        // Bull: discard() ensures the job is not run again even if attempts remain.
        job.discard()
        logger.warn('Queue retry disabled for exhausted code retry', {
          jobId,
          error: errorMessage,
          currentAttempt: currentRetryMeta.currentAttempt,
          maxAttempts: currentRetryMeta.maxAttempts
        })
      } catch (discardError) {
        // Best effort: a failed discard must not hide the original job error.
        logger.warn('Failed to discard job retry', { jobId, error: discardError })
      }
    }

    if (willQueueRetry) {
      // Not final: skip the failed-result bookkeeping and let the queue retry.
      logger.warn('Job attempt failed, Bull will retry', {
        jobId,
        error: errorMessage,
        currentAttempt: currentRetryMeta.currentAttempt,
        maxAttempts: currentRetryMeta.maxAttempts
      })
      throw error
    }

    // Final failure: record it, then rethrow so the queue marks the job failed.
    logger.error('Job failed', {
      jobId,
      error: errorMessage,
      timings,
      currentAttempt: currentRetryMeta.currentAttempt,
      maxAttempts: currentRetryMeta.maxAttempts
    })

    await storeJobResult(jobId, {
      status: 'failed',
      data: { error: errorMessage, cancelReason, outputMode }
    })
    await clearJobCancelled(jobId)
    emitJobSummary({
      jobId,
      taskType,
      result: 'failed',
      outputMode,
      timings,
      error: errorMessage,
      attempt: currentRetryMeta.currentAttempt,
      maxAttempts: currentRetryMeta.maxAttempts
    })

    throw error
  }
})