| import type { |
| StudioEventBus, |
| StudioMessageStore, |
| StudioPartStore, |
| StudioSessionEventStore, |
| StudioSessionStore, |
| StudioTask, |
| StudioTaskStore, |
| StudioWorkResultStore, |
| StudioWorkStore |
| } from '../domain/types' |
| import type { StudioBlobStore } from '../storage/studio-blob-store' |
| import { publishRenderFailureFeedback } from '../works/render-failure-feedback' |
| import { syncRenderWorkFromTask } from '../works/render-work-sync' |
| import { getBullJobStatus, getJobResult, getJobStage } from '../../services/job-store' |
| import { syncRenderTaskSessionEvents } from './session-event-inbox' |
|
|
| interface SyncStudioRenderTaskInput { |
| task: StudioTask |
| taskStore: StudioTaskStore |
| workStore: StudioWorkStore |
| workResultStore: StudioWorkResultStore |
| sessionStore: StudioSessionStore |
| sessionEventStore: StudioSessionEventStore |
| messageStore: StudioMessageStore |
| partStore: StudioPartStore |
| eventBus: StudioEventBus |
| blobStore?: StudioBlobStore |
| } |
|
|
| export async function syncStudioRenderTask(input: SyncStudioRenderTaskInput): Promise<void> { |
| const jobId = typeof input.task.metadata?.jobId === 'string' ? input.task.metadata.jobId : undefined |
| if (!jobId) { |
| return |
| } |
|
|
| if (input.task.status === 'completed' || input.task.status === 'failed' || input.task.status === 'cancelled') { |
| await publishRenderSync(input, input.task) |
| return |
| } |
|
|
| const [bullStatus, result, stage] = await Promise.all([ |
| getBullJobStatus(jobId), |
| getJobResult(jobId), |
| getJobStage(jobId) |
| ]) |
|
|
| if (bullStatus === 'active') { |
| const updated = await input.taskStore.update(input.task.id, { |
| status: 'running', |
| metadata: { |
| ...input.task.metadata, |
| bullStatus, |
| stage: stage || 'rendering' |
| } |
| }) |
| await publishRenderSync(input, updated ?? input.task) |
| return |
| } |
|
|
| if (bullStatus === 'waiting' || bullStatus === 'delayed') { |
| const updated = await input.taskStore.update(input.task.id, { |
| status: 'queued', |
| metadata: { |
| ...input.task.metadata, |
| bullStatus, |
| stage: stage || 'rendering' |
| } |
| }) |
| await publishRenderSync(input, updated ?? input.task) |
| return |
| } |
|
|
| if (!result) { |
| return |
| } |
|
|
| if (result.status === 'completed') { |
| const updated = await input.taskStore.update(input.task.id, { |
| status: 'completed', |
| metadata: { |
| ...input.task.metadata, |
| bullStatus: bullStatus ?? 'completed', |
| result |
| } |
| }) |
| await publishRenderSync(input, updated ?? input.task) |
| return |
| } |
|
|
| const alreadyReported = input.task.metadata?.failureReported === true |
| const failedTask = await input.taskStore.update(input.task.id, { |
| status: 'failed', |
| metadata: { |
| ...input.task.metadata, |
| bullStatus: bullStatus ?? 'failed', |
| stage: stage || 'rendering', |
| result, |
| failureReported: true |
| } |
| }) |
|
|
| const finalTask = failedTask ?? input.task |
| await publishRenderSync(input, finalTask) |
|
|
| if (alreadyReported) { |
| return |
| } |
|
|
| await publishRenderFailureFeedback({ |
| task: finalTask, |
| sessionStore: input.sessionStore, |
| messageStore: input.messageStore, |
| partStore: input.partStore |
| }) |
| } |
|
|
| async function publishRenderSync( |
| input: SyncStudioRenderTaskInput, |
| task: StudioTask |
| ): Promise<void> { |
| const synced = await syncRenderWorkFromTask({ |
| workStore: input.workStore, |
| workResultStore: input.workResultStore, |
| blobStore: input.blobStore |
| }, task) |
|
|
| await syncRenderTaskSessionEvents({ |
| task, |
| sessionEventStore: input.sessionEventStore, |
| eventBus: input.eventBus |
| }) |
|
|
| if (!synced) { |
| return |
| } |
|
|
| input.eventBus.publish({ |
| type: 'task_updated', |
| sessionId: task.sessionId, |
| runId: task.runId, |
| task |
| }) |
|
|
| input.eventBus.publish({ |
| type: 'work_updated', |
| sessionId: synced.work.sessionId, |
| runId: synced.work.runId, |
| work: synced.work |
| }) |
|
|
| if (synced.result) { |
| input.eventBus.publish({ |
| type: 'work_result_updated', |
| sessionId: synced.work.sessionId, |
| runId: synced.work.runId, |
| result: synced.result |
| }) |
| } |
| }
|
|
|