import gzip from typing import Union, List, Generator from io import BytesIO def compress_data(data: bytes) -> bytes: """Compress data using gzip and add header""" compressed = gzip.compress(data) header = bytes([0x01]) + len(compressed).to_bytes(4, byteorder="big") return header + compressed def decompress_chunks(data: bytes) -> Generator[tuple[bytes, bytes], None, None]: """Decompress chunked gzip data and yield results as they are processed""" index = 0 status_code_len = 1 magic_gzip = b"\x1F\x8B" while index < len(data): chunk_len_pos = index + status_code_len gzip_pos = chunk_len_pos + 4 if chunk_len_pos + 3 >= len(data): break chunk_length = int.from_bytes(data[chunk_len_pos:gzip_pos], byteorder="big") if data[gzip_pos : gzip_pos + 2] != magic_gzip: index = gzip_pos + 2 continue chunk_end = gzip_pos + chunk_length chunk_data = data[gzip_pos:chunk_end] try: with gzip.GzipFile(fileobj=BytesIO(chunk_data)) as gz: yield (data[index], gz.read()) except Exception as e: print(f"Failed to decompress chunk: {e}") index = chunk_end