Spaces:
Sleeping
Sleeping
| import os | |
| import streamlit as st | |
| from huggingface_hub import HfApi | |
| import tempfile | |
| import shutil | |
| st.set_page_config(page_title="☁️ HF Cloud Drive", page_icon="☁️", layout="wide") | |
| REPO_ID = "e2dew32/cloud-drive" | |
| REPO_TYPE = "dataset" | |
| def get_api(): | |
| return HfApi() | |
| def format_size(size_bytes): | |
| if size_bytes is None: | |
| return "—" | |
| if size_bytes < 1024: | |
| return f"{size_bytes} B" | |
| elif size_bytes < 1024 * 1024: | |
| return f"{size_bytes / 1024:.1f} KB" | |
| elif size_bytes < 1024 * 1024 * 1024: | |
| return f"{size_bytes / 1024 / 1024:.1f} MB" | |
| else: | |
| return f"{size_bytes / 1024 / 1024 / 1024:.2f} GB" | |
| # ---- Sidebar ---- | |
| with st.sidebar: | |
| st.header("☁️ HF Cloud Drive") | |
| st.markdown(f"**仓库:** `{REPO_ID}`") | |
| st.markdown("---") | |
| if st.button("🔄 刷新", use_container_width=True): | |
| st.rerun() | |
| # ---- Main ---- | |
| st.title("☁️ HF Cloud Drive") | |
| st.caption(f"后端仓库: [{REPO_ID}](https://huggingface.co/datasets/{REPO_ID})") | |
| api = get_api() | |
| # ---- Upload ---- | |
| with st.expander("📤 上传文件", expanded=True): | |
| uploaded_files = st.file_uploader("拖拽或选择文件上传", accept_multiple_files=True, label_visibility="collapsed") | |
| if uploaded_files: | |
| if st.button("🚀 开始上传", type="primary", use_container_width=True): | |
| progress_bar = st.progress(0, text="上传中...") | |
| success_count = 0 | |
| for i, uploaded in enumerate(uploaded_files): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded.name)[1]) as tmp: | |
| tmp.write(uploaded.getvalue()) | |
| tmp_path = tmp.name | |
| try: | |
| api.upload_file( | |
| path_or_fileobj=tmp_path, | |
| path_in_repo=uploaded.name, | |
| repo_id=REPO_ID, | |
| repo_type=REPO_TYPE, | |
| commit_message=f"Upload {uploaded.name} via Cloud Drive", | |
| ) | |
| success_count += 1 | |
| except Exception as e: | |
| st.error(f"❌ {uploaded.name}: {e}") | |
| finally: | |
| os.unlink(tmp_path) | |
| progress_bar.progress((i + 1) / len(uploaded_files), text=f"已上传 {i+1}/{len(uploaded_files)}") | |
| if success_count == len(uploaded_files): | |
| st.success(f"✅ 全部上传成功 ({success_count}/{len(uploaded_files)})") | |
| else: | |
| st.warning(f"⚠️ 部分上传成功 ({success_count}/{len(uploaded_files)})") | |
| st.rerun() | |
| # ---- File Browser ---- | |
| st.subheader("📁 文件列表") | |
| try: | |
| tree = list(api.list_repo_tree(repo_id=REPO_ID, repo_type=REPO_TYPE, recursive=True)) | |
| except Exception as e: | |
| st.error(f"无法获取文件列表: {e}") | |
| st.info("请确保 Hugging Face Token 已配置,且在 Space Settings 中设置了 HF_TOKEN secret。") | |
| tree = [] | |
| # Distinguish files vs directories: folders have tree_id, files have blob_id | |
| files = sorted([f for f in tree if hasattr(f, "blob_id")], key=lambda x: x.path) | |
| dirs = sorted([f for f in tree if hasattr(f, "tree_id")], key=lambda x: x.path) | |
| total_size = sum(getattr(f, "size", 0) or 0 for f in files) | |
| st.info(f"📊 {len(files)} 个文件 | {len(dirs)} 个目录 | 总大小: {format_size(total_size)}") | |
| if not files: | |
| st.info("仓库为空,上传一些文件开始使用吧!") | |
| else: | |
| # Directories | |
| for d in dirs: | |
| st.markdown(f"📁 **`{d.path}/`**") | |
| # Files | |
| for f in files: | |
| cols = st.columns([5, 1, 2]) | |
| fname = f.path.split("/")[-1] | |
| cols[0].markdown(f"📄 `{f.path}`") | |
| cols[1].text(format_size(getattr(f, "size", None))) | |
| # Download | |
| try: | |
| dl_path = api.hf_hub_download(repo_id=REPO_ID, filename=f.path, repo_type=REPO_TYPE) | |
| with open(dl_path, "rb") as fh: | |
| data = fh.read() | |
| cols[2].download_button( | |
| label="⬇️", | |
| data=data, | |
| file_name=fname, | |
| key=f"dl_{f.path}", | |
| ) | |
| except Exception: | |
| cols[2].button("⬇️", disabled=True, key=f"dl_{f.path}") | |
| # ---- Delete ---- | |
| st.markdown("---") | |
| st.subheader("🗑️ 删除文件") | |
| if files: | |
| to_delete = st.multiselect("选择要删除的文件", [f.path for f in files]) | |
| if to_delete and st.button("确认删除", type="primary"): | |
| for fp in to_delete: | |
| try: | |
| api.delete_file(path_in_repo=fp, repo_id=REPO_ID, repo_type=REPO_TYPE, commit_message=f"Delete {fp}") | |
| st.success(f"✅ 已删除: {fp}") | |
| except Exception as e: | |
| st.error(f"❌ 删除 {fp} 失败: {e}") | |
| st.rerun() | |
| else: | |
| st.info("没有文件可删除") | |