File size: 7,816 Bytes
63870b6
 
8ae5837
63870b6
27b60ac
 
 
 
8ae5837
 
 
86366a6
27b60ac
f5eeb4a
27b60ac
8ae5837
 
59bfafb
a842e18
 
 
8ae5837
 
a842e18
8ae5837
a842e18
ac15076
27b60ac
fe20fea
27b60ac
fe20fea
 
 
 
 
 
 
 
 
 
 
a99f1f3
27b60ac
fa6a3b0
fe20fea
 
 
 
 
 
 
a99f1f3
fe20fea
 
 
ac15076
a99f1f3
fe20fea
 
 
 
ac15076
 
27b60ac
 
 
 
 
 
 
 
 
 
 
 
 
 
f5eeb4a
27b60ac
f5eeb4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27b60ac
 
 
 
 
fe20fea
 
 
27b60ac
fe20fea
 
27b60ac
 
 
 
9c321fd
 
 
 
 
fe20fea
9c321fd
 
2f19bf4
27b60ac
f5eeb4a
 
fe20fea
27b60ac
fe20fea
27b60ac
9c321fd
 
 
f5eeb4a
9c321fd
27b60ac
9c321fd
 
 
 
f5eeb4a
9c321fd
fe20fea
 
9c321fd
fe20fea
 
ac15076
59bfafb
f5eeb4a
a99f1f3
27b60ac
f5eeb4a
27b60ac
a99f1f3
 
 
 
 
 
 
 
 
 
f5eeb4a
 
a99f1f3
fe20fea
f5eeb4a
 
 
 
 
 
 
fe20fea
f5eeb4a
 
 
 
 
 
a99f1f3
fe20fea
a99f1f3
 
 
 
ac15076
a99f1f3
04392bb
8ae5837
a99f1f3
 
 
 
 
 
 
 
 
 
f5eeb4a
a99f1f3
6f54930
8ae5837
71eb220
fe20fea
 
 
 
a99f1f3
fe20fea
71eb220
27b60ac
8ae5837
a9cf784
04392bb
8ae5837
 
 
9c321fd
8ae5837
 
 
ac15076
9c321fd
8ae5837
 
e865f13
 
8ae5837
a99f1f3
 
9c321fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import express from 'express';
import cors from 'cors';
import { createClient } from '@supabase/supabase-js';

// ==========================================
// ⚙️ CONFIGURATION & SETTINGS
// ==========================================
// Port the HTTP server listens on; 7860 is used when PORT is unset or empty.
const PORT = process.env.PORT || 7860;

// Supabase connection settings. The service key is read from SUPABASE_SERVICE_KEY.
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;

// Shared secret that /register and /deregister callers must send in the request body.
const CRON_SECRET = process.env.CRON_SECRET || "default_secret";

// The fallback secret is a fixed, publicly visible string, so running with it means
// anyone can register or remove jobs. Keep the fallback for backward compatibility,
// but make the situation visible in the logs instead of failing open silently.
if (!process.env.CRON_SECRET) {
    console.warn("⚠️ CRON_SECRET is not set; falling back to the built-in default secret. Set CRON_SECRET before exposing this service.");
}

// When true, jobs whose last successful run is at least 24 hours old (or that have
// never run) are fired immediately while jobs are loaded from the database at startup.
const RUN_PAST_EVENTS_ON_STARTUP = true;

// Without database credentials nothing can be persisted or hydrated, so stop early.
if (!SUPABASE_URL || !SUPABASE_KEY) {
    console.error("❌ Missing Supabase Credentials");
    process.exit(1);
}

const app = express();
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);

// In-memory schedule: jobId -> { url, payload, intervalMs, offset, nextRunAt }.
const activeJobs = new Map();

app.use(express.json());
app.use(cors());

// ==========================================
// ⏱️ BULLETPROOF TIMEZONE MATH
// ==========================================
/**
 * Computes how long to wait until the next 5:00 AM in a fixed-offset timezone.
 *
 * @param {number|string} [rawOffset=0] - Timezone offset in one of two forms:
 *   - hours relative to UTC (e.g. -5 for UTC-5, 5.5 for UTC+5:30), or
 *   - minutes in the sign convention of JS `Date#getTimezoneOffset()`
 *     (e.g. 300 for UTC-5, -60 for UTC+1).
 *   Values whose absolute value is >= 30 are treated as minutes. Values that are
 *   not finite numbers fall back to UTC (offset 0).
 * @param {Date|number} [now=Date.now()] - Reference instant (Date or epoch ms).
 *   Defaults to the current time; injectable so the math can be tested.
 * @returns {{delayMs: number, offsetHours: number}} `delayMs` is the wait until the
 *   next 5 AM in that timezone, always in the range (0, 24h]; `offsetHours` is the
 *   normalized offset in hours relative to UTC.
 */
function getNextFiveAMDetails(rawOffset = 0, now = Date.now()) {
    const HOUR_MS = 60 * 60 * 1000;
    const DAY_MS = 24 * HOUR_MS;
    const TARGET_LOCAL_HOUR = 5;

    // Normalize the offset to hours; anything non-numeric is treated as UTC so the
    // result can never be NaN (a NaN delay would make the job silently never fire).
    const numericOffset = Number(rawOffset);
    let offsetHours = 0;
    if (Number.isFinite(numericOffset)) {
        offsetHours = Math.abs(numericOffset) >= 30
            ? -(numericOffset / 60) // minutes, getTimezoneOffset() sign convention
            : numericOffset;        // already hours
    }

    const parsedNowMs = new Date(now).getTime();
    const nowMs = Number.isFinite(parsedNowMs) ? parsedNowMs : Date.now();

    const utcMidnight = new Date(nowMs);
    utcMidnight.setUTCHours(0, 0, 0, 0);

    // Work in milliseconds rather than setUTCHours(): setUTCHours truncates
    // fractional hours, which breaks half-hour and 45-minute timezones.
    // Example (UTC-5):    5 - (-5)  = 10:00 UTC
    // Example (UTC+5:30): 5 - (5.5) = 23:30 UTC on the previous day
    const candidateMs = utcMidnight.getTime()
        + Math.round((TARGET_LOCAL_HOUR - offsetHours) * HOUR_MS);

    // The candidate can fall on yesterday, today or tomorrow depending on the
    // offset, so fold the difference into (0, 24h] instead of adding one day once.
    let delayMs = (((candidateMs - nowMs) % DAY_MS) + DAY_MS) % DAY_MS;
    if (delayMs === 0) {
        // Exactly 5 AM right now: schedule the following day's run.
        delayMs = DAY_MS;
    }

    return { delayMs, offsetHours };
}

// ==========================================
// 🚀 JOB EXECUTION & SCHEDULING
// ==========================================
/**
 * Fires the webhook of a registered job and, on a 2xx response, stamps the job's
 * `updated_at` column in the `system_jobs` table.
 *
 * Does nothing when the job is not present in `activeJobs`. Errors thrown while
 * calling the webhook or updating the database are logged, never rethrown.
 *
 * @param {*} jobId - Key of the job in `activeJobs` / `id` in `system_jobs`.
 * @returns {Promise<void>}
 */
async function executeJob(jobId) {
    const entry = activeJobs.get(jobId);
    if (!entry) {
        return;
    }

    const startedAt = new Date().toLocaleTimeString();
    console.log(`⏰ [${startedAt}] Executing: ${jobId}`);

    try {
        const requestInit = {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(entry.payload)
        };
        const response = await fetch(entry.url, requestInit);

        console.log(`   └─ Response: ${response.status}`);

        // Only a successful webhook call counts as a completed run.
        if (!response.ok) {
            return;
        }

        const outcome = await supabase
            .from('system_jobs')
            .update({ updated_at: new Date().toISOString() })
            .eq('id', jobId);

        if (outcome.error) {
            console.error(`   └─ ❌ Failed to update DB:`, outcome.error.message);
        } else {
            console.log(`   └─ πŸ’Ύ Database 'updated_at' successfully synced.`);
        }
    } catch (err) {
        console.error(`❌ Job ${jobId} HTTP Failed:`, err.message);
    }
}

/**
 * Registers (or replaces) a job in the in-memory schedule and logs when it will run.
 *
 * @param {*} jobId - Key under which the job is stored in `activeJobs`.
 * @param {*} intervalMs - Stored with the job as-is.
 * @param {string} url - Webhook URL that is POSTed to when the job runs.
 * @param {*} payload - JSON body sent to the webhook.
 * @param {{delayMs: number, offsetHours: number}} delayData - Delay until the first
 *   run and the timezone offset (hours) used for later reschedules.
 * @param {boolean} [runImmediately=false] - When true, the job is also executed
 *   right away (without waiting for it to finish) in addition to being scheduled.
 */
function startJobInternal(jobId, intervalMs, url, payload, delayData, runImmediately = false) {
    const { delayMs, offsetHours } = delayData;

    // Human-readable offset for the log line, e.g. UTC+1 or UTC-5.
    const sign = offsetHours >= 0 ? '+' : '';
    const tzString = `UTC${sign}${offsetHours}`;

    const hours = (delayMs / 1000 / 60 / 60).toFixed(2);
    const minutes = (delayMs / 1000 / 60).toFixed(2);

    const entry = {
        url,
        payload,
        intervalMs,
        offset: offsetHours,
        nextRunAt: Date.now() + delayMs
    };
    activeJobs.set(jobId, entry);

    if (!runImmediately) {
        console.log(`⏳ Job ${jobId} scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
        return;
    }

    // Catch-up run: fire now, the regular schedule stored above still applies.
    console.log(`⚑ [Startup] Missed event detected! Firing immediately: ${jobId}`);
    executeJob(jobId);
    console.log(`⏳ Job ${jobId} re-scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
}

// ==========================================
// ⚙️ THE TICK ENGINE (WATCHDOG)
// ==========================================
// Once a minute, run every job whose scheduled time has arrived and move its
// next run to the following 5 AM in the job's timezone.
setInterval(() => {
    const tickTime = Date.now();

    activeJobs.forEach((entry, id) => {
        const isDue = tickTime >= entry.nextRunAt;
        if (!isDue) {
            return;
        }

        // Fire-and-forget: executeJob logs its own failures.
        executeJob(id);

        const upcoming = getNextFiveAMDetails(entry.offset);
        entry.nextRunAt = tickTime + upcoming.delayMs;

        const hours = (upcoming.delayMs / 1000 / 60 / 60).toFixed(2);
        console.log(`πŸ”„ Job ${id} entered 24h cycle. Target: 5AM (UTC${entry.offset >= 0 ? '+' : ''}${entry.offset}). Next run in ${hours} hours.`);
    });
}, 60000);

// ==========================================
// 💾 DB HYDRATION USING `updated_at`
// ==========================================
/**
 * Loads every row of `system_jobs` and schedules it in memory.
 *
 * When RUN_PAST_EVENTS_ON_STARTUP is enabled, a job is additionally fired right
 * away if it has no `updated_at` value or if that timestamp is at least 24 hours
 * old. A failed query is logged and leaves the schedule untouched.
 *
 * @returns {Promise<void>}
 */
async function hydrateJobs() {
    console.log("πŸ’§ Hydrating Cron Jobs from DB...");
    const result = await supabase.from('system_jobs').select('*');

    if (result.error) {
        console.error("❌ Hydration Failed:", result.error.message);
        return;
    }

    const startedAt = Date.now();
    let scheduled = 0;

    for (const row of result.data) {
        const delayData = getNextFiveAMDetails(row.payload?.timezoneOffset ?? 0);

        // A job counts as overdue when it never ran, or last ran 24h+ ago (86,400,000 ms).
        let overdue = true;
        if (row.updated_at) {
            const elapsedMs = startedAt - new Date(row.updated_at).getTime();
            overdue = elapsedMs >= 86400000;
        }
        const runImmediately = RUN_PAST_EVENTS_ON_STARTUP && overdue;

        startJobInternal(row.id, row.interval_ms, row.webhook_url, row.payload, delayData, runImmediately);
        scheduled += 1;
    }

    console.log(`βœ… Successfully hydrated and scheduled ${scheduled} jobs.`);
}

// Health/status endpoint: reports how many jobs are currently scheduled in memory.
app.get('/', (req, res) => {
    const summary = { status: "Cron Registry Active", active_jobs: activeJobs.size };
    return res.json(summary);
});

/**
 * POST /register — persists a job in `system_jobs` and schedules it in memory.
 *
 * Body fields: `secret` (must equal CRON_SECRET), `jobId`, `webhookUrl`,
 * `intervalMs`, `payload` (its `timezoneOffset` selects the 5 AM anchor),
 * `leadId`, and optional `initialDelay` (milliseconds until the first run,
 * overriding the 5 AM anchor).
 *
 * Responses: 403 on a wrong secret, 400 when `jobId`/`webhookUrl` are missing,
 * 500 when the database write fails, otherwise `{ success: true, jobId }`.
 */
app.post('/register', async (req, res) => {
    // req.body can be absent when the request carried no parseable JSON.
    const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body ?? {};

    if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

    // A job without an id cannot be stored, and one without a URL can never run.
    if (jobId == null || jobId === '' || typeof webhookUrl !== 'string' || webhookUrl === '') {
        return res.status(400).json({ error: "jobId and webhookUrl are required" });
    }

    try {
        const { error } = await supabase.from('system_jobs').upsert({
            id: jobId,
            lead_id: leadId,
            interval_ms: intervalMs,
            webhook_url: webhookUrl,
            payload: payload,
            updated_at: new Date().toISOString()
        });

        if (error) return res.status(500).json({ error: error.message });

        const delayData = getNextFiveAMDetails(payload?.timezoneOffset ?? 0);

        // Only honour initialDelay when it is a real number (or numeric string).
        // JSON null must mean "not provided", and a raw string would otherwise be
        // concatenated onto the timestamp and produce a run time that never arrives.
        const hasExplicitDelay =
            typeof initialDelay === 'number' ||
            (typeof initialDelay === 'string' && initialDelay.trim() !== '');
        const requestedDelayMs = Number(initialDelay);
        if (hasExplicitDelay && Number.isFinite(requestedDelayMs)) {
            // Negative delays already meant "run at the next tick"; keep that meaning.
            delayData.delayMs = Math.max(0, requestedDelayMs);
        }

        startJobInternal(jobId, intervalMs, webhookUrl, payload, delayData, false);

        console.log(`βž• Registered & Scheduled Job (5AM Anchor): ${jobId}`);
        res.json({ success: true, jobId });
    } catch (err) {
        // Express 4 does not catch rejections from async handlers; answer explicitly
        // so the client is not left waiting on a hung request.
        console.error(`❌ Register failed for job ${jobId}:`, err.message);
        if (!res.headersSent) {
            res.status(500).json({ error: err.message });
        }
    }
});

/**
 * POST /deregister — removes a job from `system_jobs` and from the in-memory schedule.
 *
 * Body fields: `secret` (must equal CRON_SECRET) and `jobId`.
 *
 * Responses: 403 on a wrong secret, 500 when the database delete fails (the job
 * then stays scheduled so memory and database remain in agreement and the caller
 * can retry), otherwise `{ success: true }`.
 */
app.post('/deregister', async (req, res) => {
    // req.body can be absent when the request carried no parseable JSON.
    const { secret, jobId } = req.body ?? {};
    if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

    try {
        const { error } = await supabase.from('system_jobs').delete().eq('id', jobId);

        // Reporting success while the row still exists would let the job come
        // back on the next restart, so surface the failure instead.
        if (error) return res.status(500).json({ error: error.message });
    } catch (err) {
        // Express 4 does not catch rejections from async handlers.
        console.error(`❌ Deregister failed for job ${jobId}:`, err.message);
        return res.status(500).json({ error: err.message });
    }

    // Map#delete is a no-op for unknown keys, so no existence check is needed.
    activeJobs.delete(jobId);

    console.log(`βž– Deregistered Job: ${jobId}`);
    res.json({ success: true });
});

// Start the HTTP server; jobs are loaded from the database two seconds after
// the server begins listening.
app.listen(PORT, async () => {
    const hydrationDelayMs = 2000;

    console.log(`πŸš€ Cron Registry live on port ${PORT}`);
    setTimeout(hydrateJobs, hydrationDelayMs);
});