edtech / apps /api /src /services /queue.ts
CognxSafeTrack
feat: backlog P0→P3 — toast system, payments, tenant isolation, feedback handler, i18n parity
6dd9bad
raw
history blame
6.1 kB
import { logger } from '../logger';
import { Queue } from 'bullmq';
import Redis from 'ioredis';
/**
 * Single ioredis client shared by every queue in this module.
 *
 * When REDIS_URL is set it is used as-is; otherwise the client is assembled
 * from the individual REDIS_HOST / REDIS_PORT / REDIS_USERNAME /
 * REDIS_PASSWORD / REDIS_TLS variables, falling back to a local default.
 * Both paths pass `maxRetriesPerRequest: null`.
 */
const connection = (() => {
    const url = process.env.REDIS_URL;
    if (url) {
        return new Redis(url, { maxRetriesPerRequest: null });
    }

    // TLS is opt-in: only the exact string 'true' enables it (with default TLS options).
    const tlsEnabled = process.env.REDIS_TLS === 'true';
    return new Redis({
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
        username: process.env.REDIS_USERNAME || 'default',
        password: process.env.REDIS_PASSWORD || undefined,
        tls: tlsEnabled ? {} : undefined,
        maxRetriesPerRequest: null
    });
})();

// Log connection-level errors through the application logger.
connection.on('error', (err) => {
    logger.error({ err }, '[REDIS] Queue connection error:');
});

/** Queue carrying all WhatsApp-related jobs (outbound sends, inbound handling, broadcasts, campaigns). */
export const whatsappQueue = new Queue('whatsapp-queue', { connection });

/** Queue carrying notification jobs such as 'send-email'. */
export const notificationQueue = new Queue('notification-queue', { connection });
/**
 * Gracefully close all queues and the underlying Redis connection.
 *
 * Teardown runs in order: whatsappQueue, then notificationQueue, then the
 * shared connection. Every step is attempted even when an earlier one
 * rejects, so a failing queue cannot leave the Redis socket open.
 *
 * @throws Rethrows a teardown failure to the caller. When several steps
 *         fail, the error from the last failing step is the one propagated.
 */
export async function closeQueues() {
    logger.info('[QUEUE] Closing all queues...');
    try {
        await whatsappQueue.close();
    } finally {
        try {
            await notificationQueue.close();
        } finally {
            // Always release the shared connection, whatever happened above.
            await connection.quit();
        }
    }
}
/**
 * Enqueue a 'send-email' job on the notification queue.
 *
 * The payload is forwarded unchanged as the job data. The job is configured
 * with 3 attempts and exponential backoff using a 1000 ms base delay.
 *
 * @param payload Recipient, subject and either template information
 *                (`templateId` / `params`) or raw `htmlContent`.
 */
export async function scheduleEmail(payload: {
    to: string,
    subject: string,
    templateId?: number,
    params?: Record<string, any>,
    htmlContent?: string
}) {
    const retryPolicy = {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 }
    };

    await notificationQueue.add('send-email', payload, retryPolicy);
}
// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────
/**
 * Alias of the module's queue Redis client, exported for direct Redis commands.
 * This is the same instance the queues use (not a second connection), so it is
 * also the one that `closeQueues()` shuts down via `connection.quit()`.
 */
export const redis = connection; // Shared connection for time-travel ops
/** Expiry applied to time-travel entries, in seconds (30 minutes). */
const TT_TTL = 30 * 60;

/** Builds the Redis key under which a user's time-travel day is stored. */
function ttKey(userId: string): string {
    return 'time_travel:' + userId;
}
/**
 * Store the replay day for a user in Redis.
 *
 * The day is written as a string under the user's time-travel key with an
 * expiry of TT_TTL seconds, then the operation is logged.
 *
 * @param userId    User whose time-travel context is being set.
 * @param replayDay Day number to replay.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
    const key = ttKey(userId);
    const storedDay = String(replayDay);

    await connection.set(key, storedDay, 'EX', TT_TTL);

    logger.info(`[TIME-TRAVEL] πŸ•°οΈ SET User ${userId} β†’ Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
/**
 * Read the replay day currently stored for a user.
 *
 * @param userId User whose time-travel context is looked up.
 * @returns The stored day as a number, or `null` when no value is stored
 *          (missing or empty) or the stored value does not parse as a number.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
    const stored = await connection.get(ttKey(userId));

    if (stored) {
        const parsed = parseFloat(stored);
        if (!Number.isNaN(parsed)) {
            return parsed;
        }
    }

    return null;
}
/**
 * Delete a user's time-travel entry from Redis.
 *
 * A log line is written only when the delete reports at least one removed key.
 *
 * @param userId User whose time-travel context is cleared.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
    const removedCount = await connection.del(ttKey(userId));
    const removedSomething = removedCount > 0;

    if (removedSomething) {
        logger.info(`[TIME-TRAVEL] πŸ—‘οΈ CLEARED User ${userId}`);
    }
}
/**
 * Enqueue a 'send-message' job on the WhatsApp queue.
 *
 * When the DISABLE_WHATSAPP_SEND environment variable equals 'true', nothing
 * is enqueued and a warning is logged instead.
 *
 * @param userId         Target user.
 * @param text           Message text.
 * @param delayMs        Job delay in milliseconds (defaults to 0).
 * @param organizationId Optional organization id included in the job data.
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0, organizationId?: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

    if (!sendingDisabled) {
        const jobData = { userId, text, organizationId };
        await whatsappQueue.add('send-message', jobData, { delay: delayMs });
        return;
    }

    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
}
/**
 * Enqueue a 'send-content' job for one day of a track.
 *
 * Skipped (with a warning log) when DISABLE_WHATSAPP_SEND equals 'true'.
 *
 * @param userId         Target user.
 * @param trackId        Track the content belongs to.
 * @param dayNumber      Day of the track to send.
 * @param delayMs        Job delay in milliseconds (defaults to 0).
 * @param organizationId Optional organization id included in the job data.
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0, organizationId?: string) {
    if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
    } else {
        const jobData = { userId, trackId, dayNumber, organizationId };
        const jobOptions = { delay: delayMs };
        await whatsappQueue.add('send-content', jobData, jobOptions);
    }
}
/**
 * Enqueue an 'enroll-user' job on the WhatsApp queue.
 *
 * Skipped (with a warning log) when DISABLE_WHATSAPP_SEND equals 'true'.
 *
 * @param userId         User to enroll.
 * @param trackId        Track to enroll the user in.
 * @param organizationId Optional organization id included in the job data.
 */
export async function enrollUser(userId: string, trackId: string, organizationId?: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

    if (sendingDisabled) {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
        return;
    }

    const jobData = { userId, trackId, organizationId };
    await whatsappQueue.add('enroll-user', jobData);
}
/**
 * Enqueue a WhatsApp interactive BUTTON message (max 3 buttons).
 *
 * The button count is not validated here; the list is forwarded as given.
 * Skipped (with a warning log) when DISABLE_WHATSAPP_SEND equals 'true'.
 *
 * @param userId         Target user.
 * @param bodyText       Message body shown with the buttons.
 * @param buttons        Buttons to offer, each with an `id` and a `title`.
 * @param organizationId Optional organization id included in the job data.
 */
export async function scheduleInteractiveButtons(
    userId: string,
    bodyText: string,
    buttons: Array<{ id: string; title: string }>,
    organizationId?: string
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

    if (sendingDisabled) {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
    } else {
        const jobData = { userId, bodyText, buttons, organizationId };
        await whatsappQueue.add('send-interactive-buttons', jobData);
    }
}
/**
 * Enqueue a WhatsApp interactive LIST message (up to 10 rows, grouped in sections).
 *
 * Row and section counts are not validated here; `sections` is forwarded as given.
 * Skipped (with a warning log) when DISABLE_WHATSAPP_SEND equals 'true'.
 *
 * @param userId         Target user.
 * @param headerText     Header of the list message.
 * @param bodyText       Body of the list message.
 * @param buttonLabel    Label of the button for the list.
 * @param sections       Titled groups of rows; each row has an `id`, a `title`
 *                       and an optional `description`.
 * @param organizationId Optional organization id included in the job data.
 */
export async function scheduleInteractiveList(
    userId: string,
    headerText: string,
    bodyText: string,
    buttonLabel: string,
    sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>,
    organizationId?: string
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

    if (!sendingDisabled) {
        const jobData = { userId, headerText, bodyText, buttonLabel, sections, organizationId };
        await whatsappQueue.add('send-interactive-list', jobData);
        return;
    }

    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
}
/**
 * 🚨 ASYNC HANDOVER: enqueue an inbound message as a 'handle-inbound' job so
 * it is processed in the background by the worker.
 *
 * Job settings: 3 attempts with exponential backoff (1000 ms base delay);
 * the job is removed on completion and retained on failure.
 *
 * @param payload Inbound message details (sender phone, text, optional media
 *                URLs, message id and organization id), forwarded unchanged.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string, organizationId?: string }) {
    const jobOptions = {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
        removeOnComplete: true,
        removeOnFail: false
    };

    await whatsappQueue.add('handle-inbound', payload, jobOptions);
}
/**
 * 📢 BROADCAST: enqueue a mass message task as a 'send-broadcast' job.
 *
 * @param payload Organization, target list and message text, forwarded unchanged.
 */
export async function scheduleBroadcast(payload: { organizationId: string, listId: string, message: string }) {
    const jobOptions = {
        // Single attempt: retry logic, if needed, is handled within the loop,
        // so the whole job shouldn't necessarily retry.
        attempts: 1,
        removeOnComplete: true
    };

    await whatsappQueue.add('send-broadcast', payload, jobOptions);
}
/**
 * 🚀 CAMPAIGN: enqueue a mass campaign task, for all contacts or a specific
 * list, as a 'process-campaign' job.
 *
 * The job is given a single attempt and is removed once it completes.
 *
 * @param payload Campaign description: organization, message content, and
 *                optionally a target list and template name/language.
 *                Forwarded unchanged as the job data.
 */
export async function scheduleCampaign(payload: {
    organizationId: string,
    messageContent: string,
    listId?: string,
    templateName?: string,
    templateLanguage?: string
}) {
    const jobOptions = { attempts: 1, removeOnComplete: true };

    await whatsappQueue.add('process-campaign', payload, jobOptions);
}