everydaytok commited on
Commit
a99f1f3
Β·
verified Β·
1 Parent(s): 901c960

Update app.js

Browse files
Files changed (1) hide show
  1. app.js +76 -70
app.js CHANGED
@@ -3,7 +3,7 @@ import cors from 'cors';
3
  import { createClient } from '@supabase/supabase-js';
4
 
5
  // --- CONFIG ---
6
- const PORT = 7860;
7
  const SUPABASE_URL = process.env.SUPABASE_URL;
8
  const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
9
  const CRON_SECRET = process.env.CRON_SECRET || "default_secret";
@@ -13,126 +13,132 @@ if (!SUPABASE_URL || !SUPABASE_KEY) {
13
  process.exit(1);
14
  }
15
 
16
- // --- STATE ---
17
  const app = express();
18
  const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
19
- // In-Memory Map: JobID -> Timeout/Interval Object
20
  const activeJobs = new Map();
21
 
22
  app.use(express.json());
23
  app.use(cors());
24
 
25
- // --- HYDRATION ---
26
- async function hydrateJobs() {
27
- console.log("πŸ’§ Hydrating Cron Jobs from DB...");
28
- const { data, error } = await supabase.from('system_jobs').select('*');
29
 
30
- if (error) {
31
- console.error("Hydration Failed:", error);
32
- return;
33
- }
34
 
35
- let count = 0;
36
- for (const job of data) {
37
- // Calculate remaining time if it was a one-off, or restart interval
38
- // For simplicity in this architecture, we restart the cycle based on stored config
39
- startJobInternal(job.id, job.interval_ms, job.webhook_url, job.payload);
40
- count++;
41
  }
42
- console.log(`βœ… Hydrated ${count} jobs.`);
 
43
  }
44
 
45
- // --- LOGIC ---
46
- function startJobInternal(jobId, intervalMs, url, payload, initialDelay = 0) {
47
  if (activeJobs.has(jobId)) {
48
- clearTimeout(activeJobs.get(jobId));
49
- clearInterval(activeJobs.get(jobId));
 
 
50
  }
51
 
52
  const runTask = async () => {
53
- console.log(`⏰ Executing Job: ${jobId}`);
54
  try {
55
- await fetch(url, {
56
  method: 'POST',
57
  headers: { 'Content-Type': 'application/json' },
58
  body: JSON.stringify(payload)
59
  });
 
60
  } catch (e) {
61
- console.error(`❌ Job ${jobId} Failed:`, e.message);
62
  }
63
  };
64
 
 
 
65
  if (initialDelay > 0) {
66
- // Schedule the first run with delay, then set interval
67
- console.log(`⏳ Job ${jobId} delayed by ${(initialDelay/1000/60).toFixed(1)} mins`);
68
- const timeout = setTimeout(() => {
69
- runTask(); // Run once
70
- // Then start interval
71
- const interval = setInterval(runTask, intervalMs);
72
- activeJobs.set(jobId, interval);
73
  }, initialDelay);
74
- activeJobs.set(jobId, timeout);
75
  } else {
76
- // Immediate interval start
77
- const interval = setInterval(runTask, intervalMs);
78
- activeJobs.set(jobId, interval);
79
  }
 
 
80
  }
81
 
82
- // --- ENDPOINTS ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
- // app.get('/', (req, res) => res.send('Cron Registry Active'));
85
 
86
  app.post('/register', async (req, res) => {
87
- const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body;
88
-
89
- if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
90
-
91
- // Save to DB (Now including lead_id)
92
- const { error } = await supabase.from('system_jobs').upsert({
93
- id: jobId,
94
- lead_id: leadId, // <--- Save the UUID here
95
- interval_ms: intervalMs,
96
- webhook_url: webhookUrl,
97
- payload: payload,
98
- updated_at: new Date()
99
- });
100
 
101
  if (error) return res.status(500).json({ error: error.message });
102
 
103
- // 2. Start in Memory
104
- startJobInternal(jobId, intervalMs, webhookUrl, payload, initialDelay || 0);
 
105
 
106
- console.log(`βž• Registered Job: ${jobId}`);
107
  res.json({ success: true, jobId });
108
  });
109
 
110
  app.post('/deregister', async (req, res) => {
111
  const { secret, jobId } = req.body;
112
-
113
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
114
-
115
- // 1. Remove from DB
116
  await supabase.from('system_jobs').delete().eq('id', jobId);
117
-
118
- // 2. Stop in Memory
119
  if (activeJobs.has(jobId)) {
120
- const handle = activeJobs.get(jobId);
121
- clearTimeout(handle); // safe to call even if it's an interval
122
- clearInterval(handle);
123
  activeJobs.delete(jobId);
124
  }
125
-
126
  console.log(`βž– Deregistered Job: ${jobId}`);
127
  res.json({ success: true });
128
  });
129
 
130
- app.get('/', async (req, res) => {
131
- res.status(200).json({ status: "Alive" });
132
- });
133
-
134
- // --- BOOT ---
135
  app.listen(PORT, async () => {
136
- console.log(`πŸš€ Cron Registry running on port ${PORT}`);
137
- await hydrateJobs();
138
  });
 
3
  import { createClient } from '@supabase/supabase-js';
4
 
5
  // --- CONFIG ---
6
+ const PORT = 7861;
7
  const SUPABASE_URL = process.env.SUPABASE_URL;
8
  const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
9
  const CRON_SECRET = process.env.CRON_SECRET || "default_secret";
 
13
  process.exit(1);
14
  }
15
 
 
16
  const app = express();
17
  const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
18
+
19
  const activeJobs = new Map();
20
 
21
  app.use(express.json());
22
  app.use(cors());
23
 
24
+ // --- BULLETPROOF DELAY MATH (UPDATED TO 5 AM) ---
25
+ function getMsUntilNextFiveAM(offset = 0) {
26
+ const now = new Date();
27
+ const target = new Date(now);
28
 
29
+ // Calculate target UTC minutes for local 5 AM
30
+ const targetUtcMinutes = (5 * 60) - (offset * 60);
 
 
31
 
32
+ target.setUTCHours(0, targetUtcMinutes, 0, 0);
33
+
34
+ if (target <= now) {
35
+ target.setDate(target.getDate() + 1);
 
 
36
  }
37
+
38
+ return target.getTime() - now.getTime();
39
  }
40
 
41
+ function startJobInternal(jobId, intervalMs, url, payload, initialDelay) {
 
42
  if (activeJobs.has(jobId)) {
43
+ const existing = activeJobs.get(jobId);
44
+ if (existing.timeout) clearTimeout(existing.timeout);
45
+ if (existing.interval) clearInterval(existing.interval);
46
+ activeJobs.delete(jobId);
47
  }
48
 
49
  const runTask = async () => {
50
+ console.log(`⏰ [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);
51
  try {
52
+ const res = await fetch(url, {
53
  method: 'POST',
54
  headers: { 'Content-Type': 'application/json' },
55
  body: JSON.stringify(payload)
56
  });
57
+ console.log(` └─ Response: ${res.status}`);
58
  } catch (e) {
59
+ console.error(`❌ Job ${jobId} HTTP Failed:`, e.message);
60
  }
61
  };
62
 
63
+ const jobHandles = { timeout: null, interval: null };
64
+
65
  if (initialDelay > 0) {
66
+ console.log(`⏳ Job ${jobId} scheduled. First 5AM run in ${(initialDelay/1000/60/60).toFixed(2)} hours.`);
67
+
68
+ jobHandles.timeout = setTimeout(async () => {
69
+ await runTask();
70
+ jobHandles.interval = setInterval(runTask, intervalMs);
71
+ console.log(`πŸ”„ Job ${jobId} entering 24h interval cycle.`);
 
72
  }, initialDelay);
 
73
  } else {
74
+ console.log(`πŸš€ Job ${jobId} starting immediate interval.`);
75
+ jobHandles.interval = setInterval(runTask, intervalMs);
 
76
  }
77
+
78
+ activeJobs.set(jobId, jobHandles);
79
  }
80
 
81
+ async function hydrateJobs() {
82
+ console.log("πŸ’§ Hydrating Cron Jobs from DB...");
83
+ const { data, error } = await supabase.from('system_jobs').select('*');
84
+
85
+ if (error) {
86
+ console.error("❌ Hydration Failed:", error.message);
87
+ return;
88
+ }
89
+
90
+ let count = 0;
91
+ for (const job of data) {
92
+ const offset = job.payload?.timezoneOffset ?? 0;
93
+ const newDelay = getMsUntilNextFiveAM(offset);
94
+
95
+ startJobInternal(job.id, job.interval_ms, job.webhook_url, job.payload, newDelay);
96
+ count++;
97
+ }
98
+ console.log(`βœ… Successfully hydrated and scheduled ${count} jobs.`);
99
+ }
100
 
101
+ app.get('/', (req, res) => res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size }));
102
 
103
  app.post('/register', async (req, res) => {
104
+ const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body;
105
+
106
+ if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
107
+
108
+ const { error } = await supabase.from('system_jobs').upsert({
109
+ id: jobId,
110
+ lead_id: leadId,
111
+ interval_ms: intervalMs,
112
+ webhook_url: webhookUrl,
113
+ payload: payload,
114
+ updated_at: new Date()
115
+ });
 
116
 
117
  if (error) return res.status(500).json({ error: error.message });
118
 
119
+ const delay = (initialDelay !== undefined) ? initialDelay : getMsUntilNextFiveAM(payload?.timezoneOffset || 0);
120
+
121
+ startJobInternal(jobId, intervalMs, webhookUrl, payload, delay);
122
 
123
+ console.log(`βž• Registered & Scheduled Job (5AM Anchor): ${jobId}`);
124
  res.json({ success: true, jobId });
125
  });
126
 
127
  app.post('/deregister', async (req, res) => {
128
  const { secret, jobId } = req.body;
 
129
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
 
 
130
  await supabase.from('system_jobs').delete().eq('id', jobId);
 
 
131
  if (activeJobs.has(jobId)) {
132
+ const handles = activeJobs.get(jobId);
133
+ if (handles.timeout) clearTimeout(handles.timeout);
134
+ if (handles.interval) clearInterval(handles.interval);
135
  activeJobs.delete(jobId);
136
  }
 
137
  console.log(`βž– Deregistered Job: ${jobId}`);
138
  res.json({ success: true });
139
  });
140
 
 
 
 
 
 
141
  app.listen(PORT, async () => {
142
+ console.log(`πŸš€ Cron Registry live on port ${PORT}`);
143
+ setTimeout(hydrateJobs, 2000);
144
  });