Spaces:
Paused
Paused
| /** | |
| * NudgeService - Aggressive Data Generation System | |
| * | |
| * Runs every 15 minutes to push data into the system: | |
| * - System metrics β Neo4j | |
| * - Process snapshots β Neo4j | |
| * - Graph evolution triggers | |
| * - Agent activity pings | |
| * - OmniHarvester triggers | |
| */ | |
| import cron from 'node-cron'; | |
| import si from 'systeminformation'; | |
| import { logger } from '../utils/logger.js'; | |
| import { eventBus } from '../mcp/EventBus.js'; | |
| interface NudgeStats { | |
| lastRun: Date | null; | |
| totalRuns: number; | |
| nodesCreated: number; | |
| eventsEmitted: number; | |
| errors: number; | |
| } | |
| class NudgeService { | |
| private isRunning = false; | |
| private task: cron.ScheduledTask | null = null; | |
| private neo4jAdapter: any = null; | |
| private stats: NudgeStats = { | |
| lastRun: null, | |
| totalRuns: 0, | |
| nodesCreated: 0, | |
| eventsEmitted: 0, | |
| errors: 0 | |
| }; | |
| async start() { | |
| if (this.isRunning) return; | |
| this.isRunning = true; | |
| // Dynamic import to avoid circular deps | |
| try { | |
| const { neo4jAdapter } = await import('../adapters/Neo4jAdapter.js'); | |
| this.neo4jAdapter = neo4jAdapter; | |
| await this.neo4jAdapter.connect(); | |
| } catch (e) { | |
| logger.warn('β‘ NudgeService: Neo4j not available, running in limited mode'); | |
| } | |
| console.log('β‘ NudgeService ACTIVATED - Running every 15 minutes'); | |
| console.log(' ββ Pushing: System metrics, Process snapshots, Graph evolution'); | |
| // Every 15 minutes: */15 * * * * | |
| this.task = cron.schedule('*/15 * * * *', async () => { | |
| await this.runNudgeCycle(); | |
| }); | |
| // Initial nudge after 30 seconds | |
| setTimeout(() => this.runNudgeCycle(), 30000); | |
| // Also run a mini-nudge every 5 minutes for basic stats | |
| cron.schedule('*/5 * * * *', async () => { | |
| await this.miniNudge(); | |
| }); | |
| } | |
| stop() { | |
| if (this.task) { | |
| this.task.stop(); | |
| this.task = null; | |
| } | |
| this.isRunning = false; | |
| console.log('π NudgeService stopped'); | |
| } | |
| private async runNudgeCycle() { | |
| const cycleStart = Date.now(); | |
| console.log('\nβ‘βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ‘'); | |
| console.log(' NUDGE CYCLE #' + (this.stats.totalRuns + 1) + ' - ' + new Date().toISOString()); | |
| console.log('β‘βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ‘'); | |
| try { | |
| // 1. Capture System Metrics | |
| await this.captureSystemMetrics(); | |
| // 2. Capture Process Snapshot | |
| await this.captureProcessSnapshot(); | |
| // 3. Trigger Graph Evolution | |
| await this.triggerGraphEvolution(); | |
| // 4. Ping All Agents | |
| await this.pingAgents(); | |
| // 5. Emit Activity Events | |
| await this.emitActivityEvents(); | |
| // 6. Run Knowledge Compilation | |
| await this.triggerKnowledgeCompilation(); | |
| this.stats.totalRuns++; | |
| this.stats.lastRun = new Date(); | |
| const duration = Date.now() - cycleStart; | |
| console.log(`\nβ NUDGE CYCLE COMPLETE in ${duration}ms`); | |
| console.log(` Stats: ${this.stats.nodesCreated} nodes | ${this.stats.eventsEmitted} events | ${this.stats.errors} errors`); | |
| console.log('β‘βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ‘\n'); | |
| } catch (error) { | |
| this.stats.errors++; | |
| logger.error('β Nudge cycle failed:', error); | |
| } | |
| } | |
| private async miniNudge() { | |
| // Quick stats push every 5 minutes | |
| try { | |
| const [cpu, mem] = await Promise.all([ | |
| si.currentLoad(), | |
| si.mem() | |
| ]); | |
| eventBus.emitEvent({ | |
| type: 'system:heartbeat', | |
| timestamp: new Date().toISOString(), | |
| source: 'NudgeService', | |
| payload: { | |
| cpuLoad: cpu.currentLoad.toFixed(1), | |
| memUsed: ((mem.used / mem.total) * 100).toFixed(1), | |
| uptime: process.uptime() | |
| } | |
| }); | |
| this.stats.eventsEmitted++; | |
| } catch (e) { | |
| // Silent fail for mini-nudge | |
| } | |
| } | |
| private async captureSystemMetrics() { | |
| console.log('π [1/6] Capturing system metrics...'); | |
| try { | |
| const [cpu, mem, osInfo, currentLoad, disk, network] = await Promise.all([ | |
| si.cpu(), | |
| si.mem(), | |
| si.osInfo(), | |
| si.currentLoad(), | |
| si.fsSize(), | |
| si.networkStats() | |
| ]); | |
| if (this.neo4jAdapter) { | |
| const timestamp = new Date().toISOString(); | |
| const snapshotId = `sys-${Date.now()}`; | |
| await this.neo4jAdapter.runQuery(` | |
| MERGE (s:SystemSnapshot {id: $id}) | |
| SET s.timestamp = $timestamp, | |
| s.cpuBrand = $cpuBrand, | |
| s.cpuCores = $cpuCores, | |
| s.cpuLoad = $cpuLoad, | |
| s.memTotal = $memTotal, | |
| s.memUsed = $memUsed, | |
| s.memPercent = $memPercent, | |
| s.platform = $platform, | |
| s.osDistro = $osDistro, | |
| s.diskUsed = $diskUsed | |
| MERGE (sys:System {name: 'WidgeTDC'}) | |
| MERGE (sys)-[:HAS_SNAPSHOT]->(s) | |
| RETURN s | |
| `, { | |
| id: snapshotId, | |
| timestamp, | |
| cpuBrand: cpu.brand, | |
| cpuCores: cpu.cores, | |
| cpuLoad: currentLoad.currentLoad, | |
| memTotal: mem.total, | |
| memUsed: mem.used, | |
| memPercent: (mem.used / mem.total) * 100, | |
| platform: osInfo.platform, | |
| osDistro: osInfo.distro, | |
| diskUsed: disk[0]?.used || 0 | |
| }); | |
| this.stats.nodesCreated++; | |
| console.log(' β System snapshot saved to Neo4j'); | |
| } | |
| eventBus.emitEvent({ | |
| type: 'nudge.system_metrics', | |
| timestamp: new Date().toISOString(), | |
| source: 'NudgeService', | |
| payload: { | |
| cpu: currentLoad.currentLoad.toFixed(1) + '%', | |
| memory: ((mem.used / mem.total) * 100).toFixed(1) + '%' | |
| } | |
| }); | |
| this.stats.eventsEmitted++; | |
| } catch (error) { | |
| logger.error('System metrics capture failed:', error); | |
| this.stats.errors++; | |
| } | |
| } | |
| private async captureProcessSnapshot() { | |
| console.log('π [2/6] Capturing process snapshot...'); | |
| try { | |
| const data = await si.processes(); | |
| const topProcesses = data.list | |
| .sort((a, b) => (b.cpu || 0) - (a.cpu || 0)) | |
| .slice(0, 10); | |
| if (this.neo4jAdapter) { | |
| const timestamp = new Date().toISOString(); | |
| for (const proc of topProcesses) { | |
| await this.neo4jAdapter.runQuery(` | |
| MERGE (p:Process {name: $name}) | |
| SET p.lastSeen = $timestamp, | |
| p.cpu = $cpu, | |
| p.mem = $mem, | |
| p.pid = $pid | |
| MERGE (sys:System {name: 'WidgeTDC'}) | |
| MERGE (sys)-[:RUNS]->(p) | |
| `, { | |
| name: proc.name || 'Unknown', | |
| timestamp, | |
| cpu: proc.cpu || 0, | |
| mem: proc.mem || 0, | |
| pid: proc.pid | |
| }); | |
| } | |
| this.stats.nodesCreated += topProcesses.length; | |
| console.log(` β ${topProcesses.length} processes tracked`); | |
| } | |
| } catch (error) { | |
| logger.error('Process snapshot failed:', error); | |
| this.stats.errors++; | |
| } | |
| } | |
| private async triggerGraphEvolution() { | |
| console.log('𧬠[3/6] Triggering graph evolution...'); | |
| try { | |
| if (this.neo4jAdapter) { | |
| // Get current graph stats - use separate queries to avoid memory overflow | |
| // on Neo4j AuraDB free tier (824 MiB limit with 150K+ nodes) | |
| const [nodeResult, relResult] = await Promise.all([ | |
| this.neo4jAdapter.runQuery(`MATCH (n) RETURN count(n) as nodeCount`), | |
| this.neo4jAdapter.runQuery(`MATCH ()-[r]->() RETURN count(r) as relCount`) | |
| ]); | |
| const nodeCount = nodeResult?.[0]?.nodeCount || 0; | |
| const relCount = relResult?.[0]?.relCount || 0; | |
| // Create evolution event node | |
| await this.neo4jAdapter.runQuery(` | |
| CREATE (e:EvolutionEvent { | |
| id: $id, | |
| timestamp: $timestamp, | |
| nodeCount: $nodeCount, | |
| relationshipCount: $relCount, | |
| source: 'NudgeService' | |
| }) | |
| `, { | |
| id: `evo-${Date.now()}`, | |
| timestamp: new Date().toISOString(), | |
| nodeCount, | |
| relCount | |
| }); | |
| this.stats.nodesCreated++; | |
| console.log(` β Graph: ${nodeCount} nodes, ${relCount} relationships`); | |
| } | |
| } catch (error) { | |
| logger.error('Graph evolution failed:', error); | |
| this.stats.errors++; | |
| } | |
| } | |
| private async pingAgents() { | |
| console.log('π€ [4/6] Pinging agents...'); | |
| const agents = ['HansPedder', 'GraphRAG', 'System', 'OmniHarvester']; | |
| for (const agent of agents) { | |
| eventBus.emitEvent({ | |
| type: 'agent.ping', | |
| timestamp: new Date().toISOString(), | |
| source: 'NudgeService', | |
| payload: { agent, action: 'heartbeat' } | |
| }); | |
| this.stats.eventsEmitted++; | |
| } | |
| console.log(` β Pinged ${agents.length} agents`); | |
| } | |
| private async emitActivityEvents() { | |
| console.log('π‘ [5/6] Emitting activity events...'); | |
| const events = [ | |
| { type: 'nudge.cycle_complete', payload: { cycle: this.stats.totalRuns + 1 } }, | |
| { type: 'system.activity', payload: { source: 'NudgeService', active: true } }, | |
| { type: 'data.push', payload: { nodesCreated: this.stats.nodesCreated } } | |
| ]; | |
| for (const event of events) { | |
| eventBus.emitEvent({ | |
| type: event.type as any, // Dynamic event types | |
| timestamp: new Date().toISOString(), | |
| source: 'NudgeService', | |
| payload: event.payload | |
| }); | |
| this.stats.eventsEmitted++; | |
| } | |
| console.log(` β Emitted ${events.length} events`); | |
| } | |
| private async triggerKnowledgeCompilation() { | |
| console.log('π§ [6/6] Triggering knowledge compilation...'); | |
| try { | |
| // Trigger compilation via HTTP to self | |
| const port = process.env.PORT || 7860; | |
| const response = await fetch(`http://localhost:${port}/api/knowledge/compile`, { | |
| method: 'POST', | |
| headers: { 'Content-Type': 'application/json' } | |
| }).catch(() => null); | |
| if (response?.ok) { | |
| console.log(' β Knowledge compilation triggered'); | |
| } else { | |
| console.log(' β Knowledge compilation endpoint not available'); | |
| } | |
| } catch (e) { | |
| // Silent - endpoint might not exist | |
| } | |
| } | |
| getStats(): NudgeStats { | |
| return { ...this.stats }; | |
| } | |
| } | |
| export const nudgeService = new NudgeService(); | |