File size: 7,048 Bytes
7be3d75
 
e0eae58
7be3d75
 
1c3237a
7be3d75
c78417f
7be3d75
c78417f
7be3d75
 
 
c78417f
7be3d75
 
 
 
 
cda6fea
7be3d75
 
 
1c3237a
7be3d75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61b6f97
7be3d75
 
1c3237a
7be3d75
 
 
 
 
 
1c3237a
7be3d75
 
 
 
 
 
 
 
 
 
 
 
 
 
ab4c291
7be3d75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0eae58
9ec0ecb
7be3d75
cda6fea
7be3d75
 
cda6fea
7be3d75
cda6fea
7be3d75
 
 
 
 
e0eae58
7be3d75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0eae58
 
cda6fea
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, BucketFile, BucketFolder
import os
import re
import time
import shutil
import tempfile

# Matches a "Retry after <N> seconds" hint (case-insensitive, optional
# whitespace around the number) inside an error message; group 1 captures the
# number of seconds as a digit string.
RATE_LIMIT_PATTERN = re.compile(r"Retry after\s*(\d+)\s*seconds", re.IGNORECASE)

def _is_rate_limit_error(exc: Exception) -> bool:
    message = str(exc).lower()
    return "429" in message or "too many requests" in message or "rate limit" in message

def _get_retry_after(exc: Exception, default: int = 2) -> int:
    """Extract the server-suggested wait time from an exception message.

    Args:
        exc: The exception whose string form is searched with
            ``RATE_LIMIT_PATTERN``.
        default: Value returned when the message carries no
            "Retry after N seconds" hint.

    Returns:
        The number of seconds found in the message, or ``default``.
    """
    hint = RATE_LIMIT_PATTERN.search(str(exc))
    return default if hint is None else int(hint.group(1))

def _retry_api_call(fn, *args, retries: int = 3, **kwargs):
    delay = 2
    for attempt in range(1, retries + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            if not _is_rate_limit_error(exc) or attempt == retries:
                raise
            wait = _get_retry_after(exc, delay)
            time.sleep(wait)
            delay = min(delay * 2, 60)

def _format_bucket_uri(bucket_id: str) -> str:
    bucket_id = bucket_id.strip()
    if bucket_id.startswith("hf://buckets/"):
        return bucket_id
    if bucket_id.startswith("buckets/"):
        return f"hf://{bucket_id}"
    return f"hf://buckets/{bucket_id}"

def _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a Hub repo into ``target_repo``, one file at a time.

    Each file is downloaded into a temporary directory, uploaded to the
    target as its own commit (``clone <path>``), and deleted locally before
    the next file is fetched, so at most one file is kept on disk.

    Args:
        source_repo: Repo id to read from.
        target_repo: Repo id to upload into.
        repo_type: Repo type used for both the source and the target.
        api: Client object providing ``list_repo_files`` and ``upload_file``.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If the source listing is empty, or a download reports a
            path that is not a file.
    """
    file_paths = api.list_repo_files(
        repo_id=source_repo,
        repo_type=repo_type,
        token=hf_token,
    )
    if not file_paths:
        raise ValueError("source repo is empty or could not be listed")

    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        for file_path in file_paths:
            if file_path.endswith("/"):
                continue
            # Bound before the try so the cleanup below never touches an
            # unassigned name (or the previous file's path) when the download
            # itself fails; otherwise the real error would be masked.
            downloaded_path = None
            try:
                # ``local_dir_use_symlinks`` is intentionally not passed: it is
                # deprecated in huggingface_hub and files written to
                # ``local_dir`` are regular files.
                downloaded_path = _retry_api_call(
                    hf_hub_download,
                    repo_id=source_repo,
                    filename=file_path,
                    repo_type=repo_type,
                    local_dir=root_dir,
                    token=hf_token,
                )
                if not os.path.isfile(downloaded_path):
                    raise ValueError(f"Downloaded file not found: {downloaded_path}")
                # One commit per file is prone to rate limiting, so go through
                # the same retry helper used for repo creation.
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=downloaded_path,
                    path_in_repo=file_path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {file_path}",
                    token=hf_token,
                )
            finally:
                if downloaded_path is not None and os.path.exists(downloaded_path):
                    os.remove(downloaded_path)

def _upload_local_source(source_path, target_repo, repo_type, api):
    if not os.path.isdir(source_path):
        raise ValueError("Local source path must be an existing directory.")
    api.upload_large_folder(
        repo_id=target_repo,
        folder_path=source_path,
        repo_type=repo_type,
        num_workers=1,
        print_report=False,
    )

def _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token):
    """Copy every file of a bucket into ``target_repo``, one file at a time.

    Each ``BucketFile`` in the recursive listing is downloaded into a
    temporary directory, uploaded to the target as its own commit
    (``clone <path>``), and deleted locally before the next one is fetched.
    Listing entries that are not ``BucketFile`` instances are skipped.

    Args:
        source_repo: Bucket reference in any form accepted by
            ``_format_bucket_uri``.
        target_repo: Repo id to upload into.
        repo_type: Repo type of the target.
        api: Client object providing ``list_bucket_tree``,
            ``download_bucket_files`` and ``upload_file``.
        hf_token: Token passed to every Hub call.

    Raises:
        ValueError: If a listed path would resolve outside the temporary
            working directory.
    """
    bucket_uri = _format_bucket_uri(source_repo)
    # The bucket API methods are documented as taking the bare
    # "namespace/name" id, so drop the whole "hf://buckets/" handle prefix
    # rather than only the "hf://" scheme.
    bucket_id = bucket_uri[len("hf://buckets/"):].strip("/")
    items = api.list_bucket_tree(bucket_id=bucket_id, recursive=True, token=hf_token)
    with tempfile.TemporaryDirectory(prefix="hf_file_") as root_dir:
        root_real = os.path.realpath(root_dir)
        for item in items:
            if not isinstance(item, BucketFile):
                continue
            # Remote paths are untrusted: an absolute path or ".." segments
            # would make os.path.join escape the temp dir, and the cleanup
            # below would then delete a file outside of it.
            local_path = os.path.realpath(os.path.join(root_real, item.path))
            if not local_path.startswith(root_real + os.sep):
                raise ValueError(f"Unsafe path in bucket listing: {item.path}")
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            try:
                _retry_api_call(
                    api.download_bucket_files,
                    bucket_id=bucket_id,
                    files=[(item.path, local_path)],
                    token=hf_token,
                )
                # One commit per file is prone to rate limiting, so go through
                # the same retry helper used for repo creation.
                _retry_api_call(
                    api.upload_file,
                    path_or_fileobj=local_path,
                    path_in_repo=item.path,
                    repo_id=target_repo,
                    repo_type=repo_type,
                    commit_message=f"clone {item.path}",
                    token=hf_token,
                )
            finally:
                if os.path.exists(local_path):
                    os.remove(local_path)

def stealth_clone_hf_repo(hf_token_ui, source_repo, source_type, target_repo, repo_type):
    """Clone a repo, bucket or local folder into ``target_repo`` file by file.

    This is the UI callback: it never raises for operational failures and
    instead reports the outcome as a status string.

    Args:
        hf_token_ui: Token typed by the user; may be empty or ``None``, in
            which case the ``HF_TOKEN`` environment variable is used.
        source_repo: Source repo id, bucket reference, or local directory
            path, depending on ``source_type``.
        source_type: ``"bucket"`` or ``"local"``; any other value is treated
            as a regular Hub repo.
        target_repo: Repo id to create (if needed) and upload into.
        repo_type: Repo type of the target (also used for a repo source).

    Returns:
        A message starting with ``"success!"`` on completion, or with
        ``"error:"`` describing what went wrong.
    """
    # Prioritize the token pasted by the user. Fallback to Space secrets if empty.
    # ``or ""`` keeps a missing (None) value from crashing on ``.strip()``.
    hf_token = (hf_token_ui or "").strip() or os.environ.get("HF_TOKEN")

    if not hf_token:
        return "error: Please provide a valid Hugging Face Write Token."

    # Pasted ids often carry stray whitespace; reject blanks up front so the
    # user gets a clear message instead of a low-level API error.
    source_repo = (source_repo or "").strip()
    target_repo = (target_repo or "").strip()
    if not source_repo or not target_repo:
        return "error: Please provide both a source and a target repo ID."

    # NOTE(review): with the HF_TOKEN fallback and the "local" source type,
    # an anonymous visitor can act with the host's token and read directories
    # on the machine running this app - confirm this exposure is intended.
    try:
        api = HfApi(token=hf_token)
        _retry_api_call(
            api.create_repo,
            repo_id=target_repo,
            repo_type=repo_type,
            exist_ok=True,
        )

        if source_type == "bucket":
            _stream_clone_bucket(source_repo, target_repo, repo_type, api, hf_token)
        elif source_type == "local":
            _upload_local_source(source_repo, target_repo, repo_type, api)
        else:
            _stream_clone_repo(source_repo, target_repo, repo_type, api, hf_token)

        return f"success! cleanly cloned {source_repo} to {target_repo} with no tags."
    except Exception as e:
        # Top-level UI boundary: surface any failure as a status message.
        return f"error: {type(e).__name__}: {str(e)}"

# Gradio UI: a single-page form whose button runs stealth_clone_hf_repo and
# shows the returned status string.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 🥷 Hugging Face Stealth Cloner")
    gr.Markdown(
        "Clone repositories, datasets, or HF buckets cleanly **without** the 'duplicated from' tag showing up on the new repository.\n\n"
        "**Note:** To use this tool, you must provide your own Hugging Face token with **Write** permissions so it can push files to your account."
    )
    
    # Token field is masked (type="password").
    with gr.Row():
        hf_token_input = gr.Textbox(
            label="Hugging Face Token (Write Access)", 
            type="password",
            placeholder="hf_..."
        )
        
    # Source reference plus the selector that decides how it is interpreted
    # ("repo" is the default choice).
    with gr.Row():
        source_input = gr.Textbox(
            label="Source Repo, Bucket ID, or Local Path",
            placeholder="e.g. source-user/source-model, username/my-bucket"
        )
        source_type_input = gr.Radio(
            choices=["repo", "bucket", "local"],
            value="repo",
            label="Source Type"
        )
        
    # Destination repo id and its type ("model" is the default choice).
    with gr.Row():
        target_input = gr.Textbox(
            label="Target Repo ID",
            placeholder="e.g. your-username/cloned-model"
        )
        repo_type_input = gr.Radio(
            choices=["model", "dataset", "space"], 
            value="model", 
            label="Target Repository Type"
        )
        
    clone_btn = gr.Button("Stealth Clone Repo", variant="primary")
    output = gr.Textbox(label="Status", lines=2)
    
    # The order of ``inputs`` must match the positional parameters of
    # stealth_clone_hf_repo: token, source, source type, target, repo type.
    clone_btn.click(
        fn=stealth_clone_hf_repo, 
        inputs=[hf_token_input, source_input, source_type_input, target_input, repo_type_input], 
        outputs=output
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()