// Source: edtech/apps/api/src/services/queue.ts (CognxSafeTrack, commit 98240fd, 5.95 kB)
// Commit: feat(agentic-week1): conversational memory, RAG threshold, wallet alerts, weekly reports, campaign scheduling
import { logger } from '../logger';
import { Queue } from 'bullmq';
import { redisForBullMQ as connection } from '../lib/redis';
/**
 * Queue named 'whatsapp-queue'. Within this module it receives the job names
 * 'send-message', 'send-content', 'enroll-user', 'send-interactive-buttons',
 * 'send-interactive-list', 'handle-inbound', 'send-broadcast' and
 * 'process-campaign'.
 */
export const whatsappQueue = new Queue('whatsapp-queue', { connection });
/**
 * Queue named 'notification-queue'. Within this module it receives the
 * 'send-email' job. It is built on the same `connection` as `whatsappQueue`.
 */
export const notificationQueue = new Queue('notification-queue', { connection });
/**
 * Gracefully close all queues and the underlying connection.
 *
 * Shutdown runs in a fixed order (WhatsApp queue, notification queue, then the
 * shared connection). Every step is attempted even when an earlier one fails,
 * so a failing queue close can no longer leave the connection open. Each
 * failure is logged, and the first one is re-thrown after all steps have run.
 *
 * @throws The first error raised by any of the shutdown steps.
 */
export async function closeQueues(): Promise<void> {
    logger.info('[QUEUE] Closing all queues...');

    const steps: Array<[string, () => Promise<unknown>]> = [
        ['whatsapp-queue', () => whatsappQueue.close()],
        ['notification-queue', () => notificationQueue.close()],
        ['connection', () => connection.quit()],
    ];

    const failures: unknown[] = [];
    for (const [label, shutDown] of steps) {
        try {
            await shutDown();
        } catch (err: unknown) {
            // Remember the failure but keep going: the remaining resources
            // still need their own shutdown attempt.
            const reason = err instanceof Error ? err.message : String(err);
            logger.warn(`[QUEUE] Failed to close ${label}: ${reason}`);
            failures.push(err);
        }
    }

    if (failures.length > 0) {
        throw failures[0];
    }
}
/**
 * Enqueue a 'send-email' job on the notification queue.
 *
 * The payload is passed to the queue unchanged. The job is configured with
 * 3 attempts and an exponential backoff whose base delay is 1000.
 *
 * @param payload Recipient, subject and either template data or raw HTML.
 */
export async function scheduleEmail(payload: {
    to: string,
    subject: string,
    templateId?: number,
    params?: Record<string, any>,
    htmlContent?: string
}) {
    const backoff = { type: 'exponential', delay: 1000 };
    const jobOptions = { attempts: 3, backoff };
    await notificationQueue.add('send-email', payload, jobOptions);
}
// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────
/** Expiry applied to a time-travel entry, in seconds (30 minutes). */
const TT_TTL = 30 * 60;
/** Build the key under which a user's time-travel entry is stored. */
function ttKey(userId: string): string {
    return 'time_travel:' + userId;
}
/**
 * Store the replay day for a user under their time-travel key.
 *
 * The value is written as a string with an expiry of `TT_TTL` seconds, then
 * the operation is logged.
 *
 * @param userId    User whose time-travel entry is written.
 * @param replayDay Day number to store for that user.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
    await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
    // The log text previously contained mis-decoded UTF-8 (mojibake) in place of
    // the clock emoji and the arrow; both are restored here.
    logger.info(`[TIME-TRAVEL] 🕰️ SET User ${userId} → Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
/**
 * Read the replay day stored for a user.
 *
 * @param userId User whose time-travel entry is read.
 * @returns The stored value parsed as a number, or `null` when nothing is
 *          stored, the stored string is empty, or it does not parse to a number.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
    const stored = await connection.get(ttKey(userId));
    if (stored) {
        const parsedDay = Number.parseFloat(stored);
        return Number.isNaN(parsedDay) ? null : parsedDay;
    }
    return null;
}
/**
 * Delete the time-travel entry of a user.
 *
 * A log line is written only when the delete call reports that at least one
 * key was removed.
 *
 * @param userId User whose time-travel entry is deleted.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
    const removedCount = await connection.del(ttKey(userId));
    // The log text previously contained mis-decoded UTF-8 (mojibake) in place of
    // the wastebasket emoji; it is restored here.
    if (removedCount > 0) logger.info(`[TIME-TRAVEL] 🗑️ CLEARED User ${userId}`);
}
/**
 * Enqueue a 'send-message' job on the WhatsApp queue.
 *
 * When the environment variable DISABLE_WHATSAPP_SEND equals 'true', nothing
 * is enqueued and a warning is logged instead.
 *
 * @param userId         User the message is addressed to.
 * @param text           Message text.
 * @param delayMs        Value passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization carried in the job data.
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0, organizationId?: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, text, organizationId };
        await whatsappQueue.add('send-message', jobData, { delay: delayMs });
        return;
    }
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
}
/**
 * Enqueue a 'send-content' job for one day of a track on the WhatsApp queue.
 *
 * When the environment variable DISABLE_WHATSAPP_SEND equals 'true', nothing
 * is enqueued and a warning is logged instead.
 *
 * @param userId         User the content is addressed to.
 * @param trackId        Track the content belongs to.
 * @param dayNumber      Day of the track carried in the job data.
 * @param delayMs        Value passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization carried in the job data.
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0, organizationId?: string) {
    if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
        const jobData = { userId, trackId, dayNumber, organizationId };
        const jobOptions = { delay: delayMs };
        await whatsappQueue.add('send-content', jobData, jobOptions);
    } else {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
    }
}
/**
 * Enqueue an 'enroll-user' job on the WhatsApp queue.
 *
 * When the environment variable DISABLE_WHATSAPP_SEND equals 'true', nothing
 * is enqueued and a warning is logged instead.
 *
 * @param userId         User to enroll.
 * @param trackId        Track the user is enrolled in.
 * @param organizationId Optional organization carried in the job data.
 */
export async function enrollUser(userId: string, trackId: string, organizationId?: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const enrollment = { userId, trackId, organizationId };
        await whatsappQueue.add('enroll-user', enrollment);
        return;
    }
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
}
/**
 * Enqueue a WhatsApp interactive BUTTON message ('send-interactive-buttons').
 *
 * The message is meant to carry at most 3 buttons; that limit is not checked
 * in this function. When the environment variable DISABLE_WHATSAPP_SEND equals
 * 'true', nothing is enqueued and a warning is logged instead.
 *
 * @param userId         User the message is addressed to.
 * @param bodyText       Body text of the message.
 * @param buttons        Buttons to offer, each with an id and a title.
 * @param organizationId Optional organization carried in the job data.
 */
export async function scheduleInteractiveButtons(
    userId: string,
    bodyText: string,
    buttons: Array<{ id: string; title: string }>,
    organizationId?: string
) {
    if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
        const jobData = { userId, bodyText, buttons, organizationId };
        await whatsappQueue.add('send-interactive-buttons', jobData);
    } else {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
    }
}
/**
 * Enqueue a WhatsApp interactive LIST message ('send-interactive-list').
 *
 * The message is meant to carry up to 10 rows grouped in sections; that limit
 * is not checked in this function. When the environment variable
 * DISABLE_WHATSAPP_SEND equals 'true', nothing is enqueued and a warning is
 * logged instead.
 *
 * @param userId         User the message is addressed to.
 * @param headerText     Header text of the message.
 * @param bodyText       Body text of the message.
 * @param buttonLabel    Label of the button carried in the job data.
 * @param sections       Titled sections, each holding its rows.
 * @param organizationId Optional organization carried in the job data.
 */
export async function scheduleInteractiveList(
    userId: string,
    headerText: string,
    bodyText: string,
    buttonLabel: string,
    sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>,
    organizationId?: string
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, headerText, bodyText, buttonLabel, sections, organizationId };
        await whatsappQueue.add('send-interactive-list', jobData);
        return;
    }
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
}
/**
 * ASYNC HANDOVER: enqueue an inbound message as a 'handle-inbound' job so it
 * is processed in the background rather than by the caller.
 *
 * The job is configured with 3 attempts and an exponential backoff (base
 * delay 1000). It is removed once completed and kept when it fails. Unlike
 * the outbound helpers in this file, this function does not consult
 * DISABLE_WHATSAPP_SEND.
 *
 * @param payload Sender phone, message text and optional media/message metadata.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string, organizationId?: string }) {
    const jobOptions = {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
        removeOnComplete: true,
        removeOnFail: false,
    };
    await whatsappQueue.add('handle-inbound', payload, jobOptions);
}
/**
 * BROADCAST: enqueue a mass message task ('send-broadcast') with optional
 * future scheduling.
 *
 * `sendAt` is stripped from the job data and converted into a `delay` option.
 * A `sendAt` in the past (or an empty/absent one) enqueues the job without a
 * delay. The job is configured with a single attempt and is removed once
 * completed.
 *
 * @param payload Organization, target list, message text and optional send time.
 * @throws RangeError when `sendAt` is set but cannot be parsed as a date.
 *         Previously such a value produced a NaN delay, which silently
 *         degraded to "send immediately".
 */
export async function scheduleBroadcast(payload: {
    organizationId: string;
    listId: string;
    message: string;
    sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}) {
    const { sendAt, ...data } = payload;

    let delayMs = 0;
    if (sendAt) {
        const sendAtMs = new Date(sendAt).getTime();
        if (Number.isNaN(sendAtMs)) {
            // Refuse to enqueue: a mistyped schedule must not turn into an
            // immediate mass send.
            throw new RangeError(`[QUEUE] Invalid sendAt for 'send-broadcast': ${sendAt}`);
        }
        delayMs = Math.max(0, sendAtMs - Date.now());
    }

    await whatsappQueue.add('send-broadcast', data, {
        attempts: 1,
        removeOnComplete: true,
        // Only attach a delay when the send time is actually in the future.
        ...(delayMs > 0 ? { delay: delayMs } : {}),
    });
}
/**
 * CAMPAIGN: enqueue a mass campaign task ('process-campaign') with optional
 * future scheduling.
 *
 * `sendAt` is stripped from the job data and converted into a `delay` option.
 * A `sendAt` in the past (or an empty/absent one) enqueues the job without a
 * delay. The job is configured with a single attempt and is removed once
 * completed.
 *
 * @param payload Organization, message content, optional list/template details
 *                and optional send time.
 * @throws RangeError when `sendAt` is set but cannot be parsed as a date.
 *         Previously such a value produced a NaN delay, which silently
 *         degraded to "send immediately".
 */
export async function scheduleCampaign(payload: {
    organizationId: string;
    messageContent: string;
    listId?: string;
    templateName?: string;
    templateLanguage?: string;
    sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}) {
    const { sendAt, ...data } = payload;

    let delayMs = 0;
    if (sendAt) {
        const sendAtMs = new Date(sendAt).getTime();
        if (Number.isNaN(sendAtMs)) {
            // Refuse to enqueue: a mistyped schedule must not turn into an
            // immediate campaign run.
            throw new RangeError(`[QUEUE] Invalid sendAt for 'process-campaign': ${sendAt}`);
        }
        delayMs = Math.max(0, sendAtMs - Date.now());
    }

    await whatsappQueue.add('process-campaign', data, {
        attempts: 1,
        removeOnComplete: true,
        // Only attach a delay when the send time is actually in the future.
        ...(delayMs > 0 ? { delay: delayMs } : {}),
    });
}