CognxSafeTrack commited on
Commit
0ee3f5e
Β·
1 Parent(s): 184eeee

fix: restore image forwarding and result propagation in internal routes

Browse files
Files changed (1) hide show
  1. apps/api/src/routes/internal.ts +34 -6
apps/api/src/routes/internal.ts CHANGED
@@ -9,7 +9,7 @@ const WhatsAppMessageSchema = z.object({
9
  type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']),
10
  text: z.object({ body: z.string() }).optional(),
11
  audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(),
12
- image: z.object({ id: z.string() }).optional(),
13
  interactive: z.object({
14
  type: z.enum(['button_reply', 'list_reply']),
15
  button_reply: z.object({
@@ -110,6 +110,33 @@ export async function internalRoutes(fastify: FastifyInstance) {
110
 
111
  fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
112
  continue;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  }
114
 
115
  if (phone && text) {
@@ -126,7 +153,7 @@ export async function internalRoutes(fastify: FastifyInstance) {
126
 
127
  // ── Handle standard transcribed messages from worker (Railway) ───────────
128
  fastify.post<{
129
- Body: { phone: string; text: string; audioUrl?: string }
130
  }>('/v1/internal/handle-message', {
131
  config: { requireAuth: true } as any,
132
  schema: {
@@ -136,12 +163,13 @@ export async function internalRoutes(fastify: FastifyInstance) {
136
  properties: {
137
  phone: { type: 'string' },
138
  text: { type: 'string' },
139
- audioUrl: { type: 'string' }
 
140
  }
141
  }
142
  }
143
  }, async (request, reply) => {
144
- const { phone, text, audioUrl } = request.body;
145
  const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`;
146
 
147
  if (!phone || text === undefined) {
@@ -149,11 +177,11 @@ export async function internalRoutes(fastify: FastifyInstance) {
149
  return reply.code(400).send({ error: 'phone and text are required' });
150
  }
151
 
152
- request.log.info(`${traceId} Received transcribed text: "${text.substring(0, 100)}..."`);
153
 
154
  // Fire and await - ensuring the worker knows if it failed
155
  try {
156
- await WhatsAppService.handleIncomingMessage(phone, text, audioUrl);
157
  request.log.info(`${traceId} Successfully processed message`);
158
  } catch (err: unknown) {
159
  request.log.error(`${traceId} handleIncomingMessage error: ${(err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err))}`);
 
9
  type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']),
10
  text: z.object({ body: z.string() }).optional(),
11
  audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(),
12
+ image: z.object({ id: z.string(), caption: z.string().optional() }).optional(),
13
  interactive: z.object({
14
  type: z.enum(['button_reply', 'list_reply']),
15
  button_reply: z.object({
 
110
 
111
  fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
112
  continue;
113
+ } else if (message.type === 'image' && message.image) {
114
+ // ─── Image inbound: delegate download to Railway worker ────────────
115
+ const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || '';
116
+ const { Queue } = await import('bullmq');
117
+ const Redis = (await import('ioredis')).default;
118
+ const conn = process.env.REDIS_URL
119
+ ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
120
+ : new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
121
+ const q = new Queue('whatsapp-queue', { connection: conn as any });
122
+
123
+ fastify.log.info(`[INTERNAL-WEBHOOK] Image ${message.image.id} detected. Enqueuing download.`);
124
+
125
+ await q.add('download-media', {
126
+ mediaId: message.image.id,
127
+ mimeType: 'image/jpeg',
128
+ phone,
129
+ caption: message.image.caption || undefined,
130
+ ...(accessToken ? { accessToken } : {})
131
+ }, { priority: 1, attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
132
+
133
+ await q.add('send-message-direct', {
134
+ phone,
135
+ text: "⏳ J'analyse ton image..."
136
+ });
137
+
138
+ fastify.log.info(`[INTERNAL-WEBHOOK] Image ${message.image.id} enqueued for Railway download`);
139
+ continue;
140
  }
141
 
142
  if (phone && text) {
 
153
 
154
  // ── Handle standard transcribed messages from worker (Railway) ───────────
155
  fastify.post<{
156
+ Body: { phone: string; text: string; audioUrl?: string; imageUrl?: string }
157
  }>('/v1/internal/handle-message', {
158
  config: { requireAuth: true } as any,
159
  schema: {
 
163
  properties: {
164
  phone: { type: 'string' },
165
  text: { type: 'string' },
166
+ audioUrl: { type: 'string' },
167
+ imageUrl: { type: 'string' }
168
  }
169
  }
170
  }
171
  }, async (request, reply) => {
172
+ const { phone, text, audioUrl, imageUrl } = request.body;
173
  const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`;
174
 
175
  if (!phone || text === undefined) {
 
177
  return reply.code(400).send({ error: 'phone and text are required' });
178
  }
179
 
180
+ request.log.info(`${traceId} Received message text: "${text.substring(0, 100)}..." (Image: ${!!imageUrl})`);
181
 
182
  // Fire and await - ensuring the worker knows if it failed
183
  try {
184
+ await WhatsAppService.handleIncomingMessage(phone, text, audioUrl, imageUrl);
185
  request.log.info(`${traceId} Successfully processed message`);
186
  } catch (err: unknown) {
187
  request.log.error(`${traceId} handleIncomingMessage error: ${(err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err))}`);