import { createHash, timingSafeEqual } from 'node:crypto';

import express from 'express';
import cors from 'cors';
import { createClient } from '@supabase/supabase-js';

// ==========================================
// ⚙️ CONFIGURATION & SETTINGS
// ==========================================
const PORT = process.env.PORT || 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
const CRON_SECRET = process.env.CRON_SECRET || "default_secret";

// 🔴 VARIABLE BOOLEAN: Should past/missed jobs fire immediately on server startup?
const RUN_PAST_EVENTS_ON_STARTUP = true;

// Time constants shared by the scheduling math below.
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;
const TARGET_LOCAL_HOUR = 5; // Jobs are anchored to 05:00 in the job's timezone.
const TICK_INTERVAL_MS = 60000;

if (!SUPABASE_URL || !SUPABASE_KEY) {
  console.error("❌ Missing Supabase Credentials");
  process.exit(1);
}

if (!process.env.CRON_SECRET) {
  // The fallback is kept for backward compatibility, but it is a publicly known value.
  console.warn("⚠️ CRON_SECRET is not set; falling back to the built-in default secret.");
}

const app = express();
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);

// In-memory registry: jobId -> { url, payload, intervalMs, offset, nextRunAt }.
const activeJobs = new Map();

app.use(express.json());
app.use(cors());

// ==========================================
// 🔐 REQUEST HELPERS
// ==========================================

/**
 * Compares the caller-supplied secret against CRON_SECRET in constant time.
 * Both values are hashed first so the buffers passed to timingSafeEqual
 * always have the same length.
 *
 * @param {unknown} candidate - The `secret` field from the request body.
 * @returns {boolean} True only when candidate is a string equal to CRON_SECRET.
 */
function isAuthorized(candidate) {
  if (typeof candidate !== 'string') return false;
  const digest = (value) => createHash('sha256').update(value).digest();
  return timingSafeEqual(digest(candidate), digest(CRON_SECRET));
}

/**
 * @param {unknown} value - Candidate webhook URL.
 * @returns {boolean} True when value parses as an absolute http(s) URL.
 */
function isHttpUrl(value) {
  if (typeof value !== 'string') return false;
  try {
    const { protocol } = new URL(value);
    return protocol === 'http:' || protocol === 'https:';
  } catch {
    return false;
  }
}

/**
 * Wraps an async route handler so a rejected promise is forwarded to
 * Express' error handling instead of becoming an unhandled rejection.
 *
 * @param {(req: object, res: object) => Promise<unknown>} handler
 * @returns {(req: object, res: object, next: Function) => void}
 */
function asyncRoute(handler) {
  return (req, res, next) => {
    handler(req, res).catch(next);
  };
}

/**
 * Formats an hour offset for log output, e.g. "UTC+1" or "UTC-5".
 *
 * @param {number} offsetHours
 * @returns {string}
 */
function formatUtcOffset(offsetHours) {
  return `UTC${offsetHours >= 0 ? '+' : ''}${offsetHours}`;
}

// ==========================================
// ⏱️ BULLETPROOF TIMEZONE MATH
// ==========================================

/**
 * Computes how long to wait until the next 05:00 in a fixed-offset timezone.
 *
 * The offset may be given either in hours east of UTC (e.g. -5, 5.5) or in
 * minutes as returned by JS `Date#getTimezoneOffset()` (e.g. 300, -60); any
 * value with magnitude >= 30 is treated as the latter and sign-flipped.
 * Non-numeric input falls back to UTC. The offset is fixed, so DST changes
 * are not tracked.
 *
 * @param {number|string} [rawOffset=0] - Timezone offset in hours or JS minutes.
 * @returns {{ delayMs: number, offsetHours: number }} Milliseconds until the
 *   next 05:00 local time (always in the range (0, 24h]) and the normalized
 *   offset in hours.
 */
function getNextFiveAMDetails(rawOffset = 0) {
  const numericOffset = Number(rawOffset);
  const safeOffset = Number.isFinite(numericOffset) ? numericOffset : 0;

  // Auto-detect if payload sent Minutes (from JS) or standard Hours.
  const offsetHours = Math.abs(safeOffset) >= 30 ? -(safeOffset / 60) : safeOffset;

  const nowMs = Date.now();
  const utcMidnight = new Date(nowMs);
  utcMidnight.setUTCHours(0, 0, 0, 0);

  // Work in milliseconds rather than setUTCHours(): that setter truncates
  // fractional hours, which breaks half-hour zones such as UTC+5:30.
  // Example (EST / UTC-5): 5 - (-5) = 10:00 UTC
  // Example (CET / UTC+1): 5 - (+1) = 04:00 UTC
  const candidateMs =
    utcMidnight.getTime() + Math.round((TARGET_LOCAL_HOUR - offsetHours) * HOUR_MS);

  // Normalize to the first occurrence strictly after "now". Modular math
  // handles candidates that land on the previous UTC day (e.g. UTC+14),
  // where adding a single day could still leave the target in the past.
  const remainderMs = (((candidateMs - nowMs) % DAY_MS) + DAY_MS) % DAY_MS;
  const delayMs = remainderMs === 0 ? DAY_MS : remainderMs;

  return { delayMs, offsetHours };
}

// ==========================================
// 🚀 JOB EXECUTION & SCHEDULING
// ==========================================

/**
 * POSTs the job's payload to its webhook and, when the webhook answers with
 * a 2xx status, stamps `updated_at` on the job's `system_jobs` row.
 * Failures are logged and never thrown to the caller.
 *
 * @param {string|number} jobId - Key of the job in `activeJobs`.
 * @returns {Promise<void>}
 */
async function executeJob(jobId) {
  const job = activeJobs.get(jobId);
  if (!job) return;

  console.log(`⏰ [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);

  try {
    const res = await fetch(job.url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(job.payload),
    });
    console.log(` └─ Response: ${res.status}`);

    // ✅ UPDATE DATABASE ON SUCCESS
    if (res.ok) {
      const { error } = await supabase
        .from('system_jobs')
        .update({ updated_at: new Date().toISOString() })
        .eq('id', jobId);

      if (!error) {
        console.log(` └─ 💾 Database 'updated_at' successfully synced.`);
      } else {
        console.error(` └─ ❌ Failed to update DB:`, error.message);
      }
    }
  } catch (e) {
    console.error(`❌ Job ${jobId} HTTP Failed:`, e.message);
  }
}

/**
 * Registers (or replaces) a job in the in-memory registry and logs its schedule.
 *
 * @param {string|number} jobId - Registry key; also the `system_jobs.id`.
 * @param {number} intervalMs - Stored with the job; the tick engine re-anchors
 *   to the next 05:00 rather than using this value.
 * @param {string} url - Webhook URL to POST to.
 * @param {unknown} payload - JSON-serializable body sent to the webhook.
 * @param {{ delayMs: number, offsetHours: number }} delayData - Initial delay
 *   and normalized timezone offset.
 * @param {boolean} [runImmediately=false] - Also fire the job right away.
 * @returns {void}
 */
function startJobInternal(jobId, intervalMs, url, payload, delayData, runImmediately = false) {
  const { delayMs: delay, offsetHours } = delayData;

  // Failsafe format for log printing (e.g., UTC+1 or UTC-5)
  const tzString = formatUtcOffset(offsetHours);
  const nextRunAt = Date.now() + delay;
  const hours = (delay / 1000 / 60 / 60).toFixed(2);
  const minutes = (delay / 1000 / 60).toFixed(2);

  activeJobs.set(jobId, { url, payload, intervalMs, offset: offsetHours, nextRunAt });

  if (runImmediately) {
    console.log(`⚡ [Startup] Missed event detected! Firing immediately: ${jobId}`);
    // Fire-and-forget: executeJob logs its own failures and never rejects.
    void executeJob(jobId);
    console.log(`⏳ Job ${jobId} re-scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
  } else {
    console.log(`⏳ Job ${jobId} scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
  }
}

// ==========================================
// ⚙️ THE TICK ENGINE (WATCHDOG)
// ==========================================
// Once a minute, fire every job whose deadline has passed and re-anchor it
// to the next 05:00 in its own timezone.
setInterval(() => {
  const now = Date.now();

  for (const [jobId, job] of activeJobs.entries()) {
    if (now < job.nextRunAt) continue;

    // Fire-and-forget: executeJob logs its own failures and never rejects.
    void executeJob(jobId);

    const nextDelayData = getNextFiveAMDetails(job.offset);
    job.nextRunAt = now + nextDelayData.delayMs;

    const hours = (nextDelayData.delayMs / 1000 / 60 / 60).toFixed(2);
    console.log(`🔄 Job ${jobId} entered 24h cycle. Target: 5AM (${formatUtcOffset(job.offset)}). Next run in ${hours} hours.`);
  }
}, TICK_INTERVAL_MS);

// ==========================================
// 💾 DB HYDRATION USING `updated_at`
// ==========================================

/**
 * Loads every row of `system_jobs` and schedules it in memory. When
 * RUN_PAST_EVENTS_ON_STARTUP is enabled, a job whose `updated_at` is missing
 * or at least 24 hours old is also fired immediately to catch up.
 *
 * @returns {Promise<void>}
 */
async function hydrateJobs() {
  console.log("💧 Hydrating Cron Jobs from DB...");

  const { data, error } = await supabase.from('system_jobs').select('*');
  if (error) {
    console.error("❌ Hydration Failed:", error.message);
    return;
  }

  let count = 0;
  const now = Date.now();

  // `data ?? []` guards against a null result set when no error is reported.
  for (const job of data ?? []) {
    const delayData = getNextFiveAMDetails(job.payload?.timezoneOffset ?? 0);
    let runImmediately = false;

    // ✅ CHECK LAST UPDATED TIME
    if (job.updated_at) {
      const timeSinceLastRunMs = now - new Date(job.updated_at).getTime();
      // If it's been more than 24 hours, fire it immediately to catch up.
      runImmediately = RUN_PAST_EVENTS_ON_STARTUP && timeSinceLastRunMs >= DAY_MS;
    } else {
      runImmediately = RUN_PAST_EVENTS_ON_STARTUP;
    }

    startJobInternal(job.id, job.interval_ms, job.webhook_url, job.payload, delayData, runImmediately);
    count++;
  }

  console.log(`✅ Successfully hydrated and scheduled ${count} jobs.`);
}

// ==========================================
// 🌐 HTTP API
// ==========================================

// Health check: reports how many jobs are currently held in memory.
app.get('/', (req, res) => res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size }));

// Upserts a job into `system_jobs` and schedules it in memory.
// Body: { secret, jobId, intervalMs, webhookUrl, payload, initialDelay?, leadId? }
app.post('/register', asyncRoute(async (req, res) => {
  const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body ?? {};

  if (!isAuthorized(secret)) return res.status(403).json({ error: "Unauthorized" });

  // Validate before persisting so a malformed request never reaches the DB.
  if (jobId === undefined || jobId === null || jobId === '') {
    return res.status(400).json({ error: "jobId is required" });
  }
  if (!isHttpUrl(webhookUrl)) {
    return res.status(400).json({ error: "webhookUrl must be an absolute http(s) URL" });
  }

  let initialDelayMs;
  if (initialDelay !== undefined) {
    // Coerce to a number: a string delay would otherwise be string-concatenated
    // onto Date.now() and produce a deadline that never arrives.
    const parsedDelay = Number(initialDelay);
    if (!Number.isFinite(parsedDelay)) {
      return res.status(400).json({ error: "initialDelay must be a number of milliseconds" });
    }
    initialDelayMs = Math.max(0, parsedDelay);
  }

  const { error } = await supabase.from('system_jobs').upsert({
    id: jobId,
    lead_id: leadId,
    interval_ms: intervalMs,
    webhook_url: webhookUrl,
    payload: payload,
    updated_at: new Date().toISOString(),
  });

  if (error) return res.status(500).json({ error: error.message });

  const delayData = getNextFiveAMDetails(payload?.timezoneOffset ?? 0);
  if (initialDelayMs !== undefined) {
    delayData.delayMs = initialDelayMs;
  }

  startJobInternal(jobId, intervalMs, webhookUrl, payload, delayData, false);

  console.log(`➕ Registered & Scheduled Job (5AM Anchor): ${jobId}`);
  return res.json({ success: true, jobId });
}));

// Deletes a job from `system_jobs` and from the in-memory registry.
// Body: { secret, jobId }
app.post('/deregister', asyncRoute(async (req, res) => {
  const { secret, jobId } = req.body ?? {};

  if (!isAuthorized(secret)) return res.status(403).json({ error: "Unauthorized" });

  const { error } = await supabase.from('system_jobs').delete().eq('id', jobId);
  // Keep the in-memory job when the DB delete fails so both stay consistent
  // and the caller can retry.
  if (error) return res.status(500).json({ error: error.message });

  activeJobs.delete(jobId);

  console.log(`➖ Deregistered Job: ${jobId}`);
  return res.json({ success: true });
}));

app.listen(PORT, () => {
  console.log(`🚀 Cron Registry live on port ${PORT}`);
  // Hydrate two seconds after the server starts listening.
  setTimeout(() => {
    hydrateJobs().catch((err) => console.error("❌ Hydration Failed:", err.message));
  }, 2000);
});