"""Streamlit front end that uses a Hugging Face dataset repo as a cloud drive.

The page offers three operations against ``REPO_ID``: uploading files,
browsing/downloading the stored files, and deleting files.
"""

# NOTE: os / shutil / tempfile are not referenced by the code below; they are
# kept because other parts of the project may rely on this import block.
import os
import shutil
import tempfile

import streamlit as st
from huggingface_hub import HfApi

st.set_page_config(page_title="☁️ HF Cloud Drive", page_icon="☁️", layout="wide")

REPO_ID = "e2dew32/cloud-drive"
REPO_TYPE = "dataset"

# Session-state keys used to carry data across ``st.rerun()``.
FLASH_KEY = "_flash_messages"
PREPARED_KEY = "_prepared_downloads"


@st.cache_resource
def get_api():
    """Return the ``HfApi`` client, cached by Streamlit across reruns."""
    return HfApi()


def format_size(size_bytes):
    """Return *size_bytes* as a human-readable string.

    ``None`` (size unknown) is rendered as an em dash. Values below 1 KiB are
    shown as whole bytes, KB/MB with one decimal, GB with two decimals.
    """
    if size_bytes is None:
        return "—"
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    if size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / 1024 / 1024:.1f} MB"
    return f"{size_bytes / 1024 / 1024 / 1024:.2f} GB"


def flash(level, message):
    """Queue *message* so it is displayed after the next rerun.

    *level* is the name of a Streamlit status function: ``"success"``,
    ``"warning"``, ``"error"`` or ``"info"``. Anything rendered directly
    before ``st.rerun()`` is discarded, hence the queue.
    """
    if FLASH_KEY not in st.session_state:
        st.session_state[FLASH_KEY] = []
    st.session_state[FLASH_KEY].append((level, message))


def show_flashes():
    """Render and clear every message queued with :func:`flash`."""
    if FLASH_KEY not in st.session_state:
        return
    pending = st.session_state[FLASH_KEY]
    st.session_state[FLASH_KEY] = []
    for level, message in pending:
        getattr(st, level)(message)


def prepared_downloads():
    """Return the per-session set of repo paths the user asked to download."""
    if PREPARED_KEY not in st.session_state:
        st.session_state[PREPARED_KEY] = set()
    return st.session_state[PREPARED_KEY]


def render_upload(api):
    """Render the upload expander and push the selected files to the repo.

    Each file is uploaded in its own commit. Failures are reported per file
    and do not abort the remaining uploads.
    """
    with st.expander("📤 上传文件", expanded=True):
        uploaded_files = st.file_uploader(
            "拖拽或选择文件上传",
            accept_multiple_files=True,
            label_visibility="collapsed",
        )
        if not uploaded_files:
            return
        if not st.button("🚀 开始上传", type="primary", use_container_width=True):
            return

        total = len(uploaded_files)
        progress_bar = st.progress(0, text="上传中...")
        success_count = 0
        for index, uploaded in enumerate(uploaded_files, start=1):
            try:
                # upload_file accepts raw bytes, so no temporary file is needed.
                api.upload_file(
                    path_or_fileobj=uploaded.getvalue(),
                    path_in_repo=uploaded.name,
                    repo_id=REPO_ID,
                    repo_type=REPO_TYPE,
                    commit_message=f"Upload {uploaded.name} via Cloud Drive",
                )
            except Exception as exc:  # UI boundary: report and keep going
                flash("error", f"❌ {uploaded.name}: {exc}")
            else:
                success_count += 1
            progress_bar.progress(index / total, text=f"已上传 {index}/{total}")

        if success_count == total:
            flash("success", f"✅ 全部上传成功 ({success_count}/{total})")
        else:
            flash("warning", f"⚠️ 部分上传成功 ({success_count}/{total})")
        # Rerun so the file list below reflects the new repo contents.
        st.rerun()


def render_download_cell(api, column, entry):
    """Render the download control for one file inside *column*.

    Content is fetched only after the user asks for it, so rendering the
    list does not download every file in the repository.
    """
    path = entry.path
    prepared = prepared_downloads()

    if path not in prepared:
        if column.button("⬇️", key=f"prep_{path}", help="准备下载"):
            prepared.add(path)
            st.rerun()
        return

    try:
        local_path = api.hf_hub_download(
            repo_id=REPO_ID, filename=path, repo_type=REPO_TYPE
        )
        with open(local_path, "rb") as handle:
            data = handle.read()
    except Exception as exc:  # UI boundary: surface the reason, allow retry
        prepared.discard(path)
        column.button(
            "⬇️", disabled=True, key=f"dl_{path}", help=f"下载失败: {exc}"
        )
        return

    column.download_button(
        label="💾",
        data=data,
        file_name=path.split("/")[-1],
        key=f"dl_{path}",
        help="保存到本地",
    )


def render_file_browser(api):
    """Render the repository listing and return the sorted list of files.

    Returns an empty list when the listing could not be fetched.
    """
    st.subheader("📁 文件列表")
    try:
        tree = list(
            api.list_repo_tree(repo_id=REPO_ID, repo_type=REPO_TYPE, recursive=True)
        )
    except Exception as exc:
        st.error(f"无法获取文件列表: {exc}")
        st.info("请确保 Hugging Face Token 已配置,且在 Space Settings 中设置了 HF_TOKEN secret。")
        # Do not present a failed listing as an empty repository.
        return []

    # Distinguish files vs directories: folders have tree_id, files have blob_id.
    files = sorted((e for e in tree if hasattr(e, "blob_id")), key=lambda e: e.path)
    dirs = sorted((e for e in tree if hasattr(e, "tree_id")), key=lambda e: e.path)

    total_size = sum(getattr(f, "size", 0) or 0 for f in files)
    st.info(
        f"📊 {len(files)} 个文件 | {len(dirs)} 个目录 | 总大小: {format_size(total_size)}"
    )

    if not files:
        st.info("仓库为空,上传一些文件开始使用吧!")
        return files

    for folder in dirs:
        st.markdown(f"📁 **`{folder.path}/`**")

    for entry in files:
        cols = st.columns([5, 1, 2])
        cols[0].markdown(f"📄 `{entry.path}`")
        cols[1].text(format_size(getattr(entry, "size", None)))
        render_download_cell(api, cols[2], entry)

    return files


def render_delete(api, files):
    """Render the delete section for *files* and delete the selected paths.

    Each path is deleted in its own commit; a failure on one path does not
    stop the others.
    """
    st.markdown("---")
    st.subheader("🗑️ 删除文件")
    if not files:
        st.info("没有文件可删除")
        return

    to_delete = st.multiselect("选择要删除的文件", [f.path for f in files])
    if not (to_delete and st.button("确认删除", type="primary")):
        return

    prepared = prepared_downloads()
    for repo_path in to_delete:
        try:
            api.delete_file(
                path_in_repo=repo_path,
                repo_id=REPO_ID,
                repo_type=REPO_TYPE,
                commit_message=f"Delete {repo_path}",
            )
        except Exception as exc:  # UI boundary: report and keep going
            flash("error", f"❌ 删除 {repo_path} 失败: {exc}")
        else:
            flash("success", f"✅ 已删除: {repo_path}")
            prepared.discard(repo_path)
    # Rerun so the listing and the multiselect reflect the deletions.
    st.rerun()


# ---- Sidebar ----
with st.sidebar:
    st.header("☁️ HF Cloud Drive")
    st.markdown(f"**仓库:** `{REPO_ID}`")
    st.markdown("---")
    if st.button("🔄 刷新", use_container_width=True):
        st.rerun()

# ---- Main ----
st.title("☁️ HF Cloud Drive")
st.caption(f"后端仓库: [{REPO_ID}](https://huggingface.co/datasets/{REPO_ID})")

# Messages queued by the previous run (upload / delete results).
show_flashes()

api = get_api()

# ---- Upload ----
render_upload(api)

# ---- File Browser ----
files = render_file_browser(api)

# ---- Delete ----
render_delete(api, files)