// ManimCat/src/studio-agent/runtime/render-task-sync.ts
// Bin29's picture
// Sync from main: c1ef036 chore: document docker persistence volumes
// 94e1b2f
import type {
StudioEventBus,
StudioMessageStore,
StudioPartStore,
StudioSessionEventStore,
StudioSessionStore,
StudioTask,
StudioTaskStore,
StudioWorkResultStore,
StudioWorkStore
} from '../domain/types'
import type { StudioBlobStore } from '../storage/studio-blob-store'
import { publishRenderFailureFeedback } from '../works/render-failure-feedback'
import { syncRenderWorkFromTask } from '../works/render-work-sync'
import { getBullJobStatus, getJobResult, getJobStage } from '../../services/job-store'
import { syncRenderTaskSessionEvents } from './session-event-inbox'
/**
 * Everything `syncStudioRenderTask` needs: the task to reconcile plus the
 * stores and event bus it reads from and writes to.
 */
interface SyncStudioRenderTaskInput {
  /** Render task to reconcile; `metadata.jobId` names the backing queue job. */
  task: StudioTask
  /** Persists status and metadata changes made to the task. */
  taskStore: StudioTaskStore

  /** Handed to the render work sync together with `workResultStore`. */
  workStore: StudioWorkStore
  /** Handed to the render work sync together with `workStore`. */
  workResultStore: StudioWorkResultStore
  /** Optional blob storage, forwarded as-is to the render work sync. */
  blobStore?: StudioBlobStore

  /** Handed to the session event sync for the task. */
  sessionEventStore: StudioSessionEventStore
  /** Receives `task_updated`, `work_updated` and `work_result_updated` events. */
  eventBus: StudioEventBus

  /** Forwarded to the render failure feedback publisher. */
  sessionStore: StudioSessionStore
  /** Forwarded to the render failure feedback publisher. */
  messageStore: StudioMessageStore
  /** Forwarded to the render failure feedback publisher. */
  partStore: StudioPartStore
}
/**
 * Brings a studio render task in line with the current state of its queue job.
 *
 * - Tasks without a non-empty string `metadata.jobId` are ignored.
 * - Tasks already `completed`, `failed` or `cancelled` are only re-published.
 * - Otherwise the job's queue status, stored result and stage are fetched in
 *   parallel and the task is moved to `running`, `queued`, `completed` or
 *   `failed`; each transition is followed by a render sync publish.
 * - On failure, render failure feedback is published unless the incoming task
 *   metadata already carried `failureReported: true`.
 *
 * @param input - The task to reconcile plus the stores and event bus to use.
 * @returns A promise that settles once all updates and publishes have run.
 */
export async function syncStudioRenderTask(input: SyncStudioRenderTaskInput): Promise<void> {
  const { task, taskStore } = input

  const jobId = task.metadata?.jobId
  if (typeof jobId !== 'string' || jobId.length === 0) {
    return
  }

  // Terminal tasks never change again; just replay their current state.
  switch (task.status) {
    case 'completed':
    case 'failed':
    case 'cancelled':
      await publishRenderSync(input, task)
      return
  }

  const [queueState, jobResult, jobStage] = await Promise.all([
    getBullJobStatus(jobId),
    getJobResult(jobId),
    getJobStage(jobId)
  ])

  // A job that is still in the queue maps directly onto a task status.
  let inFlightStatus: 'running' | 'queued' | undefined
  if (queueState === 'active') {
    inFlightStatus = 'running'
  } else if (queueState === 'waiting' || queueState === 'delayed') {
    inFlightStatus = 'queued'
  }

  if (inFlightStatus) {
    const refreshed = await taskStore.update(task.id, {
      status: inFlightStatus,
      metadata: {
        ...task.metadata,
        bullStatus: queueState,
        stage: jobStage || 'rendering'
      }
    })
    await publishRenderSync(input, refreshed ?? task)
    return
  }

  // The job left the queue but no result has been stored: nothing to record.
  if (!jobResult) {
    return
  }

  if (jobResult.status === 'completed') {
    const completedTask = await taskStore.update(task.id, {
      status: 'completed',
      metadata: {
        ...task.metadata,
        bullStatus: queueState ?? 'completed',
        result: jobResult
      }
    })
    await publishRenderSync(input, completedTask ?? task)
    return
  }

  // Any other stored result is recorded as a failure. The flag is read before
  // the update because the update itself sets `failureReported`.
  const feedbackAlreadySent = task.metadata?.failureReported === true
  const updatedAsFailed = await taskStore.update(task.id, {
    status: 'failed',
    metadata: {
      ...task.metadata,
      bullStatus: queueState ?? 'failed',
      stage: jobStage || 'rendering',
      result: jobResult,
      failureReported: true
    }
  })
  const failedTask = updatedAsFailed ?? task
  await publishRenderSync(input, failedTask)

  if (!feedbackAlreadySent) {
    await publishRenderFailureFeedback({
      task: failedTask,
      sessionStore: input.sessionStore,
      messageStore: input.messageStore,
      partStore: input.partStore
    })
  }
}
/**
 * Propagates a task snapshot to the render work records, the session event
 * sync and the event bus.
 *
 * The render work is synced first, then the session events. Bus events
 * (`task_updated`, `work_updated` and, when a result is present,
 * `work_result_updated`) are emitted only if the work sync returned a value.
 *
 * @param input - Stores and event bus to use.
 * @param task - Task snapshot to propagate.
 */
async function publishRenderSync(
  input: SyncStudioRenderTaskInput,
  task: StudioTask
): Promise<void> {
  const { workStore, workResultStore, blobStore, sessionEventStore, eventBus } = input

  const synced = await syncRenderWorkFromTask({ workStore, workResultStore, blobStore }, task)
  await syncRenderTaskSessionEvents({ task, sessionEventStore, eventBus })

  if (synced) {
    const { work, result } = synced

    eventBus.publish({
      type: 'task_updated',
      sessionId: task.sessionId,
      runId: task.runId,
      task
    })
    eventBus.publish({
      type: 'work_updated',
      sessionId: work.sessionId,
      runId: work.runId,
      work
    })

    if (result) {
      eventBus.publish({
        type: 'work_result_updated',
        sessionId: work.sessionId,
        runId: work.runId,
        result
      })
    }
  }
}