Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from huggingface_hub import HfApi, hf_hub_download, BucketFile, BucketFolder | |
| import os | |
| import re | |
| import time | |
| import shutil | |
| import tempfile | |
# Extracts the number of seconds from a "Retry after <N> seconds" hint in an
# error message; matching is case-insensitive and N is capture group 1.
RATE_LIMIT_PATTERN = re.compile(
    r"Retry after\s*(\d+)\s*seconds",
    flags=re.IGNORECASE,
)
def _is_rate_limit_error(exc: Exception) -> bool:
    """Return True when the exception text looks like a rate-limit response.

    The decision is a case-insensitive substring search of ``str(exc)`` for
    ``"429"``, ``"too many requests"`` or ``"rate limit"``.
    """
    lowered = str(exc).lower()
    markers = ("429", "too many requests", "rate limit")
    return any(marker in lowered for marker in markers)
def _get_retry_after(exc: Exception, default: int = 2) -> int:
    """Return the wait time, in seconds, suggested by the exception message.

    The message is searched with ``RATE_LIMIT_PATTERN``; when it contains a
    "Retry after <N> seconds" hint, N is returned as an int. Otherwise
    ``default`` is returned unchanged.
    """
    hint = RATE_LIMIT_PATTERN.search(str(exc))
    return default if hint is None else int(hint.group(1))
def _retry_api_call(fn, *args, retries: int = 3, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying only on rate-limit errors.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to ``fn``.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so that ``fn`` is always called at least once (previously such
            values skipped the call entirely and silently returned None).
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns on the first successful attempt.

    Raises:
        Exception: The exception raised by ``fn`` is re-raised unchanged when
            it is not a rate-limit error or when the last attempt fails.
    """
    attempts = max(1, retries)
    delay = 2  # fallback wait, doubled after each rate-limited attempt
    for attempt in range(1, attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            if attempt == attempts or not _is_rate_limit_error(exc):
                raise
            # Prefer the wait time announced in the error message, if any;
            # otherwise fall back to the current exponential delay.
            time.sleep(_get_retry_after(exc, delay))
            delay = min(delay * 2, 60)
def _format_bucket_uri(bucket_id: str) -> str:
    """Normalize a bucket identifier to the ``hf://buckets/...`` URI form.

    Surrounding whitespace is stripped. An identifier that already starts
    with ``hf://buckets/`` is returned as is; one starting with ``buckets/``
    only gets the ``hf://`` scheme; anything else gets the full
    ``hf://buckets/`` prefix.
    """
    cleaned = bucket_id.strip()
    if cleaned.startswith("hf://buckets/"):
        return cleaned
    prefix = "hf://" if cleaned.startswith("buckets/") else "hf://buckets/"
    return prefix + cleaned
def _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of ``source_repo`` into ``target_repo``, one at a time.

    Each file is downloaded into a temporary directory, uploaded with its
    own ``api.upload_file`` call (commit message ``"clone <path>"``), and then
    deleted locally before the next file is fetched, so at most one
    downloaded file is kept on disk at a time.

    Args:
        source_repo: Repo ID to read from.
        target_repo: Repo ID to upload to.
        repo_type: Repo type passed to both the listing/download and the
            upload calls.
        api: Object providing ``list_repo_files`` and ``upload_file``.
        hf_token: Token forwarded to every Hub call.

    Raises:
        ValueError: If the file listing is empty, or a download reports a
            path that is not an existing file.
    """
    file_paths = api.list_repo_files(
        repo_id=source_repo,
        repo_type=repo_type,
        token=hf_token,
    )
    if not file_paths:
        raise ValueError("source repo is empty or could not be listed")

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        for file_path in file_paths:
            if file_path.endswith("/"):
                continue
            # Reset per file: if the download itself raises, the cleanup
            # below must not hit an unbound name (which would mask the real
            # error) nor reuse the previous file's path.
            downloaded_path = None
            try:
                # NOTE(review): the deprecated `local_dir_use_symlinks`
                # argument was dropped here. Recent huggingface_hub releases
                # ignore it and always write real files into `local_dir`;
                # it is believed to be removed in 1.x, where passing it
                # would raise a TypeError - confirm.
                downloaded_path = hf_hub_download(
                    repo_id=source_repo,
                    filename=file_path,
                    repo_type=repo_type,
                    local_dir=root_dir,
                    token=hf_token,
                )
                if not os.path.isfile(downloaded_path):
                    raise ValueError(f"Downloaded file not found: {downloaded_path}")
                api.upload_file(
                    path_or_fileobj=downloaded_path,
                    path_in_repo=file_path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {file_path}",
                    token=hf_token,
                )
            finally:
                # Free disk space right away instead of waiting for the
                # temporary directory to be removed at the end.
                if downloaded_path is not None and os.path.exists(downloaded_path):
                    os.remove(downloaded_path)
def _upload_local_source(source_path, target_repo, repo_type, api):
    """Upload a local directory to ``target_repo`` via ``upload_large_folder``.

    Args:
        source_path: Path of the directory to upload.
        target_repo: Repo ID to upload to.
        repo_type: Repo type passed to the upload call.
        api: Object providing ``upload_large_folder``.

    Raises:
        ValueError: If ``source_path`` is not an existing directory.
    """
    if os.path.isdir(source_path):
        upload_options = {
            "repo_id": target_repo,
            "folder_path": source_path,
            "repo_type": repo_type,
            "num_workers": 1,
            "print_report": False,
        }
        api.upload_large_folder(**upload_options)
        return
    raise ValueError("Local source path must be an existing directory.")
def _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a bucket into ``target_repo``, one at a time.

    The bucket is listed recursively; folder entries and anything that is not
    a ``BucketFile`` are skipped. Each file is downloaded into a temporary
    directory, uploaded with its own ``api.upload_file`` call (commit message
    ``"clone <path>"``), and removed locally before the next one is fetched.

    Args:
        source_repo: Bucket identifier in any form accepted by
            ``_format_bucket_uri``.
        target_repo: Repo ID to upload to.
        repo_type: Repo type passed to the upload call.
        api: Object providing ``list_bucket_tree``, ``download_bucket_files``
            and ``upload_file``.
        hf_token: Token forwarded to every call.
    """
    uri = _format_bucket_uri(source_repo)
    # The normalized URI always begins with "hf://"; keep what follows it.
    _, _, bucket_id = uri.partition("hf://")
    entries = api.list_bucket_tree(bucket_id=bucket_id, recursive=True, token=hf_token)

    with tempfile.TemporaryDirectory(prefix="hf_file_") as scratch_dir:
        for entry in entries:
            if isinstance(entry, BucketFolder) or not isinstance(entry, BucketFile):
                continue

            remote_path = entry.path
            staged_path = os.path.join(scratch_dir, remote_path)
            # Recreate the file's parent folders inside the scratch directory.
            os.makedirs(os.path.dirname(staged_path), exist_ok=True)
            try:
                api.download_bucket_files(
                    bucket_id=bucket_id,
                    files=[(remote_path, staged_path)],
                    token=hf_token,
                )
                api.upload_file(
                    path_or_fileobj=staged_path,
                    path_in_repo=remote_path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {remote_path}",
                    token=hf_token,
                )
            finally:
                # Drop the local copy whether or not the transfer succeeded.
                if os.path.exists(staged_path):
                    os.remove(staged_path)
def stealth_clone_hf_repo(hf_token_ui, source_repo, source_type, target_repo, repo_type):
    """Clone a repo, bucket, or local folder into ``target_repo``.

    The target repo is created if needed (``exist_ok=True``), then the source
    is copied with the helper matching ``source_type``. Nothing is raised to
    the caller: every outcome is reported as a status string.

    Args:
        hf_token_ui: Token typed into the UI; may be empty or None, in which
            case the ``HF_TOKEN`` environment variable is used instead.
        source_repo: Source repo ID, bucket ID, or local directory path.
        source_type: ``"bucket"`` or ``"local"``; any other value is treated
            as a regular repo.
        target_repo: Repo ID to create and upload to.
        repo_type: Repo type of the target.

    Returns:
        A message starting with ``"success!"`` on success, or with
        ``"error:"`` when no token is available or any step fails.

    NOTE(review): with ``source_type == "local"`` the path comes straight
    from the UI and is read on the machine running this app - confirm that
    exposing this to users of a hosted deployment is intended.
    """
    # Prioritize the token pasted by the user. Fallback to Space secrets if empty.
    # `or ""` keeps a missing (None) value from crashing before the fallback.
    pasted_token = (hf_token_ui or "").strip()
    hf_token = pasted_token or os.environ.get("HF_TOKEN")
    if not hf_token:
        return "error: Please provide a valid Hugging Face Write Token."

    api = HfApi(token=hf_token)
    try:
        _retry_api_call(
            api.create_repo,
            repo_id=target_repo,
            repo_type=repo_type,
            exist_ok=True,
        )
        if source_type == "bucket":
            _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token)
        elif source_type == "local":
            _upload_local_source(source_repo, target_repo, repo_type, api)
        else:
            _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token)
        return f"success! cleanly cloned {source_repo} to {target_repo} with no tags."
    except Exception as e:
        # UI boundary: surface the failure as a status message instead of raising.
        return f"error: {type(e).__name__}: {str(e)}"
# Gradio UI: a single-page form that collects the token, source, and target,
# and shows the status string returned by stealth_clone_hf_repo.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 🥷 Hugging Face Stealth Cloner")
    gr.Markdown(
        "Clone repositories, datasets, or HF buckets cleanly **without** the 'duplicated from' tag showing up on the new repository.\n\n"
        "**Note:** To use this tool, you must provide your own Hugging Face token with **Write** permissions so it can push files to your account."
    )
    # Row 1: access token (password-type textbox).
    with gr.Row():
        hf_token_input = gr.Textbox(
            label="Hugging Face Token (Write Access)",
            type="password",
            placeholder="hf_..."
        )
    # Row 2: what to copy from, and how to interpret that value.
    with gr.Row():
        source_input = gr.Textbox(
            label="Source Repo, Bucket ID, or Local Path",
            placeholder="e.g. source-user/source-model, username/my-bucket"
        )
        source_type_input = gr.Radio(
            choices=["repo", "bucket", "local"],
            value="repo",
            label="Source Type"
        )
    # Row 3: destination repo and its type.
    with gr.Row():
        target_input = gr.Textbox(
            label="Target Repo ID",
            placeholder="e.g. your-username/cloned-model"
        )
        repo_type_input = gr.Radio(
            choices=["model", "dataset", "space"],
            value="model",
            label="Target Repository Type"
        )
    clone_btn = gr.Button("Stealth Clone Repo", variant="primary")
    output = gr.Textbox(label="Status", lines=2)
    # The order of `inputs` must match the positional parameters of
    # stealth_clone_hf_repo(hf_token_ui, source_repo, source_type,
    # target_repo, repo_type).
    clone_btn.click(
        fn=stealth_clone_hf_repo,
        inputs=[hf_token_input, source_input, source_type_input, target_input, repo_type_input],
        outputs=output
    )

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()