| import gzip | |
| from typing import Union, List, Generator | |
| from io import BytesIO | |
| def compress_data(data: bytes) -> bytes: | |
| """Compress data using gzip and add header""" | |
| compressed = gzip.compress(data) | |
| header = bytes([0x01]) + len(compressed).to_bytes(4, byteorder="big") | |
| return header + compressed | |
| def decompress_chunks(data: bytes) -> Generator[tuple[bytes, bytes], None, None]: | |
| """Decompress chunked gzip data and yield results as they are processed""" | |
| index = 0 | |
| status_code_len = 1 | |
| magic_gzip = b"\x1F\x8B" | |
| while index < len(data): | |
| chunk_len_pos = index + status_code_len | |
| gzip_pos = chunk_len_pos + 4 | |
| if chunk_len_pos + 3 >= len(data): | |
| break | |
| chunk_length = int.from_bytes(data[chunk_len_pos:gzip_pos], byteorder="big") | |
| if data[gzip_pos : gzip_pos + 2] != magic_gzip: | |
| index = gzip_pos + 2 | |
| continue | |
| chunk_end = gzip_pos + chunk_length | |
| chunk_data = data[gzip_pos:chunk_end] | |
| try: | |
| with gzip.GzipFile(fileobj=BytesIO(chunk_data)) as gz: | |
| yield (data[index], gz.read()) | |
| except Exception as e: | |
| print(f"Failed to decompress chunk: {e}") | |
| index = chunk_end | |