import cron from 'node-cron';
import path from 'path';
import { logger } from '../../utils/logger.js';
import { getNeo4jVectorStore } from '../../platform/vector/Neo4jVectorStoreAdapter.js';
import { OutlookEmailReader } from './OutlookEmailReader.js';
import { PublicThreatScraper } from './PublicThreatScraper.js';
import { InternalLeakHunter } from './InternalLeakHunter.js';
import { NewsMonitorScraper } from './NewsMonitorScraper.js';
import { LocalFileScanner } from './LocalFileScanner.js';
import { eventBus } from '../../mcp/EventBus.js';

/** Per-task bookkeeping maintained by `DataScheduler.markRun`. */
type Telemetry = {
  /** ISO-8601 timestamp of the most recent run (successful or not). */
  lastRun?: string;
  /** Message of the most recent failure; cleared again by a successful run. */
  lastError?: string;
  /** Total number of recorded runs, including failed ones. */
  runs: number;
};

/** Delay before the one-off local file scan that follows `start()`. */
const INITIAL_LOCAL_SCAN_DELAY_MS = 15000;

/** Delay before the one-off news scan that follows `start()`. */
const INITIAL_NEWS_SCAN_DELAY_MS = 10000;

/**
 * Turns an arbitrary thrown value into a printable message.
 *
 * Prefers a non-empty string `message` property (as found on `Error`) and
 * falls back to `String(error)`. Safe to call with `null` / `undefined`.
 */
function describeError(error: unknown): string {
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const { message } = error as { message?: unknown };
    if (typeof message === 'string' && message.length > 0) {
      return message;
    }
  }
  return String(error);
}

/**
 * Registers the periodic ingestion / monitoring cron jobs and keeps simple
 * run telemetry for each of them.
 *
 * `start()` is idempotent while the scheduler is running. `stop()` halts every
 * cron task and cancels the delayed initial scans that have not fired yet.
 */
export class DataScheduler {
  private isRunning = false;
  private tasks: cron.ScheduledTask[] = [];
  /** Handles of the delayed initial scans, kept so `stop()` can cancel them. */
  private timers: Array<ReturnType<typeof setTimeout>> = [];
  private telemetry: Record<string, Telemetry> = {};

  /** Schedules all jobs. Does nothing when the scheduler is already running. */
  start() {
    if (this.isRunning) return;
    this.isRunning = true;
    logger.info('✅ DataScheduler started');

    this.scheduleEmailIngestion();
    this.scheduleThreatIntel();
    this.scheduleInternalHunt();
    this.scheduleNewsMonitor();
    this.scheduleSystemHealth();
    this.scheduleLocalFiles();
  }

  /**
   * Stops every cron task and cancels pending initial scans.
   *
   * Both handle lists are emptied so that a later `start()` begins from a
   * clean slate instead of accumulating stale handles.
   */
  stop() {
    this.tasks.forEach((t) => t.stop());
    this.tasks = [];

    this.timers.forEach((timer) => clearTimeout(timer));
    this.timers = [];

    this.isRunning = false;
    logger.info('🛑 DataScheduler stopped');
  }

  /**
   * Returns whether the scheduler is running plus one telemetry entry per
   * task that has been recorded at least once.
   */
  public getStatus() {
    return {
      running: this.isRunning,
      tasks: Object.entries(this.telemetry).map(([name, info]) => ({
        name,
        lastRun: info.lastRun || null,
        lastError: info.lastError || null,
        runs: info.runs,
      })),
    };
  }

  /**
   * Records one run of the task `name`.
   *
   * @param name  Telemetry key of the task.
   * @param error The value that was thrown, if the run failed. Omit it (or
   *              pass `undefined`) for a successful run, which clears any
   *              previously stored error.
   */
  private markRun(name: string, error?: unknown) {
    let entry = this.telemetry[name];
    if (!entry) {
      entry = { runs: 0 };
      this.telemetry[name] = entry;
    }

    entry.lastRun = new Date().toISOString();
    entry.runs += 1;
    // Compare against `undefined` rather than truthiness so that a falsy
    // thrown value (0, '', null) is still recorded as a failure.
    entry.lastError = error === undefined ? undefined : describeError(error);
  }

  /**
   * Runs one local-file ingestion pass: fetch, transform, upsert into the
   * `documents` namespace, then record telemetry under `options.taskName`.
   *
   * Never rejects; every failure is logged and recorded via `markRun`.
   *
   * @param scanner  Scanner configured for `scanPath`.
   * @param scanPath Absolute path being scanned (used for log output only).
   * @param options  `taskName` is the telemetry key; `initial` selects the
   *                 start-up wording and suppresses the `ingestion:documents`
   *                 event, which only the recurring scan emits.
   */
  private async runLocalFileScan(
    scanner: LocalFileScanner,
    scanPath: string,
    options: { taskName: string; initial: boolean }
  ): Promise<void> {
    const { taskName, initial } = options;

    try {
      if (!(await scanner.isAvailable())) {
        logger.warn(`Local ingestion path not accessible: ${scanPath}`);
        this.markRun(taskName, new Error('path not accessible'));
        return;
      }

      const files = await scanner.fetch();
      const entities = await scanner.transform(files);

      if (entities.length > 0) {
        const vectorStore = getNeo4jVectorStore();
        await vectorStore.batchUpsert({
          records: entities.map((entity) => ({
            id: entity.id,
            content: entity.content,
            metadata: {
              ...entity.metadata,
              type: 'document',
              source: 'local_ingestion',
            },
            namespace: 'documents',
          })),
          namespace: 'documents',
        });

        if (initial) {
          logger.info(`📨 Ingested ${entities.length} local documents (initial)`);
        } else {
          logger.info(`📨 Ingested ${entities.length} local documents`);
          eventBus.emit('ingestion:documents', { count: entities.length });
        }
      }

      this.markRun(taskName);
    } catch (error) {
      logger.error(initial ? 'Initial local scan failed:' : 'Local file scan failed:', error);
      this.markRun(taskName, error);
    }
  }

  /**
   * Scans `data/ingestion` (resolved against the current working directory)
   * every 30 minutes, plus once shortly after start-up.
   */
  private scheduleLocalFiles() {
    const scanPath = path.resolve('data/ingestion');
    const scanner = new LocalFileScanner({
      rootPaths: [scanPath],
      extensions: ['.pdf', '.txt', '.md'],
    });

    const task = cron.schedule('*/30 * * * *', async () => {
      logger.info(`📁 Scanning local files in ${scanPath}...`);
      await this.runLocalFileScan(scanner, scanPath, { taskName: 'local_files', initial: false });
    });
    this.tasks.push(task);

    const timer = setTimeout(() => {
      logger.info('📁 Running initial local file scan...');
      // runLocalFileScan handles its own errors, so the promise cannot reject.
      void this.runLocalFileScan(scanner, scanPath, {
        taskName: 'local_files_initial',
        initial: true,
      });
    }, INITIAL_LOCAL_SCAN_DELAY_MS);
    this.timers.push(timer);
  }

  /** Reads emails every 5 minutes and upserts each into the `emails` namespace. */
  private scheduleEmailIngestion() {
    const task = cron.schedule('*/5 * * * *', async () => {
      const taskName = 'emails';
      logger.info('📧 Running scheduled email ingestion...');

      try {
        const reader = new OutlookEmailReader();
        const emails = await reader.readData();

        if (emails.length > 0) {
          const vectorStore = getNeo4jVectorStore();
          for (const email of emails) {
            await vectorStore.upsert({
              id: `email-${email.id}`,
              content: `Subject: ${email.subject}\nFrom: ${email.sender}\n\n${email.content}`,
              metadata: {
                type: 'email',
                source: 'outlook',
                sender: email.sender,
                receivedAt: email.timestamp,
                // Spread last: reader-supplied metadata may override the keys above.
                ...email.metadata,
              },
              namespace: 'emails',
            });
          }
          logger.info(`✅ Ingested ${emails.length} emails to vector store`);
          eventBus.emit('ingestion:emails', { count: emails.length });
        } else {
          logger.debug('No new emails found');
        }

        this.markRun(taskName);
      } catch (error) {
        // NOTE(review): this message is presumably raised by OutlookEmailReader
        // when IMAP credentials are not configured — confirm against that class.
        if (describeError(error).includes('Mangler IMAP credentials')) {
          logger.debug('Skipping email ingestion (no credentials)');
        } else {
          logger.error('❌ Email ingestion failed:', error);
        }
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Fetches public threats every 15 minutes and broadcasts any that are found. */
  private scheduleThreatIntel() {
    const scraper = new PublicThreatScraper();

    const task = cron.schedule('*/15 * * * *', async () => {
      const taskName = 'threat_intel';
      logger.info('🌐 Running public threat intelligence scan...');

      try {
        const threats = await scraper.fetchThreats();
        if (threats.length > 0) {
          eventBus.emit('threat:detected', { threats });
          logger.info(`✅ Broadcasted ${threats.length} threats to UI`);
        }
        this.markRun(taskName);
      } catch (error) {
        logger.error('Threat scan failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Runs the internal leak hunter every 10 minutes. */
  private scheduleInternalHunt() {
    const hunter = new InternalLeakHunter();

    const task = cron.schedule('*/10 * * * *', async () => {
      const taskName = 'internal_hunt';
      try {
        await hunter.hunt();
        this.markRun(taskName);
      } catch (error) {
        logger.error('Internal hunt failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /**
   * Ingests news at the top of every hour into the `news` namespace, plus one
   * emit-only scan shortly after start-up.
   */
  private scheduleNewsMonitor() {
    const scraper = new NewsMonitorScraper();

    const task = cron.schedule('0 * * * *', async () => {
      const taskName = 'news_monitor';
      logger.info('📰 Running news monitor scan...');

      try {
        const news = await scraper.fetchNews();

        if (news.length > 0) {
          const vectorStore = getNeo4jVectorStore();
          await vectorStore.batchUpsert({
            records: news.map((item) => ({
              id: item.id,
              content: `${item.title}\n${item.snippet}\nSource: ${item.source}\nCategory: ${item.category}`,
              metadata: {
                ...item,
                type: 'news',
                ingestedAt: new Date().toISOString(),
              },
              namespace: 'news',
            })),
            namespace: 'news',
          });
          logger.info(`✅ Ingested ${news.length} news items`);
          eventBus.emit('ingestion:news', { count: news.length, items: news });
        }

        this.markRun(taskName);
      } catch (error) {
        logger.error('News monitor failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);

    // NOTE(review): the initial scan only emits the event and does not write to
    // the vector store, unlike the hourly run — confirm this is intentional.
    const timer = setTimeout(async () => {
      const taskName = 'news_monitor_initial';
      logger.info('📰 Running initial news scan...');

      try {
        const news = await scraper.fetchNews();
        if (news.length > 0) {
          eventBus.emit('ingestion:news', { count: news.length, items: news });
        }
        this.markRun(taskName);
      } catch (err) {
        logger.error('Initial news scan failed:', err);
        this.markRun(taskName, err);
      }
    }, INITIAL_NEWS_SCAN_DELAY_MS);
    this.timers.push(timer);
  }

  /** Emits a `system:heartbeat` event with current memory usage every minute. */
  private scheduleSystemHealth() {
    const task = cron.schedule('* * * * *', async () => {
      const taskName = 'system_heartbeat';
      eventBus.emit('system:heartbeat', {
        timestamp: new Date().toISOString(),
        status: 'healthy',
        memory: process.memoryUsage(),
      });
      this.markRun(taskName);
    });
    this.tasks.push(task);
  }
}

/** Shared scheduler instance for the application. */
export const dataScheduler = new DataScheduler();