"""
Test entropy coding with equiprobable partitioning on various datasets.
Compares against Huffman coding and theoretical limits.
"""

import json
import signal
import time
from contextlib import contextmanager
from typing import Dict, List

import numpy as np
from datasets import load_dataset

from enumerative_coding import EnumerativeEncoder
from entropy_coding import HuffmanEncoder, theoretical_minimum_size


# Note: this class shadows the built-in TimeoutError within this module.
class TimeoutError(Exception):
    """Raised when a timeout occurs."""
    pass


@contextmanager
def timeout(seconds):
    """Context manager that raises TimeoutError after `seconds` seconds (via SIGALRM)."""
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds} seconds")

    # Install the handler and start the alarm; restore the previous handler and
    # cancel the alarm on exit, even if the wrapped block raised.
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)

    try:
        yield
    finally:
        signal.signal(signal.SIGALRM, old_handler)
        signal.alarm(0)
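
# Usage sketch (illustrative, not part of the tests): wrap a potentially slow
# call so it is abandoned after the given number of seconds:
#
#     with timeout(5):
#         result = some_slow_call()
#
# This relies on signal.SIGALRM, so it only works in the main thread of a
# POSIX process.

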
def generate_iid_data(size: int, vocab_size: int, distribution: str = 'uniform') -> List[int]:
    """
    Generate i.i.d. data with the specified vocabulary size and distribution.

    Args:
        size: Number of symbols to generate
        vocab_size: Size of the vocabulary (number of unique symbols)
        distribution: 'uniform', 'zipf', or 'geometric'
    """
    if distribution == 'uniform':
        return list(np.random.randint(0, vocab_size, size))
    elif distribution == 'zipf':
        # Zipf-like law: the k-th symbol (1-indexed) has probability proportional to 1/k.
        probabilities = 1 / np.arange(1, vocab_size + 1)
        probabilities /= probabilities.sum()
        return list(np.random.choice(vocab_size, size, p=probabilities))
    elif distribution == 'geometric':
        # Truncated geometric: P(k) = (1 - p)^k * p, with the remaining tail mass
        # assigned to the last symbol, then renormalized.
        p = 0.3
        probabilities = [(1 - p) ** i * p for i in range(vocab_size - 1)]
        probabilities.append((1 - p) ** (vocab_size - 1))
        probabilities = np.array(probabilities)
        probabilities /= probabilities.sum()
        return list(np.random.choice(vocab_size, size, p=probabilities))
    else:
        raise ValueError(f"Unknown distribution: {distribution}")


def download_english_text() -> str:
    """Download English text (WikiText-2, raw split) from Hugging Face."""
    print("Downloading English text dataset...")
    dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:10%]")

    # Join the first 1000 lines and collapse whitespace runs into single spaces.
    text = " ".join(dataset['text'][:1000])
    text = " ".join(text.split())

    return text


def text_to_symbols(text: str) -> List[int]:
    """Convert text to a list of integer symbols (UTF-8 byte values)."""
    return list(text.encode('utf-8'))


def symbols_to_text(symbols: List[int]) -> str:
    """Convert a list of integer symbols back to text."""
    return bytes(symbols).decode('utf-8', errors='ignore')
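
# Round-trip note: symbols_to_text(text_to_symbols(s)) == s for ordinary text,
# since UTF-8 encoding followed by decoding is lossless; errors='ignore' only
# matters when decoding byte sequences that are not valid UTF-8.

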
def compress_and_compare(data: List[int], name: str, timeout_seconds: int = 30) -> Dict:
    """
    Compress data using different methods and compare results.

    Args:
        data: Input data as a list of integers
        name: Name for this dataset
        timeout_seconds: Timeout in seconds for enumerative coding
    """
    original_size = len(data)  # sizes are in bytes, treating each symbol as one byte

    results = {
        'name': name,
        'original_size': original_size,
        'vocabulary_size': len(set(data)),
        'theoretical_minimum': theoretical_minimum_size(data),
        'methods': {}
    }

    # Huffman coding baseline.
    print("  Compressing with Huffman coding...")
    huffman_start_time = time.time()
    huffman = HuffmanEncoder()
    huffman_encoded, huffman_metadata = huffman.encode(data)
    huffman_size = len(huffman_encoded)

    # Verify the round trip. Note: the recorded time covers encoding plus this
    # decode verification.
    huffman_decoded = huffman.decode(huffman_encoded, huffman_metadata)
    huffman_correct = huffman_decoded == data
    huffman_encoding_time = time.time() - huffman_start_time

    results['methods']['huffman'] = {
        'compressed_size': huffman_size,
        'compression_ratio': original_size / huffman_size,
        'bits_per_symbol': huffman_size * 8 / len(data),
        'correct': huffman_correct,
        'encoding_time': huffman_encoding_time
    }

    # Enumerative (equiprobable-partitioning) entropy coding, run under a timeout
    # because its computational cost can grow quickly with input size.
    print(f"  Compressing with enumerative entropy coding (timeout: {timeout_seconds}s)...")
    start_time = time.time()
    try:
        with timeout(timeout_seconds):
            ep_encoder = EnumerativeEncoder()
            ep_encoded = ep_encoder.encode(data)
            ep_size = len(ep_encoded)

            # Verify the round trip.
            ep_decoded = ep_encoder.decode(ep_encoded)
            ep_correct = ep_decoded == data

        encoding_time = time.time() - start_time
        results['methods']['enumerative'] = {
            'compressed_size': ep_size,
            'compression_ratio': original_size / ep_size,
            'bits_per_symbol': ep_size * 8 / len(data),
            'correct': ep_correct,
            'encoding_time': encoding_time
        }
    except TimeoutError as e:
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding timed out: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': True
        }
    except Exception as e:
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding failed: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': False,
            'error': str(e)
        }

    return results
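
# Optional sanity-check sketch, not called by the tests. It assumes that
# theoretical_minimum_size() reports the zeroth-order entropy bound, i.e. roughly
# len(data) * H / 8 bytes, where H is the empirical Shannon entropy in bits per
# symbol computed below. The helper name is illustrative, not an existing API.
def _empirical_entropy_bits(data: List[int]) -> float:
    """Shannon entropy of the empirical symbol distribution, in bits per symbol."""
    counts = np.bincount(np.asarray(data, dtype=np.int64))
    probs = counts[counts > 0] / len(data)
    return float(-(probs * np.log2(probs)).sum())

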
def main():
    """Run compression tests on various datasets."""
    np.random.seed(42)

    all_results = []

    # (name, size, vocab_size, distribution)
    test_configs = [
        # Uniform distribution
        ("uniform_10k_v256", 10000, 256, 'uniform'),

        # Zipf distribution
        ("zipf_10k_v16", 10000, 16, 'zipf'),
        ("zipf_10k_v64", 10000, 64, 'zipf'),
        ("zipf_10k_v256", 10000, 256, 'zipf'),
        ("zipf_5k_v16", 5000, 16, 'zipf'),
        ("zipf_5k_v64", 5000, 64, 'zipf'),
        ("zipf_5k_v256", 5000, 256, 'zipf'),

        # Geometric distribution
        ("geometric_10k_v16", 10000, 16, 'geometric'),
        ("geometric_10k_v64", 10000, 64, 'geometric'),
        ("geometric_10k_v256", 10000, 256, 'geometric'),
        ("geometric_5k_v16", 5000, 16, 'geometric'),
        ("geometric_5k_v64", 5000, 64, 'geometric'),
        ("geometric_5k_v256", 5000, 256, 'geometric'),

        # Larger dataset
        ("zipf_100k_v256", 100000, 256, 'zipf'),
    ]

    # Synthetic i.i.d. datasets
    print("Testing i.i.d. datasets...")
    for name, size, vocab_size, distribution in test_configs:
        print(f"\nTesting {name} (size={size}, vocab={vocab_size}, dist={distribution})...")
        data = generate_iid_data(size, vocab_size, distribution)
        results = compress_and_compare(data, name)
        all_results.append(results)
        print_results(results)

    # Natural-language text
    print("\nTesting English text...")
    english_text = download_english_text()
    print(f"Text length: {len(english_text)} characters")

    text_symbols = text_to_symbols(english_text)

    # Cap the text length to keep enumerative coding tractable.
    max_text_length = 2000
    if len(text_symbols) > max_text_length:
        full_length = len(text_symbols)
        text_symbols = text_symbols[:max_text_length]
        print(f"Using text subset of {len(text_symbols)} symbols (original: {full_length})")

    text_results = compress_and_compare(text_symbols, "english_text")
    all_results.append(text_results)
    print_results(text_results)

    # Save raw results.
    with open('compression_results.json', 'w') as f:
        json.dump(all_results, f, indent=2)

    print("\nResults saved to compression_results.json")

    # Summary table (all sizes in bytes).
    print("\n" + "=" * 80)
    print("COMPRESSION SUMMARY")
    print("=" * 80)
    print(f"{'Dataset':<20} {'Original':<10} {'Theoretical':<12} {'Huffman':<10} {'Enumerative':<12}")
    print("-" * 70)

    for result in all_results:
        name = result['name'][:20]
        original = result['original_size']
        theoretical = result['theoretical_minimum']
        huffman = result['methods']['huffman']['compressed_size']
        enumerative_method = result['methods']['enumerative']
        if enumerative_method.get('compressed_size') is not None:
            enumerative = enumerative_method['compressed_size']
        elif enumerative_method.get('timed_out'):
            enumerative = "TIMEOUT"
        else:
            enumerative = "FAILED"

        row = f"{name:<20} {original:<10} {theoretical:<12.1f} {huffman:<10} {enumerative:<12}"
        print(row)


def print_results(results: Dict):
    """Print compression results for a dataset."""
    print(f"\nResults for {results['name']}:")
    print(f"  Original size: {results['original_size']} bytes")
    print(f"  Vocabulary size: {results['vocabulary_size']}")
    print(f"  Theoretical minimum: {results['theoretical_minimum']:.1f} bytes")

    for method, data in results['methods'].items():
        print(f"\n  {method}:")
        if data is None or data.get('compressed_size') is None:
            if data and data.get('timed_out'):
                print(f"    Status: TIMEOUT after {data.get('encoding_time', 0):.2f}s - computational complexity too high")
            elif data and data.get('error'):
                print(f"    Status: FAILED - {data.get('error')}")
            else:
                print("    Status: FAILED - computational complexity too high")
        else:
            print(f"    Compressed size: {data['compressed_size']} bytes")
            print(f"    Compression ratio: {data['compression_ratio']:.2f}")
            print(f"    Bits per symbol: {data['bits_per_symbol']:.2f}")
            print(f"    Correctly decoded: {data['correct']}")
            if 'encoding_time' in data:
                print(f"    Encoding time: {data['encoding_time']:.3f}s")


if __name__ == "__main__":
    main()