CognxSafeTrack Claude Sonnet 4.6 commited on
Commit
30a407a
·
1 Parent(s): 37fb9ce

perf(worker): Sprint 3 — broadcast/campaign scalability fixes

Browse files

H4: Replace per-contact prisma.message.create() inside the loop with
a batch prisma.message.createMany() after each page — reduces N DB
round-trips to 1 per page of 200 contacts.

H5: Replace include:{contacts:true} (loads all contacts into memory) with
cursor-based pagination (PAGE_SIZE=200) using broadcastLists relation
filter. Large lists no longer OOM the worker.

H6 (usage check): Move UsageService.checkAndIncrement() from per-contact
to per-page with amount=page.length — reduces Redis INCRBY calls from
N to N/200.

H11: Add job.updateProgress() after each page in CampaignHandler so BullMQ's
stall detector sees the job as alive during long campaigns; also provides
live progress in BullBoard (0–99%).

H10: Confirmed resolved — wallet:ok:{orgId} cache key is consistent across
API middleware and worker since credit-guard.ts deletion in Sprint 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

apps/whatsapp-worker/src/handlers/BroadcastHandler.ts CHANGED
@@ -8,6 +8,8 @@ import { getCachedOrganization } from '../services/organization';
8
  import { debitWallet } from '../services/wallet';
9
  import { CREDIT_PRICE } from '@repo/database';
10
 
 
 
11
  export class BroadcastHandler implements JobHandler {
12
  private async getTenantConfig(organizationId: string) {
13
  const org = await getCachedOrganization(organizationId);
@@ -18,81 +20,103 @@ export class BroadcastHandler implements JobHandler {
18
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
19
  if (job.name !== 'send-broadcast') return;
20
 
21
- const { organizationId, listId, message, templateName, templateLanguage } = job.data as {
22
- organizationId: string,
23
- listId: string,
24
  message: string,
25
  templateName?: string,
26
  templateLanguage?: string
27
  };
28
-
29
  const tenantConfig = await this.getTenantConfig(organizationId);
30
  if (!tenantConfig) {
31
  logger.error({ organizationId }, '[BroadcastHandler] No WhatsApp config found for organization');
32
  return;
33
  }
34
 
35
- // 1. Fetch the list with its contacts
36
  const list = await prisma.broadcastList.findUnique({
37
  where: { id: listId },
38
- include: { contacts: true }
39
  });
40
-
41
  if (!list) {
42
  logger.error({ listId }, '[BroadcastHandler] Broadcast list not found');
43
  return;
44
  }
45
 
46
- logger.info(`[BroadcastHandler] Starting broadcast for list "${list.name}" to ${list.contacts.length} contacts (Template: ${templateName || 'None'})`);
47
 
48
  const { sendTextMessage, sendTemplateMessage } = await import('../whatsapp-cloud');
49
 
50
  let successCount = 0;
51
  let failCount = 0;
 
52
 
53
- for (const contact of list.contacts) {
54
- try {
55
- // 1.5 Check Usage Limit
56
- const { allowed } = await UsageService.checkAndIncrement(organizationId, connection);
57
- if (!allowed) {
58
- logger.warn({ organizationId, listId }, '[BroadcastHandler] Daily limit reached during broadcast. Halting.');
59
- break;
60
- }
61
 
62
- if (templateName) {
63
- await sendTemplateMessage(contact.phoneNumber, templateName, templateLanguage || 'fr', tenantConfig);
64
- } else {
65
- await sendTextMessage(contact.phoneNumber, message, tenantConfig);
66
- }
 
 
 
 
67
 
68
- successCount++;
69
- // P0 Fix: Persist broadcast message for history
 
 
 
 
 
 
 
 
70
  try {
71
- await prisma.message.create({
72
- data: {
73
- organizationId,
74
- contactId: contact.id,
75
- content: templateName ? `[TEMPLATE: ${templateName}]` : message,
76
- direction: 'OUTBOUND',
77
- status: 'SENT',
78
- channel: 'WHATSAPP'
79
- }
 
 
 
 
 
80
  });
81
- } catch (dbErr) {
82
- logger.warn({ dbErr, contactId: contact.id }, '[BroadcastHandler] Message persisted failed but Meta send succeeded');
83
- }
84
 
85
- // 2. Rate limiting: sleep between 50ms and 100ms
86
- const delay = Math.floor(Math.random() * (100 - 50 + 1) + 50);
87
- await new Promise(resolve => setTimeout(resolve, delay));
 
 
 
 
 
88
 
89
- } catch (err) {
90
- logger.error({ phoneNumber: contact.phoneNumber, err }, '[BroadcastHandler] Failed to send to contact');
91
- failCount++;
 
 
92
  }
 
 
93
  }
94
 
95
- logger.info(`[BroadcastHandler] Broadcast finished for "${list.name}". Success: ${successCount}, Failed: ${failCount}`);
96
 
97
  if (successCount > 0) {
98
  try {
 
8
  import { debitWallet } from '../services/wallet';
9
  import { CREDIT_PRICE } from '@repo/database';
10
 
11
+ const PAGE_SIZE = 200;
12
+
13
  export class BroadcastHandler implements JobHandler {
14
  private async getTenantConfig(organizationId: string) {
15
  const org = await getCachedOrganization(organizationId);
 
20
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
21
  if (job.name !== 'send-broadcast') return;
22
 
23
+ const { organizationId, listId, message, templateName, templateLanguage } = job.data as {
24
+ organizationId: string,
25
+ listId: string,
26
  message: string,
27
  templateName?: string,
28
  templateLanguage?: string
29
  };
30
+
31
  const tenantConfig = await this.getTenantConfig(organizationId);
32
  if (!tenantConfig) {
33
  logger.error({ organizationId }, '[BroadcastHandler] No WhatsApp config found for organization');
34
  return;
35
  }
36
 
 
37
  const list = await prisma.broadcastList.findUnique({
38
  where: { id: listId },
39
+ select: { name: true },
40
  });
 
41
  if (!list) {
42
  logger.error({ listId }, '[BroadcastHandler] Broadcast list not found');
43
  return;
44
  }
45
 
46
+ logger.info({ organizationId, listId }, `[BroadcastHandler] Starting broadcast "${list.name}" (Template: ${templateName || 'None'})`);
47
 
48
  const { sendTextMessage, sendTemplateMessage } = await import('../whatsapp-cloud');
49
 
50
  let successCount = 0;
51
  let failCount = 0;
52
+ let cursor: string | undefined;
53
 
54
+ while (true) {
55
+ const contacts = await prisma.contact.findMany({
56
+ where: { broadcastLists: { some: { id: listId } } },
57
+ select: { id: true, phoneNumber: true },
58
+ take: PAGE_SIZE,
59
+ orderBy: { id: 'asc' },
60
+ ...(cursor ? { cursor: { id: cursor }, skip: 1 } : {}),
61
+ });
62
 
63
+ if (contacts.length === 0) break;
64
+ cursor = contacts[contacts.length - 1].id;
65
+
66
+ // Reserve quota for the whole page in one Redis call
67
+ const { allowed } = await UsageService.checkAndIncrement(organizationId, connection, contacts.length);
68
+ if (!allowed) {
69
+ logger.warn({ organizationId, listId }, '[BroadcastHandler] Daily limit reached. Halting.');
70
+ break;
71
+ }
72
 
73
+ const messageRecords: Array<{
74
+ organizationId: string;
75
+ contactId: string;
76
+ content: string;
77
+ direction: 'OUTBOUND';
78
+ status: 'SENT';
79
+ channel: string;
80
+ }> = [];
81
+
82
+ for (const contact of contacts) {
83
  try {
84
+ if (templateName) {
85
+ await sendTemplateMessage(contact.phoneNumber, templateName, templateLanguage || 'fr', tenantConfig);
86
+ } else {
87
+ await sendTextMessage(contact.phoneNumber, message, tenantConfig);
88
+ }
89
+
90
+ successCount++;
91
+ messageRecords.push({
92
+ organizationId,
93
+ contactId: contact.id,
94
+ content: templateName ? `[TEMPLATE: ${templateName}]` : message,
95
+ direction: 'OUTBOUND',
96
+ status: 'SENT',
97
+ channel: 'WHATSAPP',
98
  });
 
 
 
99
 
100
+ // Rate limiting: 50–100ms between sends
101
+ const delay = Math.floor(Math.random() * 51) + 50;
102
+ await new Promise(resolve => setTimeout(resolve, delay));
103
+ } catch (err) {
104
+ logger.error({ phoneNumber: contact.phoneNumber, err }, '[BroadcastHandler] Failed to send to contact');
105
+ failCount++;
106
+ }
107
+ }
108
 
109
+ // Batch persist for the page — one DB round-trip instead of N
110
+ if (messageRecords.length > 0) {
111
+ await prisma.message.createMany({ data: messageRecords }).catch(err =>
112
+ logger.warn({ err, organizationId }, '[BroadcastHandler] createMany failed')
113
+ );
114
  }
115
+
116
+ if (contacts.length < PAGE_SIZE) break;
117
  }
118
 
119
+ logger.info(`[BroadcastHandler] Broadcast "${list.name}" finished. Success: ${successCount}, Failed: ${failCount}`);
120
 
121
  if (successCount > 0) {
122
  try {
apps/whatsapp-worker/src/handlers/CampaignHandler.ts CHANGED
@@ -8,6 +8,8 @@ import { getCachedOrganization } from '../services/organization';
8
  import { debitWallet } from '../services/wallet';
9
  import { CREDIT_PRICE } from '@repo/database';
10
 
 
 
11
  export class CampaignHandler implements JobHandler {
12
  private async getTenantConfig(organizationId: string) {
13
  const org = await getCachedOrganization(organizationId);
@@ -18,15 +20,15 @@ export class CampaignHandler implements JobHandler {
18
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
19
  if (job.name !== 'process-campaign') return;
20
 
21
- const { organizationId, messageContent, listId, templateName, templateLanguage } = job.data as {
22
- organizationId: string,
23
- messageContent: string,
24
  listId?: string,
25
  templateName?: string,
26
  templateLanguage?: string
27
  };
28
-
29
- logger.info({ organizationId, listId }, `[CampaignHandler] Starting campaign processing (Template: ${templateName || 'None'})...`);
30
 
31
  const tenantConfig = await this.getTenantConfig(organizationId);
32
  if (!tenantConfig) {
@@ -34,72 +36,95 @@ export class CampaignHandler implements JobHandler {
34
  return;
35
  }
36
 
37
- // 1. Resolve contacts
38
- let contacts: any[] = [];
39
- if (listId) {
40
- const list = await prisma.broadcastList.findUnique({
41
- where: { id: listId },
42
- include: { contacts: true }
43
- });
44
- contacts = list?.contacts || [];
45
- } else {
46
- // Fallback: send to all contacts of the organization
47
- contacts = await prisma.contact.findMany({
48
- where: { organizationId }
49
- });
50
- }
51
-
52
- if (contacts.length === 0) {
53
- logger.warn({ organizationId, listId }, '[CampaignHandler] No contacts found for campaign');
54
- return;
55
- }
56
-
57
- logger.info(`[CampaignHandler] Broadcasting to ${contacts.length} contacts`);
58
-
59
  const { sendTextMessage, sendTemplateMessage } = await import('../whatsapp-cloud');
60
 
61
  let successCount = 0;
62
  let failCount = 0;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
 
64
- for (const contact of contacts) {
65
- try {
66
- // Usage Limit Check
67
- const { allowed } = await UsageService.checkAndIncrement(organizationId, connection);
68
- if (!allowed) {
69
- logger.warn({ organizationId }, '[CampaignHandler] Daily limit reached. Halting campaign.');
70
- break;
71
- }
72
-
73
- if (templateName) {
74
- await sendTemplateMessage(contact.phoneNumber, templateName, templateLanguage || 'fr', tenantConfig);
75
- } else {
76
- await sendTextMessage(contact.phoneNumber, messageContent, tenantConfig);
77
- }
78
 
79
- successCount++;
80
- // Persist for history
81
  try {
82
- await prisma.message.create({
83
- data: {
84
- organizationId,
85
- contactId: contact.id,
86
- content: templateName ? `[TEMPLATE: ${templateName}]` : messageContent,
87
- direction: 'OUTBOUND',
88
- status: 'SENT',
89
- channel: 'WHATSAPP'
90
- }
 
 
 
 
 
91
  });
92
- } catch (dbErr) {
93
- logger.warn({ dbErr, contactId: contact.id }, '[CampaignHandler] Message persistence failed');
94
- }
95
 
96
- // 2. REQUIRED DELAY: 1000ms as per instructions
97
- await new Promise(resolve => setTimeout(resolve, 1000));
 
 
 
 
 
98
 
99
- } catch (err) {
100
- logger.error({ phoneNumber: contact.phoneNumber, err }, '[CampaignHandler] Failed to send');
101
- failCount++;
 
 
102
  }
 
 
 
 
 
 
 
 
103
  }
104
 
105
  logger.info(`[CampaignHandler] Finished. Success: ${successCount}, Failed: ${failCount}`);
 
8
  import { debitWallet } from '../services/wallet';
9
  import { CREDIT_PRICE } from '@repo/database';
10
 
11
+ const PAGE_SIZE = 200;
12
+
13
  export class CampaignHandler implements JobHandler {
14
  private async getTenantConfig(organizationId: string) {
15
  const org = await getCachedOrganization(organizationId);
 
20
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
21
  if (job.name !== 'process-campaign') return;
22
 
23
+ const { organizationId, messageContent, listId, templateName, templateLanguage } = job.data as {
24
+ organizationId: string,
25
+ messageContent: string,
26
  listId?: string,
27
  templateName?: string,
28
  templateLanguage?: string
29
  };
30
+
31
+ logger.info({ organizationId, listId }, `[CampaignHandler] Starting campaign (Template: ${templateName || 'None'})`);
32
 
33
  const tenantConfig = await this.getTenantConfig(organizationId);
34
  if (!tenantConfig) {
 
36
  return;
37
  }
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  const { sendTextMessage, sendTemplateMessage } = await import('../whatsapp-cloud');
40
 
41
  let successCount = 0;
42
  let failCount = 0;
43
+ let cursor: string | undefined;
44
+ let pageIndex = 0;
45
+ const totalEstimate = await (listId
46
+ ? prisma.contact.count({ where: { broadcastLists: { some: { id: listId } } } })
47
+ : prisma.contact.count({ where: { organizationId } })
48
+ );
49
+
50
+ while (true) {
51
+ const contacts = await (listId
52
+ ? prisma.contact.findMany({
53
+ where: { broadcastLists: { some: { id: listId } } },
54
+ select: { id: true, phoneNumber: true },
55
+ take: PAGE_SIZE,
56
+ orderBy: { id: 'asc' },
57
+ ...(cursor ? { cursor: { id: cursor }, skip: 1 } : {}),
58
+ })
59
+ : prisma.contact.findMany({
60
+ where: { organizationId },
61
+ select: { id: true, phoneNumber: true },
62
+ take: PAGE_SIZE,
63
+ orderBy: { id: 'asc' },
64
+ ...(cursor ? { cursor: { id: cursor }, skip: 1 } : {}),
65
+ })
66
+ );
67
+
68
+ if (contacts.length === 0) break;
69
+ cursor = contacts[contacts.length - 1].id;
70
+ pageIndex++;
71
+
72
+ // Reserve quota for the whole page in one Redis call
73
+ const { allowed } = await UsageService.checkAndIncrement(organizationId, connection, contacts.length);
74
+ if (!allowed) {
75
+ logger.warn({ organizationId }, '[CampaignHandler] Daily limit reached. Halting campaign.');
76
+ break;
77
+ }
78
 
79
+ const messageRecords: Array<{
80
+ organizationId: string;
81
+ contactId: string;
82
+ content: string;
83
+ direction: 'OUTBOUND';
84
+ status: 'SENT';
85
+ channel: string;
86
+ }> = [];
 
 
 
 
 
 
87
 
88
+ for (const contact of contacts) {
 
89
  try {
90
+ if (templateName) {
91
+ await sendTemplateMessage(contact.phoneNumber, templateName, templateLanguage || 'fr', tenantConfig);
92
+ } else {
93
+ await sendTextMessage(contact.phoneNumber, messageContent, tenantConfig);
94
+ }
95
+
96
+ successCount++;
97
+ messageRecords.push({
98
+ organizationId,
99
+ contactId: contact.id,
100
+ content: templateName ? `[TEMPLATE: ${templateName}]` : messageContent,
101
+ direction: 'OUTBOUND',
102
+ status: 'SENT',
103
+ channel: 'WHATSAPP',
104
  });
 
 
 
105
 
106
+ // Required delay between sends to respect Meta rate limits
107
+ await new Promise(resolve => setTimeout(resolve, 1000));
108
+ } catch (err) {
109
+ logger.error({ phoneNumber: contact.phoneNumber, err }, '[CampaignHandler] Failed to send');
110
+ failCount++;
111
+ }
112
+ }
113
 
114
+ // Batch persist for the page — one DB round-trip instead of N
115
+ if (messageRecords.length > 0) {
116
+ await prisma.message.createMany({ data: messageRecords }).catch(err =>
117
+ logger.warn({ err, organizationId }, '[CampaignHandler] createMany failed')
118
+ );
119
  }
120
+
121
+ // Signal liveness to BullMQ stall detector and update progress in BullBoard
122
+ const progress = totalEstimate > 0
123
+ ? Math.min(Math.floor(((pageIndex * PAGE_SIZE) / totalEstimate) * 100), 99)
124
+ : 0;
125
+ await job.updateProgress(progress);
126
+
127
+ if (contacts.length < PAGE_SIZE) break;
128
  }
129
 
130
  logger.info(`[CampaignHandler] Finished. Success: ${successCount}, Failed: ${failCount}`);