| | import os |
| | import subprocess |
| | import sys |
| | import argparse |
| | import time |
| |
|
# Make the parent directory of this script importable.
sys.path.append(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..")
    )
)

# Binary size units, expressed in bytes.
KB = 1 << 10
MB = KB * KB
GB = KB * MB

# ANSI escape sequences used to colour terminal output.
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"
| |
|
def check_and_install_zipnn():
    """Ensure the ``zipnn`` package is importable, installing it if needed.

    If the import fails, ``pip install zipnn --upgrade`` is run with the
    current interpreter and the import is attempted again.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with a
            non-zero status.
        ImportError: If ``zipnn`` still cannot be imported after installing.
    """
    try:
        import zipnn  # noqa: F401
    except ImportError:
        print("zipnn not found. Installing...")
        # Run pip through the running interpreter so the package lands in
        # the same environment this script executes in.
        pip_install = [sys.executable, "-m", "pip", "install", "zipnn", "--upgrade"]
        subprocess.check_call(pip_install)
        import zipnn  # noqa: F401
| |
|
| |
|
def parse_streaming_chunk_size(
    streaming_chunk_size,
):
    """Convert a chunk-size specification into a number of bytes.

    Args:
        streaming_chunk_size: Either a non-negative integer (``int`` or
            all-digit string) giving the size in bytes, or a string made of
            digits followed by ``KB``, ``MB`` or ``GB`` (case-insensitive),
            e.g. ``"64KB"``. Surrounding whitespace is ignored.

    Returns:
        The chunk size in bytes, as an ``int``.

    Raises:
        ValueError: If the value is not in one of the accepted formats or
            uses a unit other than K, M or G.
    """
    spec = str(streaming_chunk_size).strip()

    # Plain byte count. ``isdecimal`` matches exactly what ``int`` can parse,
    # so the conversion below cannot fail.
    if spec.isdecimal():
        return int(spec)

    number_part = spec[:-2]
    suffix = spec[-2:].lower()
    # Validate the whole shape up front: digits, then a two-letter unit that
    # ends in "b". This rejects inputs such as "10MX", "-1KB", "K" or "".
    if not number_part.isdecimal() or not suffix.endswith("b"):
        raise ValueError(
            f"Invalid streaming chunk size: {streaming_chunk_size!r}. "
            "Use an integer number of bytes, or an integer followed by "
            "KB, MB or GB."
        )

    size_value = int(number_part)
    size_unit = suffix[0]
    if size_unit == "k":
        return KB * size_value
    if size_unit == "m":
        return MB * size_value
    if size_unit == "g":
        return GB * size_value
    raise ValueError(f"Invalid size unit: {size_unit}. Use 'k', 'm', or 'g'.")
| |
|
| |
|
def compress_file(
    input_file,
    dtype="",
    streaming_chunk_size=1048576,
    delete=False,
    force=False,
    hf_cache=False,
):
    """Compress a single file with ZipNN into ``<input_file>.znn``.

    Args:
        input_file: Path of the file to compress.
        dtype: When truthy, the compressor is created with
            ``bytearray_dtype="float32"``; otherwise that argument is
            omitted.
        streaming_chunk_size: Chunk size in any form accepted by
            ``parse_streaming_chunk_size``; the parsed value is passed to
            ZipNN as ``streaming_chunk_kb``.
        delete: Remove ``input_file`` after compression. Ignored when
            ``hf_cache`` is set, because that mode removes it itself.
        force: Overwrite an existing ``.znn`` file without prompting.
        hf_cache: Treat ``input_file`` as a symlink. After compressing, the
            compressed file is moved over the link's target, a symlink named
            ``<input_file>.znn`` is created pointing at it, and
            ``input_file`` is removed.

    Returns:
        None. Progress and a size summary are printed to stdout.

    Raises:
        Exception: If reorganizing the cache fails when ``hf_cache`` is set;
            the underlying error is attached as the cause.
    """
    import zipnn

    streaming_chunk_size = parse_streaming_chunk_size(streaming_chunk_size)
    if not os.path.exists(input_file):
        print(f"{RED}File not found{RESET}")
        return

    output_file = input_file + ".znn"
    if not force and os.path.exists(output_file):
        user_input = (
            input(f"{output_file} already exists; overwrite (y/n)? ").strip().lower()
        )
        if user_input not in ("yes", "y"):
            print(f"Skipping {input_file}...")
            return
    print(f"Compressing {input_file}...")

    zipnn_kwargs = {
        "is_streaming": True,
        "streaming_chunk_kb": streaming_chunk_size,
    }
    if dtype:
        zipnn_kwargs["bytearray_dtype"] = "float32"
    zpn = zipnn.ZipNN(**zipnn_kwargs)

    start_time = time.time()
    with open(input_file, "rb") as infile, open(output_file, "wb") as outfile:
        # The whole file is read and compressed in a single call.
        data = infile.read()
        file_size_before = len(data)
        compressed = zpn.compress(data)
        file_size_after = len(compressed) if compressed else 0
        if compressed:
            outfile.write(compressed)
    elapsed = time.time() - start_time

    # An empty input has no meaningful ratio; report 0 instead of dividing
    # by zero.
    if file_size_before:
        remaining_pct = file_size_after / file_size_before * 100
    else:
        remaining_pct = 0.0

    print(f"Compressed {input_file} to {output_file}")
    print(
        f"{GREEN}Original size: {file_size_before/GB:.02f}GB size after compression: {file_size_after/GB:.02f}GB, Remaining size is {remaining_pct:.02f}% of original, time: {elapsed:.02f}{RESET}"
    )

    if delete and not hf_cache:
        print(f"Deleting {input_file}...")
        os.remove(input_file)

    if hf_cache:
        print(f"{YELLOW}Reorganizing Hugging Face cache...{RESET}")
        try:
            snapshot_path = os.path.dirname(input_file)
            # Resolve the link target relative to the directory holding the
            # link.
            blob_name = os.path.join(snapshot_path, os.readlink(input_file))
            # Replace the target with the compressed data, then expose it
            # under the ".znn" name through a new symlink.
            os.rename(output_file, blob_name)
            os.symlink(blob_name, output_file)
            if os.path.exists(input_file):
                os.remove(input_file)
        except Exception as e:
            raise Exception(f"Error reorganizing Hugging Face cache: {e}") from e
| |
|
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, make sure zipnn is
    # available, then compress the requested file.
    parser = argparse.ArgumentParser(description="Enter a file path to compress.")
    parser.add_argument(
        "input_file",
        type=str,
        help="Specify the path to the file to compress.",
    )
    parser.add_argument(
        "--float32",
        action="store_true",
        help="A flag that triggers float32 compression",
    )
    parser.add_argument(
        "--streaming_chunk_size",
        type=str,
        help="An optional streaming chunk size. The format is int (for size in Bytes) or int+KB/MB/GB. Default is 1MB",
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="A flag that deletes the original file after it has been compressed.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="A flag that forces overwriting when compressing.",
    )
    parser.add_argument(
        "--hf_cache",
        action="store_true",
        help="A flag that indicates if the file is in the Hugging Face cache.",
    )

    # A bare invocation prints the parser's own help, which always matches
    # the real arguments, and exits with status 1 (argparse alone would use
    # status 2 for the missing positional argument).
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit(1)

    args = parser.parse_args()

    # Only forward options the user actually set, so compress_file's own
    # defaults apply to everything else.
    optional_kwargs = {}
    if args.float32:
        optional_kwargs["dtype"] = 32
    if args.streaming_chunk_size is not None:
        optional_kwargs["streaming_chunk_size"] = args.streaming_chunk_size
    if args.delete:
        optional_kwargs["delete"] = args.delete
    if args.force:
        optional_kwargs["force"] = args.force
    if args.hf_cache:
        optional_kwargs["hf_cache"] = args.hf_cache

    check_and_install_zipnn()
    compress_file(args.input_file, **optional_kwargs)
| |
|