Spaces:
Sleeping
Sleeping
| """Main processing logic for GGUF splitting""" | |
| import logging | |
| import pathlib | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| from typing import Optional | |
| import gradio as gr | |
| from huggingface_hub import HfApi | |
| from .config import DOWNLOAD_TIMEOUT, MAX_DOWNLOAD_RETRIES, RUN_LOCALLY | |
| from .gguf_utils import calculate_optimal_split_size, split_gguf_file | |
| from .hf_utils import ( | |
| check_repo_exists, | |
| extract_quantization, | |
| extract_username, | |
| validate_repo_id, | |
| ) | |
| from .logging_config import setup_logging | |
| try: | |
| from huggingface_hub.errors import HFValidationError | |
| except ImportError: | |
| from huggingface_hub.utils import HFValidationError | |
| logger = setup_logging() | |
def update_status(status_markdown: str, text: str) -> str:
    """Append a new entry to the running status markdown.

    Args:
        status_markdown: Status text accumulated so far.
        text: New entry to add.

    Returns:
        The accumulated text, a blank line, then the new entry.
    """
    return "\n\n".join((status_markdown, text))
def _validate_download(gguf_path: pathlib.Path) -> None:
    """Check that the downloaded file exists and is not empty.

    Args:
        gguf_path: Location the download was written to.

    Raises:
        Exception: If the file is missing or has a size of zero bytes.
    """
    logger.info(f"Validating download: {gguf_path}")

    # Work out the single reason (if any) the download is unusable.
    problem = None
    if not gguf_path.exists():
        problem = "file not found"
    elif gguf_path.stat().st_size == 0:
        problem = "file is empty"

    if problem is not None:
        logger.error(f"Download validation failed - {problem}")
        raise Exception(f"Download failed - {problem}")

    logger.info(f"Download validation successful: {gguf_path.stat().st_size} bytes")
def save_locally(
    split_files: list[pathlib.Path],
    source_repo_id: str,
    gguf_filename: str,
    max_size_mb: int,
    output_dir: pathlib.Path,
    status_callback=None,
) -> str:
    """Move split files into a ``<model>-sharded`` directory and add a README.

    The directory name is derived from the last component of
    ``source_repo_id`` with a trailing ``GGUF`` / ``-GGUF`` suffix
    (case-insensitive) and any trailing dashes removed.

    Args:
        split_files: Shard files to move into the output directory.
        source_repo_id: Repository the model came from; used for the
            directory name and the README.
        gguf_filename: Name of the source GGUF file. Not used by this
            function; kept for interface compatibility.
        max_size_mb: Maximum shard size, forwarded to the README generator.
        output_dir: Parent directory under which the sharded directory is
            created.
        status_callback: Optional callable receiving progress messages.

    Returns:
        The path of the sharded directory, as a string.
    """
    if status_callback:
        status_callback(f"💾 Saving {len(split_files)} split files locally...")

    model_name = source_repo_id.split("/")[-1]
    model_name = re.sub(r"-?GGUF$", "", model_name, flags=re.IGNORECASE)
    model_name = model_name.rstrip("-")

    sharded_path = output_dir / f"{model_name}-sharded"
    # parents=True: do not fail when output_dir itself has not been created yet.
    sharded_path.mkdir(parents=True, exist_ok=True)

    for split_file in split_files:
        shutil.move(str(split_file), str(sharded_path / split_file.name))

    readme_content = _generate_readme_content(
        source_repo_id, len(split_files), max_size_mb
    )
    # Explicit encoding so the README does not depend on the platform default.
    (sharded_path / "README.md").write_text(readme_content, encoding="utf-8")

    if status_callback:
        status_callback(f"✅ Files saved to {sharded_path}")
    return str(sharded_path)
def _cleanup_temp_files(
    gguf_path: pathlib.Path,
    split_files: list[pathlib.Path],
    sharded_path: Optional[pathlib.Path] = None,
) -> None:
    """Best-effort removal of the intermediate files of a split run.

    Failures are logged as warnings and never raised.

    Args:
        gguf_path: The downloaded source GGUF file.
        split_files: The generated shard files.
        sharded_path: Optional directory to remove recursively.
    """
    logger.info("Cleaning up temporary files")

    # (path, label used in the success log, label used in the failure log)
    targets = [(gguf_path, "original GGUF file", "GGUF file")]
    targets.extend((shard, "split file", "split file") for shard in split_files)

    for path, removed_label, failed_label in targets:
        if not path.exists():
            continue
        try:
            path.unlink()
        except OSError as err:  # PermissionError is a subclass of OSError
            logger.warning(f"Failed to remove {failed_label} {path}: {err}")
        else:
            logger.info(f"Removed {removed_label}: {path}")

    if sharded_path and sharded_path.exists():
        try:
            shutil.rmtree(sharded_path)
        except OSError as err:
            logger.warning(f"Failed to remove sharded directory {sharded_path}: {err}")
        else:
            logger.info(f"Removed sharded directory: {sharded_path}")

    logger.info("Temporary file cleanup completed")
def upload_to_hf(
    split_files: list[pathlib.Path],
    repo_name: str,
    source_repo_id: str,
    oauth_token: gr.OAuthToken,
    max_size_mb: int,
    make_public: bool,
    status_callback=None,
) -> str:
    """Create (or reuse) a Hub repository and upload the shards plus a README.

    Args:
        split_files: Shard files to upload; each is stored under its own
            file name at the repository root.
        repo_name: Target repository id.
        source_repo_id: Repository the model came from; used in the README.
        oauth_token: Token object whose ``token`` attribute authenticates
            the Hub client.
        max_size_mb: Maximum shard size, forwarded to the README generator.
        make_public: When false the repository is created private.
        status_callback: Optional callable receiving progress messages.

    Returns:
        The repository URL on success, or an empty string if repository
        creation or any upload raised.
    """

    def notify(message: str) -> None:
        # Progress reporting is optional.
        if status_callback:
            status_callback(message)

    shard_total = len(split_files)
    notify(f"☁️ Uploading {shard_total} split files to Hugging Face...")

    api = HfApi(token=oauth_token.token)

    try:
        repo_url = api.create_repo(repo_name, private=not make_public, exist_ok=True)
        notify(f"✅ Repository created: {repo_url}")
    except Exception as e:
        notify(f"❌ Failed to create repository: {str(e)}")
        return ""

    final_url = f"https://huggingface.co/{repo_name}"
    try:
        for position, shard in enumerate(split_files, start=1):
            notify(f"📤 Uploading file {position}/{shard_total}: {shard.name}")
            api.upload_file(
                path_or_fileobj=str(shard),
                path_in_repo=shard.name,
                repo_id=repo_name,
            )

        readme_bytes = _generate_readme_content(
            source_repo_id, shard_total, max_size_mb
        ).encode()
        notify("📄 Uploading README.md...")
        api.upload_file(
            path_or_fileobj=readme_bytes,
            path_in_repo="README.md",
            repo_id=repo_name,
        )

        notify(f"✅ Upload completed! Repository: {final_url}")
        return final_url
    except Exception as e:
        notify(f"❌ Upload failed: {str(e)}")
        return ""
def _download_with_retry(
    gguf_url: str,
    gguf_path: pathlib.Path,
    max_retries: int = MAX_DOWNLOAD_RETRIES,
    status_callback=None,
) -> bool:
    """Download a file with curl, retrying with exponential backoff.

    Each attempt runs ``curl -L --fail --max-time DOWNLOAD_TIMEOUT`` and then
    checks that the output file exists and is non-empty. Between attempts
    the function sleeps ``2**attempt`` seconds, regardless of whether curl
    itself failed or the downloaded file failed validation.

    Args:
        gguf_url: URL to fetch.
        gguf_path: Destination file path.
        max_retries: Maximum number of attempts.
        status_callback: Optional callable receiving progress messages.

    Returns:
        True once an attempt produces a non-empty file; False when every
        attempt failed or curl could not be started.
    """

    def notify(message: str) -> None:
        # Progress reporting is optional.
        if status_callback:
            status_callback(message)

    logger.info(f"Starting download attempt for {gguf_url}")

    # The command is identical for every attempt, so build it once.
    download_cmd = [
        "curl",
        "-L",
        "--fail",
        "--max-time",
        str(DOWNLOAD_TIMEOUT),
        "--progress-bar",
        gguf_url,
        "-o",
        str(gguf_path),
    ]

    for attempt in range(max_retries):
        will_retry = attempt < max_retries - 1
        notify(f"📥 Downloading file (attempt {attempt + 1}/{max_retries})...")

        try:
            subprocess.run(download_cmd, check=True, capture_output=False)
        except subprocess.CalledProcessError as e:
            logger.error(f"Download attempt {attempt + 1} failed: {e}")
            # Only promise a retry when one will actually happen.
            suffix = ", retrying..." if will_retry else ""
            notify(f"⚠️ Download attempt {attempt + 1} failed{suffix}")
        except OSError as e:
            # curl could not be launched (e.g. not installed); retrying
            # cannot change that, so stop immediately.
            logger.error(f"Unable to run curl: {e}")
            notify(f"⚠️ Unable to run curl: {e}")
            break
        else:
            try:
                downloaded_bytes = gguf_path.stat().st_size
            except OSError:
                downloaded_bytes = 0  # missing file is treated like an empty one

            if downloaded_bytes > 0:
                logger.info(f"Download successful: {downloaded_bytes} bytes")
                notify(f"✅ Download successful ({downloaded_bytes} bytes)")
                return True

            reason = "Download validation failed - file empty or missing"
            logger.error(f"Download validation failed: {reason}")
            notify(f"⚠️ Download validation failed: {reason}")

        if will_retry:
            wait_time = 2**attempt
            logger.info(f"Waiting {wait_time}s before retry")
            notify(f"⏳ Waiting {wait_time}s before retry...")
            time.sleep(wait_time)

    logger.error("All download attempts failed")
    notify("❌ All download attempts failed")
    return False
| def _generate_readme_content( | |
| source_repo_id: str, split_count: int, max_size_mb: int | |
| ) -> str: | |
| """Generate README content for split GGUF files""" | |
| return f"""--- | |
| base_model: {source_repo_id} | |
| --- | |
| Sharded GGUF version of [{source_repo_id}](https://huggingface.co/{source_repo_id}). | |
| """ | |
def process_split_request(
    repo_name: str,
    source_repo_id: str,
    gguf_filename: str,
    make_public: bool,
    oauth_token: Optional[gr.OAuthToken],
    status_display: str,
) -> str:
    """Download a GGUF file, split it, and save or upload the shards.

    When ``RUN_LOCALLY`` is set the shards and a README are copied to
    ``./output/<model>_<quantization>``. Otherwise they are uploaded to a
    Hub repository owned by the signed-in user. The temporary working
    directory is removed on every exit path, in both modes.

    Args:
        repo_name: Target repository name (ignored when running locally).
            If it contains no ``/`` it is prefixed with the user's name.
        source_repo_id: Repository that hosts the source GGUF file.
        gguf_filename: Path of the GGUF file inside the source repository.
        make_public: When false the target repository is created private.
        oauth_token: Login token; required unless running locally.
        status_display: Status markdown accumulated so far.

    Returns:
        The status markdown with progress, success, or error messages
        appended. Errors are reported in the returned text, not raised.
    """
    # Local import: only this function needs URL quoting.
    from urllib.parse import quote

    tmp_path: Optional[pathlib.Path] = None
    api: Optional[HfApi] = None
    username: Optional[str] = None
    output_dir: Optional[pathlib.Path] = None

    def status_callback(message: str) -> None:
        # Helpers report progress through this closure so their messages
        # end up in the same status text this function returns.
        nonlocal status_display
        status_display = update_status(status_display, message)
        logger.info(f"Status update: {message}")

    try:
        # ---- Input validation -------------------------------------------
        if not RUN_LOCALLY and not oauth_token:
            return update_status(status_display, "❌ Please sign in first.")
        if not source_repo_id or not gguf_filename:
            return update_status(
                status_display, "❌ Please select a model and GGUF file."
            )
        if not RUN_LOCALLY and (not repo_name or not repo_name.strip()):
            return update_status(status_display, "❌ Please provide a repository name.")

        # Only the final path component is used on disk, so a file inside a
        # repository sub-folder (or a name containing "..") can neither
        # point at a missing directory nor escape the temp directory.
        local_name = pathlib.PurePosixPath(gguf_filename).name
        if local_name in ("", ".", ".."):
            return update_status(
                status_display, "❌ Please select a model and GGUF file."
            )

        try:
            validate_repo_id(source_repo_id)
        except HFValidationError as e:
            return update_status(
                status_display, f"❌ Invalid source repository ID format: {str(e)}"
            )

        # ---- Repository / account checks --------------------------------
        status_display = update_status(
            status_display, "⏳ Validating repository exists..."
        )
        if RUN_LOCALLY:
            lookup_api = HfApi()
        elif oauth_token is not None:
            api = HfApi(token=oauth_token.token)
            lookup_api = api
        else:
            return update_status(status_display, "❌ Please sign in first.")

        try:
            repo_found = check_repo_exists(source_repo_id, lookup_api)
        except Exception:
            logger.exception("Repository existence check failed")
            return update_status(
                status_display, "Unable to verify repository existence."
            )
        if not repo_found:
            return update_status(
                status_display,
                "Source repository does not exist or is not accessible.",
            )

        if api is not None:
            username = extract_username(api.whoami())
            if not username:
                return update_status(
                    status_display, "❌ Unable to determine your Hugging Face username."
                )

        # ---- Download ---------------------------------------------------
        status_display = update_status(status_display, "⏳ Downloading GGUF file...")
        if RUN_LOCALLY:
            output_dir = pathlib.Path("./output")
            output_dir.mkdir(parents=True, exist_ok=True)
            # mkdtemp guarantees a fresh directory even for runs that start
            # within the same second.
            tmp_path = pathlib.Path(
                tempfile.mkdtemp(prefix="temp_", dir=str(output_dir))
            )
        else:
            tmp_path = pathlib.Path(tempfile.mkdtemp())

        # Quote the file name (keeping "/") so spaces and other special
        # characters produce a valid URL.
        gguf_url = (
            f"https://huggingface.co/{source_repo_id}/resolve/main/"
            f"{quote(gguf_filename)}"
        )
        gguf_path = tmp_path / local_name
        if not _download_with_retry(
            gguf_url, gguf_path, status_callback=status_callback
        ):
            return update_status(status_display, "❌ Failed to download GGUF file")
        _validate_download(gguf_path)

        # ---- Split ------------------------------------------------------
        status_callback("⏳ Calculating optimal split size...")
        model_name = source_repo_id.split("/")[-1]
        model_name = re.sub(r"-?GGUF$", "", model_name, flags=re.IGNORECASE)
        model_name = model_name.rstrip("-")
        quantization = extract_quantization(gguf_filename)
        shard_stem = f"{model_name}-{quantization}"
        output_prefix = tmp_path / shard_stem

        max_size_mb = calculate_optimal_split_size(str(gguf_path), str(output_prefix))
        status_callback(f"⏳ Splitting GGUF file with max size {max_size_mb}M...")
        if not split_gguf_file(
            str(gguf_path),
            str(output_prefix),
            max_size_mb,
            status_callback=status_callback,
        ):
            return update_status(status_display, "❌ Failed to split GGUF file")

        # Sorted for a deterministic order; the source file is excluded in
        # case its own name happens to match the shard pattern.
        split_files = [
            shard
            for shard in sorted(tmp_path.glob(f"{shard_stem}-*.gguf"))
            if shard != gguf_path
        ]
        if not split_files:
            return update_status(status_display, "❌ No split files generated")

        # ---- Deliver: local save ----------------------------------------
        if RUN_LOCALLY:
            if output_dir is None:
                return update_status(status_display, "❌ Output directory error.")
            status_display = update_status(
                status_display, f"⏳ Saving {len(split_files)} split files locally..."
            )
            final_output_dir = output_dir / f"{model_name}_{quantization}"
            final_output_dir.mkdir(parents=True, exist_ok=True)
            for split_file in split_files:
                shutil.copy2(split_file, final_output_dir / split_file.name)

            readme_content = _generate_readme_content(
                source_repo_id, len(split_files), max_size_mb
            )
            (final_output_dir / "README.md").write_text(
                readme_content, encoding="utf-8"
            )
            _cleanup_temp_files(gguf_path, split_files)

            success_message = (
                "✅ GGUF file split and saved locally!\n"
                f"📂 Output directory: {final_output_dir.absolute()}\n"
                f"🔍 Created {len(split_files)} split files with max size {max_size_mb}M each\n"
                "💾 Files saved to local disk.\n"
            )
            return update_status(status_display, success_message)

        # ---- Deliver: upload --------------------------------------------
        if api is None or username is None or oauth_token is None:
            return update_status(status_display, "❌ Authentication error.")

        status_display = update_status(status_display, "⏳ Creating new repository...")
        repo_name = repo_name.strip()
        new_repo_name = repo_name if "/" in repo_name else f"{username}/{repo_name}"
        api.create_repo(
            repo_id=new_repo_name,
            repo_type="model",
            exist_ok=True,
            private=not make_public,
        )

        status_display = update_status(
            status_display, f"⏳ Uploading {len(split_files)} split files..."
        )
        # Shards are uploaded straight from the temp directory; no
        # intermediate symlink directory is needed.
        for split_file in split_files:
            api.upload_file(
                path_or_fileobj=str(split_file),
                path_in_repo=split_file.name,
                repo_id=new_repo_name,
                token=oauth_token.token,
            )

        status_display = update_status(status_display, "⏳ Creating README...")
        readme_content = _generate_readme_content(
            source_repo_id, len(split_files), max_size_mb
        )
        api.upload_file(
            path_or_fileobj=readme_content.encode(),
            path_in_repo="README.md",
            repo_id=new_repo_name,
            token=oauth_token.token,
        )

        status_display = update_status(
            status_display, "🧹 Cleaning up temporary files..."
        )
        _cleanup_temp_files(gguf_path, split_files)

        success_message = (
            "✅ GGUF file split and uploaded successfully!\n"
            f"📂 New repository: https://huggingface.co/{new_repo_name}\n"
            f"🔍 Created {len(split_files)} split files with max size {max_size_mb}M each\n"
            "🧹 Temporary files cleaned up successfully!\n"
        )
        return update_status(status_display, success_message)
    except Exception as e:
        # Top-level boundary: report the error to the user, but keep the
        # traceback in the log.
        logger.exception("GGUF split request failed")
        return update_status(status_display, f"❌ Error: {str(e)}")
    finally:
        # Remove the working directory on every exit path, in both modes;
        # ignore_errors keeps cleanup from masking the real result.
        if tmp_path is not None:
            shutil.rmtree(tmp_path, ignore_errors=True)