WEBDAV / app.py
chatyou's picture
Update app.py
39b21b6 verified
import os
import tempfile
from flask import Flask, request, jsonify, send_file
from wsgidav.wsgidav_app import WsgiDAVApp
from hf_bucket_provider import HfBucketProvider
# Access token for the Hugging Face bucket; the module refuses to load without it.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    raise ValueError("HF_TOKEN environment variable not set. Please add it to Space Secrets.")

# Bucket that the routes in this module read from and write to.
BUCKET_ID = "nagose/filebed"

# Flask application that hosts both the WebDAV bridge and the small management UI.
app = Flask(__name__)
# WebDAV configuration: the directory browser is disabled so that responses
# stay in standard WebDAV XML format.
dav_config = {}

# Serve the bucket at the root of the WebDAV share.
dav_config["provider_mapping"] = {"/": HfBucketProvider(BUCKET_ID, HF_TOKEN)}

# Basic authentication only; digest authentication is switched off.
dav_config["http_authenticator"] = {
    "accept_basic": True,
    "accept_digest": False,
    "default_to_digest": False,
}

# NOTE(review): presumably `{"*": True}` grants anonymous access to every
# share -- confirm against the WsgiDAV documentation before exposing publicly.
dav_config["simple_dc"] = {"user_mapping": {"*": True}}

# Disable the built-in HTML directory listing so it cannot interfere with
# WebDAV responses.
dav_config["dir_browser"] = {"enable": False}

dav_config["verbose"] = 1

dav_app = WsgiDAVApp(dav_config)
# HTTP verbs forwarded to the WebDAV application: plain HTTP plus the WebDAV
# extension methods.
_WEBDAV_METHODS = [
    'GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS',
    'PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK',
]


@app.route('/webdav', defaults={'path': ''}, methods=_WEBDAV_METHODS)
@app.route('/webdav/<path:path>', methods=_WEBDAV_METHODS)
def webdav_handler(path):
    """Forward a request under ``/webdav`` to the embedded WsgiDAV application.

    Args:
        path: Portion of the URL after ``/webdav/`` (empty for the share root).

    Returns:
        A Flask response carrying the status line, headers and body that the
        WebDAV application produced.
    """
    environ = request.environ
    # Re-root the request so the WebDAV app sees paths relative to its mount point.
    environ['PATH_INFO'] = '/' + path
    environ['SCRIPT_NAME'] = '/webdav'
    # Run the WSGI app and keep its status and headers. The previous no-op
    # start_response callback threw them away, so WebDAV-specific statuses and
    # headers never reached the client.
    return app.response_class.force_type(dav_app, environ)
# Debug route: inspect the root-directory members reported by the WebDAV provider.
@app.route('/debug/webdav-root')
def debug_webdav_root():
    """Return the provider's root listing as JSON, or a JSON error with HTTP 500."""
    try:
        root_members = HfBucketProvider(BUCKET_ID, HF_TOKEN)._list_directory('/')
        payload = jsonify({'members': root_members})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    return payload
@app.route('/debug/list')
def debug_list():
    """Return every entry of the bucket tree as JSON for debugging.

    Each entry carries ``path``, ``type``, ``size`` and ``last_modified``
    (ISO-8601 text, or ``None`` when the item has no usable timestamp).
    Any failure is reported as ``{'error': ...}`` with HTTP 500.
    """
    from huggingface_hub import list_bucket_tree

    def describe(node):
        # A missing or falsy timestamp is reported as None.
        stamp = getattr(node, 'last_modified', None)
        return {
            'path': node.path,
            'type': node.type,
            'size': node.size,
            'last_modified': stamp.isoformat() if stamp else None,
        }

    try:
        tree = list_bucket_tree(bucket_id=BUCKET_ID, recursive=True, token=HF_TOKEN)
        return jsonify([describe(node) for node in tree])
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
# Front-end page served at "/" (includes the debug buttons).
#
# The file-list container must be present in the markup because the script
# looks it up by id. File names are rendered through DOM APIs (textContent,
# addEventListener) rather than string-built HTML, so a file name containing
# markup or quotes cannot inject HTML or script into the page.
HTML_CONTENT = """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>HF Bucket 管理器</title>
<style>
body { font-family: sans-serif; max-width: 800px; margin: 20px auto; }
.log { background: #1e293b; color: #b9e6f0; padding: 12px; border-radius: 8px; max-height: 300px; overflow: auto; }
.file-item { display: flex; justify-content: space-between; padding: 10px; background: #f8fafc; margin-bottom: 8px; }
button { padding: 8px 16px; background: #2563eb; color: white; border: none; border-radius: 4px; cursor: pointer; }
button.danger { background: #dc2626; }
input[type="file"] { margin: 10px 0; }
</style>
</head>
<body>
<h1>HF Bucket 管理器</h1>
<p>WebDAV 服务已挂载在 <code>/webdav</code> 路径,您可以使用任何 WebDAV 客户端连接。</p>
<p>例如:<code>https://你的-space地址/webdav</code></p>
<hr>
<h2>上传测试</h2>
<input type="file" id="fileInput">
<button id="uploadBtn">上传</button>
<button onclick="debugWebDAVRoot()">调试 WebDAV 根目录</button>
<button onclick="debugList()">调试列表 (debug/list)</button>
<div class="log" id="log"></div>
<h2>文件列表</h2>
<div id="fileList"></div>
<script>
const fileInput = document.getElementById('fileInput');
const uploadBtn = document.getElementById('uploadBtn');
const logDiv = document.getElementById('log');
const fileListDiv = document.getElementById('fileList');
function addLog(msg, isErr = false) {
  const p = document.createElement('p');
  p.textContent = `[${new Date().toLocaleTimeString()}] ${isErr ? '❌ ' : ''}${msg}`;
  if (isErr) p.style.color = '#f87171';
  logDiv.appendChild(p);
  logDiv.scrollTop = logDiv.scrollHeight;
}
function showListMessage(text) {
  fileListDiv.textContent = '';
  const p = document.createElement('p');
  p.textContent = text;
  fileListDiv.appendChild(p);
}
function renderFileItem(name) {
  const row = document.createElement('div');
  row.className = 'file-item';
  const link = document.createElement('a');
  link.href = '/file/' + encodeURIComponent(name);
  link.target = '_blank';
  link.textContent = name;
  const removeBtn = document.createElement('button');
  removeBtn.className = 'danger';
  removeBtn.textContent = '删除';
  removeBtn.addEventListener('click', () => deleteFile(name));
  row.appendChild(link);
  row.appendChild(removeBtn);
  return row;
}
async function loadList() {
  try {
    showListMessage('加载中...');
    const res = await fetch('/list');
    const responseText = await res.text();
    if (!res.ok) {
      throw new Error(`HTTP ${res.status}: ${responseText}`);
    }
    const files = JSON.parse(responseText);
    if (files.length === 0) {
      showListMessage('暂无文件');
    } else {
      fileListDiv.textContent = '';
      for (const name of files) {
        fileListDiv.appendChild(renderFileItem(name));
      }
    }
    addLog('列表刷新成功,共 ' + files.length + ' 个文件');
  } catch (err) {
    showListMessage('加载失败');
    addLog('列表错误: ' + err.message, true);
  }
}
window.deleteFile = async function(filename) {
  if (!confirm('确定删除?')) return;
  const res = await fetch('/delete/' + encodeURIComponent(filename), { method: 'DELETE' });
  if (res.ok) loadList();
};
uploadBtn.addEventListener('click', async () => {
  const file = fileInput.files[0];
  if (!file) return alert('请选择文件');
  addLog('开始上传: ' + file.name);
  uploadBtn.disabled = true;
  const formData = new FormData();
  formData.append('file', file);
  try {
    const res = await fetch('/upload', { method: 'POST', body: formData });
    const data = await res.json();
    if (res.ok) {
      addLog('✅ 上传成功: ' + data.filename);
      fileInput.value = '';
      loadList();
    } else {
      addLog('❌ ' + data.error, true);
    }
  } catch (err) {
    addLog('❌ ' + err.message, true);
  } finally {
    uploadBtn.disabled = false;
  }
});
async function debugWebDAVRoot() {
  try {
    addLog('正在请求 /debug/webdav-root ...');
    const res = await fetch('/debug/webdav-root');
    const data = await res.json();
    addLog('WebDAV 根目录成员: ' + JSON.stringify(data));
  } catch (err) {
    addLog('调试请求失败: ' + err.message, true);
  }
}
async function debugList() {
  try {
    addLog('正在请求 /debug/list ...');
    const res = await fetch('/debug/list');
    const data = await res.json();
    addLog('调试列表响应: ' + JSON.stringify(data, null, 2));
  } catch (err) {
    addLog('调试请求失败: ' + err.message, true);
  }
}
loadList();
</script>
</body>
</html>"""
@app.route('/')
def index():
    """Serve the single-page management UI."""
    page = HTML_CONTENT
    return page
@app.route('/upload', methods=['POST'])
def upload():
    """Store the uploaded ``file`` form field in the bucket under its own name.

    Returns:
        ``{'success': True, 'filename': ...}`` on success; ``{'error': ...}``
        with HTTP 400 when no file was supplied, or HTTP 500 when saving or
        uploading fails.
    """
    from huggingface_hub import batch_bucket_files

    uploaded = request.files.get('file')
    if uploaded is None:
        return jsonify({'error': 'No file part'}), 400
    filename = uploaded.filename
    # Covers both an empty name and a missing (None) name.
    if not filename:
        return jsonify({'error': 'No selected file'}), 400

    # Reserve a path on disk; the handle is closed straight away so the file
    # can be reopened by name below.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        # Saving happens inside the try so the temp file is removed and a JSON
        # error is returned even when writing the upload to disk fails.
        uploaded.save(tmp_path)
        batch_bucket_files(
            bucket_id=BUCKET_ID,
            add=[(tmp_path, filename)],
            token=HF_TOKEN,
        )
        print(f"[UPLOAD] Success: {filename}")
    except Exception as e:
        print(f"[UPLOAD] Error: {e}")
        return jsonify({'error': str(e)}), 500
    finally:
        os.unlink(tmp_path)
    return jsonify({'success': True, 'filename': filename})
@app.route('/list')
def list_files():
    """Return the paths of all files in the bucket as a JSON array.

    Entries whose ``type`` is not ``'file'`` are skipped. Any failure is
    logged and reported as ``{'error': ...}`` with HTTP 500.
    """
    from huggingface_hub import list_bucket_tree

    try:
        tree = list_bucket_tree(bucket_id=BUCKET_ID, recursive=True, token=HF_TOKEN)
        file_paths = []
        for node in tree:
            if node.type != 'file':
                continue
            file_paths.append(node.path)
        return jsonify(file_paths)
    except Exception as exc:
        print(f"[LIST] Error: {exc}")
        return jsonify({'error': str(exc)}), 500
@app.route('/file/<path:filename>')
def get_file(filename):
    """Download ``filename`` from the bucket and send it as an attachment.

    Args:
        filename: Path of the object inside the bucket, taken from the URL.

    Returns:
        The file as an attachment, or ``{'error': ...}`` with HTTP 500 when
        the download or the send fails.
    """
    from huggingface_hub import download_bucket_files

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Use a fixed local name: ``filename`` comes from the URL, and
            # joining it into the path would let ``..`` segments place the
            # download outside the temporary directory.
            local_path = os.path.join(tmpdir, 'payload')
            download_bucket_files(
                bucket_id=BUCKET_ID,
                files=[(filename, local_path)],
                token=HF_TOKEN,
            )
            # Offer only the final path component as the download name.
            download_name = filename.rsplit('/', 1)[-1] or None
            # NOTE(review): send_file is called while the directory still
            # exists; presumably it opens the file immediately -- confirm for
            # the deployed Flask/Werkzeug version.
            return send_file(local_path, as_attachment=True, download_name=download_name)
    except Exception as e:
        print(f"[DOWNLOAD] Error: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/delete/<path:filename>', methods=['DELETE'])
def delete_file(filename):
    """Delete ``filename`` from the bucket.

    Returns ``{'success': True}`` on success, or ``{'error': ...}`` with
    HTTP 500 when the bucket operation fails.
    """
    from huggingface_hub import batch_bucket_files

    targets = [filename]
    try:
        batch_bucket_files(bucket_id=BUCKET_ID, delete=targets, token=HF_TOKEN)
        print(f"[DELETE] Success: {filename}")
        return jsonify({'success': True})
    except Exception as exc:
        print(f"[DELETE] Error: {exc}")
        failure = {'error': str(exc)}
        return jsonify(failure), 500
if __name__ == '__main__':
    # Listen on all interfaces on port 7860 when run as a script.
    app.run(port=7860, host='0.0.0.0')