""" ECH0-PRIME Compressed Knowledge Base Stores massive amounts of knowledge using compressed data format. Copyright (c) 2025 Joshua Hendricks Cole (DBA: Corporation of Light). All Rights Reserved. PATENT PENDING. """ import os import json import asyncio import hashlib import time from typing import Dict, List, Any, Optional, Set, Tuple from dataclasses import dataclass, field from datetime import datetime, timedelta import numpy as np from collections import defaultdict import aiofiles from learning.data_compressor import DataCompressor, CompressedChunk, CompressionConfig try: from reasoning.llm_bridge import OllamaBridge LLM_AVAILABLE = True except ImportError: LLM_AVAILABLE = False print("Warning: LLM bridge not available, using simplified compression") from memory.manager import SemanticMemory, EpisodicMemory from ech0_governance.persistent_memory import PersistentMemory @dataclass class KnowledgeNode: """A node in the compressed knowledge graph""" id: str compressed_content: str domain: str modality: str quality_score: float compression_ratio: float timestamp: datetime connections: Set[str] = field(default_factory=set) # Connected node IDs metadata: Dict[str, Any] = field(default_factory=dict) access_count: int = 0 last_accessed: Optional[datetime] = None def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "compressed_content": self.compressed_content, "domain": self.domain, "modality": self.modality, "quality_score": self.quality_score, "compression_ratio": self.compression_ratio, "timestamp": self.timestamp.isoformat(), "connections": list(self.connections), "metadata": self.metadata, "access_count": self.access_count, "last_accessed": self.last_accessed.isoformat() if self.last_accessed else None } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'KnowledgeNode': return cls( id=data["id"], compressed_content=data["compressed_content"], domain=data["domain"], modality=data["modality"], quality_score=data["quality_score"], compression_ratio=data["compression_ratio"], timestamp=datetime.fromisoformat(data["timestamp"]), connections=set(data.get("connections", [])), metadata=data.get("metadata", {}), access_count=data.get("access_count", 0), last_accessed=datetime.fromisoformat(data["last_accessed"]) if data.get("last_accessed") else None ) @dataclass class KnowledgeDomain: """Represents a domain of knowledge with statistics""" name: str node_count: int = 0 total_compressed_tokens: int = 0 avg_quality_score: float = 0.0 avg_compression_ratio: float = 0.0 subdomains: Dict[str, 'KnowledgeDomain'] = field(default_factory=dict) last_updated: Optional[datetime] = None class CompressedKnowledgeBase: """ Massive-scale knowledge storage using compressed data format. Can store 10^15+ tokens worth of knowledge efficiently. 
""" def __init__(self, storage_path: str = "./compressed_kb", max_nodes_per_file: int = 10000): self.storage_path = storage_path self.max_nodes_per_file = max_nodes_per_file self.compressor = DataCompressor() # In-memory knowledge graph self.nodes: Dict[str, KnowledgeNode] = {} self.domains: Dict[str, KnowledgeDomain] = defaultdict(KnowledgeDomain) # Indexing for fast retrieval self.content_index: Dict[str, Set[str]] = defaultdict(set) # term -> node_ids self.domain_index: Dict[str, Set[str]] = defaultdict(set) # domain -> node_ids self.quality_index: List[Tuple[str, float]] = [] # (node_id, quality) sorted # Caching self.access_cache: Dict[str, KnowledgeNode] = {} self.cache_max_size = 1000 # Statistics self.stats = { "total_nodes": 0, "total_compressed_tokens": 0, "total_original_tokens": 0, "avg_compression_ratio": 0.0, "avg_quality_score": 0.0, "domains_count": 0, "storage_files": 0 } # Ensure storage directory exists os.makedirs(storage_path, exist_ok=True) os.makedirs(os.path.join(storage_path, "domains"), exist_ok=True) os.makedirs(os.path.join(storage_path, "indices"), exist_ok=True) async def add_compressed_chunk(self, chunk: CompressedChunk) -> str: """ Add a compressed chunk to the knowledge base. Returns the node ID. """ # Generate unique ID content_hash = hashlib.md5(chunk.compressed_content.encode()).hexdigest()[:16] node_id = f"{chunk.domain}_{chunk.modality}_{content_hash}" # Check for duplicates (simple hash-based) if node_id in self.nodes: return node_id # Already exists # Create knowledge node node = KnowledgeNode( id=node_id, compressed_content=chunk.compressed_content, domain=chunk.domain, modality=chunk.modality, quality_score=chunk.quality_score, compression_ratio=chunk.compression_ratio, timestamp=chunk.timestamp, metadata=chunk.metadata ) # Add to in-memory graph self.nodes[node_id] = node # Update indices self._update_indices(node) # Update domain statistics self._update_domain_stats(chunk.domain, chunk) # Update global statistics self._update_global_stats(chunk) # Auto-save if needed if len(self.nodes) % 1000 == 0: await self.save_async() return node_id def _update_indices(self, node: KnowledgeNode): """Update search indices for the node""" # Domain index self.domain_index[node.domain].add(node.id) # Content index (simple keyword extraction) keywords = self._extract_keywords(node.compressed_content) for keyword in keywords: self.content_index[keyword].add(node.id) # Quality index (maintain sorted list) self.quality_index.append((node.id, node.quality_score)) self.quality_index.sort(key=lambda x: x[1], reverse=True) # Keep only top quality entries in index if len(self.quality_index) > 10000: self.quality_index = self.quality_index[:10000] def _extract_keywords(self, content: str) -> List[str]: """Extract keywords from compressed content""" # Simple extraction: nouns and important terms words = content.lower().split() keywords = [] # Domain-specific important terms important_terms = { 'academic': ['theory', 'method', 'results', 'evidence', 'hypothesis', 'conclusion'], 'technical': ['algorithm', 'function', 'class', 'api', 'system', 'performance'], 'scientific': ['experiment', 'data', 'analysis', 'model', 'prediction', 'validation'], 'general': ['important', 'key', 'main', 'core', 'significant', 'critical'] } for word in words: word = word.strip('.,!?;:') if len(word) > 3: # Skip short words keywords.append(word) # Add domain-specific terms if they appear for term in important_terms.get('general', []): if term in content.lower(): keywords.append(term) return 

    def _update_domain_stats(self, domain: str, chunk: CompressedChunk):
        """Update statistics for a knowledge domain"""
        if domain not in self.domains:
            self.domains[domain] = KnowledgeDomain(name=domain)

        domain_stats = self.domains[domain]
        domain_stats.node_count += 1
        domain_stats.total_compressed_tokens += chunk.compressed_tokens
        domain_stats.last_updated = chunk.timestamp

        # Rolling average for quality and compression
        count = domain_stats.node_count
        domain_stats.avg_quality_score = (
            (domain_stats.avg_quality_score * (count - 1) + chunk.quality_score) / count
        )
        domain_stats.avg_compression_ratio = (
            (domain_stats.avg_compression_ratio * (count - 1) + chunk.compression_ratio) / count
        )

    def _update_global_stats(self, chunk: CompressedChunk):
        """Update global knowledge base statistics"""
        self.stats["total_nodes"] += 1
        self.stats["total_compressed_tokens"] += chunk.compressed_tokens
        self.stats["total_original_tokens"] += chunk.original_tokens

        # Update averages
        total = self.stats["total_nodes"]
        self.stats["avg_compression_ratio"] = (
            (self.stats["avg_compression_ratio"] * (total - 1) + chunk.compression_ratio) / total
        )
        self.stats["avg_quality_score"] = (
            (self.stats["avg_quality_score"] * (total - 1) + chunk.quality_score) / total
        )
        self.stats["domains_count"] = len(self.domains)

    async def retrieve_knowledge(self, query: str, domain: str = None,
                                 min_quality: float = 0.0, limit: int = 10) -> List[KnowledgeNode]:
        """
        Retrieve relevant knowledge nodes based on query.
        """
        candidates = set()

        # Find candidate nodes
        query_terms = query.lower().split()
        for term in query_terms:
            if term in self.content_index:
                candidates.update(self.content_index[term])

        # Filter by domain if specified
        if domain:
            domain_nodes = self.domain_index.get(domain, set())
            candidates = candidates.intersection(domain_nodes)

        # Filter by quality
        filtered_candidates = []
        for node_id in candidates:
            if node_id in self.nodes:
                node = self.nodes[node_id]
                if node.quality_score >= min_quality:
                    filtered_candidates.append((node_id, node.quality_score))

        # Sort by quality and return top results
        filtered_candidates.sort(key=lambda x: x[1], reverse=True)

        results = []
        for node_id, _ in filtered_candidates[:limit]:
            node = await self.get_node(node_id)
            if node:
                results.append(node)

        return results

    async def get_node(self, node_id: str) -> Optional[KnowledgeNode]:
        """Get a knowledge node by ID with caching"""
        # Check cache first
        if node_id in self.access_cache:
            node = self.access_cache[node_id]
            node.access_count += 1
            node.last_accessed = datetime.now()
            return node

        # Check in-memory
        if node_id in self.nodes:
            node = self.nodes[node_id]
            node.access_count += 1
            node.last_accessed = datetime.now()

            # Add to cache
            if len(self.access_cache) < self.cache_max_size:
                self.access_cache[node_id] = node

            return node

        # Try loading from disk
        node = await self.load_node_from_disk(node_id)
        if node:
            node.access_count += 1
            node.last_accessed = datetime.now()
            return node

        return None

    async def load_node_from_disk(self, node_id: str) -> Optional[KnowledgeNode]:
        """Load a node from disk storage"""
        # Node IDs are "<domain>_<modality>_<hash>"; rsplit keeps domains that
        # themselves contain underscores (e.g. "<domain>_consolidated") intact
        domain = node_id.rsplit('_', 2)[0]
        file_path = os.path.join(self.storage_path, "domains", f"{domain}.jsonl")

        if not os.path.exists(file_path):
            return None

        try:
            async with aiofiles.open(file_path, 'r') as f:
                async for line in f:
                    data = json.loads(line.strip())
                    if data["id"] == node_id:
                        node = KnowledgeNode.from_dict(data)
                        # Add to in-memory cache
                        self.nodes[node_id] = node
                        return node
        except Exception as e:
            print(f"Error loading node {node_id}: {e}")
        return None

    async def connect_nodes(self, node_id1: str, node_id2: str):
        """Create a connection between two knowledge nodes"""
        node1 = await self.get_node(node_id1)
        node2 = await self.get_node(node_id2)

        if node1 and node2:
            node1.connections.add(node_id2)
            node2.connections.add(node_id1)

    async def find_related_nodes(self, node_id: str, depth: int = 2) -> List[KnowledgeNode]:
        """Find related nodes through the knowledge graph (breadth-first, bounded by depth)"""
        visited = set()
        to_visit = [(node_id, 0)]
        related = []

        while to_visit:
            current_id, current_depth = to_visit.pop(0)

            if current_id in visited or current_depth > depth:
                continue

            visited.add(current_id)
            node = await self.get_node(current_id)
            if not node:
                continue

            if current_depth > 0:  # Don't include the starting node
                related.append(node)

            if current_depth < depth:
                for connected_id in node.connections:
                    if connected_id not in visited:
                        to_visit.append((connected_id, current_depth + 1))

        return related

    async def consolidate_knowledge(self, domain: str) -> str:
        """
        Consolidate knowledge in a domain using the LLM to create higher-level abstractions.
        """
        domain_nodes = list(self.domain_index.get(domain, set()))
        if not domain_nodes:
            return f"No knowledge found in domain {domain}"

        # Get high-quality nodes
        high_quality_nodes = []
        for node_id in domain_nodes[:50]:  # Limit for processing
            node = await self.get_node(node_id)
            if node and node.quality_score > 0.8:
                high_quality_nodes.append(node)

        if not high_quality_nodes:
            return f"No high-quality knowledge found in domain {domain}"

        # Combine compressed content
        combined_content = "\n\n".join([node.compressed_content for node in high_quality_nodes])

        # Use LLM to create consolidated knowledge
        if LLM_AVAILABLE:
            # Truncate the combined content to keep the prompt within LLM limits
            consolidation_prompt = f"""
Consolidate this collection of compressed knowledge from the {domain} domain.
Create a coherent, comprehensive summary that captures the key insights and patterns.
Focus on the most important concepts, relationships, and implications.

Knowledge to consolidate:
{combined_content[:8000]}

Consolidated Knowledge Summary:
"""
            llm_bridge = OllamaBridge()
            consolidated = llm_bridge.query(consolidation_prompt, temperature=0.2)
        else:
            # Fallback: simple concatenation with summary
            consolidated = (
                f"Consolidated knowledge from {len(high_quality_nodes)} sources in {domain} domain:\n\n"
                f"{combined_content[:1000]}..."
            )

        # Store the consolidated knowledge
        original_token_count = sum(len(node.compressed_content.split()) for node in high_quality_nodes)
        consolidated_chunk = CompressedChunk(
            original_tokens=original_token_count,
            compressed_tokens=len(consolidated.split()),
            compression_ratio=len(consolidated.split()) / original_token_count,
            quality_score=0.9,  # Assume high quality for consolidated knowledge
            timestamp=datetime.now(),
            modality="text",
            domain=f"{domain}_consolidated",
            compressed_content=consolidated,
            metadata={"consolidated_from": len(high_quality_nodes), "type": "consolidated"}
        )

        await self.add_compressed_chunk(consolidated_chunk)
        return consolidated

    async def save_async(self):
        """Asynchronously save knowledge base to disk"""
        # Save nodes by domain
        for domain, domain_nodes in self.domain_index.items():
            file_path = os.path.join(self.storage_path, "domains", f"{domain}.jsonl")

            # Collect nodes for this domain
            domain_data = []
            for node_id in domain_nodes:
                if node_id in self.nodes:
                    domain_data.append(self.nodes[node_id].to_dict())

            # Save to file
            async with aiofiles.open(file_path, 'w') as f:
                for node_data in domain_data:
                    await f.write(json.dumps(node_data) + '\n')

        # Save indices
        indices_path = os.path.join(self.storage_path, "indices", "indices.json")
        indices_data = {
            "content_index": {k: list(v) for k, v in self.content_index.items()},
            "domain_index": {k: list(v) for k, v in self.domain_index.items()},
            "quality_index": self.quality_index,
            "stats": self.stats,
            "domains": {k: {
                "name": v.name,
                "node_count": v.node_count,
                "total_compressed_tokens": v.total_compressed_tokens,
                "avg_quality_score": v.avg_quality_score,
                "avg_compression_ratio": v.avg_compression_ratio,
                "last_updated": v.last_updated.isoformat() if v.last_updated else None
            } for k, v in self.domains.items()}
        }

        async with aiofiles.open(indices_path, 'w') as f:
            await f.write(json.dumps(indices_data, indent=2))

    async def load_async(self):
        """Asynchronously load knowledge base from disk"""
        indices_path = os.path.join(self.storage_path, "indices", "indices.json")

        if not os.path.exists(indices_path):
            return

        try:
            async with aiofiles.open(indices_path, 'r') as f:
                indices_data = json.loads(await f.read())

            # Load indices
            self.content_index = defaultdict(set, {k: set(v) for k, v in indices_data["content_index"].items()})
            self.domain_index = defaultdict(set, {k: set(v) for k, v in indices_data["domain_index"].items()})
            self.quality_index = indices_data["quality_index"]
            self.stats = indices_data["stats"]

            # Load domains
            for k, v in indices_data["domains"].items():
                domain = KnowledgeDomain(
                    name=v["name"],
                    node_count=v["node_count"],
                    total_compressed_tokens=v["total_compressed_tokens"],
                    avg_quality_score=v["avg_quality_score"],
                    avg_compression_ratio=v["avg_compression_ratio"],
                    last_updated=datetime.fromisoformat(v["last_updated"]) if v.get("last_updated") else None
                )
                self.domains[k] = domain

        except Exception as e:
            print(f"Error loading knowledge base indices: {e}")

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive statistics about the knowledge base"""
        return {
            **self.stats,
            "cache_size": len(self.access_cache),
            "memory_nodes": len(self.nodes),
            "domains": {k: {
                "node_count": v.node_count,
                "avg_quality": round(v.avg_quality_score, 3),
                "avg_compression": round(v.avg_compression_ratio, 3),
                "total_tokens": v.total_compressed_tokens
            } for k, v in self.domains.items()},
            "estimated_original_tokens": self.stats["total_original_tokens"],
            "storage_efficiency": round(
                self.stats["total_original_tokens"] / max(1, self.stats["total_compressed_tokens"]), 2
            )
        }
self.stats["total_compressed_tokens"]), 2) } async def optimize_storage(self): """Optimize storage by removing low-quality nodes and consolidating""" # Remove nodes with quality score < 0.3 to_remove = [] for node_id, node in self.nodes.items(): if node.quality_score < 0.3: to_remove.append(node_id) for node_id in to_remove: del self.nodes[node_id] # Rebuild indices self.content_index.clear() self.domain_index.clear() self.quality_index.clear() for node in self.nodes.values(): self._update_indices(node) # Consolidate domains with too many nodes for domain in self.domains.keys(): if self.domains[domain].node_count > 50000: await self.consolidate_knowledge(domain) await self.save_async() class MassiveDataIngestor: """ Ingests massive amounts of data and compresses it for storage in the knowledge base. """ def __init__(self, knowledge_base: CompressedKnowledgeBase): self.kb = knowledge_base self.ingestion_stats = { "total_processed": 0, "total_stored": 0, "errors": 0, "start_time": None, "end_time": None } async def ingest_data_stream(self, data_stream, domain: str = "web", batch_size: int = 100): """ Ingest a stream of data and compress it for storage. """ self.ingestion_stats["start_time"] = datetime.now() batch = [] async for data_item in data_stream: batch.append(data_item) if len(batch) >= batch_size: await self._process_batch(batch, domain) batch.clear() # Process remaining items if batch: await self._process_batch(batch, domain) self.ingestion_stats["end_time"] = datetime.now() async def _process_batch(self, batch: List[Dict[str, Any]], domain: str): """Process a batch of data items""" for item in batch: try: content = item.get("content", "") if not content.strip(): continue # Compress the content compressed_chunk = await self.kb.compressor.compress_chunk( content=content, domain=domain, modality=item.get("modality", "text"), metadata=item.get("metadata", {}) ) # Store in knowledge base node_id = await self.kb.add_compressed_chunk(compressed_chunk) self.ingestion_stats["total_processed"] += 1 self.ingestion_stats["total_stored"] += 1 except Exception as e: print(f"Error processing item: {e}") self.ingestion_stats["errors"] += 1 def get_ingestion_stats(self) -> Dict[str, Any]: """Get ingestion statistics""" stats = self.ingestion_stats.copy() if stats["start_time"] and stats["end_time"]: duration = stats["end_time"] - stats["start_time"] stats["duration_seconds"] = duration.total_seconds() stats["processing_rate"] = stats["total_processed"] / max(1, duration.total_seconds()) return stats # Integration with ECH0's reasoning system class CompressedKnowledgeBridge: """ Bridge between compressed knowledge base and ECH0's reasoning system. """ def __init__(self, knowledge_base: CompressedKnowledgeBase): self.kb = knowledge_base self.llm_bridge = OllamaBridge() async def retrieve_for_reasoning(self, query: str, context_limit: int = 5000) -> str: """ Retrieve relevant compressed knowledge for reasoning tasks. """ # Retrieve relevant nodes nodes = await self.kb.retrieve_knowledge(query, limit=5, min_quality=0.7) if not nodes: return "No relevant compressed knowledge found." 

        # Combine and format for reasoning
        knowledge_texts = []
        total_length = 0

        for node in nodes:
            if total_length + len(node.compressed_content) > context_limit:
                break

            knowledge_texts.append(f"[{node.domain.upper()}] {node.compressed_content}")
            total_length += len(node.compressed_content)

        return "\n\n".join(knowledge_texts)

    async def expand_compressed_knowledge(self, compressed_content: str, query: str) -> str:
        """
        Use LLM to expand compressed knowledge in response to a specific query.
        """
        if not self.llm_bridge:
            # No LLM available: return the compressed knowledge unexpanded
            return compressed_content

        expansion_prompt = f"""
Given this compressed knowledge:
{compressed_content}

And this specific query:
{query}

Expand and elaborate on the relevant parts of the compressed knowledge to provide
a detailed, contextual response. Focus on accuracy and relevance.
"""
        return self.llm_bridge.query(expansion_prompt, temperature=0.3)


if __name__ == "__main__":
    async def demo():
        # Initialize compressed knowledge base
        kb = CompressedKnowledgeBase("./demo_kb")

        # Load existing knowledge
        await kb.load_async()

        # Add some sample compressed knowledge
        sample_chunks = [
            CompressedChunk(
                original_tokens=1000,
                compressed_tokens=100,
                compression_ratio=0.1,
                quality_score=0.9,
                timestamp=datetime.now(),
                modality="text",
                domain="academic",
                compressed_content="Deep learning transformers revolutionized NLP through self-attention mechanisms."
            ),
            CompressedChunk(
                original_tokens=800,
                compressed_tokens=80,
                compression_ratio=0.1,
                quality_score=0.85,
                timestamp=datetime.now(),
                modality="text",
                domain="technical",
                compressed_content="Quantum computing uses superposition and entanglement for exponential speedup on specific problems."
            )
        ]

        for chunk in sample_chunks:
            await kb.add_compressed_chunk(chunk)

        # Retrieve knowledge
        results = await kb.retrieve_knowledge("quantum computing", limit=3)
        for node in results:
            print(f"Found: {node.compressed_content}")

        # Get statistics
        stats = kb.get_statistics()
        print(f"Knowledge base stats: {stats['total_nodes']} nodes, {stats['storage_efficiency']}x compression")

        # Save knowledge base
        await kb.save_async()

    asyncio.run(demo())
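
    # Illustrative sketch (not part of the original demo): one way MassiveDataIngestor
    # could be driven with an async generator. It assumes DataCompressor.compress_chunk
    # is an async method taking content/domain/modality/metadata, exactly as called in
    # _process_batch above; adjust to the actual compressor API if it differs.
    async def ingestion_demo():
        kb = CompressedKnowledgeBase("./demo_kb")
        await kb.load_async()
        ingestor = MassiveDataIngestor(kb)

        async def sample_stream():
            # Each item mirrors the dict shape consumed by _process_batch
            docs = [
                {"content": "Sample web page text about distributed systems.", "modality": "text"},
                {"content": "Sample article text about graph databases.", "modality": "text"},
            ]
            for doc in docs:
                yield doc

        await ingestor.ingest_data_stream(sample_stream(), domain="web", batch_size=10)
        print(f"Ingestion stats: {ingestor.get_ingestion_stats()}")

    # Uncomment to run the ingestion sketch after the main demo:
    # asyncio.run(ingestion_demo())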