File size: 5,947 Bytes
de6a95b a59f28c a966957 cfbb685 7b0c22b 2ab1980 cfbb685 2ab1980 a59f28c d978795 de6a95b d978795 de6a95b d978795 3bf9adc df0edd7 de6a95b df0edd7 3bf9adc a59f28c 3bf9adc df0edd7 de6a95b df0edd7 3bf9adc a59f28c 3c6fc2a 3bf9adc df0edd7 de6a95b df0edd7 3bf9adc 1dec751 3bf9adc 1dec751 df0edd7 de6a95b df0edd7 3bf9adc 1dec751 3bf9adc 1dec751 df0edd7 de6a95b df0edd7 3bf9adc 3c6fc2a 1dec751 74e06ff 3bf9adc 74e06ff 98240fd f000fc9 98240fd 6dd9bad 98240fd e286845 98240fd e286845 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | import { logger } from '../logger';
import { Queue } from 'bullmq';
import { redisForBullMQ as connection } from '../lib/redis';
/** BullMQ queue named 'whatsapp-queue'; the WhatsApp scheduling helpers in this module add their jobs to it. Uses the `connection` imported from '../lib/redis'. */
export const whatsappQueue = new Queue('whatsapp-queue', { connection });
/** BullMQ queue named 'notification-queue'; `scheduleEmail` adds its 'send-email' jobs to it. Uses the same imported `connection`. */
export const notificationQueue = new Queue('notification-queue', { connection });
/**
 * Gracefully close all queues and the underlying connection.
 *
 * Every shutdown step is attempted even when an earlier one fails, so a
 * rejected `close()` on one queue cannot leave the other queue or the
 * connection open. Steps run one after another in this order: WhatsApp queue,
 * notification queue, connection.
 *
 * @throws The first error raised by any shutdown step, rethrown only after all
 * steps have been attempted.
 */
export async function closeQueues(): Promise<void> {
  logger.info('[QUEUE] Closing all queues...');

  const shutdownSteps: Array<() => Promise<unknown>> = [
    () => whatsappQueue.close(),
    () => notificationQueue.close(),
    () => connection.quit(),
  ];
  const failures: unknown[] = [];

  // Sequential on purpose: the connection is quit only after both queues have
  // been asked to close.
  for (const step of shutdownSteps) {
    try {
      await step();
    } catch (err) {
      failures.push(err);
    }
  }

  if (failures.length > 0) {
    throw failures[0];
  }
}
/**
 * Queue an email notification.
 *
 * The payload is handed over unchanged as the data of a 'send-email' job on the
 * notification queue. The job is added with `attempts: 3` and exponential
 * backoff using a 1000 ms delay.
 */
export async function scheduleEmail(payload: {
  to: string,
  subject: string,
  templateId?: number,
  params?: Record<string, any>,
  htmlContent?: string
}) {
  const retryPolicy = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  };

  await notificationQueue.add('send-email', payload, retryPolicy);
}
// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────
/** Lifetime of a time-travel entry, in seconds (30 minutes). */
const TT_TTL = 30 * 60;

/** Builds the Redis key under which a user's time-travel replay day is kept. */
function ttKey(userId: string): string {
  return `time_travel:${userId}`;
}
/**
 * Store the replay day for a user's time-travel session.
 *
 * The day is written as a string under the user's `ttKey`, with an expiry of
 * `TT_TTL` seconds (Redis `EX`), and the write is logged.
 *
 * @param userId - User whose time-travel context is being set.
 * @param replayDay - Day number to replay; stored in its `String(...)` form.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
  await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
  logger.info(`[TIME-TRAVEL] 🕰️ SET User ${userId} → Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
/**
 * Read the replay day stored for a user's time-travel session.
 *
 * @param userId - User whose time-travel context is looked up.
 * @returns The stored value parsed with `parseFloat`, or `null` when the stored
 * value is missing/empty or does not parse to a number.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
  const stored = await connection.get(ttKey(userId));

  if (stored) {
    const parsedDay = Number.parseFloat(stored);
    if (!Number.isNaN(parsedDay)) {
      return parsedDay;
    }
  }

  return null;
}
/**
 * Remove a user's time-travel context.
 *
 * Deletes the user's `ttKey` and logs only when the delete reports that at
 * least one key was removed, so clearing an absent context stays silent.
 *
 * @param userId - User whose time-travel context is cleared.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
  const removedCount = await connection.del(ttKey(userId));
  if (removedCount > 0) {
    logger.info(`[TIME-TRAVEL] 🗑️ CLEARED User ${userId}`);
  }
}
/**
 * Queue a text message as a 'send-message' job on the WhatsApp queue.
 *
 * When the DISABLE_WHATSAPP_SEND environment variable is exactly 'true',
 * nothing is queued and a warning is logged instead.
 *
 * @param userId - User id placed in the job data.
 * @param text - Message text placed in the job data.
 * @param delayMs - Value passed as the job's `delay` option (default 0).
 * @param organizationId - Optional organization id placed in the job data.
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0, organizationId?: string) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

  if (!sendingDisabled) {
    const jobData = { userId, text, organizationId };
    await whatsappQueue.add('send-message', jobData, { delay: delayMs });
    return;
  }

  logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
}
/**
 * Queue one day of a track as a 'send-content' job on the WhatsApp queue.
 *
 * Skipped with a warning when the DISABLE_WHATSAPP_SEND environment variable is
 * exactly 'true'.
 *
 * @param userId - User id placed in the job data.
 * @param trackId - Track id placed in the job data.
 * @param dayNumber - Day number placed in the job data.
 * @param delayMs - Value passed as the job's `delay` option (default 0).
 * @param organizationId - Optional organization id placed in the job data.
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0, organizationId?: string) {
  if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
    const jobOptions = { delay: delayMs };
    await whatsappQueue.add('send-content', { userId, trackId, dayNumber, organizationId }, jobOptions);
  } else {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
  }
}
/**
 * Queue an 'enroll-user' job on the WhatsApp queue.
 *
 * Skipped with a warning when the DISABLE_WHATSAPP_SEND environment variable is
 * exactly 'true'. The job is added without any job options.
 *
 * @param userId - User id placed in the job data.
 * @param trackId - Track id placed in the job data.
 * @param organizationId - Optional organization id placed in the job data.
 */
export async function enrollUser(userId: string, trackId: string, organizationId?: string) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

  if (sendingDisabled) {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
    return;
  }

  const enrollment = { userId, trackId, organizationId };
  await whatsappQueue.add('enroll-user', enrollment);
}
/**
 * Queue a WhatsApp interactive BUTTON message as a 'send-interactive-buttons' job.
 *
 * The message is meant to carry at most 3 buttons; this function does not
 * check the count itself. Skipped with a warning when the
 * DISABLE_WHATSAPP_SEND environment variable is exactly 'true'.
 *
 * @param userId - User id placed in the job data.
 * @param bodyText - Body text placed in the job data.
 * @param buttons - Button definitions (`id`, `title`) placed in the job data.
 * @param organizationId - Optional organization id placed in the job data.
 */
export async function scheduleInteractiveButtons(
  userId: string,
  bodyText: string,
  buttons: Array<{ id: string; title: string }>,
  organizationId?: string
) {
  if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
    const jobData = { userId, bodyText, buttons, organizationId };
    await whatsappQueue.add('send-interactive-buttons', jobData);
    return;
  }

  logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
}
/**
 * Queue a WhatsApp interactive LIST message as a 'send-interactive-list' job.
 *
 * The list is meant to hold up to 10 rows grouped in sections; this function
 * does not check that limit itself. Skipped with a warning when the
 * DISABLE_WHATSAPP_SEND environment variable is exactly 'true'.
 *
 * @param userId - User id placed in the job data.
 * @param headerText - Header text placed in the job data.
 * @param bodyText - Body text placed in the job data.
 * @param buttonLabel - Button label placed in the job data.
 * @param sections - Titled sections, each with rows (`id`, `title`, optional `description`).
 * @param organizationId - Optional organization id placed in the job data.
 */
export async function scheduleInteractiveList(
  userId: string,
  headerText: string,
  bodyText: string,
  buttonLabel: string,
  sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>,
  organizationId?: string
) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';

  if (sendingDisabled) {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
  } else {
    const jobData = { userId, headerText, bodyText, buttonLabel, sections, organizationId };
    await whatsappQueue.add('send-interactive-list', jobData);
  }
}
/**
 * ASYNC HANDOVER: queue an inbound message for background processing in the worker.
 *
 * The payload becomes the data of a 'handle-inbound' job on the WhatsApp queue.
 * The job is added with `attempts: 3`, exponential backoff (1000 ms delay),
 * `removeOnComplete: true` and `removeOnFail: false`. Unlike the outbound
 * helpers, this function does not consult DISABLE_WHATSAPP_SEND.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string, organizationId?: string }) {
  const jobOptions = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
    removeOnFail: false,
  };

  await whatsappQueue.add('handle-inbound', payload, jobOptions);
}
/**
 * BROADCAST: enqueue a mass message task with optional future scheduling.
 *
 * Everything in the payload except `sendAt` becomes the data of a
 * 'send-broadcast' job (`attempts: 1`, `removeOnComplete: true`). When `sendAt`
 * lies in the future the job is delayed by the remaining milliseconds; when it
 * is absent, empty, or already in the past the job is added without a delay.
 *
 * @throws RangeError when `sendAt` is provided but cannot be parsed as a date,
 * so that a malformed schedule never turns into an immediate mass send.
 */
export async function scheduleBroadcast(payload: {
  organizationId: string;
  listId: string;
  message: string;
  sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}): Promise<void> {
  const { sendAt, ...data } = payload;

  let delayMs = 0;
  if (sendAt) {
    const sendAtMs = new Date(sendAt).getTime();
    // An unparseable date yields NaN; NaN would fail the `> 0` check below and
    // silently enqueue the broadcast for immediate delivery.
    if (Number.isNaN(sendAtMs)) {
      throw new RangeError(`[QUEUE] Invalid sendAt for broadcast: ${sendAt}`);
    }
    delayMs = Math.max(0, sendAtMs - Date.now());
  }

  await whatsappQueue.add('send-broadcast', data, {
    attempts: 1,
    removeOnComplete: true,
    ...(delayMs > 0 ? { delay: delayMs } : {}),
  });
}
/**
 * CAMPAIGN: enqueue a mass campaign task with optional future scheduling.
 *
 * Everything in the payload except `sendAt` becomes the data of a
 * 'process-campaign' job (`attempts: 1`, `removeOnComplete: true`). When
 * `sendAt` lies in the future the job is delayed by the remaining
 * milliseconds; when it is absent, empty, or already in the past the job is
 * added without a delay.
 *
 * @throws RangeError when `sendAt` is provided but cannot be parsed as a date,
 * so that a malformed schedule never turns into an immediate campaign run.
 */
export async function scheduleCampaign(payload: {
  organizationId: string;
  messageContent: string;
  listId?: string;
  templateName?: string;
  templateLanguage?: string;
  sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}): Promise<void> {
  const { sendAt, ...data } = payload;

  let delayMs = 0;
  if (sendAt) {
    const sendAtMs = new Date(sendAt).getTime();
    // An unparseable date yields NaN; NaN would fail the `> 0` check below and
    // silently enqueue the campaign for immediate processing.
    if (Number.isNaN(sendAtMs)) {
      throw new RangeError(`[QUEUE] Invalid sendAt for campaign: ${sendAt}`);
    }
    delayMs = Math.max(0, sendAtMs - Date.now());
  }

  await whatsappQueue.add('process-campaign', data, {
    attempts: 1,
    removeOnComplete: true,
    ...(delayMs > 0 ? { delay: delayMs } : {}),
  });
}
|