CognxSafeTrack
fix: dry-run patches — guard enrollment.updateMany on replay + contextual TT CTA after validated replay
0c6a86e | import dns from 'node:dns'; | |
| dns.setDefaultResultOrder('ipv4first'); | |
| import { Worker, Job, Queue } from 'bullmq'; | |
| import dotenv from 'dotenv'; | |
| import { PrismaClient } from '@repo/database'; | |
| import { sendTextMessage, sendDocumentMessage, downloadMedia, sendInteractiveButtonMessage, sendInteractiveListMessage, sendImageMessage } from './whatsapp-cloud'; | |
| import { sendLessonDay } from './pedagogy'; | |
| import { updateBehavioralScore } from './scoring'; | |
| import { normalizeWolof } from './normalizeWolof'; | |
| import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config'; | |
| import { WhatsAppLogic } from './services/whatsapp-logic'; | |
// Load .env into process.env, then validate environment variables at boot.
dotenv.config();
validateEnvironment();

const prisma = new PrismaClient();

import Redis from 'ioredis';

// Shared Redis client for this module. A REDIS_URL takes precedence; otherwise the
// client is assembled from the discrete REDIS_* variables. maxRetriesPerRequest is
// null in both configurations.
const connection = (() => {
    const redisUrl = process.env.REDIS_URL;
    if (redisUrl) {
        return new Redis(redisUrl, { maxRetriesPerRequest: null });
    }

    const tlsEnabled = process.env.REDIS_TLS === 'true';
    return new Redis({
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
        username: process.env.REDIS_USERNAME || 'default',
        password: process.env.REDIS_PASSWORD || undefined,
        tls: tlsEnabled ? {} : undefined,
        maxRetriesPerRequest: null
    });
})();
| const worker = new Worker('whatsapp-queue', async (job: Job) => { | |
| console.log('Processing job:', job.name, job.id); | |
| try { | |
if (job.name === 'send-message') {
    // Resolve the recipient's phone number from the user record, then deliver the text.
    const { userId, text } = job.data;
    const recipient = await prisma.user.findUnique({ where: { id: userId } });
    if (!recipient?.phone) {
        // Unknown user or no phone on file: log and do nothing.
        console.warn(`[WORKER] User ${userId} not found or missing phone — skipping send.`);
    } else {
        await sendTextMessage(recipient.phone, text);
    }
}
else if (job.name === 'send-message-direct') {
    // The job already carries the destination number, so no user lookup is performed.
    const payload = job.data;
    await sendTextMessage(payload.phone, payload.text);
}
else if (job.name === 'handle-inbound') {
    // Hands an inbound WhatsApp message to WhatsAppLogic, de-duplicated per messageId.
    const { phone, text, audioUrl, imageUrl, messageId } = job.data;

    // Idempotence lock: SET ... EX 300 NX only succeeds for the first job that claims
    // this messageId within 300 seconds; a falsy reply means it was already claimed.
    // Jobs without a messageId are processed without any lock.
    const lockKey = messageId ? `lock:inbound:${messageId}` : null;
    if (lockKey) {
        const acquired = await connection.set(lockKey, "1", "EX", 300, "NX");
        if (!acquired) {
            console.log(`[WORKER] 🔒 Lock inbound activé : message ${messageId} déjà traité.`);
            return;
        }
    }

    try {
        await WhatsAppLogic.handleIncomingMessage(phone, text, audioUrl, imageUrl);
    } catch (err: unknown) {
        // Release the lock before propagating the failure. Otherwise a retry of this
        // job would find the key still set, be treated as a duplicate and be dropped
        // silently. The generate-feedback lock follows the same release-on-error policy.
        if (lockKey) {
            await connection.del(lockKey).catch((delErr: unknown) => {
                console.error('[WORKER] Failed to release inbound lock:', (delErr instanceof Error ? delErr.message : String(delErr)));
            });
        }
        throw err;
    }
}
| else if (job.name === 'generate-feedback') { | |
| const { userId, text, trackId, exercisePrompt, lessonText, exerciseCriteria, totalDays, language, userActivity, userRegion, previousResponses, isDeepDive, iterationCount, imageUrl, isButtonChoice, isTimeTravelMode, realCurrentDay } = job.data; | |
| const currentDay = Number(job.data.currentDay || job.data.dayNumber || 0); | |
| const enrollmentId = job.data.enrollmentId; | |
| const user = await prisma.user.findUnique({ | |
| where: { id: userId }, | |
| include: { businessProfile: true } as any | |
| }) as any; | |
| if (!user?.phone) return; | |
// ─── Idempotence lock (progression bug fix) ─────────
// Use the module-level Redis client. Opening a dedicated client for every job leaves
// one connection open per processed job (nothing ever closes it), and a client built
// only from host/port would ignore the username/password/TLS settings the shared
// connection is configured with.
const redis = connection;
// The key is scoped by user, day and a short fingerprint of the answer: the first
// 10 characters with everything except ASCII letters and digits removed.
// NOTE(review): two different answers sharing that fingerprint within the 300 s
// window are treated as duplicates — confirm this is acceptable.
const textHash = typeof text === 'string' ? text.substring(0, 10).replace(/[^a-z0-9]/gi, '') : '';
const lockKey = `lock:feedback:${userId}:${currentDay}:${textHash}`;
// SET ... EX 300 NX replies with a falsy value when the key already exists.
const isLocked = await redis.set(lockKey, "1", "EX", 300, "NX");
if (!isLocked) {
    console.log(`[WORKER] 🔒 Lock activé : ignorer ce job de feedback en double (User ${userId}, Day ${currentDay})`);
    return;
}
// Calls the AI feedback endpoint and assembles the WhatsApp message for the learner.
// Outputs consumed further down in this handler: feedbackMsg, feedbackData, trackDay,
// AI_API_BASE_URL and apiKey.
let feedbackMsg = '';
let feedbackData: any = null;
let trackDay: any = null;
let AI_API_BASE_URL = '';
let apiKey = '';
try {
    trackDay = await prisma.trackDay.findFirst({
        where: { trackId, dayNumber: currentDay }
    });
    console.log(`[WORKER] Generating expert feedback for User ${userId}`);
    AI_API_BASE_URL = getApiUrl();
    apiKey = getAdminApiKey();
    console.log(`[PIPELINE] Handing over text to Coach Engine... (User: ${userId}, Day: ${currentDay})`);
    const feedbackRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/generate-feedback`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
        body: JSON.stringify({
            answers: text,
            lessonText,
            exercisePrompt,
            userLanguage: language,
            businessProfile: user.businessProfile,
            exerciseCriteria,
            userActivity,
            userRegion,
            dayNumber: currentDay,
            previousResponses,
            isDeepDive: isDeepDive || false,
            iterationCount: iterationCount || 0,
            imageUrl: imageUrl,
            isButtonChoice: isButtonChoice || false
        })
    });
    if (feedbackRes.ok) {
        feedbackData = await feedbackRes.json();
        const callToAction = language === 'WOLOF'
            ? "\n\nSu nga bëggee yokk leneen ci li nga xam, bindal 1️⃣ APPROFONDIR, wala nga bind 2️⃣ SUITE."
            : "\n\nSi tu veux affiner ce point avec une donnée de ton propre terrain, tape 1️⃣ APPROFONDIR, sinon tape 2️⃣ SUITE.";
        const closingCallToAction = language === 'WOLOF'
            ? "\n\nBindal 2️⃣ SUITE ngir wéy."
            : "\n\nTape 2️⃣ SUITE pour continuer.";

        // A missing iterationCount counts as iteration 0, matching the value sent in
        // the request body above. Comparing `undefined < 3` is false and would leave a
        // deep-dive answer without any call to action.
        const effectiveIteration = iterationCount ?? 0;
        const offerDeepDive = !isDeepDive || (effectiveIteration < 3 && !feedbackData.isForcedClosure);

        // Prefer the structured three-part feedback, then plain text, then a generic
        // acknowledgement. Only the first two forms carry a call to action.
        let hasFeedbackBody = true;
        if (feedbackData.validation && feedbackData.enrichedVersion && feedbackData.actionableAdvice) {
            feedbackMsg = `🌟 ${feedbackData.validation}\n\n🚀 ${feedbackData.enrichedVersion}\n\n💡 Conseil de Terrain :\n${feedbackData.actionableAdvice}`;
        } else if (feedbackData.text) {
            feedbackMsg = feedbackData.text;
        } else {
            feedbackMsg = '✅ Analyse terminée.';
            hasFeedbackBody = false;
        }

        if (hasFeedbackBody) {
            if (offerDeepDive) {
                feedbackMsg += callToAction;
            } else if (feedbackData.isForcedClosure) {
                feedbackMsg += closingCallToAction;
            }
        }
    } else if (feedbackRes.status === 429) {
        // Rate limited: send a fallback message and stop this job without throwing.
        console.warn(`[WORKER] 429 Error during generate-feedback`);
        const fallbackMsg = language === 'WOLOF'
            ? "Jërëjëf ci sa tontu ! (Analyse IA temporairement indisponible)"
            : "Merci pour ta réponse ! (Analyse IA de la réponse temporairement indisponible suite à une surcharge, mais ta progression est sauvegardée).";
        await sendTextMessage(user.phone, fallbackMsg);
        return;
    } else {
        const errText = await feedbackRes.text();
        throw new Error(`generate-feedback failed HTTP ${feedbackRes.status}: ${errText}`);
    }
} catch (err: unknown) {
    console.error(`[WORKER] generate-feedback failed:`, (err instanceof Error ? err.message : String(err)));
    // Drop the idempotence lock so a retry of this job is not rejected as a duplicate.
    await redis.del(lockKey);
    throw err;
}
| if (feedbackMsg) { | |
| // 🌟 Adaptive Pedagogy: Dynamic Remediation & Diagnostic Logic v1.1 🌟 | |
| // 🚨 RACE CONDITION FIX: Update UserProgress strictly BEFORE sending the message over WhatsApp. | |
| let nextDay = currentDay + 1; | |
| const currentProgress = await prisma.userProgress.findUnique({ | |
| where: { userId_trackId: { userId, trackId } } | |
| }); | |
| const currentBadges = ((currentProgress as any)?.badges as string[]) || []; | |
| let updatedBadges = [...currentBadges]; | |
| if (feedbackData?.isQualified === false) { | |
// Adaptive diagnostic branching: when the exercise criteria declare a diagnostic
// trigger and the AI lists that trigger among the missing elements, the learner is
// re-routed to the configured module, starting at day 1.
const diagnosticTrigger = (exerciseCriteria as any)?.diagnostic?.trigger;
const adaptiveModuleId = (exerciseCriteria as any)?.diagnostic?.moduleId;
if (diagnosticTrigger && feedbackData?.missingElements?.includes(diagnosticTrigger) && adaptiveModuleId) {
    if (isTimeTravelMode) {
        // Replaying a historical lesson must not move the learner's real enrollment:
        // the update below rewrites trackId/currentDay on every ACTIVE enrollment of
        // the user, which would discard their actual position.
        console.log(`[TIME-TRAVEL] 🛡️ User ${userId} — Skipping adaptive re-routing to module ${adaptiveModuleId} for replay Day ${currentDay} (real: ${realCurrentDay}).`);
    } else {
        console.log(`[WORKER] Adaptive Diagnostic triggered for User ${userId}: Re-routing to module ${adaptiveModuleId}`);
        // Modules start at day 1.
        nextDay = 1;
        // NOTE(review): this targets all ACTIVE enrollments of the user rather than the
        // job's enrollmentId — confirm that is intended.
        await prisma.enrollment.updateMany({
            where: { userId, status: 'ACTIVE' },
            data: { trackId: adaptiveModuleId, currentDay: 1 }
        });
    }
}
| const remediationDay = (exerciseCriteria as any)?.remediation?.dayNumber; | |
| if (remediationDay && remediationDay !== currentDay) { | |
| console.log(`[WORKER] Dynamic remediation triggered for User ${userId}: Day ${currentDay} -> ${remediationDay}`); | |
| nextDay = remediationDay; | |
| } else { | |
| console.log(`[WORKER] Exercise not qualified but no remediation day defined. Staying on Day ${currentDay}.`); | |
| nextDay = currentDay; | |
| } | |
| // 🚨 Hardening: If explicitly not qualified, we put the user in PENDING_REMEDIATION | |
| await prisma.userProgress.update({ | |
| where: { userId_trackId: { userId, trackId } }, | |
| data: { | |
| exerciseStatus: 'PENDING_REMEDIATION', // Stay in remediation until final success | |
| score: { increment: 0 } | |
| } as any | |
| }); | |
| // 🚨 Store Strategy Data in BusinessProfile (Guarded by Day 10+) | |
| const hasStrategyData = feedbackData?.searchResults || (feedbackData as any)?.competitorList || (feedbackData as any)?.financialProjections || (feedbackData as any)?.fundingAsk || (feedbackData as any)?.teamMembers; | |
| if (hasStrategyData && currentDay >= 10) { | |
| const updatePayload: any = { lastUpdatedFromDay: currentDay }; | |
| if (feedbackData?.searchResults) updatePayload.marketData = feedbackData.searchResults; | |
| if ((feedbackData as any)?.competitorList) updatePayload.competitorList = (feedbackData as any).competitorList; | |
| if ((feedbackData as any)?.financialProjections) updatePayload.financialProjections = (feedbackData as any).financialProjections; | |
| if ((feedbackData as any)?.fundingAsk) updatePayload.fundingAsk = (feedbackData as any).fundingAsk; | |
| if ((feedbackData as any)?.teamMembers && Array.isArray((feedbackData as any).teamMembers)) { | |
| const existingProfile = await (prisma as any).businessProfile.findUnique({ where: { userId } }); | |
| const existingTeam = Array.isArray(existingProfile?.teamMembers) ? existingProfile.teamMembers : []; | |
| updatePayload.teamMembers = [...existingTeam, ...(feedbackData as any).teamMembers]; | |
| } | |
| try { | |
| await (prisma as any).businessProfile.upsert({ | |
| where: { userId }, | |
| update: updatePayload, | |
| create: { userId, ...updatePayload } | |
| }); | |
| } catch (bpErr: unknown) { | |
| console.error('[WORKER] BusinessProfile upsert failed (non-fatal, REMEDIATION path):', (bpErr as Error).message); | |
| } | |
| } | |
| } else { | |
| // Success! Award Badges & Mark Completed (or keep Pending for Deep Dive) | |
| const trackDayBadges = (trackDay as any)?.badges as string[] || []; | |
| for (const b of trackDayBadges) { | |
| if (!updatedBadges.includes(b)) updatedBadges.push(b); | |
| } | |
| let newStatus = (isDeepDive && feedbackData?.isForcedClosure !== true) ? 'PENDING_DEEPDIVE' : 'COMPLETED'; | |
| // 🚨 Card/Button Bypass Logic (Lead Fullstack Developer Requirement) | |
| // Ensure that a button click never completes the lesson, even if the AI validates it. | |
| if (isButtonChoice) { | |
| console.log(`[WORKER] 🛡️ Button choice detected for User ${userId}. Overriding COMPLETED with PENDING.`); | |
| newStatus = 'PENDING'; | |
| } | |
| // 🕰️ TIME-TRAVEL GUARD: Skip COMPLETED update when replaying a historical lesson. | |
| // BusinessProfile (One-Pager) IS updated below — only the global exerciseStatus is preserved. | |
| if (isTimeTravelMode) { | |
| console.log(`[TIME-TRAVEL] 🛡️ User ${userId} — Skipping userProgress.update(COMPLETED) for replay Day ${currentDay} (real: ${realCurrentDay}).`); | |
| } else { | |
| // @ts-ignore - Prisma types may be out of sync after schema update | |
| await prisma.userProgress.update({ | |
| where: { userId_trackId: { userId, trackId } }, | |
| data: { | |
| exerciseStatus: newStatus, | |
| score: { increment: newStatus === 'COMPLETED' ? 1 : 0 }, | |
| badges: updatedBadges, | |
| behavioralScoring: updateBehavioralScore(currentProgress ? (currentProgress as any).behavioralScoring : null, (exerciseCriteria as any)?.scoring?.impact_success), | |
| aiSource: feedbackData?.aiSource || 'OPENAI' | |
| } as any | |
| }); | |
| } | |
| // 🚨 Store Strategy Data in BusinessProfile (Guarded by Day 10+) | |
| const hasStrategyData = feedbackData?.searchResults || (feedbackData as any)?.competitorList || (feedbackData as any)?.financialProjections || (feedbackData as any)?.fundingAsk || (feedbackData as any)?.teamMembers; | |
| if (hasStrategyData && currentDay >= 10) { | |
| const updatePayload: any = { lastUpdatedFromDay: currentDay }; | |
| if (feedbackData?.searchResults) updatePayload.marketData = feedbackData.searchResults; | |
| if ((feedbackData as any)?.competitorList) updatePayload.competitorList = (feedbackData as any).competitorList; | |
| if ((feedbackData as any)?.financialProjections) updatePayload.financialProjections = (feedbackData as any).financialProjections; | |
| if ((feedbackData as any)?.fundingAsk) updatePayload.fundingAsk = (feedbackData as any).fundingAsk; | |
| if ((feedbackData as any)?.teamMembers && Array.isArray((feedbackData as any).teamMembers)) { | |
| const existingProfile = await (prisma as any).businessProfile.findUnique({ where: { userId } }); | |
| const existingTeam = Array.isArray(existingProfile?.teamMembers) ? existingProfile.teamMembers : []; | |
| updatePayload.teamMembers = [...existingTeam, ...(feedbackData as any).teamMembers]; | |
| } | |
| try { | |
| await (prisma as any).businessProfile.upsert({ | |
| where: { userId }, | |
| update: updatePayload, | |
| create: { userId, ...updatePayload } | |
| }); | |
| } catch (bpErr: unknown) { | |
| console.error('[WORKER] BusinessProfile upsert failed (non-fatal, SUCCESS path):', (bpErr as Error).message); | |
| } | |
| } | |
| // If we were in a remediation day (fractional) -> move to next integer day | |
| if (!isTimeTravelMode && currentDay % 1 !== 0) { | |
| nextDay = Math.floor(currentDay) + 1; | |
| console.log(`[WORKER] Remediation successful for User ${userId}. Moving to Day ${nextDay}.`); | |
| } | |
// 🕰️ TIME-TRAVEL: clear the Redis replay context once the replayed answer was accepted.
if (isTimeTravelMode) {
    try {
        const { clearTimeTravelContext } = await import('./timeTravelContext');
        // Pass the shared module-level Redis client. A dedicated client created here
        // would stay open whenever clearing throws before quit() is reached. The shared
        // client must not be quit() here because the rest of the worker uses it.
        await clearTimeTravelContext(userId, connection);
    } catch (ttErr) {
        // Best effort: a failure to clear the context must not fail the feedback job.
        console.error('[TIME-TRAVEL] Failed to clear context after successful replay:', ttErr);
    }
}
| } // end success else block | |
| // THEN store the response | |
| // @ts-ignore | |
| await prisma.response.create({ | |
| data: { | |
| enrollmentId: enrollmentId, | |
| userId: user.id, | |
| dayNumber: currentDay, | |
| content: feedbackMsg, | |
| aiSource: feedbackData?.aiSource || 'OPENAI' | |
| } | |
| }); | |
| // THEN send the WhatsApp message | |
| await sendTextMessage(user.phone, feedbackMsg); | |
| if (isFeatureEnabled('FEATURE_BUSINESS_PROFILE') && currentDay) { | |
| try { | |
| const extractRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/extract-profile`, { | |
| method: 'POST', | |
| headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` }, | |
| body: JSON.stringify({ | |
| userInput: text, | |
| dayNumber: currentDay, | |
| userLanguage: language | |
| }) | |
| }); | |
| if (extractRes.ok) { | |
| const { data } = await extractRes.json() as any; | |
| // Clean up undefined/null values | |
| const profileData = Object.fromEntries(Object.entries(data).filter(([_, v]) => v != null && v !== '')); | |
| // 🚨 Sector Locking (Lead AI Engineer Requirement) | |
| const lockedSectors = ["Organisation d'événements / PWA"]; | |
| if (lockedSectors.includes(user.activity || "")) { | |
| if (profileData.activityLabel || profileData.activityType) { | |
| console.log(`[WORKER] Sector Locked for User ${userId}. Blocking activity update.`); | |
| delete profileData.activityLabel; | |
| delete profileData.activityType; | |
| } | |
| } | |
| if (Object.keys(profileData).length > 0 || feedbackData?.searchResults) { | |
| console.log(`[WORKER] Updating BusinessProfile for User ${userId}:`, profileData); | |
| const updatePayload: any = { | |
| ...profileData, | |
| lastUpdatedFromDay: currentDay | |
| }; | |
| if (feedbackData?.searchResults) { | |
| updatePayload.marketData = feedbackData.searchResults; | |
| console.log(`[WORKER] Market Data (Enrichment) added to profile.`); | |
| } | |
| await (prisma as any).businessProfile.upsert({ | |
| where: { userId }, | |
| update: updatePayload, | |
| create: { userId, ...updatePayload } | |
| }); | |
| // 🌟 Visuals WOW: Generate Pitch Card for Day 1 🌟 | |
| const finalLabel = (profileData as any).activityLabel || user.activity || "Projet Entrepreneurial"; | |
| if (isFeatureEnabled('FEATURE_SHARE_CARD') && currentDay === 1 && finalLabel) { | |
| try { | |
| const { generatePitchCard } = await import('./visuals'); | |
| const { uploadFile } = await import('./storage'); | |
| const cardBuffer = await generatePitchCard(finalLabel); | |
| const cardUrl = await uploadFile(cardBuffer, 'pitch-card.png', 'image/png'); | |
| const caption = language === 'WOLOF' | |
| ? "Sa kàrdu business mu neex ! ✨" | |
| : "Ta carte business personnalisée ! ✨"; | |
| await sendImageMessage(user.phone, cardUrl, caption); | |
| } catch (vErr: unknown) { | |
| console.error('[WORKER] Pitch Card generation failed:', (vErr as any)?.message); | |
| } | |
| } | |
| } | |
| } | |
| } catch (err: unknown) { | |
| console.error('[WORKER] BusinessProfile extraction failed:', (err instanceof Error ? err.message : String(err))); | |
| } | |
| } | |
| // 🚨 Hardening: We DO NOT update `currentDay` here for normal or remediation flows. | |
| // The `send-content` job will update `enrollment.currentDay` right before sending the lesson! | |
| // This prevents the 'double incrementation' bug (+2 days) when the user types SUITE. | |
| // 🌟 Adaptive Pedagogy: Streak Management 🌟 | |
| const lastActivity = user.lastActivityAt ? new Date(user.lastActivityAt) : null; | |
| const today = new Date(); | |
| const diffTime = lastActivity ? Math.abs(today.getTime() - lastActivity.getTime()) : Infinity; | |
| const diffDays = Math.ceil(diffTime / (1000 * 60 * 60 * 24)); | |
| let newStreak = (user as any).currentStreak || 0; | |
| if (diffDays <= 1) { | |
| newStreak += 1; // Continuous streak | |
| } else if (diffDays > 1) { | |
| newStreak = 1; // Streak broken | |
| } | |
| await prisma.user.update({ | |
| where: { id: userId }, | |
| data: { | |
| lastActivityAt: new Date(), | |
| currentStreak: newStreak, | |
| longestStreak: Math.max((user as any).longestStreak || 0, newStreak) | |
| } as any | |
| }); | |
| if (currentDay >= totalDays && feedbackData?.isQualified !== false) { | |
| await sendTextMessage(user.phone, language === 'WOLOF' | |
| ? "🎉 Baraka Allahu fik ! Jeex nga module bi. Dokumaan yi dinañu leen yónnee ci kanam !" | |
| : "🎉 Félicitations ! Vous avez terminé ce module. Vos documents intelligents arrivent bientôt !" | |
| ); | |
| const q = new Queue('whatsapp-queue', { connection: connection as any }); | |
| await q.add('send-content', { userId, trackId, dayNumber: currentDay + 1 }); | |
| } else if (feedbackData?.isQualified === false) { | |
| await sendTextMessage(user.phone, language === 'WOLOF' | |
| ? "🚨 Am na lo xamni leerul bu baax. Xoolal missal yi ma la yónnee te tàmbaleet ko ndànk." | |
| : "🚨 Certains points sont encore à renforcer pour valider. Regarde mes conseils et réessaie." | |
| ); | |
| // Si on a un jour de remédiation (ex 1.5), on l'envoie automatiquement | |
| if (nextDay !== currentDay && nextDay % 1 !== 0) { | |
| const q = new Queue('whatsapp-queue', { connection: connection as any }); | |
| await q.add('send-content', { userId, trackId, dayNumber: nextDay }, { delay: 2000 }); | |
| } | |
| } else { | |
| // 🕰️ TIME-TRAVEL MODE: Custom CTA — SUITE would be blocked, so give clear guidance | |
| if (isTimeTravelMode) { | |
| await sendTextMessage(user.phone, language === 'WOLOF' | |
| ? `✅ Tànkâr ! Sa tôntu Jour ${currentDay} def na nu ci kos. Dànga dem ci Jour ${realCurrentDay || currentDay} — tôntu ci sén exercice ngir dem ci kanam.` | |
| : `✅ Enregistré ! Ta réponse du Jour ${currentDay} a été sauvegardée. Tu es toujours au Jour ${realCurrentDay || currentDay} — réponds à son exercice pour continuer 📅` | |
| ); | |
| } else { | |
| await sendTextMessage(user.phone, language === 'WOLOF' | |
| ? "Baax na ! Yónnee *SUITE* ngir dem ci kanam." | |
| : "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante." | |
| ); | |
| } | |
| } | |
| } | |
| } | |
else if (job.name === 'send-nudge') {
    // Sends a re-engagement message in the user's language; unknown nudge types fall
    // back to the ENCOURAGEMENT text.
    const { userId, type } = job.data;
    const recipient = await prisma.user.findUnique({ where: { id: userId } });
    if (!recipient?.phone) return;

    const wolof = recipient.language === 'WOLOF';
    const encouragement = wolof
        ? "Assalamuyalaykum ! Fatte wuñu sa mbir. Tontu bu gatt ngir wéy ? 💪"
        : "Coucou ! On n'a pas oublié ton projet. Une petite réponse pour continuer ? 💪";
    const resurrection = wolof
        ? "Sa liggeey mu ngi lay xaar ! Am succès dafa laaj lëkkalë. Ñu tàmbaleeti ? 🚀"
        : "Ton business t'attend ! Le succès vient de la régularité. On s'y remet ? 🚀";

    const nudgeByType = { ENCOURAGEMENT: encouragement, RESURRECTION: resurrection };
    const body = (nudgeByType as any)[type] || encouragement;

    await sendTextMessage(recipient.phone, body);
    console.log(`[WORKER] Nudge ${type} sent to ${recipient.phone}`);
}
else if (job.name === 'send-interactive-buttons') {
    // Sends an interactive button message; does nothing when the user has no phone.
    const { userId, bodyText, buttons } = job.data;
    const recipient = await prisma.user.findUnique({ where: { id: userId } });
    const destination = recipient?.phone;
    if (destination) {
        await sendInteractiveButtonMessage(destination, bodyText, buttons);
    }
}
else if (job.name === 'send-interactive-list') {
    // Sends an interactive list message; does nothing when the user has no phone.
    const { userId, headerText, bodyText, buttonLabel, sections } = job.data;
    const recipient = await prisma.user.findUnique({ where: { id: userId } });
    const destination = recipient?.phone;
    if (destination) {
        await sendInteractiveListMessage(destination, headerText, bodyText, buttonLabel, sections);
    }
}
else if (job.name === 'enroll-user') {
    // Enrolls a user in a track. Premium tracks get a payment link instead of an
    // enrollment; free tracks create the enrollment (once) and queue Day 1 content.
    const { userId, trackId } = job.data;
    const track = await prisma.track.findUnique({ where: { id: trackId } });
    if (!track) {
        console.error(`[WORKER] Enrollment failed: Track ${trackId} not found.`);
        return;
    }

    if (!track.isPremium) {
        console.log(`[WORKER] Enrolling User ${userId} in Free Track ${trackId}...`);
        const priorEnrollment = await prisma.enrollment.findFirst({ where: { userId, trackId } });
        // An existing enrollment for this user/track makes the job a no-op.
        if (!priorEnrollment) {
            await prisma.enrollment.create({
                data: { userId, trackId, status: 'ACTIVE', currentDay: 1 }
            });
            const learner = await prisma.user.findUnique({ where: { id: userId } });
            if (learner?.phone) {
                const welcome = `🎉 Bienvenue dans *${track.title}* ! La génération de votre cours personnalisé (Jour 1) a commencé. Cela prendra environ 30 secondes...`;
                await sendTextMessage(learner.phone, welcome);
                // Queue Day 1 content generation right after the welcome message.
                const contentQueue = new Queue('whatsapp-queue', { connection: connection as any });
                await contentQueue.add('send-content', { userId, trackId, dayNumber: 1 });
            }
        }
    } else {
        console.log(`[WORKER] User ${userId} requested Premium Track ${trackId}. Generating Payment Link...`);
        try {
            const baseUrl = getApiUrl().replace(/\/$/, "");
            const adminKey = getAdminApiKey();
            const checkoutRes = await fetch(`${baseUrl}/v1/payments/checkout`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Authorization': `Bearer ${adminKey}`
                },
                body: JSON.stringify({ userId, trackId })
            });
            const checkoutData = await checkoutRes.json() as any;
            const gotPaymentLink = checkoutRes.ok && checkoutData.url;
            if (!gotPaymentLink) {
                console.error('[WORKER] Failed to get checkout URL', checkoutData);
            } else {
                const payer = await prisma.user.findUnique({ where: { id: userId } });
                if (payer?.phone) {
                    await sendTextMessage(
                        payer.phone,
                        `💳 Cette formation est Premium. Complétez votre paiement ici :\n${checkoutData.url}`
                    );
                }
            }
        } catch (err) {
            // Checkout failures are logged only; the job itself does not fail.
            console.error('[WORKER] Error calling checkout endpoint', err);
        }
    }
}
| else if (job.name === 'download-media') { | |
| // ─── Audio download from Meta Graph API — Railway only ───────────── | |
| const { mediaId, mimeType, phone } = job.data; | |
| const traceId = `[STT-FLOW-${phone.slice(-4)}]`; | |
| // Always prioritize the live environment variable over stale job data from Redis | |
| const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || job.data.accessToken; | |
| console.log(`${traceId} Downloading media ${mediaId} for ${phone}...`); | |
| if (!accessToken) { | |
| console.error(`[WORKER] Missing WHATSAPP_ACCESS_TOKEN for media ${mediaId}.`); | |
| return; | |
| } | |
| let transcribedText = ''; | |
| let audioUrl = ''; | |
| try { | |
| const { buffer } = await downloadMedia(mediaId, accessToken); | |
| console.log(`${traceId} Downloaded file size=${buffer.length} contentType=${mimeType}`); | |
| const AI_API_BASE_URL = getApiUrl(); | |
| const apiKey = getAdminApiKey(); | |
| // ─── Hardening: Store audio on R2 via the API ───────── | |
| try { | |
| const storeRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/store-audio`, { | |
| method: 'POST', | |
| headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` }, | |
| body: JSON.stringify({ audioBase64: buffer.toString('base64'), mimeType, phone }) | |
| }); | |
| if (storeRes.ok) { | |
| const storeData = await storeRes.json() as any; | |
| if (storeData.url) { | |
| audioUrl = storeData.url; | |
| console.log(`[R2] Inbound audio uploaded: ${audioUrl}`); | |
| } | |
| } | |
| } catch (err: unknown) { | |
| console.error('[WORKER] store-audio failed (inbound audio will not have a permanent link):', (err instanceof Error ? err.message : String(err))); | |
| } | |
| // ─── Hardening: Record Inbound Message in DB ────────── | |
| const user = await prisma.user.findFirst({ where: { phone } }); | |
| if (user) { | |
| try { | |
| await prisma.message.create({ | |
| data: { | |
| userId: user.id, | |
| direction: 'INBOUND', | |
| channel: 'WHATSAPP', | |
| mediaUrl: audioUrl || null, | |
| payload: job.data // Raw Meta payload from job | |
| } | |
| }); | |
| console.log(`[DB] Recorded inbound audio message for ${phone}`); | |
| } catch (dbErr: unknown) { | |
| console.error('[DB] Failed to record inbound message:', (dbErr as any)?.message); | |
| } | |
| } | |
| // ─── Routing: Transcribe if Audio, Forward if Image ───────── | |
| if (mimeType.startsWith('audio/')) { | |
| console.log(`${traceId} Transcribe start calling ${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/transcribe`); | |
| const transcribeRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, "")}/v1/ai/transcribe`, { | |
| method: 'POST', | |
| headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` }, | |
| body: JSON.stringify({ | |
| audioBase64: buffer.toString('base64'), | |
| filename: `msg.${mimeType.includes('mp4') ? 'mp4' : 'ogg'}`, | |
| language: user?.language // Pass language hint to Whisper | |
| }) | |
| }); | |
| if (transcribeRes.ok) { | |
| const data = await transcribeRes.json() as any; | |
| const isSuspect = data.isSuspect || false; | |
| const confidence = data.confidence || 0; | |
| transcribedText = data.text || ''; | |
| const user = await prisma.user.findFirst({ where: { phone } }); | |
| // 🌍 Normalisation Wolof STT v2.0 🌍 | |
| if (user?.language === 'WOLOF') { | |
| const originalText = transcribedText; | |
| const normResult = normalizeWolof(transcribedText); | |
| transcribedText = normResult.normalizedText; | |
| // Output correction feedback | |
| console.log(`[STT] Normalized: "${originalText}" -> "${transcribedText}"`); | |
| // Soft Feedback UI | |
| await sendTextMessage(phone, `Ma dégg na: "${transcribedText}" ✅\n(Confiance STT: ${confidence}%)`); | |
| if (normResult.changes.length > 0) { | |
| const limitedChanges = normResult.changes.slice(0, 2).join(", "); | |
| await sendTextMessage(phone, `Nataal bu gën: ${limitedChanges}`); | |
| } | |
| // 🚨 Wolof Auto-Validation Logic (Target: > 80%) 🚨 | |
| if (confidence <= 80) { | |
| console.log(`[STT] Whisper Confidence (${confidence}%) <= 80. Intercepting WOLOF audio for User ${user.id}. Shifting to PENDING_REVIEW.`); | |
| // First, make sure there is an active enrollment to find the trackId | |
| const activeEnrollment = await prisma.enrollment.findFirst({ | |
| where: { userId: user.id, status: 'ACTIVE' }, | |
| include: { track: true } | |
| }); | |
| if (activeEnrollment) { | |
| // 🚨 Brique 4 (Routage Deep Dive) : On ne bloque pas les audios itératifs | |
| const currentProgress = await prisma.userProgress.findUnique({ | |
| where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } } | |
| }); | |
| if (currentProgress?.exerciseStatus !== 'PENDING_DEEPDIVE') { | |
| await prisma.userProgress.upsert({ | |
| where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } }, | |
| update: { exerciseStatus: 'PENDING_REVIEW' as any, adminTranscription: transcribedText, confidenceScore: confidence }, | |
| create: { userId: user.id, trackId: activeEnrollment.trackId, exerciseStatus: 'PENDING_REVIEW' as any, adminTranscription: transcribedText, confidenceScore: confidence } | |
| }); | |
| await sendTextMessage(phone, "🎙️ Nyangi jaxas sa kàddu. Xamle dina la tontu ci kanam ! (En cours d'analyse par l'équipe)"); | |
| // Still save the audio URL to the message for the admin to read! | |
| await prisma.message.updateMany({ | |
| where: { userId: user.id, direction: 'INBOUND', mediaUrl: audioUrl }, | |
| data: { content: transcribedText } | |
| }).catch(() => { }); | |
| return; // Stop here, WAIT FOR ADMIN OVERRIDE | |
| } | |
| } else { | |
| // Edge case: not enrolled but sent audio... | |
| await sendTextMessage(phone, "Dama jaxaso ci li nga wax... Mën nga ko waxaat ndànk ?"); | |
| return; | |
| } | |
| } | |
| console.log(`[STT] transcribe result="${transcribedText.substring(0, 80)}" (suspect=${isSuspect}, confidence=${confidence}%)`); | |
| // 🌟 STT Hardening: Handle suspect transcription 🌟 | |
| if (isSuspect && user) { | |
| const fallbackMsg = user.language === 'WOLOF' | |
| ? "Dama jaxaso ci li nga wax... Mën nga ko waxaat ndànk ? (10-15s)" | |
| : "Je n'ai pas bien compris ton vocal. Pourrais-tu le renvoyer en parlant bien distinctement ? (10-15s)"; | |
| await sendTextMessage(phone, fallbackMsg); | |
| return; // Stop here, don't trigger AI generator | |
| } | |
| // Send an immediate confirmation to the user that the audio was understood | |
| if (user && transcribedText) { | |
| const confirmationText = `J'ai compris :\n"${transcribedText}"`; | |
| await sendTextMessage(phone, confirmationText); | |
| // Update the message record with transcribed text if found | |
| await prisma.message.updateMany({ | |
| where: { userId: user.id, direction: 'INBOUND', mediaUrl: audioUrl }, | |
| data: { content: transcribedText } | |
| }).catch(() => { }); | |
| } | |
| } // end WOLOF block | |
| // 🇫🇷 FR users: send confirmation (WOLOF users already got theirs above) | |
| if (user?.language !== 'WOLOF' && user && transcribedText) { | |
| console.log(`[STT] transcribe result="${transcribedText.substring(0, 80)}" (suspect=${isSuspect}, confidence=${confidence}%)`); | |
| if (isSuspect) { | |
| await sendTextMessage(phone, "Je n'ai pas bien compris ton vocal. Pourrais-tu le renvoyer en parlant bien distinctement ? (10-15s)"); | |
| return; | |
| } | |
| await sendTextMessage(phone, `🎙️ J'ai compris : "${transcribedText}"`); | |
| await prisma.message.updateMany({ | |
| where: { userId: user!.id, direction: 'INBOUND', mediaUrl: audioUrl }, | |
| data: { content: transcribedText } | |
| }).catch(() => { }); | |
| } | |
| // Process the transcribed text as a normal incoming message via API | |
| // ─── Routing: Process transcribed text ───────── | |
| if (transcribedText) { | |
| const traceId = `[STT-FLOW-${phone.slice(-4)}]`; | |
| console.log(`${traceId} Processing transcribed text via WhatsAppLogic...`); | |
| await WhatsAppLogic.handleIncomingMessage(phone, transcribedText, audioUrl); | |
| console.log(`${traceId} Inbound audio processing complete.`); | |
| } | |
| } else if (transcribeRes.status === 429) { | |
| // OpenAI quota exceeded — send fallback and do NOT requeue | |
| console.warn(`[WORKER] 429 Error during transcription`); | |
| const user = await prisma.user.findFirst({ where: { phone } }); | |
| if (user) { | |
| await sendTextMessage(phone, user.language === 'WOLOF' | |
| ? "⚠️ Mënuma dégg sa kàddu léegi (réseau bi dafa fees). Yónnee sa tontu ci bind (texte)." | |
| : "⚠️ Le service audio est temporairement saturé. Envoie ta réponse en texte." | |
| ); | |
| } | |
| return; // Stop processing | |
| } else { | |
| const errText = await transcribeRes.text().catch(() => `HTTP ${transcribeRes.status}`); | |
| console.error(`[WORKER] /v1/ai/transcribe failed with HTTP ${transcribeRes.status}: ${errText}`); | |
| throw new Error(`Transcription failed HTTP ${transcribeRes.status}`); // throw so BullMQ retries | |
| } | |
| } else if (mimeType.startsWith('image/')) { | |
| // 📸 IMAGE-FLOW: Build imageUrl from the R2 store result (same pattern as audioUrl for audio) | |
| let imageUrl = ''; | |
| try { | |
| const storeImgRes = await fetch(`${AI_API_BASE_URL.replace(/\/$/, '')}/v1/ai/store-audio`, { | |
| method: 'POST', | |
| headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` }, | |
| body: JSON.stringify({ audioBase64: buffer.toString('base64'), mimeType, phone }) | |
| }); | |
| if (storeImgRes.ok) { | |
| const storeImgData = await storeImgRes.json() as any; | |
| if (storeImgData.url) { | |
| imageUrl = storeImgData.url; | |
| console.log(`[IMAGE-FLOW] ✅ Image uploaded to R2: ${imageUrl}`); | |
| } | |
| } | |
| } catch (imgStoreErr: unknown) { | |
| console.error('[IMAGE-FLOW] R2 store failed (image will be analyzed without permanent URL):', (imgStoreErr as Error).message); | |
| } | |
| console.log(`[IMAGE-FLOW] 📸 Image detected for ${phone}. Routing to WhatsAppLogic (imageUrl: ${imageUrl || audioUrl || 'none'})...`); | |
| // Fallback: si imageUrl (upload #2) échoue, utiliser audioUrl (upload #1, même buffer, déjà réussi) | |
| const finalImageUrl = imageUrl || audioUrl || undefined; | |
| await WhatsAppLogic.handleIncomingMessage(phone, job.data.caption || 'Image reçue', undefined, finalImageUrl); | |
| console.log(`[IMAGE-FLOW] ✅ Inbound image processing complete.`); | |
| } | |
| } catch (err: unknown) { | |
| console.error(`[WORKER] download-media failed:`, err); | |
| } | |
| } | |
else if (job.name === 'send-image') {
    // send-image: push a single image (with an optional caption) to a WhatsApp recipient.
    // Delivery is best-effort: a failure is logged here and not rethrown from this branch.
    const recipient = job.data.to;
    const mediaLink = job.data.imageUrl;
    const captionText = job.data.caption || '';
    try {
        await sendImageMessage(recipient, mediaLink, captionText);
        console.log(`[WhatsApp] ✅ Image message sent to ${recipient}`);
    } catch (sendErr: unknown) {
        // Log only the message for Error instances; stringify anything else.
        const reason = sendErr instanceof Error ? sendErr.message : String(sendErr);
        console.error(`[WORKER] send-image failed:`, reason);
    }
}
else if (job.name === 'send-content') {
    /*
     * send-content: deliver one lesson day of a track to a user.
     *
     * Job data read here: userId, trackId, dayNumber, testImageUrl (optional),
     * skipProgressUpdate (optional).
     *
     *  - testImageUrl + userId  → send the branding test image and stop.
     *  - a TrackDay row exists  → send the lesson, then update the enrollment
     *                             (currentDay is left alone on a replay).
     *  - no TrackDay row        → mark the enrollment COMPLETED, run the
     *                             graduation flow and send the AI documents.
     *                             A replay never takes this path.
     */
    const { userId, trackId, dayNumber, testImageUrl } = job.data;
    // Truthiness (not `=== true`) is what the enrollment guard has always used.
    const isReplay = Boolean(job.data.skipProgressUpdate);

    if (testImageUrl && userId) {
        const testUser = await prisma.user.findUnique({ where: { id: userId } });
        if (testUser?.phone) {
            await sendImageMessage(testUser.phone, testImageUrl, "Branding XAMLÉ 🇸🇳");
            console.log(`[WhatsApp] ✅ Image message sent to ${testUser.phone}`);
        }
        return;
    }

    const trackDay = await prisma.trackDay.findFirst({
        where: { trackId, dayNumber }
    });

    if (trackDay) {
        await sendLessonDay(userId, trackId, dayNumber, {
            skipProgressUpdate: job.data.skipProgressUpdate === true
        });

        // 🕰️ TIME-TRAVEL GUARD: a historical replay only refreshes lastActivityAt.
        // Without it, a Replay Day 11 would overwrite currentDay 12 → 11.
        await prisma.enrollment.updateMany({
            where: { userId, trackId },
            data: isReplay
                ? { lastActivityAt: new Date() }
                : { currentDay: dayNumber, lastActivityAt: new Date() }
        });
        if (isReplay) {
            console.log(`[SEND-CONTENT] 🕰️ Replay Day ${dayNumber} sent read-only. currentDay unchanged.`);
        }
    } else if (isReplay) {
        // A replay is read-only: a missing day must not complete the enrollment,
        // trigger graduation, or regenerate documents.
        console.warn(`[SEND-CONTENT] 🕰️ Replay Day ${dayNumber} has no content for Track ${trackId}. Enrollment left untouched.`);
    } else {
        console.log(`[WORKER] No more content for Track ${trackId} Day ${dayNumber}. Marking enrollment as completed.`);
        await prisma.enrollment.updateMany({
            where: { userId, trackId },
            data: {
                status: 'COMPLETED',
                lastActivityAt: new Date()
            }
        });

        // Only this branch needs the user row (phone number for outbound messages).
        const user = await prisma.user.findUnique({ where: { id: userId } });

        // 🎓 Graduation Flow: Successive Track Unlocking (v1.0) 🎓
        // Track ids of the form T<digit>-<FR|WO> unlock T<digit+1>-<same language>.
        const trackMatch = typeof trackId === 'string' ? trackId.match(/^T(\d)-(FR|WO)$/) : null;
        if (trackMatch) {
            const currentLevel = Number.parseInt(trackMatch[1], 10);
            const lang = trackMatch[2];
            const nextLevel = currentLevel + 1;
            const nextTrackId = `T${nextLevel}-${lang}`;
            const nextTrack = await prisma.track.findUnique({ where: { id: nextTrackId } });
            if (nextTrack) {
                const existingNextEnrollment = await prisma.enrollment.findFirst({
                    where: { userId, trackId: nextTrackId }
                });
                if (!existingNextEnrollment) {
                    console.log(`[WORKER] Auto-graduating User ${userId}: ${trackId} -> ${nextTrackId}`);
                    const isWolof = lang === 'WO';
                    const congratsMsg = isWolof
                        ? `🎉 Baraka Allahu fik ! Mat nga Niveau ${currentLevel}. Maangi lay tàmbaleel Niveau ${nextLevel} : *${nextTrack.title}*...`
                        : `🎉 Félicitations ! Vous avez validé le Niveau ${currentLevel}. Je vous inscris immédiatement au Niveau ${nextLevel} : *${nextTrack.title}*...`;
                    if (user?.phone) {
                        await sendTextMessage(user.phone, congratsMsg);
                        if (!nextTrack.isPremium) {
                            await prisma.enrollment.create({
                                data: { userId, trackId: nextTrackId, status: 'ACTIVE', currentDay: 1 }
                            });
                            // Trigger Day 1 for next track with 10s delay to let graduation docs/badges arrive.
                            // The Queue is created per job, so it is closed once the job is enqueued;
                            // otherwise every graduation would leave another Queue attached to the
                            // shared Redis connection.
                            const q = new Queue('whatsapp-queue', { connection: connection as any });
                            try {
                                await q.add('send-content', { userId, trackId: nextTrackId, dayNumber: 1 }, { delay: 10000 });
                            } finally {
                                await q.close();
                            }
                        } else {
                            const payMsg = isWolof
                                ? `💳 Niveau ${nextLevel} bi dafa laaj pass. Yónnee ma "PAYER" ngir tàmbaleeti.`
                                : `💳 Le Niveau ${nextLevel} est un module Premium. Envoyez "PAYER" pour le débloquer et continuer votre ascension !`;
                            await sendTextMessage(user.phone, payMsg);
                        }
                    }
                }
            }
        }

        // 🌟 Trigger AI Document Generation 🌟
        console.log(`[WORKER] Triggering AI Document Generation for User ${userId}...`);
        try {
            const userWithProfile = await prisma.user.findUnique({
                where: { id: userId },
                include: { businessProfile: true } as any
            }) as any;
            const isWolof = userWithProfile?.language === 'WOLOF';
            const language = userWithProfile?.language || 'FR';
            const userLangPrefix = isWolof ? "MBIR : " : "ACTIVITÉ : ";
            // Localize context to avoid English bias in LLM
            const userContext = `${userLangPrefix} ${userWithProfile?.businessProfile?.activityLabel || userWithProfile?.activity || 'Inconnue'}. Cet entrepreneur a terminé son parcours de formation XAMLÉ. Génère les documents basés sur son activité et les concepts appris.`;

            const baseUrl = getApiUrl().replace(/\/$/, "");
            const authHeaders = {
                'Content-Type': 'application/json',
                'Authorization': `Bearer ${getAdminApiKey()}`
            };
            const requestBody = JSON.stringify({
                userContext,
                language,
                businessProfile: userWithProfile?.businessProfile
            });

            // Calls one document endpoint. Any failure (network error, non-2xx status,
            // unparsable body) is logged and yields an empty result, so one failing
            // document no longer prevents the other from being generated and sent.
            const requestDocument = async (endpoint: string): Promise<{ url?: string; aiSource?: string }> => {
                console.log(`[WORKER] Calling ${baseUrl}/v1/ai/${endpoint} (${language})...`);
                try {
                    const res = await fetch(`${baseUrl}/v1/ai/${endpoint}`, {
                        method: 'POST',
                        headers: authHeaders,
                        body: requestBody
                    });
                    if (!res.ok) {
                        console.error(`[WORKER] /v1/ai/${endpoint} failed with HTTP ${res.status}`);
                        return {};
                    }
                    const parsed = (await res.json()) as { url?: string; aiSource?: string } | null;
                    return parsed ?? {};
                } catch (requestErr: unknown) {
                    console.error(`[WORKER] /v1/ai/${endpoint} request failed:`, requestErr);
                    return {};
                }
            };

            const pdfData = await requestDocument('onepager');
            const pptxData = await requestDocument('deck');
            console.log(`[AI DOCS READY] 📄 PDF: ${pdfData.url}`);
            console.log(`[AI DOCS READY] 📊 PPTX: ${pptxData.url}`);

            // Send documents to user via WhatsApp
            if (user?.phone) {
                if (pdfData.url) {
                    await sendDocumentMessage(
                        user.phone,
                        pdfData.url,
                        isWolof ? 'one-pager-xamle.pdf' : 'mon-business-plan.pdf',
                        isWolof ? '📄 Sa One-Pager mu ngi nii !' : '📄 Votre One-Pager est prêt !'
                    );
                }
                if (pptxData.url) {
                    await sendDocumentMessage(
                        user.phone,
                        pptxData.url,
                        isWolof ? 'pitch-deck-xamle.pptx' : 'mon-pitch-deck.pptx',
                        isWolof ? '📊 Sa Pitch Deck mu ngi nii !' : '📊 Votre Pitch Deck est prêt !'
                    );
                }

                // 🚨 Track AI source for documents via Response table.
                // The user query above does not include enrollments, so the enrollment
                // this job just completed is looked up explicitly instead of falling
                // back to a placeholder id.
                const completedEnrollment = await prisma.enrollment.findFirst({
                    where: { userId, trackId }
                });
                if (completedEnrollment) {
                    // @ts-ignore - Prisma types may be out of sync after schema update
                    await prisma.response.create({
                        data: {
                            userId,
                            enrollmentId: completedEnrollment.id,
                            dayNumber: (dayNumber || 1) + 1, // Indicate graduation/end of track
                            content: `AI Documents Generated. PDF: ${pdfData.aiSource || '?'}, PPTX: ${pptxData.aiSource || '?'}`,
                            aiSource: 'SYSTEM'
                        }
                    }).catch((trackErr: unknown) => {
                        // Tracking stays best-effort, but the failure is no longer invisible.
                        console.warn('[WORKER] Could not record AI document generation:', trackErr);
                    });
                } else {
                    console.warn(`[WORKER] No enrollment found for User ${userId} on Track ${trackId}; AI document tracking skipped.`);
                }
            }
        } catch (aiError) {
            console.error('[WORKER] Failed to generate AI documents:', aiError);
        }
    }
}
else if (job.name === 'send-admin-audio-override') {
    /*
     * send-admin-audio-override: an admin answers a learner with a recorded voice note.
     *
     * Job data read here: userId, trackId, overrideAudioUrl, adminId.
     * Steps: send the admin audio, send the "SUITE" transition prompt, then enqueue
     * the next lesson day if the user has an ACTIVE enrollment on the track.
     */
    const { userId, trackId, overrideAudioUrl, adminId } = job.data;
    const user = await prisma.user.findUnique({ where: { id: userId } });
    if (user?.phone) {
        // 1. Send the Admin's Voice Message
        const { sendAudioMessage } = await import('./whatsapp-cloud');
        await sendAudioMessage(user.phone, overrideAudioUrl);

        // 2. Send transition prompt
        await sendTextMessage(user.phone,
            user.language === 'WOLOF'
                ? "Baax na ! Yónnee *SUITE* ngir dem ci kanam."
                : "Bravo ! Envoyez *SUITE* pour passer à la leçon suivante."
        );
        console.log(`[WORKER] Admin ${adminId} Audio Overdrive sent to User ${userId}.`);

        // 3. Increment the logic via Queue so that user doesn't fall behind.
        const enrollment = await prisma.enrollment.findFirst({
            where: { userId, trackId, status: 'ACTIVE' }
        });
        if (enrollment) {
            // currentDay is floored before incrementing, so a fractional value still
            // advances to the next whole day.
            const nextDay = Math.floor(enrollment.currentDay) + 1;
            // The Queue is created per job, so it is closed once the job is enqueued;
            // otherwise every override would leave another Queue attached to the
            // shared Redis connection.
            const q = new Queue('whatsapp-queue', { connection: connection as any });
            try {
                await q.add('send-content', { userId, trackId, dayNumber: nextDay }, { delay: 2000 });
            } finally {
                await q.close();
            }
        }
    } else {
        // Make the skip visible, matching the warning style of the send-message handler.
        console.warn(`[WORKER] User ${userId} not found or missing phone — skipping admin audio override.`);
    }
}
| } catch (error) { | |
| console.error(`Job ${job.id} failed:`, error); | |
| throw error; | |
| } | |
| }, { connection: connection as any }); | |
console.log('WhatsApp Worker started...');

// 🚀 Start the daily cron scheduler
import { startDailyScheduler } from './scheduler';
startDailyScheduler();

// Lifecycle logging for every job handled by this worker.
worker.on('completed', job => {
    console.log(`[WORKER] Job ${job.id} has completed!`);
});

// `job` may be undefined here, hence the optional chaining.
worker.on('failed', (job, err) => {
    console.error(`[WORKER] Job ${job?.id} has failed with ${(err instanceof Error ? err.message : String(err))}`);
});

// Worker-level errors (as opposed to job failures) are emitted on 'error'.
// An EventEmitter with no 'error' listener throws when one is emitted, so
// attach a handler that logs instead.
worker.on('error', err => {
    console.error(`[WORKER] Worker error: ${(err instanceof Error ? err.message : String(err))}`);
});