import type { StudioEventBus, StudioMessageStore, StudioPartStore, StudioSessionEventStore, StudioSessionStore, StudioTask, StudioTaskStore, StudioWorkResultStore, StudioWorkStore } from '../domain/types' import type { StudioBlobStore } from '../storage/studio-blob-store' import { publishRenderFailureFeedback } from '../works/render-failure-feedback' import { syncRenderWorkFromTask } from '../works/render-work-sync' import { getBullJobStatus, getJobResult, getJobStage } from '../../services/job-store' import { syncRenderTaskSessionEvents } from './session-event-inbox' interface SyncStudioRenderTaskInput { task: StudioTask taskStore: StudioTaskStore workStore: StudioWorkStore workResultStore: StudioWorkResultStore sessionStore: StudioSessionStore sessionEventStore: StudioSessionEventStore messageStore: StudioMessageStore partStore: StudioPartStore eventBus: StudioEventBus blobStore?: StudioBlobStore } export async function syncStudioRenderTask(input: SyncStudioRenderTaskInput): Promise { const jobId = typeof input.task.metadata?.jobId === 'string' ? input.task.metadata.jobId : undefined if (!jobId) { return } if (input.task.status === 'completed' || input.task.status === 'failed' || input.task.status === 'cancelled') { await publishRenderSync(input, input.task) return } const [bullStatus, result, stage] = await Promise.all([ getBullJobStatus(jobId), getJobResult(jobId), getJobStage(jobId) ]) if (bullStatus === 'active') { const updated = await input.taskStore.update(input.task.id, { status: 'running', metadata: { ...input.task.metadata, bullStatus, stage: stage || 'rendering' } }) await publishRenderSync(input, updated ?? input.task) return } if (bullStatus === 'waiting' || bullStatus === 'delayed') { const updated = await input.taskStore.update(input.task.id, { status: 'queued', metadata: { ...input.task.metadata, bullStatus, stage: stage || 'rendering' } }) await publishRenderSync(input, updated ?? input.task) return } if (!result) { return } if (result.status === 'completed') { const updated = await input.taskStore.update(input.task.id, { status: 'completed', metadata: { ...input.task.metadata, bullStatus: bullStatus ?? 'completed', result } }) await publishRenderSync(input, updated ?? input.task) return } const alreadyReported = input.task.metadata?.failureReported === true const failedTask = await input.taskStore.update(input.task.id, { status: 'failed', metadata: { ...input.task.metadata, bullStatus: bullStatus ?? 'failed', stage: stage || 'rendering', result, failureReported: true } }) const finalTask = failedTask ?? input.task await publishRenderSync(input, finalTask) if (alreadyReported) { return } await publishRenderFailureFeedback({ task: finalTask, sessionStore: input.sessionStore, messageStore: input.messageStore, partStore: input.partStore }) } async function publishRenderSync( input: SyncStudioRenderTaskInput, task: StudioTask ): Promise { const synced = await syncRenderWorkFromTask({ workStore: input.workStore, workResultStore: input.workResultStore, blobStore: input.blobStore }, task) await syncRenderTaskSessionEvents({ task, sessionEventStore: input.sessionEventStore, eventBus: input.eventBus }) if (!synced) { return } input.eventBus.publish({ type: 'task_updated', sessionId: task.sessionId, runId: task.runId, task }) input.eventBus.publish({ type: 'work_updated', sessionId: synced.work.sessionId, runId: synced.work.runId, work: synced.work }) if (synced.result) { input.eventBus.publish({ type: 'work_result_updated', sessionId: synced.work.sessionId, runId: synced.work.runId, result: synced.result }) } }