CognxSafeTrack commited on
Commit
f4e46e8
ยท
1 Parent(s): 2e888dc

Fix: Add Garbage Collector (GC) for media and temp files

Browse files
apps/api/package.json CHANGED
@@ -25,6 +25,7 @@
25
  "fastify": "^4.0.0",
26
  "fastify-plugin": "^4.5.1",
27
  "ioredis": "^5.9.3",
 
28
  "openai": "^4.0.0",
29
  "pptxgenjs": "^3.12.0",
30
  "puppeteer": "^22.0.0",
@@ -37,6 +38,7 @@
37
  "@types/dotenv": "^8.2.3",
38
  "@types/fast-levenshtein": "^0.0.4",
39
  "@types/node": "^20.0.0",
 
40
  "@vitest/ui": "^4.0.18",
41
  "tsx": "^3.0.0",
42
  "typescript": "^5.0.0",
 
25
  "fastify": "^4.0.0",
26
  "fastify-plugin": "^4.5.1",
27
  "ioredis": "^5.9.3",
28
+ "node-cron": "^4.2.1",
29
  "openai": "^4.0.0",
30
  "pptxgenjs": "^3.12.0",
31
  "puppeteer": "^22.0.0",
 
38
  "@types/dotenv": "^8.2.3",
39
  "@types/fast-levenshtein": "^0.0.4",
40
  "@types/node": "^20.0.0",
41
+ "@types/node-cron": "^3.0.11",
42
  "@vitest/ui": "^4.0.18",
43
  "tsx": "^3.0.0",
44
  "typescript": "^5.0.0",
apps/api/src/index.ts CHANGED
@@ -10,6 +10,7 @@ import { aiRoutes } from './routes/ai';
10
  import { paymentRoutes, stripeWebhookRoute } from './routes/payments';
11
  import { internalRoutes } from './routes/internal';
12
  import { studentRoutes } from './routes/student';
 
13
 
14
  // โ”€โ”€ Fail-fast: vรฉrifier les secrets critiques au dรฉmarrage โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
15
  // Only WHATSAPP_VERIFY_TOKEN is strictly needed at startup (for Meta webhook validation).
@@ -203,6 +204,7 @@ server.get('/v1/privacy', async (_req, reply) => {
203
  const start = async () => {
204
  try {
205
  await setupRateLimit();
 
206
  const port = parseInt(process.env.PORT || '8080');
207
  const isGateway = process.env.IS_GATEWAY === 'true' || process.env.HF_SPACE_ID !== undefined;
208
  logger.info(`[STARTUP] Mode: ${isGateway ? 'GATEWAY (Forwarding Only)' : 'DIRECT (Processing)'}`);
 
10
  import { paymentRoutes, stripeWebhookRoute } from './routes/payments';
11
  import { internalRoutes } from './routes/internal';
12
  import { studentRoutes } from './routes/student';
13
+ import { startCleanupCron } from './services/cleanup';
14
 
15
  // โ”€โ”€ Fail-fast: vรฉrifier les secrets critiques au dรฉmarrage โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
16
  // Only WHATSAPP_VERIFY_TOKEN is strictly needed at startup (for Meta webhook validation).
 
204
  const start = async () => {
205
  try {
206
  await setupRateLimit();
207
+ startCleanupCron();
208
  const port = parseInt(process.env.PORT || '8080');
209
  const isGateway = process.env.IS_GATEWAY === 'true' || process.env.HF_SPACE_ID !== undefined;
210
  logger.info(`[STARTUP] Mode: ${isGateway ? 'GATEWAY (Forwarding Only)' : 'DIRECT (Processing)'}`);
apps/api/src/services/cleanup.ts ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { logger } from '../logger';
2
+ import fs from 'fs/promises';
3
+ import path from 'path';
4
+ import cron from 'node-cron';
5
+
6
+ /**
7
+ * Cleanup Service
8
+ *
9
+ * Scans /tmp for old media files and deletes them to prevent disk saturation.
10
+ */
11
+ export async function cleanTempFiles(maxAgeMs: number = 2 * 60 * 60 * 1000) {
12
+ const tempDir = '/tmp';
13
+ const now = Date.now();
14
+
15
+ try {
16
+ const folders = ['', 'audio', 'images', 'documents'];
17
+
18
+ for (const folder of folders) {
19
+ const dirPath = path.join(tempDir, folder);
20
+
21
+ // Check if directory exists
22
+ try {
23
+ await fs.access(dirPath);
24
+ } catch {
25
+ continue;
26
+ }
27
+
28
+ const files = await fs.readdir(dirPath);
29
+
30
+ for (const file of files) {
31
+ const filePath = path.join(dirPath, file);
32
+
33
+ // Skip directories
34
+ const stats = await fs.stat(filePath);
35
+ if (stats.isDirectory()) continue;
36
+
37
+ // Check age
38
+ const age = now - stats.mtimeMs;
39
+ if (age > maxAgeMs) {
40
+ await fs.unlink(filePath);
41
+ logger.info(`[CLEANUP] Deleted old temp file: ${filePath} (${Math.round(age / 60000)} min old)`);
42
+ }
43
+ }
44
+ }
45
+ } catch (err: unknown) {
46
+ logger.error(`[CLEANUP] Error during temp file cleanup:`, (err instanceof Error ? err.message : String(err)));
47
+ }
48
+ }
49
+
50
+ /**
51
+ * Starts a cron job that runs every hour to clean up /tmp.
52
+ */
53
+ export function startCleanupCron() {
54
+ // Run every hour at minute 0
55
+ cron.schedule('0 * * * *', () => {
56
+ logger.info('[CLEANUP] ๐Ÿงน Starting scheduled temp file cleanup...');
57
+ cleanTempFiles();
58
+ });
59
+
60
+ // Also run once at startup
61
+ cleanTempFiles();
62
+ }
apps/whatsapp-worker/src/index.ts CHANGED
@@ -11,11 +11,13 @@ import { updateBehavioralScore } from './scoring';
11
  import { normalizeWolof } from './normalizeWolof';
12
  import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config';
13
  import { WhatsAppLogic } from './services/whatsapp-logic';
 
14
 
15
  dotenv.config();
16
 
17
  // ๐Ÿš€ CRITICAL: Validate environment variables at boot
18
  validateEnvironment();
 
19
 
20
  const prisma = new PrismaClient();
21
 
@@ -175,11 +177,11 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
175
  // ๐ŸŒŸ Adaptive Pedagogy: Dynamic Remediation & Diagnostic Logic v1.1 ๐ŸŒŸ
176
  // ๐Ÿšจ RACE CONDITION FIX: Update UserProgress strictly BEFORE sending the message over WhatsApp.
177
  let nextDay = currentDay + 1;
178
- const currentProgress = await prisma.userProgress.findUnique({
179
  where: { userId_trackId: { userId, trackId } },
180
  include: { userBadges: true }
181
  });
182
- const currentBadges = currentProgress?.userBadges.map(b => b.name) || [];
183
  let updatedBadges = [...currentBadges];
184
 
185
  if (feedbackData?.isQualified === false) {
@@ -263,7 +265,7 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
263
  if (!isTimeTravelMode) {
264
  const newBadges = updatedBadges.filter(b => !currentBadges.includes(b));
265
 
266
- await prisma.userProgress.update({
267
  where: { userId_trackId: { userId, trackId } },
268
  data: {
269
  exerciseStatus: newStatus,
@@ -294,7 +296,7 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
294
  // Let's do a clean replacement for the SQL relation to stay consistent with the JSON "replace" logic.
295
  const profile = await prisma.businessProfile.findUnique({ where: { userId } });
296
  if (profile) {
297
- await prisma.teamMember.deleteMany({ where: { businessProfileId: profile.id } });
298
  updatePayload.teamMembersList = {
299
  create: newMembers.map((m: any) => ({
300
  name: m.name || m.fullName || 'Unknown',
 
11
  import { normalizeWolof } from './normalizeWolof';
12
  import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config';
13
  import { WhatsAppLogic } from './services/whatsapp-logic';
14
+ import { startWorkerCleanupCron } from './services/cleanup';
15
 
16
  dotenv.config();
17
 
18
  // ๐Ÿš€ CRITICAL: Validate environment variables at boot
19
  validateEnvironment();
20
+ startWorkerCleanupCron();
21
 
22
  const prisma = new PrismaClient();
23
 
 
177
  // ๐ŸŒŸ Adaptive Pedagogy: Dynamic Remediation & Diagnostic Logic v1.1 ๐ŸŒŸ
178
  // ๐Ÿšจ RACE CONDITION FIX: Update UserProgress strictly BEFORE sending the message over WhatsApp.
179
  let nextDay = currentDay + 1;
180
+ const currentProgress = await (prisma as any).userProgress.findUnique({
181
  where: { userId_trackId: { userId, trackId } },
182
  include: { userBadges: true }
183
  });
184
+ const currentBadges = (currentProgress?.userBadges || []).map((b: any) => b.name);
185
  let updatedBadges = [...currentBadges];
186
 
187
  if (feedbackData?.isQualified === false) {
 
265
  if (!isTimeTravelMode) {
266
  const newBadges = updatedBadges.filter(b => !currentBadges.includes(b));
267
 
268
+ await (prisma as any).userProgress.update({
269
  where: { userId_trackId: { userId, trackId } },
270
  data: {
271
  exerciseStatus: newStatus,
 
296
  // Let's do a clean replacement for the SQL relation to stay consistent with the JSON "replace" logic.
297
  const profile = await prisma.businessProfile.findUnique({ where: { userId } });
298
  if (profile) {
299
+ await (prisma as any).teamMember.deleteMany({ where: { businessProfileId: profile.id } });
300
  updatePayload.teamMembersList = {
301
  create: newMembers.map((m: any) => ({
302
  name: m.name || m.fullName || 'Unknown',
apps/whatsapp-worker/src/services/cleanup.ts ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { logger } from '../logger';
2
+ import fs from 'fs/promises';
3
+ import path from 'path';
4
+ import cron from 'node-cron';
5
+
6
+ /**
7
+ * Worker Cleanup Service
8
+ *
9
+ * Safety guard to purge any temporary files in /tmp on the worker instance.
10
+ */
11
+ export async function cleanWorkerTempFiles(maxAgeMs: number = 2 * 60 * 60 * 1000) {
12
+ const tempDir = '/tmp';
13
+ const now = Date.now();
14
+
15
+ try {
16
+ const folders = ['', 'audio', 'images', 'documents'];
17
+
18
+ for (const folder of folders) {
19
+ const dirPath = path.join(tempDir, folder);
20
+
21
+ try {
22
+ await fs.access(dirPath);
23
+ } catch {
24
+ continue;
25
+ }
26
+
27
+ const files = await fs.readdir(dirPath);
28
+
29
+ for (const file of files) {
30
+ const filePath = path.join(dirPath, file);
31
+ const stats = await fs.stat(filePath);
32
+ if (stats.isDirectory()) continue;
33
+
34
+ const age = now - stats.mtimeMs;
35
+ if (age > maxAgeMs) {
36
+ await fs.unlink(filePath);
37
+ logger.info(`[CLEANUP-WORKER] Deleted old file: ${filePath} (${Math.round(age / 60000)} min old)`);
38
+ }
39
+ }
40
+ }
41
+ } catch (err: unknown) {
42
+ logger.error(`[CLEANUP-WORKER] Error:`, (err instanceof Error ? err.message : String(err)));
43
+ }
44
+ }
45
+
46
+ export function startWorkerCleanupCron() {
47
+ // Run every hour
48
+ cron.schedule('0 * * * *', () => {
49
+ logger.info('[CLEANUP-WORKER] ๐Ÿงน Starting cleanup...');
50
+ cleanWorkerTempFiles();
51
+ });
52
+
53
+ // Initial run
54
+ cleanWorkerTempFiles();
55
+ }
pnpm-lock.yaml CHANGED
@@ -118,6 +118,9 @@ importers:
118
  ioredis:
119
  specifier: ^5.9.3
120
  version: 5.9.3
 
 
 
121
  openai:
122
  specifier: ^4.0.0
123
  version: 4.104.0(ws@8.19.0)(zod@3.25.76)
@@ -149,6 +152,9 @@ importers:
149
  '@types/node':
150
  specifier: ^20.0.0
151
  version: 20.19.33
 
 
 
152
  '@vitest/ui':
153
  specifier: ^4.0.18
154
  version: 4.0.18(vitest@4.0.18)
 
118
  ioredis:
119
  specifier: ^5.9.3
120
  version: 5.9.3
121
+ node-cron:
122
+ specifier: ^4.2.1
123
+ version: 4.2.1
124
  openai:
125
  specifier: ^4.0.0
126
  version: 4.104.0(ws@8.19.0)(zod@3.25.76)
 
152
  '@types/node':
153
  specifier: ^20.0.0
154
  version: 20.19.33
155
+ '@types/node-cron':
156
+ specifier: ^3.0.11
157
+ version: 3.0.11
158
  '@vitest/ui':
159
  specifier: ^4.0.18
160
  version: 4.0.18(vitest@4.0.18)