Kraft102's picture
Deploy from GitHub Actions 2025-12-16_04-55-23
f1a6f7e verified
/**
* NudgeService - Aggressive Data Generation System
*
* Runs every 15 minutes to push data into the system:
* - System metrics β†’ Neo4j
* - Process snapshots β†’ Neo4j
* - Graph evolution triggers
* - Agent activity pings
* - OmniHarvester triggers
*/
import cron from 'node-cron';
import si from 'systeminformation';
import { logger } from '../utils/logger.js';
import { eventBus } from '../mcp/EventBus.js';
interface NudgeStats {
lastRun: Date | null;
totalRuns: number;
nodesCreated: number;
eventsEmitted: number;
errors: number;
}
class NudgeService {
private isRunning = false;
private task: cron.ScheduledTask | null = null;
private neo4jAdapter: any = null;
private stats: NudgeStats = {
lastRun: null,
totalRuns: 0,
nodesCreated: 0,
eventsEmitted: 0,
errors: 0
};
async start() {
if (this.isRunning) return;
this.isRunning = true;
// Dynamic import to avoid circular deps
try {
const { neo4jAdapter } = await import('../adapters/Neo4jAdapter.js');
this.neo4jAdapter = neo4jAdapter;
await this.neo4jAdapter.connect();
} catch (e) {
logger.warn('⚑ NudgeService: Neo4j not available, running in limited mode');
}
console.log('⚑ NudgeService ACTIVATED - Running every 15 minutes');
console.log(' └─ Pushing: System metrics, Process snapshots, Graph evolution');
// Every 15 minutes: */15 * * * *
this.task = cron.schedule('*/15 * * * *', async () => {
await this.runNudgeCycle();
});
// Initial nudge after 30 seconds
setTimeout(() => this.runNudgeCycle(), 30000);
// Also run a mini-nudge every 5 minutes for basic stats
cron.schedule('*/5 * * * *', async () => {
await this.miniNudge();
});
}
stop() {
if (this.task) {
this.task.stop();
this.task = null;
}
this.isRunning = false;
console.log('πŸ›‘ NudgeService stopped');
}
private async runNudgeCycle() {
const cycleStart = Date.now();
console.log('\nβš‘β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•βš‘');
console.log(' NUDGE CYCLE #' + (this.stats.totalRuns + 1) + ' - ' + new Date().toISOString());
console.log('βš‘β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•βš‘');
try {
// 1. Capture System Metrics
await this.captureSystemMetrics();
// 2. Capture Process Snapshot
await this.captureProcessSnapshot();
// 3. Trigger Graph Evolution
await this.triggerGraphEvolution();
// 4. Ping All Agents
await this.pingAgents();
// 5. Emit Activity Events
await this.emitActivityEvents();
// 6. Run Knowledge Compilation
await this.triggerKnowledgeCompilation();
this.stats.totalRuns++;
this.stats.lastRun = new Date();
const duration = Date.now() - cycleStart;
console.log(`\nβœ… NUDGE CYCLE COMPLETE in ${duration}ms`);
console.log(` Stats: ${this.stats.nodesCreated} nodes | ${this.stats.eventsEmitted} events | ${this.stats.errors} errors`);
console.log('βš‘β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•βš‘\n');
} catch (error) {
this.stats.errors++;
logger.error('❌ Nudge cycle failed:', error);
}
}
private async miniNudge() {
// Quick stats push every 5 minutes
try {
const [cpu, mem] = await Promise.all([
si.currentLoad(),
si.mem()
]);
eventBus.emitEvent({
type: 'system:heartbeat',
timestamp: new Date().toISOString(),
source: 'NudgeService',
payload: {
cpuLoad: cpu.currentLoad.toFixed(1),
memUsed: ((mem.used / mem.total) * 100).toFixed(1),
uptime: process.uptime()
}
});
this.stats.eventsEmitted++;
} catch (e) {
// Silent fail for mini-nudge
}
}
private async captureSystemMetrics() {
console.log('πŸ“Š [1/6] Capturing system metrics...');
try {
const [cpu, mem, osInfo, currentLoad, disk, network] = await Promise.all([
si.cpu(),
si.mem(),
si.osInfo(),
si.currentLoad(),
si.fsSize(),
si.networkStats()
]);
if (this.neo4jAdapter) {
const timestamp = new Date().toISOString();
const snapshotId = `sys-${Date.now()}`;
await this.neo4jAdapter.runQuery(`
MERGE (s:SystemSnapshot {id: $id})
SET s.timestamp = $timestamp,
s.cpuBrand = $cpuBrand,
s.cpuCores = $cpuCores,
s.cpuLoad = $cpuLoad,
s.memTotal = $memTotal,
s.memUsed = $memUsed,
s.memPercent = $memPercent,
s.platform = $platform,
s.osDistro = $osDistro,
s.diskUsed = $diskUsed
MERGE (sys:System {name: 'WidgeTDC'})
MERGE (sys)-[:HAS_SNAPSHOT]->(s)
RETURN s
`, {
id: snapshotId,
timestamp,
cpuBrand: cpu.brand,
cpuCores: cpu.cores,
cpuLoad: currentLoad.currentLoad,
memTotal: mem.total,
memUsed: mem.used,
memPercent: (mem.used / mem.total) * 100,
platform: osInfo.platform,
osDistro: osInfo.distro,
diskUsed: disk[0]?.used || 0
});
this.stats.nodesCreated++;
console.log(' βœ“ System snapshot saved to Neo4j');
}
eventBus.emitEvent({
type: 'nudge.system_metrics',
timestamp: new Date().toISOString(),
source: 'NudgeService',
payload: {
cpu: currentLoad.currentLoad.toFixed(1) + '%',
memory: ((mem.used / mem.total) * 100).toFixed(1) + '%'
}
});
this.stats.eventsEmitted++;
} catch (error) {
logger.error('System metrics capture failed:', error);
this.stats.errors++;
}
}
private async captureProcessSnapshot() {
console.log('πŸ”„ [2/6] Capturing process snapshot...');
try {
const data = await si.processes();
const topProcesses = data.list
.sort((a, b) => (b.cpu || 0) - (a.cpu || 0))
.slice(0, 10);
if (this.neo4jAdapter) {
const timestamp = new Date().toISOString();
for (const proc of topProcesses) {
await this.neo4jAdapter.runQuery(`
MERGE (p:Process {name: $name})
SET p.lastSeen = $timestamp,
p.cpu = $cpu,
p.mem = $mem,
p.pid = $pid
MERGE (sys:System {name: 'WidgeTDC'})
MERGE (sys)-[:RUNS]->(p)
`, {
name: proc.name || 'Unknown',
timestamp,
cpu: proc.cpu || 0,
mem: proc.mem || 0,
pid: proc.pid
});
}
this.stats.nodesCreated += topProcesses.length;
console.log(` βœ“ ${topProcesses.length} processes tracked`);
}
} catch (error) {
logger.error('Process snapshot failed:', error);
this.stats.errors++;
}
}
private async triggerGraphEvolution() {
console.log('🧬 [3/6] Triggering graph evolution...');
try {
if (this.neo4jAdapter) {
// Get current graph stats - use separate queries to avoid memory overflow
// on Neo4j AuraDB free tier (824 MiB limit with 150K+ nodes)
const [nodeResult, relResult] = await Promise.all([
this.neo4jAdapter.runQuery(`MATCH (n) RETURN count(n) as nodeCount`),
this.neo4jAdapter.runQuery(`MATCH ()-[r]->() RETURN count(r) as relCount`)
]);
const nodeCount = nodeResult?.[0]?.nodeCount || 0;
const relCount = relResult?.[0]?.relCount || 0;
// Create evolution event node
await this.neo4jAdapter.runQuery(`
CREATE (e:EvolutionEvent {
id: $id,
timestamp: $timestamp,
nodeCount: $nodeCount,
relationshipCount: $relCount,
source: 'NudgeService'
})
`, {
id: `evo-${Date.now()}`,
timestamp: new Date().toISOString(),
nodeCount,
relCount
});
this.stats.nodesCreated++;
console.log(` βœ“ Graph: ${nodeCount} nodes, ${relCount} relationships`);
}
} catch (error) {
logger.error('Graph evolution failed:', error);
this.stats.errors++;
}
}
private async pingAgents() {
console.log('πŸ€– [4/6] Pinging agents...');
const agents = ['HansPedder', 'GraphRAG', 'System', 'OmniHarvester'];
for (const agent of agents) {
eventBus.emitEvent({
type: 'agent.ping',
timestamp: new Date().toISOString(),
source: 'NudgeService',
payload: { agent, action: 'heartbeat' }
});
this.stats.eventsEmitted++;
}
console.log(` βœ“ Pinged ${agents.length} agents`);
}
private async emitActivityEvents() {
console.log('πŸ“‘ [5/6] Emitting activity events...');
const events = [
{ type: 'nudge.cycle_complete', payload: { cycle: this.stats.totalRuns + 1 } },
{ type: 'system.activity', payload: { source: 'NudgeService', active: true } },
{ type: 'data.push', payload: { nodesCreated: this.stats.nodesCreated } }
];
for (const event of events) {
eventBus.emitEvent({
type: event.type as any, // Dynamic event types
timestamp: new Date().toISOString(),
source: 'NudgeService',
payload: event.payload
});
this.stats.eventsEmitted++;
}
console.log(` βœ“ Emitted ${events.length} events`);
}
private async triggerKnowledgeCompilation() {
console.log('🧠 [6/6] Triggering knowledge compilation...');
try {
// Trigger compilation via HTTP to self
const port = process.env.PORT || 7860;
const response = await fetch(`http://localhost:${port}/api/knowledge/compile`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' }
}).catch(() => null);
if (response?.ok) {
console.log(' βœ“ Knowledge compilation triggered');
} else {
console.log(' ⚠ Knowledge compilation endpoint not available');
}
} catch (e) {
// Silent - endpoint might not exist
}
}
getStats(): NudgeStats {
return { ...this.stats };
}
}
export const nudgeService = new NudgeService();