File size: 4,182 Bytes
24c19d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
"""
Implementation of enumerative entropy coding as described in Han et al. (2008).
This is the actual "equiprobable partitioning" algorithm from the paper.
"""

# Import the actual implementation
from enumerative_coding import EnumerativeEncoder, ExpGolombCoder

import numpy as np
from typing import Tuple, List, Dict
from collections import Counter
import heapq
from scipy.stats import entropy as scipy_entropy


# For backward compatibility with existing code that expects this class name
EquiprobablePartitioningEncoder = EnumerativeEncoder


class HuffmanEncoder:
    """Standard Huffman coding for comparison."""
    
    class Node:
        def __init__(self, symbol=None, freq=0, left=None, right=None):
            self.symbol = symbol
            self.freq = freq
            self.left = left
            self.right = right
        
        def __lt__(self, other):
            return self.freq < other.freq
    
    def __init__(self):
        self.codes = {}
        self.root = None
    
    def _build_tree(self, frequencies: Dict[int, int]):
        """Build Huffman tree from symbol frequencies."""
        heap = []
        
        for symbol, freq in frequencies.items():
            heapq.heappush(heap, self.Node(symbol=symbol, freq=freq))
        
        while len(heap) > 1:
            left = heapq.heappop(heap)
            right = heapq.heappop(heap)
            parent = self.Node(freq=left.freq + right.freq, left=left, right=right)
            heapq.heappush(heap, parent)
        
        self.root = heap[0]
    
    def _generate_codes(self, node, code=''):
        """Generate Huffman codes by traversing the tree."""
        if node.symbol is not None:
            self.codes[node.symbol] = code if code else '0'
            return
        
        if node.left:
            self._generate_codes(node.left, code + '0')
        if node.right:
            self._generate_codes(node.right, code + '1')
    
    def encode(self, data: List[int]) -> Tuple[bytes, Dict]:
        """Encode data using Huffman coding."""
        frequencies = Counter(data)
        
        if len(frequencies) == 1:
            # Special case: only one symbol
            symbol = list(frequencies.keys())[0]
            self.codes = {symbol: '0'}
        else:
            self._build_tree(frequencies)
            self._generate_codes(self.root)
        
        # Encode data
        encoded_bits = ''.join(self.codes[symbol] for symbol in data)
        
        # Pad to byte boundary
        padding = (8 - len(encoded_bits) % 8) % 8
        encoded_bits += '0' * padding
        
        encoded_bytes = bytes(int(encoded_bits[i:i+8], 2) for i in range(0, len(encoded_bits), 8))
        
        metadata = {
            'codes': self.codes,
            'padding': padding,
            'original_length': len(data)
        }
        
        return encoded_bytes, metadata
    
    def decode(self, encoded_bytes: bytes, metadata: Dict) -> List[int]:
        """Decode Huffman encoded data."""
        # Create reverse mapping
        reverse_codes = {code: symbol for symbol, code in metadata['codes'].items()}
        
        # Convert bytes to bit string
        bit_string = ''.join(format(byte, '08b') for byte in encoded_bytes)
        
        # Remove padding
        if metadata['padding'] > 0:
            bit_string = bit_string[:-metadata['padding']]
        
        decoded = []
        current_code = ''
        
        for bit in bit_string:
            current_code += bit
            if current_code in reverse_codes:
                decoded.append(reverse_codes[current_code])
                current_code = ''
                if len(decoded) >= metadata['original_length']:
                    break
        
        return decoded


def calculate_entropy(data: List[int]) -> float:
    """Calculate Shannon entropy of data."""
    probabilities = list(Counter(data).values())
    return scipy_entropy(probabilities, base=2) * len(data) / 8  # Convert to bytes


def theoretical_minimum_size(data: List[int]) -> float:
    """Calculate theoretical minimum compressed size in bytes."""
    return calculate_entropy(data)