edtech / apps /api /src /routes /internal.ts
CognxSafeTrack
chore: execute Sprint 38 technical debt resolution (Type Safety, Zod validation, Vitest, Mock LLM extracted)
d9879cf
import { FastifyInstance } from 'fastify';
import { WhatsAppService } from '../services/whatsapp';
import { z } from 'zod';
// ─── Zod Schema for WhatsApp Webhook Payload ─────────────────────────────────
const WhatsAppMessageSchema = z.object({
from: z.string(),
id: z.string(),
timestamp: z.string(),
type: z.enum(['text', 'audio', 'image', 'video', 'document', 'sticker', 'reaction', 'interactive']),
text: z.object({ body: z.string() }).optional(),
audio: z.object({ id: z.string(), mime_type: z.string().optional() }).optional(),
image: z.object({ id: z.string() }).optional(),
interactive: z.object({
type: z.enum(['button_reply', 'list_reply']),
button_reply: z.object({
id: z.string(),
title: z.string(),
}).optional(),
list_reply: z.object({
id: z.string(),
title: z.string(),
description: z.string().optional()
}).optional(),
}).optional()
});
const WebhookPayloadSchema = z.object({
object: z.literal('whatsapp_business_account'),
entry: z.array(z.object({
id: z.string(),
changes: z.array(z.object({
value: z.object({
messaging_product: z.string().optional(),
metadata: z.object({ phone_number_id: z.string() }).optional(),
contacts: z.array(z.any()).optional(),
messages: z.array(WhatsAppMessageSchema).optional(),
statuses: z.array(z.any()).optional(),
}),
field: z.string(),
})),
})),
});
/**
* Internal-only routes β€” protected by ADMIN_API_KEY, not exposed publicly.
* Used by the Railway worker to call handleIncomingMessage after audio transcription.
*/
export async function internalRoutes(fastify: FastifyInstance) {
// ── Handle Webhook Forwarding from Gateway (HF -> Railway) ───────────────
fastify.post('/v1/internal/whatsapp/inbound', {
config: { requireAuth: true } as any
}, async (request, reply) => {
// We received the raw webhook payload that was forwarded.
// Send a 200 immediately to release HF Gateway
reply.code(200).send({ ok: true });
// Process message parsing outside the request loop
setImmediate(async () => {
try {
const parsed = WebhookPayloadSchema.safeParse(request.body);
if (!parsed.success) {
fastify.log.warn(`[INTERNAL-WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`);
return;
}
const payload = parsed.data;
for (const entry of payload.entry) {
for (const change of entry.changes) {
const messages = change.value.messages ?? [];
for (const message of messages) {
const phone = message.from;
let text = '';
if (message.type === 'text' && message.text) {
text = message.text.body;
} else if (message.type === 'interactive' && message.interactive) {
if (message.interactive.type === 'button_reply' && message.interactive.button_reply) {
text = message.interactive.button_reply.id;
fastify.log.info(`[INTERNAL-WEBHOOK] Button reply: ${text}`);
} else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) {
text = message.interactive.list_reply.id;
fastify.log.info(`[INTERNAL-WEBHOOK] List reply: ${text}`);
}
} else if (message.type === 'audio' && message.audio) {
// ─── Audio inbound: delegate download to Railway worker ────────────
const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || '';
const { Queue } = await import('bullmq');
const Redis = (await import('ioredis')).default;
const conn = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
: new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
const q = new Queue('whatsapp-queue', { connection: conn as any });
await q.add('download-media', {
mediaId: message.audio.id,
mimeType: message.audio.mime_type || 'audio/ogg',
phone,
...(accessToken ? { accessToken } : {})
}, { priority: 1, attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
await q.add('send-message-direct', {
phone,
text: "⏳ J'analyse ton audio..."
});
fastify.log.info(`[INTERNAL-WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
continue;
}
if (phone && text) {
await WhatsAppService.handleIncomingMessage(phone, text);
}
}
}
}
} catch (error) {
fastify.log.error(`[INTERNAL-WEBHOOK] Async processing error: ${String(error)}`);
}
});
});
// ── Handle standard transcribed messages from worker (Railway) ───────────
fastify.post<{
Body: { phone: string; text: string; audioUrl?: string }
}>('/v1/internal/handle-message', {
config: { requireAuth: true } as any,
schema: {
body: {
type: 'object',
required: ['phone', 'text'],
properties: {
phone: { type: 'string' },
text: { type: 'string' },
audioUrl: { type: 'string' }
}
}
}
}, async (request, reply) => {
const { phone, text, audioUrl } = request.body;
const traceId = `[INTERNAL-TX-${phone.slice(-4)}]`;
if (!phone || text === undefined) {
request.log.warn(`${traceId} Missing phone or text in handle-message request`);
return reply.code(400).send({ error: 'phone and text are required' });
}
request.log.info(`${traceId} Received transcribed text: "${text.substring(0, 100)}..."`);
// Fire and await - ensuring the worker knows if it failed
try {
await WhatsAppService.handleIncomingMessage(phone, text, audioUrl);
request.log.info(`${traceId} Successfully processed message`);
} catch (err: unknown) {
request.log.error(`${traceId} handleIncomingMessage error: ${(err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err))}`);
return reply.code(500).send({ error: (err instanceof Error ? (err instanceof Error ? err.message : String(err)) : String(err)) });
}
return reply.send({ ok: true });
});
// ── Simple Ping for Token/Connectivity Verification ──────────────────────
fastify.get('/v1/internal/ping', {
config: { requireAuth: true } as any
}, async () => {
return { ok: true, message: 'Pong! Railway Internal API is reachable and authorized.', timestamp: new Date().toISOString() };
});
}