import os import tempfile from flask import Flask, request, jsonify, send_file from huggingface_hub import ( batch_bucket_files, download_bucket_files, list_bucket_tree ) app = Flask(__name__) HF_TOKEN = os.environ.get("HF_TOKEN") if not HF_TOKEN: raise ValueError("HF_TOKEN environment variable not set. Please add it to Space Secrets.") BUCKET_ID = "nagose/filebed" HTML_CONTENT = """ HF Bucket 文件管理器 · 支持目录导航

📦 HF Bucket 文件管理器 (目录导航)

🧪 API 测试

📤 上传文件 (POST /upload) 支持目录

(留空则使用左侧文件)

📋 列出文件 (GET /list?dir=...)

📥 获取文件信息 (HEAD /file/<filename>)

🗑️ 删除文件 (DELETE /delete/<filename>)

点击按钮查看结果

📚 API 使用说明 (JavaScript 示例)

所有请求均需在 Hugging Face Space 内调用(同源),无需额外认证。

📤 上传文件

POST /upload

请求格式: multipart/form-data

参数:

  • file (必填) - 要上传的文件
  • dir (可选) - 目标目录路径,例如 "images/""logs/2026/"。留空则上传到根目录。

成功响应: {"success": true, "filename": "远程完整路径"}

失败响应: {"error": "错误信息"} (HTTP 4xx/5xx)

// 使用 FormData 上传
const fileInput = document.getElementById('fileInput'); // 文件选择 input
const file = fileInput.files[0];
const dir = "images/"; // 可选目录

const formData = new FormData();
formData.append('file', file);
formData.append('dir', dir);

fetch('/upload', {
    method: 'POST',
    body: formData
})
.then(res => res.json())
.then(data => {
    if (data.error) throw new Error(data.error);
    console.log('上传成功:', data.filename);
})
.catch(err => console.error('上传失败:', err.message));

📋 列出文件

GET /list?dir=...

查询参数:

  • dir (可选) - 目录路径,例如 "images/"。留空则列出根目录下的第一层内容。

成功响应: 字符串数组,每个元素是文件或目录的完整路径(目录路径以 / 结尾)。例如:["file.txt", "images/", "logs/"]

失败响应: {"error": "错误信息"} (HTTP 5xx)

// 列出根目录内容
fetch('/list')
    .then(res => res.json())
    .then(files => {
        files.forEach(path => {
            if (path.endsWith('/')) {
                console.log('目录:', path);
            } else {
                console.log('文件:', path);
            }
        });
    });

// 列出指定目录(如 images/)内容
const dir = "images/";
fetch(`/list?dir=${encodeURIComponent(dir)}`)
    .then(res => res.json())
    .then(files => console.log(files));

📥 下载文件

GET /file/<filename>

路径参数:

  • filename - 文件的完整路径(可包含目录),例如 "images/avatar.png"

成功响应: 文件内容(作为附件下载)。

失败响应: {"error": "错误信息"} (HTTP 4xx/5xx)

// 触发浏览器下载
const filename = "images/avatar.png";
window.location.href = `/file/${encodeURIComponent(filename)}`;

// 或者用 fetch 获取文件 Blob
fetch(`/file/${encodeURIComponent(filename)}`)
    .then(res => {
        if (!res.ok) throw new Error('文件不存在');
        return res.blob();
    })
    .then(blob => {
        const url = URL.createObjectURL(blob);
        const a = document.createElement('a');
        a.href = url;
        a.download = filename.split('/').pop(); // 提取文件名
        a.click();
    })
    .catch(err => console.error('下载失败:', err.message));

🔍 检查文件是否存在

HEAD /file/<filename>

路径参数: 同下载接口的 filename

成功响应: HTTP 200,无响应体。

失败响应: HTTP 404 并返回 JSON 错误信息。

const filename = "config.json";

fetch(`/file/${encodeURIComponent(filename)}`, { method: 'HEAD' })
    .then(res => {
        if (res.ok) {
            console.log('文件存在');
        } else {
            return res.json().then(err => { throw new Error(err.error); });
        }
    })
    .catch(err => console.error('检查失败:', err.message));

🗑️ 删除文件

DELETE /delete/<filename>

路径参数: 同下载接口的 filename

成功响应: {"success": true}

失败响应: {"error": "错误信息"} (HTTP 4xx/5xx)

const filename = "temp.log";

fetch(`/delete/${encodeURIComponent(filename)}`, { method: 'DELETE' })
    .then(res => res.json())
    .then(data => {
        if (data.success) {
            console.log('删除成功');
        } else {
            throw new Error(data.error);
        }
    })
    .catch(err => console.error('删除失败:', err.message));
""" @app.route('/') def index(): return HTML_CONTENT @app.route('/upload', methods=['POST']) def upload(): if 'file' not in request.files: return jsonify({'error': 'No file part'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'No selected file'}), 400 target_dir = request.form.get('dir', '').strip() filename = file.filename if target_dir: # 规范化目录路径:去除前导 '/',确保末尾有 '/' if target_dir.startswith('/'): target_dir = target_dir[1:] if not target_dir.endswith('/'): target_dir += '/' remote_path = target_dir + filename else: remote_path = filename with tempfile.NamedTemporaryFile(delete=False) as tmp: file.save(tmp.name) try: batch_bucket_files( bucket_id=BUCKET_ID, add=[(tmp.name, remote_path)], token=HF_TOKEN ) except Exception as e: return jsonify({'error': str(e)}), 500 finally: os.unlink(tmp.name) # 构建完整文件访问 URL base_url = "https://" + request.host + "/" # 例如 "https://chatyou-filebed.hf.space/" file_url = base_url + "file/" + remote_path return jsonify({'success': True, 'filename': file_url}) @app.route('/list') def list_files(): """ 列出 bucket 中指定目录下的第一层内容(文件和目录)。 若未提供 dir 参数或 dir 为空,则列出根目录下的第一层内容。 返回的列表元素为路径字符串,目录路径以 '/' 结尾,文件路径不以 '/' 结尾。 """ dir_param = request.args.get('dir', '').strip() try: # 规范化目录参数 if dir_param: if dir_param.startswith('/'): dir_param = dir_param[1:] if not dir_param.endswith('/'): dir_param += '/' # 当 dir_param 为空时,prefix='' 表示根目录 items = list_bucket_tree( bucket_id=BUCKET_ID, prefix=dir_param, recursive=False, # 不递归,只列出该目录下的直接内容 token=HF_TOKEN ) # 为目录添加末尾斜杠,文件保持原样 paths = [] for item in items: if item.type == 'directory': paths.append(item.path + '/') else: paths.append(item.path) return jsonify(paths) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/file/') def get_file(filename): try: with tempfile.TemporaryDirectory() as tmpdir: local_path = os.path.join(tmpdir, filename) download_bucket_files( bucket_id=BUCKET_ID, files=[(filename, local_path)], token=HF_TOKEN ) return send_file(local_path, as_attachment=True, download_name=filename) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/delete/', methods=['DELETE']) def delete_file(filename): try: batch_bucket_files( bucket_id=BUCKET_ID, delete=[filename], token=HF_TOKEN ) return jsonify({'success': True}) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)