#!/usr/bin/env python3
"""
Implementation of enumerative entropy coding as described in Han et al. (2008).

This is the actual "equiprobable partitioning" algorithm from the paper.
"""

# Import the actual implementation
from enumerative_coding import EnumerativeEncoder, ExpGolombCoder

import heapq
from collections import Counter
from typing import Dict, List, Tuple

import numpy as np
from scipy.stats import entropy as scipy_entropy

# For backward compatibility with existing code that expects this class name
EquiprobablePartitioningEncoder = EnumerativeEncoder


class HuffmanEncoder:
    """Standard Huffman coding for comparison."""

    class Node:
        """A node in the Huffman tree; leaves carry a symbol, internal nodes carry None."""

        def __init__(self, symbol=None, freq=0, left=None, right=None):
            self.symbol = symbol  # None marks an internal node
            self.freq = freq
            self.left = left
            self.right = right

        def __lt__(self, other):
            # Order by frequency so Node instances can live in a heapq min-heap.
            return self.freq < other.freq

    def __init__(self):
        self.codes = {}  # symbol -> bit-string code, filled by encode()
        self.root = None  # root of the most recently built Huffman tree

    def _build_tree(self, frequencies: Dict[int, int]):
        """Build Huffman tree from symbol frequencies.

        Expects at least one symbol; the classic merge loop pops the two
        lowest-frequency nodes and pushes their parent until one root remains.
        """
        heap = []
        for symbol, freq in frequencies.items():
            heapq.heappush(heap, self.Node(symbol=symbol, freq=freq))
        while len(heap) > 1:
            left = heapq.heappop(heap)
            right = heapq.heappop(heap)
            parent = self.Node(freq=left.freq + right.freq, left=left, right=right)
            heapq.heappush(heap, parent)
        self.root = heap[0]

    def _generate_codes(self, node, code=''):
        """Generate Huffman codes by traversing the tree (left = '0', right = '1')."""
        if node.symbol is not None:
            # Leaf: a single-leaf tree would otherwise get the empty code,
            # so fall back to '0' in that degenerate case.
            self.codes[node.symbol] = code if code else '0'
            return
        if node.left:
            self._generate_codes(node.left, code + '0')
        if node.right:
            self._generate_codes(node.right, code + '1')

    def encode(self, data: List[int]) -> Tuple[bytes, Dict]:
        """Encode data using Huffman coding.

        Returns (encoded_bytes, metadata). metadata carries the code table,
        the number of zero-padding bits appended to reach a byte boundary,
        and the original symbol count — everything decode() needs.
        """
        frequencies = Counter(data)
        # BUG FIX: empty input previously crashed in _build_tree
        # (heap[0] on an empty heap). Return an empty payload instead.
        if not frequencies:
            self.codes = {}
            return b'', {'codes': {}, 'padding': 0, 'original_length': 0}
        if len(frequencies) == 1:
            # Special case: only one symbol
            symbol = next(iter(frequencies))
            self.codes = {symbol: '0'}
        else:
            # BUG FIX: reset the code table so a reused encoder instance does
            # not leak stale codes (which could be prefixes of new codes and
            # make decoding ambiguous) into metadata['codes'].
            self.codes = {}
            self._build_tree(frequencies)
            self._generate_codes(self.root)

        # Encode data as a single bit string.
        encoded_bits = ''.join(self.codes[symbol] for symbol in data)

        # Pad to byte boundary
        padding = (8 - len(encoded_bits) % 8) % 8
        encoded_bits += '0' * padding
        encoded_bytes = bytes(
            int(encoded_bits[i:i + 8], 2) for i in range(0, len(encoded_bits), 8)
        )

        metadata = {
            'codes': self.codes,
            'padding': padding,
            'original_length': len(data),
        }
        return encoded_bytes, metadata

    def decode(self, encoded_bytes: bytes, metadata: Dict) -> List[int]:
        """Decode Huffman encoded data produced by encode()."""
        # Create reverse mapping: bit-string code -> symbol.
        reverse_codes = {code: symbol for symbol, code in metadata['codes'].items()}

        # Convert bytes to a bit string.
        bit_string = ''.join(format(byte, '08b') for byte in encoded_bytes)

        # Strip the padding bits added by encode().
        if metadata['padding'] > 0:
            bit_string = bit_string[:-metadata['padding']]

        decoded = []
        current_code = ''
        for bit in bit_string:
            current_code += bit
            if current_code in reverse_codes:
                decoded.append(reverse_codes[current_code])
                current_code = ''
                # Stop once all original symbols are recovered; any trailing
                # bits are padding artifacts.
                if len(decoded) >= metadata['original_length']:
                    break
        return decoded


def calculate_entropy(data: List[int]) -> float:
    """Return the Shannon-entropy lower bound on compressed size, in BYTES.

    NOTE: despite the name, this is not the per-symbol entropy in bits —
    it returns H(data) * len(data) / 8, i.e. the theoretical minimum number
    of bytes needed to encode `data`. (Kept as-is for backward compatibility;
    theoretical_minimum_size() relies on this behavior.)

    scipy.stats.entropy normalizes the raw counts to probabilities itself,
    so passing Counter values directly is correct.
    """
    probabilities = list(Counter(data).values())
    return scipy_entropy(probabilities, base=2) * len(data) / 8  # Convert to bytes


def theoretical_minimum_size(data: List[int]) -> float:
    """Calculate theoretical minimum compressed size in bytes."""
    return calculate_entropy(data)