"""Main processing logic for GGUF splitting""" import logging import pathlib import re import shutil import subprocess import tempfile import time from typing import Optional import gradio as gr from huggingface_hub import HfApi from .config import DOWNLOAD_TIMEOUT, MAX_DOWNLOAD_RETRIES, RUN_LOCALLY from .gguf_utils import calculate_optimal_split_size, split_gguf_file from .hf_utils import ( check_repo_exists, extract_quantization, extract_username, validate_repo_id, ) from .logging_config import setup_logging try: from huggingface_hub.errors import HFValidationError except ImportError: from huggingface_hub.utils import HFValidationError logger = setup_logging() def update_status(status_markdown: str, text: str) -> str: """Update status display""" return f"{status_markdown}\n\n{text}" def _validate_download(gguf_path: pathlib.Path) -> None: """Validate that download completed successfully""" logger.info(f"Validating download: {gguf_path}") if not gguf_path.exists(): logger.error("Download validation failed - file not found") raise Exception("Download failed - file not found") if gguf_path.stat().st_size == 0: logger.error("Download validation failed - file is empty") raise Exception("Download failed - file is empty") logger.info(f"Download validation successful: {gguf_path.stat().st_size} bytes") def save_locally( split_files: list[pathlib.Path], source_repo_id: str, gguf_filename: str, max_size_mb: int, output_dir: pathlib.Path, status_callback=None, ) -> str: """Save split files locally with proper organization""" if status_callback: status_callback(f"๐Ÿ’พ Saving {len(split_files)} split files locally...") model_name = source_repo_id.split("/")[-1] model_name = re.sub(r"-?GGUF$", "", model_name, flags=re.IGNORECASE) model_name = model_name.rstrip("-") sharded_dir_name = f"{model_name}-sharded" sharded_path = output_dir / sharded_dir_name sharded_path.mkdir(exist_ok=True) for split_file in split_files: destination = sharded_path / split_file.name shutil.move(str(split_file), str(destination)) readme_content = _generate_readme_content( source_repo_id, len(split_files), max_size_mb ) readme_path = sharded_path / "README.md" readme_path.write_text(readme_content) if status_callback: status_callback(f"โœ… Files saved to {sharded_path}") return str(sharded_path) def _cleanup_temp_files( gguf_path: pathlib.Path, split_files: list[pathlib.Path], sharded_path: Optional[pathlib.Path] = None, ) -> None: """Clean up temporary files""" logger.info("Cleaning up temporary files") if gguf_path.exists(): try: gguf_path.unlink() logger.info(f"Removed original GGUF file: {gguf_path}") except (OSError, PermissionError) as e: logger.warning(f"Failed to remove GGUF file {gguf_path}: {e}") for split_file in split_files: if split_file.exists(): try: split_file.unlink() logger.info(f"Removed split file: {split_file}") except (OSError, PermissionError) as e: logger.warning(f"Failed to remove split file {split_file}: {e}") if sharded_path and sharded_path.exists(): try: shutil.rmtree(sharded_path) logger.info(f"Removed sharded directory: {sharded_path}") except (OSError, PermissionError) as e: logger.warning(f"Failed to remove sharded directory {sharded_path}: {e}") logger.info("Temporary file cleanup completed") def upload_to_hf( split_files: list[pathlib.Path], repo_name: str, source_repo_id: str, oauth_token: gr.OAuthToken, max_size_mb: int, make_public: bool, status_callback=None, ) -> str: """Upload split files to Hugging Face Hub""" if status_callback: status_callback( f"โ˜๏ธ Uploading {len(split_files)} split files to Hugging Face..." ) api = HfApi(token=oauth_token.token) try: repo_url = api.create_repo(repo_name, private=not make_public, exist_ok=True) if status_callback: status_callback(f"โœ… Repository created: {repo_url}") except Exception as e: if status_callback: status_callback(f"โŒ Failed to create repository: {str(e)}") return "" try: for i, split_file in enumerate(split_files): if status_callback: status_callback( f"๐Ÿ“ค Uploading file {i + 1}/{len(split_files)}: {split_file.name}" ) api.upload_file( path_or_fileobj=str(split_file), path_in_repo=split_file.name, repo_id=repo_name, ) readme_content = _generate_readme_content( source_repo_id, len(split_files), max_size_mb ) if status_callback: status_callback("๐Ÿ“„ Uploading README.md...") api.upload_file( path_or_fileobj=readme_content.encode(), path_in_repo="README.md", repo_id=repo_name, ) if status_callback: status_callback( f"โœ… Upload completed! Repository: https://huggingface.co/{repo_name}" ) return f"https://huggingface.co/{repo_name}" except Exception as e: if status_callback: status_callback(f"โŒ Upload failed: {str(e)}") return "" def _download_with_retry( gguf_url: str, gguf_path: pathlib.Path, max_retries: int = MAX_DOWNLOAD_RETRIES, status_callback=None, ) -> bool: """Download file with retry logic and exponential backoff""" logger.info(f"Starting download attempt for {gguf_url}") for attempt in range(max_retries): try: download_cmd = [ "curl", "-L", "--fail", "--max-time", str(DOWNLOAD_TIMEOUT), "--progress-bar", gguf_url, "-o", str(gguf_path), ] if status_callback: status_callback( f"๐Ÿ“ฅ Downloading file (attempt {attempt + 1}/{max_retries})..." ) subprocess.run(download_cmd, check=True, capture_output=False) if gguf_path.exists() and gguf_path.stat().st_size > 0: logger.info(f"Download successful: {gguf_path.stat().st_size} bytes") if status_callback: status_callback( f"โœ… Download successful ({gguf_path.stat().st_size} bytes)" ) return True else: raise Exception("Download validation failed - file empty or missing") except subprocess.CalledProcessError as e: logger.error(f"Download attempt {attempt + 1} failed: {e}") if status_callback: status_callback(f"โš ๏ธ Download attempt {attempt + 1} failed, retrying...") if attempt < max_retries - 1: wait_time = 2**attempt logger.info(f"Waiting {wait_time}s before retry") if status_callback: status_callback(f"โณ Waiting {wait_time}s before retry...") time.sleep(wait_time) except Exception as e: logger.error(f"Download validation failed: {e}") if status_callback: status_callback(f"โš ๏ธ Download validation failed: {e}") logger.error("All download attempts failed") if status_callback: status_callback("โŒ All download attempts failed") return False def _generate_readme_content( source_repo_id: str, split_count: int, max_size_mb: int ) -> str: """Generate README content for split GGUF files""" return f"""--- base_model: {source_repo_id} --- Sharded GGUF version of [{source_repo_id}](https://huggingface.co/{source_repo_id}). """ def process_split_request( repo_name: str, source_repo_id: str, gguf_filename: str, make_public: bool, oauth_token: Optional[gr.OAuthToken], status_display: str, ) -> str: """Main function to handle GGUF splitting and upload/save""" gguf_path: Optional[pathlib.Path] = None split_files: list[pathlib.Path] = [] sharded_path: Optional[pathlib.Path] = None tmp_path: Optional[pathlib.Path] = None api: Optional[HfApi] = None username: Optional[str] = None output_dir: Optional[pathlib.Path] = None try: if not RUN_LOCALLY and not oauth_token: return update_status(status_display, "โŒ Please sign in first.") if not source_repo_id or not gguf_filename: return update_status( status_display, "โŒ Please select a model and GGUF file." ) if not RUN_LOCALLY and (not repo_name or not repo_name.strip()): return update_status(status_display, "โŒ Please provide a repository name.") try: validate_repo_id(source_repo_id) except HFValidationError as e: return update_status( status_display, f"โŒ Invalid source repository ID format: {str(e)}" ) status_display = update_status( status_display, "โณ Validating repository exists..." ) try: if RUN_LOCALLY: temp_api = HfApi() elif oauth_token is not None: temp_api = HfApi(token=oauth_token.token) else: return update_status(status_display, "Please sign in first.") if not check_repo_exists(source_repo_id, temp_api): return update_status( status_display, "Source repository does not exist or is not accessible.", ) except Exception: return update_status( status_display, "Unable to verify repository existence." ) if not RUN_LOCALLY: if oauth_token is None: return update_status(status_display, "Please sign in first.") api = HfApi(token=oauth_token.token) user_info = api.whoami() username = extract_username(user_info) if not username: return update_status( status_display, "โŒ Unable to determine your Hugging Face username." ) status_display = update_status(status_display, "โณ Downloading GGUF file...") if RUN_LOCALLY: output_dir = pathlib.Path("./output") output_dir.mkdir(exist_ok=True) tmp_path = output_dir / f"temp_{int(time.time())}" tmp_path.mkdir(exist_ok=True) else: tmp_dir = tempfile.mkdtemp() tmp_path = pathlib.Path(tmp_dir) def status_callback(message: str) -> None: nonlocal status_display status_display = update_status(status_display, message) logger.info(f"Status update: {message}") gguf_url = ( f"https://huggingface.co/{source_repo_id}/resolve/main/{gguf_filename}" ) gguf_path = tmp_path / gguf_filename if not _download_with_retry( gguf_url, gguf_path, status_callback=status_callback ): return update_status(status_display, "โŒ Failed to download GGUF file") _validate_download(gguf_path) status_callback("โณ Calculating optimal split size...") model_name = source_repo_id.split("/")[-1] model_name = re.sub(r"-?GGUF$", "", model_name, flags=re.IGNORECASE) model_name = model_name.rstrip("-") quantization = extract_quantization(gguf_filename) output_prefix = tmp_path / f"{model_name}-{quantization}" max_size_mb = calculate_optimal_split_size(str(gguf_path), str(output_prefix)) status_callback(f"โณ Splitting GGUF file with max size {max_size_mb}M...") output_pattern = tmp_path / f"{model_name}-{quantization}" if not split_gguf_file( str(gguf_path), str(output_pattern), max_size_mb, status_callback=status_callback, ): return update_status(status_display, "โŒ Failed to split GGUF file") split_files = list(tmp_path.glob(f"{model_name}-{quantization}-*.gguf")) if not split_files: return update_status(status_display, "โŒ No split files generated") if RUN_LOCALLY: if output_dir is None: return update_status(status_display, "โŒ Output directory error.") status_display = update_status( status_display, f"โณ Saving {len(split_files)} split files locally..." ) quantization = extract_quantization(gguf_filename) final_output_dir = output_dir / f"{model_name}_{quantization}" final_output_dir.mkdir(exist_ok=True) for split_file in split_files: target = final_output_dir / split_file.name shutil.copy2(split_file, target) readme_content = _generate_readme_content( source_repo_id, len(split_files), max_size_mb ) with open(final_output_dir / "README.md", "w") as f: f.write(readme_content) _cleanup_temp_files(gguf_path, split_files) tmp_path.rmdir() success_message = f"""โœ… GGUF file split and saved locally! ๐Ÿ“‚ Output directory: {final_output_dir.absolute()} ๐Ÿ” Created {len(split_files)} split files with max size {max_size_mb}M each ๐Ÿ’พ Files saved to local disk. """ return update_status(status_display, success_message) else: if api is None or username is None or oauth_token is None: return update_status(status_display, "โŒ Authentication error.") status_display = update_status( status_display, "โณ Creating new repository..." ) if "/" in repo_name: new_repo_name = repo_name else: new_repo_name = f"{username}/{repo_name}" api.create_repo( repo_id=new_repo_name, repo_type="model", exist_ok=True, private=not make_public, ) status_display = update_status( status_display, f"โณ Uploading {len(split_files)} split files..." ) sharded_path = tmp_path / "sharded" sharded_path.mkdir(exist_ok=True) for split_file in split_files: target = sharded_path / split_file.name target.symlink_to(split_file) api.upload_file( path_or_fileobj=str(split_file), path_in_repo=split_file.name, repo_id=new_repo_name, token=oauth_token.token, ) status_display = update_status(status_display, "โณ Creating README...") readme_content = _generate_readme_content( source_repo_id, len(split_files), max_size_mb ) api.upload_file( path_or_fileobj=readme_content.encode(), path_in_repo="README.md", repo_id=new_repo_name, token=oauth_token.token, ) status_display = update_status( status_display, "๐Ÿงน Cleaning up temporary files..." ) _cleanup_temp_files(gguf_path, split_files, sharded_path) success_message = f"""โœ… GGUF file split and uploaded successfully! ๐Ÿ“‚ New repository: https://huggingface.co/{new_repo_name} ๐Ÿ” Created {len(split_files)} split files with max size {max_size_mb}M each ๐Ÿงน Temporary files cleaned up successfully! """ return update_status(status_display, success_message) except Exception as e: return update_status(status_display, f"โŒ Error: {str(e)}") finally: if tmp_path and tmp_path.exists() and not RUN_LOCALLY: try: shutil.rmtree(tmp_path) except Exception: pass