CognxSafeTrack commited on
Commit
7098b31
Β·
1 Parent(s): fe40cec

fix: add bridge server to worker to handle forwarded webhooks

Browse files
apps/whatsapp-worker/package.json CHANGED
@@ -16,6 +16,7 @@
16
  "axios": "^1.13.5",
17
  "bullmq": "^4.0.0",
18
  "dotenv": "^16.0.0",
 
19
  "ioredis": "^5.9.3",
20
  "node-cron": "^4.2.1",
21
  "node-fetch": "^2.6.7",
 
16
  "axios": "^1.13.5",
17
  "bullmq": "^4.0.0",
18
  "dotenv": "^16.0.0",
19
+ "fastify": "^4.24.3",
20
  "ioredis": "^5.9.3",
21
  "node-cron": "^4.2.1",
22
  "node-fetch": "^2.6.7",
apps/whatsapp-worker/src/index.ts CHANGED
@@ -2,7 +2,8 @@ import { logger } from './logger';
2
  import dns from 'node:dns';
3
  dns.setDefaultResultOrder('ipv4first');
4
 
5
- import { Worker, Job } from 'bullmq';
 
6
  import dotenv from 'dotenv';
7
  import Redis from 'ioredis';
8
  import { validateEnvironment } from './config';
@@ -24,16 +25,21 @@ dotenv.config();
24
  validateEnvironment();
25
  startWorkerCleanupCron();
26
 
27
- const connection = process.env.REDIS_URL
28
- ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
29
- : new Redis({
30
  host: process.env.REDIS_HOST || 'localhost',
31
  port: parseInt(process.env.REDIS_PORT || '6379'),
32
  username: process.env.REDIS_USERNAME || 'default',
33
  password: process.env.REDIS_PASSWORD || undefined,
34
  tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
35
- maxRetriesPerRequest: null
36
- });
 
 
 
 
 
37
 
38
  const handlers: Record<string, JobHandler> = {
39
  'send-message': new MessageHandler(),
@@ -49,6 +55,31 @@ const handlers: Record<string, JobHandler> = {
49
  'handle-inbound': new InboundHandler()
50
  };
51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
  const worker = new Worker('whatsapp-queue', async (job: Job<JobData>) => {
53
  const organizationId = job.data.organizationId || 'default-org-id';
54
 
@@ -77,11 +108,24 @@ const worker = new Worker('whatsapp-queue', async (job: Job<JobData>) => {
77
  concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5')
78
  });
79
 
80
- logger.info('πŸš€ WhatsApp Worker started (Modular Mode)...');
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
- // Start the daily cron scheduler
83
- import { startDailyScheduler } from './scheduler';
84
- startDailyScheduler();
85
 
86
  worker.on('completed', job => {
87
  logger.info(`[WORKER] Job ${job.id} has completed!`);
 
2
  import dns from 'node:dns';
3
  dns.setDefaultResultOrder('ipv4first');
4
 
5
+ import fastify from 'fastify';
6
+ import { Queue, Worker, Job } from 'bullmq';
7
  import dotenv from 'dotenv';
8
  import Redis from 'ioredis';
9
  import { validateEnvironment } from './config';
 
25
  validateEnvironment();
26
  startWorkerCleanupCron();
27
 
28
+ const redisConfig = process.env.REDIS_URL
29
+ ? { url: process.env.REDIS_URL }
30
+ : {
31
  host: process.env.REDIS_HOST || 'localhost',
32
  port: parseInt(process.env.REDIS_PORT || '6379'),
33
  username: process.env.REDIS_USERNAME || 'default',
34
  password: process.env.REDIS_PASSWORD || undefined,
35
  tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
36
+ };
37
+
38
+ const connection = process.env.REDIS_URL
39
+ ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
40
+ : new Redis({ ...redisConfig, maxRetriesPerRequest: null } as any);
41
+
42
+ const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
43
 
44
  const handlers: Record<string, JobHandler> = {
45
  'send-message': new MessageHandler(),
 
55
  'handle-inbound': new InboundHandler()
56
  };
57
 
58
+ // ─── HTTP SERVER (Inbound Bridge) ─────────────────────────────────────────────
59
+ // This server receives forwarded webhooks from the Hugging Face Gateway
60
+ const server = fastify({ logger: false });
61
+
62
+ server.post('/v1/internal/whatsapp/inbound', async (req, reply) => {
63
+ const organizationId = req.headers['x-organization-id'] as string || 'default-org-id';
64
+ const payload = req.body as any;
65
+
66
+ logger.info(`[BRIDGE] Received forwarded webhook for Org: ${organizationId}`);
67
+
68
+ // Schedule the job to be processed by the worker modularly
69
+ await whatsappQueue.add('handle-inbound', {
70
+ ...payload,
71
+ organizationId
72
+ }, {
73
+ attempts: 3,
74
+ backoff: { type: 'exponential', delay: 1000 }
75
+ });
76
+
77
+ return reply.code(200).send({ ok: true });
78
+ });
79
+
80
+ server.get('/health', async () => ({ status: 'ok' }));
81
+
82
+ // ─── WORKER ──────────────────────────────────────────────────────────────────
83
  const worker = new Worker('whatsapp-queue', async (job: Job<JobData>) => {
84
  const organizationId = job.data.organizationId || 'default-org-id';
85
 
 
108
  concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5')
109
  });
110
 
111
+ // ─── STARTUP ─────────────────────────────────────────────────────────────────
112
+ const PORT = parseInt(process.env.PORT || '7860');
113
+
114
+ const start = async () => {
115
+ try {
116
+ await server.listen({ port: PORT, host: '0.0.0.0' });
117
+ logger.info(`πŸš€ WhatsApp Worker + Bridge listening on port ${PORT}`);
118
+
119
+ // Start the daily cron scheduler
120
+ const { startDailyScheduler } = await import('./scheduler');
121
+ startDailyScheduler();
122
+ } catch (err) {
123
+ logger.error('Failed to start worker server:', err);
124
+ process.exit(1);
125
+ }
126
+ };
127
 
128
+ start();
 
 
129
 
130
  worker.on('completed', job => {
131
  logger.info(`[WORKER] Job ${job.id} has completed!`);