#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import gzip
import base64
import zlib
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Union, Optional
import hashlib
@dataclass
class CompressedFile:
    """Data class for compressed file information.

    Record produced when a file is compressed and Base64-encoded; carries
    the encoded payload plus the metadata needed to restore and verify it.
    """
    # Original file name (no directory component).
    filename: str
    # Size of the uncompressed data, in bytes.
    original_size: int
    # Size of the compressed data, in bytes.
    compressed_size: int
    # Human-readable space saving, e.g. "42.17%".
    compression_ratio: str
    # Compression method used ('gzip' or 'zlib').
    method: str
    # Hex MD5 digest of the ORIGINAL (uncompressed) data, for integrity checks.
    md5: str
    # Base64 text (UTF-8) encoding of the compressed data.
    base64: str
class FileCompressor:
    """File compression and encoding utility class.

    All methods are static. The class can compress a file (gzip or zlib),
    encode the result as a Base64 string wrapped in a CompressedFile record
    (with an MD5 checksum for integrity), persist/load that record as JSON,
    and reverse the whole process.
    """

    @staticmethod
    def calculate_md5(data: bytes) -> str:
        """Return the hex MD5 digest of *data* (integrity check, not security)."""
        return hashlib.md5(data).hexdigest()

    @staticmethod
    def _compress(data: bytes, method: str) -> bytes:
        """Compress *data* using *method* ('gzip' or 'zlib').

        :raises ValueError: If *method* is not supported.
        """
        if method == 'gzip':
            return gzip.compress(data)
        if method == 'zlib':
            return zlib.compress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def _decompress(data: bytes, method: str) -> bytes:
        """Decompress *data* using *method* ('gzip' or 'zlib').

        :raises ValueError: If *method* is not supported.
        """
        if method == 'gzip':
            return gzip.decompress(data)
        if method == 'zlib':
            return zlib.decompress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def compress_and_encode(
        file_path: Union[str, Path],
        method: str = 'gzip'
    ) -> "CompressedFile":
        """
        Compresses a file and encodes it in Base64.

        :param file_path: Path to the file.
        :param method: Compression method ('gzip' or 'zlib').
        :return: A CompressedFile object containing the encoded data and metadata.
        :raises FileNotFoundError: If *file_path* does not exist.
        :raises ValueError: If *method* is not supported.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        original_data = file_path.read_bytes()
        original_size = len(original_data)
        compressed_data = FileCompressor._compress(original_data, method)
        compressed_size = len(compressed_data)
        # Guard against ZeroDivisionError when the input file is empty.
        if original_size > 0:
            ratio = f"{(1 - compressed_size / original_size) * 100:.2f}%"
        else:
            ratio = "0.00%"
        return CompressedFile(
            filename=file_path.name,
            original_size=original_size,
            compressed_size=compressed_size,
            compression_ratio=ratio,
            method=method,
            md5=FileCompressor.calculate_md5(original_data),
            base64=base64.b64encode(compressed_data).decode('utf-8')
        )

    @staticmethod
    def decode_and_decompress(
        encoded_file: "CompressedFile",
        output_path: Optional[Union[str, Path]] = None
    ) -> bytes:
        """
        Decode and decompress a CompressedFile record, verifying its checksum.

        :param encoded_file: Record produced by :meth:`compress_and_encode`.
        :param output_path: Optional path; when given, the decoded bytes are
            also written there (parent directories are created).
        :return: The original (decompressed) bytes.
        :raises ValueError: If the method is unsupported or the MD5 check fails.
        """
        compressed_data = base64.b64decode(encoded_file.base64)
        original_data = FileCompressor._decompress(compressed_data, encoded_file.method)
        # Verify integrity against the checksum captured at compression time.
        if FileCompressor.calculate_md5(original_data) != encoded_file.md5:
            raise ValueError("MD5 checksum verification failed — the file may be corrupted.")
        if output_path:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_bytes(original_data)
        return original_data

    @staticmethod
    def save_encoded_to_json(encoded_file: "CompressedFile", json_path: Union[str, Path]) -> None:
        """Serialize a CompressedFile record to a UTF-8 JSON file.

        :param encoded_file: Record to persist.
        :param json_path: Destination path (parent directories are created).
        """
        json_path = Path(json_path)
        json_path.parent.mkdir(parents=True, exist_ok=True)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(encoded_file), f, indent=2, ensure_ascii=False)

    @staticmethod
    def load_encoded_from_json(json_path: Union[str, Path]) -> "CompressedFile":
        """Load a CompressedFile record saved by :meth:`save_encoded_to_json`.

        :param json_path: Path to the JSON file.
        :raises FileNotFoundError: If *json_path* does not exist.
        """
        json_path = Path(json_path)
        if not json_path.exists():
            raise FileNotFoundError(f"JSON file not found: {json_path}")
        with open(json_path, 'r', encoding='utf-8') as f:
            return CompressedFile(**json.load(f))

    @staticmethod
    def decompress_from_string(
        encoded_string: str,
        output_path: Union[str, Path],
        method: str = 'gzip'
    ) -> bytes:
        """
        Decode a bare Base64 string, decompress it, and write it to a file.

        Unlike :meth:`decode_and_decompress`, no MD5 verification happens
        here — only the raw payload is available, not the metadata record.

        :param encoded_string: Base64-encoded compressed data.
        :param output_path: Destination path (parent directories are created).
        :param method: Compression method ('gzip' or 'zlib').
        :return: The decompressed bytes.
        :raises ValueError: If *method* is not supported.
        """
        original_data = FileCompressor._decompress(
            base64.b64decode(encoded_string), method
        )
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(original_data)
        return original_data