# enumerative-entropy-coding / entropy_coding.py
#!/usr/bin/env python3
"""
Implementation of enumerative entropy coding as described in Han et al. (2008).
This is the actual "equiprobable partitioning" algorithm from the paper.
"""
# Import the actual implementation
from enumerative_coding import EnumerativeEncoder, ExpGolombCoder
import numpy as np
from typing import Tuple, List, Dict
from collections import Counter
import heapq
from scipy.stats import entropy as scipy_entropy
# For backward compatibility with existing code that expects this class name
EquiprobablePartitioningEncoder = EnumerativeEncoder
class HuffmanEncoder:
    """Standard Huffman coding, used as a baseline for comparison.

    Usage: ``encoded, meta = enc.encode(data)`` then
    ``enc.decode(encoded, meta)`` round-trips the original symbol list.
    The code table travels in the metadata dict, so any instance can
    decode output produced by any other.
    """

    class Node:
        """Huffman-tree node; leaves carry a symbol, internal nodes carry None."""

        def __init__(self, symbol=None, freq=0, left=None, right=None):
            self.symbol = symbol  # None for internal (merged) nodes
            self.freq = freq      # total frequency of all leaves below
            self.left = left
            self.right = right

        def __lt__(self, other):
            # Required so nodes can be ordered inside the heapq min-heap.
            return self.freq < other.freq

    def __init__(self):
        self.codes = {}   # symbol -> bit-string code (built per encode() call)
        self.root = None  # root of the most recently built Huffman tree

    def _build_tree(self, frequencies: Dict[int, int]):
        """Build the Huffman tree from symbol frequencies (>= 2 symbols)."""
        heap = []
        for symbol, freq in frequencies.items():
            heapq.heappush(heap, self.Node(symbol=symbol, freq=freq))
        # Repeatedly merge the two least-frequent subtrees.
        while len(heap) > 1:
            left = heapq.heappop(heap)
            right = heapq.heappop(heap)
            parent = self.Node(freq=left.freq + right.freq, left=left, right=right)
            heapq.heappush(heap, parent)
        self.root = heap[0]

    def _generate_codes(self, node, code=''):
        """Generate Huffman codes by traversing the tree (left=0, right=1)."""
        if node.symbol is not None:
            # Leaf. A root-only tree would yield the empty code; use '0'.
            self.codes[node.symbol] = code if code else '0'
            return
        if node.left:
            self._generate_codes(node.left, code + '0')
        if node.right:
            self._generate_codes(node.right, code + '1')

    def encode(self, data: List[int]) -> Tuple[bytes, Dict]:
        """Encode a symbol list; return (packed bytes, metadata for decode).

        Metadata keys: 'codes' (symbol -> bit string), 'padding' (number of
        zero bits appended to reach a byte boundary), 'original_length'.
        """
        # FIX: reset per-call state. Previously, codes from an earlier
        # encode() call with a different alphabet lingered in self.codes,
        # leaked into metadata, and could make decode()'s reverse mapping
        # ambiguous (two symbols sharing one code).
        self.codes = {}
        self.root = None

        frequencies = Counter(data)
        if not frequencies:
            # FIX: empty input previously crashed with IndexError on
            # heap[0] inside _build_tree. An empty stream encodes to
            # zero bytes.
            return b'', {'codes': {}, 'padding': 0, 'original_length': 0}
        if len(frequencies) == 1:
            # Special case: a single distinct symbol gets code '0'
            # (a one-node "tree" would otherwise produce an empty code).
            symbol = next(iter(frequencies))
            self.codes = {symbol: '0'}
        else:
            self._build_tree(frequencies)
            self._generate_codes(self.root)

        # Concatenate the per-symbol codes into one bit string.
        encoded_bits = ''.join(self.codes[symbol] for symbol in data)
        # Pad with zeros to a whole number of bytes; decode strips them.
        padding = (8 - len(encoded_bits) % 8) % 8
        encoded_bits += '0' * padding
        encoded_bytes = bytes(int(encoded_bits[i:i + 8], 2)
                              for i in range(0, len(encoded_bits), 8))

        metadata = {
            'codes': self.codes,
            'padding': padding,
            'original_length': len(data)
        }
        return encoded_bytes, metadata

    def decode(self, encoded_bytes: bytes, metadata: Dict) -> List[int]:
        """Decode Huffman-encoded bytes back into the original symbol list."""
        # Invert the code table: bit string -> symbol. Prefix-freeness of
        # Huffman codes makes greedy longest-match-free decoding valid.
        reverse_codes = {code: symbol for symbol, code in metadata['codes'].items()}
        # Expand bytes into a '0'/'1' string.
        bit_string = ''.join(format(byte, '08b') for byte in encoded_bytes)
        # Drop the zero bits appended by encode() for byte alignment.
        if metadata['padding'] > 0:
            bit_string = bit_string[:-metadata['padding']]

        decoded = []
        current_code = ''
        for bit in bit_string:
            current_code += bit
            if current_code in reverse_codes:
                decoded.append(reverse_codes[current_code])
                current_code = ''
                # Stop once all symbols are recovered (guards against any
                # residual bits decoding to spurious symbols).
                if len(decoded) >= metadata['original_length']:
                    break
        return decoded
def calculate_entropy(data: List[int]) -> float:
    """Return the Shannon-entropy lower bound on the size of ``data``, in BYTES.

    Despite the name, this is not bits-per-symbol entropy: it computes the
    base-2 Shannon entropy of the empirical symbol distribution, multiplies
    by the number of symbols (total bits), and divides by 8 (bytes).
    The name and return convention are preserved for existing callers.
    """
    if not data:
        # FIX: an empty distribution makes scipy's entropy return nan
        # (with a runtime warning); an empty stream needs zero bytes.
        return 0.0
    # scipy_entropy normalizes raw counts to probabilities itself.
    probabilities = list(Counter(data).values())
    return scipy_entropy(probabilities, base=2) * len(data) / 8  # bits -> bytes
def theoretical_minimum_size(data: List[int]) -> float:
    """Theoretical minimum compressed size of ``data``, in bytes.

    Thin alias over :func:`calculate_entropy`, which already reports the
    Shannon-entropy bound scaled to a byte count.
    """
    minimum_bytes = calculate_entropy(data)
    return minimum_bytes