CognxSafeTrack commited on
Commit
d983a7d
·
1 Parent(s): e00083b

refactor: resolve critical tech debt in CRM broadcasts and usage tracking

Browse files
apps/whatsapp-worker/src/handlers/BroadcastHandler.ts CHANGED
@@ -3,6 +3,7 @@ import Redis from 'ioredis';
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
 
6
 
7
  export class BroadcastHandler implements JobHandler {
8
  private async getTenantConfig(organizationId: string, connection: Redis) {
@@ -60,6 +61,13 @@ export class BroadcastHandler implements JobHandler {
60
 
61
  for (const contact of list.contacts) {
62
  try {
 
 
 
 
 
 
 
63
  const body = {
64
  messaging_product: 'whatsapp',
65
  recipient_type: 'individual',
@@ -76,6 +84,21 @@ export class BroadcastHandler implements JobHandler {
76
 
77
  if (response.ok) {
78
  successCount++;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  } else {
80
  const errorData = await response.json();
81
  logger.warn({ phoneNumber: contact.phoneNumber, errorData }, '[BroadcastHandler] Meta API returned error');
 
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
6
+ import { UsageService } from '../services/usage';
7
 
8
  export class BroadcastHandler implements JobHandler {
9
  private async getTenantConfig(organizationId: string, connection: Redis) {
 
61
 
62
  for (const contact of list.contacts) {
63
  try {
64
+ // 1.5 Check Usage Limit
65
+ const { allowed } = await UsageService.checkAndIncrement(organizationId, connection);
66
+ if (!allowed) {
67
+ logger.warn({ organizationId, listId }, '[BroadcastHandler] Daily limit reached during broadcast. Halting.');
68
+ break;
69
+ }
70
+
71
  const body = {
72
  messaging_product: 'whatsapp',
73
  recipient_type: 'individual',
 
84
 
85
  if (response.ok) {
86
  successCount++;
87
+ // P0 Fix: Persist broadcast message for history
88
+ try {
89
+ await (prisma as any).message.create({
90
+ data: {
91
+ organizationId,
92
+ contactId: contact.id,
93
+ content: message,
94
+ direction: 'OUTBOUND',
95
+ status: 'SENT',
96
+ channel: 'WHATSAPP'
97
+ }
98
+ });
99
+ } catch (dbErr) {
100
+ logger.warn({ dbErr, contactId: contact.id }, '[BroadcastHandler] Message persisted failed but Meta send succeeded');
101
+ }
102
  } else {
103
  const errorData = await response.json();
104
  logger.warn({ phoneNumber: contact.phoneNumber, errorData }, '[BroadcastHandler] Meta API returned error');
apps/whatsapp-worker/src/index.ts CHANGED
@@ -169,7 +169,7 @@ const worker = new Worker('whatsapp-queue', async (job: Job<JobData>) => {
169
  const outboundJobNames = [
170
  'send-message', 'send-message-direct', 'send-image',
171
  'send-interactive-buttons', 'send-interactive-list',
172
- 'send-content', 'send-nudge', 'send-broadcast', 'send-direct-message'
173
  ];
174
 
175
  if (outboundJobNames.includes(job.name)) {
 
169
  const outboundJobNames = [
170
  'send-message', 'send-message-direct', 'send-image',
171
  'send-interactive-buttons', 'send-interactive-list',
172
+ 'send-content', 'send-nudge', 'send-direct-message'
173
  ];
174
 
175
  if (outboundJobNames.includes(job.name)) {
apps/whatsapp-worker/src/services/usage.ts CHANGED
@@ -8,7 +8,7 @@ export class UsageService {
8
  * Checks if an organization has exceeded its daily message limit.
9
  * Increments the count if within limits.
10
  */
11
- static async checkAndIncrement(organizationId: string, redis: Redis): Promise<{ allowed: boolean; current: number; limit: number }> {
12
  const org = await getCachedOrganization(organizationId);
13
  if (!org) return { allowed: false, current: 0, limit: 0 };
14
 
@@ -16,10 +16,10 @@ export class UsageService {
16
  const today = new Date().toISOString().split('T')[0];
17
  const key = `usage:${organizationId}:${today}`;
18
 
19
- // Atomic increment
20
- const current = await redis.incr(key);
21
 
22
- if (current === 1) {
23
  // Set expiry for 24h on first use of the day
24
  await redis.expire(key, 86400);
25
  }
 
8
  * Checks if an organization has exceeded its daily message limit.
9
  * Increments the count if within limits.
10
  */
11
+ static async checkAndIncrement(organizationId: string, redis: Redis, amount: number = 1): Promise<{ allowed: boolean; current: number; limit: number }> {
12
  const org = await getCachedOrganization(organizationId);
13
  if (!org) return { allowed: false, current: 0, limit: 0 };
14
 
 
16
  const today = new Date().toISOString().split('T')[0];
17
  const key = `usage:${organizationId}:${today}`;
18
 
19
+ // Atomic increment by amount
20
+ const current = await redis.incrby(key, amount);
21
 
22
+ if (current <= amount) {
23
  // Set expiry for 24h on first use of the day
24
  await redis.expire(key, 86400);
25
  }