CognxSafeTrack Claude Sonnet 4.6 commited on
Commit
2b96936
·
1 Parent(s): 44f1818

fix(webhook+worker): restore gateway forwarding and fix token decryption

Browse files

1. Gateway architecture restored (whatsapp.ts)
- When RAILWAY_INTERNAL_URL is set (HF Space), forward raw payload to
Railway worker bridge instead of adding directly to Redis.
- Original architecture: HF → Railway bridge → Redis → Worker.
- Root cause of INSCRIPTION silence: direct Redis queue from HF Space
was failing because Redis connection doesn't work from HF containers.

2. MessageHandler: decrypt systemUserToken before use
- Was reading org.systemUserToken raw (AES-encrypted) from prisma,
sending encrypted string to Meta → auth failure.
- Now uses getCachedOrganization() which calls decryptOrgSecrets().

3. AdminHandler: eliminate double DB query + empty enrollmentId
- Was calling prisma.enrollment.findFirst() twice for same record.
- Validates enrollment exists before proceeding; adds mediaUrl to Response.

4. CommandHandler: route MENU_HISTORIQUE through queue
- Direct sendInteractiveListMessage() call bypassed tenant config.
- Now uses whatsappQueue.add('send-interactive-list', ...) with organizationId.

5. ExerciseHandler: add missing organizationId + queue retention config
- Added organizationId to send-message queue calls.
- Added removeOnComplete/removeOnFail to generate-feedback job.

6. organization.ts (worker): include phoneNumbers in getCachedOrganization
- MessageHandler needs phoneNumbers[0].id for phoneNumberId lookup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

apps/api/src/routes/whatsapp.ts CHANGED
@@ -118,7 +118,32 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
118
 
119
  const org = await prisma.organization.findUnique({ where: { id: organizationId } });
120
 
121
- // 🏢 WEBHOOK MODE — forward raw payload
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  if (org?.mode === 'WEBHOOK' && org.webhookUrl) {
123
  logger.info({ orgId: organizationId }, '[WHATSAPP-WEBHOOK] WEBHOOK mode — forwarding');
124
  await whatsappQueue.add('send-webhook', {
@@ -129,12 +154,11 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
129
  return reply.code(200).send('EVENT_RECEIVED');
130
  }
131
 
132
- // 🤖 ASYNC PROCESSING — respond 200 first, enqueue after (Async-First rule)
133
  reply.code(200).send('EVENT_RECEIVED');
134
 
135
  setImmediate(async () => {
136
  try {
137
- // extractWhatsAppPayload handles text, interactive (button_reply, list_reply), audio, image
138
  const extracted = extractWhatsAppPayload(request.body);
139
  for (const msg of extracted) {
140
  if (msg.text !== undefined) {
 
118
 
119
  const org = await prisma.organization.findUnique({ where: { id: organizationId } });
120
 
121
+ // 🚀 GATEWAY MODE — forward raw payload to Railway worker bridge
122
+ // When RAILWAY_INTERNAL_URL is set (HF Space deployment), we never touch Redis
123
+ // directly — the worker bridge on Railway owns all queue operations.
124
+ const railwayUrl = process.env.RAILWAY_INTERNAL_URL;
125
+ if (railwayUrl) {
126
+ reply.code(200).send('EVENT_RECEIVED');
127
+ setImmediate(async () => {
128
+ try {
129
+ const res = await fetch(`${railwayUrl}/v1/internal/whatsapp/inbound`, {
130
+ method: 'POST',
131
+ headers: {
132
+ 'Content-Type': 'application/json',
133
+ 'Authorization': `Bearer ${process.env.ADMIN_API_KEY}`,
134
+ 'x-organization-id': organizationId
135
+ },
136
+ body: JSON.stringify(request.body)
137
+ });
138
+ logger.info({ status: res.status, organizationId, phoneNumberId }, '[WHATSAPP-GATEWAY] Forwarded to Railway worker');
139
+ } catch (err) {
140
+ logger.error({ err, organizationId }, '[WHATSAPP-GATEWAY] Forward to Railway failed');
141
+ }
142
+ });
143
+ return;
144
+ }
145
+
146
+ // 🏢 WEBHOOK MODE — forward raw payload (standalone/Railway mode)
147
  if (org?.mode === 'WEBHOOK' && org.webhookUrl) {
148
  logger.info({ orgId: organizationId }, '[WHATSAPP-WEBHOOK] WEBHOOK mode — forwarding');
149
  await whatsappQueue.add('send-webhook', {
 
154
  return reply.code(200).send('EVENT_RECEIVED');
155
  }
156
 
157
+ // 🤖 STANDALONE MODE — respond 200 first, enqueue after (Railway-only, no gateway)
158
  reply.code(200).send('EVENT_RECEIVED');
159
 
160
  setImmediate(async () => {
161
  try {
 
162
  const extracted = extractWhatsAppPayload(request.body);
163
  for (const msg of extracted) {
164
  if (msg.text !== undefined) {
apps/whatsapp-worker/src/handlers/AdminHandler.ts CHANGED
@@ -3,76 +3,57 @@ import Redis from 'ioredis';
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
 
6
  import { sendTextMessage } from '../whatsapp-cloud';
7
 
8
- interface TenantConfig {
9
- accessToken: string;
10
- phoneNumberId: string;
11
- }
12
-
13
  export class AdminHandler implements JobHandler {
14
- private async getTenantConfig(organizationId: string, connection: Redis): Promise<TenantConfig | undefined> {
15
- const cacheKey = `org:config:${organizationId}`;
16
- try {
17
- const cached = await connection.get(cacheKey);
18
- if (cached) return JSON.parse(cached);
19
- } catch (err) {
20
- logger.warn({ err, organizationId }, '[AdminHandler] Redis cache lookup failed');
21
- }
22
-
23
- const org = await prisma.organization.findUnique({
24
- where: { id: organizationId },
25
- include: { phoneNumbers: true }
26
- });
27
-
28
- if (!org || !org.systemUserToken || !org.phoneNumbers?.[0]?.id) return undefined;
29
-
30
- const config = {
31
- accessToken: org.systemUserToken,
32
- phoneNumberId: org.phoneNumbers[0].id
33
- };
34
- await connection.set(cacheKey, JSON.stringify(config), 'EX', 3600);
35
- return config;
36
- }
37
-
38
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
39
  const { userId, trackId, overrideAudioUrl, adminId, organizationId } = job.data;
40
- if (!userId || !overrideAudioUrl) return;
41
-
42
- const user = await prisma.user.findUnique({ where: { id: userId } });
43
- const tenantConfig = await this.getTenantConfig(organizationId || '', connection);
44
 
45
- if (user?.phone) {
46
- const { sendAudioMessage } = await import('../whatsapp-cloud');
47
- await sendAudioMessage(user.phone, overrideAudioUrl, tenantConfig);
 
48
 
49
- await sendTextMessage(user.phone,
50
- user.language === 'WOLOF'
51
- ? "Baax na ! Yónnee *SUITE* ngir dem ci kanam."
52
- : "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante.",
53
- tenantConfig
54
- );
55
 
56
- await prisma.response.create({
57
- data: {
58
- userId: userId,
59
- enrollmentId: (await prisma.enrollment.findFirst({ where: { userId, trackId, status: 'ACTIVE' } }))?.id || '',
60
- dayNumber: (await prisma.enrollment.findFirst({ where: { userId, trackId, status: 'ACTIVE' } }))?.currentDay || 0,
61
- content: `[AUDIO_OVERRIDE] ${overrideAudioUrl}`,
62
- aiSource: `ADMIN_OVERRIDE:${adminId || 'unknown'}`,
63
- organizationId: organizationId || user.organizationId
64
- }
65
- });
66
 
67
- const enrollment = await prisma.enrollment.findFirst({
68
- where: { userId, trackId, status: 'ACTIVE' }
69
- });
 
70
 
71
- if (enrollment) {
72
- const nextDay = Math.floor(enrollment.currentDay) + 1;
73
- const q = new Queue('whatsapp-queue', { connection });
74
- await q.add('send-content', { userId, trackId, dayNumber: nextDay, organizationId }, { delay: 2000 });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  }
76
- }
 
 
 
 
77
  }
78
  }
 
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
6
+ import { getCachedOrganization } from '../services/organization';
7
  import { sendTextMessage } from '../whatsapp-cloud';
8
 
 
 
 
 
 
9
  export class AdminHandler implements JobHandler {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
11
  const { userId, trackId, overrideAudioUrl, adminId, organizationId } = job.data;
12
+ if (!userId || !overrideAudioUrl || !organizationId) return;
 
 
 
13
 
14
+ const [user, enrollment] = await Promise.all([
15
+ prisma.user.findUnique({ where: { id: userId } }),
16
+ prisma.enrollment.findFirst({ where: { userId, trackId, status: 'ACTIVE' } })
17
+ ]);
18
 
19
+ const org = await getCachedOrganization(organizationId);
20
+ const tenantConfig = (org?.systemUserToken && org.phoneNumbers?.[0]?.id)
21
+ ? { accessToken: org.systemUserToken, phoneNumberId: org.phoneNumbers[0].id }
22
+ : undefined;
 
 
23
 
24
+ if (!user?.phone) {
25
+ logger.warn({ userId }, '[ADMIN] User has no phone — skipping');
26
+ return;
27
+ }
 
 
 
 
 
 
28
 
29
+ if (!enrollment) {
30
+ logger.warn({ userId, trackId }, '[ADMIN] No active enrollment — skipping');
31
+ return;
32
+ }
33
 
34
+ const { sendAudioMessage } = await import('../whatsapp-cloud');
35
+ await sendAudioMessage(user.phone, overrideAudioUrl, tenantConfig);
36
+ await sendTextMessage(user.phone,
37
+ user.language === 'WOLOF'
38
+ ? "Baax na ! Yónnee *SUITE* ngir dem ci kanam."
39
+ : "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante.",
40
+ tenantConfig
41
+ );
42
+
43
+ await prisma.response.create({
44
+ data: {
45
+ userId,
46
+ enrollmentId: enrollment.id,
47
+ dayNumber: Math.floor(enrollment.currentDay),
48
+ content: `[AUDIO_OVERRIDE] ${overrideAudioUrl}`,
49
+ mediaUrl: overrideAudioUrl,
50
+ aiSource: `ADMIN_OVERRIDE:${adminId || 'unknown'}`,
51
+ organizationId
52
  }
53
+ });
54
+
55
+ const nextDay = Math.floor(enrollment.currentDay) + 1;
56
+ const q = new Queue('whatsapp-queue', { connection });
57
+ await q.add('send-content', { userId, trackId, dayNumber: nextDay, organizationId }, { delay: 2000 });
58
  }
59
  }
apps/whatsapp-worker/src/handlers/CommandHandler.ts CHANGED
@@ -70,17 +70,14 @@ export class CommandHandler implements MessageHandler {
70
 
71
  const limitedRows = rows.slice(-10);
72
 
73
- const { sendInteractiveListMessage } = await import('../whatsapp-cloud');
74
- await sendInteractiveListMessage(
75
- user.phone || '',
76
- t('history_menu_title'),
77
- t('history_menu_body'),
78
- t('history_menu_button'),
79
- [{
80
- title: t('history_section_title'),
81
- rows: limitedRows
82
- }]
83
- );
84
  return true;
85
  }
86
 
 
70
 
71
  const limitedRows = rows.slice(-10);
72
 
73
+ await whatsappQueue.add('send-interactive-list', {
74
+ userId: user.id,
75
+ headerText: t('history_menu_title'),
76
+ bodyText: t('history_menu_body'),
77
+ buttonLabel: t('history_menu_button'),
78
+ sections: [{ title: t('history_section_title'), rows: limitedRows }],
79
+ organizationId: ctx.organizationId
80
+ });
 
 
 
81
  return true;
82
  }
83
 
apps/whatsapp-worker/src/handlers/ExerciseHandler.ts CHANGED
@@ -67,14 +67,14 @@ export class ExerciseHandler implements MessageHandler {
67
  const reminder = user.language === 'WOLOF'
68
  ? "Mat nga bés bi ba pare ! ✨\nBindal *2* wala *SUITE* ngir dem ci bés bi ci kanam."
69
  : "Tu as déjà validé cette étape ! ✨\nEnvoie *2* ou *SUITE* pour passer à la suite.";
70
- await whatsappQueue.add('send-message', { userId: user.id, text: reminder });
71
  return true;
72
  }
73
  return false;
74
  }
75
 
76
- const trackDay = await prisma.trackDay.findFirst({
77
- where: { trackId: activeEnrollment.trackId, dayNumber: effectiveDay }
78
  });
79
 
80
  if (!trackDay) {
@@ -84,7 +84,7 @@ export class ExerciseHandler implements MessageHandler {
84
 
85
  const isDeepDiveAction = pendingProgress.exerciseStatus === 'PENDING_DEEPDIVE';
86
  const wordCount = (text || '').trim().split(/\s+/).length;
87
-
88
  // Bypasses (Button, Special, Vision)
89
  let isButtonChoice = false;
90
  const buttons = trackDay.buttonsJson as { id?: string; title?: string }[] | null;
@@ -102,11 +102,11 @@ export class ExerciseHandler implements MessageHandler {
102
 
103
  if (wordCount < minWordCount) {
104
  const msg = user.language === 'WOLOF' ? "Tontu bi gatt na..." : "Ta réponse est un peu courte.";
105
- await whatsappQueue.add('send-message', { userId: user.id, text: msg });
106
  return true;
107
  }
108
 
109
- await whatsappQueue.add('send-message', { userId: user.id, text: user.language === 'WOLOF' ? "⏳ Defar ak sa tontu..." : "⏳ Analyse de votre réponse..." });
110
 
111
  let currentIterationCount = pendingProgress.iterationCount || 0;
112
  if (isDeepDiveAction) {
@@ -134,7 +134,7 @@ export class ExerciseHandler implements MessageHandler {
134
  isTimeTravelMode,
135
  realCurrentDay: activeEnrollment.currentDay,
136
  organizationId: ctx.organizationId
137
- }, { attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
138
 
139
  return true;
140
  }
 
67
  const reminder = user.language === 'WOLOF'
68
  ? "Mat nga bés bi ba pare ! ✨\nBindal *2* wala *SUITE* ngir dem ci bés bi ci kanam."
69
  : "Tu as déjà validé cette étape ! ✨\nEnvoie *2* ou *SUITE* pour passer à la suite.";
70
+ await whatsappQueue.add('send-message', { userId: user.id, text: reminder, organizationId: ctx.organizationId });
71
  return true;
72
  }
73
  return false;
74
  }
75
 
76
+ const trackDay = await prisma.trackDay.findFirst({
77
+ where: { trackId: activeEnrollment.trackId, dayNumber: effectiveDay }
78
  });
79
 
80
  if (!trackDay) {
 
84
 
85
  const isDeepDiveAction = pendingProgress.exerciseStatus === 'PENDING_DEEPDIVE';
86
  const wordCount = (text || '').trim().split(/\s+/).length;
87
+
88
  // Bypasses (Button, Special, Vision)
89
  let isButtonChoice = false;
90
  const buttons = trackDay.buttonsJson as { id?: string; title?: string }[] | null;
 
102
 
103
  if (wordCount < minWordCount) {
104
  const msg = user.language === 'WOLOF' ? "Tontu bi gatt na..." : "Ta réponse est un peu courte.";
105
+ await whatsappQueue.add('send-message', { userId: user.id, text: msg, organizationId: ctx.organizationId });
106
  return true;
107
  }
108
 
109
+ await whatsappQueue.add('send-message', { userId: user.id, text: user.language === 'WOLOF' ? "⏳ Defar ak sa tontu..." : "⏳ Analyse de votre réponse...", organizationId: ctx.organizationId });
110
 
111
  let currentIterationCount = pendingProgress.iterationCount || 0;
112
  if (isDeepDiveAction) {
 
134
  isTimeTravelMode,
135
  realCurrentDay: activeEnrollment.currentDay,
136
  organizationId: ctx.organizationId
137
+ }, { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, removeOnFail: false });
138
 
139
  return true;
140
  }
apps/whatsapp-worker/src/handlers/MessageHandler.ts CHANGED
@@ -3,6 +3,7 @@ import Redis from 'ioredis';
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
 
6
  import { sendTextMessage, sendImageMessage, sendInteractiveButtonMessage, sendInteractiveListMessage } from '../whatsapp-cloud';
7
 
8
  interface TenantConfig {
@@ -11,28 +12,15 @@ interface TenantConfig {
11
  }
12
 
13
  export class MessageHandler implements JobHandler {
14
- private async getTenantConfig(organizationId: string, connection: Redis): Promise<TenantConfig | undefined> {
15
- const cacheKey = `org:config:${organizationId}`;
16
- try {
17
- const cached = await connection.get(cacheKey);
18
- if (cached) return JSON.parse(cached);
19
- } catch (err) {
20
- logger.warn({ err, organizationId }, '[MessageHandler] Redis cache lookup failed');
21
- }
22
-
23
- const org = await prisma.organization.findUnique({
24
- where: { id: organizationId },
25
- include: { phoneNumbers: true }
26
- });
27
-
28
  if (!org || !org.systemUserToken || !org.phoneNumbers?.[0]?.id) return undefined;
29
 
30
- const config = {
31
  accessToken: org.systemUserToken,
32
  phoneNumberId: org.phoneNumbers[0].id
33
  };
34
- await connection.set(cacheKey, JSON.stringify(config), 'EX', 3600);
35
- return config;
36
  }
37
 
38
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
 
3
  import { JobHandler, JobData } from './types';
4
  import { prisma } from '../services/prisma';
5
  import { logger } from '../logger';
6
+ import { getCachedOrganization } from '../services/organization';
7
  import { sendTextMessage, sendImageMessage, sendInteractiveButtonMessage, sendInteractiveListMessage } from '../whatsapp-cloud';
8
 
9
  interface TenantConfig {
 
12
  }
13
 
14
  export class MessageHandler implements JobHandler {
15
+ private async getTenantConfig(organizationId: string, _connection: Redis): Promise<TenantConfig | undefined> {
16
+ // getCachedOrganization decrypts systemUserToken before returning
17
+ const org = await getCachedOrganization(organizationId);
 
 
 
 
 
 
 
 
 
 
 
18
  if (!org || !org.systemUserToken || !org.phoneNumbers?.[0]?.id) return undefined;
19
 
20
+ return {
21
  accessToken: org.systemUserToken,
22
  phoneNumberId: org.phoneNumbers[0].id
23
  };
 
 
24
  }
25
 
26
  async handle(job: Job<JobData>, connection: Redis): Promise<void> {
apps/whatsapp-worker/src/services/organization.ts CHANGED
@@ -62,7 +62,8 @@ export async function getCachedOrganization(id: string) {
62
 
63
  // 3. DB Lookup
64
  const org = await prisma.organization.findUnique({
65
- where: { id }
 
66
  });
67
 
68
  if (org) {
 
62
 
63
  // 3. DB Lookup
64
  const org = await prisma.organization.findUnique({
65
+ where: { id },
66
+ include: { phoneNumbers: true }
67
  });
68
 
69
  if (org) {