File size: 1,222 Bytes
a6b6c66 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | import { Queue, Worker } from 'bullmq';
import { redisConnection } from '../config/redis';
import { automationEngine } from '../automation/engine';
import { db, and, eq, lte, scheduledMessages } from '@autoloop/db';
// Options for the 'cron-tasks' queue: it uses the Redis connection imported
// from ../config/redis.
const cronQueueOptions = { connection: redisConnection };

/**
 * BullMQ queue named 'cron-tasks'. Jobs added here are consumed by a Worker
 * created for the same queue name.
 */
export const cronQueue = new Queue('cron-tasks', cronQueueOptions);
/**
 * Registers the repeatable polling job on `cronQueue` and starts the worker
 * that consumes the 'cron-tasks' queue.
 *
 * On each 'poll-scheduled-messages' job the worker loads up to 50 scheduled
 * messages whose status is 'pending' and whose `dueAt` is at or before the
 * current time, then hands each message id to
 * `automationEngine.processScheduledMessage`, one at a time.
 *
 * A failure while processing one message does not stop the rest of the batch:
 * each failure is logged, the remaining messages are still attempted, and the
 * job is failed at the end if any message failed.
 *
 * @returns {Promise<Worker>} The started BullMQ worker (the caller owns it and
 *   can close it on shutdown).
 */
export const initScheduler = async () => {
  // Add a repeatable job to poll for scheduled messages every minute
  await cronQueue.add(
    'poll-scheduled-messages',
    {},
    {
      repeat: { pattern: '* * * * *' },
      removeOnComplete: true,
    },
  );

  const worker = new Worker(
    'cron-tasks',
    async (job) => {
      // Only the polling job is handled here; any other job name is a no-op.
      if (job.name !== 'poll-scheduled-messages') {
        return;
      }

      console.log('[Scheduler] Polling for scheduled messages...');
      const now = new Date();
      const due = await db.query.scheduledMessages.findMany({
        where: and(
          eq(scheduledMessages.status, 'pending'),
          lte(scheduledMessages.dueAt, now),
        ),
        limit: 50,
      });

      // Isolate failures per message so one bad message cannot keep blocking
      // the rest of the batch on every poll.
      const failures = [];
      for (const msg of due) {
        try {
          await automationEngine.processScheduledMessage(msg.id);
        } catch (err) {
          failures.push(err);
          console.error(
            `[Scheduler] Failed to process scheduled message ${msg.id}:`,
            err,
          );
        }
      }

      // Still surface the failure to BullMQ so the job is marked as failed.
      if (failures.length > 0) {
        throw new Error(
          `[Scheduler] ${failures.length} of ${due.length} scheduled messages failed to process`,
          { cause: failures[0] },
        );
      }
    },
    { connection: redisConnection },
  );

  // Log worker-level errors (e.g. connection problems) instead of leaving the
  // emitter's 'error' event unhandled.
  worker.on('error', (err) => {
    console.error('[Scheduler] Worker error:', err);
  });

  return worker;
};
|