// Kraft102's picture
// Deploy from GitHub Actions 2025-12-15_13-51-53
// a84b07b verified
// DataIngestionEngine – Autonomous data collection and enrichment
import { projectMemory } from '../project/ProjectMemory.js';
import { eventBus } from '../../mcp/EventBus.js';
import { dataSourceConfig } from './DataSourceConfigManager.js';
/**
 * Contract implemented by every data source that can be registered for ingestion.
 *
 * An adapter exposes an availability probe, a raw fetch step, and a transform
 * step that turns the raw items into normalized {@link IngestedEntity} records.
 */
export interface DataSourceAdapter {
  /** Identifier of the adapter. */
  name: string;

  /** Category of the underlying source. */
  type:
    | 'local_files'
    | 'outlook_mail'
    | 'browser_history'
    | 'google_drive'
    | 'other';

  /**
   * Retrieves the raw, source-specific items.
   * @returns The raw items; their shape is defined by the adapter itself.
   */
  fetch(): Promise<any[]>;

  /**
   * Converts raw items into normalized entities.
   * @param raw Raw items to convert.
   * @returns The normalized entities.
   */
  transform(raw: any[]): Promise<IngestedEntity[]>;

  /**
   * Health check for the source.
   * @returns `true` when the source is available.
   */
  isAvailable(): Promise<boolean>;
}
/**
 * Normalized record produced by an adapter's transform step.
 */
export interface IngestedEntity {
  /** Identifier of the entity. */
  id: string;

  /** Kind of entity. */
  type: string;

  /** Source the entity came from. */
  source: string;

  /** Optional title. */
  title?: string;

  /** Optional textual content. */
  content?: string;

  /** Free-form additional attributes attached to the entity. */
  metadata: Record<string, any>;

  /** Timestamp associated with the entity. */
  timestamp: Date;
}
/** Outcome of processing a single adapter during `ingestAll`. */
type AdapterRunResult =
  | { source: string; status: 'skipped'; reason: 'disabled' | 'not_available' }
  | { source: string; status: 'success'; items: number }
  | { source: string; status: 'error'; error: string };

/**
 * Coordinates registered data source adapters: fetches raw data, transforms it
 * into normalized entities, stores them in the vector store ("vidensarkiv"),
 * and announces the result on the event bus.
 */
export class DataIngestionEngine {
  private adapters: Map<string, DataSourceAdapter> = new Map();
  private isRunning: boolean = false;
  private ingestedCount: number = 0;

  /**
   * Register a data source adapter.
   *
   * The adapter is stored under `adapter.name` (replacing any adapter already
   * stored under that name) and the source is registered with the data source
   * configuration.
   *
   * @param adapter Adapter to register.
   * @param description Description passed to the source configuration.
   * @param requiresApproval Passed through to the source configuration.
   */
  async registerAdapter(adapter: DataSourceAdapter, description: string, requiresApproval: boolean = false): Promise<void> {
    this.adapters.set(adapter.name, adapter);
    const canUse = await dataSourceConfig.registerSource(adapter.name, description, requiresApproval);
    console.log(`📥 Registered data adapter: ${adapter.name} (${adapter.type}) - ${canUse ? 'Ready' : 'Awaiting approval'}`);
  }

  /**
   * Get a registered adapter.
   * @param name Name the adapter was registered under.
   * @returns The adapter, or `undefined` when none is registered under `name`.
   */
  getAdapter(name: string): DataSourceAdapter | undefined {
    return this.adapters.get(name);
  }

  /**
   * Start ingestion from all registered adapters, one after another.
   *
   * A call made while another run is in progress logs a warning and returns
   * without doing anything. Failures of individual adapters are recorded in the
   * per-source results and do not abort the run. Start and completion are
   * reported through `projectMemory.logLifecycleEvent`.
   *
   * @throws Whatever `dataSourceConfig.load()` or the lifecycle logging throws;
   *   the running flag is cleared in that case as well.
   */
  async ingestAll(): Promise<void> {
    if (this.isRunning) {
      console.warn('⚠️ Ingestion already running');
      return;
    }

    // Claim the flag before the first await; otherwise a second call arriving
    // while the configuration is loading would pass the guard above.
    this.isRunning = true;
    const results: AdapterRunResult[] = [];

    try {
      // Load config
      await dataSourceConfig.load();
      this.ingestedCount = 0;

      console.log(`🚀 Starting data ingestion from ${this.adapters.size} sources...`);
      projectMemory.logLifecycleEvent({
        eventType: 'other',
        status: 'in_progress',
        details: {
          type: 'data_ingestion_started',
          sources: Array.from(this.adapters.keys())
        }
      });

      for (const [name, adapter] of this.adapters) {
        results.push(await this.ingestAdapter(name, adapter));
      }
    } finally {
      // Always release the flag, even when something above threw, so that a
      // failed run cannot block every later run.
      this.isRunning = false;
    }

    // Log completion
    projectMemory.logLifecycleEvent({
      eventType: 'other',
      status: 'success',
      details: {
        type: 'data_ingestion_completed',
        totalIngested: this.ingestedCount,
        results
      }
    });
    console.log(`✅ Data ingestion complete! Total entities: ${this.ingestedCount}`);
  }

  /**
   * Ingest from a specific source.
   *
   * Unlike `ingestAll`, this does not write to the vector store and does not
   * change the running total reported by `getStatus`.
   *
   * @param sourceName Name of a registered adapter.
   * @returns Number of entities produced by the adapter.
   * @throws Error when the source is unknown, not enabled, or not available;
   *   errors thrown by the adapter propagate unchanged.
   */
  async ingestFrom(sourceName: string): Promise<number> {
    const adapter = this.adapters.get(sourceName);
    if (!adapter) {
      throw new Error(`Unknown data source: ${sourceName}`);
    }

    // Check if enabled
    if (!dataSourceConfig.isEnabled(sourceName)) {
      throw new Error(`Data source ${sourceName} is not enabled`);
    }

    const available = await adapter.isAvailable();
    if (!available) {
      throw new Error(`Source ${sourceName} is not available`);
    }

    const rawData = await adapter.fetch();
    const entities = await adapter.transform(rawData);
    await dataSourceConfig.markUsed(sourceName);

    // Send the complete list, matching the event emitted by ingestAll, so the
    // payload agrees with `count` and listeners receive every entity.
    eventBus.emit('data:ingested', {
      source: sourceName,
      count: entities.length,
      entities
    });

    return entities.length;
  }

  /**
   * Get ingestion status.
   * @returns Whether a full run is in progress, the entity total of the most
   *   recent `ingestAll` run, the registered adapter names, and the enabled
   *   sources and pending approvals reported by the source configuration.
   */
  getStatus() {
    return {
      running: this.isRunning,
      totalIngested: this.ingestedCount,
      adapters: Array.from(this.adapters.keys()),
      enabled: dataSourceConfig.getEnabledSources(),
      pendingApprovals: dataSourceConfig.getPendingApprovals()
    };
  }

  /** Runs one adapter end to end and reports exactly one outcome for it. */
  private async ingestAdapter(name: string, adapter: DataSourceAdapter): Promise<AdapterRunResult> {
    try {
      // Check if source is enabled
      if (!dataSourceConfig.isEnabled(name)) {
        console.log(`⏭️ ${name} is disabled, skipping`);
        return { source: name, status: 'skipped', reason: 'disabled' };
      }

      console.log(`📂 Ingesting from: ${name}...`);

      // Check availability
      const available = await adapter.isAvailable();
      if (!available) {
        console.warn(`⚠️ ${name} not available, skipping`);
        return { source: name, status: 'skipped', reason: 'not_available' };
      }

      // Fetch raw data
      const rawData = await adapter.fetch();
      console.log(` → Fetched ${rawData.length} items from ${name}`);

      // Transform to normalized entities
      const entities = await adapter.transform(rawData);
      console.log(` → Transformed ${entities.length} entities`);

      this.ingestedCount += entities.length;

      await this.addToKnowledgeArchive(name, entities);

      // Mark as used
      await dataSourceConfig.markUsed(name);

      // Emit event for real-time updates
      eventBus.emit('data:ingested', {
        source: name,
        count: entities.length,
        entities: entities // Send ALL entities so IngestionPipeline can process them
      });

      // Report success only once every step has finished, so a late failure
      // yields a single 'error' entry rather than a success/error pair.
      return { source: name, status: 'success', items: entities.length };
    } catch (error: unknown) {
      // Anything can be thrown; do not assume an Error instance.
      const message = error instanceof Error ? error.message : String(error);
      console.error(`❌ Failed to ingest from ${name}:`, message);
      return { source: name, status: 'error', error: message };
    }
  }

  /**
   * Adds entities to the vidensarkiv (knowledge archive) vector store.
   * Best effort: any failure is logged and swallowed so ingestion continues.
   */
  private async addToKnowledgeArchive(source: string, entities: IngestedEntity[]): Promise<void> {
    const namespace = `org:default:user:system`; // TODO: Get from context

    try {
      const { getVectorStore } = await import('../../platform/vector/index.js');
      const vectorStore = await getVectorStore();

      // Batch add entities to vidensarkiv
      const vectorRecords = entities.map(entity => ({
        id: entity.id,
        // Fall back from content to title to serialized metadata; `||` is
        // deliberate so that empty strings also fall through.
        content: entity.content || entity.title || JSON.stringify(entity.metadata),
        embedding: [], // Will be generated automatically
        metadata: {
          ...entity.metadata,
          datasetType: 'new',
          source,
          type: entity.type,
          ingestedAt: new Date().toISOString()
        },
        namespace
      }));

      if (vectorRecords.length > 0) {
        await vectorStore.batchUpsert({
          records: vectorRecords,
          namespace
        });
        console.log(` → Added ${vectorRecords.length} entities to vidensarkiv`);
      }
    } catch (err) {
      console.warn(`⚠️ Failed to add to vidensarkiv:`, err);
      // Non-critical, continue ingestion
    }
  }
}
// Singleton instance: created once when this module is first evaluated and
// exported so that importers share the same engine and its registered adapters.
export const dataIngestionEngine = new DataIngestionEngine();