Spaces:
Sleeping
Sleeping
File size: 3,372 Bytes
/**
 * Distributed task queue built on BullMQ (persistent, reliable processing).
 * Supports: automatic retries, delayed execution, concurrency control, and job persistence.
 */
import { Queue, Worker, Job } from 'bullmq';
import { redis, isRedisAvailable } from './redis.js';
/** Name of the BullMQ queue; used by both the queue and the worker in this module. */
const QUEUE_NAME = 'codex_tasks';

/**
 * In-memory stand-in for the queue, used when Redis is not available.
 * Entries live only in this process and are not persisted.
 */
const memoryQueue: any[] = [];

/**
 * 1. The task queue.
 *
 * `null` when Redis is unavailable; callers must then fall back to
 * `memoryQueue`. Otherwise a BullMQ queue whose default job options are:
 * `attempts: 3`, exponential backoff with a 1000 ms base delay, completed
 * jobs removed (`removeOnComplete: true`), failed jobs kept
 * (`removeOnFail: false`).
 */
export const taskQueue = !isRedisAvailable
  ? null
  : new Queue(QUEUE_NAME, {
      connection: redis as any,
      defaultJobOptions: {
        removeOnFail: false,
        removeOnComplete: true,
        backoff: { delay: 1000, type: 'exponential' },
        attempts: 3,
      },
    });

/** The BullMQ worker for this module; stays `null` until one has been created. */
let worker: Worker | null = null;
// 2. Register the job processors and configure concurrency.

/** Handle of the in-memory polling timer; non-null once the fallback loop has been started. */
let memoryPollTimer: ReturnType<typeof setInterval> | null = null;

/**
 * Starts job processing, dispatching each job to a handler by its `type`
 * field: `'ai_workflow'` goes to `aiHandler`, `'document_process'` goes to
 * `docHandler`.
 *
 * - Redis available: creates one BullMQ worker with `concurrency: 5`. Handler
 *   errors and unknown types are logged and rethrown so the job is marked failed.
 * - Redis unavailable: starts a timer that takes at most one task from the
 *   in-memory queue every 3000 ms. Handler errors and unknown types are
 *   logged; the task is not retried.
 *
 * Calling this more than once is a no-op in both modes, so neither the worker
 * nor the polling timer is ever duplicated.
 *
 * @param aiHandler  Processes the `data` payload of `'ai_workflow'` jobs.
 * @param docHandler Processes the `data` payload of `'document_process'` jobs.
 */
export const setupWorkers = (
  aiHandler: (data: any) => Promise<any>,
  docHandler: (data: any) => Promise<any>
) => {
  if (!isRedisAvailable) {
    // The `worker` guard never becomes truthy in memory mode, so the timer
    // handle is what prevents stacking several polling loops.
    if (memoryPollTimer) return;

    console.warn('[Queue] Redis 不可用,启用内存模拟模式 (任务不持久化)');
    memoryPollTimer = setInterval(async () => {
      const task = memoryQueue.shift();
      if (task === undefined) return; // nothing queued on this tick

      try {
        console.log(`[Queue:Memory] 正在处理任务: ${task.type}`);
        if (task.type === 'ai_workflow') {
          await aiHandler(task.data);
        } else if (task.type === 'document_process') {
          await docHandler(task.data);
        } else {
          // Previously dropped without any trace.
          console.warn(`[Queue:Memory] 未知任务类型: ${task.type}`);
        }
      } catch (err) {
        // Log instead of swallowing; still best-effort, so no rethrow
        // (a rethrow here would become an unhandled rejection).
        console.error(`[Queue:Memory] 任务失败: ${task?.type}`, err);
      }
    }, 3000);
    return;
  }

  // Redis mode: a worker already exists, nothing more to do.
  if (worker) return;

  worker = new Worker(
    QUEUE_NAME,
    async (job: Job) => {
      console.log(`[Queue] 正在执行任务: ${job.name} (ID: ${job.id})`);
      const { type, data } = job.data;
      try {
        if (type === 'ai_workflow') return await aiHandler(data);
        if (type === 'document_process') return await docHandler(data);
        throw new Error(`未知任务类型: ${type}`);
      } catch (err) {
        console.error(`[Queue] 任务失败: ${job.id}`, err);
        throw err; // rethrow so BullMQ records the failure
      }
    },
    {
      connection: redis as any,
      concurrency: 5, // maximum number of jobs processed in parallel
    }
  );

  worker.on('completed', (job) => {
    console.log(`[Queue] 任务已完成: ${job.id}`);
  });
  worker.on('failed', (job, err) => {
    console.error(`[Queue] 任务彻底失败: ${job?.id}`, err.message);
  });
  // Without an 'error' listener, an emitted error would be thrown as an
  // uncaught exception by the underlying EventEmitter.
  worker.on('error', (err) => {
    console.error('[Queue] Worker 错误:', err);
  });

  console.log('[Queue] BullMQ 任务队列 Workers 已就绪,并发数: 5');
};
// 3. Public API for enqueueing jobs.

/** Monotonic counter that keeps in-memory job ids unique within one process. */
let memoryJobSeq = 0;

/**
 * Enqueues a job of the given type.
 *
 * When the BullMQ queue exists, the job is added to it under the name
 * `<type>_<timestamp>` with payload `{ type, data }`. Otherwise the job is
 * appended to the in-memory fallback queue and given an id of the form
 * `mem_<timestamp>_<sequence>`; the sequence number keeps ids distinct even
 * when several jobs are queued within the same millisecond.
 *
 * @param type Job type, stored in the payload's `type` field.
 * @param data Payload stored in the job's `data` field.
 * @returns The job id and the generated job name.
 */
export const addJob = async (type: string, data: any) => {
  const now = Date.now();
  const jobName = `${type}_${now}`;

  if (taskQueue) {
    const job = await taskQueue.add(jobName, { type, data });
    return { id: job.id, name: jobName };
  }

  // In-memory fallback: keep id and name on the task so the returned id
  // can be matched against the queued entry.
  memoryJobSeq += 1;
  const id = `mem_${now}_${memoryJobSeq}`;
  memoryQueue.push({ id, name: jobName, type, data });
  return { id, name: jobName };
};
/**
 * Returns a brief snapshot of the queue, intended for monitoring.
 *
 * With a BullMQ queue, the active, waiting, completed and failed counts are
 * fetched concurrently. Without one, only `waiting` is meaningful: it is the
 * current length of the in-memory queue, and the other counters are 0.
 */
export const getQueueStatus = async () => {
  if (!taskQueue) {
    return { active: 0, waiting: memoryQueue.length, completed: 0, failed: 0 };
  }

  const counts = await Promise.all([
    taskQueue.getActiveCount(),
    taskQueue.getWaitingCount(),
    taskQueue.getCompletedCount(),
    taskQueue.getFailedCount(),
  ]);

  return {
    active: counts[0],
    waiting: counts[1],
    completed: counts[2],
    failed: counts[3],
  };
};
|