| import os |
| import tempfile |
| from flask import Flask, request, jsonify, send_file |
| from wsgidav.wsgidav_app import WsgiDAVApp |
| from hf_bucket_provider import HfBucketProvider |
|
|
| HF_TOKEN = os.environ.get("HF_TOKEN") |
| if not HF_TOKEN: |
| raise ValueError("HF_TOKEN environment variable not set. Please add it to Space Secrets.") |
|
|
| BUCKET_ID = "nagose/filebed" |
|
|
| app = Flask(__name__) |
|
|
| |
| dav_config = { |
| "provider_mapping": { |
| "/": HfBucketProvider(BUCKET_ID, HF_TOKEN) |
| }, |
| "http_authenticator": { |
| "accept_basic": True, |
| "accept_digest": False, |
| "default_to_digest": False, |
| }, |
| "simple_dc": { |
| "user_mapping": {"*": True} |
| }, |
| "dir_browser": {"enable": False}, |
| "verbose": 1, |
| } |
|
|
| dav_app = WsgiDAVApp(dav_config) |
|
|
| @app.route('/webdav', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK']) |
| @app.route('/webdav/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK']) |
| def webdav_handler(path): |
| environ = request.environ |
| environ['PATH_INFO'] = '/' + path |
| environ['SCRIPT_NAME'] = '/webdav' |
| return dav_app(environ, lambda status, headers, exc_info=None: None) |
|
|
| |
| @app.route('/debug/webdav-root') |
| def debug_webdav_root(): |
| try: |
| provider = HfBucketProvider(BUCKET_ID, HF_TOKEN) |
| members = provider._list_directory('/') |
| return jsonify({'members': members}) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/debug/list') |
| def debug_list(): |
| from huggingface_hub import list_bucket_tree |
| try: |
| items = list_bucket_tree( |
| bucket_id=BUCKET_ID, |
| recursive=True, |
| token=HF_TOKEN |
| ) |
| items_list = list(items) |
| result = [] |
| for item in items_list: |
| entry = { |
| 'path': item.path, |
| 'type': item.type, |
| 'size': item.size, |
| } |
| if hasattr(item, 'last_modified') and item.last_modified: |
| entry['last_modified'] = item.last_modified.isoformat() |
| else: |
| entry['last_modified'] = None |
| result.append(entry) |
| return jsonify(result) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| |
| HTML_CONTENT = """<!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="UTF-8"> |
| <title>HF Bucket 管理器</title> |
| <style> |
| body { font-family: sans-serif; max-width: 800px; margin: 20px auto; } |
| .log { background: #1e293b; color: #b9e6f0; padding: 12px; border-radius: 8px; max-height: 300px; overflow: auto; } |
| .file-item { display: flex; justify-content: space-between; padding: 10px; background: #f8fafc; margin-bottom: 8px; } |
| button { padding: 8px 16px; background: #2563eb; color: white; border: none; border-radius: 4px; cursor: pointer; } |
| button.danger { background: #dc2626; } |
| input[type="file"] { margin: 10px 0; } |
| </style> |
| </head> |
| <body> |
| <h1>HF Bucket 管理器</h1> |
| <p>WebDAV 服务已挂载在 <code>/webdav</code> 路径,您可以使用任何 WebDAV 客户端连接。</p> |
| <p>例如:<code>https://你的-space地址/webdav</code></p> |
| <hr> |
| <h2>上传测试</h2> |
| <input type="file" id="fileInput"> |
| <button id="uploadBtn">上传</button> |
| <button onclick="debugWebDAVRoot()">调试 WebDAV 根目录</button> |
| <button onclick="debugList()">调试列表 (debug/list)</button> |
| <div class="log" id="log"></div> |
| <h2>文件列表</h2> |
| <!--<div id="fileList"></div> --> |
| <script> |
| const fileInput = document.getElementById('fileInput'); |
| const uploadBtn = document.getElementById('uploadBtn'); |
| const logDiv = document.getElementById('log'); |
| const fileListDiv = document.getElementById('fileList'); |
| |
| function addLog(msg, isErr = false) { |
| const p = document.createElement('p'); |
| p.textContent = `[${new Date().toLocaleTimeString()}] ${isErr ? '❌ ' : ''}${msg}`; |
| if (isErr) p.style.color = '#f87171'; |
| logDiv.appendChild(p); |
| logDiv.scrollTop = logDiv.scrollHeight; |
| } |
| |
| async function loadList() { |
| try { |
| fileListDiv.innerHTML = '<p>加载中...</p>'; |
| const res = await fetch('/list'); |
| const responseText = await res.text(); |
| if (!res.ok) { |
| throw new Error(`HTTP ${res.status}: ${responseText}`); |
| } |
| const files = JSON.parse(responseText); |
| if (files.length === 0) { |
| fileListDiv.innerHTML = '<p>暂无文件</p>'; |
| } else { |
| fileListDiv.innerHTML = files.map(f => ` |
| <div class="file-item"> |
| <a href="/file/${encodeURIComponent(f)}" target="_blank">${f}</a> |
| <button class="danger" onclick="deleteFile('${f.replace(/'/g, "\\\\'")}')">删除</button> |
| </div> |
| `).join(''); |
| } |
| addLog('列表刷新成功,共 ' + files.length + ' 个文件'); |
| } catch (err) { |
| fileListDiv.innerHTML = '<p>加载失败</p>'; |
| addLog('列表错误: ' + err.message, true); |
| } |
| } |
| |
| window.deleteFile = async function(filename) { |
| if (!confirm('确定删除?')) return; |
| const res = await fetch('/delete/' + encodeURIComponent(filename), { method: 'DELETE' }); |
| if (res.ok) loadList(); |
| }; |
| |
| uploadBtn.addEventListener('click', async () => { |
| const file = fileInput.files[0]; |
| if (!file) return alert('请选择文件'); |
| addLog('开始上传: ' + file.name); |
| uploadBtn.disabled = true; |
| const formData = new FormData(); |
| formData.append('file', file); |
| try { |
| const res = await fetch('/upload', { method: 'POST', body: formData }); |
| const data = await res.json(); |
| if (res.ok) { |
| addLog('✅ 上传成功: ' + data.filename); |
| fileInput.value = ''; |
| loadList(); |
| } else { |
| addLog('❌ ' + data.error, true); |
| } |
| } catch (err) { |
| addLog('❌ ' + err.message, true); |
| } finally { |
| uploadBtn.disabled = false; |
| } |
| }); |
| |
| async function debugWebDAVRoot() { |
| try { |
| addLog('正在请求 /debug/webdav-root ...'); |
| const res = await fetch('/debug/webdav-root'); |
| const data = await res.json(); |
| addLog('WebDAV 根目录成员: ' + JSON.stringify(data)); |
| } catch (err) { |
| addLog('调试请求失败: ' + err.message, true); |
| } |
| } |
| |
| async function debugList() { |
| try { |
| addLog('正在请求 /debug/list ...'); |
| const res = await fetch('/debug/list'); |
| const data = await res.json(); |
| addLog('调试列表响应: ' + JSON.stringify(data, null, 2)); |
| } catch (err) { |
| addLog('调试请求失败: ' + err.message, true); |
| } |
| } |
| |
| loadList(); |
| </script> |
| </body> |
| </html>""" |
|
|
| @app.route('/') |
| def index(): |
| return HTML_CONTENT |
|
|
| @app.route('/upload', methods=['POST']) |
| def upload(): |
| from huggingface_hub import batch_bucket_files |
| if 'file' not in request.files: |
| return jsonify({'error': 'No file part'}), 400 |
| file = request.files['file'] |
| if file.filename == '': |
| return jsonify({'error': 'No selected file'}), 400 |
| filename = file.filename |
| with tempfile.NamedTemporaryFile(delete=False) as tmp: |
| file.save(tmp.name) |
| try: |
| batch_bucket_files( |
| bucket_id=BUCKET_ID, |
| add=[(tmp.name, filename)], |
| token=HF_TOKEN |
| ) |
| print(f"[UPLOAD] Success: {filename}") |
| except Exception as e: |
| print(f"[UPLOAD] Error: {e}") |
| return jsonify({'error': str(e)}), 500 |
| finally: |
| os.unlink(tmp.name) |
| return jsonify({'success': True, 'filename': filename}) |
|
|
| @app.route('/list') |
| def list_files(): |
| from huggingface_hub import list_bucket_tree |
| try: |
| items = list_bucket_tree( |
| bucket_id=BUCKET_ID, |
| recursive=True, |
| token=HF_TOKEN |
| ) |
| items_list = list(items) |
| files = [item.path for item in items_list if item.type == 'file'] |
| return jsonify(files) |
| except Exception as e: |
| print(f"[LIST] Error: {e}") |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/file/<path:filename>') |
| def get_file(filename): |
| from huggingface_hub import download_bucket_files |
| try: |
| with tempfile.TemporaryDirectory() as tmpdir: |
| local_path = os.path.join(tmpdir, filename) |
| download_bucket_files( |
| bucket_id=BUCKET_ID, |
| files=[(filename, local_path)], |
| token=HF_TOKEN |
| ) |
| return send_file(local_path, as_attachment=True, download_name=filename) |
| except Exception as e: |
| print(f"[DOWNLOAD] Error: {e}") |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/delete/<path:filename>', methods=['DELETE']) |
| def delete_file(filename): |
| from huggingface_hub import batch_bucket_files |
| try: |
| batch_bucket_files( |
| bucket_id=BUCKET_ID, |
| delete=[filename], |
| token=HF_TOKEN |
| ) |
| print(f"[DELETE] Success: {filename}") |
| return jsonify({'success': True}) |
| except Exception as e: |
| print(f"[DELETE] Error: {e}") |
| return jsonify({'error': str(e)}), 500 |
|
|
| if __name__ == '__main__': |
| app.run(host='0.0.0.0', port=7860) |