import { logger } from '../logger';
import { Queue } from 'bullmq';
import Redis from 'ioredis';
/**
 * Shared ioredis client used by every queue in this module.
 *
 * If `REDIS_URL` is set it is used as-is; otherwise the client is assembled
 * from `REDIS_HOST`, `REDIS_PORT`, `REDIS_USERNAME`, `REDIS_PASSWORD` and
 * `REDIS_TLS`, falling back to `localhost:6379` with the `default` user.
 *
 * NOTE(review): `maxRetriesPerRequest: null` is set on both paths, presumably
 * because BullMQ expects it on connections it is handed — confirm against the
 * BullMQ connection docs before changing it.
 */
const connection = process.env.REDIS_URL
  ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
  : new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      // Explicit radix: the port is always parsed as a decimal number.
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
      username: process.env.REDIS_USERNAME || 'default',
      // An empty password string is treated the same as "no password".
      password: process.env.REDIS_PASSWORD || undefined,
      // TLS is enabled (with default options) only when REDIS_TLS is exactly 'true'.
      tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
      maxRetriesPerRequest: null
    });

// Log connection-level errors through the project logger.
connection.on('error', (err) => logger.error({ err }, '[REDIS] Queue connection error:'));

/** Queue carrying every WhatsApp-related job enqueued by this module. */
export const whatsappQueue = new Queue('whatsapp-queue', { connection });

/** Queue carrying notification jobs (e.g. `send-email`). */
export const notificationQueue = new Queue('notification-queue', { connection });
/**
 * Gracefully close all queues and the underlying Redis connection.
 *
 * Every step is attempted even when an earlier one rejects, so a failure
 * while closing one queue cannot leave the other queue or the Redis
 * connection open during shutdown.
 *
 * @throws Rejects if any step fails; when several steps fail, the error of
 *         the last failing step is the one that propagates.
 */
export async function closeQueues() {
  logger.info('[QUEUE] Closing all queues...');
  try {
    await whatsappQueue.close();
  } finally {
    try {
      await notificationQueue.close();
    } finally {
      // Always release the shared connection, whatever happened above.
      await connection.quit();
    }
  }
}
/**
 * Enqueue a `send-email` job on the notification queue.
 *
 * The payload is handed to the job unchanged. The job is created with
 * `attempts: 3` and an exponential backoff whose base delay is 1000.
 *
 * @param payload Recipient, subject and either a template reference
 *                (`templateId` + `params`) and/or raw `htmlContent`.
 */
export async function scheduleEmail(payload: {
  to: string,
  subject: string,
  templateId?: number,
  params?: Record<string, any>,
  htmlContent?: string
}) {
  const retryPolicy = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  };
  await notificationQueue.add('send-email', payload, retryPolicy);
}
// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────
// Exposes the same ioredis instance that backs the BullMQ queues under the
// name `redis`; the time-travel helpers in this module read and write through it.
export const redis = connection; // Shared connection for time-travel ops
/** Lifetime of a time-travel entry, in seconds (30 minutes). */
const TT_TTL = 30 * 60;

/** Builds the Redis key under which a user's time-travel day is stored. */
function ttKey(userId: string): string {
  return 'time_travel:' + userId;
}
/**
 * Store the replay day for a user's time-travel session.
 *
 * Writes `String(replayDay)` under the key produced by `ttKey(userId)` with
 * an expiry of `TT_TTL` seconds, then logs the assignment.
 *
 * @param userId    User whose time-travel context is being set.
 * @param replayDay Day number to replay.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
  await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
  // The log text previously contained mis-encoded characters; restored to the intended emoji/arrow.
  logger.info(`[TIME-TRAVEL] 🕰️ SET User ${userId} → Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
/**
 * Read the replay day stored for a user's time-travel session.
 *
 * @param userId User whose time-travel context is looked up.
 * @returns The stored day parsed with `parseFloat`, or `null` when nothing is
 *          stored (missing or empty value) or the stored value is not numeric.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
  const stored = await connection.get(ttKey(userId));
  if (!stored) {
    return null;
  }

  const parsedDay = parseFloat(stored);
  if (Number.isNaN(parsedDay)) {
    return null;
  }
  return parsedDay;
}
/**
 * Remove a user's time-travel context.
 *
 * Deletes the key produced by `ttKey(userId)` and logs only when a key was
 * actually removed (the delete call reported a count above zero).
 *
 * @param userId User whose time-travel context is cleared.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
  const removedCount = await connection.del(ttKey(userId));
  if (removedCount > 0) {
    // The log text previously contained mis-encoded characters; restored to the intended emoji.
    logger.info(`[TIME-TRAVEL] 🗑️ CLEARED User ${userId}`);
  }
}
/**
 * Enqueue a `send-message` job on the WhatsApp queue.
 *
 * When the `DISABLE_WHATSAPP_SEND` environment variable equals `'true'`,
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId         Recipient user id.
 * @param text           Message text.
 * @param delayMs        Value passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization id forwarded in the job data.
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0, organizationId?: string) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
  if (sendingDisabled) {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
    return;
  }

  const jobData = { userId, text, organizationId };
  await whatsappQueue.add('send-message', jobData, { delay: delayMs });
}
/**
 * Enqueue a `send-content` job for one day of a track.
 *
 * Skipped (with a warning) when `DISABLE_WHATSAPP_SEND` equals `'true'`.
 *
 * @param userId         Recipient user id.
 * @param trackId        Track the content belongs to.
 * @param dayNumber      Day of the track to send.
 * @param delayMs        Value passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization id forwarded in the job data.
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0, organizationId?: string) {
  if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
    const jobData = { userId, trackId, dayNumber, organizationId };
    await whatsappQueue.add('send-content', jobData, { delay: delayMs });
    return;
  }

  logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
}
/**
 * Enqueue an `enroll-user` job on the WhatsApp queue.
 *
 * Skipped (with a warning) when `DISABLE_WHATSAPP_SEND` equals `'true'`.
 *
 * @param userId         User to enroll.
 * @param trackId        Track the user is enrolled in.
 * @param organizationId Optional organization id forwarded in the job data.
 */
export async function enrollUser(userId: string, trackId: string, organizationId?: string) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
  if (sendingDisabled) {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
    return;
  }

  const enrollment = { userId, trackId, organizationId };
  await whatsappQueue.add('enroll-user', enrollment);
}
/**
 * Enqueue a `send-interactive-buttons` job (WhatsApp interactive BUTTON message).
 *
 * The original author notes a maximum of 3 buttons; this function does not
 * enforce that limit itself. Skipped (with a warning) when
 * `DISABLE_WHATSAPP_SEND` equals `'true'`.
 *
 * @param userId         Recipient user id.
 * @param bodyText       Message body shown above the buttons.
 * @param buttons        Buttons to offer, each with an `id` and a `title`.
 * @param organizationId Optional organization id forwarded in the job data.
 */
export async function scheduleInteractiveButtons(
  userId: string,
  bodyText: string,
  buttons: Array<{ id: string; title: string }>,
  organizationId?: string
) {
  if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
    const jobData = { userId, bodyText, buttons, organizationId };
    await whatsappQueue.add('send-interactive-buttons', jobData);
    return;
  }

  logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
}
/**
 * Enqueue a `send-interactive-list` job (WhatsApp interactive LIST message).
 *
 * The original author notes a maximum of 10 rows, grouped in sections; this
 * function does not enforce that limit itself. Skipped (with a warning) when
 * `DISABLE_WHATSAPP_SEND` equals `'true'`.
 *
 * @param userId         Recipient user id.
 * @param headerText     Header of the list message.
 * @param bodyText       Body of the list message.
 * @param buttonLabel    Label of the button that opens the list.
 * @param sections       Titled sections, each holding its rows.
 * @param organizationId Optional organization id forwarded in the job data.
 */
export async function scheduleInteractiveList(
  userId: string,
  headerText: string,
  bodyText: string,
  buttonLabel: string,
  sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>,
  organizationId?: string
) {
  const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
  if (sendingDisabled) {
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
    return;
  }

  const jobData = { userId, headerText, bodyText, buttonLabel, sections, organizationId };
  await whatsappQueue.add('send-interactive-list', jobData);
}
/**
 * 🚨 ASYNC HANDOVER: enqueue an inbound message as a `handle-inbound` job so
 * it is processed in the background by the worker.
 *
 * The job is created with `attempts: 3`, exponential backoff (base delay
 * 1000), `removeOnComplete: true` and `removeOnFail: false`.
 *
 * @param payload Inbound message data (sender phone, text and optional media
 *                URLs / ids), forwarded to the job unchanged.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string, organizationId?: string }) {
  const jobOptions = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
    removeOnFail: false
  };
  await whatsappQueue.add('handle-inbound', payload, jobOptions);
}
/**
 * 📢 BROADCAST: enqueue a mass-message task as a `send-broadcast` job.
 *
 * @param payload Organization, target list and message text, forwarded to the
 *                job unchanged.
 */
export async function scheduleBroadcast(payload: { organizationId: string, listId: string, message: string }) {
  const jobOptions = {
    // Single attempt: any retrying is handled inside the broadcast loop if
    // needed, so the job as a whole should not necessarily be retried.
    attempts: 1,
    removeOnComplete: true
  };
  await whatsappQueue.add('send-broadcast', payload, jobOptions);
}
/**
 * CAMPAIGN: enqueue a mass campaign task as a `process-campaign` job.
 *
 * Per the original author's note, the campaign targets either all contacts
 * or a specific list; presumably `listId` selects the latter — verify against
 * the worker. The job is created with `attempts: 1` and `removeOnComplete: true`.
 *
 * @param payload Campaign definition, forwarded to the job unchanged.
 */
export async function scheduleCampaign(payload: {
  organizationId: string,
  messageContent: string,
  listId?: string,
  templateName?: string,
  templateLanguage?: string
}) {
  const jobOptions = { attempts: 1, removeOnComplete: true };
  await whatsappQueue.add('process-campaign', payload, jobOptions);
}
|