CognxSafeTrack commited on
Commit
df0edd7
·
1 Parent(s): 0e83f45

fix: Split Webhook into Gateway (HuggingFace) and Processor (Railway), fix missing https:// in API_URL

Browse files
apps/api/src/routes/internal.ts CHANGED
@@ -1,11 +1,125 @@
1
  import { FastifyInstance } from 'fastify';
2
  import { WhatsAppService } from '../services/whatsapp';
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
 
4
  /**
5
  * Internal-only routes — protected by ADMIN_API_KEY, not exposed publicly.
6
  * Used by the Railway worker to call handleIncomingMessage after audio transcription.
7
  */
8
  export async function internalRoutes(fastify: FastifyInstance) {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  fastify.post<{
10
  Body: { phone: string; text: string; audioUrl?: string }
11
  }>('/v1/internal/handle-message', {
@@ -35,3 +149,4 @@ export async function internalRoutes(fastify: FastifyInstance) {
35
  return reply.send({ ok: true });
36
  });
37
  }
 
 
1
  import { FastifyInstance } from 'fastify';
2
  import { WhatsAppService } from '../services/whatsapp';
3
+ import { z } from 'zod';
4
+ // ─── Zod Schema for WhatsApp Webhook Payload ─────────────────────────────────
5
+ const WhatsAppMessageSchema = z.object({
6
+ from: z.string(),
7
+ id: z.string(),
8
+ timestamp: z.string(),
9
+ type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']),
10
+ text: z.object({ body: z.string() }).optional(),
11
+ audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(),
12
+ image: z.object({ id: z.string() }).optional(),
13
+ interactive: z.object({
14
+ type: z.enum(['button_reply', 'list_reply']),
15
+ button_reply: z.object({
16
+ id: z.string(),
17
+ title: z.string(),
18
+ }).optional(),
19
+ list_reply: z.object({
20
+ id: z.string(),
21
+ title: z.string(),
22
+ description: z.string().optional()
23
+ }).optional(),
24
+ }).optional()
25
+ });
26
+
27
+ const WebhookPayloadSchema = z.object({
28
+ object: z.literal('whatsapp_business_account'),
29
+ entry: z.array(z.object({
30
+ id: z.string(),
31
+ changes: z.array(z.object({
32
+ value: z.object({
33
+ messaging_product: z.string().optional(),
34
+ metadata: z.object({ phone_number_id: z.string() }).optional(),
35
+ contacts: z.array(z.any()).optional(),
36
+ messages: z.array(WhatsAppMessageSchema).optional(),
37
+ statuses: z.array(z.any()).optional(),
38
+ }),
39
+ field: z.string(),
40
+ })),
41
+ })),
42
+ });
43
 
44
  /**
45
  * Internal-only routes — protected by ADMIN_API_KEY, not exposed publicly.
46
  * Used by the Railway worker to call handleIncomingMessage after audio transcription.
47
  */
48
  export async function internalRoutes(fastify: FastifyInstance) {
49
+ // ── Handle Webhook Forwarding from Gateway (HF -> Railway) ───────────────
50
+ fastify.post('/v1/internal/whatsapp/inbound', {
51
+ config: { requireAuth: true } as any
52
+ }, async (request, reply) => {
53
+ // We received the raw webhook payload that was forwarded.
54
+ // Send a 200 immediately to release HF Gateway
55
+ reply.code(200).send({ ok: true });
56
+
57
+ // Process message parsing outside the request loop
58
+ setImmediate(async () => {
59
+ try {
60
+ const parsed = WebhookPayloadSchema.safeParse(request.body);
61
+
62
+ if (!parsed.success) {
63
+ fastify.log.warn(`[INTERNAL-WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`);
64
+ return;
65
+ }
66
+
67
+ const payload = parsed.data;
68
+
69
+ for (const entry of payload.entry) {
70
+ for (const change of entry.changes) {
71
+ const messages = change.value.messages ?? [];
72
+
73
+ for (const message of messages) {
74
+ const phone = message.from;
75
+ let text = '';
76
+
77
+ if (message.type === 'text' && message.text) {
78
+ text = message.text.body;
79
+
80
+ } else if (message.type === 'interactive' && message.interactive) {
81
+ if (message.interactive.type === 'button_reply' && message.interactive.button_reply) {
82
+ text = message.interactive.button_reply.id;
83
+ fastify.log.info(`[INTERNAL-WEBHOOK] Button reply: ${text}`);
84
+ } else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) {
85
+ text = message.interactive.list_reply.id;
86
+ fastify.log.info(`[INTERNAL-WEBHOOK] List reply: ${text}`);
87
+ }
88
+
89
+ } else if (message.type === 'audio' && message.audio) {
90
+ // ─── Audio inbound: delegate download to Railway worker ────────────
91
+ const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || '';
92
+ const { Queue } = await import('bullmq');
93
+ const Redis = (await import('ioredis')).default;
94
+ const conn = process.env.REDIS_URL
95
+ ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
96
+ : new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
97
+ const q = new Queue('whatsapp-queue', { connection: conn as any });
98
+
99
+ await q.add('download-media', {
100
+ mediaId: message.audio.id,
101
+ mimeType: message.audio.mime_type || 'audio/ogg',
102
+ phone,
103
+ accessToken
104
+ }, { priority: 1 });
105
+
106
+ fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
107
+ continue;
108
+ }
109
+
110
+ if (phone && text) {
111
+ await WhatsAppService.handleIncomingMessage(phone, text);
112
+ }
113
+ }
114
+ }
115
+ }
116
+ } catch (error) {
117
+ fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error: ${String(error)}`);
118
+ }
119
+ });
120
+ });
121
+
122
+ // ── Handle standard transcribed messages from worker (Railway) ───────────
123
  fastify.post<{
124
  Body: { phone: string; text: string; audioUrl?: string }
125
  }>('/v1/internal/handle-message', {
 
149
  return reply.send({ ok: true });
150
  });
151
  }
152
+
apps/api/src/routes/whatsapp.ts CHANGED
@@ -100,8 +100,10 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
100
  const signature = request.headers['x-hub-signature-256'] as string;
101
  const rawBody = (request as any).rawBody as Buffer;
102
 
 
103
  if (!verifyWebhookSignature(rawBody, signature, appSecret)) {
104
  request.log.warn('[WEBHOOK] Invalid HMAC signature — request rejected');
 
105
  return reply.code(403).send({ error: 'Invalid signature' });
106
  }
107
  } else {
@@ -109,7 +111,32 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
109
  request.log.warn('[WEBHOOK] WHATSAPP_APP_SECRET not set — skipping signature verification');
110
  }
111
 
112
- // ── 2. Return 200 IMMEDIATELY to satisfy Meta's < 20s timeout ───────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  reply.code(200).send('EVENT_RECEIVED');
114
 
115
  // ── 3. Validate payload with Zod (async, after reply sent) ──────────
 
100
  const signature = request.headers['x-hub-signature-256'] as string;
101
  const rawBody = (request as any).rawBody as Buffer;
102
 
103
+ // Optional: verify webhook signature. Important for production on both HF and Railway.
104
  if (!verifyWebhookSignature(rawBody, signature, appSecret)) {
105
  request.log.warn('[WEBHOOK] Invalid HMAC signature — request rejected');
106
+ // Meta won't retry if we return 401/403 directly for invalid signatures.
107
  return reply.code(403).send({ error: 'Invalid signature' });
108
  }
109
  } else {
 
111
  request.log.warn('[WEBHOOK] WHATSAPP_APP_SECRET not set — skipping signature verification');
112
  }
113
 
114
+ // ── 2. Forward to Railway Internal Worker (if configured as Gateway) ──
115
+ const railwayInternalUrl = process.env.RAILWAY_INTERNAL_URL;
116
+
117
+ if (railwayInternalUrl) {
118
+ request.log.info(`[WEBHOOK] Gateway Mode: Forwarding payload to Railway API (${railwayInternalUrl}).`);
119
+ try {
120
+ // Fire and forget (don't await) to ensure fast 200 response to Meta
121
+ fetch(`${railwayInternalUrl}/v1/internal/whatsapp/inbound`, {
122
+ method: 'POST',
123
+ headers: {
124
+ 'Content-Type': 'application/json',
125
+ 'x-admin-key': process.env.ADMIN_API_KEY || ''
126
+ },
127
+ body: request.body ? JSON.stringify(request.body) : ''
128
+ }).catch(err => {
129
+ request.log.error(`[WEBHOOK] Forward to Railway failed: ${err.message}`);
130
+ });
131
+
132
+ return reply.code(200).send('EVENT_RECEIVED');
133
+ } catch (error: any) {
134
+ request.log.error(`[WEBHOOK] Forward throwing error: ${error?.message}`);
135
+ return reply.code(500).send({ error: 'Gateway forwarding failed' });
136
+ }
137
+ }
138
+
139
+ // ── 3. Return 200 IMMEDIATELY to satisfy Meta's < 20s timeout ───────
140
  reply.code(200).send('EVENT_RECEIVED');
141
 
142
  // ── 3. Validate payload with Zod (async, after reply sent) ──────────
apps/api/src/services/queue.ts CHANGED
@@ -15,14 +15,26 @@ const connection = process.env.REDIS_URL
15
  export const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
16
 
17
  export async function scheduleMessage(userId: string, text: string, delayMs: number = 0) {
 
 
 
 
18
  await whatsappQueue.add('send-message', { userId, text }, { delay: delayMs });
19
  }
20
 
21
  export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0) {
 
 
 
 
22
  await whatsappQueue.add('send-content', { userId, trackId, dayNumber }, { delay: delayMs });
23
  }
24
 
25
  export async function enrollUser(userId: string, trackId: string) {
 
 
 
 
26
  await whatsappQueue.add('enroll-user', { userId, trackId });
27
  }
28
 
@@ -32,6 +44,10 @@ export async function scheduleInteractiveButtons(
32
  bodyText: string,
33
  buttons: Array<{ id: string; title: string }>
34
  ) {
 
 
 
 
35
  await whatsappQueue.add('send-interactive-buttons', { userId, bodyText, buttons });
36
  }
37
 
@@ -43,6 +59,10 @@ export async function scheduleInteractiveList(
43
  buttonLabel: string,
44
  sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>
45
  ) {
 
 
 
 
46
  await whatsappQueue.add('send-interactive-list', { userId, headerText, bodyText, buttonLabel, sections });
47
  }
48
 
 
15
  export const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
16
 
17
  export async function scheduleMessage(userId: string, text: string, delayMs: number = 0) {
18
+ if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
19
+ console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
20
+ return;
21
+ }
22
  await whatsappQueue.add('send-message', { userId, text }, { delay: delayMs });
23
  }
24
 
25
  export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0) {
26
+ if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
27
+ console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
28
+ return;
29
+ }
30
  await whatsappQueue.add('send-content', { userId, trackId, dayNumber }, { delay: delayMs });
31
  }
32
 
33
  export async function enrollUser(userId: string, trackId: string) {
34
+ if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
35
+ console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
36
+ return;
37
+ }
38
  await whatsappQueue.add('enroll-user', { userId, trackId });
39
  }
40
 
 
44
  bodyText: string,
45
  buttons: Array<{ id: string; title: string }>
46
  ) {
47
+ if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
48
+ console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
49
+ return;
50
+ }
51
  await whatsappQueue.add('send-interactive-buttons', { userId, bodyText, buttons });
52
  }
53
 
 
59
  buttonLabel: string,
60
  sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>
61
  ) {
62
+ if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
63
+ console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
64
+ return;
65
+ }
66
  await whatsappQueue.add('send-interactive-list', { userId, headerText, bodyText, buttonLabel, sections });
67
  }
68
 
apps/whatsapp-worker/src/index.ts CHANGED
@@ -63,7 +63,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
63
  if (track.isPremium) {
64
  console.log(`[WORKER] User ${userId} requested Premium Track ${trackId}. Generating Payment Link...`);
65
  try {
66
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
67
  const apiKey = process.env.ADMIN_API_KEY || '';
68
  const checkoutRes = await fetch(`${API_URL}/v1/payments/checkout`, {
69
  method: 'POST',
@@ -125,7 +126,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
125
  const { buffer } = await downloadMedia(mediaId, accessToken);
126
 
127
  // Store audio on R2 via the API
128
- const API_URL_STORE = process.env.API_URL || 'http://localhost:3001';
 
129
  const apiKeyStore = process.env.ADMIN_API_KEY || '';
130
  try {
131
  const storeRes = await fetch(`${API_URL_STORE}/v1/ai/store-audio`, {
@@ -142,7 +144,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
142
  }
143
 
144
  // Transcribe with Whisper via API
145
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
146
  const apiKey = process.env.ADMIN_API_KEY || '';
147
  const transcribeRes = await fetch(`${API_URL}/v1/ai/transcribe`, {
148
  method: 'POST',
@@ -174,7 +177,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
174
 
175
  // Process the transcribed text as a normal incoming message via API
176
  if (transcribedText) {
177
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
178
  const apiKey = process.env.ADMIN_API_KEY || '';
179
  await fetch(`${API_URL}/v1/internal/handle-message`, {
180
  method: 'POST',
@@ -218,7 +222,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
218
  // Update userContext to explicitly reference the user's sector/activity
219
  const userContext = `User ${userId} completed the Business Pitch track. Their business activity/sector is: ${user?.activity || 'Unknown'}. They want to build a business in this sector using the concepts learned in the track.`;
220
 
221
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
222
  const apiKey = process.env.ADMIN_API_KEY || '';
223
  const authHeaders = {
224
  'Content-Type': 'application/json',
 
63
  if (track.isPremium) {
64
  console.log(`[WORKER] User ${userId} requested Premium Track ${trackId}. Generating Payment Link...`);
65
  try {
66
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
67
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
68
  const apiKey = process.env.ADMIN_API_KEY || '';
69
  const checkoutRes = await fetch(`${API_URL}/v1/payments/checkout`, {
70
  method: 'POST',
 
126
  const { buffer } = await downloadMedia(mediaId, accessToken);
127
 
128
  // Store audio on R2 via the API
129
+ let API_URL_STORE = process.env.API_URL || 'http://localhost:3001';
130
+ if (!API_URL_STORE.startsWith('http')) API_URL_STORE = `https://${API_URL_STORE}`;
131
  const apiKeyStore = process.env.ADMIN_API_KEY || '';
132
  try {
133
  const storeRes = await fetch(`${API_URL_STORE}/v1/ai/store-audio`, {
 
144
  }
145
 
146
  // Transcribe with Whisper via API
147
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
148
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
149
  const apiKey = process.env.ADMIN_API_KEY || '';
150
  const transcribeRes = await fetch(`${API_URL}/v1/ai/transcribe`, {
151
  method: 'POST',
 
177
 
178
  // Process the transcribed text as a normal incoming message via API
179
  if (transcribedText) {
180
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
181
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
182
  const apiKey = process.env.ADMIN_API_KEY || '';
183
  await fetch(`${API_URL}/v1/internal/handle-message`, {
184
  method: 'POST',
 
222
  // Update userContext to explicitly reference the user's sector/activity
223
  const userContext = `User ${userId} completed the Business Pitch track. Their business activity/sector is: ${user?.activity || 'Unknown'}. They want to build a business in this sector using the concepts learned in the track.`;
224
 
225
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
226
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
227
  const apiKey = process.env.ADMIN_API_KEY || '';
228
  const authHeaders = {
229
  'Content-Type': 'application/json',
apps/whatsapp-worker/src/pedagogy.ts CHANGED
@@ -27,7 +27,8 @@ export async function sendLessonDay(userId: string, trackId: string, dayNumber:
27
  if (user.activity && lessonText) {
28
  try {
29
  console.log(`[PEDAGOGY] Personalizing lesson for User ${userId}'s activity: ${user.activity}`);
30
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
31
  const apiKey = process.env.ADMIN_API_KEY || '';
32
  const personalizeRes = await fetch(`${API_URL}/v1/ai/personalize-lesson`, {
33
  method: 'POST',
@@ -55,7 +56,8 @@ export async function sendLessonDay(userId: string, trackId: string, dayNumber:
55
  if (!finalAudioUrl && lessonText) {
56
  try {
57
  console.log(`[PEDAGOGY] Generating TTS Audio for User ${userId}...`);
58
- const API_URL = process.env.API_URL || 'http://localhost:3001';
 
59
  const apiKey = process.env.ADMIN_API_KEY || '';
60
  const ttsRes = await fetch(`${API_URL}/v1/ai/tts`, {
61
  method: 'POST',
 
27
  if (user.activity && lessonText) {
28
  try {
29
  console.log(`[PEDAGOGY] Personalizing lesson for User ${userId}'s activity: ${user.activity}`);
30
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
31
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
32
  const apiKey = process.env.ADMIN_API_KEY || '';
33
  const personalizeRes = await fetch(`${API_URL}/v1/ai/personalize-lesson`, {
34
  method: 'POST',
 
56
  if (!finalAudioUrl && lessonText) {
57
  try {
58
  console.log(`[PEDAGOGY] Generating TTS Audio for User ${userId}...`);
59
+ let API_URL = process.env.API_URL || 'http://localhost:3001';
60
+ if (!API_URL.startsWith('http')) API_URL = `https://${API_URL}`;
61
  const apiKey = process.env.ADMIN_API_KEY || '';
62
  const ttsRes = await fetch(`${API_URL}/v1/ai/tts`, {
63
  method: 'POST',