#!/usr/bin/env python3
"""
Test entropy coding with equiprobable partitioning on various datasets.
Compares against Huffman coding and theoretical limits.
"""

import json
import signal
import time
from contextlib import contextmanager
from typing import Dict, List

import numpy as np
from datasets import load_dataset

from enumerative_coding import EnumerativeEncoder
from entropy_coding import HuffmanEncoder, theoretical_minimum_size


class TimeoutError(Exception):
    """Raised when a timeout occurs.

    Note: shadows the builtin TimeoutError; only this local class is
    raised and caught in this module.
    """
    pass


@contextmanager
def timeout(seconds):
    """Context manager for timeout functionality.

    Relies on SIGALRM, so it only works on Unix-like systems and only
    in the main thread.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds} seconds")

    # Install the signal handler and arm the alarm
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Restore the old signal handler and disable the alarm
        signal.signal(signal.SIGALRM, old_handler)
        signal.alarm(0)


def generate_iid_data(size: int, vocab_size: int, distribution: str = 'uniform') -> List[int]:
    """
    Generate i.i.d. data with the specified vocabulary size and distribution.

    Args:
        size: Number of symbols to generate
        vocab_size: Size of the vocabulary (number of unique symbols)
        distribution: 'uniform', 'zipf', or 'geometric'
    """
    if distribution == 'uniform':
        return list(np.random.randint(0, vocab_size, size))
    elif distribution == 'zipf':
        # Zipf distribution (power law): p(i) proportional to 1/i
        probabilities = 1 / np.arange(1, vocab_size + 1)
        probabilities /= probabilities.sum()
        return list(np.random.choice(vocab_size, size, p=probabilities))
    elif distribution == 'geometric':
        # Truncated geometric distribution; the last symbol absorbs the tail mass
        p = 0.3  # Success probability of the geometric distribution
        probabilities = [(1 - p) ** i * p for i in range(vocab_size - 1)]
        probabilities.append((1 - p) ** (vocab_size - 1))
        probabilities = np.array(probabilities)
        probabilities /= probabilities.sum()
        return list(np.random.choice(vocab_size, size, p=probabilities))
    else:
        raise ValueError(f"Unknown distribution: {distribution}")


def download_english_text() -> str:
    """Download English text from Hugging Face."""
    print("Downloading English text dataset...")
    # Use the WikiText-2 dataset as a source of English text
    dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:10%]")
    # Concatenate the first 1000 text samples
    text = " ".join(dataset['text'][:1000])
    # Collapse runs of whitespace
    text = " ".join(text.split())
    return text


def text_to_symbols(text: str) -> List[int]:
    """Convert text to a list of integer symbols (byte values)."""
    return list(text.encode('utf-8'))


def symbols_to_text(symbols: List[int]) -> str:
    """Convert a list of integer symbols back to text."""
    return bytes(symbols).decode('utf-8', errors='ignore')
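
# Quick sanity check for the helpers above (illustrative sketch, shown in
# doctest style as a comment rather than executed as part of the test run):
#
#     >>> np.random.seed(0)
#     >>> data = generate_iid_data(100, 16, 'zipf')
#     >>> len(data), 0 <= min(data), max(data) < 16
#     (100, True, True)
#     >>> symbols_to_text(text_to_symbols("hello")) == "hello"
#     True
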
def compress_and_compare(data: List[int], name: str, timeout_seconds: int = 30) -> Dict:
    """
    Compress data using different methods and compare the results.

    Args:
        data: Input data as a list of integers
        name: Name for this dataset
        timeout_seconds: Timeout in seconds for enumerative coding
    """
    original_size = len(data)  # One byte per symbol, since vocab_size <= 256
    results = {
        'name': name,
        'original_size': original_size,
        'vocabulary_size': len(set(data)),
        'theoretical_minimum': theoretical_minimum_size(data),
        'methods': {}
    }

    # Huffman coding (always completes quickly)
    print("  Compressing with Huffman coding...")
    huffman_start_time = time.time()
    huffman = HuffmanEncoder()
    huffman_encoded, huffman_metadata = huffman.encode(data)
    huffman_size = len(huffman_encoded)

    # Verify that Huffman decoding round-trips
    huffman_decoded = huffman.decode(huffman_encoded, huffman_metadata)
    huffman_correct = huffman_decoded == data
    huffman_encoding_time = time.time() - huffman_start_time

    results['methods']['huffman'] = {
        'compressed_size': huffman_size,
        'compression_ratio': original_size / huffman_size,
        'bits_per_symbol': huffman_size * 8 / len(data),
        'correct': huffman_correct,
        'encoding_time': huffman_encoding_time
    }

    # Enumerative entropy coding (may time out on large datasets)
    print(f"  Compressing with enumerative entropy coding (timeout: {timeout_seconds}s)...")
    start_time = time.time()
    try:
        with timeout(timeout_seconds):
            ep_encoder = EnumerativeEncoder()
            ep_encoded = ep_encoder.encode(data)
            ep_size = len(ep_encoded)

            # Verify that decoding round-trips
            ep_decoded = ep_encoder.decode(ep_encoded)
            ep_correct = ep_decoded == data

        encoding_time = time.time() - start_time
        results['methods']['enumerative'] = {
            'compressed_size': ep_size,
            'compression_ratio': original_size / ep_size,
            'bits_per_symbol': ep_size * 8 / len(data),
            'correct': ep_correct,
            'encoding_time': encoding_time
        }
    except TimeoutError as e:
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding timed out: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': True
        }
    except Exception as e:
        # Covers ValueError and any other failure mode with one record
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding failed: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': False,
            'error': str(e)
        }

    return results
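
# For reference, the dict returned by compress_and_compare has this shape
# (the values below are made up for illustration):
#
#   {
#       'name': 'zipf_10k_v16',
#       'original_size': 10000,
#       'vocabulary_size': 16,
#       'theoretical_minimum': 3412.7,
#       'methods': {
#           'huffman':     {'compressed_size': 3500, 'compression_ratio': 2.86,
#                           'bits_per_symbol': 2.80, 'correct': True,
#                           'encoding_time': 0.012},
#           'enumerative': {...},  # size fields are None on timeout/failure
#       },
#   }
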
def main():
    """Run compression tests on various datasets."""
    np.random.seed(42)  # For reproducibility
    all_results = []

    # Test configurations: (name, size, vocab_size, distribution)
    test_configs = [
        # Uniform distribution (one representative case)
        ("uniform_10k_v256", 10000, 256, 'uniform'),
        # Zipf distribution with different vocabulary sizes
        ("zipf_10k_v16", 10000, 16, 'zipf'),
        ("zipf_10k_v64", 10000, 64, 'zipf'),
        ("zipf_10k_v256", 10000, 256, 'zipf'),
        ("zipf_5k_v16", 5000, 16, 'zipf'),
        ("zipf_5k_v64", 5000, 64, 'zipf'),
        ("zipf_5k_v256", 5000, 256, 'zipf'),
        # Geometric distribution with different vocabulary sizes
        ("geometric_10k_v16", 10000, 16, 'geometric'),
        ("geometric_10k_v64", 10000, 64, 'geometric'),
        ("geometric_10k_v256", 10000, 256, 'geometric'),
        ("geometric_5k_v16", 5000, 16, 'geometric'),
        ("geometric_5k_v64", 5000, 64, 'geometric'),
        ("geometric_5k_v256", 5000, 256, 'geometric'),
        # Large-scale test with the most interesting distribution
        ("zipf_100k_v256", 100000, 256, 'zipf'),
    ]

    # Generate and test i.i.d. datasets
    print("Testing i.i.d. datasets...")
    for name, size, vocab_size, distribution in test_configs:
        print(f"\nTesting {name} (size={size}, vocab={vocab_size}, dist={distribution})...")
        data = generate_iid_data(size, vocab_size, distribution)
        results = compress_and_compare(data, name)
        all_results.append(results)
        print_results(results)

    # Test English text
    print("\nTesting English text...")
    english_text = download_english_text()
    print(f"Text length: {len(english_text)} characters")

    # Convert to symbols and compress (use a subset to avoid timeouts)
    text_symbols = text_to_symbols(english_text)
    max_text_length = 2000  # Small enough for enumerative coding to finish within the timeout
    if len(text_symbols) > max_text_length:
        original_length = len(text_symbols)
        text_symbols = text_symbols[:max_text_length]
        print(f"Using text subset of {len(text_symbols)} symbols (original: {original_length})")

    text_results = compress_and_compare(text_symbols, "english_text")
    all_results.append(text_results)
    print_results(text_results)

    # Save all results to JSON
    with open('compression_results.json', 'w') as f:
        json.dump(all_results, f, indent=2)
    print("\nResults saved to compression_results.json")

    # Generate summary table
    print("\n" + "=" * 80)
    print("COMPRESSION SUMMARY")
    print("=" * 80)
    print(f"{'Dataset':<20} {'Original':<10} {'Theoretical':<12} {'Huffman':<10} {'Enumerative':<12}")
    print("-" * 80)

    for result in all_results:
        name = result['name'][:20]
        original = result['original_size']
        theoretical = result['theoretical_minimum']
        huffman = result['methods']['huffman']['compressed_size']
        enumerative_method = result['methods'].get('enumerative')
        if enumerative_method is not None and enumerative_method.get('compressed_size') is not None:
            enumerative = enumerative_method['compressed_size']
        else:
            enumerative = "TIMEOUT"
        print(f"{name:<20} {original:<10} {theoretical:<12.1f} {huffman:<10} {enumerative:<12}")


def print_results(results: Dict):
    """Print compression results for a dataset."""
    print(f"\nResults for {results['name']}:")
    print(f"  Original size: {results['original_size']} bytes")
    print(f"  Vocabulary size: {results['vocabulary_size']}")
    print(f"  Theoretical minimum: {results['theoretical_minimum']:.1f} bytes")

    for method, data in results['methods'].items():
        print(f"\n  {method}:")
        if data is None or data.get('compressed_size') is None:
            if data and data.get('timed_out'):
                print(f"    Status: TIMEOUT after {data.get('encoding_time', 0):.2f}s - computational complexity too high")
            elif data and data.get('error'):
                print(f"    Status: FAILED - {data.get('error')}")
            else:
                print("    Status: FAILED - computational complexity too high")
        else:
            print(f"    Compressed size: {data['compressed_size']} bytes")
            print(f"    Compression ratio: {data['compression_ratio']:.2f}")
            print(f"    Bits per symbol: {data['bits_per_symbol']:.2f}")
            print(f"    Correctly decoded: {data['correct']}")
            if 'encoding_time' in data:
                print(f"    Encoding time: {data['encoding_time']:.3f}s")


if __name__ == "__main__":
    main()
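
# To try a single configuration interactively (sketch; assumes the coder
# modules imported at the top of this file are available):
#
#     data = generate_iid_data(1000, 16, 'zipf')
#     print_results(compress_and_compare(data, 'zipf_1k_v16_demo', timeout_seconds=10))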