# Spaces: Sleeping
# File size: 7,048 Bytes
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, BucketFile, BucketFolder
import os
import re
import time
import shutil
import tempfile
# Captures N from error text of the form "Retry after N seconds" (case-insensitive).
RATE_LIMIT_PATTERN = re.compile(r"Retry after\s*(\d+)\s*seconds", re.IGNORECASE)
def _is_rate_limit_error(exc: Exception) -> bool:
message = str(exc).lower()
return "429" in message or "too many requests" in message or "rate limit" in message
def _get_retry_after(exc: Exception, default: int = 2) -> int:
    """Return the wait time, in seconds, suggested by the text of *exc*.

    The exception message is searched with ``RATE_LIMIT_PATTERN``; when it
    contains "Retry after N seconds" the captured N is returned as an int,
    otherwise *default* is returned unchanged.
    """
    found = RATE_LIMIT_PATTERN.search(str(exc))
    return int(found.group(1)) if found else default
def _retry_api_call(fn, *args, retries: int = 3, **kwargs):
delay = 2
for attempt in range(1, retries + 1):
try:
return fn(*args, **kwargs)
except Exception as exc:
if not _is_rate_limit_error(exc) or attempt == retries:
raise
wait = _get_retry_after(exc, delay)
time.sleep(wait)
delay = min(delay * 2, 60)
def _format_bucket_uri(bucket_id: str) -> str:
bucket_id = bucket_id.strip()
if bucket_id.startswith("hf://buckets/"):
return bucket_id
if bucket_id.startswith("buckets/"):
return f"hf://{bucket_id}"
return f"hf://buckets/{bucket_id}"
def _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a Hub repo into *target_repo*, one file at a time.

    Each file is downloaded into a temporary directory, uploaded to the
    target as its own commit ("clone <path>"), and then deleted locally, so
    only one file is held on disk at any moment.

    Args:
        source_repo: Repo ID to read from.
        target_repo: Repo ID to write to.
        repo_type: Repo type used for both the source and the target.
        api: ``HfApi`` instance used for listing and uploading.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If the source lists no files, or a download did not
            produce a file on disk.
    """
    # All Hub calls go through _retry_api_call so rate limits (HTTP 429) are
    # retried here the same way they are for create_repo.
    file_paths = _retry_api_call(
        api.list_repo_files,
        repo_id=source_repo,
        repo_type=repo_type,
        token=hf_token,
    )
    if not file_paths:
        raise ValueError("source repo is empty or could not be listed")

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        for file_path in file_paths:
            if file_path.endswith("/"):
                continue  # directory-like entry, nothing to download

            # Reset per file: if the download raises, the cleanup below must
            # not hit an unbound name or a stale path from the previous file.
            downloaded_path = None
            try:
                # `local_dir_use_symlinks` is intentionally not passed: it is
                # deprecated and ignored by current huggingface_hub releases,
                # which place real files in `local_dir`.
                downloaded_path = _retry_api_call(
                    hf_hub_download,
                    repo_id=source_repo,
                    filename=file_path,
                    repo_type=repo_type,
                    local_dir=root_dir,
                    token=hf_token,
                )
                if not os.path.isfile(downloaded_path):
                    raise ValueError(f"Downloaded file not found: {downloaded_path}")
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=downloaded_path,
                    path_in_repo=file_path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {file_path}",
                    token=hf_token,
                )
            finally:
                # Free disk space right away instead of waiting for the
                # temporary directory to be removed at the end.
                if downloaded_path and os.path.exists(downloaded_path):
                    os.remove(downloaded_path)
def _upload_local_source(source_path, target_repo, repo_type, api):
if not os.path.isdir(source_path):
raise ValueError("Local source path must be an existing directory.")
api.upload_large_folder(
repo_id=target_repo,
folder_path=source_path,
repo_type=repo_type,
num_workers=1,
print_report=False,
)
def _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a bucket into *target_repo*, one file at a time.

    Each bucket file is downloaded into a temporary directory, uploaded to
    the target as its own commit ("clone <path>"), and then deleted locally.
    Folder entries and entries of unknown type are skipped.

    Args:
        source_repo: Bucket reference; normalized with ``_format_bucket_uri``.
        target_repo: Repo ID to write to.
        repo_type: Type of the target repo.
        api: ``HfApi`` instance used for all bucket and upload calls.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If a bucket entry's path would resolve outside the
            temporary staging directory.
    """
    bucket_uri = _format_bucket_uri(source_repo)
    # NOTE(review): this strips only "hf://", so bucket_id keeps its
    # "buckets/" prefix (e.g. "buckets/user/name") — confirm that the bucket
    # API accepts this form rather than a bare "user/name".
    bucket_id = bucket_uri[len("hf://"):]

    # Hub calls go through _retry_api_call so rate limits (HTTP 429) are
    # retried the same way they are for create_repo. If the listing is lazy,
    # only its creation is covered by the retry, not the iteration itself.
    items = _retry_api_call(
        api.list_bucket_tree,
        bucket_id=bucket_id,
        recursive=True,
        token=hf_token,
    )

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        staging_root = os.path.realpath(root_dir)
        for item in items:
            if isinstance(item, BucketFolder) or not isinstance(item, BucketFile):
                continue

            # Remote paths are untrusted: refuse anything (absolute paths,
            # "..") that would land on or outside the staging directory.
            local_path = os.path.realpath(os.path.join(staging_root, item.path))
            if (
                local_path == staging_root
                or os.path.commonpath([staging_root, local_path]) != staging_root
            ):
                raise ValueError(
                    f"Refusing bucket path outside the staging directory: {item.path}"
                )

            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            try:
                _retry_api_call(
                    api.download_bucket_files,
                    bucket_id=bucket_id,
                    files=[(item.path, local_path)],
                    token=hf_token,
                )
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=local_path,
                    path_in_repo=item.path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {item.path}",
                    token=hf_token,
                )
            finally:
                # Free disk space right away instead of waiting for the
                # temporary directory to be removed at the end.
                if os.path.exists(local_path):
                    os.remove(local_path)
def stealth_clone_hf_repo(hf_token_ui, source_repo, source_type, target_repo, repo_type):
    """Clone a repo, bucket, or local folder into *target_repo*; return a status string.

    This is the UI callback: it never raises. Every failure is reported as a
    string starting with "error:", and success as a string starting with
    "success!".

    Args:
        hf_token_ui: Token typed by the user; may be empty or None.
        source_repo: Source repo ID, bucket ID, or local directory path.
        source_type: "bucket", "local", or anything else for a Hub repo.
        target_repo: Repo ID to create (if missing) and fill.
        repo_type: Type of the target repo.

    Returns:
        A human-readable status message.
    """
    # Prefer the token pasted by the user; fall back to the HF_TOKEN
    # environment variable when the field is blank. `or ""` keeps a None
    # value from crashing on .strip().
    # NOTE(review): the fallback lets any visitor act with the credentials
    # stored in HF_TOKEN — confirm this is intended for a public deployment.
    hf_token = (hf_token_ui or "").strip() or os.environ.get("HF_TOKEN")
    if not hf_token:
        return "error: Please provide a valid Hugging Face Write Token."

    # Pasted IDs often carry stray whitespace; reject blanks before any
    # network call is made.
    source_repo = (source_repo or "").strip()
    target_repo = (target_repo or "").strip()
    if not source_repo or not target_repo:
        return "error: Please provide both a source and a target repo ID."

    api = HfApi(token=hf_token)
    try:
        _retry_api_call(
            api.create_repo,
            repo_id=target_repo,
            repo_type=repo_type,
            exist_ok=True,
        )
        if source_type == "bucket":
            _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token)
        elif source_type == "local":
            _upload_local_source(source_repo, target_repo, repo_type, api)
        else:
            _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token)
        return f"success! cleanly cloned {source_repo} to {target_repo} with no tags."
    except Exception as e:
        # Top-level UI boundary: show the failure in the status box.
        return f"error: {type(e).__name__}: {str(e)}"
# Gradio UI: a token field, source and target selectors, and one button that
# runs stealth_clone_hf_repo and shows its status string.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Title and usage note.
    gr.Markdown("## 🥷 Hugging Face Stealth Cloner")
    gr.Markdown(
        "Clone repositories, datasets, or HF buckets cleanly **without** the 'duplicated from' tag showing up on the new repository.\n\n"
        "**Note:** To use this tool, you must provide your own Hugging Face token with **Write** permissions so it can push files to your account."
    )

    # Token used for every Hub call (masked input).
    with gr.Row():
        hf_token_input = gr.Textbox(
            type="password",
            label="Hugging Face Token (Write Access)",
            placeholder="hf_...",
        )

    # What to clone from, and what kind of source it is.
    with gr.Row():
        source_input = gr.Textbox(
            placeholder="e.g. source-user/source-model, username/my-bucket",
            label="Source Repo, Bucket ID, or Local Path",
        )
        source_type_input = gr.Radio(
            label="Source Type",
            choices=["repo", "bucket", "local"],
            value="repo",
        )

    # Where to clone to, and the type of repo to create there.
    with gr.Row():
        target_input = gr.Textbox(
            placeholder="e.g. your-username/cloned-model",
            label="Target Repo ID",
        )
        repo_type_input = gr.Radio(
            label="Target Repository Type",
            choices=["model", "dataset", "space"],
            value="model",
        )

    clone_btn = gr.Button("Stealth Clone Repo", variant="primary")
    output = gr.Textbox(label="Status", lines=2)

    # The input order must match the parameter order of stealth_clone_hf_repo.
    clone_btn.click(
        stealth_clone_hf_repo,
        inputs=[
            hf_token_input,
            source_input,
            source_type_input,
            target_input,
            repo_type_input,
        ],
        outputs=output,
    )
# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()