// cron_server / app.js
// everydaytok's picture
// Update app.js
// fe20fea verified
import express from 'express';
import cors from 'cors';
import { createClient } from '@supabase/supabase-js';
// ==========================================
// ⚙️ CONFIGURATION & SETTINGS
// ==========================================
// Port the HTTP server listens on; 7860 is used when PORT is unset or empty.
const PORT = process.env.PORT || 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
// Shared secret that /register and /deregister callers must send in the body.
const CRON_SECRET = process.env.CRON_SECRET || "default_secret";
// 🔴 VARIABLE BOOLEAN: Should past/missed jobs fire immediately on server startup?
const RUN_PAST_EVENTS_ON_STARTUP = true;

// Without database credentials nothing can be persisted or hydrated: stop now.
if (!SUPABASE_URL || !SUPABASE_KEY) {
  console.error("❌ Missing Supabase Credentials");
  process.exit(1);
}

// SECURITY: the fallback secret above is a literal in this source file, so a
// deployment that forgets CRON_SECRET accepts a publicly known value. The
// fallback is kept for backward compatibility, but it is flagged loudly.
if (!process.env.CRON_SECRET) {
  console.warn("⚠️ CRON_SECRET is not set; using the built-in default secret. Set CRON_SECRET before exposing this server.");
}

const app = express();
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
// In-memory registry: jobId -> { url, payload, intervalMs, offset, nextRunAt }.
const activeJobs = new Map();

app.use(express.json());
app.use(cors());
// ==========================================
// ⏱️ BULLETPROOF TIMEZONE MATH
// ==========================================
/**
 * Work out how long to wait until the next 05:00 wall-clock time in a zone
 * described by a UTC offset.
 *
 * The offset is auto-detected:
 *   - |value| >= 30 is treated as minutes in the JS `getTimezoneOffset()`
 *     convention (sign inverted: 300 => UTC-5, -330 => UTC+5.5);
 *   - anything smaller is treated as a UTC offset in hours (-5, 8, 5.5).
 * Values that are not finite numbers fall back to UTC (offset 0).
 *
 * @param {number|string} [rawOffset=0] - Offset in minutes or hours (see above).
 * @returns {{ delayMs: number, offsetHours: number }} `delayMs` is the wait in
 *   milliseconds, always in the range (0, 86400000]; `offsetHours` is the
 *   normalized offset in hours.
 */
function getNextFiveAMDetails(rawOffset = 0) {
  const HOUR_MS = 3600000;
  const DAY_MS = 24 * HOUR_MS;
  const TARGET_LOCAL_HOUR = 5;

  const numericOffset = Number(rawOffset);
  let offsetHours = 0;
  if (Number.isFinite(numericOffset)) {
    offsetHours = Math.abs(numericOffset) >= 30
      ? -(numericOffset / 60)
      : numericOffset;
  }

  const nowMs = Date.now();
  const midnightUtcMs = Math.floor(nowMs / DAY_MS) * DAY_MS;

  // Plain millisecond arithmetic keeps fractional offsets (UTC+5:30, +5:45)
  // intact; Date#setUTCHours would truncate the hour to an integer.
  // Example (EST / UTC-5): 5 - (-5) = 10:00 UTC
  // Example (IST / UTC+5.5): 5 - 5.5 = 23:30 UTC on the previous day
  const targetMs = midnightUtcMs + Math.round((TARGET_LOCAL_HOUR - offsetHours) * HOUR_MS);

  // Wrap the difference into (0, DAY_MS]. A single "+1 day" step is not enough
  // for zones east of UTC+5, where today's anchor lies on the previous UTC day
  // and the delay could otherwise come out negative.
  let delayMs = (((targetMs - nowMs) % DAY_MS) + DAY_MS) % DAY_MS;
  if (delayMs === 0) {
    delayMs = DAY_MS; // exactly 5 AM right now counts as "already passed"
  }

  return { delayMs, offsetHours };
}
// ==========================================
// 🚀 JOB EXECUTION & SCHEDULING
// ==========================================
/**
 * Run one registered job: POST its payload as JSON to its webhook and, when
 * the webhook replies with an OK status, refresh `updated_at` on the matching
 * `system_jobs` row.
 *
 * Ids that are not present in `activeJobs` are ignored. Failures raised while
 * sending the request or recording the run are caught and logged rather than
 * rethrown.
 *
 * @param {*} jobId - Key of the job in `activeJobs`; also matched against `system_jobs.id`.
 * @returns {Promise<void>}
 */
async function executeJob(jobId) {
  const entry = activeJobs.get(jobId);
  if (!entry) {
    return;
  }

  console.log(`⏰ [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);

  try {
    const request = {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(entry.payload)
    };
    const response = await fetch(entry.url, request);
    console.log(` └─ Response: ${response.status}`);

    // ✅ Only a successful webhook call is recorded in the database.
    if (!response.ok) {
      return;
    }

    const outcome = await supabase
      .from('system_jobs')
      .update({ updated_at: new Date().toISOString() })
      .eq('id', jobId);
    const dbError = outcome.error;

    if (dbError) {
      console.error(` └─ ❌ Failed to update DB:`, dbError.message);
    } else {
      console.log(` └─ πŸ’Ύ Database 'updated_at' successfully synced.`);
    }
  } catch (failure) {
    console.error(`❌ Job ${jobId} HTTP Failed:`, failure.message);
  }
}
/**
 * Put a job into the in-memory registry and log when it will next run.
 *
 * The job itself is fired later by the tick engine once `nextRunAt` has
 * passed; with `runImmediately` it is additionally fired right away.
 *
 * @param {*} jobId - Registry key for the job.
 * @param {number} intervalMs - Stored on the registry entry as-is.
 * @param {string} url - Webhook the job posts to.
 * @param {*} payload - Body sent to the webhook.
 * @param {{ delayMs: number, offsetHours: number }} delayData - Wait until the
 *   first run, plus the UTC offset (hours) used for logging and rescheduling.
 * @param {boolean} [runImmediately=false] - Also execute the job now.
 */
function startJobInternal(jobId, intervalMs, url, payload, delayData, runImmediately = false) {
  const { delayMs: waitMs, offsetHours: tzOffset } = delayData;

  // Failsafe label for the log line, e.g. UTC+1 or UTC-5.
  const sign = tzOffset >= 0 ? '+' : '';
  const tzLabel = `UTC${sign}${tzOffset}`;

  const dueAt = Date.now() + waitMs;
  const waitMinutes = waitMs / 1000 / 60;
  const inHours = (waitMinutes / 60).toFixed(2);
  const inMinutes = waitMinutes.toFixed(2);

  activeJobs.set(jobId, {
    url,
    payload,
    intervalMs,
    offset: tzOffset,
    nextRunAt: dueAt
  });

  if (runImmediately) {
    console.log(`⚑ [Startup] Missed event detected! Firing immediately: ${jobId}`);
    executeJob(jobId);
  }

  const verb = runImmediately ? 're-scheduled' : 'scheduled';
  console.log(`⏳ Job ${jobId} ${verb}. Target: 5AM (${tzLabel}). Next run in ${inHours} hours (${inMinutes} mins).`);
}
// ==========================================
// ⚙️ THE TICK ENGINE (WATCHDOG)
// ==========================================
// Watchdog: every 60 seconds, fire each job whose `nextRunAt` has been reached
// and move it to the next 5 AM for the offset stored on its registry entry.
setInterval(() => {
  const tickStartedAt = Date.now();

  activeJobs.forEach((entry, id) => {
    const isDue = tickStartedAt >= entry.nextRunAt;
    if (!isDue) {
      return;
    }

    executeJob(id);

    const upcoming = getNextFiveAMDetails(entry.offset);
    entry.nextRunAt = tickStartedAt + upcoming.delayMs;

    const inHours = (upcoming.delayMs / 1000 / 60 / 60).toFixed(2);
    console.log(`πŸ”„ Job ${id} entered 24h cycle. Target: 5AM (UTC${entry.offset >= 0 ? '+' : ''}${entry.offset}). Next run in ${inHours} hours.`);
  });
}, 60000);
// ==========================================
// 💾 DB HYDRATION USING `updated_at`
// ==========================================
/**
 * Load every row of `system_jobs` and schedule it in memory.
 *
 * When RUN_PAST_EVENTS_ON_STARTUP is on, a row is also fired immediately if it
 * has no `updated_at`, or if its `updated_at` is at least 24 hours old.
 * A query error is logged and nothing is scheduled.
 *
 * @returns {Promise<void>}
 */
async function hydrateJobs() {
  console.log("πŸ’§ Hydrating Cron Jobs from DB...");

  const result = await supabase.from('system_jobs').select('*');
  const { data: rows, error: loadError } = result;
  if (loadError) {
    console.error("❌ Hydration Failed:", loadError.message);
    return;
  }

  const startedAt = Date.now();

  // ✅ A job counts as missed when its last recorded update is a day or more old
  // (86,400,000 ms), or when it has never been recorded at all.
  const isOverdue = (row) => {
    if (!RUN_PAST_EVENTS_ON_STARTUP) {
      return false;
    }
    if (!row.updated_at) {
      return true;
    }
    const sinceLastRunMs = startedAt - new Date(row.updated_at).getTime();
    return sinceLastRunMs >= 86400000;
  };

  let scheduled = 0;
  for (const row of rows) {
    const delayData = getNextFiveAMDetails(row.payload?.timezoneOffset ?? 0);
    const catchUp = isOverdue(row);
    startJobInternal(row.id, row.interval_ms, row.webhook_url, row.payload, delayData, catchUp);
    scheduled += 1;
  }

  console.log(`βœ… Successfully hydrated and scheduled ${scheduled} jobs.`);
}
// Health check: reports that the registry is up and how many jobs are held in memory.
app.get('/', (req, res) => {
  const body = { status: "Cron Registry Active", active_jobs: activeJobs.size };
  res.json(body);
});
/**
 * POST /register: persist a job in `system_jobs` and schedule it in memory.
 *
 * Body: { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId }.
 *   - `secret` must equal CRON_SECRET, otherwise 403.
 *   - `jobId` and `webhookUrl` are required, otherwise 400.
 *   - `initialDelay` (ms, optional) replaces the computed wait until the next
 *     5 AM for the first run; it must be numeric, otherwise 400.
 * Responds 500 when the database upsert fails or anything unexpected is thrown.
 */
app.post('/register', async (req, res) => {
  // The async handler is not awaited by the router in Express 4, so failures
  // are caught here and answered with a 500 instead of leaving the request
  // hanging on an unhandled rejection.
  try {
    const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body ?? {};
    if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

    if (jobId == null || jobId === '' || typeof webhookUrl !== 'string' || webhookUrl === '') {
      return res.status(400).json({ error: "jobId and webhookUrl are required" });
    }

    // Coerce the override up front: a string such as "5000" would otherwise be
    // string-concatenated onto Date.now() and push the first run out of reach.
    let firstRunDelayMs;
    if (initialDelay !== undefined) {
      firstRunDelayMs = Number(initialDelay);
      if (!Number.isFinite(firstRunDelayMs)) {
        return res.status(400).json({ error: "initialDelay must be a number of milliseconds" });
      }
      // Negative values simply mean "due on the next tick".
      firstRunDelayMs = Math.max(0, firstRunDelayMs);
    }

    const { error } = await supabase.from('system_jobs').upsert({
      id: jobId,
      lead_id: leadId,
      interval_ms: intervalMs,
      webhook_url: webhookUrl,
      payload: payload,
      updated_at: new Date().toISOString()
    });
    if (error) return res.status(500).json({ error: error.message });

    const delayData = getNextFiveAMDetails(payload?.timezoneOffset || 0);
    if (firstRunDelayMs !== undefined) {
      delayData.delayMs = firstRunDelayMs;
    }

    startJobInternal(jobId, intervalMs, webhookUrl, payload, delayData, false);
    console.log(`βž• Registered & Scheduled Job (5AM Anchor): ${jobId}`);
    res.json({ success: true, jobId });
  } catch (err) {
    console.error("❌ Register Failed:", err?.message ?? err);
    if (!res.headersSent) {
      res.status(500).json({ error: "Internal error" });
    }
  }
});
/**
 * POST /deregister: delete a job from `system_jobs` and drop it from memory.
 *
 * Body: { secret, jobId }. `secret` must equal CRON_SECRET, otherwise 403.
 * Responds 500, and leaves the in-memory job untouched, when the database
 * delete fails, so a failed removal is never reported as success.
 */
app.post('/deregister', async (req, res) => {
  // The async handler is not awaited by the router in Express 4, so failures
  // are caught here and answered with a 500.
  try {
    const { secret, jobId } = req.body ?? {};
    if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

    // If the row survives in the database the job would come back on the next
    // hydration, so the delete result has to be checked before confirming.
    const { error } = await supabase.from('system_jobs').delete().eq('id', jobId);
    if (error) return res.status(500).json({ error: error.message });

    activeJobs.delete(jobId); // no-op when the id is not registered
    console.log(`βž– Deregistered Job: ${jobId}`);
    res.json({ success: true });
  } catch (err) {
    console.error("❌ Deregister Failed:", err?.message ?? err);
    if (!res.headersSent) {
      res.status(500).json({ error: "Internal error" });
    }
  }
});
// Start the HTTP server, then load the persisted jobs two seconds later.
app.listen(PORT, () => {
  console.log(`πŸš€ Cron Registry live on port ${PORT}`);
  setTimeout(() => {
    // hydrateJobs is async; catch here so a rejection is logged instead of
    // surfacing as an unhandled rejection from a timer callback.
    hydrateJobs().catch((err) => {
      console.error("❌ Hydration Failed:", err?.message ?? err);
    });
  }, 2000);
});