from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse import os, shutil, glob import tempfile import re import zipfile from main import Run, fetch_api_endpoints_from_server, get_book_info, get_headers import queue, threading from fastapi.staticfiles import StaticFiles # 全局下载队列和状态 TASK_QUEUE = queue.Queue() STATUS = {} FILE_PATHS = {} NAMES = {} TASK_DETAILS = {} # 存储任务详细信息,包括时间戳等 # 后台下载任务处理线程 def process_queue(): from datetime import datetime while True: try: book_id = TASK_QUEUE.get(timeout=1) # 添加超时避免无限等待 if book_id in STATUS and STATUS[book_id] != "queued": continue # 跳过已经处理过的任务 # 记录开始时间 start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if book_id in TASK_DETAILS: TASK_DETAILS[book_id]["start_time"] = start_time STATUS[book_id] = "in-progress" save_path = os.path.join(DOWNLOAD_ROOT, book_id) os.makedirs(save_path, exist_ok=True) try: fetch_api_endpoints_from_server() Run(book_id, save_path) name, _, _ = get_book_info(book_id, get_headers()) if not name: name = f"小说_{book_id}" filename = f"{name}.txt" path = os.path.join(save_path, filename) # 记录完成时间 complete_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if os.path.exists(path): FILE_PATHS[book_id] = f"/files/{book_id}/{filename}" NAMES[book_id] = name STATUS[book_id] = "done" if book_id in TASK_DETAILS: TASK_DETAILS[book_id]["complete_time"] = complete_time TASK_DETAILS[book_id]["name"] = name print(f"下载完成: {name}") else: STATUS[book_id] = "error" if book_id in TASK_DETAILS: TASK_DETAILS[book_id]["complete_time"] = complete_time print(f"下载失败: 文件不存在 - {book_id}") except Exception as e: STATUS[book_id] = "error" complete_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if book_id in TASK_DETAILS: TASK_DETAILS[book_id]["complete_time"] = complete_time print(f"下载异常: {book_id} - {str(e)}") except: # 队列为空时的超时,继续循环 continue app = FastAPI( title="番茄小说下载器API", description="通过 /download?book_id=xxx 获取小说文本文件", ) # 挂载本地下载目录并创建文件夹 (使用临时目录) DOWNLOAD_ROOT = os.path.join(tempfile.gettempdir(), "tomato_downloads") os.makedirs(DOWNLOAD_ROOT, exist_ok=True) app.mount("/files", StaticFiles(directory=DOWNLOAD_ROOT), name="files") # 启动后台下载处理线程 @app.on_event("startup") def startup_event(): threading.Thread(target=process_queue, daemon=True).start() @app.get("/", response_class=HTMLResponse) def root(): return """ 🍅 番茄小说下载器

番茄小说下载器

高效、便捷的小说下载工具

开始下载
0%
任务管理中心
暂无任务
""" @app.get("/enqueue") def enqueue(book_id: str): """添加到下载队列""" from datetime import datetime # 如果已有任务在队列或处理中 if STATUS.get(book_id) in ("queued", "in-progress"): return {"message": "已在队列", "status": STATUS[book_id]} # 获取书籍信息并设置名称 try: name, _, _ = get_book_info(book_id, get_headers()) if name: NAMES[book_id] = name except: NAMES[book_id] = f"小说_{book_id}" # 记录任务详细信息 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") TASK_DETAILS[book_id] = { "add_time": current_time, "start_time": None, "complete_time": None, "name": NAMES[book_id] } TASK_QUEUE.put(book_id) STATUS[book_id] = "queued" return {"message": "已添加到下载队列", "status": "queued"} @app.get("/status") def task_status(book_id: str): """查询下载任务状态""" status = STATUS.get(book_id) if not status: raise HTTPException(status_code=404, detail="任务不存在") data = {"book_id": book_id, "status": status} if status == "done": data["name"] = NAMES[book_id] data["url"] = FILE_PATHS[book_id] return data @app.get("/tasks") def tasks(): """获取所有任务""" task_list = [] for book_id, status in STATUS.items(): task_detail = TASK_DETAILS.get(book_id, {}) task = { "book_id": book_id, "name": NAMES.get(book_id, book_id), "status": status, "add_time": task_detail.get("add_time", "未知"), "start_time": task_detail.get("start_time"), "complete_time": task_detail.get("complete_time") } task_list.append(task) # 按添加时间排序,最新的在前面 task_list.sort(key=lambda x: x["add_time"], reverse=True) return task_list @app.get("/search") def search(name: str = ""): """按名称搜索已下载小说""" results = [] for bid, n in NAMES.items(): if name.lower() in n.lower(): results.append({"book_id": bid, "name": n, "url": FILE_PATHS.get(bid)}) return results @app.get("/download") def download(book_ids: str): # 解析多个小说ID,使用逗号、分号或空白分隔 ids = [bid.strip() for bid in re.split(r'[\\s,;]+', book_ids) if bid.strip()] if not ids: raise HTTPException(status_code=400, detail="请提供至少一个 book_id") # 在临时目录创建下载文件夹 save_path = tempfile.mkdtemp(prefix="tomato_") try: # 调用下载核心 - 支持批量ID for bid in ids: try: # 更新状态为下载中 STATUS[bid] = "in-progress" # 获取书籍信息 name, _, _ = get_book_info(bid, get_headers()) if not name: name = f"小说_{bid}" NAMES[bid] = name # 执行下载 fetch_api_endpoints_from_server() Run(bid, save_path) # 检查文件是否生成 expected_filename = f"{name}.txt" expected_path = os.path.join(save_path, expected_filename) if os.path.exists(expected_path): # 复制到共享目录 dest_dir = os.path.join(DOWNLOAD_ROOT, bid) os.makedirs(dest_dir, exist_ok=True) dst = os.path.join(dest_dir, expected_filename) shutil.copy(expected_path, dst) # 更新状态 FILE_PATHS[bid] = f"/files/{bid}/{expected_filename}" STATUS[bid] = "done" else: STATUS[bid] = "error" raise Exception(f"文件未生成: {expected_filename}") except Exception as e: STATUS[bid] = "error" print(f"下载 {bid} 失败: {str(e)}") raise HTTPException(status_code=500, detail=f"下载 {bid} 失败: {str(e)}") # 收集生成的 txt 文件 txt_files = glob.glob(os.path.join(save_path, "*.txt")) if not txt_files: raise HTTPException(status_code=404, detail="未生成txt文件,下载失败") # 如果只有一个文件,直接返回 if len(txt_files) == 1: file_path = txt_files[0] filename = os.path.basename(file_path) return FileResponse( path=file_path, filename=filename, media_type="text/plain; charset=utf-8", headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"} ) # 多文件时打包成 ZIP 并返回 zip_path = os.path.join(save_path, "novels.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for fpath in txt_files: zf.write(fpath, arcname=os.path.basename(fpath)) return FileResponse( path=zip_path, filename="novels.zip", media_type="application/zip", headers={"Content-Disposition": "attachment; filename=novels.zip"} ) except Exception as e: # 清理临时文件 try: shutil.rmtree(save_path) except: pass raise