# ech0-prime-agi / learning / data_compressor.py
# Uploaded by workofarttattoo via huggingface_hub (revision f3dce3d, verified)
"""
ECH0-PRIME Data Compression System
Uses prompt engineering to compress streaming data instead of vectorization.
Copyright (c) 2025 Joshua Hendricks Cole (DBA: Corporation of Light). All Rights Reserved. PATENT PENDING.
"""
import os
import json
import asyncio
import time
import numpy as np
from typing import Dict, List, Any, Optional, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime
import aiofiles
import aiohttp
from reasoning.llm_bridge import OllamaBridge
try:
from memory.manager import EpisodicMemory, SemanticMemory # type: ignore
memory_available = True
except ImportError:
memory_available = False
print("Warning: Memory modules not available, using simplified mode")
try:
from ech0_governance.persistent_memory import PersistentMemory # type: ignore
persistent_memory_available = True
except ImportError:
persistent_memory_available = False
print("Warning: Persistent memory not available, using simplified mode")
@dataclass
class CompressionConfig:
    """Configuration for the data compression pipeline.

    All ratio fields are expressed as compressed_tokens / original_tokens
    (whitespace-split word counts).
    """
    max_chunk_size: int = 4000  # tokens — NOTE(review): used as a *character* slice in the prompts; confirm intended unit
    compression_ratio: float = 0.1  # target size: 10% of original
    min_compression_ratio: float = 0.05  # minimum compression (floor on output/input ratio)
    max_compression_ratio: float = 0.3  # maximum compression (ceiling on output/input ratio)
    batch_size: int = 10  # chunks compressed concurrently per batch in compress_stream
    quality_threshold: float = 0.7  # minimum quality score for a chunk to be yielded
    deduplication_threshold: float = 0.85  # similarity threshold for dedup — NOTE(review): not referenced in this file
@dataclass
class CompressedChunk:
    """Represents a compressed data chunk produced by DataCompressor."""
    original_tokens: int  # whitespace-split word count of the input
    compressed_tokens: int  # whitespace-split word count of the output
    compression_ratio: float  # compressed_tokens / original_tokens
    quality_score: float  # heuristic quality in [0, 1] from _assess_compression_quality
    timestamp: datetime  # when compression completed
    modality: str  # text, image, audio, video
    domain: str  # academic, web, social, etc.
    compressed_content: str  # the compressed text itself
    metadata: Dict[str, Any] = field(default_factory=dict)  # caller-supplied context
    source_url: Optional[str] = None  # pulled from metadata["source_url"] when present
    checksum: Optional[str] = None  # NOTE(review): never populated in this file
class DataCompressor:
    """
    Compresses streaming data using prompt engineering instead of vectorization.

    Leverages ECH0-PRIME's LLM bridge for intelligent compression and falls
    back to a simple extractive heuristic when the bridge is unavailable or
    returns an error.
    """

    def __init__(self, config: CompressionConfig = None):
        """
        Args:
            config: Pipeline tuning parameters; defaults to CompressionConfig().
        """
        self.config = config or CompressionConfig()
        self.llm_bridge = OllamaBridge(model="llama3.2")

        # Initialize memory systems if available (soft dependencies).
        if memory_available:
            self.episodic_memory = EpisodicMemory()
            self.semantic_memory = SemanticMemory()
        else:
            self.episodic_memory = None
            self.semantic_memory = None

        # NOTE(review): persistent memory is stubbed to None in both branches,
        # so store_compressed() never persists until this is actually wired up.
        if persistent_memory_available:
            # For demo purposes, create a simple mock
            self.persistent_memory = None  # We'll handle this in store_compressed
        else:
            self.persistent_memory = None

        # Rolling compression statistics, maintained by _update_stats.
        self.stats = {
            "total_processed": 0,
            "total_compressed": 0,
            "avg_compression_ratio": 0.0,
            "avg_quality_score": 0.0,
            "compression_errors": 0
        }

        # Domain-specific compression prompt builders (web is the fallback).
        self.compression_prompts = {
            "academic": self._academic_compression_prompt,
            "web": self._web_compression_prompt,
            "social": self._social_compression_prompt,
            "code": self._code_compression_prompt,
            "news": self._news_compression_prompt,
            "multimodal": self._multimodal_compression_prompt
        }

    def _academic_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for academic/scientific content."""
        return f"""Compress this academic content while preserving:
- Core hypotheses and findings
- Methodological details
- Key evidence and results
- Theoretical contributions
- Important citations
Remove: verbose explanations, redundant details, tangential discussions.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    def _web_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for web/general content."""
        return f"""Compress this web content while preserving:
- Main topic and key points
- Important facts and data
- Actionable information
- Unique insights
Remove: advertisements, navigation, boilerplate, redundancy.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    def _social_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for social media content."""
        return f"""Compress this social content while preserving:
- Core message or opinion
- Key facts mentioned
- Important context
- Unique perspectives
Remove: emojis, hashtags, @mentions, casual language filler.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    def _code_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for code and technical content."""
        return f"""Compress this code/technical content while preserving:
- Core algorithms and logic
- Important functions and classes
- Key design patterns
- Technical specifications
- API signatures
Remove: comments, whitespace, examples, verbose documentation.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    def _news_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for news content."""
        return f"""Compress this news content while preserving:
- Who, What, When, Where, Why, How
- Key facts and quotes
- Important context
- Impact and implications
Remove: journalistic style, repetitive phrases, advertisements.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    def _multimodal_compression_prompt(self, content: str) -> str:
        """Build the compression prompt for multimodal content (text + images/audio)."""
        return f"""Compress this multimodal content while preserving:
- Visual/audio descriptions and their significance
- Text-visual correlations
- Key multimodal insights
- Cross-modal relationships
Remove: redundant descriptions, technical metadata, file formats.
Content: {content[:self.config.max_chunk_size]}
Compressed Summary:"""

    async def compress_chunk(self, content: str, modality: str = "text",
                             domain: str = "web", metadata: Dict[str, Any] = None) -> CompressedChunk:
        """
        Compress a single chunk of data using prompt engineering.

        Args:
            content: Raw text to compress.
            modality: Content modality label (text, image, audio, video).
            domain: Prompt selector; unknown domains fall back to "web".
            metadata: Optional caller context; "source_url" is surfaced on the chunk.

        Returns:
            A CompressedChunk whose metrics describe the *final* (post-adjustment)
            compressed content.
        """
        # Select appropriate compression prompt (unknown domains -> web).
        compression_prompt = self.compression_prompts.get(domain, self._web_compression_prompt)(content)

        # The bridge is synchronous; run it in the default executor so the
        # event loop is not blocked.
        compressed_content = await asyncio.get_event_loop().run_in_executor(
            None, self.llm_bridge.query, compression_prompt, None, None, 0.3, 0.9
        )
        if not compressed_content or "BRIDGE ERROR" in compressed_content:
            # Fallback: simple extractive compression
            compressed_content = self._extractive_compress(content)

        # Compression metrics (whitespace word counts).
        original_tokens = len(content.split())
        compressed_tokens = len(compressed_content.split())
        compression_ratio = compressed_tokens / original_tokens if original_tokens > 0 else 0

        # Quality assessment (simplified heuristic).
        quality_score = self._assess_compression_quality(content, compressed_content, domain)

        # Adjust output when the ratio falls outside the configured bounds.
        # BUGFIX: the original had these branches swapped — a ratio *below*
        # the floor means the output is already over-compressed (so restore
        # detail), and a ratio *above* the ceiling means it is still too
        # large (so compress harder).
        if compression_ratio < self.config.min_compression_ratio:
            compressed_content = self._expand_compress(compressed_content, content)
        elif compression_ratio > self.config.max_compression_ratio:
            compressed_content = self._aggressive_compress(compressed_content)

        # BUGFIX: recompute the ratio so the stored metric matches the final
        # content (the original stored the pre-adjustment ratio alongside
        # post-adjustment token counts).
        compressed_tokens = len(compressed_content.split())
        compression_ratio = compressed_tokens / original_tokens if original_tokens > 0 else 0

        chunk = CompressedChunk(
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compression_ratio,
            quality_score=quality_score,
            timestamp=datetime.now(),
            modality=modality,
            domain=domain,
            compressed_content=compressed_content,
            metadata=metadata or {},
            source_url=metadata.get("source_url") if metadata else None
        )
        self._update_stats(chunk)
        return chunk

    def _extractive_compress(self, content: str) -> str:
        """Extractive compression fallback: keep the first and last sentences
        plus any interior sentence containing a salience keyword."""
        sentences = content.split('.')
        key_sentences = [sentences[0]] if sentences else []
        key_sentences.extend([s for s in sentences[1:-1]
                              if any(term in s.lower() for term in
                                     ['important', 'key', 'significant', 'main', 'core'])])
        if len(sentences) > 1:
            key_sentences.append(sentences[-1])
        # BUGFIX: dedupe while preserving order — the original joined set(),
        # which scrambled sentence order nondeterministically.
        return '. '.join(dict.fromkeys(key_sentences)).strip()

    def _aggressive_compress(self, content: str) -> str:
        """Lossy last-resort compression: keep roughly every 3rd word."""
        words = content.split()
        compressed = ' '.join(words[::3])
        # Guard against degenerate output; fall back to a raw prefix.
        return compressed if len(compressed) > 10 else content[:200]

    def _expand_compress(self, compressed: str, original: str) -> str:
        """Flag an over-aggressive compression (stub; does not actually re-expand)."""
        return compressed + " [Note: Compression too aggressive, keeping more detail]"

    def _assess_compression_quality(self, original: str, compressed: str, domain: str) -> float:
        """Heuristic quality score in [0, 1] combining length, sentence density,
        and (for academic content) keyword retention."""
        # Length ratio (too short = bad). Guard: empty original would divide by zero.
        length_ratio = len(compressed) / len(original) if original else 0.0
        # Sentence-count retention.
        original_sentences = len(original.split('.'))
        compressed_sentences = len(compressed.split('.'))
        density_score = min(1.0, compressed_sentences / max(1, original_sentences))
        # Domain-specific checks.
        domain_score = 1.0
        if domain == "academic":
            academic_terms = ['hypothesis', 'method', 'results', 'theory', 'evidence']
            found_terms = sum(1 for term in academic_terms if term in compressed.lower())
            domain_score = found_terms / len(academic_terms)
        return (length_ratio * 0.4 + density_score * 0.3 + domain_score * 0.3)

    def _update_stats(self, chunk: CompressedChunk):
        """Fold one chunk's metrics into the rolling statistics."""
        self.stats["total_processed"] += 1
        self.stats["total_compressed"] += chunk.compressed_tokens
        # Incremental rolling averages.
        total = self.stats["total_processed"]
        self.stats["avg_compression_ratio"] = (
            (self.stats["avg_compression_ratio"] * (total - 1) + chunk.compression_ratio) / total
        )
        self.stats["avg_quality_score"] = (
            (self.stats["avg_quality_score"] * (total - 1) + chunk.quality_score) / total
        )

    def _batch_tasks(self, buffer: List[Dict[str, Any]]) -> List:
        """Build one compress_chunk coroutine per buffered stream item."""
        return [
            self.compress_chunk(
                content=data.get("content", ""),
                modality=data.get("modality", "text"),
                domain=data.get("domain", "web"),
                metadata=data.get("metadata", {})
            )
            for data in buffer
        ]

    async def compress_stream(self, data_stream: AsyncGenerator[Dict[str, Any], None]) -> AsyncGenerator[CompressedChunk, None]:
        """
        Compress a stream of data chunks asynchronously.

        Items are buffered into batches of config.batch_size and compressed
        concurrently; chunks below config.quality_threshold are dropped and
        counted as compression_errors.
        """
        buffer = []
        async for chunk_data in data_stream:
            buffer.append(chunk_data)
            if len(buffer) >= self.config.batch_size:
                for chunk in await asyncio.gather(*self._batch_tasks(buffer)):
                    if chunk.quality_score >= self.config.quality_threshold:
                        yield chunk
                    else:
                        self.stats["compression_errors"] += 1
                buffer.clear()
        # Flush the final partial batch. BUGFIX: low-quality chunks here are
        # now counted as errors too (the original only counted them in the
        # full-batch path).
        if buffer:
            for chunk in await asyncio.gather(*self._batch_tasks(buffer)):
                if chunk.quality_score >= self.config.quality_threshold:
                    yield chunk
                else:
                    self.stats["compression_errors"] += 1

    async def store_compressed(self, chunk: CompressedChunk, use_memory: bool = True):
        """
        Store a compressed chunk in ECH0's memory systems (each of which may
        be absent; see __init__).
        """
        if use_memory:
            # Episodic memory: store a feature vector plus payload.
            if self.episodic_memory:
                feature_vector = self._chunk_to_vector(chunk)
                self.episodic_memory.store_episode(feature_vector, {
                    "compressed_content": chunk.compressed_content,
                    "metadata": chunk.metadata,
                    "quality_score": chunk.quality_score,
                    "timestamp": chunk.timestamp.isoformat()
                })
            # Semantic memory: only high-quality chunks become facts.
            if self.semantic_memory and chunk.quality_score > 0.8:
                self.semantic_memory.store_fact(
                    fact=chunk.compressed_content,
                    confidence=chunk.quality_score,
                    domain=chunk.domain
                )
            # Persistent memory (currently always None; see __init__).
            if self.persistent_memory:
                self.persistent_memory.store(chunk.compressed_content, chunk.metadata)

    def _chunk_to_vector(self, chunk: CompressedChunk) -> np.ndarray:
        """Convert a compressed chunk to a crude vector representation."""
        import hashlib
        # Hash-based vectorization: md5 hex digest -> 16 byte values in [0, 1].
        # NOTE(review): this produces only a 16-dim vector, so the [:1024]
        # truncation is a no-op; replace with real embeddings to reach the
        # intended dimensionality. (Removed the shadowing local numpy import —
        # np is already imported at module level.)
        content_hash = hashlib.md5(chunk.compressed_content.encode()).hexdigest()
        vector = np.array([int(content_hash[i:i + 2], 16) for i in range(0, len(content_hash), 2)])
        vector = vector / 255.0  # normalize to [0,1]
        return vector[:1024]  # truncate to expected dimension

    def get_compression_stats(self) -> Dict[str, Any]:
        """Return a snapshot copy of the current compression statistics."""
        return self.stats.copy()

    async def adaptive_compress(self, content: str, target_ratio: float = None,
                                domain: str = "web") -> str:
        """
        Adaptively compress content, keeping the pass whose achieved ratio is
        closest to target_ratio (defaults to config.compression_ratio).
        """
        if target_ratio is None:
            target_ratio = self.config.compression_ratio
        ratios = [0.05, 0.1, 0.15, 0.2, 0.3]
        best_compression = content
        best_ratio_diff = float('inf')
        saved_ratio = self.config.compression_ratio
        try:
            for ratio in ratios:
                # NOTE(review): compress_chunk never reads config.compression_ratio,
                # so these passes differ only through LLM nondeterminism — TODO:
                # thread the trial ratio into the prompt.
                self.config.compression_ratio = ratio
                chunk = await self.compress_chunk(content, domain=domain)
                ratio_diff = abs(chunk.compression_ratio - target_ratio)
                if ratio_diff < best_ratio_diff:
                    best_ratio_diff = ratio_diff
                    best_compression = chunk.compressed_content
        finally:
            # BUGFIX: restore the configured ratio — the original leaked the
            # last trial value into self.config permanently.
            self.config.compression_ratio = saved_ratio
        return best_compression
class StreamingDataProcessor:
    """
    Processes streaming data sources and compresses them using ECH0-PRIME.
    """

    def __init__(self, compressor: DataCompressor = None):
        """
        Args:
            compressor: Shared compressor instance; a default one is created
                if omitted.
        """
        self.compressor = compressor or DataCompressor()
        self.active_streams = {}   # stream_id -> {"status": ..., "processed": int}
        self.processed_count = 0   # total chunks stored across *all* streams

    async def _compress_content(self, content: str, domain: str,
                                metadata: Dict[str, Any]) -> AsyncGenerator[CompressedChunk, None]:
        """Compress a single piece of text via the compressor's stream API.

        Factored out of stream_from_api/stream_from_file, which previously
        duplicated this inline single-item generator three times.
        """
        async def single_item():
            yield {
                "content": content,
                "modality": "text",
                "domain": domain,
                "metadata": metadata
            }
        async for chunk in self.compressor.compress_stream(single_item()):
            yield chunk

    async def stream_from_api(self, api_url: str, api_key: str = None,
                              domain: str = "web") -> AsyncGenerator[CompressedChunk, None]:
        """
        Stream and compress data from an API endpoint.

        JSON responses are compressed item-by-item (lists) or whole (objects);
        text responses are split into ~4000-char sentence-aligned chunks.
        Errors are logged and end the generator.
        """
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        async with aiohttp.ClientSession(headers=headers) as session:
            try:
                async with session.get(api_url) as response:
                    if response.content_type == 'application/json':
                        data = await response.json()
                        # A list is processed per item; anything else is one item.
                        items = data if isinstance(data, list) else [data]
                        for item in items:
                            async for chunk in self._compress_content(
                                    json.dumps(item), domain,
                                    {"source": api_url, "type": "api"}):
                                yield chunk
                    elif response.content_type.startswith('text/'):
                        text = await response.text()
                        for i, chunk_text in enumerate(self._split_text(text, 4000)):
                            async for chunk in self._compress_content(
                                    chunk_text, domain,
                                    {"source": api_url, "chunk_id": i, "type": "api"}):
                                yield chunk
            except Exception as e:
                print(f"API streaming error: {e}")

    async def stream_from_file(self, file_path: str, domain: str = "web") -> AsyncGenerator[CompressedChunk, None]:
        """
        Stream and compress data from a UTF-8 text file, split into
        ~4000-char sentence-aligned chunks. Errors are logged and end the
        generator.
        """
        try:
            async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                content = await f.read()
            for i, chunk_text in enumerate(self._split_text(content, 4000)):
                async for chunk in self._compress_content(
                        chunk_text, domain,
                        {"source": file_path, "chunk_id": i, "type": "file"}):
                    yield chunk
        except Exception as e:
            print(f"File streaming error: {e}")

    def _split_text(self, text: str, chunk_size: int) -> List[str]:
        """Split text into chunks while preserving sentence boundaries.

        NOTE(review): splitting on '.' and re-appending '.' means a text that
        did not end with a period gains one — harmless for compression input.
        """
        sentences = text.split('.')
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk + sentence) > chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = sentence + "."
            else:
                current_chunk += sentence + "."
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

    async def process_stream(self, stream_id: str, stream_source: AsyncGenerator[CompressedChunk, None]):
        """
        Drain a stream of compressed chunks, storing each one and tracking
        per-stream progress in self.active_streams[stream_id].
        """
        self.active_streams[stream_id] = {"status": "processing", "processed": 0}
        try:
            async for compressed_chunk in stream_source:
                await self.compressor.store_compressed(compressed_chunk)
                self.processed_count += 1
                # BUGFIX: track this stream's own count — the original stored
                # the global cumulative total here, inflating every stream's
                # figure when more than one stream was processed.
                self.active_streams[stream_id]["processed"] += 1
        except Exception as e:
            print(f"Stream processing error for {stream_id}: {e}")
            self.active_streams[stream_id]["status"] = "error"
        else:
            self.active_streams[stream_id]["status"] = "completed"

    def get_stream_status(self, stream_id: str) -> Dict[str, Any]:
        """Get processing status for a stream ({"status": "not_found"} if unknown)."""
        return self.active_streams.get(stream_id, {"status": "not_found"})
# Convenience functions for easy use
async def compress_data_stream(data_stream: AsyncGenerator[Dict[str, Any], None],
                               domain: str = "web") -> AsyncGenerator[CompressedChunk, None]:
    """
    High-level function to compress a data stream.

    Args:
        data_stream: Async generator of dicts with optional "content",
            "modality", "domain" and "metadata" keys.
        domain: Fallback domain for items that do not specify their own.
            BUGFIX: the original accepted this parameter but silently ignored
            it; it is now applied as a per-item default.

    Yields:
        CompressedChunk objects that pass the quality threshold.
    """
    compressor = DataCompressor()

    async def _with_default_domain():
        # Inject the caller's domain only where an item hasn't set one.
        async for item in data_stream:
            item.setdefault("domain", domain)
            yield item

    async for chunk in compressor.compress_stream(_with_default_domain()):
        yield chunk
async def process_api_stream(api_url: str, api_key: str = None, domain: str = "web") -> AsyncGenerator[CompressedChunk, None]:
    """
    Process and compress data from an API.

    Thin convenience wrapper: builds a fresh StreamingDataProcessor and
    forwards every chunk its API stream produces.
    """
    stream = StreamingDataProcessor().stream_from_api(api_url, api_key, domain)
    async for compressed in stream:
        yield compressed
if __name__ == "__main__":
    # Example usage: one-shot compression of an academic sample.
    async def demo():
        compressor = DataCompressor()
        sample_content = """
The field of artificial intelligence has seen remarkable progress in recent years.
Large language models have demonstrated impressive capabilities in natural language understanding,
generation, and reasoning. However, achieving true artificial general intelligence remains
a significant challenge that requires advances in multiple areas including multimodal learning,
causal reasoning, and continual learning. The development of systems that can learn from
diverse data sources while maintaining robustness and safety is crucial for the future of AI.
"""
        # Compress and report the resulting metrics.
        result = await compressor.compress_chunk(sample_content, domain="academic")
        print(f"Original: {result.original_tokens} tokens")
        print(f"Compressed: {result.compressed_tokens} tokens")
        print(f"Ratio: {result.compression_ratio:.2f}")
        print(f"Quality: {result.quality_score:.2f}")
        print(f"Compressed content: {result.compressed_content}")

    asyncio.run(demo())