// Kraft102's picture
// Initial deployment - WidgeTDC Cortex Backend v2.1.0
// 529090e
/**
* ╔═══════════════════════════════════════════════════════════════════════════╗
* ║  GRAPH INGESTOR - KNOWLEDGE HARVESTER                                     ║
* ║═══════════════════════════════════════════════════════════════════════════║
* ║  Converts filesystem structure to Neo4j knowledge graph                   ║
* ║                                                                           ║
* ║  Structure:                                                               ║
* ║    (:Repository)-[:CONTAINS]->(:Directory)-[:CONTAINS]->(:File)           ║
* ║                                                                           ║
* ║  Delivered by: Gemini (The Architect)                                     ║
* ║  Implemented by: Claude (The Captain)                                     ║
* ╚═══════════════════════════════════════════════════════════════════════════╝
*/
import * as fs from 'fs/promises';
import { createReadStream } from 'fs';
import * as path from 'path';
import * as crypto from 'crypto';
import { setImmediate as yieldToLoop } from 'timers/promises';
import { neo4jAdapter } from '../adapters/Neo4jAdapter.js';
import { getEmbeddingService, EmbeddingService } from './embeddings/EmbeddingService.js';
import { hyperLog } from './HyperLog.js';
// ═══════════════════════════════════════════════════════════════════════════
// Types
// ═══════════════════════════════════════════════════════════════════════════
/**
 * Options accepted by `GraphIngestor` and `ingestRepository`.
 *
 * Only `rootPath` is required; every other field is filled with a default
 * by the `GraphIngestor` constructor when omitted.
 */
export interface IngestOptions {
  /** Directory whose tree is ingested. */
  rootPath: string;
  /** Name for the Repository node; defaults to the basename of `rootPath`. */
  repositoryName?: string;
  /** File-name patterns (`*` wildcard); defaults to `['*']`. */
  includePatterns?: string[];
  /** Entry names (or `*`-wildcard patterns) to skip; defaults to the built-in exclude list. */
  excludePatterns?: string[];
  /** How deep below `rootPath` to descend; defaults to 10. */
  maxDepth?: number;
  /** Read text files to store a content preview and hash; defaults to false. */
  parseContent?: boolean;
  /** Send (truncated) text content to the embedding service; defaults to false. */
  generateEmbeddings?: boolean;
  /** Text files larger than this many bytes are not read into memory; defaults to 120 000. */
  maxEmbedSizeBytes?: number;
  /** Maximum number of characters of content passed to the embedding service; defaults to 4000. */
  embeddingTextLimit?: number;
  /** Number of characters kept as the stored content preview; defaults to 800. */
  contentPreviewLength?: number;
}
/**
 * Outcome of one ingestion run.
 *
 * `errors` collects non-fatal problems (unreadable directories, embedding
 * failures, …) as well as the fatal error when `success` is false.
 */
export interface IngestResult {
  /** False when the run aborted with a fatal error. */
  success: boolean;
  /** Id of the Repository node; empty string when the run failed. */
  repositoryId: string;
  /** Counters gathered during the run. */
  stats: {
    /** Directory nodes written. */
    directoriesCreated: number;
    /** File nodes written. */
    filesCreated: number;
    /** CONTAINS relationships written. */
    relationshipsCreated: number;
    /** Directories + files + the repository node. */
    totalNodes: number;
    /** Wall-clock duration of the run in milliseconds. */
    duration: number;
  };
  /** Human-readable error messages collected during the run. */
  errors: string[];
}
/**
 * Metadata gathered for a single file before it is written as a File node.
 */
interface FileInfo {
  /** Base name of the file. */
  name: string;
  /** Path as found during the walk (rootPath joined with the entry names). */
  path: string;
  /** Path relative to the ingestion root; also the input to the File node id. */
  relativePath: string;
  /** Lower-cased extension as returned by `path.extname` (may be empty). */
  extension: string;
  /** Detected language name, or 'Unknown'. */
  language: string;
  /** Size in bytes from `fs.stat`. */
  size: number;
  /** Line count; left undefined when it was not computed. */
  lines?: number;
  /** Leading slice of the content; set only when the content was read. */
  contentPreview?: string;
  /** Hash of the content; set only when the content was read. */
  contentHash?: string;
  /** Embedding vector, when one was generated. */
  embedding?: number[];
  /** Name of the provider that produced `embedding`. */
  embeddingProvider?: string;
}
// ═══════════════════════════════════════════════════════════════════════════
// Language Detection
// ═══════════════════════════════════════════════════════════════════════════
/**
 * Lower-cased file extension (as produced by `path.extname`) → language name.
 *
 * Note: `path.extname('.gitignore')` is '', so the '.gitignore' entry only
 * matches names like `foo.gitignore`; bare dotfiles are resolved by name in
 * `detectLanguage` instead.
 */
const LANGUAGE_MAP: Record<string, string> = Object.fromEntries([
  ['.ts', 'TypeScript'],
  ['.tsx', 'TypeScript/React'],
  ['.js', 'JavaScript'],
  ['.jsx', 'JavaScript/React'],
  ['.md', 'Markdown'],
  ['.json', 'JSON'],
  ['.yaml', 'YAML'],
  ['.yml', 'YAML'],
  ['.css', 'CSS'],
  ['.scss', 'SCSS'],
  ['.html', 'HTML'],
  ['.sql', 'SQL'],
  ['.py', 'Python'],
  ['.sh', 'Shell'],
  ['.bat', 'Batch'],
  ['.ps1', 'PowerShell'],
  ['.dockerfile', 'Dockerfile'],
  ['.env', 'Environment'],
  ['.gitignore', 'Git'],
]);
/**
 * Entry names skipped by default during the walk (compared against each
 * file/directory name, not the full path).
 */
const DEFAULT_EXCLUDE = [
  // Dependencies and version control
  'node_modules', '.git',
  // Build and test output
  'dist', 'build', '.next', 'coverage',
  // Caches
  '.cache', '__pycache__',
  // Editor / IDE settings
  '.vscode', '.idea',
];
// ═══════════════════════════════════════════════════════════════════════════
// Graph Ingestor Class
// ═══════════════════════════════════════════════════════════════════════════
/**
 * Mirrors a directory tree into Neo4j as
 * `(:Repository)-[:CONTAINS]->(:Directory)-[:CONTAINS]->(:File)`.
 *
 * File nodes additionally carry a sanitised language label (e.g. `:TypeScript`).
 * Content previews, hashes and embeddings are only stored when `parseContent`
 * or `generateEmbeddings` is enabled. Every write is a `MERGE` on the node id
 * followed by `SET`, so re-ingesting a tree updates existing nodes in place.
 *
 * `ingest()` resets the per-run counters and error list, so an instance can
 * be run more than once.
 */
export class GraphIngestor {
  /**
   * Extensions treated as text. Dotfile names ('.gitignore', '.env', …) are
   * listed too: `path.extname` returns '' for them, so `isTextFile` falls back
   * to the whole file name.
   */
  private static readonly TEXT_EXTENSIONS: ReadonlySet<string> = new Set([
    '.ts',
    '.tsx',
    '.js',
    '.jsx',
    '.json',
    '.md',
    '.yaml',
    '.yml',
    '.css',
    '.scss',
    '.html',
    '.sql',
    '.py',
    '.sh',
    '.bat',
    '.ps1',
    '.txt',
    '.env',
    '.gitignore',
    '.prettierrc',
    '.eslintrc',
  ]);

  private readonly options: Required<IngestOptions>;
  /** Compiled `excludePatterns`; tested against every entry name (files and directories). */
  private readonly excludeMatchers: RegExp[];
  /** Compiled `includePatterns`; tested against file names only. */
  private readonly includeMatchers: RegExp[];
  private stats: IngestResult['stats'] = GraphIngestor.emptyStats();
  private errors: string[] = [];
  private readonly yieldInterval = 100; // Yield to event loop every N entries
  private embeddingService: EmbeddingService | null = null;
  private embeddingProvider: string | null = null;
  /**
   * Memoised provider initialisation. A failure is reported once instead of
   * once per file; a failed initialisation is retried on the next `ingest()`.
   */
  private embeddingInit: Promise<void> | null = null;

  constructor(options: IngestOptions) {
    this.options = {
      rootPath: options.rootPath,
      // `||` on purpose: an empty repository name is as unusable as a missing one.
      repositoryName: options.repositoryName || path.basename(options.rootPath),
      includePatterns: [...(options.includePatterns ?? ['*'])],
      // Copied so nothing done with the options can mutate the shared module default.
      excludePatterns: [...(options.excludePatterns ?? DEFAULT_EXCLUDE)],
      // `??` (not `||`) so an explicit maxDepth of 0 is honoured.
      maxDepth: options.maxDepth ?? 10,
      parseContent: options.parseContent ?? false,
      generateEmbeddings: options.generateEmbeddings ?? false,
      maxEmbedSizeBytes: options.maxEmbedSizeBytes ?? 120_000,
      embeddingTextLimit: options.embeddingTextLimit ?? 4000,
      contentPreviewLength: options.contentPreviewLength ?? 800,
    };
    this.excludeMatchers = this.options.excludePatterns.map(p => GraphIngestor.globToRegExp(p));
    this.includeMatchers = this.options.includePatterns.map(p => GraphIngestor.globToRegExp(p));
  }

  // ═══════════════════════════════════════════════════════════════════════
  // Main Ingestion Method
  // ═══════════════════════════════════════════════════════════════════════

  /**
   * Runs one full ingestion of `rootPath`.
   *
   * Non-fatal problems are collected in `errors`. A fatal error (e.g. a failed
   * Neo4j write) ends the run with `success: false` and an empty `repositoryId`.
   * The returned `stats` and `errors` are copies, not live references.
   */
  async ingest(): Promise<IngestResult> {
    const startTime = Date.now();
    this.resetRunState();
    try {
      console.log(`[GraphIngestor] 🚀 Starting ingestion of: ${this.options.rootPath}`);
      await hyperLog.logEvent('GRAPH_INGEST_START', {
        rootPath: this.options.rootPath,
        repository: this.options.repositoryName,
        embeddings: this.options.generateEmbeddings,
      });

      const repoId = this.generateId('Repository', this.options.repositoryName);
      await this.createRepositoryNode(repoId);
      await this.processDirectory(this.options.rootPath, repoId, 0);

      this.stats.duration = Date.now() - startTime;
      // +1 for the Repository node itself.
      this.stats.totalNodes = this.stats.directoriesCreated + this.stats.filesCreated + 1;

      console.log(`[GraphIngestor] ✅ Ingestion complete in ${this.stats.duration}ms`);
      console.log(
        `[GraphIngestor] 📊 Stats: ${this.stats.totalNodes} nodes, ${this.stats.relationshipsCreated} relationships`
      );
      await hyperLog.logEvent('GRAPH_INGEST_COMPLETED', {
        repository: this.options.repositoryName,
        stats: { ...this.stats },
        errors: this.errors.length,
      });

      return {
        success: true,
        repositoryId: repoId,
        stats: { ...this.stats },
        errors: [...this.errors],
      };
    } catch (error: unknown) {
      const message = GraphIngestor.errorMessage(error);
      this.errors.push(`Fatal error: ${message}`);
      this.stats.duration = Date.now() - startTime;
      try {
        await hyperLog.logEvent('GRAPH_INGEST_FATAL', {
          repository: this.options.repositoryName,
          error: message,
        });
      } catch (logError: unknown) {
        // Keep the failure reportable through the return value even if logging is down.
        console.error(
          `[GraphIngestor] Failed to log fatal error: ${GraphIngestor.errorMessage(logError)}`
        );
      }
      return {
        success: false,
        repositoryId: '',
        stats: { ...this.stats },
        errors: [...this.errors],
      };
    }
  }

  /** Clears per-run counters/errors so repeated `ingest()` calls do not accumulate. */
  private resetRunState(): void {
    this.stats = GraphIngestor.emptyStats();
    this.errors = [];
    // Retry provider initialisation on this run if the previous attempt failed.
    if (this.embeddingService === null) {
      this.embeddingInit = null;
    }
  }

  /** Fresh, zeroed stats object. */
  private static emptyStats(): IngestResult['stats'] {
    return {
      directoriesCreated: 0,
      filesCreated: 0,
      relationshipsCreated: 0,
      totalNodes: 0,
      duration: 0,
    };
  }

  // ═══════════════════════════════════════════════════════════════════════
  // Node Creation
  // ═══════════════════════════════════════════════════════════════════════

  /** MERGEs the Repository node and refreshes its name/path/ingestedAt. */
  private async createRepositoryNode(repoId: string): Promise<void> {
    await neo4jAdapter.writeQuery(
      `
      MERGE (r:Repository {id: $id})
      SET r.name = $name,
          r.path = $path,
          r.ingestedAt = datetime(),
          r.source = 'graph-ingestor'
      RETURN r
      `,
      {
        id: repoId,
        name: this.options.repositoryName,
        path: this.options.rootPath,
      }
    );
    console.log(`[GraphIngestor] 📦 Created Repository: ${this.options.repositoryName}`);
  }

  /**
   * MERGEs a Directory node and links it to its parent with CONTAINS.
   *
   * @param dirPath  Absolute/walk path of the directory.
   * @param parentId Id of the parent Repository or Directory node.
   * @param depth    Depth stored on the node.
   * @returns The directory's node id.
   */
  private async createDirectoryNode(
    dirPath: string,
    parentId: string,
    depth: number
  ): Promise<string> {
    const dirName = path.basename(dirPath);
    const relativePath = path.relative(this.options.rootPath, dirPath);
    const dirId = this.generateId('Directory', relativePath || dirName);

    await neo4jAdapter.writeQuery(
      `
      MERGE (d:Directory {id: $id})
      SET d.name = $name,
          d.path = $path,
          d.relativePath = $relativePath,
          d.depth = $depth,
          d.ingestedAt = datetime()
      WITH d
      MATCH (p {id: $parentId})
      MERGE (p)-[:CONTAINS]->(d)
      RETURN d
      `,
      {
        id: dirId,
        name: dirName,
        path: dirPath,
        relativePath: relativePath || '.',
        depth: depth,
        parentId: parentId,
      }
    );

    this.stats.directoriesCreated++;
    this.stats.relationshipsCreated++;
    return dirId;
  }

  /**
   * MERGEs a File node (labelled with its sanitised language) and links it
   * to its parent. Optional fields that are null in `fileInfo` keep any value
   * already stored on the node (`coalesce` / CASE below).
   */
  private async createFileNode(fileInfo: FileInfo, parentId: string): Promise<void> {
    const fileId = this.generateId('File', fileInfo.relativePath);
    // Backtick-quoted so the label is valid Cypher even if it started with a digit.
    const languageLabel = this.sanitizeLabel(fileInfo.language);

    await neo4jAdapter.writeQuery(
      `
      MERGE (f:File:\`${languageLabel}\` {id: $id})
      SET f.name = $name,
          f.path = $path,
          f.relativePath = $relativePath,
          f.extension = $extension,
          f.language = $language,
          f.size = $size,
          f.lines = $lines,
          f.contentPreview = coalesce($contentPreview, f.contentPreview),
          f.contentHash = coalesce($contentHash, f.contentHash),
          f.embedding = CASE WHEN $embedding IS NOT NULL THEN $embedding ELSE f.embedding END,
          f.embeddingProvider = coalesce($embeddingProvider, f.embeddingProvider),
          f.embeddingDimensions = CASE
            WHEN $embedding IS NOT NULL THEN size($embedding)
            ELSE coalesce(f.embeddingDimensions, CASE WHEN f.embedding IS NOT NULL THEN size(f.embedding) END)
          END,
          f.hasEmbedding = CASE
            WHEN $embedding IS NOT NULL THEN true
            WHEN f.embedding IS NOT NULL THEN true
            ELSE false
          END,
          f.ingestedAt = datetime()
      WITH f
      MATCH (p {id: $parentId})
      MERGE (p)-[:CONTAINS]->(f)
      RETURN f
      `,
      {
        id: fileId,
        name: fileInfo.name,
        path: fileInfo.path,
        relativePath: fileInfo.relativePath,
        extension: fileInfo.extension,
        language: fileInfo.language,
        size: fileInfo.size,
        lines: fileInfo.lines || 0,
        contentPreview: fileInfo.contentPreview || null,
        contentHash: fileInfo.contentHash || null,
        embedding: fileInfo.embedding || null,
        embeddingProvider: fileInfo.embeddingProvider || null,
        parentId: parentId,
      }
    );

    this.stats.filesCreated++;
    this.stats.relationshipsCreated++;
  }

  // ═══════════════════════════════════════════════════════════════════════
  // Directory Processing
  // ═══════════════════════════════════════════════════════════════════════

  /**
   * Breadth-first walk starting at `dirPath`, processed one depth level at a
   * time (same visiting order as a FIFO queue, without O(n) `shift()` calls).
   *
   * A Directory node is written for every non-excluded subdirectory that is
   * encountered; a directory's own entries are read only while its depth is
   * at most `maxDepth`. Unreadable directories are recorded in `errors` and
   * skipped. Neo4j write failures propagate to `ingest()`.
   */
  private async processDirectory(dirPath: string, parentId: string, depth: number): Promise<void> {
    type PendingDir = { dirPath: string; parentId: string; depth: number };
    let level: PendingDir[] = [{ dirPath, parentId, depth }];
    let processed = 0;

    while (level.length > 0) {
      const nextLevel: PendingDir[] = [];

      for (const current of level) {
        if (current.depth > this.options.maxDepth) {
          continue;
        }

        let entries;
        try {
          entries = await fs.readdir(current.dirPath, { withFileTypes: true });
        } catch (error: unknown) {
          this.errors.push(
            `Error processing ${current.dirPath}: ${GraphIngestor.errorMessage(error)}`
          );
          continue;
        }

        for (const entry of entries) {
          if (this.shouldExclude(entry.name)) {
            continue;
          }

          processed++;
          if (processed % this.yieldInterval === 0) {
            await yieldToLoop(); // Yield to avoid starving the event loop on large trees
          }

          const fullPath = path.join(current.dirPath, entry.name);
          if (entry.isDirectory()) {
            const dirId = await this.createDirectoryNode(fullPath, current.parentId, current.depth);
            nextLevel.push({ dirPath: fullPath, parentId: dirId, depth: current.depth + 1 });
          } else if (entry.isFile() && this.isIncluded(entry.name)) {
            const fileInfo = await this.getFileInfo(fullPath);
            if (fileInfo) {
              await this.createFileNode(fileInfo, current.parentId);
            }
          }
        }
      }

      level = nextLevel;
    }
  }

  // ═══════════════════════════════════════════════════════════════════════
  // Helpers
  // ═══════════════════════════════════════════════════════════════════════

  /**
   * Collects metadata for one file. Optionally reads its content (for preview,
   * hash and embedding) when it is a text file within `maxEmbedSizeBytes`.
   *
   * @returns The file's info, or null if it could not be inspected (the
   *          reason is recorded in `errors`).
   */
  private async getFileInfo(filePath: string): Promise<FileInfo | null> {
    try {
      const stats = await fs.stat(filePath);
      const fileName = path.basename(filePath);
      const ext = path.extname(filePath).toLowerCase();
      const relativePath = path.relative(this.options.rootPath, filePath);
      const language = this.detectLanguage(ext, fileName);
      const isText = this.isTextFile(ext, fileName);
      // Environment files usually hold secrets: count their lines, but never
      // persist their contents or send them to an embedding provider.
      const contentAllowed = isText && language !== 'Environment';
      const wantsContent =
        contentAllowed && (this.options.parseContent || this.options.generateEmbeddings);
      const withinSizeLimit = stats.size <= this.options.maxEmbedSizeBytes;

      let content: string | null = null;
      let lines: number | undefined;
      if (isText) {
        if (wantsContent && withinSizeLimit) {
          content = await fs.readFile(filePath, 'utf-8');
          lines = GraphIngestor.countLinesInText(content);
        } else {
          lines = await this.countLines(filePath, stats.size);
        }
      }

      let embedding: number[] | undefined;
      let embeddingProvider: string | undefined;
      if (this.options.generateEmbeddings && content !== null && content.trim()) {
        const embedText = content.slice(0, this.options.embeddingTextLimit);
        const embedded = await this.generateEmbedding(embedText, relativePath);
        if (embedded) {
          embedding = embedded;
          embeddingProvider = this.embeddingProvider ?? undefined;
        }
      } else if (this.options.generateEmbeddings && contentAllowed && !withinSizeLimit) {
        await hyperLog.logEvent('GRAPH_INGEST_EMBEDDING_SKIPPED', {
          path: relativePath,
          reason: 'file_too_large',
          size: stats.size,
          limit: this.options.maxEmbedSizeBytes,
        });
      }

      return {
        name: fileName,
        path: filePath,
        relativePath: relativePath,
        extension: ext,
        language,
        size: stats.size,
        lines: lines,
        contentPreview:
          content !== null ? content.slice(0, this.options.contentPreviewLength) : undefined,
        contentHash: content !== null ? this.hashContent(content) : undefined,
        embedding,
        embeddingProvider,
      };
    } catch (error: unknown) {
      this.errors.push(`Error reading ${filePath}: ${GraphIngestor.errorMessage(error)}`);
      return null;
    }
  }

  /** Language name for a file, checking special file names before the extension map. */
  private detectLanguage(ext: string, filename: string): string {
    // Special cases for files without extensions
    const lowerName = filename.toLowerCase();
    if (lowerName === 'dockerfile') return 'Dockerfile';
    if (lowerName === 'makefile') return 'Makefile';
    if (lowerName.startsWith('.env')) return 'Environment';
    if (lowerName === '.gitignore') return 'Git';
    return LANGUAGE_MAP[ext] || 'Unknown';
  }

  /**
   * Whether a file should be treated as text.
   *
   * @param ext      Lower-cased extension from `path.extname`.
   * @param fileName Base name; used when `ext` is '' (dotfiles such as '.gitignore').
   */
  private isTextFile(ext: string, fileName = ''): boolean {
    if (GraphIngestor.TEXT_EXTENSIONS.has(ext)) {
      return true;
    }
    return ext === '' && GraphIngestor.TEXT_EXTENSIONS.has(fileName.toLowerCase());
  }

  /** True when an entry name matches any exclude pattern. */
  private shouldExclude(name: string): boolean {
    return this.excludeMatchers.some(matcher => matcher.test(name));
  }

  /** True when a file name matches an include pattern (an empty list includes everything). */
  private isIncluded(name: string): boolean {
    return (
      this.includeMatchers.length === 0 || this.includeMatchers.some(matcher => matcher.test(name))
    );
  }

  /**
   * Compiles a name pattern into an anchored RegExp. `*` matches any run of
   * characters; every other character (including `.` and `?`) is literal, so
   * a pattern without `*` is an exact name match.
   */
  private static globToRegExp(pattern: string): RegExp {
    const source = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
    return new RegExp(`^${source}$`, 's');
  }

  /**
   * Deterministic node id: MD5 hex of `${type}:${identifier}`.
   *
   * NOTE(review): Directory/File identifiers are the path relative to the
   * repository root only, so two repositories containing the same relative
   * paths produce identical ids and MERGE onto the same nodes. Relative paths
   * also use the platform separator, so ids differ between Windows and POSIX.
   * Scoping ids by repository would fix this but re-keys existing nodes —
   * confirm before changing.
   */
  private generateId(type: string, identifier: string): string {
    const content = `${type}:${identifier}`;
    return crypto.createHash('md5').update(content).digest('hex');
  }

  /** Converts a language name into a label: non-alphanumerics → '_', trimmed underscores. */
  private sanitizeLabel(language: string): string {
    return language.replace(/[^a-zA-Z0-9]/g, '_').replace(/^_+|_+$/g, '') || 'Unknown';
  }

  /**
   * Counts lines in already-loaded text: one per '\n', plus one for a final
   * line without a trailing newline. Empty text has 0 lines. Uses the same
   * convention as `countLines` so both code paths agree.
   */
  private static countLinesInText(text: string): number {
    if (text.length === 0) {
      return 0;
    }
    const newlines = text.split('\n').length - 1;
    return text.endsWith('\n') ? newlines : newlines + 1;
  }

  /**
   * Streams a file and counts its lines with the same convention as
   * `countLinesInText`. Returns undefined for files over 2 MB (skipped to
   * avoid heavy scans) or when the stream errors.
   */
  private async countLines(filePath: string, size: number): Promise<number | undefined> {
    if (size > 2_000_000) {
      return undefined; // Avoid heavy scans on very large files
    }
    return new Promise(resolve => {
      let newlines = 0;
      let lastByte: number | undefined;
      const stream = createReadStream(filePath);
      stream.on('data', (chunk: Buffer) => {
        for (let i = 0; i < chunk.length; i++) {
          if (chunk[i] === 0x0a) newlines++; // '\n'
        }
        if (chunk.length > 0) {
          lastByte = chunk[chunk.length - 1];
        }
      });
      stream.on('end', () => {
        if (lastByte === undefined) {
          resolve(0); // empty file
        } else {
          resolve(lastByte === 0x0a ? newlines : newlines + 1);
        }
      });
      stream.on('error', () => resolve(undefined));
    });
  }

  /** MD5 hex digest of the file content (change detection, not security). */
  private hashContent(content: string): string {
    return crypto.createHash('md5').update(content).digest('hex');
  }

  /** Message text for any thrown value. */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Resolves once the embedding service is initialised. The attempt is
   * memoised: concurrent/later callers share it, and a failure rejects every
   * caller with the same error without re-reporting it.
   */
  private async ensureEmbeddingService(): Promise<void> {
    if (this.embeddingInit === null) {
      this.embeddingInit = this.initializeEmbeddingService();
    }
    return this.embeddingInit;
  }

  /**
   * Initialises the provider. The field is assigned only after `initialize()`
   * succeeds, so a half-initialised service is never reused.
   */
  private async initializeEmbeddingService(): Promise<void> {
    try {
      const service = getEmbeddingService();
      await service.initialize();
      this.embeddingService = service;
      this.embeddingProvider = service.getProviderName();
    } catch (error: unknown) {
      const message = GraphIngestor.errorMessage(error);
      this.errors.push(`Embedding provider unavailable: ${message}`);
      await hyperLog.logEvent('GRAPH_INGEST_EMBEDDING_PROVIDER_FAILED', {
        error: message,
      });
      throw error;
    }
  }

  /**
   * Embeds `content`; on any failure records the error and returns null so
   * the file is still ingested without an embedding.
   */
  private async generateEmbedding(content: string, relativePath: string): Promise<number[] | null> {
    try {
      await this.ensureEmbeddingService();
      const service = this.embeddingService;
      if (service === null) {
        throw new Error('embedding service not initialised');
      }
      return await service.generateEmbedding(content);
    } catch (error: unknown) {
      const message = GraphIngestor.errorMessage(error);
      this.errors.push(`Embedding failed for ${relativePath}: ${message}`);
      await hyperLog.logEvent('GRAPH_INGEST_EMBEDDING_FAILED', {
        path: relativePath,
        error: message,
      });
      return null;
    }
  }
}
// ═══════════════════════════════════════════════════════════════════════════
// Export Factory Function
// ═══════════════════════════════════════════════════════════════════════════
/**
 * Convenience entry point: builds a one-off `GraphIngestor` for `options`
 * and runs a single ingestion.
 *
 * @param options Ingestion options; only `rootPath` is required.
 * @returns The result of `GraphIngestor.ingest()`.
 */
export async function ingestRepository(options: IngestOptions): Promise<IngestResult> {
  return new GraphIngestor(options).ingest();
}