Spaces:
Paused
Paused
File size: 7,816 Bytes
63870b6 8ae5837 63870b6 27b60ac 8ae5837 86366a6 27b60ac f5eeb4a 27b60ac 8ae5837 59bfafb a842e18 8ae5837 a842e18 8ae5837 a842e18 ac15076 27b60ac fe20fea 27b60ac fe20fea a99f1f3 27b60ac fa6a3b0 fe20fea a99f1f3 fe20fea ac15076 a99f1f3 fe20fea ac15076 27b60ac f5eeb4a 27b60ac f5eeb4a 27b60ac fe20fea 27b60ac fe20fea 27b60ac 9c321fd fe20fea 9c321fd 2f19bf4 27b60ac f5eeb4a fe20fea 27b60ac fe20fea 27b60ac 9c321fd f5eeb4a 9c321fd 27b60ac 9c321fd f5eeb4a 9c321fd fe20fea 9c321fd fe20fea ac15076 59bfafb f5eeb4a a99f1f3 27b60ac f5eeb4a 27b60ac a99f1f3 f5eeb4a a99f1f3 fe20fea f5eeb4a fe20fea f5eeb4a a99f1f3 fe20fea a99f1f3 ac15076 a99f1f3 04392bb 8ae5837 a99f1f3 f5eeb4a a99f1f3 6f54930 8ae5837 71eb220 fe20fea a99f1f3 fe20fea 71eb220 27b60ac 8ae5837 a9cf784 04392bb 8ae5837 9c321fd 8ae5837 ac15076 9c321fd 8ae5837 e865f13 8ae5837 a99f1f3 9c321fd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | import express from 'express';
import cors from 'cors';
import { createClient } from '@supabase/supabase-js';
// ==========================================
// βοΈ CONFIGURATION & SETTINGS
// ==========================================
const PORT = process.env.PORT || 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
const CRON_SECRET = process.env.CRON_SECRET || "default_secret";
// π΄ VARIABLE BOOLEAN: Should past/missed jobs fire immediately on server startup?
const RUN_PAST_EVENTS_ON_STARTUP = true;
if (!SUPABASE_URL || !SUPABASE_KEY) {
console.error("β Missing Supabase Credentials");
process.exit(1);
}
const app = express();
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
const activeJobs = new Map();
app.use(express.json());
app.use(cors());
// ==========================================
// β±οΈ BULLETPROOF TIMEZONE MATH
// ==========================================
function getNextFiveAMDetails(rawOffset = 0) {
let offsetHours = 0;
// Auto-detect if payload sent Minutes (from JS) or standard Hours
// JS getTimezoneOffset() is large (e.g. 300 for EST, -60 for CET)
if (Math.abs(rawOffset) >= 30) {
offsetHours = -(rawOffset / 60);
} else {
offsetHours = Number(rawOffset); // Already in hours (e.g. -5)
}
const now = new Date();
const target = new Date(now.getTime());
// Set to today's midnight UTC
target.setUTCHours(0, 0, 0, 0);
// Target 5 AM in the specified timezone
// Example (EST / UTC-5): 5 - (-5) = 10:00 AM UTC
// Example (CET / UTC+1): 5 - (+1) = 04:00 AM UTC
target.setUTCHours(5 - offsetHours);
// If 5 AM has ALREADY PASSED in that timezone today, schedule for tomorrow
if (target.getTime() <= now.getTime()) {
target.setUTCDate(target.getUTCDate() + 1);
}
return {
delayMs: target.getTime() - now.getTime(),
offsetHours: offsetHours
};
}
// ==========================================
// π JOB EXECUTION & SCHEDULING
// ==========================================
async function executeJob(jobId) {
const job = activeJobs.get(jobId);
if (!job) return;
console.log(`β° [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);
try {
const res = await fetch(job.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(job.payload)
});
console.log(` ββ Response: ${res.status}`);
// β
UPDATE DATABASE ON SUCCESS
if (res.ok) {
const { error } = await supabase
.from('system_jobs')
.update({ updated_at: new Date().toISOString() })
.eq('id', jobId);
if (!error) {
console.log(` ββ πΎ Database 'updated_at' successfully synced.`);
} else {
console.error(` ββ β Failed to update DB:`, error.message);
}
}
} catch (e) {
console.error(`β Job ${jobId} HTTP Failed:`, e.message);
}
}
function startJobInternal(jobId, intervalMs, url, payload, delayData, runImmediately = false) {
let delay = delayData.delayMs;
let offsetHours = delayData.offsetHours;
// Failsafe format for log printing (e.g., UTC+1 or UTC-5)
const tzString = `UTC${offsetHours >= 0 ? '+' : ''}${offsetHours}`;
const nextRunAt = Date.now() + delay;
const hours = (delay / 1000 / 60 / 60).toFixed(2);
const minutes = (delay / 1000 / 60).toFixed(2);
activeJobs.set(jobId, {
url,
payload,
intervalMs,
offset: offsetHours,
nextRunAt
});
if (runImmediately) {
console.log(`β‘ [Startup] Missed event detected! Firing immediately: ${jobId}`);
executeJob(jobId);
console.log(`β³ Job ${jobId} re-scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
} else {
console.log(`β³ Job ${jobId} scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
}
}
// ==========================================
// βοΈ THE TICK ENGINE (WATCHDOG)
// ==========================================
setInterval(() => {
const now = Date.now();
for (const [jobId, job] of activeJobs.entries()) {
if (now >= job.nextRunAt) {
executeJob(jobId);
const nextDelayData = getNextFiveAMDetails(job.offset);
job.nextRunAt = now + nextDelayData.delayMs;
const hours = (nextDelayData.delayMs / 1000 / 60 / 60).toFixed(2);
console.log(`π Job ${jobId} entered 24h cycle. Target: 5AM (UTC${job.offset >= 0 ? '+' : ''}${job.offset}). Next run in ${hours} hours.`);
}
}
}, 60000);
// ==========================================
// πΎ DB HYDRATION USING `updated_at`
// ==========================================
async function hydrateJobs() {
console.log("π§ Hydrating Cron Jobs from DB...");
const { data, error } = await supabase.from('system_jobs').select('*');
if (error) {
console.error("β Hydration Failed:", error.message);
return;
}
let count = 0;
const now = Date.now();
for (const job of data) {
const delayData = getNextFiveAMDetails(job.payload?.timezoneOffset ?? 0);
let runImmediately = false;
// β
CHECK LAST UPDATED TIME
if (job.updated_at) {
const lastUpdatedTime = new Date(job.updated_at).getTime();
const timeSinceLastRunMs = now - lastUpdatedTime;
// If it's been more than 24 hours (86,400,000 ms), fire it immediately to catch up.
if (RUN_PAST_EVENTS_ON_STARTUP && timeSinceLastRunMs >= 86400000) {
runImmediately = true;
}
} else if (RUN_PAST_EVENTS_ON_STARTUP) {
runImmediately = true;
}
startJobInternal(job.id, job.interval_ms, job.webhook_url, job.payload, delayData, runImmediately);
count++;
}
console.log(`β
Successfully hydrated and scheduled ${count} jobs.`);
}
app.get('/', (req, res) => res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size }));
app.post('/register', async (req, res) => {
const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body;
if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
const { error } = await supabase.from('system_jobs').upsert({
id: jobId,
lead_id: leadId,
interval_ms: intervalMs,
webhook_url: webhookUrl,
payload: payload,
updated_at: new Date().toISOString()
});
if (error) return res.status(500).json({ error: error.message });
const delayData = getNextFiveAMDetails(payload?.timezoneOffset || 0);
if (initialDelay !== undefined) {
delayData.delayMs = initialDelay;
}
startJobInternal(jobId, intervalMs, webhookUrl, payload, delayData, false);
console.log(`β Registered & Scheduled Job (5AM Anchor): ${jobId}`);
res.json({ success: true, jobId });
});
app.post('/deregister', async (req, res) => {
const { secret, jobId } = req.body;
if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });
await supabase.from('system_jobs').delete().eq('id', jobId);
if (activeJobs.has(jobId)) {
activeJobs.delete(jobId);
}
console.log(`β Deregistered Job: ${jobId}`);
res.json({ success: true });
});
app.listen(PORT, async () => {
console.log(`π Cron Registry live on port ${PORT}`);
setTimeout(hydrateJobs, 2000);
}); |