Spaces:
Paused
Paused
| import cron from 'node-cron'; | |
| import path from 'path'; | |
| import { logger } from '../../utils/logger.js'; | |
| import { getNeo4jVectorStore } from '../../platform/vector/Neo4jVectorStoreAdapter.js'; | |
| import { OutlookEmailReader } from './OutlookEmailReader.js'; | |
| import { PublicThreatScraper } from './PublicThreatScraper.js'; | |
| import { InternalLeakHunter } from './InternalLeakHunter.js'; | |
| import { NewsMonitorScraper } from './NewsMonitorScraper.js'; | |
| import { LocalFileScanner } from './LocalFileScanner.js'; | |
| import { eventBus } from '../../mcp/EventBus.js'; | |
/** Bookkeeping recorded for one named scheduler task. */
interface Telemetry {
  /** ISO-8601 timestamp (from `Date#toISOString`) of the most recent recorded run. */
  lastRun?: string;
  /** Error text from the most recent run; left undefined when that run succeeded. */
  lastError?: string;
  /** Total number of runs recorded for the task. */
  runs: number;
}
/**
 * Runs the background ingestion/monitoring jobs on cron schedules and keeps per-task
 * telemetry (last run, last error, run count) for status reporting.
 *
 * Jobs registered by start():
 * - `emails`           every 5 minutes: Outlook emails -> vector store namespace `emails`
 * - `threat_intel`     every 15 minutes: public threat scrape -> `threat:detected` event
 * - `internal_hunt`    every 10 minutes: InternalLeakHunter.hunt()
 * - `news_monitor`     hourly at minute 0: news -> vector store namespace `news`,
 *                      plus a one-shot `news_monitor_initial` run 10 s after start
 * - `system_heartbeat` every minute: `system:heartbeat` event
 * - `local_files`      every 30 minutes: `data/ingestion` -> vector store namespace `documents`,
 *                      plus a one-shot `local_files_initial` run 15 s after start
 */
export class DataScheduler {
  private isRunning = false;
  /** Recurring cron jobs created by the current start(); stopped and discarded by stop(). */
  private tasks: cron.ScheduledTask[] = [];
  /** Pending one-shot startup timers; cleared by stop() so they cannot fire after shutdown. */
  private timers: ReturnType<typeof setTimeout>[] = [];
  /** Telemetry keyed by task name; an entry appears after the task's first recorded run. */
  private telemetry: Record<string, Telemetry> = {};

  /** Registers every job. Calling start() while already running is a no-op. */
  start() {
    if (this.isRunning) return;
    this.isRunning = true;
    logger.info('β DataScheduler started');
    this.scheduleEmailIngestion();
    this.scheduleThreatIntel();
    this.scheduleInternalHunt();
    this.scheduleNewsMonitor();
    this.scheduleSystemHealth();
    this.scheduleLocalFiles();
  }

  /**
   * Stops every cron job and cancels startup scans that have not fired yet.
   * Runs already in progress are not awaited. The scheduler may be started again afterwards.
   */
  stop() {
    if (!this.isRunning) return;
    for (const task of this.tasks) task.stop();
    // Drop the stopped handles so a later start() does not accumulate stale tasks.
    this.tasks = [];
    for (const timer of this.timers) clearTimeout(timer);
    this.timers = [];
    this.isRunning = false;
    logger.info('π DataScheduler stopped');
  }

  /** Snapshot of the running flag plus telemetry for every task that has run at least once. */
  public getStatus() {
    return {
      running: this.isRunning,
      tasks: Object.entries(this.telemetry).map(([name, info]) => ({
        name,
        lastRun: info.lastRun ?? null,
        lastError: info.lastError ?? null,
        runs: info.runs,
      })),
    };
  }

  /**
   * Records one completed run of the task `name`.
   * @param name - Telemetry key of the task.
   * @param error - Present when the run failed; its message (or string form) becomes `lastError`.
   *                Omit it on success, which clears any previous `lastError`.
   */
  private markRun(name: string, error?: unknown) {
    let entry = this.telemetry[name];
    if (!entry) {
      entry = { runs: 0 };
      this.telemetry[name] = entry;
    }
    entry.lastRun = new Date().toISOString();
    entry.runs += 1;
    entry.lastError = error ? DataScheduler.describeError(error) : undefined;
  }

  /** Best-effort human-readable text for any thrown value, without assuming it is an Error. */
  private static describeError(error: unknown): string {
    if (typeof error === 'object' && error !== null && 'message' in error) {
      const { message } = error as { message?: unknown };
      if (typeof message === 'string' && message) return message;
    }
    return String(error);
  }

  /**
   * Runs `job` once after `delayMs`, keeping the timer handle so stop() can cancel it.
   * The handle removes itself from `timers` once it fires.
   */
  private scheduleOnce(delayMs: number, job: () => Promise<void>) {
    const timer = setTimeout(() => {
      this.timers = this.timers.filter((pending) => pending !== timer);
      // Jobs handle their own errors; this catch only guards against an unexpected rejection.
      job().catch((error) => logger.error('One-shot scheduler job failed:', error));
    }, delayMs);
    this.timers.push(timer);
  }

  /** Every 30 minutes (and once 15 s after start) ingest files from `data/ingestion`. */
  private scheduleLocalFiles() {
    const scanPath = path.resolve('data/ingestion');
    const scanner = new LocalFileScanner({
      rootPaths: [scanPath],
      extensions: ['.pdf', '.txt', '.md'],
    });
    const task = cron.schedule('*/30 * * * *', () =>
      this.runLocalFileScan(scanner, scanPath, 'scheduled'),
    );
    this.tasks.push(task);
    this.scheduleOnce(15000, () => this.runLocalFileScan(scanner, scanPath, 'initial'));
  }

  /**
   * One local-file scan: fetch + transform files, batch-upsert them into the `documents`
   * namespace, emit `ingestion:documents`, and record telemetry under
   * `local_files` (scheduled) or `local_files_initial` (startup).
   * Never rejects; failures are logged and recorded.
   */
  private async runLocalFileScan(
    scanner: LocalFileScanner,
    scanPath: string,
    mode: 'scheduled' | 'initial',
  ): Promise<void> {
    const initial = mode === 'initial';
    const taskName = initial ? 'local_files_initial' : 'local_files';
    logger.info(
      initial ? 'π Running initial local file scan...' : `π Scanning local files in ${scanPath}...`,
    );
    try {
      if (!(await scanner.isAvailable())) {
        logger.warn(`Local ingestion path not accessible: ${scanPath}`);
        this.markRun(taskName, new Error('path not accessible'));
        return;
      }
      const files = await scanner.fetch();
      const entities = await scanner.transform(files);
      if (entities.length > 0) {
        await getNeo4jVectorStore().batchUpsert({
          records: entities.map((entity) => ({
            id: entity.id,
            content: entity.content,
            metadata: {
              ...entity.metadata,
              type: 'document',
              source: 'local_ingestion',
            },
            namespace: 'documents',
          })),
          namespace: 'documents',
        });
        logger.info(`π¨ Ingested ${entities.length} local documents${initial ? ' (initial)' : ''}`);
        eventBus.emit('ingestion:documents', { count: entities.length });
      }
      this.markRun(taskName);
    } catch (error) {
      logger.error(initial ? 'Initial local scan failed:' : 'Local file scan failed:', error);
      this.markRun(taskName, error);
    }
  }

  /** Every 5 minutes read Outlook emails and upsert each into the `emails` namespace. */
  private scheduleEmailIngestion() {
    const task = cron.schedule('*/5 * * * *', async () => {
      const taskName = 'emails';
      logger.info('π§ Running scheduled email ingestion...');
      try {
        const reader = new OutlookEmailReader();
        const emails = await reader.readData();
        if (emails.length > 0) {
          const vectorStore = getNeo4jVectorStore();
          for (const email of emails) {
            await vectorStore.upsert({
              id: `email-${email.id}`,
              content: `Subject: ${email.subject}\nFrom: ${email.sender}\n\n${email.content}`,
              metadata: {
                type: 'email',
                source: 'outlook',
                sender: email.sender,
                receivedAt: email.timestamp,
                // Spread last: reader-supplied metadata may override the fields above.
                ...email.metadata,
              },
              namespace: 'emails',
            });
          }
          logger.info(`β Ingested ${emails.length} emails to vector store`);
          eventBus.emit('ingestion:emails', { count: emails.length });
        } else {
          logger.debug('No new emails found');
        }
        this.markRun(taskName);
      } catch (error) {
        // Missing-credentials failures are an expected "not configured" state, so they are
        // logged at debug level. Matched by message text, presumably as thrown by
        // OutlookEmailReader — confirm if that message ever changes.
        // describeError() is safe for non-Error rejections (null/undefined would previously throw here).
        if (DataScheduler.describeError(error).includes('Mangler IMAP credentials')) {
          logger.debug('Skipping email ingestion (no credentials)');
        } else {
          logger.error('β Email ingestion failed:', error);
        }
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Every 15 minutes scrape public threat intel and broadcast any hits via `threat:detected`. */
  private scheduleThreatIntel() {
    const scraper = new PublicThreatScraper();
    const task = cron.schedule('*/15 * * * *', async () => {
      const taskName = 'threat_intel';
      logger.info('π Running public threat intelligence scan...');
      try {
        const threats = await scraper.fetchThreats();
        if (threats.length > 0) {
          eventBus.emit('threat:detected', { threats });
          logger.info(`β Broadcasted ${threats.length} threats to UI`);
        }
        this.markRun(taskName);
      } catch (error) {
        logger.error('Threat scan failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Every 10 minutes run InternalLeakHunter.hunt(). */
  private scheduleInternalHunt() {
    const hunter = new InternalLeakHunter();
    const task = cron.schedule('*/10 * * * *', async () => {
      const taskName = 'internal_hunt';
      try {
        await hunter.hunt();
        this.markRun(taskName);
      } catch (error) {
        logger.error('Internal hunt failed:', error);
        this.markRun(taskName, error);
      }
    });
    this.tasks.push(task);
  }

  /** Hourly (and once 10 s after start) ingest news items into the `news` namespace. */
  private scheduleNewsMonitor() {
    const scraper = new NewsMonitorScraper();
    const task = cron.schedule('0 * * * *', () => this.runNewsScan(scraper, 'scheduled'));
    this.tasks.push(task);
    this.scheduleOnce(10000, () => this.runNewsScan(scraper, 'initial'));
  }

  /**
   * One news scan: fetch news, batch-upsert it into the `news` namespace, then emit
   * `ingestion:news`. The startup run also upserts, so the event always reflects
   * stored data. Telemetry key: `news_monitor` or `news_monitor_initial`.
   * Never rejects; failures are logged and recorded.
   */
  private async runNewsScan(
    scraper: NewsMonitorScraper,
    mode: 'scheduled' | 'initial',
  ): Promise<void> {
    const initial = mode === 'initial';
    const taskName = initial ? 'news_monitor_initial' : 'news_monitor';
    logger.info(initial ? 'π° Running initial news scan...' : 'π° Running news monitor scan...');
    try {
      const news = await scraper.fetchNews();
      if (news.length > 0) {
        await getNeo4jVectorStore().batchUpsert({
          records: news.map((item) => ({
            id: item.id,
            content: `${item.title}\n${item.snippet}\nSource: ${item.source}\nCategory: ${item.category}`,
            metadata: {
              ...item,
              type: 'news',
              ingestedAt: new Date().toISOString(),
            },
            namespace: 'news',
          })),
          namespace: 'news',
        });
        logger.info(`β Ingested ${news.length} news items`);
        eventBus.emit('ingestion:news', { count: news.length, items: news });
      }
      this.markRun(taskName);
    } catch (error) {
      logger.error(initial ? 'Initial news scan failed:' : 'News monitor failed:', error);
      this.markRun(taskName, error);
    }
  }

  /** Every minute emit `system:heartbeat` with the current process memory usage. */
  private scheduleSystemHealth() {
    const task = cron.schedule('* * * * *', () => {
      eventBus.emit('system:heartbeat', {
        timestamp: new Date().toISOString(),
        status: 'healthy',
        memory: process.memoryUsage(),
      });
      this.markRun('system_heartbeat');
    });
    this.tasks.push(task);
  }
}
/** Shared scheduler instance for the process; nothing runs until `dataScheduler.start()` is called. */
export const dataScheduler: DataScheduler = new DataScheduler();