File size: 1,288 Bytes
1f1b4db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
import gzip
from typing import Union, List, Generator
from io import BytesIO
def compress_data(data: bytes) -> bytes:
"""Compress data using gzip and add header"""
compressed = gzip.compress(data)
header = bytes([0x01]) + len(compressed).to_bytes(4, byteorder="big")
return header + compressed
def decompress_chunks(data: bytes) -> Generator[tuple[bytes, bytes], None, None]:
"""Decompress chunked gzip data and yield results as they are processed"""
index = 0
status_code_len = 1
magic_gzip = b"\x1F\x8B"
while index < len(data):
chunk_len_pos = index + status_code_len
gzip_pos = chunk_len_pos + 4
if chunk_len_pos + 3 >= len(data):
break
chunk_length = int.from_bytes(data[chunk_len_pos:gzip_pos], byteorder="big")
if data[gzip_pos : gzip_pos + 2] != magic_gzip:
index = gzip_pos + 2
continue
chunk_end = gzip_pos + chunk_length
chunk_data = data[gzip_pos:chunk_end]
try:
with gzip.GzipFile(fileobj=BytesIO(chunk_data)) as gz:
yield (data[index], gz.read())
except Exception as e:
print(f"Failed to decompress chunk: {e}")
index = chunk_end
|