CognxSafeTrack
fix: dry-run patches — guard enrollment.updateMany on replay + contextual TT CTA after validated replay
0c6a86e
import dns from 'node:dns';
dns.setDefaultResultOrder('ipv4first');
import { Worker, Job, Queue } from 'bullmq';
import dotenv from 'dotenv';
import { PrismaClient } from '@repo/database';
import { sendTextMessage, sendDocumentMessage, downloadMedia, sendInteractiveButtonMessage, sendInteractiveListMessage, sendImageMessage } from './whatsapp-cloud';
import { sendLessonDay } from './pedagogy';
import { updateBehavioralScore } from './scoring';
import { normalizeWolof } from './normalizeWolof';
import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config';
import { WhatsAppLogic } from './services/whatsapp-logic';
dotenv.config();
// 🚀 CRITICAL: Validate environment variables at boot
validateEnvironment();
const prisma = new PrismaClient();
import Redis from 'ioredis';
const connection = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
: new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || 'default',
password: process.env.REDIS_PASSWORD || undefined,
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
maxRetriesPerRequest: null
});
const worker = new Worker('whatsapp-queue', async (job: Job) => {
console.log('Processing job:', job.name, job.id);
try {
if (job.name === 'send-message') {
const { userId, text } = job.data;
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
await sendTextMessage(user.phone, text);
} else {
console.warn(`[WORKER] User ${userId} not found or missing phone — skipping send.`);
}
}
else if (job.name === 'send-message-direct') {
const { phone, text } = job.data;
await sendTextMessage(phone, text);
}
else if (job.name === 'handle-inbound') {
const { phone, text, audioUrl, imageUrl, messageId } = job.data;
// 🚨 Idempotence Lock for Inbound Messages
if (messageId) {
const lockKey = `lock:inbound:${messageId}`;
const isLocked = await connection.set(lockKey, "1", "EX", 300, "NX");
if (!isLocked) {
console.log(`[WORKER] 🔒 Lock inbound activé : message ${messageId} déjà traité.`);
return;
}
}
await WhatsAppLogic.handleIncomingMessage(phone, text, audioUrl, imageUrl);
}
else if (job.name === 'generate-feedback') {
const { userId, text, trackId, exercisePrompt, lessonText, exerciseCriteria, totalDays, language, userActivity, userRegion, previousResponses, isDeepDive, iterationCount, imageUrl, isButtonChoice, isTimeTravelMode, realCurrentDay } = job.data;
const currentDay = Number(job.data.currentDay || job.data.dayNumber || 0);
const enrollmentId = job.data.enrollmentId;
const user = await prisma.user.findUnique({
where: { id: userId },
include: { businessProfile: true } as any
}) as any;
if (!user?.phone) return;
// ─── 🚀 Idempotence Lock (Progression Bug Fix) ─────────
const Redis = (await import('ioredis')).default;
const redis = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
: new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
const textHash = text ? text.substring(0, 10).replace(/[^a-z0-9]/gi, '') : '';
const lockKey = `lock:feedback:${userId}:${currentDay}:${textHash}`;
const isLocked = await redis.set(lockKey, "1", "EX", 300, "NX");
if (!isLocked) {
console.log(`[WORKER] 🔒 Lock activé : ignorer ce job de feedback en double (User ${userId}, Day ${currentDay})`);
return;
}
let feedbackMsg = '';
let feedbackData: any = null;
let trackDay: any = null;
let AI_API_BASE_URL = '';
let apiKey = '';
try {
trackDay = await prisma.trackDay.findFirst({
where: { trackId, dayNumber: currentDay }
});
console.log(`[WORKER] Generating expert feedback for User ${userId}`);
AI_API_BASE_URL = getApiUrl();
apiKey = getAdminApiKey();
console.log(`[PIPELINE] Handing over text to Coach Engine... (User: ${userId}, Day: ${currentDay})`);
const feedbackRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/generate-feedback`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
body: JSON.stringify({
answers: text,
lessonText,
exercisePrompt,
userLanguage: language,
businessProfile: user.businessProfile,
exerciseCriteria,
userActivity,
userRegion,
dayNumber: currentDay,
previousResponses,
isDeepDive: isDeepDive || false,
iterationCount: iterationCount || 0,
imageUrl: imageUrl,
isButtonChoice: isButtonChoice || false
})
});
if (feedbackRes.ok) {
feedbackData = await feedbackRes.json();
const callToAction = language === 'WOLOF'
? "\n\nSu nga bëggee yokk leneen ci li nga xam, bindal 1️⃣ APPROFONDIR, wala nga bind 2️⃣ SUITE."
: "\n\nSi tu veux affiner ce point avec une donnée de ton propre terrain, tape 1️⃣ APPROFONDIR, sinon tape 2️⃣ SUITE.";
if (feedbackData.validation && feedbackData.enrichedVersion && feedbackData.actionableAdvice) {
feedbackMsg = `🌟 ${feedbackData.validation}\n\n🚀 ${feedbackData.enrichedVersion}\n\n💡 Conseil de Terrain :\n${feedbackData.actionableAdvice}`;
if (!isDeepDive || (isDeepDive && iterationCount < 3 && !feedbackData.isForcedClosure)) {
feedbackMsg += callToAction;
} else if (feedbackData.isForcedClosure) {
feedbackMsg += (language === 'WOLOF' ? "\n\nBindal 2️⃣ SUITE ngir wéy." : "\n\nTape 2️⃣ SUITE pour continuer.");
}
} else if (feedbackData.text) {
feedbackMsg = feedbackData.text;
if (!isDeepDive || (isDeepDive && iterationCount < 3 && !feedbackData.isForcedClosure)) {
feedbackMsg += callToAction;
} else if (feedbackData.isForcedClosure) {
feedbackMsg += (language === 'WOLOF' ? "\n\nBindal 2️⃣ SUITE ngir wéy." : "\n\nTape 2️⃣ SUITE pour continuer.");
}
} else {
feedbackMsg = '✅ Analyse terminée.';
}
} else if (feedbackRes.status === 429) {
console.warn(`[WORKER] 429 Error during generate-feedback`);
const fallbackMsg = language === 'WOLOF'
? "Jërëjëf ci sa tontu ! (Analyse IA temporairement indisponible)"
: "Merci pour ta réponse ! (Analyse IA de la réponse temporairement indisponible suite à une surcharge, mais ta progression est sauvegardée).";
await sendTextMessage(user.phone, fallbackMsg);
return;
} else {
const errText = await feedbackRes.text();
throw new Error(`generate-feedback failed HTTP ${feedbackRes.status}: ${errText}`);
}
} catch (err: unknown) {
console.error(`[WORKER] generate-feedback failed:`, (err instanceof Error ? err.message : String(err)));
// 🚨 RACE CONDITION: Delete lock on error to allow immediate retry by BullMQ
await redis.del(lockKey);
throw err;
}
if (feedbackMsg) {
// 🌟 Adaptive Pedagogy: Dynamic Remediation & Diagnostic Logic v1.1 🌟
// 🚨 RACE CONDITION FIX: Update UserProgress strictly BEFORE sending the message over WhatsApp.
let nextDay = currentDay + 1;
const currentProgress = await prisma.userProgress.findUnique({
where: { userId_trackId: { userId, trackId } }
});
const currentBadges = ((currentProgress as any)?.badges as string[]) || [];
let updatedBadges = [...currentBadges];
if (feedbackData?.isQualified === false) {
// Check for Adaptive Diagnostic Branching
const diagnosticTrigger = (exerciseCriteria as any)?.diagnostic?.trigger;
const adaptiveModuleId = (exerciseCriteria as any)?.diagnostic?.moduleId;
if (diagnosticTrigger && feedbackData?.missingElements?.includes(diagnosticTrigger) && adaptiveModuleId) {
console.log(`[WORKER] Adaptive Diagnostic triggered for User ${userId}: Re-routing to module ${adaptiveModuleId}`);
// 🚀 Redirect to specific module
nextDay = 1; // Modules start at day 1
await prisma.enrollment.updateMany({
where: { userId, status: 'ACTIVE' },
data: { trackId: adaptiveModuleId, currentDay: 1 }
});
}
const remediationDay = (exerciseCriteria as any)?.remediation?.dayNumber;
if (remediationDay && remediationDay !== currentDay) {
console.log(`[WORKER] Dynamic remediation triggered for User ${userId}: Day ${currentDay} -> ${remediationDay}`);
nextDay = remediationDay;
} else {
console.log(`[WORKER] Exercise not qualified but no remediation day defined. Staying on Day ${currentDay}.`);
nextDay = currentDay;
}
// 🚨 Hardening: If explicitly not qualified, we put the user in PENDING_REMEDIATION
await prisma.userProgress.update({
where: { userId_trackId: { userId, trackId } },
data: {
exerciseStatus: 'PENDING_REMEDIATION', // Stay in remediation until final success
score: { increment: 0 }
} as any
});
// 🚨 Store Strategy Data in BusinessProfile (Guarded by Day 10+)
const hasStrategyData = feedbackData?.searchResults || (feedbackData as any)?.competitorList || (feedbackData as any)?.financialProjections || (feedbackData as any)?.fundingAsk || (feedbackData as any)?.teamMembers;
if (hasStrategyData && currentDay >= 10) {
const updatePayload: any = { lastUpdatedFromDay: currentDay };
if (feedbackData?.searchResults) updatePayload.marketData = feedbackData.searchResults;
if ((feedbackData as any)?.competitorList) updatePayload.competitorList = (feedbackData as any).competitorList;
if ((feedbackData as any)?.financialProjections) updatePayload.financialProjections = (feedbackData as any).financialProjections;
if ((feedbackData as any)?.fundingAsk) updatePayload.fundingAsk = (feedbackData as any).fundingAsk;
if ((feedbackData as any)?.teamMembers && Array.isArray((feedbackData as any).teamMembers)) {
const existingProfile = await (prisma as any).businessProfile.findUnique({ where: { userId } });
const existingTeam = Array.isArray(existingProfile?.teamMembers) ? existingProfile.teamMembers : [];
updatePayload.teamMembers = [...existingTeam, ...(feedbackData as any).teamMembers];
}
try {
await (prisma as any).businessProfile.upsert({
where: { userId },
update: updatePayload,
create: { userId, ...updatePayload }
});
} catch (bpErr: unknown) {
console.error('[WORKER] BusinessProfile upsert failed (non-fatal, REMEDIATION path):', (bpErr as Error).message);
}
}
} else {
// Success! Award Badges & Mark Completed (or keep Pending for Deep Dive)
const trackDayBadges = (trackDay as any)?.badges as string[] || [];
for (const b of trackDayBadges) {
if (!updatedBadges.includes(b)) updatedBadges.push(b);
}
let newStatus = (isDeepDive && feedbackData?.isForcedClosure !== true) ? 'PENDING_DEEPDIVE' : 'COMPLETED';
// 🚨 Card/Button Bypass Logic (Lead Fullstack Developer Requirement)
// Ensure that a button click never completes the lesson, even if the AI validates it.
if (isButtonChoice) {
console.log(`[WORKER] 🛡️ Button choice detected for User ${userId}. Overriding COMPLETED with PENDING.`);
newStatus = 'PENDING';
}
// 🕰️ TIME-TRAVEL GUARD: Skip COMPLETED update when replaying a historical lesson.
// BusinessProfile (One-Pager) IS updated below — only the global exerciseStatus is preserved.
if (isTimeTravelMode) {
console.log(`[TIME-TRAVEL] 🛡️ User ${userId} — Skipping userProgress.update(COMPLETED) for replay Day ${currentDay} (real: ${realCurrentDay}).`);
} else {
// @ts-ignore - Prisma types may be out of sync after schema update
await prisma.userProgress.update({
where: { userId_trackId: { userId, trackId } },
data: {
exerciseStatus: newStatus,
score: { increment: newStatus === 'COMPLETED' ? 1 : 0 },
badges: updatedBadges,
behavioralScoring: updateBehavioralScore(currentProgress ? (currentProgress as any).behavioralScoring : null, (exerciseCriteria as any)?.scoring?.impact_success),
aiSource: feedbackData?.aiSource || 'OPENAI'
} as any
});
}
// 🚨 Store Strategy Data in BusinessProfile (Guarded by Day 10+)
const hasStrategyData = feedbackData?.searchResults || (feedbackData as any)?.competitorList || (feedbackData as any)?.financialProjections || (feedbackData as any)?.fundingAsk || (feedbackData as any)?.teamMembers;
if (hasStrategyData && currentDay >= 10) {
const updatePayload: any = { lastUpdatedFromDay: currentDay };
if (feedbackData?.searchResults) updatePayload.marketData = feedbackData.searchResults;
if ((feedbackData as any)?.competitorList) updatePayload.competitorList = (feedbackData as any).competitorList;
if ((feedbackData as any)?.financialProjections) updatePayload.financialProjections = (feedbackData as any).financialProjections;
if ((feedbackData as any)?.fundingAsk) updatePayload.fundingAsk = (feedbackData as any).fundingAsk;
if ((feedbackData as any)?.teamMembers && Array.isArray((feedbackData as any).teamMembers)) {
const existingProfile = await (prisma as any).businessProfile.findUnique({ where: { userId } });
const existingTeam = Array.isArray(existingProfile?.teamMembers) ? existingProfile.teamMembers : [];
updatePayload.teamMembers = [...existingTeam, ...(feedbackData as any).teamMembers];
}
try {
await (prisma as any).businessProfile.upsert({
where: { userId },
update: updatePayload,
create: { userId, ...updatePayload }
});
} catch (bpErr: unknown) {
console.error('[WORKER] BusinessProfile upsert failed (non-fatal, SUCCESS path):', (bpErr as Error).message);
}
}
// If we were in a remediation day (fractional) -> move to next integer day
if (!isTimeTravelMode && currentDay % 1 !== 0) {
nextDay = Math.floor(currentDay) + 1;
console.log(`[WORKER] Remediation successful for User ${userId}. Moving to Day ${nextDay}.`);
}
// 🕰️ TIME-TRAVEL: Clear the Redis context after a successful replay response
if (isTimeTravelMode) {
try {
const Redis = (await import('ioredis')).default;
const ttRedis = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
: new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
const { clearTimeTravelContext } = await import('./timeTravelContext');
await clearTimeTravelContext(userId, ttRedis);
await ttRedis.quit();
} catch (ttErr) {
console.error('[TIME-TRAVEL] Failed to clear context after successful replay:', ttErr);
}
}
} // end success else block
// THEN store the response
// @ts-ignore
await prisma.response.create({
data: {
enrollmentId: enrollmentId,
userId: user.id,
dayNumber: currentDay,
content: feedbackMsg,
aiSource: feedbackData?.aiSource || 'OPENAI'
}
});
// THEN send the WhatsApp message
await sendTextMessage(user.phone, feedbackMsg);
if (isFeatureEnabled('FEATURE_BUSINESS_PROFILE') && currentDay) {
try {
const extractRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/extract-profile`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
body: JSON.stringify({
userInput: text,
dayNumber: currentDay,
userLanguage: language
})
});
if (extractRes.ok) {
const { data } = await extractRes.json() as any;
// Clean up undefined/null values
const profileData = Object.fromEntries(Object.entries(data).filter(([_, v]) => v != null && v !== ''));
// 🚨 Sector Locking (Lead AI Engineer Requirement)
const lockedSectors = ["Organisation d'événements / PWA"];
if (lockedSectors.includes(user.activity || "")) {
if (profileData.activityLabel || profileData.activityType) {
console.log(`[WORKER] Sector Locked for User ${userId}. Blocking activity update.`);
delete profileData.activityLabel;
delete profileData.activityType;
}
}
if (Object.keys(profileData).length > 0 || feedbackData?.searchResults) {
console.log(`[WORKER] Updating BusinessProfile for User ${userId}:`, profileData);
const updatePayload: any = {
...profileData,
lastUpdatedFromDay: currentDay
};
if (feedbackData?.searchResults) {
updatePayload.marketData = feedbackData.searchResults;
console.log(`[WORKER] Market Data (Enrichment) added to profile.`);
}
await (prisma as any).businessProfile.upsert({
where: { userId },
update: updatePayload,
create: { userId, ...updatePayload }
});
// 🌟 Visuals WOW: Generate Pitch Card for Day 1 🌟
const finalLabel = (profileData as any).activityLabel || user.activity || "Projet Entrepreneurial";
if (isFeatureEnabled('FEATURE_SHARE_CARD') && currentDay === 1 && finalLabel) {
try {
const { generatePitchCard } = await import('./visuals');
const { uploadFile } = await import('./storage');
const cardBuffer = await generatePitchCard(finalLabel);
const cardUrl = await uploadFile(cardBuffer, 'pitch-card.png', 'image/png');
const caption = language === 'WOLOF'
? "Sa kàrdu business mu neex ! ✨"
: "Ta carte business personnalisée ! ✨";
await sendImageMessage(user.phone, cardUrl, caption);
} catch (vErr: unknown) {
console.error('[WORKER] Pitch Card generation failed:', (vErr as any)?.message);
}
}
}
}
} catch (err: unknown) {
console.error('[WORKER] BusinessProfile extraction failed:', (err instanceof Error ? err.message : String(err)));
}
}
// 🚨 Hardening: We DO NOT update `currentDay` here for normal or remediation flows.
// The `send-content` job will update `enrollment.currentDay` right before sending the lesson!
// This prevents the 'double incrementation' bug (+2 days) when the user types SUITE.
// 🌟 Adaptive Pedagogy: Streak Management 🌟
const lastActivity = user.lastActivityAt ? new Date(user.lastActivityAt) : null;
const today = new Date();
const diffTime = lastActivity ? Math.abs(today.getTime() - lastActivity.getTime()) : Infinity;
const diffDays = Math.ceil(diffTime / (1000 * 60 * 60 * 24));
let newStreak = (user as any).currentStreak || 0;
if (diffDays <= 1) {
newStreak += 1; // Continuous streak
} else if (diffDays > 1) {
newStreak = 1; // Streak broken
}
await prisma.user.update({
where: { id: userId },
data: {
lastActivityAt: new Date(),
currentStreak: newStreak,
longestStreak: Math.max((user as any).longestStreak || 0, newStreak)
} as any
});
if (currentDay >= totalDays && feedbackData?.isQualified !== false) {
await sendTextMessage(user.phone, language === 'WOLOF'
? "🎉 Baraka Allahu fik ! Jeex nga module bi. Dokumaan yi dinañu leen yónnee ci kanam !"
: "🎉 Félicitations ! Vous avez terminé ce module. Vos documents intelligents arrivent bientôt !"
);
const q = new Queue('whatsapp-queue', { connection: connection as any });
await q.add('send-content', { userId, trackId, dayNumber: currentDay + 1 });
} else if (feedbackData?.isQualified === false) {
await sendTextMessage(user.phone, language === 'WOLOF'
? "🚨 Am na lo xamni leerul bu baax. Xoolal missal yi ma la yónnee te tàmbaleet ko ndànk."
: "🚨 Certains points sont encore à renforcer pour valider. Regarde mes conseils et réessaie."
);
// Si on a un jour de remédiation (ex 1.5), on l'envoie automatiquement
if (nextDay !== currentDay && nextDay % 1 !== 0) {
const q = new Queue('whatsapp-queue', { connection: connection as any });
await q.add('send-content', { userId, trackId, dayNumber: nextDay }, { delay: 2000 });
}
} else {
// 🕰️ TIME-TRAVEL MODE: Custom CTA — SUITE would be blocked, so give clear guidance
if (isTimeTravelMode) {
await sendTextMessage(user.phone, language === 'WOLOF'
? `✅ Tànkâr ! Sa tôntu Jour ${currentDay} def na nu ci kos. Dànga dem ci Jour ${realCurrentDay || currentDay} — tôntu ci sén exercice ngir dem ci kanam.`
: `✅ Enregistré ! Ta réponse du Jour ${currentDay} a été sauvegardée. Tu es toujours au Jour ${realCurrentDay || currentDay} — réponds à son exercice pour continuer 📅`
);
} else {
await sendTextMessage(user.phone, language === 'WOLOF'
? "Baax na ! Yónnee *SUITE* ngir dem ci kanam."
: "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante."
);
}
}
}
}
else if (job.name === 'send-nudge') {
const { userId, type } = job.data;
const user = await prisma.user.findUnique({ where: { id: userId } });
if (!user?.phone) return;
const isWolof = user.language === 'WOLOF';
const messages = {
ENCOURAGEMENT: isWolof
? "Assalamuyalaykum ! Fatte wuñu sa mbir. Tontu bu gatt ngir wéy ? 💪"
: "Coucou ! On n'a pas oublié ton projet. Une petite réponse pour continuer ? 💪",
RESURRECTION: isWolof
? "Sa liggeey mu ngi lay xaar ! Am succès dafa laaj lëkkalë. Ñu tàmbaleeti ? 🚀"
: "Ton business t'attend ! Le succès vient de la régularité. On s'y remet ? 🚀"
};
const text = (messages as any)[type] || messages.ENCOURAGEMENT;
await sendTextMessage(user.phone, text);
console.log(`[WORKER] Nudge ${type} sent to ${user.phone}`);
}
else if (job.name === 'send-interactive-buttons') {
const { userId, bodyText, buttons } = job.data;
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
await sendInteractiveButtonMessage(user.phone, bodyText, buttons);
}
}
else if (job.name === 'send-interactive-list') {
const { userId, headerText, bodyText, buttonLabel, sections } = job.data;
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
await sendInteractiveListMessage(user.phone, headerText, bodyText, buttonLabel, sections);
}
}
else if (job.name === 'enroll-user') {
const { userId, trackId } = job.data;
const track = await prisma.track.findUnique({ where: { id: trackId } });
if (!track) {
console.error(`[WORKER] Enrollment failed: Track ${trackId} not found.`);
return;
}
if (track.isPremium) {
console.log(`[WORKER] User ${userId} requested Premium Track ${trackId}. Generating Payment Link...`);
try {
const AI_API_BASE_URL = getApiUrl();
const apiKey = getAdminApiKey();
const checkoutRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/payments/checkout`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
},
body: JSON.stringify({ userId, trackId })
});
const checkoutData = await checkoutRes.json() as any;
if (checkoutRes.ok && checkoutData.url) {
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
await sendTextMessage(
user.phone,
`💳 Cette formation est Premium. Complétez votre paiement ici :\n${checkoutData.url}`
);
}
} else {
console.error('[WORKER] Failed to get checkout URL', checkoutData);
}
} catch (err) {
console.error('[WORKER] Error calling checkout endpoint', err);
}
} else {
console.log(`[WORKER] Enrolling User ${userId} in Free Track ${trackId}...`);
const existing = await prisma.enrollment.findFirst({ where: { userId, trackId } });
if (!existing) {
await prisma.enrollment.create({
data: { userId, trackId, status: 'ACTIVE', currentDay: 1 }
});
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
await sendTextMessage(
user.phone,
`🎉 Bienvenue dans *${track.title}* ! La génération de votre cours personnalisé (Jour 1) a commencé. Cela prendra environ 30 secondes...`
);
// Immediately trigger Day 1 content generation
const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
await whatsappQueue.add('send-content', {
userId,
trackId,
dayNumber: 1
});
}
}
}
}
else if (job.name === 'download-media') {
// ─── Audio download from Meta Graph API — Railway only ─────────────
const { mediaId, mimeType, phone } = job.data;
const traceId = `[STT-FLOW-${phone.slice(-4)}]`;
// Always prioritize the live environment variable over stale job data from Redis
const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || job.data.accessToken;
console.log(`${traceId} Downloading media ${mediaId} for ${phone}...`);
if (!accessToken) {
console.error(`[WORKER] Missing WHATSAPP_ACCESS_TOKEN for media ${mediaId}.`);
return;
}
let transcribedText = '';
let audioUrl = '';
try {
const { buffer } = await downloadMedia(mediaId, accessToken);
console.log(`${traceId} Downloaded file size=${buffer.length} contentType=${mimeType}`);
const AI_API_BASE_URL = getApiUrl();
const apiKey = getAdminApiKey();
// ─── Hardening: Store audio on R2 via the API ─────────
try {
const storeRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/store-audio`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
body: JSON.stringify({ audioBase64: buffer.toString('base64'), mimeType, phone })
});
if (storeRes.ok) {
const storeData = await storeRes.json() as any;
if (storeData.url) {
audioUrl = storeData.url;
console.log(`[R2] Inbound audio uploaded: ${audioUrl}`);
}
}
} catch (err: unknown) {
console.error('[WORKER] store-audio failed (inbound audio will not have a permanent link):', (err instanceof Error ? err.message : String(err)));
}
// ─── Hardening: Record Inbound Message in DB ──────────
const user = await prisma.user.findFirst({ where: { phone } });
if (user) {
try {
await prisma.message.create({
data: {
userId: user.id,
direction: 'INBOUND',
channel: 'WHATSAPP',
mediaUrl: audioUrl || null,
payload: job.data // Raw Meta payload from job
}
});
console.log(`[DB] Recorded inbound audio message for ${phone}`);
} catch (dbErr: unknown) {
console.error('[DB] Failed to record inbound message:', (dbErr as any)?.message);
}
}
// ─── Routing: Transcribe if Audio, Forward if Image ─────────
if (mimeType.startsWith('audio/')) {
console.log(`${traceId} Transcribe start calling ${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/transcribe`);
const transcribeRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/transcribe`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
body: JSON.stringify({
audioBase64: buffer.toString('base64'),
filename: `msg.${mimeType.includes('mp4') ? 'mp4' : 'ogg'}`,
language: user?.language // Pass language hint to Whisper
})
});
if (transcribeRes.ok) {
const data = await transcribeRes.json() as any;
const isSuspect = data.isSuspect || false;
const confidence = data.confidence || 0;
transcribedText = data.text || '';
const user = await prisma.user.findFirst({ where: { phone } });
// 🌍 Normalisation Wolof STT v2.0 🌍
if (user?.language === 'WOLOF') {
const originalText = transcribedText;
const normResult = normalizeWolof(transcribedText);
transcribedText = normResult.normalizedText;
// Output correction feedback
console.log(`[STT] Normalized: "${originalText}" -> "${transcribedText}"`);
// Soft Feedback UI
await sendTextMessage(phone, `Ma dégg na: "${transcribedText}" ✅\n(Confiance STT: ${confidence}%)`);
if (normResult.changes.length > 0) {
const limitedChanges = normResult.changes.slice(0, 2).join(", ");
await sendTextMessage(phone, `Nataal bu gën: ${limitedChanges}`);
}
// 🚨 Wolof Auto-Validation Logic (Target: > 80%) 🚨
if (confidence <= 80) {
console.log(`[STT] Whisper Confidence (${confidence}%) <= 80. Intercepting WOLOF audio for User ${user.id}. Shifting to PENDING_REVIEW.`);
// First, make sure there is an active enrollment to find the trackId
const activeEnrollment = await prisma.enrollment.findFirst({
where: { userId: user.id, status: 'ACTIVE' },
include: { track: true }
});
if (activeEnrollment) {
// 🚨 Brique 4 (Routage Deep Dive) : On ne bloque pas les audios itératifs
const currentProgress = await prisma.userProgress.findUnique({
where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } }
});
if (currentProgress?.exerciseStatus !== 'PENDING_DEEPDIVE') {
await prisma.userProgress.upsert({
where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } },
update: { exerciseStatus: 'PENDING_REVIEW' as any, adminTranscription: transcribedText, confidenceScore: confidence },
create: { userId: user.id, trackId: activeEnrollment.trackId, exerciseStatus: 'PENDING_REVIEW' as any, adminTranscription: transcribedText, confidenceScore: confidence }
});
await sendTextMessage(phone, "🎙️ Nyangi jaxas sa kàddu. Xamle dina la tontu ci kanam ! (En cours d'analyse par l'équipe)");
// Still save the audio URL to the message for the admin to read!
await prisma.message.updateMany({
where: { userId: user.id, direction: 'INBOUND', mediaUrl: audioUrl },
data: { content: transcribedText }
}).catch(() => { });
return; // Stop here, WAIT FOR ADMIN OVERRIDE
}
} else {
// Edge case: not enrolled but sent audio...
await sendTextMessage(phone, "Dama jaxaso ci li nga wax... Mën nga ko waxaat ndànk ?");
return;
}
}
console.log(`[STT] transcribe result="${transcribedText.substring(0, 80)}" (suspect=${isSuspect}, confidence=${confidence}%)`);
// 🌟 STT Hardening: Handle suspect transcription 🌟
if (isSuspect && user) {
const fallbackMsg = user.language === 'WOLOF'
? "Dama jaxaso ci li nga wax... Mën nga ko waxaat ndànk ? (10-15s)"
: "Je n'ai pas bien compris ton vocal. Pourrais-tu le renvoyer en parlant bien distinctement ? (10-15s)";
await sendTextMessage(phone, fallbackMsg);
return; // Stop here, don't trigger AI generator
}
// Send an immediate confirmation to the user that the audio was understood
if (user && transcribedText) {
const confirmationText = `J'ai compris :\n"${transcribedText}"`;
await sendTextMessage(phone, confirmationText);
// Update the message record with transcribed text if found
await prisma.message.updateMany({
where: { userId: user.id, direction: 'INBOUND', mediaUrl: audioUrl },
data: { content: transcribedText }
}).catch(() => { });
}
} // end WOLOF block
// 🇫🇷 FR users: send confirmation (WOLOF users already got theirs above)
if (user?.language !== 'WOLOF' && user && transcribedText) {
console.log(`[STT] transcribe result="${transcribedText.substring(0, 80)}" (suspect=${isSuspect}, confidence=${confidence}%)`);
if (isSuspect) {
await sendTextMessage(phone, "Je n'ai pas bien compris ton vocal. Pourrais-tu le renvoyer en parlant bien distinctement ? (10-15s)");
return;
}
await sendTextMessage(phone, `🎙️ J'ai compris : "${transcribedText}"`);
await prisma.message.updateMany({
where: { userId: user!.id, direction: 'INBOUND', mediaUrl: audioUrl },
data: { content: transcribedText }
}).catch(() => { });
}
// Process the transcribed text as a normal incoming message via API
// ─── Routing: Process transcribed text ─────────
if (transcribedText) {
const traceId = `[STT-FLOW-${phone.slice(-4)}]`;
console.log(`${traceId} Processing transcribed text via WhatsAppLogic...`);
await WhatsAppLogic.handleIncomingMessage(phone, transcribedText, audioUrl);
console.log(`${traceId} Inbound audio processing complete.`);
}
} else if (transcribeRes.status === 429) {
// OpenAI quota exceeded — send fallback and do NOT requeue
console.warn(`[WORKER] 429 Error during transcription`);
const user = await prisma.user.findFirst({ where: { phone } });
if (user) {
await sendTextMessage(phone, user.language === 'WOLOF'
? "⚠️ Mënuma dégg sa kàddu léegi (réseau bi dafa fees). Yónnee sa tontu ci bind (texte)."
: "⚠️ Le service audio est temporairement saturé. Envoie ta réponse en texte."
);
}
return; // Stop processing
} else {
const errText = await transcribeRes.text().catch(() => `HTTP ${transcribeRes.status}`);
console.error(`[WORKER] /v1/ai/transcribe failed with HTTP ${transcribeRes.status}: ${errText}`);
throw new Error(`Transcription failed HTTP ${transcribeRes.status}`); // throw so BullMQ retries
}
} else if (mimeType.startsWith('image/')) {
// 📸 IMAGE-FLOW: Build imageUrl from the R2 store result (same pattern as audioUrl for audio)
let imageUrl = '';
try {
const storeImgRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, '')}/v1/ai/store-audio`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
body: JSON.stringify({ audioBase64: buffer.toString('base64'), mimeType, phone })
});
if (storeImgRes.ok) {
const storeImgData = await storeImgRes.json() as any;
if (storeImgData.url) {
imageUrl = storeImgData.url;
console.log(`[IMAGE-FLOW] ✅ Image uploaded to R2: ${imageUrl}`);
}
}
} catch (imgStoreErr: unknown) {
console.error('[IMAGE-FLOW] R2 store failed (image will be analyzed without permanent URL):', (imgStoreErr as Error).message);
}
console.log(`[IMAGE-FLOW] 📸 Image detected for ${phone}. Routing to WhatsAppLogic (imageUrl: ${imageUrl || audioUrl || 'none'})...`);
// Fallback: si imageUrl (upload #2) échoue, utiliser audioUrl (upload #1, même buffer, déjà réussi)
const finalImageUrl = imageUrl || audioUrl || undefined;
await WhatsAppLogic.handleIncomingMessage(phone, job.data.caption || 'Image reçue', undefined, finalImageUrl);
console.log(`[IMAGE-FLOW] ✅ Inbound image processing complete.`);
}
} catch (err: unknown) {
console.error(`[WORKER] download-media failed:`, err);
}
}
else if (job.name === 'send-image') {
const { to, imageUrl, caption } = job.data;
try {
await sendImageMessage(to, imageUrl, caption || '');
console.log(`[WhatsApp] ✅ Image message sent to ${to}`);
} catch (err: unknown) {
console.error(`[WORKER] send-image failed:`, (err instanceof Error ? err.message : String(err)));
}
}
else if (job.name === 'send-content') {
const { userId, trackId, dayNumber, testImageUrl } = job.data;
if (testImageUrl && userId) {
const u = await prisma.user.findUnique({ where: { id: userId } });
if (u?.phone) {
await sendImageMessage(u.phone, testImageUrl, "Branding XAMLÉ 🇸🇳");
console.log(`[WhatsApp] ✅ Image message sent to ${u.phone}`);
}
return;
}
const user = await prisma.user.findUnique({ where: { id: userId } });
const trackDay = await prisma.trackDay.findFirst({
where: { trackId, dayNumber }
});
if (trackDay) {
await sendLessonDay(userId, trackId, dayNumber, {
skipProgressUpdate: job.data.skipProgressUpdate === true
});
// 🕰️ TIME-TRAVEL GUARD: Only update currentDay if this is NOT a historical replay
// Without this guard, a Replay Day 11 would overwrite currentDay 12 → 11
if (!job.data.skipProgressUpdate) {
await prisma.enrollment.updateMany({
where: { userId, trackId },
data: {
currentDay: dayNumber,
lastActivityAt: new Date()
}
});
} else {
// Read-only replay: only touch lastActivityAt to keep heartbeat alive
await prisma.enrollment.updateMany({
where: { userId, trackId },
data: { lastActivityAt: new Date() }
});
console.log(`[SEND-CONTENT] 🕰️ Replay Day ${dayNumber} sent read-only. currentDay unchanged.`);
}
} else {
console.log(`[WORKER] No more content for Track ${trackId} Day ${dayNumber}. Marking enrollment as completed.`);
await prisma.enrollment.updateMany({
where: { userId, trackId },
data: {
status: 'COMPLETED',
lastActivityAt: new Date()
}
});
// 🎓 Graduation Flow: Successive Track Unlocking (v1.0) 🎓
const trackMatch = trackId.match(/^T(\d)-(FR|WO)$/);
if (trackMatch) {
const currentLevel = parseInt(trackMatch[1]);
const lang = trackMatch[2];
const nextLevel = currentLevel + 1;
const nextTrackId = `T${nextLevel}-${lang}`;
const nextTrack = await prisma.track.findUnique({ where: { id: nextTrackId } });
if (nextTrack) {
const existingNextEnrollment = await prisma.enrollment.findFirst({
where: { userId, trackId: nextTrackId }
});
if (!existingNextEnrollment) {
console.log(`[WORKER] Auto-graduating User ${userId}: ${trackId} -> ${nextTrackId}`);
const isWolof = lang === 'WO';
const congratsMsg = isWolof
? `🎉 Baraka Allahu fik ! Mat nga Niveau ${currentLevel}. Maangi lay tàmbaleel Niveau ${nextLevel} : *${nextTrack.title}*...`
: `🎉 Félicitations ! Vous avez validé le Niveau ${currentLevel}. Je vous inscris immédiatement au Niveau ${nextLevel} : *${nextTrack.title}*...`;
if (user?.phone) {
await sendTextMessage(user.phone, congratsMsg);
if (!nextTrack.isPremium) {
await prisma.enrollment.create({
data: { userId, trackId: nextTrackId, status: 'ACTIVE', currentDay: 1 }
});
// Trigger Day 1 for next track with 10s delay to let graduation docs/badges arrive
const q = new Queue('whatsapp-queue', { connection: connection as any });
await q.add('send-content', { userId, trackId: nextTrackId, dayNumber: 1 }, { delay: 10000 });
} else {
const payMsg = isWolof
? `💳 Niveau ${nextLevel} bi dafa laaj pass. Yónnee ma "PAYER" ngir tàmbaleeti.`
: `💳 Le Niveau ${nextLevel} est un module Premium. Envoyez "PAYER" pour le débloquer et continuer votre ascension !`;
await sendTextMessage(user.phone, payMsg);
}
}
}
}
}
// 🌟 Trigger AI Document Generation 🌟
console.log(`[WORKER] Triggering AI Document Generation for User ${userId}...`);
try {
const userWithProfile = await prisma.user.findUnique({
where: { id: userId },
include: { businessProfile: true } as any
}) as any;
const isWolof = userWithProfile?.language === 'WOLOF';
const userLangPrefix = isWolof ? "MBIR : " : "ACTIVITÉ : ";
// Localize context to avoid English bias in LLM
const userContext = `${userLangPrefix} ${userWithProfile?.businessProfile?.activityLabel || userWithProfile?.activity || 'Inconnue'}. Cet entrepreneur a terminé son parcours de formation XAMLÉ. Génère les documents basés sur son activité et les concepts appris.`;
const AI_API_BASE_URL = getApiUrl();
const apiKey = getAdminApiKey();
const authHeaders = {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
};
console.log(`[WORKER] Calling ${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/onepager (${userWithProfile?.language || 'FR'})...`);
const opRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/onepager`, {
method: 'POST',
headers: authHeaders,
body: JSON.stringify({
userContext,
language: userWithProfile?.language || 'FR',
businessProfile: userWithProfile?.businessProfile
})
});
const pdfData = await opRes.json() as any;
console.log(`[WORKER] Calling ${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/deck (${userWithProfile?.language || 'FR'})...`);
const deckRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/deck`, {
method: 'POST',
headers: authHeaders,
body: JSON.stringify({
userContext,
language: userWithProfile?.language || 'FR',
businessProfile: userWithProfile?.businessProfile
})
});
const pptxData = await deckRes.json() as any;
console.log(`[AI DOCS READY] 📄 PDF: ${pdfData.url}`);
console.log(`[AI DOCS READY] 📊 PPTX: ${pptxData.url}`);
// Send documents to user via WhatsApp
if (user?.phone) {
if (pdfData.url) {
await sendDocumentMessage(
user.phone,
pdfData.url,
isWolof ? 'one-pager-xamle.pdf' : 'mon-business-plan.pdf',
isWolof ? '📄 Sa One-Pager mu ngi nii !' : '📄 Votre One-Pager est prêt !'
);
}
if (pptxData.url) {
await sendDocumentMessage(
user.phone,
pptxData.url,
isWolof ? 'pitch-deck-xamle.pptx' : 'mon-pitch-deck.pptx',
isWolof ? '📊 Sa Pitch Deck mu ngi nii !' : '📊 Votre Pitch Deck est prêt !'
);
}
// 🚨 Track AI source for documents via Response table
// @ts-ignore - Prisma types may be out of sync after schema update
await prisma.response.create({
data: {
userId,
enrollmentId: userWithProfile?.enrollments?.[0]?.id || 0,
dayNumber: (dayNumber || 1) + 1, // Indicate graduation/end of track
content: `AI Documents Generated. PDF: ${pdfData.aiSource || '?'}, PPTX: ${pptxData.aiSource || '?'}`,
aiSource: 'SYSTEM'
}
}).catch(() => { });
}
} catch (aiError) {
console.error('[WORKER] Failed to generate AI documents:', aiError);
}
}
}
else if (job.name === 'send-admin-audio-override') {
const { userId, trackId, overrideAudioUrl, adminId } = job.data;
const user = await prisma.user.findUnique({ where: { id: userId } });
if (user?.phone) {
// 1. Send the Admin's Voice Message
const { sendAudioMessage } = await import('./whatsapp-cloud');
await sendAudioMessage(user.phone, overrideAudioUrl);
// 2. Send transition prompt
await sendTextMessage(user.phone,
user.language === 'WOLOF'
? "Baax na ! Yónnee *SUITE* ngir dem ci kanam."
: "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante."
);
console.log(`[WORKER] Admin ${adminId} Audio Overdrive sent to User ${userId}.`);
// 3. Increment the logic via Queue so that user doesn't fall behind.
const enrollment = await prisma.enrollment.findFirst({
where: { userId, trackId, status: 'ACTIVE' }
});
if (enrollment) {
const nextDay = Math.floor(enrollment.currentDay) + 1;
const q = new Queue('whatsapp-queue', { connection: connection as any });
await q.add('send-content', { userId, trackId, dayNumber: nextDay }, { delay: 2000 });
}
}
}
} catch (error) {
console.error(`Job ${job.id} failed:`, error);
throw error;
}
}, { connection: connection as any });
console.log('WhatsApp Worker started...');
// 🚀 Start the daily cron scheduler
import { startDailyScheduler } from './scheduler';
startDailyScheduler();
worker.on('completed', job => {
console.log(`[WORKER] Job ${job.id} has completed!`);
});
worker.on('failed', (job, err) => {
console.error(`[WORKER] Job ${job?.id} has failed with ${(err instanceof Error ? err.message : String(err))}`);
});