#!/usr/bin/env python3
"""
Test entropy coding with equiprobable partitioning on various datasets.
Compares against Huffman coding and theoretical limits.
"""
import json
import signal
import time
from contextlib import contextmanager
from typing import Dict, List

import numpy as np
from datasets import load_dataset

from enumerative_coding import EnumerativeEncoder
from entropy_coding import HuffmanEncoder, theoretical_minimum_size
# Python >= 3.3 provides a builtin TimeoutError; defining a custom class of the
# same name would only shadow it, so the builtin is raised directly below.
@contextmanager
def timeout(seconds):
    """
    Context manager that raises TimeoutError after `seconds` seconds.

    Uses SIGALRM, so it works only on Unix and only in the main thread; the
    alarm has whole-second granularity.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds} seconds")
# Set the signal handler
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
# Restore the old signal handler and disable alarm
signal.signal(signal.SIGALRM, old_handler)
signal.alarm(0)
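# Usage sketch for the timeout guard above (illustrative only; the function
# name is hypothetical):
#
#     with timeout(5):
#         result = some_long_running_call()
#
# If the body does not finish within 5 seconds, TimeoutError propagates out of
# the `with` block.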
def generate_iid_data(size: int, vocab_size: int, distribution: str = 'uniform') -> List[int]:
"""
Generate i.i.d. data with specified vocabulary size and distribution.
Args:
size: Number of symbols to generate
vocab_size: Size of the vocabulary (number of unique symbols)
        distribution: 'uniform', 'zipf', or 'geometric'

    Returns:
        A list of integer symbols drawn i.i.d. from [0, vocab_size).
    """
if distribution == 'uniform':
        # .tolist() yields plain Python ints rather than NumPy scalars
        return np.random.randint(0, vocab_size, size).tolist()
elif distribution == 'zipf':
# Zipf distribution (power law)
probabilities = 1 / np.arange(1, vocab_size + 1)
probabilities /= probabilities.sum()
        return np.random.choice(vocab_size, size, p=probabilities).tolist()
elif distribution == 'geometric':
# Geometric distribution
p = 0.3 # Parameter for geometric distribution
probabilities = [(1 - p) ** i * p for i in range(vocab_size - 1)]
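        # Fold the remaining tail mass of the (infinite) geometric distribution
        # into the last symbol so the probabilities sum to 1.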
probabilities.append((1 - p) ** (vocab_size - 1))
probabilities = np.array(probabilities)
probabilities /= probabilities.sum()
        return np.random.choice(vocab_size, size, p=probabilities).tolist()
else:
raise ValueError(f"Unknown distribution: {distribution}")
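# For reference: the Shannon entropy of the generating distribution
# lower-bounds the achievable bits per symbol for an i.i.d. source. A quick
# sketch of that bound for the Zipf case above (illustrative, not used by the
# tests):
#
#     probs = 1 / np.arange(1, vocab_size + 1)
#     probs /= probs.sum()
#     entropy_bits_per_symbol = -(probs * np.log2(probs)).sum()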
def download_english_text() -> str:
"""Download English text from Hugging Face."""
print("Downloading English text dataset...")
# Using WikiText-2 dataset as a source of English text
dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:10%]")
# Concatenate text samples
text = " ".join(dataset['text'][:1000]) # Take first 1000 samples
# Clean up text
text = " ".join(text.split()) # Remove extra whitespace
return text
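# Note: load_dataset caches the downloaded corpus locally (by default under
# ~/.cache/huggingface), so repeated runs do not re-download it.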
def text_to_symbols(text: str) -> List[int]:
"""Convert text to list of integer symbols (byte values)."""
return list(text.encode('utf-8'))
def symbols_to_text(symbols: List[int]) -> str:
"""Convert list of integer symbols back to text."""
return bytes(symbols).decode('utf-8', errors='ignore')
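# The two helpers above are inverses for valid UTF-8 input, e.g.:
#     assert symbols_to_text(text_to_symbols("hello")) == "hello"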
def compress_and_compare(data: List[int], name: str, timeout_seconds: int = 30) -> Dict:
"""
Compress data using different methods and compare results.
Args:
data: Input data as list of integers
name: Name for this dataset
        timeout_seconds: Timeout in seconds for enumerative coding

    Returns:
        A dict with dataset statistics and per-method compression results.
    """
original_size = len(data)
results = {
'name': name,
'original_size': original_size,
'vocabulary_size': len(set(data)),
'theoretical_minimum': theoretical_minimum_size(data),
'methods': {}
}
# Huffman coding (always works quickly)
print(f" Compressing with Huffman coding...")
import time
huffman_start_time = time.time()
huffman = HuffmanEncoder()
huffman_encoded, huffman_metadata = huffman.encode(data)
huffman_size = len(huffman_encoded)
# Verify Huffman decoding
huffman_decoded = huffman.decode(huffman_encoded, huffman_metadata)
huffman_correct = huffman_decoded == data
huffman_encoding_time = time.time() - huffman_start_time
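    # Compressed sizes are in bytes; the factor of 8 below converts them to
    # bits when computing bits per symbol.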
results['methods']['huffman'] = {
'compressed_size': huffman_size,
'compression_ratio': original_size / huffman_size,
'bits_per_symbol': huffman_size * 8 / len(data),
'correct': huffman_correct,
'encoding_time': huffman_encoding_time
}
# Enumerative entropy coding (may timeout on large datasets)
print(f" Compressing with enumerative entropy coding (timeout: {timeout_seconds}s)...")
    start_time = time.time()
try:
with timeout(timeout_seconds):
ep_encoder = EnumerativeEncoder()
ep_encoded = ep_encoder.encode(data)
ep_size = len(ep_encoded)
# Verify decoding
ep_decoded = ep_encoder.decode(ep_encoded)
ep_correct = ep_decoded == data
encoding_time = time.time() - start_time
results['methods']['enumerative'] = {
'compressed_size': ep_size,
'compression_ratio': original_size / ep_size,
'bits_per_symbol': ep_size * 8 / len(data),
'correct': ep_correct,
'encoding_time': encoding_time
}
except TimeoutError as e:
encoding_time = time.time() - start_time
print(f" Enumerative coding timed out: {e}")
results['methods']['enumerative'] = {
'compressed_size': None,
'compression_ratio': None,
'bits_per_symbol': None,
'correct': False,
'encoding_time': encoding_time,
'timed_out': True
}
    except Exception as e:
        # A ValueError (e.g., unsupported input) and any other failure produce
        # the same result record, so a single handler covers both.
        encoding_time = time.time() - start_time
        print(f"    Enumerative coding failed: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': False,
            'error': str(e)
        }
return results
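# Example call on toy data (hypothetical values):
#     stats = compress_and_compare([0, 1, 1, 2, 0, 1], "toy", timeout_seconds=5)
#     print(stats['methods']['huffman']['bits_per_symbol'])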
def main():
"""Run compression tests on various datasets."""
np.random.seed(42) # For reproducibility
all_results = []
# Test configurations
test_configs = [
# (name, size, vocab_size, distribution)
# Uniform distribution (one representative case)
("uniform_10k_v256", 10000, 256, 'uniform'),
# Zipf distribution with different vocabulary sizes
("zipf_10k_v16", 10000, 16, 'zipf'),
("zipf_10k_v64", 10000, 64, 'zipf'),
("zipf_10k_v256", 10000, 256, 'zipf'),
("zipf_5k_v16", 5000, 16, 'zipf'),
("zipf_5k_v64", 5000, 64, 'zipf'),
("zipf_5k_v256", 5000, 256, 'zipf'),
# Geometric distribution with different vocabulary sizes
("geometric_10k_v16", 10000, 16, 'geometric'),
("geometric_10k_v64", 10000, 64, 'geometric'),
("geometric_10k_v256", 10000, 256, 'geometric'),
("geometric_5k_v16", 5000, 16, 'geometric'),
("geometric_5k_v64", 5000, 64, 'geometric'),
("geometric_5k_v256", 5000, 256, 'geometric'),
# Large scale test with most interesting distribution
("zipf_100k_v256", 100000, 256, 'zipf'),
]
# Generate and test i.i.d. datasets
print("Testing i.i.d. datasets...")
for name, size, vocab_size, distribution in test_configs:
print(f"\nTesting {name} (size={size}, vocab={vocab_size}, dist={distribution})...")
data = generate_iid_data(size, vocab_size, distribution)
results = compress_and_compare(data, name)
all_results.append(results)
print_results(results)
# Test English text
print("\nTesting English text...")
english_text = download_english_text()
print(f"Text length: {len(english_text)} characters")
# Convert to symbols and compress (use subset to avoid timeout)
    text_symbols = text_to_symbols(english_text)
    original_symbol_count = len(text_symbols)
    # Use a subset that's computationally feasible for enumerative coding
    max_text_length = 2000  # Should complete within the timeout
    if len(text_symbols) > max_text_length:
        text_symbols = text_symbols[:max_text_length]
        print(f"Using text subset of {len(text_symbols)} symbols (original: {original_symbol_count})")
text_results = compress_and_compare(text_symbols, "english_text")
all_results.append(text_results)
print_results(text_results)
# Save all results to JSON
with open('compression_results.json', 'w') as f:
json.dump(all_results, f, indent=2)
print("\nResults saved to compression_results.json")
# Generate summary table
print("\n" + "="*80)
print("COMPRESSION SUMMARY")
print("="*80)
print(f"{'Dataset':<20} {'Original':<10} {'Theoretical':<12} {'Huffman':<10} {'Enumerative':<12}")
print("-"*70)
for result in all_results:
name = result['name'][:20]
original = result['original_size']
theoretical = result['theoretical_minimum']
huffman = result['methods']['huffman']['compressed_size']
        enumerative_method = result['methods']['enumerative']
        if enumerative_method.get('compressed_size') is not None:
            enumerative = enumerative_method['compressed_size']
        elif enumerative_method.get('timed_out'):
            enumerative = "TIMEOUT"
        else:
            enumerative = "FAILED"
row = f"{name:<20} {original:<10} {theoretical:<12.1f} {huffman:<10} {enumerative:<12}"
print(row)
def print_results(results: Dict):
"""Print compression results for a dataset."""
print(f"\nResults for {results['name']}:")
print(f" Original size: {results['original_size']} bytes")
print(f" Vocabulary size: {results['vocabulary_size']}")
print(f" Theoretical minimum: {results['theoretical_minimum']:.1f} bytes")
for method, data in results['methods'].items():
print(f"\n {method}:")
        if data.get('compressed_size') is None:
            if data.get('timed_out'):
                print(f"    Status: TIMEOUT after {data.get('encoding_time', 0):.2f}s - computational complexity too high")
            elif data.get('error'):
                print(f"    Status: FAILED - {data.get('error')}")
            else:
                print("    Status: FAILED - computational complexity too high")
else:
print(f" Compressed size: {data['compressed_size']} bytes")
print(f" Compression ratio: {data['compression_ratio']:.2f}")
print(f" Bits per symbol: {data['bits_per_symbol']:.2f}")
print(f" Correctly decoded: {data['correct']}")
if 'encoding_time' in data:
print(f" Encoding time: {data['encoding_time']:.3f}s")
if __name__ == "__main__":
main()