# Spaces: Sleeping
# File size: 4,892 Bytes
# Commits: d475d3b 030ee81 d475d3b
import os
import streamlit as st
from huggingface_hub import HfApi
import tempfile
import shutil
# Configure the browser tab (title, icon) and use the wide page layout.
st.set_page_config(
    page_title="☁️ HF Cloud Drive",
    page_icon="☁️",
    layout="wide",
)

# Hugging Face Hub repository that serves as the storage backend.
REPO_ID = "e2dew32/cloud-drive"
REPO_TYPE = "dataset"
@st.cache_resource
def get_api():
    """Return the ``HfApi`` client used for every Hub call in this app.

    The function is wrapped in ``st.cache_resource`` so the client object is
    created once and handed back on later calls instead of being rebuilt.
    """
    client = HfApi()
    return client
def format_size(size_bytes):
    """Return a human-readable string for a size given in bytes.

    Args:
        size_bytes: Size in bytes, or ``None`` when the size is unknown.

    Returns:
        ``"—"`` for ``None``; otherwise the size expressed in B, KB, MB
        (one decimal) or GB (two decimals), using 1024-based units.
    """
    if size_bytes is None:
        return "—"
    if size_bytes < 1024:
        return f"{size_bytes} B"

    value = size_bytes / 1024
    for unit in ("KB", "MB"):
        # Compare the *rounded* value: something like 1023.999 KB would be
        # printed as "1024.0 KB", so promote it to the next unit instead.
        if round(value, 1) < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.2f} GB"
# ---- Sidebar ----
# Title, backing repository name and a manual refresh button.
st.sidebar.header("☁️ HF Cloud Drive")
st.sidebar.markdown(f"**仓库:** `{REPO_ID}`")
st.sidebar.markdown("---")
_refresh_clicked = st.sidebar.button("🔄 刷新", use_container_width=True)
if _refresh_clicked:
    # Re-execute the script so the file listing below is fetched again.
    st.rerun()

# ---- Main ----
st.title("☁️ HF Cloud Drive")
_repo_url = f"https://huggingface.co/datasets/{REPO_ID}"
st.caption(f"后端仓库: [{REPO_ID}]({_repo_url})")

# Hub client used by the upload, listing and delete sections.
api = get_api()
# ---- Upload ----
# Session-state slot holding the outcome messages of the last upload run.
# st.rerun() aborts the current script run, so anything rendered right before
# it would never be visible; the messages are stored here and replayed on the
# next run instead.
_UPLOAD_FLASH_KEY = "upload_flash_messages"

with st.expander("📤 上传文件", expanded=True):
    # Replay (once) the results of the upload that triggered the last rerun.
    if _UPLOAD_FLASH_KEY in st.session_state:
        _renderers = {"success": st.success, "warning": st.warning, "error": st.error}
        for _level, _message in st.session_state[_UPLOAD_FLASH_KEY]:
            _renderers[_level](_message)
        del st.session_state[_UPLOAD_FLASH_KEY]

    uploaded_files = st.file_uploader(
        "拖拽或选择文件上传",
        accept_multiple_files=True,
        label_visibility="collapsed",
    )
    # The button is only rendered once at least one file has been selected.
    if uploaded_files and st.button("🚀 开始上传", type="primary", use_container_width=True):
        total = len(uploaded_files)
        progress_bar = st.progress(0, text="上传中...")
        flash_messages = []
        success_count = 0
        for i, uploaded in enumerate(uploaded_files, start=1):
            try:
                # upload_file accepts raw bytes, so the in-memory upload is
                # sent directly instead of being copied to a temp file first.
                api.upload_file(
                    path_or_fileobj=uploaded.getvalue(),
                    path_in_repo=uploaded.name,
                    repo_id=REPO_ID,
                    repo_type=REPO_TYPE,
                    commit_message=f"Upload {uploaded.name} via Cloud Drive",
                )
            except Exception as e:
                # Best effort: record the failure and continue with the rest.
                flash_messages.append(("error", f"❌ {uploaded.name}: {e}"))
            else:
                success_count += 1
            progress_bar.progress(i / total, text=f"已上传 {i}/{total}")

        if success_count == total:
            flash_messages.append(("success", f"✅ 全部上传成功 ({success_count}/{total})"))
        else:
            flash_messages.append(("warning", f"⚠️ 部分上传成功 ({success_count}/{total})"))

        st.session_state[_UPLOAD_FLASH_KEY] = flash_messages
        st.rerun()
# ---- File Browser ----
st.subheader("📁 文件列表")
listing_failed = False
try:
    tree = list(api.list_repo_tree(repo_id=REPO_ID, repo_type=REPO_TYPE, recursive=True))
except Exception as e:
    st.error(f"无法获取文件列表: {e}")
    st.info("请确保 Hugging Face Token 已配置,且在 Space Settings 中设置了 HF_TOKEN secret。")
    tree = []
    listing_failed = True

# Distinguish files vs directories: folders have tree_id, files have blob_id
files = sorted([f for f in tree if hasattr(f, "blob_id")], key=lambda x: x.path)
dirs = sorted([f for f in tree if hasattr(f, "tree_id")], key=lambda x: x.path)
# Entries without a size (missing or None) count as 0 bytes.
total_size = sum(getattr(f, "size", 0) or 0 for f in files)
st.info(f"📊 {len(files)} 个文件 | {len(dirs)} 个目录 | 总大小: {format_size(total_size)}")

if files:
    # Directories
    for d in dirs:
        st.markdown(f"📁 **`{d.path}/`**")

    # Files
    for f in files:
        cols = st.columns([5, 1, 2])
        fname = f.path.split("/")[-1]
        cols[0].markdown(f"📄 `{f.path}`")
        cols[1].text(format_size(getattr(f, "size", None)))

        # Download
        # NOTE(review): every file is fetched and read into memory on each
        # page render so that download_button has its payload ready; this
        # scales with the total repository size and may need a lazy variant.
        try:
            dl_path = api.hf_hub_download(repo_id=REPO_ID, filename=f.path, repo_type=REPO_TYPE)
            with open(dl_path, "rb") as fh:
                data = fh.read()
        except Exception as e:
            # Keep the row intact but show why the download is unavailable
            # (as the tooltip) rather than silently discarding the error.
            cols[2].button("⬇️", disabled=True, key=f"dl_{f.path}", help=str(e))
        else:
            cols[2].download_button(
                label="⬇️",
                data=data,
                file_name=fname,
                key=f"dl_{f.path}",
            )
elif not listing_failed:
    # Only claim the repository is empty when the listing actually succeeded;
    # after a failed fetch the error above is the accurate message.
    st.info("仓库为空,上传一些文件开始使用吧!")
# ---- Delete ----
st.markdown("---")
st.subheader("🗑️ 删除文件")

# Session-state slot holding the per-file results of the last delete run.
# st.rerun() aborts the current script run, so messages rendered right before
# it would never be visible; they are stored here and replayed after the rerun.
_DELETE_FLASH_KEY = "delete_flash_messages"
if _DELETE_FLASH_KEY in st.session_state:
    for _ok, _message in st.session_state[_DELETE_FLASH_KEY]:
        if _ok:
            st.success(_message)
        else:
            st.error(_message)
    del st.session_state[_DELETE_FLASH_KEY]

if files:
    to_delete = st.multiselect("选择要删除的文件", [f.path for f in files])
    if to_delete and st.button("确认删除", type="primary"):
        delete_results = []
        for fp in to_delete:
            try:
                api.delete_file(
                    path_in_repo=fp,
                    repo_id=REPO_ID,
                    repo_type=REPO_TYPE,
                    commit_message=f"Delete {fp}",
                )
            except Exception as e:
                # Best effort: record the failure and continue with the rest.
                delete_results.append((False, f"❌ 删除 {fp} 失败: {e}"))
            else:
                delete_results.append((True, f"✅ 已删除: {fp}"))
        st.session_state[_DELETE_FLASH_KEY] = delete_results
        # Rerun so the file list rendered earlier in the script reflects the deletions.
        st.rerun()
else:
    st.info("没有文件可删除")
|