| import { Queue, Worker } from 'bullmq'; |
| import { redisConnection } from '../config/redis'; |
| import { automationEngine } from '../automation/engine'; |
| import { db, and, eq, lte, scheduledMessages } from '@autoloop/db'; |
|
|
/**
 * BullMQ queue named `cron-tasks`, created on the Redis connection imported
 * from `../config/redis`. `initScheduler` registers its repeating job here.
 */
export const cronQueue = new Queue('cron-tasks', { connection: redisConnection });
|
|
/**
 * Loads up to 50 pending scheduled messages whose `dueAt` is at or before the
 * current time and passes each message id to
 * `automationEngine.processScheduledMessage`, one at a time.
 *
 * A failure on one message is logged and does not prevent the remaining
 * messages in the batch from being attempted. After the whole batch has been
 * attempted, an error is thrown if anything failed so the surrounding job is
 * still reported as failed.
 *
 * @returns {Promise<void>}
 * @throws {Error} When at least one message in the batch failed to process.
 */
async function pollScheduledMessages() {
  console.log('[Scheduler] Polling for scheduled messages...');

  const now = new Date();
  const due = await db.query.scheduledMessages.findMany({
    where: and(
      eq(scheduledMessages.status, 'pending'),
      lte(scheduledMessages.dueAt, now),
    ),
    // With a limit but no ordering the selected subset is arbitrary, so old
    // messages could be skipped indefinitely; take the earliest-due first.
    // NOTE(review): uses the relational-query `orderBy` callback form; confirm
    // that `@autoloop/db` exposes it.
    orderBy: (fields, { asc }) => [asc(fields.dueAt)],
    limit: 50,
  });

  const failedIds = [];
  for (const msg of due) {
    try {
      // Sequential on purpose: the original processed one message at a time.
      await automationEngine.processScheduledMessage(msg.id);
    } catch (err) {
      failedIds.push(msg.id);
      console.error(`[Scheduler] Failed to process scheduled message ${msg.id}:`, err);
    }
  }

  if (failedIds.length > 0) {
    throw new Error(
      `[Scheduler] ${failedIds.length} of ${due.length} scheduled message(s) failed to process (ids: ${failedIds.join(', ')})`,
    );
  }
}

/**
 * Registers the repeating `poll-scheduled-messages` job on `cronQueue` and
 * creates the worker that consumes the `cron-tasks` queue.
 *
 * Jobs with any other name are accepted by the worker and completed without
 * doing anything.
 *
 * @returns {Promise<Worker>} The worker instance that was created.
 */
export const initScheduler = async () => {
  // Cron pattern '* * * * *' fires once every minute.
  await cronQueue.add('poll-scheduled-messages', {}, {
    repeat: { pattern: '* * * * *' },
    removeOnComplete: true,
  });

  const worker = new Worker(
    'cron-tasks',
    async (job) => {
      if (job.name !== 'poll-scheduled-messages') {
        return;
      }
      await pollScheduledMessages();
    },
    { connection: redisConnection },
  );

  // Attach an 'error' listener so an emitted error is logged rather than
  // surfacing as an unhandled 'error' event on the worker.
  worker.on('error', (err) => {
    console.error('[Scheduler] Worker error:', err);
  });

  return worker;
};
|
|