CognxSafeTrack
chore: execute Sprint 38 technical debt resolution (Type Safety, Zod validation, Vitest, Mock LLM extracted)
d9879cf | import { FastifyInstance } from 'fastify'; | |
| import { WhatsAppService } from '../services/whatsapp'; | |
| import { z } from 'zod'; | |
| // βββ Zod Schema for WhatsApp Webhook Payload βββββββββββββββββββββββββββββββββ | |
| const WhatsAppMessageSchema = z.object({ | |
| from: z.string(), | |
| id: z.string(), | |
| timestamp: z.string(), | |
| type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']), | |
| text: z.object({ body: z.string() }).optional(), | |
| audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(), | |
| image: z.object({ id: z.string() }).optional(), | |
| interactive: z.object({ | |
| type: z.enum(['button_reply', 'list_reply']), | |
| button_reply: z.object({ | |
| id: z.string(), | |
| title: z.string(), | |
| }).optional(), | |
| list_reply: z.object({ | |
| id: z.string(), | |
| title: z.string(), | |
| description: z.string().optional() | |
| }).optional(), | |
| }).optional() | |
| }); | |
| const WebhookPayloadSchema = z.object({ | |
| object: z.literal('whatsapp_business_account'), | |
| entry: z.array(z.object({ | |
| id: z.string(), | |
| changes: z.array(z.object({ | |
| value: z.object({ | |
| messaging_product: z.string().optional(), | |
| metadata: z.object({ phone_number_id: z.string() }).optional(), | |
| contacts: z.array(z.any()).optional(), | |
| messages: z.array(WhatsAppMessageSchema).optional(), | |
| statuses: z.array(z.any()).optional(), | |
| }), | |
| field: z.string(), | |
| })), | |
| })), | |
| }); | |
| /** | |
| * Internal-only routes β protected by ADMIN_API_KEY, not exposed publicly. | |
| * Used by the Railway worker to call handleIncomingMessage after audio transcription. | |
| */ | |
| export async function internalRoutes(fastify: FastifyInstance) { | |
| // ββ Handle Webhook Forwarding from Gateway (HF -> Railway) βββββββββββββββ | |
| fastify.post('/v1/internal/whatsapp/inbound', { | |
| config: { requireAuth: true } as any | |
| }, async (request, reply) => { | |
| // We received the raw webhook payload that was forwarded. | |
| // Send a 200 immediately to release HF Gateway | |
| reply.code(200).send({ ok: true }); | |
| // Process message parsing outside the request loop | |
| setImmediate(async () => { | |
| try { | |
| const parsed = WebhookPayloadSchema.safeParse(request.body); | |
| if (!parsed.success) { | |
| fastify.log.warn(`[INTERNAL-WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`); | |
| return; | |
| } | |
| const payload = parsed.data; | |
| for (const entry of payload.entry) { | |
| for (const change of entry.changes) { | |
| const messages = change.value.messages ?? []; | |
| for (const message of messages) { | |
| const phone = message.from; | |
| let text = ''; | |
| if (message.type === 'text' && message.text) { | |
| text = message.text.body; | |
| } else if (message.type === 'interactive' && message.interactive) { | |
| if (message.interactive.type === 'button_reply' && message.interactive.button_reply) { | |
| text = message.interactive.button_reply.id; | |
| fastify.log.info(`[INTERNAL-WEBHOOK] Button reply: ${text}`); | |
| } else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) { | |
| text = message.interactive.list_reply.id; | |
| fastify.log.info(`[INTERNAL-WEBHOOK] List reply: ${text}`); | |
| } | |
| } else if (message.type === 'audio' && message.audio) { | |
| // βββ Audio inbound: delegate download to Railway worker ββββββββββββ | |
| const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || ''; | |
| const { Queue } = await import('bullmq'); | |
| const Redis = (await import('ioredis')).default; | |
| const conn = process.env.REDIS_URL | |
| ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null }) | |
| : new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null }); | |
| const q = new Queue('whatsapp-queue', { connection: conn as any }); | |
| await q.add('download-media', { | |
| mediaId: message.audio.id, | |
| mimeType: message.audio.mime_type || 'audio/ogg', | |
| phone, | |
| ...(accessToken ? { accessToken } : {}) | |
| }, { priority: 1, attempts: 3, backoff: { type: 'exponential', delay: 2000 } }); | |
| await q.add('send-message-direct', { | |
| phone, | |
| text: "β³ J'analyse ton audio..." | |
| }); | |
| fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`); | |
| continue; | |
| } | |
| if (phone && text) { | |
| await WhatsAppService.handleIncomingMessage(phone, text); | |
| } | |
| } | |
| } | |
| } | |
| } catch (error) { | |
| fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error: ${String(error)}`); | |
| } | |
| }); | |
| }); | |
| // ββ Handle standard transcribed messages from worker (Railway) βββββββββββ | |
| fastify.post<{ | |
| Body: { phone: string; text: string; audioUrl?: string } | |
| }>('/v1/internal/handle-message', { | |
| config: { requireAuth: true } as any, | |
| schema: { | |
| body: { | |
| type: 'object', | |
| required: ['phone', 'text'], | |
| properties: { | |
| phone: { type: 'string' }, | |
| text: { type: 'string' }, | |
| audioUrl: { type: 'string' } | |
| } | |
| } | |
| } | |
| }, async (request, reply) => { | |
| const { phone, text, audioUrl } = request.body; | |
| const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`; | |
| if (!phone || text === undefined) { | |
| request.log.warn(`${traceId} Missing phone or text in handle-message request`); | |
| return reply.code(400).send({ error: 'phone and text are required' }); | |
| } | |
| request.log.info(`${traceId} Received transcribed text: "${text.substring(0, 100)}..."`); | |
| // Fire and await - ensuring the worker knows if it failed | |
| try { | |
| await WhatsAppService.handleIncomingMessage(phone, text, audioUrl); | |
| request.log.info(`${traceId} Successfully processed message`); | |
| } catch (err: unknown) { | |
| request.log.error(`${traceId} handleIncomingMessage error: ${(err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err))}`); | |
| return reply.code(500).send({ error: (err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err)) }); | |
| } | |
| return reply.send({ ok: true }); | |
| }); | |
| // ββ Simple Ping for Token/Connectivity Verification ββββββββββββββββββββββ | |
| fastify.get('/v1/internal/ping', { | |
| config: { requireAuth: true } as any | |
| }, async () => { | |
| return { ok: true, message: 'Pong! Railway Internal API is reachable and authorized.', timestamp: new Date().toISOString() }; | |
| }); | |
| } | |