/** * 基于 BullMQ 的分布式任务队列实现 (持久化与可靠处理) * 支持:自动重试、延迟执行、并发控制、任务持久化 */ import { Queue, Worker, Job } from 'bullmq'; import { redis, isRedisAvailable } from './redis.js'; const QUEUE_NAME = 'codex_tasks'; // 内存队列模拟 (当 Redis 不可用时) const memoryQueue: any[] = []; // 1. 定义任务队列 export const taskQueue = isRedisAvailable ? new Queue(QUEUE_NAME, { connection: redis as any, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: true, removeOnFail: false, }, }) : null; let worker: Worker | null = null; // 2. 设置处理器与并发控制 export const setupWorkers = ( aiHandler: (data: any) => Promise, docHandler: (data: any) => Promise ) => { if (worker || !isRedisAvailable) { if (!isRedisAvailable) { console.warn('[Queue] Redis 不可用,启用内存模拟模式 (任务不持久化)'); // 启动一个简单的定时器处理内存任务 setInterval(async () => { if (memoryQueue.length > 0) { const task = memoryQueue.shift(); console.log(`[Queue:Memory] 正在处理任务: ${task.type}`); try { if (task.type === 'ai_workflow') await aiHandler(task.data); else if (task.type === 'document_process') await docHandler(task.data); } catch (e) {} } }, 3000); } return; } worker = new Worker( QUEUE_NAME, async (job: Job) => { // ... 逻辑保持不变 console.log(`[Queue] 正在执行任务: ${job.name} (ID: ${job.id})`); const { type, data } = job.data; try { if (type === 'ai_workflow') { return await aiHandler(data); } else if (type === 'document_process') { return await docHandler(data); } else { throw new Error(`未知任务类型: ${type}`); } } catch (err) { console.error(`[Queue] 任务失败: ${job.id}`, err); throw err; } }, { connection: redis as any, concurrency: 5, // 最大并发处理数 } ); worker.on('completed', (job) => { console.log(`[Queue] 任务已完成: ${job.id}`); }); worker.on('failed', (job, err) => { console.error(`[Queue] 任务彻底失败: ${job?.id}`, err.message); }); console.log('[Queue] BullMQ 任务队列 Workers 已就绪,并发数: 5'); }; // 3. 添加任务接口 export const addJob = async (type: string, data: any) => { const jobName = `${type}_${Date.now()}`; if (taskQueue) { const job = await taskQueue.add(jobName, { type, data }); return { id: job.id, name: jobName }; } else { // 内存降级逻辑 memoryQueue.push({ type, data }); return { id: `mem_${Date.now()}`, name: jobName }; } }; // 获取队列简要状态 (用于监控) export const getQueueStatus = async () => { if (taskQueue) { const [active, waiting, completed, failed] = await Promise.all([ taskQueue.getActiveCount(), taskQueue.getWaitingCount(), taskQueue.getCompletedCount(), taskQueue.getFailedCount(), ]); return { active, waiting, completed, failed }; } else { return { active: 0, waiting: memoryQueue.length, completed: 0, failed: 0 }; } };