CognxSafeTrack Claude Sonnet 4.6 commited on
Commit
fae3e96
·
1 Parent(s): f5126bc

fix(security): Sprint 1 — five critical debt fixes

Browse files

C1: Await debitWallet() in usage-tracker and broadcast/campaign handlers
so billing failures are caught and logged, not silently swallowed.

C2+C3: Rewrite debitWallet() as a single Prisma interactive transaction:
balanceAfter now set atomically; isHardStopped flag set in the same
transaction as the balance decrement — no window between debit and stop.

C4: Add X-Hub-Signature-256 HMAC-SHA256 verification to POST /webhook.
Uses preParsing hook for raw body capture; fails open when
WHATSAPP_APP_SECRET is not configured.

H3: Fix payment webhook race condition — use updateMany(where: PENDING)
as atomic idempotency guard; only the request that flips PENDING→COMPLETED
proceeds to enrollment creation.

H7: Replace $queryRawUnsafe/$executeRawUnsafe with $queryRaw/$executeRaw
Prisma.sql template literals in indexing.ts; user-controlled values
(organizationId, limit) are properly parametrized.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

apps/api/src/routes/payments.ts CHANGED
@@ -97,12 +97,16 @@ async function handleWebhook(
97
  const payment = await prisma.payment.findUnique({ where: { id: event.reference } });
98
  if (!payment) return reply.code(200).send({ ok: true }); // idempotent — don't fail unknown refs
99
 
100
- if (payment.status === 'COMPLETED') return reply.code(200).send({ ok: true }); // already processed
101
-
102
  if (event.status === 'SUCCESS') {
103
- await prisma.payment.update({ where: { id: payment.id }, data: { status: 'COMPLETED' } });
 
 
 
 
 
 
104
 
105
- // Auto-enroll the user in the track
106
  const existing = await prisma.enrollment.findFirst({
107
  where: { userId: payment.userId, trackId: payment.trackId },
108
  });
@@ -116,10 +120,10 @@ async function handleWebhook(
116
  currentDay: 1,
117
  },
118
  });
119
- logger.info({ userId: payment.userId, trackId: payment.trackId }, '[PAYMENT-WEBHOOK] Auto-enrolled after payment');
120
  }
 
121
  } else if (event.status === 'FAILED') {
122
- await prisma.payment.update({ where: { id: payment.id }, data: { status: 'FAILED' } });
123
  }
124
 
125
  return reply.code(200).send({ ok: true });
 
97
  const payment = await prisma.payment.findUnique({ where: { id: event.reference } });
98
  if (!payment) return reply.code(200).send({ ok: true }); // idempotent — don't fail unknown refs
99
 
 
 
100
  if (event.status === 'SUCCESS') {
101
+ // Atomic idempotency: only the request that flips PENDING COMPLETED proceeds.
102
+ // Concurrent webhooks for the same payment will get count=0 and exit early.
103
+ const { count } = await prisma.payment.updateMany({
104
+ where: { id: payment.id, status: 'PENDING' },
105
+ data: { status: 'COMPLETED' },
106
+ });
107
+ if (count === 0) return reply.code(200).send({ ok: true });
108
 
109
+ // Only one request reaches here (guaranteed by updateMany above), so no race on create
110
  const existing = await prisma.enrollment.findFirst({
111
  where: { userId: payment.userId, trackId: payment.trackId },
112
  });
 
120
  currentDay: 1,
121
  },
122
  });
 
123
  }
124
+ logger.info({ userId: payment.userId, trackId: payment.trackId }, '[PAYMENT-WEBHOOK] Auto-enrolled after payment');
125
  } else if (event.status === 'FAILED') {
126
+ await prisma.payment.updateMany({ where: { id: payment.id, status: 'PENDING' }, data: { status: 'FAILED' } });
127
  }
128
 
129
  return reply.code(200).send({ ok: true });
apps/api/src/routes/whatsapp.ts CHANGED
@@ -1,4 +1,5 @@
1
  import { FastifyInstance } from 'fastify';
 
2
  import { logger } from '../logger';
3
  import { z } from 'zod';
4
 
@@ -28,6 +29,18 @@ const WebhookSchema = z.object({
28
  });
29
 
30
  export async function whatsappRoutes(fastify: FastifyInstance) {
 
 
 
 
 
 
 
 
 
 
 
 
31
  fastify.get('/webhook', async (request, reply) => {
32
  const query = request.query as Record<string, string | undefined>;
33
  const mode = query['hub.mode'];
@@ -52,6 +65,28 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
52
  * Main entry point for incoming messages and events
53
  */
54
  fastify.post('/webhook', async (request, reply) => {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  const result = WebhookSchema.safeParse(request.body);
56
 
57
  if (!result.success) {
 
1
  import { FastifyInstance } from 'fastify';
2
+ import crypto from 'crypto';
3
  import { logger } from '../logger';
4
  import { z } from 'zod';
5
 
 
29
  });
30
 
31
  export async function whatsappRoutes(fastify: FastifyInstance) {
32
+ // Capture raw body for X-Hub-Signature-256 verification on POST /webhook
33
+ fastify.addHook('preParsing', async (_request, _reply, payload) => {
34
+ const chunks: Buffer[] = [];
35
+ for await (const chunk of payload) {
36
+ chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string));
37
+ }
38
+ const raw = Buffer.concat(chunks);
39
+ (_request as any).rawBody = raw;
40
+ const { Readable } = await import('stream');
41
+ return Readable.from(raw);
42
+ });
43
+
44
  fastify.get('/webhook', async (request, reply) => {
45
  const query = request.query as Record<string, string | undefined>;
46
  const mode = query['hub.mode'];
 
65
  * Main entry point for incoming messages and events
66
  */
67
  fastify.post('/webhook', async (request, reply) => {
68
+ // Verify X-Hub-Signature-256 — fail-open if WHATSAPP_APP_SECRET is not configured
69
+ const APP_SECRET = process.env.WHATSAPP_APP_SECRET;
70
+ if (APP_SECRET) {
71
+ const sigHeader = request.headers['x-hub-signature-256'] as string | undefined;
72
+ const rawBody = (request as any).rawBody as Buffer | undefined;
73
+ if (!sigHeader || !rawBody) {
74
+ logger.warn('[WHATSAPP-WEBHOOK] Missing X-Hub-Signature-256 header');
75
+ return reply.code(401).send({ error: 'Unauthorized' });
76
+ }
77
+ const expected = `sha256=${crypto.createHmac('sha256', APP_SECRET).update(rawBody).digest('hex')}`;
78
+ try {
79
+ if (!crypto.timingSafeEqual(Buffer.from(sigHeader), Buffer.from(expected))) {
80
+ logger.warn('[WHATSAPP-WEBHOOK] Invalid X-Hub-Signature-256 — rejecting');
81
+ return reply.code(401).send({ error: 'Unauthorized' });
82
+ }
83
+ } catch {
84
+ // Buffer length mismatch (different signature format)
85
+ logger.warn('[WHATSAPP-WEBHOOK] Signature comparison failed — rejecting');
86
+ return reply.code(401).send({ error: 'Unauthorized' });
87
+ }
88
+ }
89
+
90
  const result = WebhookSchema.safeParse(request.body);
91
 
92
  if (!result.success) {
apps/whatsapp-worker/src/handlers/BroadcastHandler.ts CHANGED
@@ -95,12 +95,16 @@ export class BroadcastHandler implements JobHandler {
95
  logger.info(`[BroadcastHandler] Broadcast finished for "${list.name}". Success: ${successCount}, Failed: ${failCount}`);
96
 
97
  if (successCount > 0) {
98
- debitWallet({
99
- organizationId,
100
- amount: CREDIT_PRICE.BROADCAST_PER_USER * successCount,
101
- type: 'DEBIT_BROADCAST',
102
- description: `Broadcast "${list.name}" — ${successCount} recipients`,
103
- }).catch(err => logger.warn({ err, organizationId }, '[BroadcastHandler] debitWallet failed'));
 
 
 
 
104
  }
105
  }
106
  }
 
95
  logger.info(`[BroadcastHandler] Broadcast finished for "${list.name}". Success: ${successCount}, Failed: ${failCount}`);
96
 
97
  if (successCount > 0) {
98
+ try {
99
+ await debitWallet({
100
+ organizationId,
101
+ amount: CREDIT_PRICE.BROADCAST_PER_USER * successCount,
102
+ type: 'DEBIT_BROADCAST',
103
+ description: `Broadcast "${list.name}" ${successCount} recipients`,
104
+ });
105
+ } catch (err) {
106
+ logger.warn({ err, organizationId }, '[BroadcastHandler] debitWallet failed');
107
+ }
108
  }
109
  }
110
  }
apps/whatsapp-worker/src/handlers/CampaignHandler.ts CHANGED
@@ -105,12 +105,16 @@ export class CampaignHandler implements JobHandler {
105
  logger.info(`[CampaignHandler] Finished. Success: ${successCount}, Failed: ${failCount}`);
106
 
107
  if (successCount > 0) {
108
- debitWallet({
109
- organizationId,
110
- amount: CREDIT_PRICE.BROADCAST_PER_USER * successCount,
111
- type: 'DEBIT_BROADCAST',
112
- description: `Campaign — ${successCount} recipients`,
113
- }).catch(err => logger.warn({ err, organizationId }, '[CampaignHandler] debitWallet failed'));
 
 
 
 
114
  }
115
  }
116
  }
 
105
  logger.info(`[CampaignHandler] Finished. Success: ${successCount}, Failed: ${failCount}`);
106
 
107
  if (successCount > 0) {
108
+ try {
109
+ await debitWallet({
110
+ organizationId,
111
+ amount: CREDIT_PRICE.BROADCAST_PER_USER * successCount,
112
+ type: 'DEBIT_BROADCAST',
113
+ description: `Campaign ${successCount} recipients`,
114
+ });
115
+ } catch (err) {
116
+ logger.warn({ err, organizationId }, '[CampaignHandler] debitWallet failed');
117
+ }
118
  }
119
  }
120
  }
apps/whatsapp-worker/src/services/indexing.ts CHANGED
@@ -2,6 +2,7 @@ import axios from 'axios';
2
  const pdf = require('pdf-parse');
3
  import * as XLSX from 'xlsx';
4
  import { convert } from 'html-to-text';
 
5
  import { prisma } from './prisma';
6
  import { logger } from '../logger';
7
 
@@ -47,15 +48,11 @@ export class IndexingService {
47
  for (let j = 0; j < batch.length; j++) {
48
  const chunk = batch[j];
49
  const embedding = embeddings[j];
50
- const vectorStr = `[${embedding.join(',')}]`;
51
-
52
- await prisma.$executeRawUnsafe(
53
- `INSERT INTO "KnowledgeBaseEntry" ("id", "organizationId", "content", "embedding", "createdAt")
54
- VALUES (gen_random_uuid(), $1, $2, $3::vector, NOW())`,
55
- organizationId,
56
- chunk,
57
- vectorStr
58
- );
59
  }
60
 
61
  logger.info(`[INDEXING] Indexed ${Math.min(i + BATCH_SIZE, chunks.length)}/${chunks.length} chunks...`);
@@ -206,19 +203,17 @@ export class IndexingService {
206
  static async searchRelevantContext(organizationId: string, query: string, limit: number = 3): Promise<string> {
207
  try {
208
  const queryEmbedding = await this.generateEmbedding(query);
209
- const vectorStr = `[${queryEmbedding.join(',')}]`;
 
210
 
211
  // Cosine similarity search using pgvector
212
- const results: any[] = await prisma.$queryRawUnsafe(
213
- `SELECT content, 1 - (embedding <=> $1::vector) as similarity
214
- FROM "KnowledgeBaseEntry"
215
- WHERE "organizationId" = $2
216
- ORDER BY embedding <=> $1::vector
217
- LIMIT $3`,
218
- vectorStr,
219
- organizationId,
220
- limit
221
- );
222
 
223
  return results.map(r => r.content).join('\n\n---\n\n');
224
  } catch (error) {
 
2
  const pdf = require('pdf-parse');
3
  import * as XLSX from 'xlsx';
4
  import { convert } from 'html-to-text';
5
+ import { Prisma } from '@repo/database';
6
  import { prisma } from './prisma';
7
  import { logger } from '../logger';
8
 
 
48
  for (let j = 0; j < batch.length; j++) {
49
  const chunk = batch[j];
50
  const embedding = embeddings[j];
51
+ const vecRaw = Prisma.raw(`'[${embedding.join(',')}]'::vector`);
52
+ await prisma.$executeRaw(Prisma.sql`
53
+ INSERT INTO "KnowledgeBaseEntry" ("id", "organizationId", "content", "embedding", "createdAt")
54
+ VALUES (gen_random_uuid(), ${organizationId}, ${chunk}, ${vecRaw}, NOW())
55
+ `);
 
 
 
 
56
  }
57
 
58
  logger.info(`[INDEXING] Indexed ${Math.min(i + BATCH_SIZE, chunks.length)}/${chunks.length} chunks...`);
 
203
  static async searchRelevantContext(organizationId: string, query: string, limit: number = 3): Promise<string> {
204
  try {
205
  const queryEmbedding = await this.generateEmbedding(query);
206
+ // Prisma.raw is safe here: embedding is machine-generated floats from OpenAI, not user input
207
+ const vecRaw = Prisma.raw(`'[${queryEmbedding.join(',')}]'::vector`);
208
 
209
  // Cosine similarity search using pgvector
210
+ const results: any[] = await prisma.$queryRaw(Prisma.sql`
211
+ SELECT content, 1 - (embedding <=> ${vecRaw}) as similarity
212
+ FROM "KnowledgeBaseEntry"
213
+ WHERE "organizationId" = ${organizationId}
214
+ ORDER BY embedding <=> ${vecRaw}
215
+ LIMIT ${limit}
216
+ `);
 
 
 
217
 
218
  return results.map(r => r.content).join('\n\n---\n\n');
219
  } catch (error) {
apps/whatsapp-worker/src/services/usage-tracker.ts CHANGED
@@ -109,15 +109,14 @@ export async function trackAiUsage(params: TrackAiUsageParams): Promise<void> {
109
  }),
110
  ]);
111
 
112
- // Debit wallet (fire-and-forget — usage tracking must never block message flow)
113
- debitWallet({
114
  organizationId,
115
  amount: CREDIT_PRICE.AI_TEXT,
116
  type: 'DEBIT_AI',
117
  description: `AI text — ${feature}`,
118
  actorId: userId,
119
  byok: isByok,
120
- }).catch(err => logger.warn({ err, organizationId }, '[USAGE-TRACKER] debitWallet AI_TEXT failed'));
121
 
122
  // Fire quota alert when crossing 85% threshold (fire-and-forget)
123
  maybeQueueQuotaAlert(organizationId, updatedOrg.aiCreditsUsed - 1);
@@ -151,15 +150,14 @@ export async function trackAudioUsage(params: TrackAudioUsageParams): Promise<vo
151
  }),
152
  ]);
153
 
154
- // Debit wallet (fire-and-forget)
155
- debitWallet({
156
  organizationId,
157
  amount: CREDIT_PRICE.AI_AUDIO,
158
  type: 'DEBIT_AI',
159
  description: 'Audio transcription',
160
  actorId: userId,
161
  byok,
162
- }).catch(err => logger.warn({ err, organizationId }, '[USAGE-TRACKER] debitWallet AI_AUDIO failed'));
163
  } catch (err) {
164
  logger.error({ err }, '[USAGE-TRACKER] Failed to track audio usage');
165
  }
@@ -185,13 +183,12 @@ export async function trackWhatsAppSent(organizationId: string, count: number =
185
  }),
186
  ]);
187
 
188
- // Debit wallet for WhatsApp conversation (fire-and-forget)
189
- debitWallet({
190
  organizationId,
191
  amount: CREDIT_PRICE.WHATSAPP_CONVERSATION * count,
192
  type: 'DEBIT_WHATSAPP',
193
  description: `WhatsApp message${count > 1 ? ` x${count}` : ''}`,
194
- }).catch(err => logger.warn({ err, organizationId }, '[USAGE-TRACKER] debitWallet WHATSAPP failed'));
195
  } catch (err) {
196
  logger.error({ err }, '[USAGE-TRACKER] Failed to track WhatsApp message');
197
  }
 
109
  }),
110
  ]);
111
 
112
+ await debitWallet({
 
113
  organizationId,
114
  amount: CREDIT_PRICE.AI_TEXT,
115
  type: 'DEBIT_AI',
116
  description: `AI text — ${feature}`,
117
  actorId: userId,
118
  byok: isByok,
119
+ });
120
 
121
  // Fire quota alert when crossing 85% threshold (fire-and-forget)
122
  maybeQueueQuotaAlert(organizationId, updatedOrg.aiCreditsUsed - 1);
 
150
  }),
151
  ]);
152
 
153
+ await debitWallet({
 
154
  organizationId,
155
  amount: CREDIT_PRICE.AI_AUDIO,
156
  type: 'DEBIT_AI',
157
  description: 'Audio transcription',
158
  actorId: userId,
159
  byok,
160
+ });
161
  } catch (err) {
162
  logger.error({ err }, '[USAGE-TRACKER] Failed to track audio usage');
163
  }
 
183
  }),
184
  ]);
185
 
186
+ await debitWallet({
 
187
  organizationId,
188
  amount: CREDIT_PRICE.WHATSAPP_CONVERSATION * count,
189
  type: 'DEBIT_WHATSAPP',
190
  description: `WhatsApp message${count > 1 ? ` x${count}` : ''}`,
191
+ });
192
  } catch (err) {
193
  logger.error({ err }, '[USAGE-TRACKER] Failed to track WhatsApp message');
194
  }
apps/whatsapp-worker/src/services/wallet.ts CHANGED
@@ -140,44 +140,44 @@ export async function debitWallet(params: DebitWalletParams): Promise<void> {
140
  }
141
 
142
  try {
143
- const [, updatedOrg] = await prisma.$transaction([
144
- prisma.walletTransaction.create({
 
 
 
 
 
 
 
 
 
 
145
  data: {
146
  organizationId,
147
  amount: -amount,
148
- balanceAfter: 0, // will be updated below — placeholder
149
  type,
150
  description,
151
  actorId,
152
  byok: false,
153
  metadata: (metadata ?? {}) as Prisma.InputJsonValue,
154
  },
155
- }),
156
- prisma.organization.update({
157
- where: { id: organizationId },
158
- data: { walletBalance: { decrement: amount } },
159
- select: { walletBalance: true },
160
- }),
161
- ]);
162
 
163
- const newBalance = updatedOrg.walletBalance;
 
 
 
 
 
 
164
 
165
- // Update balanceAfter on the transaction (best-effort — analytics only)
166
- prisma.walletTransaction.updateMany({
167
- where: { organizationId, balanceAfter: 0, type, byok: false },
168
- data: { balanceAfter: newBalance },
169
- }).catch(() => {});
170
 
171
- // Invalidate cache so the guard picks up the new balance immediately
172
  await invalidateWalletCache(organizationId);
173
 
174
- if (newBalance <= 0) {
175
- await prisma.organization.update({
176
- where: { id: organizationId },
177
- data: { isHardStopped: true },
178
- });
179
- }
180
-
181
  // Fire alerts async — never block the main flow
182
  maybeQueueWalletAlert(organizationId, newBalance, newBalance + amount > 0);
183
  } catch (err) {
 
140
  }
141
 
142
  try {
143
+ const newBalance = await prisma.$transaction(async tx => {
144
+ // Decrement balance atomically and capture new value
145
+ const updatedOrg = await tx.organization.update({
146
+ where: { id: organizationId },
147
+ data: { walletBalance: { decrement: amount } },
148
+ select: { walletBalance: true },
149
+ });
150
+
151
+ const balance = updatedOrg.walletBalance;
152
+
153
+ // Create ledger entry with correct balanceAfter in same transaction
154
+ await tx.walletTransaction.create({
155
  data: {
156
  organizationId,
157
  amount: -amount,
158
+ balanceAfter: balance,
159
  type,
160
  description,
161
  actorId,
162
  byok: false,
163
  metadata: (metadata ?? {}) as Prisma.InputJsonValue,
164
  },
165
+ });
 
 
 
 
 
 
166
 
167
+ // Hard-stop flag set atomically — no window between debit and stop
168
+ if (balance <= 0) {
169
+ await tx.organization.update({
170
+ where: { id: organizationId },
171
+ data: { isHardStopped: true },
172
+ });
173
+ }
174
 
175
+ return balance;
176
+ });
 
 
 
177
 
178
+ // Cache invalidation outside transaction best-effort, fail is logged
179
  await invalidateWalletCache(organizationId);
180
 
 
 
 
 
 
 
 
181
  // Fire alerts async — never block the main flow
182
  maybeQueueWalletAlert(organizationId, newBalance, newBalance + amount > 0);
183
  } catch (err) {