import { FastifyInstance } from 'fastify';
import type { Queue } from 'bullmq';
import { WhatsAppService } from '../services/whatsapp';
import { z } from 'zod';

// ─── Zod Schema for WhatsApp Webhook Payload ─────────────────────────────────
const WhatsAppMessageSchema = z.object({
  from: z.string(),
  id: z.string(),
  timestamp: z.string(),
  type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']),
  text: z.object({ body: z.string() }).optional(),
  audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(),
  image: z.object({ id: z.string() }).optional(),
  interactive: z.object({
    type: z.enum(['button_reply', 'list_reply']),
    button_reply: z.object({
      id: z.string(),
      title: z.string(),
    }).optional(),
    list_reply: z.object({
      id: z.string(),
      title: z.string(),
      description: z.string().optional(),
    }).optional(),
  }).optional(),
});

const WebhookPayloadSchema = z.object({
  object: z.literal('whatsapp_business_account'),
  entry: z.array(z.object({
    id: z.string(),
    changes: z.array(z.object({
      value: z.object({
        messaging_product: z.string().optional(),
        metadata: z.object({ phone_number_id: z.string() }).optional(),
        contacts: z.array(z.any()).optional(),
        messages: z.array(WhatsAppMessageSchema).optional(),
        statuses: z.array(z.any()).optional(),
      }),
      field: z.string(),
    })),
  })),
});

// ─── Shared BullMQ queue ─────────────────────────────────────────────────────
// Created lazily and reused, so each audio message does not open (and leak)
// a new Redis connection.
let whatsappQueue: Promise<Queue> | undefined;

function getWhatsAppQueue(): Promise<Queue> {
  if (!whatsappQueue) {
    whatsappQueue = (async () => {
      const bullmq = await import('bullmq');
      const Redis = (await import('ioredis')).default;
      const conn = process.env.REDIS_URL
        ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
        : new Redis({
            host: process.env.REDIS_HOST || 'localhost',
            port: parseInt(process.env.REDIS_PORT || '6379', 10),
            maxRetriesPerRequest: null,
          });
      return new bullmq.Queue('whatsapp-queue', { connection: conn as any });
    })();
  }
  return whatsappQueue;
}

// Normalize an unknown thrown value into a loggable message.
const errorMessage = (err: unknown): string =>
  err instanceof Error ? err.message : String(err);

/**
 * Internal-only routes, protected by ADMIN_API_KEY and not exposed publicly.
 * Used by the Railway worker to call handleIncomingMessage after audio transcription.
 */
export async function internalRoutes(fastify: FastifyInstance) {

  // ── Handle Webhook Forwarding from Gateway (HF -> Railway) ───────────────
  fastify.post('/v1/internal/whatsapp/inbound', {
    config: { requireAuth: true } as any,
  }, async (request, reply) => {
    // The body is the raw webhook payload forwarded by the gateway.
    // Send a 200 immediately to release the HF Gateway.
    reply.code(200).send({ ok: true });

    // Parse and process the payload after the response has been sent.
    setImmediate(async () => {
      try {
        const parsed = WebhookPayloadSchema.safeParse(request.body);
        if (!parsed.success) {
          fastify.log.warn(`[INTERNAL-WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`);
          return;
        }

        const payload = parsed.data;
        for (const entry of payload.entry) {
          for (const change of entry.changes) {
            const messages = change.value.messages ?? [];
            for (const message of messages) {
              const phone = message.from;
              let text = '';

              if (message.type === 'text' && message.text) {
                text = message.text.body;
              } else if (message.type === 'interactive' && message.interactive) {
                // For interactive replies, the option id is forwarded as the message text.
                if (message.interactive.type === 'button_reply' && message.interactive.button_reply) {
                  text = message.interactive.button_reply.id;
                  fastify.log.info(`[INTERNAL-WEBHOOK] Button reply: ${text}`);
                } else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) {
                  text = message.interactive.list_reply.id;
                  fastify.log.info(`[INTERNAL-WEBHOOK] List reply: ${text}`);
                }
              } else if (message.type === 'audio' && message.audio) {
                // ─── Audio inbound: delegate download to Railway worker ────────────
                const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || '';
                const q = await getWhatsAppQueue();

                await q.add('download-media', {
                  mediaId: message.audio.id,
                  mimeType: message.audio.mime_type || 'audio/ogg',
                  phone,
                  ...(accessToken ? { accessToken } : {}),
                }, {
                  priority: 1,
                  attempts: 3,
                  backoff: { type: 'exponential', delay: 2000 },
                });

                // Acknowledge the audio to the user while it is processed.
                await q.add('send-message-direct', { phone, text: "⏳ J'analyse ton audio..." });
                fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
                continue;
              }

              // Other message types (image, video, ...) yield no text and are skipped.
              if (phone && text) {
                await WhatsAppService.handleIncomingMessage(phone, text);
              }
            }
          }
        }
      } catch (error) {
        fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error: ${String(error)}`);
      }
    });
  });

  // ── Handle standard transcribed messages from worker (Railway) ───────────
  fastify.post<{ Body: { phone: string; text: string; audioUrl?: string } }>('/v1/internal/handle-message', {
    config: { requireAuth: true } as any,
    schema: {
      body: {
        type: 'object',
        required: ['phone', 'text'],
        properties: {
          phone: { type: 'string' },
          text: { type: 'string' },
          audioUrl: { type: 'string' },
        },
      },
    },
  }, async (request, reply) => {
    const { phone, text, audioUrl } = request.body;
    const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`;

    // The body schema already requires both fields; this also rejects an empty phone.
    if (!phone || text === undefined) {
      request.log.warn(`${traceId} Missing phone or text in handle-message request`);
      return reply.code(400).send({ error: 'phone and text are required' });
    }

    request.log.info(`${traceId} Received transcribed text: "${text.substring(0, 100)}..."`);

    // Await the handler so the worker learns whether processing failed.
    try {
      await WhatsAppService.handleIncomingMessage(phone, text, audioUrl);
      request.log.info(`${traceId} Successfully processed message`);
    } catch (err: unknown) {
      request.log.error(`${traceId} handleIncomingMessage error: ${errorMessage(err)}`);
      return reply.code(500).send({ error: errorMessage(err) });
    }

    return reply.send({ ok: true });
  });

  // ── Simple Ping for Token/Connectivity Verification ──────────────────────
  fastify.get('/v1/internal/ping', {
    config: { requireAuth: true } as any,
  }, async () => {
    return {
      ok: true,
      message: 'Pong! Railway Internal API is reachable and authorized.',
      timestamp: new Date().toISOString(),
    };
  });
}