Spaces:
Sleeping
Sleeping
| import heapq | |
| from functools import total_ordering | |
| """ | |
| Code for Huffman Coding, compression and decompression. | |
| Explanation at http://bhrigu.me/blog/2017/01/17/huffman-coding-python-implementation/ | |
| Adapted from https://github.com/bhrigu123/huffman-coding | |
| """ | |
@total_ordering
class HeapNode:
    """Node of a Huffman tree that orders itself by frequency.

    Instances are compared on ``freq`` only, which is what lets them be
    stored directly in a ``heapq`` min-heap.

    Attributes:
        token: The symbol carried by the node (``HuffmanCoding.merge_nodes``
            passes ``None`` for the internal nodes it creates).
        freq: Weight of the node.
        left: Left child, or ``None``.
        right: Right child, or ``None``.
    """

    def __init__(self, token, freq):
        """Create a childless node for ``token`` with weight ``freq``."""
        self.token = token
        self.freq = freq
        self.left = None
        self.right = None

    def __repr__(self):
        """Return a short debugging representation (token and frequency)."""
        return "{}(token={!r}, freq={!r})".format(
            type(self).__name__, self.token, self.freq
        )

    # Comparators: only ``__lt__`` and ``__eq__`` are written by hand;
    # ``total_ordering`` derives ``<=``, ``>`` and ``>=`` from them.
    def __lt__(self, other):
        """Return True when this node's frequency is lower than ``other``'s."""
        return self.freq < other.freq

    def __eq__(self, other):
        """Return True when ``other`` is a HeapNode with the same frequency.

        For any other type (including ``None``) ``NotImplemented`` is
        returned, so ``node == x`` still evaluates to ``False`` and
        ``node != x`` to ``True``.
        """
        if not isinstance(other, HeapNode):
            return NotImplemented
        return self.freq == other.freq

    # Equality is frequency-based and nodes are mutable, so they are kept
    # unhashable (this is also what Python does implicitly once ``__eq__``
    # is defined without ``__hash__``).
    __hash__ = None
class HuffmanCoding:
    """Build Huffman codes for a set of tokens and encode/decode with them.

    Usage order, as implied by the methods below: fill the heap with
    ``make_heap`` or ``make_heap_from_array``, reduce it to a single tree
    with ``merge_nodes``, then call ``make_codes``. Afterwards ``codes``
    maps each token to its bit string and ``reverse_mapping`` maps each bit
    string back to its token.
    """

    def __init__(self):
        """Start with an empty heap and empty code tables."""
        self.heap = []             # min-heap of HeapNode, ordered by freq
        self.codes = {}            # token -> bit string ("0"/"1" chars)
        self.reverse_mapping = {}  # bit string -> token

    # functions for compression:

    def make_heap(self, frequency):
        """Push one leaf node per entry of a ``token -> frequency`` mapping."""
        for token, freq in frequency.items():
            heapq.heappush(self.heap, HeapNode(token, freq))

    def make_heap_from_array(self, freqs):
        """Push one leaf node per element of ``freqs``.

        The position of each frequency in ``freqs`` is used as its token.
        """
        for index, freq in enumerate(freqs):
            heapq.heappush(self.heap, HeapNode(index, freq))

    def merge_nodes(self):
        """Combine the two lightest nodes until a single root remains.

        Each merged node has ``token`` set to ``None`` and a frequency equal
        to the sum of its two children.
        """
        while len(self.heap) > 1:
            lighter = heapq.heappop(self.heap)
            heavier = heapq.heappop(self.heap)
            merged = HeapNode(None, lighter.freq + heavier.freq)
            merged.left = lighter
            merged.right = heavier
            heapq.heappush(self.heap, merged)

    def make_codes_helper(self, root, current_code):
        """Assign a code to every leaf below ``root``.

        ``current_code`` is the prefix already accumulated for ``root``;
        "0" is appended when descending left and "1" when descending right.
        Results are written to ``self.codes`` and ``self.reverse_mapping``.
        """
        # An explicit stack replaces recursion so that a very deep tree
        # cannot run into the interpreter's recursion limit.
        pending = [(root, current_code)]
        while pending:
            node, code = pending.pop()
            if node is None:
                continue
            # A leaf is recognised by having no children rather than by its
            # token, so a leaf whose token happens to be None still gets a
            # code instead of being dropped.
            if node.left is None and node.right is None:
                self.codes[node.token] = code
                self.reverse_mapping[code] = node.token
                continue
            # Right is pushed first so the left subtree is handled first,
            # keeping the same left-to-right assignment order as a
            # recursive pre-order walk.
            pending.append((node.right, code + "1"))
            pending.append((node.left, code + "0"))

    def make_codes(self):
        """Pop the root from the heap, fill the code tables and return it.

        Call ``merge_nodes`` first so the heap holds exactly one tree.

        Raises:
            IndexError: If the heap is empty.
        """
        root = heapq.heappop(self.heap)
        # With a single symbol the root is itself a leaf; give it the
        # one-bit code "0", because an empty code would make every encoded
        # message empty and impossible to decode.
        root_is_leaf = root.left is None and root.right is None
        self.make_codes_helper(root, "0" if root_is_leaf else "")
        return root

    def get_encoded_tokens(self, token_list):
        """Return the concatenated bit string for ``token_list``.

        Raises:
            KeyError: If a token has no entry in ``self.codes``.
        """
        return "".join(self.codes[token] for token in token_list)

    def decode_text(self, encoded_text):
        """Decode a bit string back into text using ``reverse_mapping``.

        Decoded tokens are joined into one ``str``, so they must be strings.
        Trailing bits that do not complete a known code are ignored.
        """
        pieces = []
        current_code = ""
        for bit in encoded_text:
            current_code += bit
            if current_code in self.reverse_mapping:
                pieces.append(self.reverse_mapping[current_code])
                current_code = ""
        return "".join(pieces)

    def remove_padding(self, padded_encoded_text):
        """Strip the padding header and the trailing pad bits.

        NOTE(review): ``decompress`` called this method, but the class did
        not define it. The layout assumed here is the one used by the
        upstream bhrigu123/huffman-coding project this file is adapted
        from: the first 8 bits hold the number of pad bits appended at the
        end. Confirm against whatever code writes the compressed file.

        Raises:
            ValueError: If the input does not start with a binary header
                (for example, when it is empty).
        """
        extra_padding = int(padded_encoded_text[:8], 2)
        payload = padded_encoded_text[8:]
        # Slice with an explicit end index: ``payload[:-0]`` would wrongly
        # yield an empty string when there is no padding at all.
        return payload[:max(len(payload) - extra_padding, 0)]

    def decompress(self, input_path):
        """Decode the binary file at ``input_path`` into a text file.

        The output is written next to the base path as
        ``<name>_decompressed.txt`` and its path is returned. The base path
        is ``self.path`` when that attribute has been set by the caller,
        otherwise ``input_path`` (``__init__`` does not define ``path``).
        """
        # Local import: ``os`` is not among the imports visible at the top
        # of this module.
        import os

        base_path = getattr(self, "path", None) or input_path
        filename, _extension = os.path.splitext(base_path)
        output_path = filename + "_decompressed" + ".txt"

        with open(input_path, 'rb') as file:
            raw = bytearray(file.read())
        bit_string = "".join(format(byte, "08b") for byte in raw)

        encoded_text = self.remove_padding(bit_string)
        decompressed_text = self.decode_text(encoded_text)

        # The output file is opened only after decoding succeeded, so a
        # failure above does not leave an empty file behind.
        with open(output_path, 'w') as output:
            output.write(decompressed_text)
        return output_path