CognxSafeTrack commited on
Commit
41fe9a9
Β·
1 Parent(s): 9ca5873

feat(admin): human-in-the-loop dashboard base, add PENDING_REVIEW logic and audio overdrive worker

Browse files
apps/api/src/routes/admin.ts CHANGED
@@ -1,5 +1,6 @@
1
  import { FastifyInstance } from 'fastify';
2
  import { prisma } from '../services/prisma';
 
3
  import { z } from 'zod';
4
 
5
  // ─── Zod Schemas ───────────────────────────────────────────────────────────────
@@ -25,6 +26,14 @@ const TrackDaySchema = z.object({
25
  unlockCondition: z.string().optional(),
26
  });
27
 
 
 
 
 
 
 
 
 
28
  export async function adminRoutes(fastify: FastifyInstance) {
29
 
30
  // ── Dashboard Stats ────────────────────────────────────────────────────────
@@ -70,6 +79,90 @@ export async function adminRoutes(fastify: FastifyInstance) {
70
  return enrollments;
71
  });
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  // ══════════════════════════════════════════════════════════════════════════
74
  // TRACKS CRUD
75
  // ══════════════════════════════════════════════════════════════════════════
 
1
  import { FastifyInstance } from 'fastify';
2
  import { prisma } from '../services/prisma';
3
+ import { whatsappQueue } from '../services/queue';
4
  import { z } from 'zod';
5
 
6
  // ─── Zod Schemas ───────────────────────────────────────────────────────────────
 
26
  unlockCondition: z.string().optional(),
27
  });
28
 
29
+ const OverrideFeedbackSchema = z.object({
30
+ userId: z.string(),
31
+ trackId: z.string(),
32
+ transcription: z.string().min(1),
33
+ overrideAudioUrl: z.string().url(),
34
+ adminId: z.string()
35
+ });
36
+
37
  export async function adminRoutes(fastify: FastifyInstance) {
38
 
39
  // ── Dashboard Stats ────────────────────────────────────────────────────────
 
79
  return enrollments;
80
  });
81
 
82
+ // ── Human-in-the-Loop / Audio Overdrive ────────────────────────────────────
83
+
84
+ // LIVE FEED : Students blocked waiting for manual review
85
+ fastify.get('/admin/live-feed', async () => {
86
+ const pendingReviews = await prisma.userProgress.findMany({
87
+ where: {
88
+ exerciseStatus: 'PENDING_REVIEW',
89
+ user: { language: 'WOLOF' } // Currently only focusing on Wolof interceptions
90
+ },
91
+ include: {
92
+ user: { select: { id: true, name: true, phone: true, activity: true, language: true } },
93
+ track: { select: { id: true, title: true } }
94
+ },
95
+ orderBy: { updatedAt: 'asc' }
96
+ });
97
+
98
+ // Map the raw payload to find the actual response audio for each pending review
99
+ const liveFeed = await Promise.all(pendingReviews.map(async (progress) => {
100
+ const enrollment = await prisma.enrollment.findFirst({
101
+ where: { userId: progress.userId, trackId: progress.trackId, status: 'ACTIVE' }
102
+ });
103
+
104
+ // If no active enrollment found, fallback gracefully
105
+ if (!enrollment) return { ...progress, lastResponse: null };
106
+
107
+ // Find the most recent response from this user for this enrollment
108
+ const lastResponse = await prisma.response.findFirst({
109
+ where: { userId: progress.userId, enrollmentId: enrollment.id },
110
+ orderBy: { createdAt: 'desc' }
111
+ });
112
+
113
+ return {
114
+ ...progress,
115
+ audioUrl: lastResponse?.mediaUrl || null,
116
+ content: lastResponse?.content || null,
117
+ dayNumber: lastResponse?.dayNumber || Math.floor(enrollment.currentDay)
118
+ };
119
+ }));
120
+
121
+ return liveFeed;
122
+ });
123
+
124
+ // OVERRIDE ACTION : Admin posts the manual review
125
+ fastify.post('/admin/override-feedback', async (req, reply) => {
126
+ const body = OverrideFeedbackSchema.safeParse(req.body);
127
+ if (!body.success) return reply.code(400).send({ error: body.error.flatten() });
128
+
129
+ const { userId, trackId, transcription, overrideAudioUrl, adminId } = body.data;
130
+
131
+ // 1. Update UserProgress status & logs
132
+ const progress = await prisma.userProgress.update({
133
+ where: { userId_trackId: { userId, trackId } },
134
+ data: {
135
+ exerciseStatus: 'COMPLETED',
136
+ adminTranscription: transcription,
137
+ overrideAudioUrl: overrideAudioUrl,
138
+ reviewedBy: adminId
139
+ }
140
+ });
141
+
142
+ // 2. Update BusinessProfile with human-cleaned transcription
143
+ const enrollment = await prisma.enrollment.findFirst({
144
+ where: { userId, trackId, status: 'ACTIVE' }
145
+ });
146
+
147
+ const currentDay = enrollment ? Math.floor(enrollment.currentDay) : 0;
148
+
149
+ await prisma.businessProfile.upsert({
150
+ where: { userId },
151
+ update: { lastUpdatedFromDay: currentDay },
152
+ create: { userId, lastUpdatedFromDay: currentDay }
153
+ });
154
+
155
+ // 3. Dispatch Background Job (Audio Delivery + Next Day Increment)
156
+ await whatsappQueue.add('send-admin-audio-override', {
157
+ userId,
158
+ trackId,
159
+ overrideAudioUrl,
160
+ adminId
161
+ });
162
+
163
+ return reply.code(200).send({ ok: true, progress });
164
+ });
165
+
166
  // ══════════════════════════════════════════════════════════════════════════
167
  // TRACKS CRUD
168
  // ══════════════════════════════════════════════════════════════════════════
apps/whatsapp-worker/src/index.ts CHANGED
@@ -60,6 +60,19 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
60
  });
61
 
62
  console.log(`[WORKER] Generating expert feedback for User ${userId}`);
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  const AI_API_BASE_URL = getApiUrl();
64
  const apiKey = getAdminApiKey();
65
 
@@ -696,6 +709,36 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
696
  }
697
  }
698
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
699
  } catch (error) {
700
  console.error(`Job ${job.id} failed:`, error);
701
  throw error;
 
60
  });
61
 
62
  console.log(`[WORKER] Generating expert feedback for User ${userId}`);
63
+
64
+ // 🚨 HUMAN-IN-THE-LOOP INTERCEPTION
65
+ // If the user's language is WOLOF, we pause AI interpretation and wait for an Admin Review
66
+ if (language === 'WOLOF') {
67
+ console.log(`[WORKER] Intercepting WOLOF audio for User ${userId}. Shifting to PENDING_REVIEW.`);
68
+ await prisma.userProgress.upsert({
69
+ where: { userId_trackId: { userId, trackId } },
70
+ update: { exerciseStatus: 'PENDING_REVIEW' as any },
71
+ create: { userId, trackId, exerciseStatus: 'PENDING_REVIEW' as any }
72
+ });
73
+ await sendTextMessage(user.phone, "πŸŽ™οΈ Nyangi jaxas sa kΓ ddu. Xamle dina la tontu ci kanam ! (En cours d'analyse)");
74
+ return; // Stop job execution, wait for Admin
75
+ }
76
  const AI_API_BASE_URL = getApiUrl();
77
  const apiKey = getAdminApiKey();
78
 
 
709
  }
710
  }
711
  }
712
+ else if (job.name === 'send-admin-audio-override') {
713
+ const { userId, trackId, overrideAudioUrl, adminId } = job.data;
714
+ const user = await prisma.user.findUnique({ where: { id: userId } });
715
+
716
+ if (user?.phone) {
717
+ // 1. Send the Admin's Voice Message
718
+ const { sendAudioMessage } = await import('./whatsapp-cloud');
719
+ await sendAudioMessage(user.phone, overrideAudioUrl);
720
+
721
+ // 2. Send transition prompt
722
+ await sendTextMessage(user.phone,
723
+ user.language === 'WOLOF'
724
+ ? "Baax na ! YΓ³nnee *SUITE* ngir dem ci kanam."
725
+ : "Bravo ! Envoyez *SUITE* pour passer Γ  la leΓ§on suivante."
726
+ );
727
+
728
+ console.log(`[WORKER] Admin ${adminId} Audio Overdrive sent to User ${userId}.`);
729
+
730
+ // 3. Increment the logic via Queue so that user doesn't fall behind.
731
+ const enrollment = await prisma.enrollment.findFirst({
732
+ where: { userId, trackId, status: 'ACTIVE' }
733
+ });
734
+
735
+ if (enrollment) {
736
+ const nextDay = Math.floor(enrollment.currentDay) + 1;
737
+ const q = new Queue('whatsapp-queue', { connection: connection as any });
738
+ await q.add('send-content', { userId, trackId, dayNumber: nextDay }, { delay: 2000 });
739
+ }
740
+ }
741
+ }
742
  } catch (error) {
743
  console.error(`Job ${job.id} failed:`, error);
744
  throw error;
packages/database/prisma/schema.prisma CHANGED
@@ -101,6 +101,9 @@ model UserProgress {
101
  exerciseStatus ExerciseStatus @default(PENDING)
102
  badges Json? // Array of strings: ["CLARTE", "CONFIANCE"]
103
  behavioralScoring Json? // { discipline_financiere: 0, organisation: 0, ... }
 
 
 
104
  createdAt DateTime @default(now())
105
  updatedAt DateTime @updatedAt
106
 
@@ -212,5 +215,6 @@ enum ExerciseType {
212
  enum ExerciseStatus {
213
  PENDING
214
  PENDING_REMEDIATION
 
215
  COMPLETED
216
  }
 
101
  exerciseStatus ExerciseStatus @default(PENDING)
102
  badges Json? // Array of strings: ["CLARTE", "CONFIANCE"]
103
  behavioralScoring Json? // { discipline_financiere: 0, organisation: 0, ... }
104
+ adminTranscription String?
105
+ overrideAudioUrl String?
106
+ reviewedBy String?
107
  createdAt DateTime @default(now())
108
  updatedAt DateTime @updatedAt
109
 
 
215
  enum ExerciseStatus {
216
  PENDING
217
  PENDING_REMEDIATION
218
+ PENDING_REVIEW
219
  COMPLETED
220
  }