clone / app.py
yanqinghui's picture
Upload 4 files
dfa6c7d verified
import gradio as gr
from huggingface_hub import HfApi
import os
import git
import requests
import tempfile
import shutil
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import base64
import sys
import io
from urllib.parse import unquote
import time
# --- 全局设置 ---
TEMP_DIR = tempfile.mkdtemp()
# --- 日志捕获工具 ---
class LogCapture:
def __init__(self): self.log_stream = io.StringIO()
def __enter__(self): self.old_stdout = sys.stdout; sys.stdout = self.log_stream; return self
def __exit__(self, exc_type, exc_val, exc_tb): sys.stdout = self.old_stdout
def get_value(self): return self.log_stream.getvalue()
# --- 后端逻辑函数 ---
def prepare_repo(hf_token, repo_id, repo_type, space_sdk):
if not hf_token: raise gr.Error("必须提供 Hugging Face Token!")
if not repo_id: raise gr.Error("必须提供目标 Hugging Face 仓库ID!")
api = HfApi(token=hf_token)
try:
sdk_to_use = space_sdk if repo_type == "space" else None
api.create_repo(repo_id=repo_id, repo_type=repo_type, space_sdk=sdk_to_use, exist_ok=True)
return api
except Exception as e: raise gr.Error(f"创建或访问仓库失败: {e}")
class CloneProgress(git.remote.RemoteProgress):
def __init__(self, progress_bar): super().__init__(); self.progress = progress_bar
def update(self, op_code, cur_count, max_count=None, message=''):
if max_count: self.progress(cur_count / max_count, desc=f"克隆中: {message.strip()} ({int(cur_count)}/{int(max_count)})")
def clone_github_repo(github_url, progress=gr.Progress()):
if not github_url: raise gr.Error("必须提供 GitHub 仓库链接!")
progress(0, desc="准备克隆...")
repo_name = github_url.split('/')[-1].replace('.git', '')
clone_path = os.path.join(TEMP_DIR, repo_name)
if os.path.exists(clone_path): shutil.rmtree(clone_path)
try: git.Repo.clone_from(github_url, clone_path, progress=CloneProgress(progress))
except Exception as e: raise gr.Error(f"克隆仓库失败: {e}")
progress(1, "克隆完成!")
items = ["上传整个仓库"] + os.listdir(clone_path)
return gr.update(choices=items, value="上传整个仓库", visible=True), gr.update(visible=True), f"✅ 成功克隆 '{repo_name}'。", clone_path
def update_folder_upload_options(selection, clone_path):
if selection and clone_path and selection != "上传整个仓库":
if os.path.isdir(os.path.join(clone_path, selection)): return gr.update(visible=True)
return gr.update(visible=False)
def upload_from_github(hf_token, repo_id, repo_type, space_sdk, path_in_repo, selection, folder_upload_mode, clone_path, progress=gr.Progress()):
if not clone_path: raise gr.Error("克隆路径状态丢失,请重新克隆仓库!")
progress(0, desc="准备上传...")
api = prepare_repo(hf_token, repo_id, repo_type, space_sdk)
source_path = os.path.join(clone_path, selection)
try:
if selection == "上传整个仓库":
progress(0.5, desc=f"正在上传整个仓库 (忽略 .git)...")
api.upload_folder(folder_path=clone_path, repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo, ignore_patterns=[".git", ".gitignore"])
message = "📂 成功上传了整个仓库"
elif os.path.isdir(source_path):
final_path_in_repo = path_in_repo if folder_upload_mode == "仅上传文件夹内容" else os.path.join(path_in_repo, selection)
progress(0.5, desc=f"正在上传文件夹 '{selection}'...")
api.upload_folder(folder_path=source_path, repo_id=repo_id, repo_type=repo_type, path_in_repo=final_path_in_repo)
message = f"📂 成功上传文件夹 '{selection}'"
else:
progress(0.5, desc=f"正在上传文件: {selection}")
api.upload_file(path_or_fileobj=source_path, repo_id=repo_id, repo_type=repo_type, path_in_repo=os.path.join(path_in_repo, selection))
message = f"📄 成功上传文件 '{selection}'"
progress(1, "上传完成!")
return f"✅ {message} 到 '{repo_id}'."
except Exception as e: raise gr.Error(f"上传失败: {e}")
def download_file_with_progress(url, download_dir, single_file_progress=None):
try:
with requests.get(url, stream=True, timeout=30) as r:
r.raise_for_status()
filename = unquote(url.split('/')[-1].split('?')[0] or "downloaded_file")
filepath = os.path.join(download_dir, filename)
total_size = int(r.headers.get('content-length', 0))
with open(filepath, 'wb') as f, tqdm(total=total_size, unit='B', unit_scale=True, unit_divisor=1024, desc=f"下载 {filename}", ascii=True, file=sys.stdout) as t:
chunk_size = 8192
for chunk in r.iter_content(chunk_size=chunk_size):
f.write(chunk)
t.update(len(chunk))
if single_file_progress and total_size > 0:
downloaded = t.n; single_file_progress(downloaded / total_size, desc=f"下载中: {filename} ({downloaded/1e6:.2f}/{total_size/1e6:.2f} MB)")
return filepath
except Exception as e: print(f"下载失败 {url}: {e}"); return e
def download_from_url(url_text, mode, thread_count, progress=gr.Progress()):
if not url_text: raise gr.Error("必须提供下载链接!")
urls = [url.strip() for url in url_text.split('\n') if url.strip()]; download_dir = os.path.join(TEMP_DIR, "downloads"); os.makedirs(download_dir, exist_ok=True); downloaded_files, errors = [], []; log_capture = LogCapture()
yield gr.update(), gr.update(visible=False), gr.update(), gr.update(value="", visible=True), gr.update()
with log_capture:
if mode == "单线程":
for i, url in enumerate(urls):
progress(i / len(urls), desc=f"开始下载第 {i+1}/{len(urls)} 个文件..."); result = download_file_with_progress(url, download_dir, progress); yield gr.update(), gr.update(), gr.update(), gr.update(value=log_capture.get_value()), gr.update()
if isinstance(result, str) and os.path.exists(result): downloaded_files.append(result)
else: errors.append(result)
else:
threads = []
with ThreadPoolExecutor(max_workers=thread_count) as executor:
for url in urls: threads.append(executor.submit(download_file_with_progress, url, download_dir))
while any(t.running() for t in threads):
completed_count = sum(1 for t in threads if t.done()); progress(completed_count / len(threads), desc=f"已完成 {completed_count}/{len(threads)} 个文件下载任务"); yield gr.update(), gr.update(), gr.update(), gr.update(value=log_capture.get_value()), gr.update(); time.sleep(0.5)
for thread in threads:
result = thread.result()
if isinstance(result, str) and os.path.exists(result): downloaded_files.append(result)
else: errors.append(result)
progress(1, "所有下载任务已完成!"); status_msg = f"✅ 成功下载 {len(downloaded_files)} 个文件。";
if errors: status_msg += f" ❌ 失败 {len(errors)} 个: {', '.join(map(str, errors[:2]))}...";
file_names = [os.path.basename(f) for f in downloaded_files]; final_log = log_capture.get_value()
yield gr.update(choices=file_names, value=file_names, visible=True), gr.update(visible=True), status_msg, gr.update(value=final_log), download_dir
def upload_from_url(hf_token, repo_id, repo_type, space_sdk, path_in_repo, selected_files, download_dir, progress=gr.Progress()):
if not download_dir: raise gr.Error("下载目录状态丢失,请重新下载文件!");
if not selected_files: raise gr.Error("请选择要上传的文件!");
api = prepare_repo(hf_token, repo_id, repo_type, space_sdk)
for i, filename in enumerate(selected_files): progress((i + 1) / len(selected_files), desc=f"正在上传 {filename}..."); api.upload_file(path_or_fileobj=os.path.join(download_dir, filename), path_in_repo=os.path.join(path_in_repo, filename), repo_id=repo_id, repo_type=repo_type)
return f"✅ 成功将 {len(selected_files)} 个文件上传到 '{repo_id}'."
def upload_from_local(hf_token, repo_id, repo_type, space_sdk, path_in_repo, local_files, progress=gr.Progress()):
if not local_files: raise gr.Error("请至少上传一个文件!");
api = prepare_repo(hf_token, repo_id, repo_type, space_sdk);
for i, file_obj in enumerate(local_files): progress((i + 1) / len(local_files), desc=f"正在上传 {os.path.basename(file_obj.name)}..."); api.upload_file(path_or_fileobj=file_obj.name, path_in_repo=os.path.join(path_in_repo, os.path.basename(file_obj.name)), repo_id=repo_id, repo_type=repo_type)
return f"✅ 成功上传 {len(local_files)} 个本地文件到 '{repo_id}'."
def generate_share_link(repo_id, repo_type, space_sdk, path_in_repo, github_url, url_text, download_mode, thread_count):
config = {"repo_id": repo_id, "repo_type": repo_type, "space_sdk": space_sdk, "path_in_repo": path_in_repo,"github_url": github_url, "url_text": url_text, "download_mode": download_mode, "thread_count": thread_count}
encoded = base64.urlsafe_b64encode(json.dumps(config).encode()).decode(); return f"将此编码粘贴到URL参数 `?config=` 后面: \n\n{encoded}"
def apply_config(config_data):
try:
if isinstance(config_data, str): config = json.loads(base64.urlsafe_b64decode(config_data).decode())
else:
with open(config_data.name, "r") as f: config = json.load(f)
except Exception as e: raise gr.Error(f"解析配置失败: {e}")
repo_type = config.get("repo_type", "space"); mode = config.get("download_mode", "多线程")
return gr.update(value=config.get("repo_id", "")),gr.update(value=repo_type),gr.update(value=config.get("space_sdk", "docker"), visible=(repo_type == "space")),gr.update(value=config.get("path_in_repo", "")),gr.update(value=config.get("github_url", "")),gr.update(value=config.get("url_text", "")),gr.update(value=mode),gr.update(value=config.get("thread_count", 8), visible=(mode == "多线程"))
with gr.Blocks(theme=gr.themes.Soft(), title="云端同步工具 Pro") as demo:
gr.Markdown("# ☁️ 云端同步工具 Pro (V11 最终定义修复版)")
global_progress = gr.Progress()
status_output = gr.Markdown()
with gr.Accordion("⚙️ 导入/导出配置", open=False):
with gr.Row(): gen_link_btn = gr.Button("生成分享配置")
share_link_output = gr.Textbox(label="分享配置编码", interactive=False, max_lines=3, visible=False)
with gr.Row():
config_link_input = gr.Textbox(label="粘贴配置编码", scale=3)
config_file_input = gr.UploadButton("或上传配置文件", file_types=[".json"])
apply_config_btn = gr.Button("应用配置", variant="primary")
gr.Markdown("---")
gr.Markdown("### **第一步:设置目标仓库**")
with gr.Row(): hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", info="请提供有 'write' 写入权限的Token。")
with gr.Row():
hf_repo_input = gr.Textbox(label="目标 HF 仓库 ID", scale=2)
hf_repo_type_input = gr.Dropdown(label="仓库类型", choices=["space", "dataset", "model"], value="space")
hf_space_sdk_input = gr.Dropdown(label="Space SDK", choices=["docker", "gradio", "streamlit", "static"], value="docker", visible=True)
hf_path_in_repo_input = gr.Textbox(label="仓库内路径 (可选)")
def toggle_sdk_visibility(repo_type): return gr.update(visible=repo_type == "space")
hf_repo_type_input.change(fn=toggle_sdk_visibility, inputs=hf_repo_type_input, outputs=hf_space_sdk_input)
gr.Markdown("### **第二步:选择同步源并执行**")
with gr.Tabs():
with gr.TabItem("从 GitHub 同步"):
github_url_input = gr.Textbox(label="GitHub 仓库链接")
clone_btn = gr.Button("1. 克隆", variant="secondary")
upload_github_btn = gr.Button("2. 上传到 Hugging Face", variant="primary", visible=False)
github_selection_radio = gr.Radio(label="选择要上传的内容", visible=False)
github_folder_mode_radio = gr.Radio(["上传文件夹本身", "仅上传文件夹内容"], label="文件夹上传模式", value="上传文件夹本身", visible=False)
github_clone_path_state = gr.State()
with gr.TabItem("从 URL 链接下载"):
url_input = gr.TextArea(label="下载链接 (每行一个)")
with gr.Row():
url_download_mode = gr.Radio(["单线程", "多线程"], label="下载模式", value="多线程")
url_thread_count = gr.Number(label="线程数", value=8, minimum=1, maximum=32, step=1, visible=True)
def toggle_thread_count(mode): return gr.update(visible=mode == "多线程")
url_download_mode.change(fn=toggle_thread_count, inputs=url_download_mode, outputs=url_thread_count)
download_btn = gr.Button("1. 下载", variant="secondary")
upload_url_btn = gr.Button("2. 上传到 Hugging Face", variant="primary", visible=False)
url_selection_checkbox = gr.CheckboxGroup(label="选择要上传的文件", visible=False)
download_log_output = gr.Textbox(label="下载日志", lines=10, interactive=False, visible=False)
url_download_dir_state = gr.State()
with gr.TabItem("从本地上传"):
local_file_input = gr.File(label="选择本地文件", file_count="multiple")
upload_local_btn = gr.Button("上传到 Hugging Face", variant="primary")
# --- V11 核心修复: 将所有变量列表的定义移到最前面 ---
all_config_inputs = [hf_repo_input, hf_repo_type_input, hf_space_sdk_input, hf_path_in_repo_input, github_url_input, url_input, url_download_mode, url_thread_count]
upload_inputs = [hf_token_input, hf_repo_input, hf_repo_type_input, hf_space_sdk_input, hf_path_in_repo_input]
# --- 绑定UI和逻辑 ---
gen_link_btn.click(fn=generate_share_link, inputs=all_config_inputs, outputs=share_link_output).then(lambda: gr.update(visible=True), outputs=share_link_output)
def apply_wrapper(config_data, config_link): return apply_config(config_data if config_data else config_link)
apply_config_btn.click(fn=apply_wrapper, inputs=[config_file_input, config_link_input], outputs=all_config_inputs)
clone_btn.click(fn=clone_github_repo, inputs=[github_url_input], outputs=[github_selection_radio, upload_github_btn, status_output, github_clone_path_state], api_name="clone_github")
github_selection_radio.change(fn=update_folder_upload_options, inputs=[github_selection_radio, github_clone_path_state], outputs=github_folder_mode_radio)
upload_github_btn.click(fn=upload_from_github, inputs=upload_inputs + [github_selection_radio, github_folder_mode_radio, github_clone_path_state], outputs=[status_output], api_name="upload_github")
download_btn.click(fn=download_from_url, inputs=[url_input, url_download_mode, url_thread_count], outputs=[url_selection_checkbox, upload_url_btn, status_output, download_log_output, url_download_dir_state], api_name="download_url")
upload_url_btn.click(fn=upload_from_url, inputs=upload_inputs + [url_selection_checkbox, url_download_dir_state], outputs=[status_output], api_name="upload_url")
upload_local_btn.click(fn=upload_from_local, inputs=upload_inputs + [local_file_input], outputs=[status_output], api_name="upload_local")
def get_url_params(request: gr.Request):
if request and "config" in request.query_params: return apply_config(request.query_params["config"])
return (gr.update(),) * len(all_config_inputs)
demo.load(get_url_params, inputs=None, outputs=all_config_inputs)
demo.unload(lambda: shutil.rmtree(TEMP_DIR, ignore_errors=True))
if __name__ == "__main__":
demo.launch(debug=True)