File size: 3,372 Bytes
e3725d2
ae4ceef
 
e3725d2
ae4ceef
 
c66336a
ae4ceef
c66336a
ae4ceef
 
c66336a
ae4ceef
 
 
 
 
 
 
 
 
 
c66336a
ae4ceef
c66336a
ae4ceef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c66336a
 
ae4ceef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c66336a
ae4ceef
c66336a
ae4ceef
 
 
c66336a
ae4ceef
 
 
c66336a
ae4ceef
 
c66336a
ae4ceef
 
 
 
 
 
 
 
 
 
 
c66336a
e3725d2
 
ae4ceef
 
 
 
 
 
 
 
 
 
 
 
 
e3725d2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * 基于 BullMQ 的分布式任务队列实现 (持久化与可靠处理)
 * 支持:自动重试、延迟执行、并发控制、任务持久化
 */
import { Queue, Worker, Job } from 'bullmq';
import { redis, isRedisAvailable } from './redis.js';

const QUEUE_NAME = 'codex_tasks';

// 内存队列模拟 (当 Redis 不可用时)
const memoryQueue: any[] = [];

// 1. 定义任务队列
export const taskQueue = isRedisAvailable ? new Queue(QUEUE_NAME, {
  connection: redis as any,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
    removeOnFail: false,
  },
}) : null;

let worker: Worker | null = null;

// 2. 设置处理器与并发控制
export const setupWorkers = (
  aiHandler: (data: any) => Promise<any>,
  docHandler: (data: any) => Promise<any>
) => {
  if (worker || !isRedisAvailable) {
    if (!isRedisAvailable) {
      console.warn('[Queue] Redis 不可用,启用内存模拟模式 (任务不持久化)');
      // 启动一个简单的定时器处理内存任务
      setInterval(async () => {
        if (memoryQueue.length > 0) {
          const task = memoryQueue.shift();
          console.log(`[Queue:Memory] 正在处理任务: ${task.type}`);
          try {
            if (task.type === 'ai_workflow') await aiHandler(task.data);
            else if (task.type === 'document_process') await docHandler(task.data);
          } catch (e) {}
        }
      }, 3000);
    }
    return;
  }

  worker = new Worker(
    QUEUE_NAME,
    async (job: Job) => {
      // ... 逻辑保持不变
      console.log(`[Queue] 正在执行任务: ${job.name} (ID: ${job.id})`);
      
      const { type, data } = job.data;
      
      try {
        if (type === 'ai_workflow') {
          return await aiHandler(data);
        } else if (type === 'document_process') {
          return await docHandler(data);
        } else {
          throw new Error(`未知任务类型: ${type}`);
        }
      } catch (err) {
        console.error(`[Queue] 任务失败: ${job.id}`, err);
        throw err;
      }
    },
    {
      connection: redis as any,
      concurrency: 5, // 最大并发处理数
    }
  );

  worker.on('completed', (job) => {
    console.log(`[Queue] 任务已完成: ${job.id}`);
  });

  worker.on('failed', (job, err) => {
    console.error(`[Queue] 任务彻底失败: ${job?.id}`, err.message);
  });

  console.log('[Queue] BullMQ 任务队列 Workers 已就绪,并发数: 5');
};

// 3. 添加任务接口
export const addJob = async (type: string, data: any) => {
  const jobName = `${type}_${Date.now()}`;
  
  if (taskQueue) {
    const job = await taskQueue.add(jobName, { type, data });
    return { id: job.id, name: jobName };
  } else {
    // 内存降级逻辑
    memoryQueue.push({ type, data });
    return { id: `mem_${Date.now()}`, name: jobName };
  }
};

// 获取队列简要状态 (用于监控)
export const getQueueStatus = async () => {
  if (taskQueue) {
    const [active, waiting, completed, failed] = await Promise.all([
      taskQueue.getActiveCount(),
      taskQueue.getWaitingCount(),
      taskQueue.getCompletedCount(),
      taskQueue.getFailedCount(),
    ]);
    return { active, waiting, completed, failed };
  } else {
    return { active: 0, waiting: memoryQueue.length, completed: 0, failed: 0 };
  }
};