from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
import os, shutil, glob
import tempfile
import re
import zipfile
from main import Run, fetch_api_endpoints_from_server, get_book_info, get_headers
import queue, threading
from fastapi.staticfiles import StaticFiles
# Global download queue and task state.
# These are plain module-level objects that both the request handlers and the
# background worker (process_queue) read and write, all keyed by book_id.
TASK_QUEUE = queue.Queue()  # book_ids waiting to be downloaded by process_queue
STATUS = {}  # book_id -> "queued" | "in-progress" | "done" | "error"
FILE_PATHS = {}  # book_id -> "/files/<book_id>/<filename>" URL path of the finished file
NAMES = {}  # book_id -> book title (or the "小说_<book_id>" placeholder)
TASK_DETAILS = {}  # book_id -> {"add_time", "start_time", "complete_time", "name"} timestamps/details
# Background worker that processes queued download tasks
def process_queue():
    """Poll TASK_QUEUE forever and download each queued book in turn.

    For every book_id taken from the queue the worker:

    * skips it unless its STATUS entry is missing or still "queued";
    * records ``start_time`` in TASK_DETAILS and marks it "in-progress";
    * downloads into ``DOWNLOAD_ROOT/<book_id>`` by calling ``Run``;
    * if ``<name>.txt`` exists afterwards, stores the title in NAMES, the
      ``/files/...`` URL in FILE_PATHS and sets STATUS to "done";
      otherwise (missing file or any exception) sets STATUS to "error".

    ``complete_time`` is recorded in TASK_DETAILS in every outcome.
    The function never returns; it is meant to run in a daemon thread.
    """
    from datetime import datetime

    def _timestamp():
        # Same human-readable format used for add_time in enqueue().
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    while True:
        # Only the queue timeout is an expected, silent condition.
        try:
            book_id = TASK_QUEUE.get(timeout=1)  # timeout avoids blocking forever
        except queue.Empty:
            continue

        try:
            if STATUS.get(book_id, "queued") != "queued":
                continue  # already handled elsewhere; do not download twice

            if book_id in TASK_DETAILS:
                TASK_DETAILS[book_id]["start_time"] = _timestamp()
            STATUS[book_id] = "in-progress"

            try:
                # Directory creation is inside the handled section so that a
                # filesystem error marks the task "error" instead of leaving
                # it stuck at "in-progress".
                save_path = os.path.join(DOWNLOAD_ROOT, book_id)
                os.makedirs(save_path, exist_ok=True)

                fetch_api_endpoints_from_server()
                Run(book_id, save_path)
                name, _, _ = get_book_info(book_id, get_headers())
                if not name:
                    name = f"小说_{book_id}"
                filename = f"{name}.txt"
                path = os.path.join(save_path, filename)

                complete_time = _timestamp()
                if os.path.exists(path):
                    FILE_PATHS[book_id] = f"/files/{book_id}/{filename}"
                    NAMES[book_id] = name
                    STATUS[book_id] = "done"
                    if book_id in TASK_DETAILS:
                        TASK_DETAILS[book_id]["complete_time"] = complete_time
                        TASK_DETAILS[book_id]["name"] = name
                    print(f"下载完成: {name}")
                else:
                    STATUS[book_id] = "error"
                    if book_id in TASK_DETAILS:
                        TASK_DETAILS[book_id]["complete_time"] = complete_time
                    print(f"下载失败: 文件不存在 - {book_id}")
            except Exception as e:
                STATUS[book_id] = "error"
                if book_id in TASK_DETAILS:
                    TASK_DETAILS[book_id]["complete_time"] = _timestamp()
                print(f"下载异常: {book_id} - {str(e)}")
        except Exception:
            # Last-resort guard: whatever goes wrong while handling one task
            # (even inside the error handler above), keep the worker alive
            # for the next one.
            continue
# The description names the query parameter exactly as the /download handler
# declares it (book_ids); a request using "book_id" would be rejected.
app = FastAPI(
    title="番茄小说下载器API",
    description="通过 /download?book_ids=xxx 获取小说文本文件",
)
# Create the shared download folder under the system temp directory and expose
# it at /files. The folder is created before mounting so that StaticFiles is
# given an existing directory.
DOWNLOAD_ROOT = os.path.join(tempfile.gettempdir(), "tomato_downloads")
os.makedirs(DOWNLOAD_ROOT, exist_ok=True)
app.mount("/files", StaticFiles(directory=DOWNLOAD_ROOT), name="files")
# Launch the background download worker when the application starts
@app.on_event("startup")
def startup_event():
    """Start process_queue in a daemon thread."""
    worker = threading.Thread(target=process_queue, daemon=True)
    worker.start()
@app.get("/", response_class=HTMLResponse)
def root():
    """Serve the landing page: a fixed string returned as an HTML response."""
    return """
🍅 番茄小说下载器
"""
@app.get("/enqueue")
def enqueue(book_id: str):
    """Add a book to the background download queue.

    Args:
        book_id: Identifier of the book to download.

    Returns:
        A dict with ``message`` and ``status``. If the book is already
        "queued" or "in-progress" nothing is enqueued and the current status
        is reported; otherwise the task is registered and "queued" returned.
    """
    from datetime import datetime

    # Do not enqueue a task that is already waiting or running.
    current = STATUS.get(book_id)
    if current in ("queued", "in-progress"):
        return {"message": "已在队列", "status": current}

    # Resolve a display name. A lookup failure falls back to a placeholder;
    # an empty result keeps any previously known name, or uses the placeholder
    # so that NAMES[book_id] is always defined below.
    placeholder = f"小说_{book_id}"
    try:
        name, _, _ = get_book_info(book_id, get_headers())
    except Exception:
        NAMES[book_id] = placeholder
    else:
        if name:
            NAMES[book_id] = name
        else:
            NAMES.setdefault(book_id, placeholder)

    # Record task details.
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    TASK_DETAILS[book_id] = {
        "add_time": current_time,
        "start_time": None,
        "complete_time": None,
        "name": NAMES[book_id],
    }

    # Publish the "queued" status BEFORE handing the id to the worker: the
    # worker skips ids whose status is not "queued", so the reverse order
    # could drop a re-submitted task and leave it "queued" forever.
    STATUS[book_id] = "queued"
    TASK_QUEUE.put(book_id)
    return {"message": "已添加到下载队列", "status": "queued"}
@app.get("/status")
def task_status(book_id: str):
    """Report the state of a single download task.

    Args:
        book_id: Identifier of the task to look up.

    Returns:
        ``{"book_id", "status"}``; when the status is "done" the dict also
        carries ``name`` and ``url``.

    Raises:
        HTTPException: 404 when no status is recorded for ``book_id``.
    """
    current = STATUS.get(book_id)
    if not current:
        raise HTTPException(status_code=404, detail="任务不存在")

    if current != "done":
        return {"book_id": book_id, "status": current}

    # Finished tasks additionally expose the title and the download URL.
    return {
        "book_id": book_id,
        "status": current,
        "name": NAMES[book_id],
        "url": FILE_PATHS[book_id],
    }
@app.get("/tasks")
def tasks():
    """List every known task, most recently added first.

    Returns:
        A list of dicts with ``book_id``, ``name``, ``status``, ``add_time``,
        ``start_time`` and ``complete_time``. ``name`` falls back to the
        book_id and ``add_time`` to "未知" when no value is recorded.
    """

    def _describe(book_id, state):
        # Build the public view of one task from the shared state dicts.
        detail = TASK_DETAILS.get(book_id, {})
        return {
            "book_id": book_id,
            "name": NAMES.get(book_id, book_id),
            "status": state,
            "add_time": detail.get("add_time", "未知"),
            "start_time": detail.get("start_time"),
            "complete_time": detail.get("complete_time"),
        }

    rows = [_describe(book_id, state) for book_id, state in STATUS.items()]
    # Newest first, ordered by the add_time string.
    return sorted(rows, key=lambda row: row["add_time"], reverse=True)
@app.get("/search")
def search(name: str = ""):
    """Find known books whose title contains ``name`` (case-insensitive).

    Args:
        name: Substring to look for; the empty default matches every title.

    Returns:
        A list of ``{"book_id", "name", "url"}`` dicts; ``url`` is None when
        no file path is recorded for the book.
    """
    needle = name.lower()
    return [
        {"book_id": book_id, "name": title, "url": FILE_PATHS.get(book_id)}
        for book_id, title in NAMES.items()
        if needle in title.lower()
    ]
@app.get("/download")
def download(book_ids: str):
    """Download one or more books synchronously and return them as a file.

    Args:
        book_ids: One or more book ids separated by commas, semicolons
            and/or whitespace.

    Returns:
        A FileResponse with the single generated ``.txt`` file, or with a
        ``novels.zip`` archive when several ``.txt`` files were generated.

    Raises:
        HTTPException: 400 if no id was supplied, 500 if downloading any id
            fails, 404 if no ``.txt`` file was produced.

    Side effects:
        Updates STATUS, NAMES and FILE_PATHS for every id and copies each
        finished file into ``DOWNLOAD_ROOT/<book_id>``. Work happens in a
        fresh temp directory that is removed on failure, or after the
        response has been sent on success.
    """
    from urllib.parse import quote

    from fastapi import BackgroundTasks

    # Split on commas, semicolons or whitespace. The pattern must be
    # r"[\s,;]+": in a raw string "\\s" would mean a literal backslash and
    # the letter "s" rather than whitespace.
    ids = [bid for bid in re.split(r"[\s,;]+", book_ids) if bid]
    if not ids:
        raise HTTPException(status_code=400, detail="请提供至少一个 book_id")

    # Work inside a private temp directory.
    save_path = tempfile.mkdtemp(prefix="tomato_")
    try:
        for bid in ids:
            try:
                STATUS[bid] = "in-progress"

                # Resolve the title, which also determines the file name.
                name, _, _ = get_book_info(bid, get_headers())
                if not name:
                    name = f"小说_{bid}"
                NAMES[bid] = name

                fetch_api_endpoints_from_server()
                Run(bid, save_path)

                expected_filename = f"{name}.txt"
                expected_path = os.path.join(save_path, expected_filename)
                if not os.path.exists(expected_path):
                    raise RuntimeError(f"文件未生成: {expected_filename}")

                # Keep a copy in the shared directory served under /files.
                dest_dir = os.path.join(DOWNLOAD_ROOT, bid)
                os.makedirs(dest_dir, exist_ok=True)
                shutil.copy(expected_path, os.path.join(dest_dir, expected_filename))

                FILE_PATHS[bid] = f"/files/{bid}/{expected_filename}"
                STATUS[bid] = "done"
            except Exception as e:
                STATUS[bid] = "error"
                print(f"下载 {bid} 失败: {str(e)}")
                raise HTTPException(
                    status_code=500, detail=f"下载 {bid} 失败: {str(e)}"
                ) from e

        txt_files = glob.glob(os.path.join(save_path, "*.txt"))
        if not txt_files:
            raise HTTPException(status_code=404, detail="未生成txt文件,下载失败")

        # The response streams from save_path after this function returns,
        # so the directory can only be removed once the response is sent.
        cleanup = BackgroundTasks()
        cleanup.add_task(shutil.rmtree, save_path, ignore_errors=True)

        # A single file is returned as-is.
        if len(txt_files) == 1:
            file_path = txt_files[0]
            filename = os.path.basename(file_path)
            return FileResponse(
                path=file_path,
                filename=filename,
                media_type="text/plain; charset=utf-8",
                # Header values must be latin-1 encodable, so a non-ASCII
                # title has to be percent-encoded for filename*=UTF-8''.
                headers={
                    "Content-Disposition": f"attachment; filename*=UTF-8''{quote(filename)}"
                },
                background=cleanup,
            )

        # Several files are bundled into one ZIP archive.
        zip_path = os.path.join(save_path, "novels.zip")
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for fpath in txt_files:
                zf.write(fpath, arcname=os.path.basename(fpath))
        return FileResponse(
            path=zip_path,
            filename="novels.zip",
            media_type="application/zip",
            headers={"Content-Disposition": "attachment; filename=novels.zip"},
            background=cleanup,
        )
    except Exception:
        # Best-effort removal of the temp directory, then propagate the error.
        shutil.rmtree(save_path, ignore_errors=True)
        raise