"""
Implementation of enumerative entropy coding as described in Han et al. (2008).
This is the actual "equiprobable partitioning" algorithm from the paper.
"""
| |
|
| | |
| | from enumerative_coding import EnumerativeEncoder, ExpGolombCoder |
| |
|
| | import numpy as np |
| | from typing import Tuple, List, Dict |
| | from collections import Counter |
| | import heapq |
| | from scipy.stats import entropy as scipy_entropy |
| |
|
| |
|
| | |
# Backwards-compatible alias: the paper's "equiprobable partitioning" encoder
# is implemented by EnumerativeEncoder (imported from enumerative_coding).
EquiprobablePartitioningEncoder = EnumerativeEncoder
| |
|
| |
|
class HuffmanEncoder:
    """Standard Huffman coding for comparison.

    Builds a prefix-free code from the symbol frequencies of the input,
    packs the coded bit stream into bytes, and decodes it back using the
    metadata dict returned by :meth:`encode`.
    """

    class Node:
        """Huffman tree node; leaves carry a symbol, internal nodes only a frequency."""

        def __init__(self, symbol=None, freq=0, left=None, right=None):
            self.symbol = symbol
            self.freq = freq
            self.left = left
            self.right = right

        def __lt__(self, other):
            # heapq orders nodes by frequency; ties fall back to heap mechanics,
            # which is deterministic within a single run.
            return self.freq < other.freq

    def __init__(self):
        self.codes = {}   # symbol -> bit-string code ('0'/'1' characters)
        self.root = None  # root of the most recently built tree

    def _build_tree(self, frequencies: Dict[int, int]):
        """Build Huffman tree from symbol frequencies (must be non-empty)."""
        heap = []
        for symbol, freq in frequencies.items():
            heapq.heappush(heap, self.Node(symbol=symbol, freq=freq))

        # Repeatedly merge the two least-frequent nodes until one root remains.
        while len(heap) > 1:
            left = heapq.heappop(heap)
            right = heapq.heappop(heap)
            parent = self.Node(freq=left.freq + right.freq, left=left, right=right)
            heapq.heappush(heap, parent)

        self.root = heap[0]

    def _generate_codes(self, node, code=''):
        """Generate Huffman codes by traversing the tree (left='0', right='1')."""
        if node.symbol is not None:
            # A lone root (degenerate tree) still needs a non-empty code.
            self.codes[node.symbol] = code if code else '0'
            return

        if node.left:
            self._generate_codes(node.left, code + '0')
        if node.right:
            self._generate_codes(node.right, code + '1')

    def encode(self, data: List[int]) -> Tuple[bytes, Dict]:
        """Encode data using Huffman coding.

        Returns ``(encoded_bytes, metadata)`` where metadata carries the
        code table, the bit padding, and the original symbol count needed
        by :meth:`decode`.
        """
        # Bug fix: reset per-call state so a reused encoder instance does not
        # leak stale codes from a previous encode() into the metadata.
        self.codes = {}
        self.root = None

        # Bug fix: empty input previously crashed in _build_tree
        # (``heap[0]`` on an empty heap raised IndexError).
        if not data:
            return b'', {'codes': {}, 'padding': 0, 'original_length': 0}

        frequencies = Counter(data)

        if len(frequencies) == 1:
            # Single distinct symbol: the tree would be a lone leaf, so
            # assign it the 1-bit code directly.
            symbol = next(iter(frequencies))
            self.codes = {symbol: '0'}
        else:
            self._build_tree(frequencies)
            self._generate_codes(self.root)

        encoded_bits = ''.join(self.codes[symbol] for symbol in data)

        # Pad to a whole number of bytes with trailing zero bits.
        padding = (8 - len(encoded_bits) % 8) % 8
        encoded_bits += '0' * padding

        encoded_bytes = bytes(
            int(encoded_bits[i:i + 8], 2) for i in range(0, len(encoded_bits), 8)
        )

        metadata = {
            'codes': self.codes,
            'padding': padding,
            'original_length': len(data),
        }

        return encoded_bytes, metadata

    def decode(self, encoded_bytes: bytes, metadata: Dict) -> List[int]:
        """Decode Huffman encoded data back to the original symbol list."""
        # Invert the code table: bit-string -> symbol.
        reverse_codes = {code: symbol for symbol, code in metadata['codes'].items()}

        bit_string = ''.join(format(byte, '08b') for byte in encoded_bytes)

        # Strip the padding bits added by encode().
        if metadata['padding'] > 0:
            bit_string = bit_string[:-metadata['padding']]

        decoded = []
        current_code = ''

        for bit in bit_string:
            current_code += bit
            if current_code in reverse_codes:
                decoded.append(reverse_codes[current_code])
                current_code = ''
                # Stop once all original symbols are recovered (guards against
                # over-reading when a symbol's code is all zeros, like padding).
                if len(decoded) >= metadata['original_length']:
                    break

        return decoded
| |
|
| |
|
def calculate_entropy(data: List[int]) -> float:
    """Calculate the Shannon information content of *data*, in bytes.

    Computes H(X) (bits/symbol, order-0 model) from the empirical symbol
    frequencies and scales it by ``len(data) / 8`` to get the theoretical
    minimum size in bytes.

    Returns 0.0 for empty input — previously the empty count list hit a
    zero-sum normalization inside scipy and produced NaN.
    """
    if not data:
        return 0.0
    probabilities = list(Counter(data).values())
    # scipy normalizes the raw counts to probabilities; base=2 -> bits/symbol.
    return scipy_entropy(probabilities, base=2) * len(data) / 8
| |
|
| |
|
def theoretical_minimum_size(data: List[int]) -> float:
    """Calculate theoretical minimum compressed size in bytes.

    Thin convenience wrapper: the order-0 entropy bound (in bytes) is
    exactly what ``calculate_entropy`` computes.
    """
    minimum_bytes = calculate_entropy(data)
    return minimum_bytes