File size: 4,718 Bytes
06ba7ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import gzip
import base64
import zlib
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Union, Optional
import hashlib

@dataclass
class CompressedFile:
    """Record describing one compressed, Base64-encoded file.

    Instances are built by ``FileCompressor.compress_and_encode`` and are
    written to / read from JSON field-for-field (``asdict`` on save,
    ``CompressedFile(**data)`` on load), so the attribute names below are
    also the JSON keys.
    """
    # Base name of the source file (``Path.name``, no directory part).
    filename: str
    # Length in bytes of the uncompressed content.
    original_size: int
    # Length in bytes of the compressed content, before Base64 encoding.
    compressed_size: int
    # Space saving as a preformatted percentage string with two decimals
    # and a trailing '%'.
    compression_ratio: str
    # Name of the compression algorithm: 'gzip' or 'zlib'.
    method: str
    # Hex MD5 digest of the uncompressed content; checked again on decode.
    md5: str
    # Compressed bytes encoded as Base64 text.
    base64: str

class FileCompressor:
    """File compression and encoding utility class.

    All methods are static. Files are compressed with gzip or zlib, the
    compressed bytes are Base64-encoded, and the result is carried around
    in a ``CompressedFile`` record that can be round-tripped through JSON.
    """

    @staticmethod
    def calculate_md5(data: bytes) -> str:
        """Calculate MD5 hash of data.

        :param data: Bytes to hash.
        :return: The digest as a lowercase hexadecimal string.
        """
        return hashlib.md5(data).hexdigest()

    @staticmethod
    def _compress(data: bytes, method: str) -> bytes:
        """Compress *data* with the named method ('gzip' or 'zlib')."""
        if method == 'gzip':
            return gzip.compress(data)
        if method == 'zlib':
            return zlib.compress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def _decompress(data: bytes, method: str) -> bytes:
        """Decompress *data* with the named method ('gzip' or 'zlib')."""
        if method == 'gzip':
            return gzip.decompress(data)
        if method == 'zlib':
            return zlib.decompress(data)
        raise ValueError(f"Unsupported compression method: {method}")

    @staticmethod
    def _compression_ratio(original_size: int, compressed_size: int) -> str:
        """Format the space saving as a percentage string such as '75.00%'."""
        # An empty input has no meaningful ratio; report zero saving instead
        # of dividing by zero.
        if original_size == 0:
            return "0.00%"
        return f"{(1 - compressed_size / original_size) * 100:.2f}%"

    @staticmethod
    def _write_bytes(output_path: Union[str, Path], data: bytes) -> None:
        """Write *data* to *output_path*, creating parent directories."""
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(data)

    @staticmethod
    def compress_and_encode(
        file_path: Union[str, Path],
        method: str = 'gzip'
    ) -> "CompressedFile":
        """
        Compresses a file and encodes it in Base64.

        :param file_path: Path to the file.
        :param method: Compression method ('gzip' or 'zlib').
        :return: A CompressedFile object containing the encoded data and metadata.
        :raises FileNotFoundError: If ``file_path`` does not exist.
        :raises ValueError: If ``method`` is not 'gzip' or 'zlib'.
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        original_data = file_path.read_bytes()
        original_size = len(original_data)

        # The checksum covers the *uncompressed* bytes so that decoding can
        # verify the full round trip.
        original_md5 = FileCompressor.calculate_md5(original_data)

        compressed_data = FileCompressor._compress(original_data, method)
        compressed_size = len(compressed_data)

        return CompressedFile(
            filename=file_path.name,
            original_size=original_size,
            compressed_size=compressed_size,
            compression_ratio=FileCompressor._compression_ratio(
                original_size, compressed_size
            ),
            method=method,
            md5=original_md5,
            base64=base64.b64encode(compressed_data).decode('utf-8')
        )

    @staticmethod
    def decode_and_decompress(
        encoded_file: "CompressedFile",
        output_path: Optional[Union[str, Path]] = None
    ) -> bytes:
        """
        Decodes and decompresses a CompressedFile, verifying its checksum.

        :param encoded_file: Record whose ``base64``, ``method`` and ``md5``
            fields describe the payload.
        :param output_path: Optional destination; when given (and non-empty)
            the restored bytes are also written there, creating parent
            directories as needed.
        :return: The original, uncompressed bytes.
        :raises ValueError: If the method is unsupported or the MD5 of the
            restored data does not match ``encoded_file.md5``.
        """
        compressed_data = base64.b64decode(encoded_file.base64)
        original_data = FileCompressor._decompress(
            compressed_data, encoded_file.method
        )

        # Verify before touching the filesystem so corrupt data is never
        # written out.
        if FileCompressor.calculate_md5(original_data) != encoded_file.md5:
            raise ValueError("MD5 checksum verification failed — the file may be corrupted.")

        if output_path:
            FileCompressor._write_bytes(output_path, original_data)

        return original_data

    @staticmethod
    def save_encoded_to_json(encoded_file: "CompressedFile", json_path: Union[str, Path]) -> None:
        """
        Saves a CompressedFile as a JSON document.

        :param encoded_file: Record to serialise; every field becomes a key.
        :param json_path: Destination file; parent directories are created.
        """
        json_path = Path(json_path)
        json_path.parent.mkdir(parents=True, exist_ok=True)

        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(encoded_file), f, indent=2, ensure_ascii=False)

    @staticmethod
    def load_encoded_from_json(json_path: Union[str, Path]) -> "CompressedFile":
        """
        Loads a CompressedFile from a JSON document.

        :param json_path: Path to a JSON object whose keys are the
            CompressedFile field names.
        :return: The reconstructed CompressedFile.
        :raises FileNotFoundError: If ``json_path`` does not exist.
        """
        json_path = Path(json_path)

        if not json_path.exists():
            raise FileNotFoundError(f"JSON file not found: {json_path}")

        with open(json_path, 'r', encoding='utf-8') as f:
            return CompressedFile(**json.load(f))

    @staticmethod
    def decompress_from_string(
        encoded_string: str,
        output_path: Union[str, Path],
        method: str = 'gzip'
    ) -> bytes:
        """
        Decodes a bare Base64 string, decompresses it and writes the result.

        Unlike ``decode_and_decompress`` there is no metadata record, so no
        checksum verification is performed.

        :param encoded_string: Base64 text of the compressed bytes.
        :param output_path: Destination file; parent directories are created.
        :param method: Compression method ('gzip' or 'zlib').
        :return: The decompressed bytes.
        :raises ValueError: If ``method`` is not 'gzip' or 'zlib'.
        """
        compressed_data = base64.b64decode(encoded_string)
        original_data = FileCompressor._decompress(compressed_data, method)
        FileCompressor._write_bytes(output_path, original_data)
        return original_data