"""Gradio app that copies a Hugging Face repo, bucket, or local folder into a repo.

Files are transferred one at a time through a temporary directory and
re-uploaded with plain commits, rather than using the Hub's "duplicate"
feature.
"""

import os
import re
import shutil
import tempfile
import time

import gradio as gr
from huggingface_hub import BucketFile, BucketFolder, HfApi, hf_hub_download

# Extracts the wait time from error messages such as "Retry after 30 seconds".
RATE_LIMIT_PATTERN = re.compile(r"Retry after\s*(\d+)\s*seconds", re.IGNORECASE)


def _is_rate_limit_error(exc: Exception) -> bool:
    """Return True when the exception text looks like an HTTP 429 / rate limit."""
    message = str(exc).lower()
    return "429" in message or "too many requests" in message or "rate limit" in message


def _get_retry_after(exc: Exception, default: int = 2) -> int:
    """Return the "Retry after N seconds" hint found in *exc*, else *default*."""
    match = RATE_LIMIT_PATTERN.search(str(exc))
    if match:
        return int(match.group(1))
    return default


def _retry_api_call(fn, *args, retries: int = 3, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying only on rate-limit errors.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to *fn*.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so that *fn* is always called at least once.
        **kwargs: Keyword arguments forwarded to *fn*.

    Returns:
        Whatever *fn* returns on the first successful attempt.

    Raises:
        Exception: Any non-rate-limit error immediately, or the last
            rate-limit error once the attempts are exhausted.
    """
    attempts = max(1, retries)
    delay = 2
    for attempt in range(1, attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            if attempt == attempts or not _is_rate_limit_error(exc):
                raise
            # Prefer the wait hinted by the error message; otherwise fall
            # back to the current exponential back-off delay.
            time.sleep(_get_retry_after(exc, delay))
            delay = min(delay * 2, 60)


def _format_bucket_uri(bucket_id: str) -> str:
    """Normalize a bucket identifier to the ``hf://buckets/<id>`` form."""
    bucket_id = bucket_id.strip()
    if bucket_id.startswith("hf://buckets/"):
        return bucket_id
    if bucket_id.startswith("buckets/"):
        return f"hf://{bucket_id}"
    return f"hf://buckets/{bucket_id}"


def _resolve_inside(root_dir: str, relative_path: str) -> str:
    """Join *relative_path* under *root_dir*, rejecting paths that escape it.

    Raises:
        ValueError: If the resolved path is *root_dir* itself or lies
            outside of it (absolute paths, ``..`` components, ...).
    """
    root = os.path.realpath(root_dir)
    candidate = os.path.realpath(os.path.join(root, relative_path))
    if candidate == root or os.path.commonpath([root, candidate]) != root:
        raise ValueError(f"Unsafe file path in source: {relative_path!r}")
    return candidate


def _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of *source_repo* into *target_repo*, one file at a time.

    Each file is downloaded into a temporary directory, uploaded as its own
    commit, and then deleted so that only one file is held on disk at once.

    Args:
        source_repo: Repo ID to read from.
        target_repo: Repo ID to upload into.
        repo_type: Repo type used for both the source and the target.
        api: ``HfApi`` instance used for listing and uploading.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If the source lists no files, or a download did not
            produce a file on disk.
    """
    file_paths = _retry_api_call(
        api.list_repo_files,
        repo_id=source_repo,
        repo_type=repo_type,
        token=hf_token,
    )
    if not file_paths:
        raise ValueError("source repo is empty or could not be listed")

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        for file_path in file_paths:
            if file_path.endswith("/"):
                continue
            # Bound before the try so the cleanup below never touches an
            # unset name (or the previous file's path) when the download fails.
            downloaded_path = None
            try:
                # NOTE(review): `local_dir_use_symlinks` is intentionally not
                # passed; it is deprecated/ignored in recent huggingface_hub
                # releases and presumably rejected by newer ones - confirm
                # against the installed version.
                downloaded_path = _retry_api_call(
                    hf_hub_download,
                    repo_id=source_repo,
                    filename=file_path,
                    repo_type=repo_type,
                    local_dir=root_dir,
                    token=hf_token,
                )
                if not os.path.isfile(downloaded_path):
                    raise ValueError(f"Downloaded file not found: {downloaded_path}")
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=downloaded_path,
                    path_in_repo=file_path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {file_path}",
                    token=hf_token,
                )
            finally:
                if downloaded_path and os.path.exists(downloaded_path):
                    os.remove(downloaded_path)


def _upload_local_source(source_path, target_repo, repo_type, api):
    """Upload the local directory *source_path* into *target_repo*.

    Raises:
        ValueError: If *source_path* is not an existing directory.
    """
    if not os.path.isdir(source_path):
        raise ValueError("Local source path must be an existing directory.")
    api.upload_large_folder(
        repo_id=target_repo,
        folder_path=source_path,
        repo_type=repo_type,
        num_workers=1,
        print_report=False,
    )


def _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a bucket into *target_repo*, one file at a time.

    Args:
        source_repo: Bucket identifier, with or without the ``hf://buckets/``
            or ``buckets/`` prefix.
        target_repo: Repo ID to upload into.
        repo_type: Type of the target repo.
        api: ``HfApi`` instance used for listing, downloading and uploading.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If a bucket entry's path would resolve outside the
            temporary download directory.
    """
    bucket_uri = _format_bucket_uri(source_repo)
    # NOTE(review): this yields "buckets/<namespace>/<name>"; confirm the
    # bucket API expects that prefix rather than the bare "<namespace>/<name>".
    bucket_id = bucket_uri[len("hf://"):]
    items = api.list_bucket_tree(bucket_id=bucket_id, recursive=True, token=hf_token)

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        for item in items:
            if isinstance(item, BucketFolder):
                continue
            if not isinstance(item, BucketFile):
                continue
            # item.path comes from the remote listing; never let it point
            # outside the temporary directory.
            local_path = _resolve_inside(root_dir, item.path)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            try:
                _retry_api_call(
                    api.download_bucket_files,
                    bucket_id=bucket_id,
                    files=[(item.path, local_path)],
                    token=hf_token,
                )
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=local_path,
                    path_in_repo=item.path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {item.path}",
                    token=hf_token,
                )
            finally:
                if os.path.exists(local_path):
                    os.remove(local_path)


def stealth_clone_hf_repo(hf_token_ui, source_repo, source_type, target_repo, repo_type):
    """Clone a source (repo, bucket or local folder) into *target_repo*.

    This is the Gradio click handler: it never raises, and instead reports
    the outcome as a status string.

    Args:
        hf_token_ui: Token typed in the UI; may be empty.
        source_repo: Source repo ID, bucket ID, or local directory path.
        source_type: ``"bucket"``, ``"local"``, or anything else for a repo.
        target_repo: Repo ID to create (if missing) and upload into.
        repo_type: Type of the target repo.

    Returns:
        A message starting with ``"success!"`` or ``"error:"``.
    """
    # Prioritize the token pasted by the user. Fallback to Space secrets if empty.
    # SECURITY NOTE(review): the fallback lets anyone who can reach this app
    # act with the host's HF_TOKEN when they leave the field blank - confirm
    # this is intended before deploying publicly.
    hf_token = (hf_token_ui or "").strip() or os.environ.get("HF_TOKEN")
    if not hf_token:
        return "error: Please provide a valid Hugging Face Write Token."

    source_repo = (source_repo or "").strip()
    target_repo = (target_repo or "").strip()
    if not source_repo or not target_repo:
        return "error: Please provide both a source and a target repo ID."

    api = HfApi(token=hf_token)

    try:
        _retry_api_call(
            api.create_repo,
            repo_id=target_repo,
            repo_type=repo_type,
            exist_ok=True,
        )

        if source_type == "bucket":
            _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token)
        elif source_type == "local":
            # SECURITY NOTE(review): "local" reads a directory on the machine
            # running this app, using a path supplied through the UI.
            _upload_local_source(source_repo, target_repo, repo_type, api)
        else:
            _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token)

        return f"success! cleanly cloned {source_repo} to {target_repo} with no tags."
    except Exception as e:
        # Top-level UI boundary: surface every failure as a status message.
        return f"error: {type(e).__name__}: {str(e)}"


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 🥷 Hugging Face Stealth Cloner")
    gr.Markdown(
        "Clone repositories, datasets, or HF buckets cleanly **without** the 'duplicated from' tag showing up on the new repository.\n\n"
        "**Note:** To use this tool, you must provide your own Hugging Face token with **Write** permissions so it can push files to your account."
    )

    with gr.Row():
        hf_token_input = gr.Textbox(
            label="Hugging Face Token (Write Access)",
            type="password",
            placeholder="hf_..."
        )

    with gr.Row():
        source_input = gr.Textbox(
            label="Source Repo, Bucket ID, or Local Path",
            placeholder="e.g. source-user/source-model, username/my-bucket"
        )
        source_type_input = gr.Radio(
            choices=["repo", "bucket", "local"],
            value="repo",
            label="Source Type"
        )

    with gr.Row():
        target_input = gr.Textbox(
            label="Target Repo ID",
            placeholder="e.g. your-username/cloned-model"
        )
        repo_type_input = gr.Radio(
            choices=["model", "dataset", "space"],
            value="model",
            label="Target Repository Type"
        )

    clone_btn = gr.Button("Stealth Clone Repo", variant="primary")
    output = gr.Textbox(label="Status", lines=2)

    clone_btn.click(
        fn=stealth_clone_hf_repo,
        inputs=[hf_token_input, source_input, source_type_input, target_input, repo_type_input],
        outputs=output
    )

if __name__ == "__main__":
    demo.launch()