|
|
import os |
|
|
import time |
|
|
import shutil |
|
|
import zipfile |
|
|
import gradio as gr |
|
|
from fastapi import FastAPI |
|
|
|
|
|
|
|
|
# Root directory on disk; every bin is a sub-directory named after its ID.
BASE_UPLOAD_DIR = "bins"
# Age threshold, in hours, used by clean_old_bins() to expire bins.
EXPIRATION_HOURS = 24

# Make sure the storage root exists before any handler runs. exist_ok=True
# replaces the separate exists()/makedirs() pair, which could raise
# FileExistsError if another process created the directory in between.
os.makedirs(BASE_UPLOAD_DIR, exist_ok=True)
|
|
|
|
|
def clean_old_bins():
    """Delete every bin whose directory has not been modified recently.

    A bin expires when the modification time of its directory is more than
    ``EXPIRATION_HOURS`` hours in the past. Entries of ``BASE_UPLOAD_DIR``
    that are not directories are left untouched, and entries that vanish or
    cannot be removed are skipped, so one bad entry never aborts the request
    that triggered the clean-up.
    """
    cutoff = time.time() - EXPIRATION_HOURS * 3600
    try:
        bin_ids = os.listdir(BASE_UPLOAD_DIR)
    except FileNotFoundError:
        # The storage root is missing, so there is nothing to expire.
        return

    for bin_id in bin_ids:
        bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id)
        try:
            # shutil.rmtree() raises on plain files, so only directories are
            # treated as bins.
            expired = (
                os.path.isdir(bin_path)
                and os.path.getmtime(bin_path) < cutoff
            )
        except OSError:
            # The entry disappeared between listdir() and the stat call.
            continue
        if expired:
            # Best effort: a concurrent request may already be removing it.
            shutil.rmtree(bin_path, ignore_errors=True)
|
|
|
|
|
|
|
|
def upload_to_bin(bin_id, files, progress=gr.Progress()):
    """Copy the uploaded files into the bin named ``bin_id``.

    Args:
        bin_id: Bin identifier typed by the user; surrounding whitespace is
            ignored.
        files: Uploaded files. Each entry is either a path string or an
            object whose ``name`` attribute is the path of the uploaded copy.
        progress: Gradio progress tracker, called once per file with the
            fraction of files handled so far.

    Returns:
        A ``(status message, stored file paths)`` tuple. The second element
        is ``None`` when nothing was stored.
    """
    clean_old_bins()

    # Strip before validating so a whitespace-only ID cannot resolve to the
    # storage root itself.
    bin_id = (bin_id or "").strip()
    if not bin_id or not files:
        return "⚠️ Thiếu mã Bin hoặc File!", None
    # The ID becomes a directory name: refuse anything that is not a single
    # path component, otherwise it could point outside BASE_UPLOAD_DIR.
    if (
        bin_id in (".", "..")
        or "\0" in bin_id
        or os.path.basename(bin_id) != bin_id
    ):
        return "⚠️ Mã Bin không hợp lệ!", None

    bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id)
    os.makedirs(bin_path, exist_ok=True)

    uploaded_list = []
    total = len(files)
    for index, file in enumerate(files, start=1):
        progress(index / total, desc=f"Đang xử lý {index}/{total}...")
        # Accept both plain path strings and objects exposing ``.name``.
        src_path = getattr(file, "name", file)
        dest_path = os.path.join(bin_path, os.path.basename(src_path))
        shutil.copy(src_path, dest_path)
        uploaded_list.append(dest_path)

    # Refresh the directory mtime, which clean_old_bins() reads as the
    # bin's age.
    os.utime(bin_path, None)
    return f"✅ Đã tải lên Bin: {bin_id}", uploaded_list
|
|
|
|
|
def get_bin_files(bin_id):
    """List the files stored in the bin named ``bin_id``.

    Args:
        bin_id: Bin identifier typed by the user; surrounding whitespace is
            ignored.

    Returns:
        A ``(status message, file paths)`` tuple. The generated
        ``all_files.zip`` archive is left out of the listing. The second
        element is ``None`` when the ID is empty or no such bin exists.
    """
    clean_old_bins()

    # Strip first: a whitespace-only ID would otherwise resolve to the
    # storage root and expose the names of every bin.
    bin_id = (bin_id or "").strip()
    if not bin_id:
        return "⚠️ Nhập mã Bin!", None

    # An ID that is not a single path component can never name a bin and
    # must not be allowed to reach directories outside BASE_UPLOAD_DIR.
    is_single_component = (
        bin_id not in (".", "..")
        and "\0" not in bin_id
        and os.path.basename(bin_id) == bin_id
    )
    bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id)
    if not is_single_component or not os.path.isdir(bin_path):
        return "❌ Không tìm thấy Bin.", None

    files = [
        os.path.join(bin_path, name)
        for name in os.listdir(bin_path)
        if name != "all_files.zip"
    ]
    return f"📂 Bin: {bin_id}", files
|
|
|
|
|
def download_all_zip(bin_id):
    """Pack every file of a bin into ``all_files.zip`` inside that bin.

    Args:
        bin_id: Bin identifier typed by the user; surrounding whitespace is
            ignored.

    Returns:
        The path of the freshly written archive, or ``None`` when the ID is
        empty, is not a single path component, or names no existing bin.
    """
    bin_id = (bin_id or "").strip()
    # Reject empty IDs and anything that could resolve outside
    # BASE_UPLOAD_DIR before touching the file system.
    if (
        not bin_id
        or bin_id in (".", "..")
        or "\0" in bin_id
        or os.path.basename(bin_id) != bin_id
    ):
        return None

    bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id)
    if not os.path.isdir(bin_path):
        return None

    zip_path = os.path.join(bin_path, "all_files.zip")
    # Snapshot the listing first and skip the archive itself so a previous
    # archive is never packed into the new one.
    names = [name for name in os.listdir(bin_path) if name != "all_files.zip"]
    with zipfile.ZipFile(zip_path, "w") as zipf:
        for name in names:
            # Store each entry under its bare name, without the bin path.
            zipf.write(os.path.join(bin_path, name), name)
    return zip_path
|
|
|
|
|
def delete_bin(bin_id):
    """Remove the bin named ``bin_id`` together with everything in it.

    Args:
        bin_id: Bin identifier typed by the user; surrounding whitespace is
            ignored.

    Returns:
        A ``(status message, None)`` tuple; the ``None`` clears the file
        output wired to this handler.
    """
    bin_id = (bin_id or "").strip()
    # Only a single path component may name a bin. Without this check an
    # empty ID would delete the whole storage root and ".." its parent.
    is_single_component = (
        bool(bin_id)
        and bin_id not in (".", "..")
        and "\0" not in bin_id
        and os.path.basename(bin_id) == bin_id
    )
    bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id)
    if not is_single_component or not os.path.isdir(bin_path):
        return "❌ Không tìm thấy.", None

    shutil.rmtree(bin_path)
    return "🗑️ Đã xóa Bin.", None
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Default()) as demo:
    # Page title and the bin ID field shared by every action below.
    gr.Markdown("# 🚀 HF FileBin Pro")
    bin_id_input = gr.Textbox(label="Mã Bin (Private ID)")

    with gr.Row():
        # Left column: choose files and upload them.
        with gr.Column():
            file_input = gr.File(label="Upload", file_count="multiple")
            btn_upload = gr.Button("📤 Tải lên", variant="primary")
        # Right column: status text, file listing and the other actions.
        with gr.Column():
            status_msg = gr.Textbox(label="Trạng thái")
            file_output = gr.File(label="Danh sách file")
            # NOTE(review): the source lost its indentation; this row is
            # assumed to sit inside the right-hand column — confirm.
            with gr.Row():
                btn_access = gr.Button("🔍 Kiểm tra")
                btn_zip = gr.Button("📦 Tải ZIP")
                btn_delete = gr.Button("🗑️ Xóa", variant="stop")

    # Event wiring: each button passes the bin ID (plus the chosen files
    # for upload) to its handler and shows the result in the outputs.
    btn_upload.click(
        fn=upload_to_bin,
        inputs=[bin_id_input, file_input],
        outputs=[status_msg, file_output],
    )
    btn_access.click(
        fn=get_bin_files,
        inputs=[bin_id_input],
        outputs=[status_msg, file_output],
    )
    btn_zip.click(
        fn=download_all_zip,
        inputs=[bin_id_input],
        outputs=[file_output],
    )
    btn_delete.click(
        fn=delete_bin,
        inputs=[bin_id_input],
        outputs=[status_msg, file_output],
    )
|
|
|
|
|
|
|
|
|
|
|
# ASGI application that serves the Gradio UI plus a lightweight health probe.
# NOTE(review): the previous ``app = demo.app`` read an attribute that Gradio
# appears to populate only inside ``launch()``, so it would fail at import
# time — confirm against the Gradio version pinned for this project.
app = FastAPI()


# Registered before the Gradio mount below: the UI is mounted at "/" and
# routes are matched in registration order, so a later /health route would
# be captured by the mount.
@app.get("/health")
def health():
    """Report that the service is up."""
    return {"status": "ok"}


app = gr.mount_gradio_app(app, demo, path="/")
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the UI on all interfaces, port 7860.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
    )
|
|
|