xusijie
Clean branch for HF push
06ba7ea
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import gzip
import base64
import zlib
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Union, Optional
import hashlib
@dataclass
class CompressedFile:
    """Data class for compressed file information.

    Bundles a compressed, Base64-encoded payload with the metadata needed to
    restore and verify it. All fields are plain strings and integers.
    """

    # Base name of the source file (no directory part).
    filename: str
    # Size in bytes of the data before compression.
    original_size: int
    # Size in bytes of the compressed data, before Base64 encoding.
    compressed_size: int
    # Space saved by compression, formatted as a percentage string, e.g. "75.00%".
    compression_ratio: str
    # Compression method used for the payload: 'gzip' or 'zlib'.
    method: str
    # Hexadecimal MD5 digest of the original (uncompressed) data.
    md5: str
    # Compressed data encoded as Base64 text.
    base64: str
class FileCompressor:
    """File compression and encoding utility class.

    Every method is a static method, so the class acts as a namespace and
    never needs to be instantiated. The supported compression methods are
    ``'gzip'`` and ``'zlib'``.
    """

    @staticmethod
    def calculate_md5(data: bytes) -> str:
        """Calculate the hexadecimal MD5 digest of ``data``."""
        return hashlib.md5(data).hexdigest()

    @staticmethod
    def _compress(data: bytes, method: str) -> bytes:
        """Compress ``data`` with ``method``; raise ValueError if unsupported."""
        if method == 'gzip':
            return gzip.compress(data)
        if method == 'zlib':
            return zlib.compress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def _decompress(data: bytes, method: str) -> bytes:
        """Decompress ``data`` with ``method``; raise ValueError if unsupported."""
        if method == 'gzip':
            return gzip.decompress(data)
        if method == 'zlib':
            return zlib.decompress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def _format_ratio(original_size: int, compressed_size: int) -> str:
        """Format the space saved by compression as a percentage string."""
        if original_size == 0:
            # An empty input has nothing to save; report 0% instead of
            # dividing by zero.
            return "0.00%"
        return f"{(1 - compressed_size / original_size) * 100:.2f}%"

    @staticmethod
    def _write_output(output_path: Union[str, Path], data: bytes) -> None:
        """Write ``data`` to ``output_path``, creating parent directories."""
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(data)

    @staticmethod
    def compress_and_encode(
        file_path: Union[str, Path],
        method: str = 'gzip'
    ) -> "CompressedFile":
        """
        Compresses a file and encodes it in Base64.

        The whole file is read into memory. An empty file is supported and
        reports a compression ratio of ``"0.00%"``. The ratio is negative
        when the compressed data is larger than the original.

        :param file_path: Path to the file.
        :param method: Compression method ('gzip' or 'zlib').
        :return: A CompressedFile object containing the encoded data and metadata.
        :raises FileNotFoundError: If ``file_path`` does not exist.
        :raises ValueError: If ``method`` is not supported.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        original_data = file_path.read_bytes()
        compressed_data = FileCompressor._compress(original_data, method)

        original_size = len(original_data)
        compressed_size = len(compressed_data)

        return CompressedFile(
            filename=file_path.name,
            original_size=original_size,
            compressed_size=compressed_size,
            compression_ratio=FileCompressor._format_ratio(
                original_size, compressed_size
            ),
            method=method,
            # The digest covers the ORIGINAL bytes so that decoding can
            # verify the round trip.
            md5=FileCompressor.calculate_md5(original_data),
            base64=base64.b64encode(compressed_data).decode('utf-8'),
        )

    @staticmethod
    def decode_and_decompress(
        encoded_file: "CompressedFile",
        output_path: Optional[Union[str, Path]] = None
    ) -> bytes:
        """
        Decodes and decompresses a CompressedFile, verifying its MD5 digest.

        :param encoded_file: Record holding the Base64 payload, the
            compression method and the expected MD5 digest.
        :param output_path: Optional destination file. When given (and not
            empty), the restored bytes are written there and missing parent
            directories are created.
        :return: The restored original bytes.
        :raises ValueError: If the method is not supported or the MD5 digest
            of the restored data does not match ``encoded_file.md5``.
        """
        compressed_data = base64.b64decode(encoded_file.base64)
        original_data = FileCompressor._decompress(
            compressed_data, encoded_file.method
        )

        # Verify before writing so corrupted data never reaches the disk.
        if FileCompressor.calculate_md5(original_data) != encoded_file.md5:
            raise ValueError("MD5 checksum verification failed — the file may be corrupted.")

        if output_path:
            FileCompressor._write_output(output_path, original_data)
        return original_data

    @staticmethod
    def save_encoded_to_json(encoded_file: "CompressedFile", json_path: Union[str, Path]):
        """
        Saves a CompressedFile as an indented UTF-8 JSON document.

        :param encoded_file: Record to serialize (converted with ``asdict``).
        :param json_path: Destination path; missing parent directories are
            created.
        """
        json_path = Path(json_path)
        json_path.parent.mkdir(parents=True, exist_ok=True)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(encoded_file), f, indent=2, ensure_ascii=False)

    @staticmethod
    def load_encoded_from_json(json_path: Union[str, Path]) -> "CompressedFile":
        """
        Loads a CompressedFile from a JSON document.

        The JSON object's keys are passed as keyword arguments to
        ``CompressedFile``, so they must match its field names.

        :param json_path: Path to the JSON file.
        :return: The reconstructed CompressedFile.
        :raises FileNotFoundError: If ``json_path`` does not exist.
        """
        json_path = Path(json_path)
        if not json_path.exists():
            raise FileNotFoundError(f"JSON file not found: {json_path}")
        with open(json_path, 'r', encoding='utf-8') as f:
            return CompressedFile(**json.load(f))

    @staticmethod
    def decompress_from_string(
        encoded_string: str,
        output_path: Union[str, Path],
        method: str = 'gzip'
    ) -> bytes:
        """
        Decodes a bare Base64 string, decompresses it and writes the result.

        Unlike ``decode_and_decompress`` there is no metadata record, so no
        MD5 verification is performed.

        :param encoded_string: Base64 text of the compressed data.
        :param output_path: Destination file; missing parent directories are
            created.
        :param method: Compression method ('gzip' or 'zlib').
        :return: The restored original bytes.
        :raises ValueError: If ``method`` is not supported.
        """
        compressed_data = base64.b64decode(encoded_string)
        original_data = FileCompressor._decompress(compressed_data, method)
        FileCompressor._write_output(output_path, original_data)
        return original_data