everydaytok commited on
Commit
9c321fd
Β·
verified Β·
1 Parent(s): e517b35

Update app.js

Browse files
Files changed (1) hide show
  1. app.js +59 -53
app.js CHANGED
@@ -1,4 +1,3 @@
1
- Let's fix this cron server, when it enters it's 24 hours interval loop, it doesn't deliver at all, only when it's in it's Timeout stage from the logs we see, script first:
2
  import express from 'express';
3
  import cors from 'cors';
4
  import { createClient } from '@supabase/supabase-js';
@@ -17,12 +16,13 @@ if (!SUPABASE_URL || !SUPABASE_KEY) {
17
  const app = express();
18
  const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
19
 
 
20
  const activeJobs = new Map();
21
 
22
  app.use(express.json());
23
  app.use(cors());
24
 
25
- // --- BULLETPROOF DELAY MATH (UPDATED TO 5 AM) ---
26
  function getMsUntilNextFiveAM(offset = 0) {
27
  const now = new Date();
28
  const target = new Date(now);
@@ -32,7 +32,9 @@ function getMsUntilNextFiveAM(offset = 0) {
32
 
33
  target.setUTCHours(0, targetUtcMinutes, 0, 0);
34
 
35
- if (target <= now) {
 
 
36
  target.setDate(target.getDate() + 1);
37
  }
38
 
@@ -40,45 +42,59 @@ function getMsUntilNextFiveAM(offset = 0) {
40
  }
41
 
42
  function startJobInternal(jobId, intervalMs, url, payload, initialDelay) {
43
- if (activeJobs.has(jobId)) {
44
- const existing = activeJobs.get(jobId);
45
- if (existing.timeout) clearTimeout(existing.timeout);
46
- if (existing.interval) clearInterval(existing.interval);
47
- activeJobs.delete(jobId);
48
- }
 
 
 
 
49
 
50
- const runTask = async () => {
51
- console.log(`⏰ [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);
52
- try {
53
- const res = await fetch(url, {
54
- method: 'POST',
55
- headers: { 'Content-Type': 'application/json' },
56
- body: JSON.stringify(payload)
57
- });
58
- console.log(` └─ Response: ${res.status}`);
59
- } catch (e) {
60
- console.error(`❌ Job ${jobId} HTTP Failed:`, e.message);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  }
62
- };
63
-
64
- const jobHandles = { timeout: null, interval: null };
65
-
66
- if (initialDelay > 0) {
67
- console.log(`⏳ Job ${jobId} scheduled. First 5AM run in ${(initialDelay/1000/60/60).toFixed(2)} hours.`);
68
-
69
- jobHandles.timeout = setTimeout(async () => {
70
- await runTask();
71
- jobHandles.interval = setInterval(runTask, intervalMs);
72
- console.log(`πŸ”„ Job ${jobId} entering 24h interval cycle.`);
73
- }, initialDelay);
74
- } else {
75
- console.log(`πŸš€ Job ${jobId} starting immediate interval.`);
76
- jobHandles.interval = setInterval(runTask, intervalMs);
77
  }
 
78
 
79
- activeJobs.set(jobId, jobHandles);
80
- }
81
 
 
82
  async function hydrateJobs() {
83
  console.log("πŸ’§ Hydrating Cron Jobs from DB...");
84
  const { data, error } = await supabase.from('system_jobs').select('*');
@@ -99,6 +115,7 @@ async function hydrateJobs() {
99
  console.log(`βœ… Successfully hydrated and scheduled ${count} jobs.`);
100
  }
101
 
 
102
  app.get('/', (req, res) => res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size }));
103
 
104
  app.post('/register', async (req, res) => {
@@ -121,20 +138,20 @@ app.post('/register', async (req, res) => {
121
 
122
  startJobInternal(jobId, intervalMs, webhookUrl, payload, delay);
123
 
124
- console.log(`βž• Registered & Scheduled Job (5AM Anchor): ${jobId}`);
125
  res.json({ success: true, jobId });
126
  });
127
 
128
  app.post('/deregister', async (req, res) => {
129
  const { secret, jobId } = req.body;
130
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
 
131
  await supabase.from('system_jobs').delete().eq('id', jobId);
 
132
  if (activeJobs.has(jobId)) {
133
- const handles = activeJobs.get(jobId);
134
- if (handles.timeout) clearTimeout(handles.timeout);
135
- if (handles.interval) clearInterval(handles.interval);
136
  activeJobs.delete(jobId);
137
  }
 
138
  console.log(`βž– Deregistered Job: ${jobId}`);
139
  res.json({ success: true });
140
  });
@@ -142,15 +159,4 @@ app.post('/deregister', async (req, res) => {
142
  app.listen(PORT, async () => {
143
  console.log(`πŸš€ Cron Registry live on port ${PORT}`);
144
  setTimeout(hydrateJobs, 2000);
145
- });
146
-
147
- ⏰ [4:59:59 AM] Executing: briefing_ba3a93b7-9032-49f6-998a-48228a8ef526
148
- └─ Response: 200
149
- πŸ”„ Job briefing_ba3a93b7-9032-49f6-998a-48228a8ef526 entering 24h interval cycle.
150
- ⏳ Job briefing_ba3a93b7-9032-49f6-998a-48228a8ef526 scheduled. First 5AM run in 4.99 hours.
151
- βž• Registered & Scheduled Job (5AM Anchor): briefing_ba3a93b7-9032-49f6-998a-48228a8ef526
152
- ⏰ [10:00:00 AM] Executing: briefing_ba3a93b7-9032-49f6-998a-48228a8ef526
153
- └─ Response: 200
154
- πŸ”„ Job briefing_ba3a93b7-9032-49f6-998a-48228a8ef526 entering 24h interval cycle.
155
-
156
- And we can't keep restarting the server to keep it running, let's find a solution either a reoccurring timeout or something that works
 
 
1
  import express from 'express';
2
  import cors from 'cors';
3
  import { createClient } from '@supabase/supabase-js';
 
16
  const app = express();
17
  const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
18
 
19
+ // Map will now store job details and absolute timestamps (nextRunAt)
20
  const activeJobs = new Map();
21
 
22
  app.use(express.json());
23
  app.use(cors());
24
 
25
+ // --- BULLETPROOF DELAY MATH ---
26
  function getMsUntilNextFiveAM(offset = 0) {
27
  const now = new Date();
28
  const target = new Date(now);
 
32
 
33
  target.setUTCHours(0, targetUtcMinutes, 0, 0);
34
 
35
+ // BUFFER: If the target is in the past OR within the next 2 minutes
36
+ // (meaning we just fired it), schedule for TOMORROW.
37
+ if (target.getTime() <= now.getTime() + 120000) {
38
  target.setDate(target.getDate() + 1);
39
  }
40
 
 
42
  }
43
 
44
  function startJobInternal(jobId, intervalMs, url, payload, initialDelay) {
45
+ // Calculate absolute timestamp for when this should run next
46
+ const nextRunAt = Date.now() + initialDelay;
47
+
48
+ activeJobs.set(jobId, {
49
+ url,
50
+ payload,
51
+ intervalMs,
52
+ offset: payload?.timezoneOffset || 0,
53
+ nextRunAt
54
+ });
55
 
56
+ console.log(`⏳ Job ${jobId} scheduled. Next run: ${new Date(nextRunAt).toLocaleString()}`);
57
+ }
58
+
59
+ // ==========================================
60
+ // βš™οΈ THE CRON ENGINE (HEARTBEAT)
61
+ // ==========================================
62
+ // This loop checks every 60 seconds if any job is ready to fire.
63
+ // This survives container sleeps, CPU throttling, and timer drifts.
64
+ setInterval(async () => {
65
+ const now = Date.now();
66
+
67
+ for (const [jobId, job] of activeJobs.entries()) {
68
+ if (now >= job.nextRunAt) {
69
+ console.log(`⏰ [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);
70
+
71
+ // 1. Immediately push the next run time forward to prevent double-firing
72
+ // We lock it back to exactly the next 5 AM based on their timezone
73
+ const nextDelay = getMsUntilNextFiveAM(job.offset);
74
+ job.nextRunAt = now + nextDelay;
75
+
76
+ // 2. Fire the webhook asynchronously
77
+ try {
78
+ fetch(job.url, {
79
+ method: 'POST',
80
+ headers: { 'Content-Type': 'application/json' },
81
+ body: JSON.stringify(job.payload)
82
+ }).then(res => {
83
+ console.log(` └─ Response [${jobId}]: ${res.status}`);
84
+ }).catch(err => {
85
+ console.error(`❌ Fetch Error [${jobId}]:`, err.message);
86
+ });
87
+ } catch (e) {
88
+ console.error(`❌ Job ${jobId} HTTP Failed:`, e.message);
89
+ }
90
+
91
+ console.log(`πŸ”„ Job ${jobId} entered 24h cycle. Next run in ${(nextDelay/1000/60/60).toFixed(2)} hours.`);
92
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
  }
94
+ }, 60000); // Ticks every 60 seconds
95
 
 
 
96
 
97
+ // --- DB HYDRATION ---
98
  async function hydrateJobs() {
99
  console.log("πŸ’§ Hydrating Cron Jobs from DB...");
100
  const { data, error } = await supabase.from('system_jobs').select('*');
 
115
  console.log(`βœ… Successfully hydrated and scheduled ${count} jobs.`);
116
  }
117
 
118
+ // --- ENDPOINTS ---
119
  app.get('/', (req, res) => res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size }));
120
 
121
  app.post('/register', async (req, res) => {
 
138
 
139
  startJobInternal(jobId, intervalMs, webhookUrl, payload, delay);
140
 
141
+ console.log(`βž• Registered Job: ${jobId}`);
142
  res.json({ success: true, jobId });
143
  });
144
 
145
  app.post('/deregister', async (req, res) => {
146
  const { secret, jobId } = req.body;
147
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
148
+
149
  await supabase.from('system_jobs').delete().eq('id', jobId);
150
+
151
  if (activeJobs.has(jobId)) {
 
 
 
152
  activeJobs.delete(jobId);
153
  }
154
+
155
  console.log(`βž– Deregistered Job: ${jobId}`);
156
  res.json({ success: true });
157
  });
 
159
  app.listen(PORT, async () => {
160
  console.log(`πŸš€ Cron Registry live on port ${PORT}`);
161
  setTimeout(hydrateJobs, 2000);
162
+ });