// Kraft102 — Initial deployment - WidgeTDC Cortex Backend v2.1.0 (commit 529090e)
import cron from 'node-cron';
import path from 'path';
import { logger } from '../../utils/logger.js';
import { getNeo4jVectorStore } from '../../platform/vector/Neo4jVectorStoreAdapter.js';
import { OutlookEmailReader } from './OutlookEmailReader.js';
import { PublicThreatScraper } from './PublicThreatScraper.js';
import { InternalLeakHunter } from './InternalLeakHunter.js';
import { NewsMonitorScraper } from './NewsMonitorScraper.js';
import { LocalFileScanner } from './LocalFileScanner.js';
import { eventBus } from '../../mcp/EventBus.js';
/** Per-task execution telemetry recorded by DataScheduler.markRun(). */
type Telemetry = {
// ISO-8601 timestamp of the most recent run attempt (undefined until first run).
lastRun?: string;
// Message of the most recent error; reset to undefined on a successful run.
lastError?: string;
// Total number of run attempts (successful or failed).
runs: number;
};
/**
 * Orchestrates all recurring background jobs via node-cron: local file
 * ingestion, Outlook e-mail ingestion, public threat intelligence scans,
 * internal leak hunting, news monitoring, and a system heartbeat.
 *
 * Call start() once at boot and stop() on shutdown. getStatus() exposes
 * per-task telemetry (last run, last error, run count) for health endpoints.
 */
export class DataScheduler {
  private isRunning = false;
  private tasks: cron.ScheduledTask[] = [];
  // One-shot "initial scan" timers, tracked so stop() can cancel them
  // (previously untracked — they could fire after shutdown).
  private timers: ReturnType<typeof setTimeout>[] = [];
  private telemetry: Record<string, Telemetry> = {};

  /** Schedules every recurring job. Idempotent: a second call while running is a no-op. */
  start() {
    if (this.isRunning) return;
    this.isRunning = true;
    logger.info('βœ… DataScheduler started');
    this.scheduleEmailIngestion();
    this.scheduleThreatIntel();
    this.scheduleInternalHunt();
    this.scheduleNewsMonitor();
    this.scheduleSystemHealth();
    this.scheduleLocalFiles();
  }

  /**
   * Stops all cron tasks and cancels pending one-shot timers.
   * Fix: the tasks array is now cleared so a stop()/start() cycle no longer
   * accumulates stopped task handles, and the initial-scan timeouts are
   * cancelled instead of firing after shutdown.
   */
  stop() {
    this.tasks.forEach((t) => t.stop());
    this.tasks = [];
    this.timers.forEach((t) => clearTimeout(t));
    this.timers = [];
    this.isRunning = false;
    logger.info('πŸ›‘ DataScheduler stopped');
  }

  /** Snapshot of scheduler state and per-task telemetry for monitoring endpoints. */
  public getStatus() {
    return {
      running: this.isRunning,
      tasks: Object.entries(this.telemetry).map(([name, info]) => ({
        name,
        lastRun: info.lastRun || null,
        lastError: info.lastError || null,
        runs: info.runs,
      })),
    };
  }

  /**
   * Records one run of a named task; pass an error to capture its message.
   * Narrows the error safely instead of `(error as any)?.message`, which could
   * assign a non-string to `lastError`.
   */
  private markRun(name: string, error?: unknown) {
    if (!this.telemetry[name]) {
      this.telemetry[name] = { runs: 0 };
    }
    const entry = this.telemetry[name];
    entry.lastRun = new Date().toISOString();
    entry.runs += 1;
    if (error) {
      const msg = (error as { message?: unknown } | null)?.message;
      entry.lastError = (typeof msg === 'string' && msg) || String(error);
    } else {
      entry.lastError = undefined;
    }
  }

  /**
   * Fetches files via the scanner, transforms them to entities, and batch-upserts
   * them into the 'documents' namespace of the Neo4j vector store.
   * Shared by the recurring cron job and the one-shot initial scan (previously
   * two duplicated copies of this logic).
   * @param opts.logSuffix appended to the success log line (e.g. ' (initial)').
   * @param opts.emitEvent when true, broadcasts 'ingestion:documents' on success.
   * @returns number of ingested entities.
   */
  private async ingestLocalFiles(
    scanner: LocalFileScanner,
    opts: { logSuffix?: string; emitEvent?: boolean } = {},
  ): Promise<number> {
    const files = await scanner.fetch();
    const entities = await scanner.transform(files);
    if (entities.length > 0) {
      const vectorStore = getNeo4jVectorStore();
      await vectorStore.batchUpsert({
        records: entities.map((entity) => ({
          id: entity.id,
          content: entity.content,
          metadata: {
            ...entity.metadata,
            type: 'document',
            source: 'local_ingestion',
          },
          namespace: 'documents',
        })),
        namespace: 'documents',
      });
      logger.info(`πŸ“¨ Ingested ${entities.length} local documents${opts.logSuffix ?? ''}`);
      if (opts.emitEvent) {
        eventBus.emit('ingestion:documents', { count: entities.length });
      }
    }
    return entities.length;
  }

  /** Every 30 min: scans data/ingestion for .pdf/.txt/.md files; also runs once 15 s after start. */
  private scheduleLocalFiles() {
    const scanPath = path.resolve('data/ingestion');
    const scanner = new LocalFileScanner({
      rootPaths: [scanPath],
      extensions: ['.pdf', '.txt', '.md'],
    });
    const task = cron.schedule('*/30 * * * *', async () => {
      const taskName = 'local_files';
      logger.info(`πŸ“ Scanning local files in ${scanPath}...`);
      try {
        if (await scanner.isAvailable()) {
          await this.ingestLocalFiles(scanner, { emitEvent: true });
          this.markRun(taskName);
        } else {
          logger.warn(`Local ingestion path not accessible: ${scanPath}`);
          this.markRun(taskName, new Error('path not accessible'));
        }
      } catch (error) {
        logger.error('Local file scan failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
    // One-shot warm-up scan shortly after boot; no event broadcast, matching prior behavior.
    const timer = setTimeout(async () => {
      const taskName = 'local_files_initial';
      logger.info('πŸ“ Running initial local file scan...');
      try {
        if (await scanner.isAvailable()) {
          await this.ingestLocalFiles(scanner, { logSuffix: ' (initial)' });
          this.markRun(taskName);
        }
      } catch (e) {
        logger.error('Initial local scan failed:', e);
        this.markRun(taskName, e);
      }
    }, 15000);
    this.timers.push(timer);
  }

  /** Every 5 min: reads Outlook e-mails and upserts each into the 'emails' namespace. */
  private scheduleEmailIngestion() {
    const task = cron.schedule('*/5 * * * *', async () => {
      const taskName = 'emails';
      logger.info('πŸ“§ Running scheduled email ingestion...');
      try {
        const reader = new OutlookEmailReader();
        const emails = await reader.readData();
        if (emails.length > 0) {
          const vectorStore = getNeo4jVectorStore();
          for (const email of emails) {
            await vectorStore.upsert({
              id: `email-${email.id}`,
              content: `Subject: ${email.subject}\nFrom: ${email.sender}\n\n${email.content}`,
              metadata: {
                type: 'email',
                source: 'outlook',
                sender: email.sender,
                receivedAt: email.timestamp,
                ...email.metadata,
              },
              namespace: 'emails',
            });
          }
          logger.info(`βœ… Ingested ${emails.length} emails to vector store`);
          eventBus.emit('ingestion:emails', { count: emails.length });
        } else {
          logger.debug('No new emails found');
        }
        this.markRun(taskName);
      } catch (error) {
        // Missing-credentials errors are expected in unconfigured environments.
        // Fix: the old `(error as any).message?.includes(...)` threw if a
        // non-object was thrown; instanceof narrowing is crash-safe.
        if (error instanceof Error && error.message.includes('Mangler IMAP credentials')) {
          logger.debug('Skipping email ingestion (no credentials)');
        } else {
          logger.error('❌ Email ingestion failed:', error);
        }
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Every 15 min: fetches public threat intel and broadcasts detections to the UI. */
  private scheduleThreatIntel() {
    const scraper = new PublicThreatScraper();
    const task = cron.schedule('*/15 * * * *', async () => {
      const taskName = 'threat_intel';
      logger.info('🌐 Running public threat intelligence scan...');
      try {
        const threats = await scraper.fetchThreats();
        if (threats.length > 0) {
          eventBus.emit('threat:detected', { threats });
          logger.info(`βœ… Broadcasted ${threats.length} threats to UI`);
        }
        this.markRun(taskName);
      } catch (error) {
        logger.error('Threat scan failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Every 10 min: runs the internal leak hunter. */
  private scheduleInternalHunt() {
    const hunter = new InternalLeakHunter();
    const task = cron.schedule('*/10 * * * *', async () => {
      const taskName = 'internal_hunt';
      try {
        await hunter.hunt();
        this.markRun(taskName);
      } catch (error) {
        logger.error('Internal hunt failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Hourly: ingests news items into the 'news' namespace; also runs once 10 s after start. */
  private scheduleNewsMonitor() {
    const scraper = new NewsMonitorScraper();
    const task = cron.schedule('0 * * * *', async () => {
      const taskName = 'news_monitor';
      logger.info('πŸ“° Running news monitor scan...');
      try {
        const news = await scraper.fetchNews();
        if (news.length > 0) {
          const vectorStore = getNeo4jVectorStore();
          await vectorStore.batchUpsert({
            records: news.map((item) => ({
              id: item.id,
              content: `${item.title}\n${item.snippet}\nSource: ${item.source}\nCategory: ${item.category}`,
              metadata: {
                ...item,
                type: 'news',
                ingestedAt: new Date().toISOString(),
              },
              namespace: 'news',
            })),
            namespace: 'news',
          });
          logger.info(`βœ… Ingested ${news.length} news items`);
          eventBus.emit('ingestion:news', { count: news.length, items: news });
        }
        this.markRun(taskName);
      } catch (error) {
        logger.error('News monitor failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
    // One-shot warm-up scan: broadcast only, no vector-store write (matches prior behavior).
    // Unified to async/await (was a .then/.catch chain) and tracked for cancellation.
    const timer = setTimeout(async () => {
      const taskName = 'news_monitor_initial';
      logger.info('πŸ“° Running initial news scan...');
      try {
        const news = await scraper.fetchNews();
        if (news.length > 0) {
          eventBus.emit('ingestion:news', { count: news.length, items: news });
        }
        this.markRun(taskName);
      } catch (err) {
        logger.error('Initial news scan failed:', err);
        this.markRun(taskName, err);
      }
    }, 10000);
    this.timers.push(timer);
  }

  /** Every minute: emits a heartbeat event with process memory usage. */
  private scheduleSystemHealth() {
    const task = cron.schedule('* * * * *', () => {
      const taskName = 'system_heartbeat';
      eventBus.emit('system:heartbeat', {
        timestamp: new Date().toISOString(),
        status: 'healthy',
        memory: process.memoryUsage(),
      });
      this.markRun(taskName);
    });
    this.tasks.push(task);
  }
}
// Module-level singleton. Note: construction does NOT start the jobs —
// callers must invoke dataScheduler.start() (presumably from server bootstrap; verify).
export const dataScheduler = new DataScheduler();