File size: 5,947 Bytes
de6a95b
a59f28c
a966957
cfbb685
7b0c22b
 
2ab1980
cfbb685
 
 
 
 
 
 
 
2ab1980
 
 
 
 
 
 
 
 
 
 
 
 
a59f28c
d978795
 
 
 
 
 
 
de6a95b
d978795
 
 
 
 
 
 
 
 
 
 
de6a95b
d978795
 
3bf9adc
df0edd7
de6a95b
df0edd7
 
3bf9adc
a59f28c
 
3bf9adc
df0edd7
de6a95b
df0edd7
 
3bf9adc
a59f28c
3c6fc2a
3bf9adc
df0edd7
de6a95b
df0edd7
 
3bf9adc
1dec751
 
 
 
 
 
3bf9adc
 
1dec751
df0edd7
de6a95b
df0edd7
 
3bf9adc
1dec751
 
 
 
 
 
 
 
3bf9adc
 
1dec751
df0edd7
de6a95b
df0edd7
 
3bf9adc
3c6fc2a
1dec751
74e06ff
3bf9adc
74e06ff
 
 
 
 
 
 
 
98240fd
 
 
 
 
 
 
 
 
 
 
 
 
f000fc9
 
 
98240fd
 
 
 
 
 
 
 
6dd9bad
98240fd
 
 
e286845
98240fd
 
e286845
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import { logger } from '../logger';
import { Queue } from 'bullmq';
import { redisForBullMQ as connection } from '../lib/redis';

/**
 * Producer handle for 'whatsapp-queue'. The scheduling helpers in this file
 * add their jobs ('send-message', 'send-content', 'enroll-user', ...) here.
 * Uses the Redis connection imported from '../lib/redis'.
 */
export const whatsappQueue = new Queue('whatsapp-queue', { connection });
/**
 * Producer handle for 'notification-queue'; scheduleEmail adds 'send-email'
 * jobs to it. Shares the same Redis connection object as whatsappQueue.
 */
export const notificationQueue = new Queue('notification-queue', { connection });

/**
 * Gracefully close all queues and the underlying Redis connection.
 *
 * Steps run in a fixed order (whatsapp queue, notification queue, then the
 * shared connection). Every step is attempted even when an earlier one
 * fails, so a failing queue cannot leave the other queue or the connection
 * open. Each failure is logged; the first one is re-thrown after all steps
 * have run.
 *
 * @throws The first error raised by any close/quit call.
 */
export async function closeQueues() {
    logger.info('[QUEUE] Closing all queues...');

    // Queues first, then the connection they share.
    const steps: Array<[string, () => Promise<unknown>]> = [
        ['whatsapp-queue', () => whatsappQueue.close()],
        ['notification-queue', () => notificationQueue.close()],
        ['redis connection', () => connection.quit()],
    ];

    const failures: unknown[] = [];
    for (const [label, run] of steps) {
        try {
            await run();
        } catch (err) {
            const reason = err instanceof Error ? err.message : String(err);
            logger.warn(`[QUEUE] Failed to close ${label}: ${reason}`);
            failures.push(err);
        }
    }

    // Surface the original problem to the caller once cleanup is complete.
    if (failures.length > 0) throw failures[0];
}

/**
 * Enqueue a 'send-email' job on the notification queue.
 *
 * The payload is handed to the job unchanged. The job is configured with
 * 3 attempts and exponential backoff using a 1000 ms base delay.
 */
export async function scheduleEmail(payload: {
    to: string,
    subject: string,
    templateId?: number,
    params?: Record<string, any>,
    htmlContent?: string
}) {
    const retryPolicy = {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
    };
    await notificationQueue.add('send-email', payload, retryPolicy);
}

// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────

/** Expiry applied to a time-travel entry, in seconds (30 minutes). */
const TT_TTL = 30 * 60;

/** Builds the Redis key that holds the time-travel day for one user. */
function ttKey(userId: string): string {
    return 'time_travel:' + userId;
}

/**
 * Record which day a user is replaying.
 *
 * The day is written as a string under the user's time-travel key with an
 * expiry of TT_TTL seconds, then the change is logged.
 *
 * @param userId    User the replay context belongs to.
 * @param replayDay Day number to store.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
    await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
    // The log text used to contain mojibake (UTF-8 bytes decoded with the
    // wrong code page); the intended clock emoji and arrow are restored.
    logger.info(`[TIME-TRAVEL] 🕰️ SET User ${userId} → Day ${replayDay} (TTL: ${TT_TTL}s)`);
}

/**
 * Read the replay day stored for a user.
 *
 * @param userId User whose time-travel key is looked up.
 * @returns The stored value parsed with parseFloat, or null when the key is
 *          missing, empty, or does not parse to a number.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
    const stored = await connection.get(ttKey(userId));
    if (stored) {
        const parsed = parseFloat(stored);
        if (!Number.isNaN(parsed)) return parsed;
    }
    return null;
}

/**
 * Delete the replay day stored for a user.
 *
 * A log line is written only when the delete reports that at least one key
 * was removed.
 *
 * @param userId User whose time-travel key is deleted.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
    const removed = await connection.del(ttKey(userId));
    // The log text used to contain mojibake (UTF-8 bytes decoded with the
    // wrong code page); the intended wastebasket emoji is restored.
    if (removed > 0) logger.info(`[TIME-TRAVEL] 🗑️ CLEARED User ${userId}`);
}

/**
 * Enqueue a 'send-message' job on the WhatsApp queue.
 *
 * When the DISABLE_WHATSAPP_SEND environment variable is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId         Recipient user id, included in the job data.
 * @param text           Message text, included in the job data.
 * @param delayMs        Passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization id, included in the job data.
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0, organizationId?: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, text, organizationId };
        await whatsappQueue.add('send-message', jobData, { delay: delayMs });
        return;
    }
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
}

/**
 * Enqueue a 'send-content' job for one day of a track.
 *
 * When the DISABLE_WHATSAPP_SEND environment variable is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId         User id, included in the job data.
 * @param trackId        Track id, included in the job data.
 * @param dayNumber      Day number, included in the job data.
 * @param delayMs        Passed as the job's `delay` option (default 0).
 * @param organizationId Optional organization id, included in the job data.
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0, organizationId?: string) {
    const { DISABLE_WHATSAPP_SEND: killSwitch } = process.env;
    if (killSwitch === 'true') {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
        return;
    }

    const jobData = { userId, trackId, dayNumber, organizationId };
    const jobOptions = { delay: delayMs };
    await whatsappQueue.add('send-content', jobData, jobOptions);
}

/**
 * Enqueue an 'enroll-user' job on the WhatsApp queue.
 *
 * When the DISABLE_WHATSAPP_SEND environment variable is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId         User id, included in the job data.
 * @param trackId        Track id, included in the job data.
 * @param organizationId Optional organization id, included in the job data.
 */
export async function enrollUser(userId: string, trackId: string, organizationId?: string) {
    if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
        // No job options are supplied for enrollment jobs.
        await whatsappQueue.add('enroll-user', { userId, trackId, organizationId });
    } else {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
    }
}

/**
 * Enqueue a 'send-interactive-buttons' job (WhatsApp interactive BUTTON message).
 *
 * The original note states a maximum of 3 buttons; this function does not
 * validate the count itself. When the DISABLE_WHATSAPP_SEND environment
 * variable is exactly 'true', nothing is enqueued and a warning is logged.
 */
export async function scheduleInteractiveButtons(
    userId: string,
    bodyText: string,
    buttons: Array<{ id: string; title: string }>,
    organizationId?: string
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (sendingDisabled) {
        logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
        return;
    }

    const jobData = { userId, bodyText, buttons, organizationId };
    await whatsappQueue.add('send-interactive-buttons', jobData);
}

/**
 * Enqueue a 'send-interactive-list' job (WhatsApp interactive LIST message).
 *
 * The original note states up to 10 rows grouped in sections; this function
 * does not validate that itself. When the DISABLE_WHATSAPP_SEND environment
 * variable is exactly 'true', nothing is enqueued and a warning is logged.
 */
export async function scheduleInteractiveList(
    userId: string,
    headerText: string,
    bodyText: string,
    buttonLabel: string,
    sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>,
    organizationId?: string
) {
    if (process.env.DISABLE_WHATSAPP_SEND !== 'true') {
        const jobData = { userId, headerText, bodyText, buttonLabel, sections, organizationId };
        await whatsappQueue.add('send-interactive-list', jobData);
        return;
    }
    logger.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
}

/**
 * Async handover: enqueue an inbound message as a 'handle-inbound' job so it
 * is processed in the background rather than by the caller.
 *
 * The job is configured with 3 attempts and exponential backoff (1000 ms
 * base delay). Completed jobs are removed; failed jobs are kept.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string, organizationId?: string }) {
    const jobOptions = {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
        removeOnComplete: true,
        removeOnFail: false,
    };
    await whatsappQueue.add('handle-inbound', payload, jobOptions);
}

/**
 * Convert an optional ISO 8601 `sendAt` timestamp into a job delay in ms.
 *
 * - unset or empty: 0 (enqueue for immediate processing)
 * - in the past:    0 (enqueue for immediate processing)
 * - unparseable:    throws, because `new Date(x).getTime()` yields NaN and the
 *                   previous inline logic then silently sent the mass message
 *                   immediately instead of at the requested time.
 *
 * @throws RangeError when `sendAt` is set but is not a parseable date.
 */
function resolveSendDelayMs(sendAt: string | undefined): number {
    if (!sendAt) return 0;
    const targetMs = new Date(sendAt).getTime();
    if (Number.isNaN(targetMs)) {
        throw new RangeError(`Invalid sendAt timestamp: "${sendAt}"`);
    }
    return Math.max(0, targetMs - Date.now());
}

/**
 * 📢 BROADCAST: Enqueue a mass message task with optional future scheduling.
 *
 * `sendAt` is stripped from the job data and only used to compute the delay.
 * The job gets a single attempt and is removed on completion.
 *
 * @throws RangeError (as a rejected promise) when `sendAt` cannot be parsed;
 *         nothing is enqueued in that case.
 */
export async function scheduleBroadcast(payload: {
    organizationId: string;
    listId: string;
    message: string;
    sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}) {
    const { sendAt, ...data } = payload;
    const delayMs = resolveSendDelayMs(sendAt);
    await whatsappQueue.add('send-broadcast', data, {
        attempts: 1,
        removeOnComplete: true,
        // Only set `delay` when there is something to wait for.
        ...(delayMs > 0 ? { delay: delayMs } : {}),
    });
}

/**
 * 🚀 CAMPAIGN: Enqueue a mass campaign task with optional future scheduling.
 *
 * `sendAt` is stripped from the job data and only used to compute the delay.
 * The job gets a single attempt and is removed on completion.
 *
 * @throws RangeError (as a rejected promise) when `sendAt` cannot be parsed;
 *         nothing is enqueued in that case.
 */
export async function scheduleCampaign(payload: {
    organizationId: string;
    messageContent: string;
    listId?: string;
    templateName?: string;
    templateLanguage?: string;
    sendAt?: string; // ISO 8601 — if set, job is delayed until this time
}) {
    const { sendAt, ...data } = payload;
    const delayMs = resolveSendDelayMs(sendAt);
    await whatsappQueue.add('process-campaign', data, {
        attempts: 1,
        removeOnComplete: true,
        // Only set `delay` when there is something to wait for.
        ...(delayMs > 0 ? { delay: delayMs } : {}),
    });
}