CognxSafeTrack commited on
Commit
f000fc9
Β·
1 Parent(s): 9f0410f

feat: implement WhatsApp broadcast system with BullMQ worker and rate limiting

Browse files
apps/admin/src/pages/CrmConversationalDashboard.tsx CHANGED
@@ -148,12 +148,44 @@ export default function CrmConversationalDashboard() {
148
  }
149
  };
150
 
151
- const handleValidateAndSend = (_message: string) => {
152
- if (!uploadedFile) {
153
  alert("Veuillez d'abord importer une liste de contacts.");
154
  return;
155
  }
156
- alert(`Campagne validΓ©e ! Envoi du message vers la liste "${uploadedFile.listName}"... (Action Γ  implΓ©menter)`);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  };
158
 
159
  return (
 
148
  }
149
  };
150
 
151
+ const handleValidateAndSend = async (message: string) => {
152
+ if (!uploadedFile || !token || !selectedOrgId) {
153
  alert("Veuillez d'abord importer une liste de contacts.");
154
  return;
155
  }
156
+
157
+ try {
158
+ const res = await fetch(`${import.meta.env.VITE_API_URL}/v1/organizations/${selectedOrgId}/campaigns/send`, {
159
+ method: 'POST',
160
+ headers: {
161
+ 'Content-Type': 'application/json',
162
+ 'Authorization': `Bearer ${token}`
163
+ },
164
+ body: JSON.stringify({
165
+ listId: uploadedFile.listId,
166
+ message: message
167
+ })
168
+ });
169
+
170
+ if (res.ok) {
171
+ alert("πŸš€ Votre campagne est en cours de distribution !");
172
+ // Reset state for a new campaign
173
+ setUploadedFile(null);
174
+ setMessages([
175
+ {
176
+ id: 'initial-' + Date.now(),
177
+ role: 'assistant',
178
+ content: "Campagne lancée avec succès ! Souhaitez-vous préparer une autre liste ou une nouvelle campagne ?",
179
+ timestamp: new Date()
180
+ }
181
+ ]);
182
+ } else {
183
+ alert("Erreur lors de l'envoi de la campagne.");
184
+ }
185
+ } catch (err) {
186
+ console.error("Send failed:", err);
187
+ alert("Une erreur technique est survenue.");
188
+ }
189
  };
190
 
191
  return (
apps/api/src/routes/campaigns.ts CHANGED
@@ -24,4 +24,28 @@ export default async function campaignRoutes(fastify: FastifyInstance) {
24
  return reply.code(500).send({ error: 'AI Generation failed' });
25
  }
26
  });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  }
 
24
  return reply.code(500).send({ error: 'AI Generation failed' });
25
  }
26
  });
27
+
28
+ // Send Campaign to Broadcast List
29
+ fastify.post('/:id/campaigns/send', async (req, reply) => {
30
+ const { id: organizationId } = req.params as { id: string };
31
+ const { listId, message } = req.body as { listId: string, message: string };
32
+
33
+ if (!listId || !message) {
34
+ return reply.code(400).send({ error: 'listId and message are required' });
35
+ }
36
+
37
+ try {
38
+ const { scheduleBroadcast } = await import('../services/queue');
39
+ await scheduleBroadcast({ organizationId, listId, message });
40
+
41
+ return reply.code(202).send({
42
+ ok: true,
43
+ status: 'queued',
44
+ message: 'Campagne en cours d\'envoi en arrière-plan'
45
+ });
46
+ } catch (err) {
47
+ fastify.log.error(err);
48
+ return reply.code(500).send({ error: 'Failed to enqueue campaign' });
49
+ }
50
+ });
51
  }
apps/api/src/services/queue.ts CHANGED
@@ -127,3 +127,11 @@ export async function scheduleInboundMessage(payload: { phone: string, text: str
127
  });
128
  }
129
 
 
 
 
 
 
 
 
 
 
127
  });
128
  }
129
 
130
+ /** πŸ“’ BROADCAST: Enqueue a mass message task. */
131
+ export async function scheduleBroadcast(payload: { organizationId: string, listId: string, message: string }) {
132
+ await whatsappQueue.add('send-broadcast', payload, {
133
+ attempts: 1, // We handle retry logic within the loop if needed, but the whole job shouldn't necessarily retry
134
+ removeOnComplete: true
135
+ });
136
+ }
137
+
apps/whatsapp-worker/src/handlers/BroadcastHandler.ts ADDED
@@ -0,0 +1,97 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { Job } from 'bullmq';
2
+ import Redis from 'ioredis';
3
+ import { JobHandler, JobData } from './types';
4
+ import { prisma } from '../services/prisma';
5
+ import { logger } from '../logger';
6
+
7
+ export class BroadcastHandler implements JobHandler {
8
+ private async getTenantConfig(organizationId: string, connection: Redis) {
9
+ const cacheKey = `org:config:${organizationId}`;
10
+ const cached = await connection.get(cacheKey);
11
+ if (cached) return JSON.parse(cached);
12
+
13
+ const org = await prisma.organization.findUnique({
14
+ where: { id: organizationId },
15
+ include: { phoneNumbers: true }
16
+ });
17
+
18
+ if (!org || !org.systemUserToken || !org.phoneNumbers?.[0]?.id) return undefined;
19
+
20
+ const config = {
21
+ accessToken: org.systemUserToken,
22
+ phoneNumberId: org.phoneNumbers[0].id
23
+ };
24
+ await connection.set(cacheKey, JSON.stringify(config), 'EX', 3600);
25
+ return config;
26
+ }
27
+
28
+ async handle(job: Job<JobData>, connection: Redis): Promise<void> {
29
+ if (job.name !== 'send-broadcast') return;
30
+
31
+ const { organizationId, listId, message } = job.data as { organizationId: string, listId: string, message: string };
32
+
33
+ const tenantConfig = await this.getTenantConfig(organizationId, connection);
34
+ if (!tenantConfig) {
35
+ logger.error({ organizationId }, '[BroadcastHandler] No WhatsApp config found for organization');
36
+ return;
37
+ }
38
+
39
+ // 1. Fetch the list with its contacts
40
+ const list = await prisma.broadcastList.findUnique({
41
+ where: { id: listId },
42
+ include: { contacts: true }
43
+ });
44
+
45
+ if (!list) {
46
+ logger.error({ listId }, '[BroadcastHandler] Broadcast list not found');
47
+ return;
48
+ }
49
+
50
+ logger.info(`[BroadcastHandler] Starting broadcast for list "${list.name}" to ${list.contacts.length} contacts`);
51
+
52
+ const baseUrl = `https://graph.facebook.com/v19.0/${tenantConfig.phoneNumberId}/messages`;
53
+ const headers = {
54
+ 'Content-Type': 'application/json',
55
+ 'Authorization': `Bearer ${tenantConfig.accessToken}`
56
+ };
57
+
58
+ let successCount = 0;
59
+ let failCount = 0;
60
+
61
+ for (const contact of list.contacts) {
62
+ try {
63
+ const body = {
64
+ messaging_product: 'whatsapp',
65
+ recipient_type: 'individual',
66
+ to: contact.phoneNumber,
67
+ type: 'text',
68
+ text: { preview_url: false, body: message }
69
+ };
70
+
71
+ const response = await fetch(baseUrl, {
72
+ method: 'POST',
73
+ headers,
74
+ body: JSON.stringify(body)
75
+ });
76
+
77
+ if (response.ok) {
78
+ successCount++;
79
+ } else {
80
+ const errorData = await response.json();
81
+ logger.warn({ phoneNumber: contact.phoneNumber, errorData }, '[BroadcastHandler] Meta API returned error');
82
+ failCount++;
83
+ }
84
+
85
+ // 2. Rate limiting: sleep between 50ms and 100ms
86
+ const delay = Math.floor(Math.random() * (100 - 50 + 1) + 50);
87
+ await new Promise(resolve => setTimeout(resolve, delay));
88
+
89
+ } catch (err) {
90
+ logger.error({ phoneNumber: contact.phoneNumber, err }, '[BroadcastHandler] Failed to send to contact');
91
+ failCount++;
92
+ }
93
+ }
94
+
95
+ logger.info(`[BroadcastHandler] Broadcast finished for "${list.name}". Success: ${successCount}, Failed: ${failCount}`);
96
+ }
97
+ }
apps/whatsapp-worker/src/index.ts CHANGED
@@ -26,6 +26,7 @@ import { InboundHandler } from './handlers/InboundHandler';
26
  import { WebhookHandler } from './handlers/WebhookHandler';
27
  import { KBProcessor } from './handlers/KBProcessor';
28
  import { EmailHandler } from './handlers/EmailHandler';
 
29
  import { UsageService } from './services/usage';
30
 
31
  dotenv.config();
@@ -65,7 +66,8 @@ const handlers: Record<string, JobHandler> = {
65
  'handle-inbound': new InboundHandler(),
66
  'send-webhook': new WebhookHandler(),
67
  'process-kb': new KBProcessor(),
68
- 'send-email': new EmailHandler()
 
69
  };
70
 
71
  // ─── HTTP SERVER (Inbound Bridge) ─────────────────────────────────────────────
@@ -165,7 +167,7 @@ const worker = new Worker('whatsapp-queue', async (job: Job<JobData>) => {
165
  const outboundJobNames = [
166
  'send-message', 'send-message-direct', 'send-image',
167
  'send-interactive-buttons', 'send-interactive-list',
168
- 'send-content', 'send-nudge'
169
  ];
170
 
171
  if (outboundJobNames.includes(job.name)) {
 
26
  import { WebhookHandler } from './handlers/WebhookHandler';
27
  import { KBProcessor } from './handlers/KBProcessor';
28
  import { EmailHandler } from './handlers/EmailHandler';
29
+ import { BroadcastHandler } from './handlers/BroadcastHandler';
30
  import { UsageService } from './services/usage';
31
 
32
  dotenv.config();
 
66
  'handle-inbound': new InboundHandler(),
67
  'send-webhook': new WebhookHandler(),
68
  'process-kb': new KBProcessor(),
69
+ 'send-email': new EmailHandler(),
70
+ 'send-broadcast': new BroadcastHandler()
71
  };
72
 
73
  // ─── HTTP SERVER (Inbound Bridge) ─────────────────────────────────────────────
 
167
  const outboundJobNames = [
168
  'send-message', 'send-message-direct', 'send-image',
169
  'send-interactive-buttons', 'send-interactive-list',
170
+ 'send-content', 'send-nudge', 'send-broadcast'
171
  ];
172
 
173
  if (outboundJobNames.includes(job.name)) {