import { FastifyInstance } from 'fastify';
import { WhatsAppService } from '../services/whatsapp';
import { z } from 'zod';
// ─── Zod Schema for WhatsApp Webhook Payload ─────────────────────────────────
/** Reply produced by an interactive button: the button's `id` and `title`. */
const ButtonReplySchema = z.object({
  id: z.string(),
  title: z.string(),
});

/** Reply produced by an interactive list: the row's `id`, `title` and optional `description`. */
const ListReplySchema = z.object({
  id: z.string(),
  title: z.string(),
  description: z.string().optional(),
});

/**
 * `interactive` section of a message. `type` names the reply kind; the
 * matching `button_reply` / `list_reply` object is optional at schema level.
 */
const InteractiveReplySchema = z.object({
  type: z.enum(['button_reply', 'list_reply']),
  button_reply: ButtonReplySchema.optional(),
  list_reply: ListReplySchema.optional(),
});

/**
 * One inbound WhatsApp message.
 *
 * `from`, `id`, `timestamp` and `type` are required; every type-specific
 * section (`text`, `audio`, `image`, `interactive`) is optional.
 *
 * NOTE(review): `type` is a closed enum, so a message carrying any other type
 * is rejected by this schema — confirm that is the intended behaviour.
 */
const WhatsAppMessageSchema = z.object({
  from: z.string(),
  id: z.string(),
  timestamp: z.string(),
  type: z.enum([
    'text',
    'audio',
    'image',
    'video',
    'document',
    'sticker',
    'reaction',
    'interactive',
  ]),
  text: z.object({ body: z.string() }).optional(),
  audio: z
    .object({
      id: z.string(),
      mime_type: z.string().optional(),
    })
    .optional(),
  image: z
    .object({
      id: z.string(),
      caption: z.string().optional(),
    })
    .optional(),
  interactive: InteractiveReplySchema.optional(),
});
/**
 * `value` object of a webhook change. Every field is optional; `contacts`
 * and `statuses` are accepted as arrays whose elements are not validated.
 */
const WebhookChangeValueSchema = z.object({
  messaging_product: z.string().optional(),
  metadata: z.object({ phone_number_id: z.string() }).optional(),
  contacts: z.array(z.any()).optional(),
  messages: z.array(WhatsAppMessageSchema).optional(),
  statuses: z.array(z.any()).optional(),
});

/** A single change: its `value` plus the name of the changed `field`. */
const WebhookChangeSchema = z.object({
  value: WebhookChangeValueSchema,
  field: z.string(),
});

/** One entry of the webhook: an `id` and its list of changes. */
const WebhookEntrySchema = z.object({
  id: z.string(),
  changes: z.array(WebhookChangeSchema),
});

/**
 * Full webhook body. `object` must be exactly `'whatsapp_business_account'`
 * and `entry` must be an array of entries.
 */
const WebhookPayloadSchema = z.object({
  object: z.literal('whatsapp_business_account'),
  entry: z.array(WebhookEntrySchema),
});
/** Validated shape of one inbound WhatsApp message, inferred from the Zod schema. */
type InboundWhatsAppMessage = z.infer<typeof WhatsAppMessageSchema>;

/** Returns the `message` of an `Error`, or the string form of any other thrown value. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Internal-only routes — protected by ADMIN_API_KEY, not exposed publicly.
 * Used by the Railway worker to call handleIncomingMessage after audio transcription.
 *
 * Registers:
 * - `POST /v1/internal/whatsapp/inbound` — accepts a forwarded WhatsApp webhook,
 *   answers 200 immediately and processes the messages afterwards.
 * - `POST /v1/internal/handle-message` — runs `WhatsAppService.handleIncomingMessage`
 *   and reports success or failure to the caller.
 * - `GET /v1/internal/ping` — connectivity / authorization check.
 *
 * @param fastify Fastify instance the routes are registered on.
 */
export async function internalRoutes(fastify: FastifyInstance): Promise<void> {
  // ── Shared BullMQ queue for media downloads ──────────────────────────────
  // A single queue/connection pair is created on first use and reused for
  // every later media message, instead of opening a new Redis connection
  // per message and never closing it.

  /** Opens the Redis connection and the `whatsapp-queue` BullMQ queue. */
  const openMediaQueue = async () => {
    const { Queue } = await import('bullmq');
    const Redis = (await import('ioredis')).default;
    const connection = process.env.REDIS_URL
      ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
      : new Redis({
          host: process.env.REDIS_HOST || 'localhost',
          port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
          maxRetriesPerRequest: null,
        });
    // Cast kept from the original code (connection typing between ioredis and bullmq).
    const queue = new Queue('whatsapp-queue', { connection: connection as any });
    return { queue, connection };
  };

  let mediaQueue: ReturnType<typeof openMediaQueue> | undefined;

  /** Returns the shared queue, creating it on first call. */
  const getMediaQueue = (): ReturnType<typeof openMediaQueue> => {
    if (!mediaQueue) {
      mediaQueue = openMediaQueue().catch((error: unknown) => {
        // Do not cache a failed initialisation: let the next message retry.
        mediaQueue = undefined;
        throw error;
      });
    }
    return mediaQueue;
  };

  // Release the queue and its Redis connection when the server shuts down.
  fastify.addHook('onClose', async () => {
    const pending = mediaQueue;
    if (!pending) {
      return;
    }
    mediaQueue = undefined;
    try {
      const { queue, connection } = await pending;
      await queue.close();
      await connection.quit();
    } catch (error) {
      fastify.log.warn(`[INTERNAL-WEBHOOK] Failed to close media queue: ${describeError(error)}`);
    }
  });

  /**
   * Enqueues a `download-media` job followed by a `send-message-direct` job
   * that acknowledges reception to the sender.
   */
  const enqueueMediaDownload = async (
    media: { mediaId: string; mimeType: string; phone: string; caption?: string },
    acknowledgement: string
  ): Promise<void> => {
    const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || '';
    const { queue } = await getMediaQueue();
    await queue.add(
      'download-media',
      // The token is only attached when one is configured.
      { ...media, ...(accessToken ? { accessToken } : {}) },
      { priority: 1, attempts: 3, backoff: { type: 'exponential', delay: 2000 } }
    );
    await queue.add('send-message-direct', { phone: media.phone, text: acknowledgement });
  };

  /**
   * Returns the text carried by a `text` message, or the reply id of an
   * interactive button/list reply; an empty string for anything else.
   */
  const readMessageText = (message: InboundWhatsAppMessage): string => {
    if (message.type === 'text' && message.text) {
      return message.text.body;
    }
    if (message.type === 'interactive' && message.interactive) {
      const { type, button_reply: buttonReply, list_reply: listReply } = message.interactive;
      if (type === 'button_reply' && buttonReply) {
        fastify.log.info(`[INTERNAL-WEBHOOK] Button reply: ${buttonReply.id}`);
        return buttonReply.id;
      }
      if (type === 'list_reply' && listReply) {
        fastify.log.info(`[INTERNAL-WEBHOOK] List reply: ${listReply.id}`);
        return listReply.id;
      }
    }
    return '';
  };

  /**
   * Handles one inbound message: audio and image are handed to the queue for
   * download, text-like messages go straight to `handleIncomingMessage`.
   */
  const handleInboundMessage = async (message: InboundWhatsAppMessage): Promise<void> => {
    const phone = message.from;

    if (message.type === 'audio' && message.audio) {
      // ─── Audio inbound: delegate download to Railway worker ───
      await enqueueMediaDownload(
        {
          mediaId: message.audio.id,
          mimeType: message.audio.mime_type || 'audio/ogg',
          phone,
        },
        // \u23F3 is the hourglass emoji, escaped so it survives re-encoding.
        "\u23F3 J'analyse ton audio..."
      );
      fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
      return;
    }

    if (message.type === 'image' && message.image) {
      // ─── Image inbound: delegate download to Railway worker ───
      fastify.log.info(`[INTERNAL-WEBHOOK] Image ${message.image.id} detected. Enqueuing download.`);
      await enqueueMediaDownload(
        {
          mediaId: message.image.id,
          mimeType: 'image/jpeg',
          phone,
          caption: message.image.caption || undefined,
        },
        "\u23F3 J'analyse ton image..."
      );
      fastify.log.info(`[INTERNAL-WEBHOOK] Image ${message.image.id} enqueued for Railway download`);
      return;
    }

    const text = readMessageText(message);
    if (phone && text) {
      await WhatsAppService.handleIncomingMessage(phone, text);
    }
  };

  /**
   * Validates a forwarded webhook body and handles each message in order.
   * Never rejects: failures are logged, and a failing message does not stop
   * the remaining messages from being processed.
   */
  const processInboundWebhook = async (body: unknown): Promise<void> => {
    try {
      const parsed = WebhookPayloadSchema.safeParse(body);
      if (!parsed.success) {
        fastify.log.warn(`[INTERNAL-WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`);
        return;
      }

      for (const entry of parsed.data.entry) {
        for (const change of entry.changes) {
          for (const message of change.value.messages ?? []) {
            try {
              await handleInboundMessage(message);
            } catch (error) {
              fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error (message ${message.id}): ${String(error)}`);
            }
          }
        }
      }
    } catch (error) {
      fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error: ${String(error)}`);
    }
  };

  // ── Handle Webhook Forwarding from Gateway (HF -> Railway) ───────────────
  fastify.post('/v1/internal/whatsapp/inbound', {
    config: { requireAuth: true } as any
  }, async (request, reply) => {
    // We received the raw webhook payload that was forwarded.
    // Send a 200 immediately to release HF Gateway.
    reply.code(200).send({ ok: true });

    // Process the payload after the response, outside the request cycle.
    const body: unknown = request.body;
    setImmediate(() => {
      void processInboundWebhook(body);
    });
  });

  // ── Handle standard transcribed messages from worker (Railway) ───────────
  fastify.post<{
    Body: { phone: string; text: string; audioUrl?: string; imageUrl?: string }
  }>('/v1/internal/handle-message', {
    config: { requireAuth: true } as any,
    schema: {
      body: {
        type: 'object',
        required: ['phone', 'text'],
        properties: {
          phone: { type: 'string' },
          text: { type: 'string' },
          audioUrl: { type: 'string' },
          imageUrl: { type: 'string' }
        }
      }
    }
  }, async (request, reply) => {
    const { phone, text, audioUrl, imageUrl } = request.body;
    // The body schema above requires `phone` and `text` to be strings, so
    // slicing is safe here; the check below still rejects an empty phone.
    const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`;

    if (!phone || text === undefined) {
      request.log.warn(`${traceId} Missing phone or text in handle-message request`);
      return reply.code(400).send({ error: 'phone and text are required' });
    }

    request.log.info(`${traceId} Received message text: "${text.substring(0, 100)}..." (Image: ${!!imageUrl})`);

    // Awaited (not fire-and-forget) so the worker learns about failures.
    try {
      await WhatsAppService.handleIncomingMessage(phone, text, audioUrl, imageUrl);
      request.log.info(`${traceId} Successfully processed message`);
    } catch (err: unknown) {
      const reason = describeError(err);
      request.log.error(`${traceId} handleIncomingMessage error: ${reason}`);
      return reply.code(500).send({ error: reason });
    }

    return reply.send({ ok: true });
  });

  // ── Simple Ping for Token/Connectivity Verification ──────────────────────
  fastify.get('/v1/internal/ping', {
    config: { requireAuth: true } as any
  }, async () => {
    return { ok: true, message: 'Pong! Railway Internal API is reachable and authorized.', timestamp: new Date().toISOString() };
  });
}
|