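"""Tomato (Fanqie) novel downloader, lite version.

Fetches a list of third-party content APIs from a config server, then downloads
a novel chapter by chapter (or in batches where the source supports it) and
writes everything into a single UTF-8 text file.
"""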
import time
import requests
import bs4
import re
import os
import random
import json
import urllib3
import threading
import signal
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from collections import OrderedDict
from fake_useragent import UserAgent
from typing import Optional, Dict
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes
import base64
import gzip
from urllib.parse import urlencode
# Suppress SSL certificate verification warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Global configuration
CONFIG = {
    "max_workers": 4,                # concurrent download threads
    "max_retries": 3,                # max retry attempts
    "request_timeout": 15,           # per-request timeout, seconds
    "status_file": "chapter.json",   # per-book download-progress file
    "request_rate_limit": 0.4,       # minimum delay between requests, seconds
    "auth_token": "wcnmd91jb",
    "server_url": "https://dlbkltos.s7123.xyz:5080/api/sources",
    "api_endpoints": [],             # filled by fetch_api_endpoints_from_server()
    "batch_config": {                # settings for the batch-capable source
        "name": "qyuing",
        "base_url": None,            # derived from the source's single_url
        "batch_endpoint": None,
        "token": None,
        "max_batch_size": 290,
        "timeout": 10,
        "enabled": True
    }
}
def make_request(url, headers=None, params=None, data=None, method='GET', verify=False, timeout=None):
"""通用的请求函数"""
if headers is None:
headers = get_headers()
try:
request_params = {
'headers': headers,
'params': params,
'verify': verify,
'timeout': timeout if timeout is not None else CONFIG["request_timeout"]
}
if data:
request_params['data'] = data
session = requests.Session()
if method.upper() == 'GET':
response = session.get(url, **request_params)
elif method.upper() == 'POST':
response = session.post(url, **request_params)
else:
            raise ValueError(f"Unsupported HTTP method: {method}")
return response
except Exception as e:
print(f"请求失败: {str(e)}")
raise
def get_headers() -> Dict[str, str]:
    """Generate randomized request headers."""
    ua = UserAgent()
    user_agent = ua.chrome if random.choice(['chrome', 'edge']) == 'chrome' else ua.edge
return {
"User-Agent": user_agent,
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"Referer": "https://fanqienovel.com/",
"X-Requested-With": "XMLHttpRequest",
}
def fetch_api_endpoints_from_server():
"""从服务器获取API列表"""
try:
headers = get_headers()
headers["X-Auth-Token"] = CONFIG["auth_token"]
response = requests.get(
CONFIG["server_url"],
headers=headers,
timeout=10,
verify=False
)
if response.status_code == 200:
data = response.json()
sources = data.get("sources", [])
CONFIG["api_endpoints"] = []
for source in sources:
if source["enabled"]:
                    # Batch-capable source: derive base URL and endpoint from single_url
if source["name"] == CONFIG["batch_config"]["name"]:
base_url = source["single_url"].split('?')[0]
batch_endpoint = base_url.split('/')[-1]
base_url = base_url.rsplit('/', 1)[0] if '/' in base_url else base_url
CONFIG["batch_config"]["base_url"] = base_url
CONFIG["batch_config"]["batch_endpoint"] = f"/{batch_endpoint}"
CONFIG["batch_config"]["token"] = source.get("token", "")
CONFIG["batch_config"]["enabled"] = True
else:
                        # Single-chapter source
CONFIG["api_endpoints"].append({
"url": source["single_url"],
"name": source["name"]
})
print("成功从服务器获取API列表!")
return True
else:
print(f"获取API列表失败,状态码: {response.status_code}")
except Exception as e:
print(f"获取API列表异常: {str(e)}")
def extract_chapters(soup):
"""解析章节列表"""
chapters = []
for idx, item in enumerate(soup.select('div.chapter-item')):
a_tag = item.find('a')
if not a_tag:
continue
raw_title = a_tag.get_text(strip=True)
        # Special chapters (extras, side stories) keep their original title
if re.match(r'^(番外|特别篇|if线)\s*', raw_title):
final_title = raw_title
else:
clean_title = re.sub(
r'^第[一二三四五六七八九十百千\d]+章\s*',
'',
raw_title
).strip()
            final_title = f"第{idx+1}章 {clean_title}"
chapters.append({
"id": a_tag['href'].split('/')[-1],
"title": final_title,
"url": f"https://fanqienovel.com{a_tag['href']}",
"index": idx
})
return chapters
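# Example of the normalization above: a raw title "第12章 风起" at idx 0 becomes
# "第1章 风起" (chapters are renumbered sequentially); "番外 ..." titles are kept verbatim.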
def batch_download_chapters(item_ids, headers):
"""批量下载章节内容"""
if not CONFIG["batch_config"]["enabled"]:
print("批量下载功能未启用")
return None
batch_config = CONFIG["batch_config"]
url = f"{batch_config['base_url']}{batch_config['batch_endpoint']}"
try:
batch_headers = headers.copy()
if batch_config["token"]:
batch_headers["token"] = batch_config["token"]
batch_headers["Content-Type"] = "application/json"
payload = {"item_ids": item_ids}
response = make_request(
url,
headers=batch_headers,
method='POST',
data=json.dumps(payload),
timeout=batch_config["timeout"],
verify=False
)
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
else:
print(f"批量下载失败,状态码: {response.status_code}")
return None
except Exception as e:
print(f"批量下载异常!")
return None
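# Assumed batch response shape (it matches how Run() consumes the result):
# {"data": {"<item_id>": {"content": "<html>"}, ...}}, where each value may be
# either a dict carrying a "content" key or the raw content string itself.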
def process_chapter_content(content):
"""处理章节内容"""
if not content or not isinstance(content, str):
return ""
try:
        # Strip HTML tags
content = re.sub(r'<header>.*?</header>', '', content, flags=re.DOTALL)
content = re.sub(r'<footer>.*?</footer>', '', content, flags=re.DOTALL)
content = re.sub(r'</?article>', '', content)
content = re.sub(r'<p[^>]*>', '\n ', content)
content = re.sub(r'</p>', '', content)
content = re.sub(r'<[^>]+>', '', content)
content = re.sub(r'\\u003c|\\u003e', '', content)
        # Normalize paragraph spacing and indentation
content = re.sub(r'\n{3,}', '\n\n', content).strip()
lines = [line.strip() for line in content.split('\n') if line.strip()]
return '\n'.join([' ' + line for line in lines])
except Exception as e:
print(f"内容处理错误: {str(e)}")
return str(content)
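# Example: process_chapter_content('<p class="a">Hello</p><p>World</p>')
# returns ' Hello\n World': one indented line per paragraph.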
def down_text(chapter_id, headers, book_id=None):
"""下载章节内容"""
content = ""
chapter_title = ""
    # Initialize per-endpoint status on first call
if not hasattr(down_text, "api_status"):
down_text.api_status = {endpoint["url"]: {
"last_response_time": float('inf'),
"error_count": 0,
"last_try_time": 0
} for endpoint in CONFIG["api_endpoints"]}
    # Try each API in order
for endpoint in CONFIG["api_endpoints"]:
current_endpoint = endpoint["url"]
api_name = endpoint["name"]
down_text.api_status[endpoint["url"]]["last_try_time"] = time.time()
try:
            # Small random delay between requests
time.sleep(random.uniform(0.1, 0.5))
start_time = time.time()
            # Different APIs take different request styles
if api_name == "fanqie_sdk":
params = endpoint.get("params", {})
data = endpoint.get("data", {})
data["item_id"] = chapter_id
response = make_request(
current_endpoint,
headers=headers.copy(),
params=params,
method='POST',
data=json.dumps(data),
timeout=CONFIG["request_timeout"],
verify=False
)
else:
current_endpoint = endpoint["url"].format(chapter_id=chapter_id)
response = make_request(
current_endpoint,
headers=headers.copy(),
timeout=CONFIG["request_timeout"],
verify=False
)
response_time = time.time() - start_time
down_text.api_status[endpoint["url"]].update({
"last_response_time": response_time,
"error_count": max(0, down_text.api_status[endpoint["url"]]["error_count"] - 1)
})
            data = response.json()
            # Handle the different API response formats.
            if api_name == "fanqie_sdk":
                content = data.get("data", {}).get("content", "")
                chapter_title = data.get("data", {}).get("title", "")
                if content:
                    processed_content = process_chapter_content(content)
                    return chapter_title, processed_content
            elif api_name == "lsjk":
                # The original code tested `content` before ever assigning it, so
                # this branch could never run; the content is assumed to live under
                # data["data"]["content"], mirroring the other sources.
                content = data.get("data", {}).get("content", "")
                if content:
                    paragraphs = re.findall(r'<p idx="\d+">(.*?)</p>', content)
                    cleaned_content = "\n".join(p.strip() for p in paragraphs if p.strip())
                    formatted_content = '\n'.join(' ' + line if line.strip() else line
                                                  for line in cleaned_content.split('\n'))
                    return chapter_title, formatted_content
            elif api_name == "qyuing" and data.get("code") == 0:
                # Content location assumed analogous to fanqie_sdk.
                content = data.get("data", {}).get("content", "")
                if content:
                    processed_content = process_chapter_content(content)
                    return chapter_title, processed_content
print(f"API返回空内容,继续尝试下一个API...")
down_text.api_status[endpoint["url"]]["error_count"] += 1
except Exception as e:
print(f"API请求失败!")
down_text.api_status[endpoint["url"]]["error_count"] += 1
time.sleep(3)
print(f"所有API尝试失败,无法下载章节 {chapter_id}")
return None, None
def get_chapters_from_api(book_id, headers):
"""从API获取章节列表"""
try:
        # Scrape the chapter list from the book page
page_url = f'https://fanqienovel.com/page/{book_id}'
response = requests.get(page_url, headers=headers, timeout=CONFIG["request_timeout"])
soup = bs4.BeautifulSoup(response.text, 'html.parser')
chapters = extract_chapters(soup)
        # Fetch the authoritative chapter-id order from the directory API
api_url = f"https://fanqienovel.com/api/reader/directory/detail?bookId={book_id}"
api_response = requests.get(api_url, headers=headers, timeout=CONFIG["request_timeout"])
api_data = api_response.json()
chapter_ids = api_data.get("data", {}).get("allItemIds", [])
        # Merge the two: prefer titles parsed from the web page
final_chapters = []
for idx, chapter_id in enumerate(chapter_ids):
            # Find the matching chapter parsed from the page
web_chapter = next((ch for ch in chapters if ch["id"] == chapter_id), None)
if web_chapter:
final_chapters.append({
"id": chapter_id,
"title": web_chapter["title"],
"index": idx
})
else:
final_chapters.append({
"id": chapter_id,
"title": f"第{idx+1}章",
"index": idx
})
return final_chapters
except Exception as e:
print(f"获取章节列表失败: {str(e)}")
return None
def download_chapter(chapter, headers, save_path, book_name, downloaded, book_id):
"""下载单个章节"""
if chapter["id"] in downloaded:
return None
    # Fetch the chapter body
_, content = down_text(chapter["id"], headers, book_id)
if content:
output_file_path = os.path.join(save_path, f"{book_name}.txt")
try:
with open(output_file_path, 'a', encoding='utf-8') as f:
f.write(f'{chapter["title"]}\n')
f.write(content + '\n\n')
downloaded.add(chapter["id"])
save_status(save_path, downloaded)
return chapter["index"], content
except Exception as e:
print(f"写入文件失败: {str(e)}")
return None
def get_book_info(book_id, headers):
"""获取书名、作者、简介"""
url = f'https://fanqienovel.com/page/{book_id}'
try:
response = requests.get(url, headers=headers, timeout=CONFIG["request_timeout"])
if response.status_code != 200:
print(f"网络请求失败,状态码: {response.status_code}")
return None, None, None
soup = bs4.BeautifulSoup(response.text, 'html.parser')
        # Title
        name_element = soup.find('h1')
        name = name_element.text if name_element else "Unknown Title"
        # Author
        author_name = "Unknown Author"
author_name_element = soup.find('div', class_='author-name')
if author_name_element:
author_name_span = author_name_element.find('span', class_='author-name-text')
if author_name_span:
author_name = author_name_span.text
        # Description
        description = "No description"
description_element = soup.find('div', class_='page-abstract-content')
if description_element:
description_p = description_element.find('p')
if description_p:
description = description_p.text
return name, author_name, description
except Exception as e:
print(f"获取书籍信息失败: {str(e)}")
return None, None, None
def load_status(save_path):
"""加载下载状态"""
status_file = os.path.join(save_path, CONFIG["status_file"])
if os.path.exists(status_file):
try:
with open(status_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
return set(data)
return set()
except:
pass
return set()
def save_status(save_path, downloaded):
"""保存下载状态"""
status_file = os.path.join(save_path, CONFIG["status_file"])
with open(status_file, 'w', encoding='utf-8') as f:
json.dump(list(downloaded), f, ensure_ascii=False, indent=2)
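# chapter.json is a flat JSON array of downloaded chapter ids, e.g.
# ["111", "112"] (ids here are hypothetical); it sits next to the output txt.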
def Run(book_id, save_path):
"""运行下载"""
def signal_handler(sig, frame):
print("\n检测到程序中断,正在保存已下载内容...")
write_downloaded_chapters_in_order()
save_status(save_path, downloaded)
print(f"已保存 {len(downloaded)} 个章节的进度")
sys.exit(0)
def write_downloaded_chapters_in_order():
"""按章节顺序写入"""
if not chapter_results:
return
with open(output_file_path, 'w', encoding='utf-8') as f:
f.write(f"小说名: {name}\n作者: {author_name}\n内容简介: {description}\n\n")
for idx in range(len(chapters)):
if idx in chapter_results:
result = chapter_results[idx]
title = f'{result["base_title"]} {result["api_title"]}' if result["api_title"] else result["base_title"]
f.write(f"{title}\n{result['content']}\n\n")
    # Register the signal handler only in the main thread
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, signal_handler)
    # Load API endpoints (including the batch source)
fetch_api_endpoints_from_server()
try:
headers = get_headers()
chapters = get_chapters_from_api(book_id, headers)
if not chapters:
print("未找到任何章节,请检查小说ID是否正确。")
return
        name, author_name, description = get_book_info(book_id, headers)
        if not name:
            name = f"Unknown_Novel_{book_id}"
            author_name = "Unknown Author"
            description = "No description"
downloaded = load_status(save_path)
if downloaded:
print(f"检测到您曾经下载过小说《{name}》,继续下载缺失章节。")
todo_chapters = [ch for ch in chapters if ch["id"] not in downloaded]
if not todo_chapters:
print("所有章节已是最新,无需下载")
return
print(f"开始下载:《{name}》, 总章节数: {len(chapters)}, 待下载: {len(todo_chapters)}")
os.makedirs(save_path, exist_ok=True)
output_file_path = os.path.join(save_path, f"{name}.txt")
if not os.path.exists(output_file_path):
with open(output_file_path, 'w', encoding='utf-8') as f:
f.write(f"小说名: {name}\n作者: {author_name}\n内容简介: {description}\n\n")
success_count = 0
failed_chapters = []
chapter_results = {}
lock = threading.Lock()
        # Batch download
if CONFIG["batch_config"]["enabled"]:
print("启用批量下载模式...")
batch_size = CONFIG["batch_config"]["max_batch_size"]
            with tqdm(total=len(todo_chapters), desc="Batch download") as pbar:
for i in range(0, len(todo_chapters), batch_size):
batch = todo_chapters[i:i + batch_size]
item_ids = [chap["id"] for chap in batch]
batch_results = batch_download_chapters(item_ids, headers)
if not batch_results:
print(f"第 {i//batch_size + 1} 批下载失败")
failed_chapters.extend(batch)
pbar.update(len(batch))
continue
for chap in batch:
content = batch_results.get(chap["id"], "")
if isinstance(content, dict):
content = content.get("content", "")
if content:
processed = process_chapter_content(content)
with lock:
chapter_results[chap["index"]] = {
"base_title": chap["title"],
"api_title": "",
"content": processed
}
downloaded.add(chap["id"])
success_count += 1
else:
with lock:
failed_chapters.append(chap)
pbar.update(1)
todo_chapters = failed_chapters.copy()
failed_chapters = []
write_downloaded_chapters_in_order()
save_status(save_path, downloaded)
        # Fall back to single-chapter downloads for anything that failed
        if todo_chapters:
            print(f"Switching to single-chapter mode; {len(todo_chapters)} chapters remaining...")
def download_task(chapter):
nonlocal success_count
try:
title, content = down_text(chapter["id"], headers, book_id)
if content:
with lock:
chapter_results[chapter["index"]] = {
"base_title": chapter["title"],
"api_title": title,
"content": content
}
downloaded.add(chapter["id"])
success_count += 1
else:
with lock:
failed_chapters.append(chapter)
except Exception as e:
print(f"章节 {chapter['id']} 下载失败: {str(e)}")
with lock:
failed_chapters.append(chapter)
attempt = 1
while todo_chapters:
print(f"\n第 {attempt} 次尝试,剩余 {len(todo_chapters)} 个章节...")
attempt += 1
with ThreadPoolExecutor(max_workers=CONFIG["max_workers"]) as executor:
futures = [executor.submit(download_task, ch) for ch in todo_chapters]
                    with tqdm(total=len(todo_chapters), desc="Chapter download") as pbar:
for _ in as_completed(futures):
pbar.update(1)
write_downloaded_chapters_in_order()
save_status(save_path, downloaded)
todo_chapters = failed_chapters.copy()
failed_chapters = []
if todo_chapters:
time.sleep(1)
print(f"下载完成!成功下载 {success_count} 个章节")
return output_file_path
    except Exception as e:
        print(f"Run error: {str(e)}")
        if 'downloaded' in locals():
            if 'chapter_results' in locals() and chapter_results:
                write_downloaded_chapters_in_order()
            save_status(save_path, downloaded)
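# Minimal usage sketch (the book id below is hypothetical):
#   Run("1234567890", os.getcwd())
# main() below wraps this in an interactive prompt.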
def main():
print("""欢迎使用番茄小说下载器精简版!
开发者:Dlmily
当前版本:v1.7.3
Github:https://github.com/Dlmily/Tomato-Novel-Downloader-Lite
赞助/了解新产品:https://afdian.com/a/dlbaokanluntanos
*使用前须知*:
开始下载之后,您可能会过于着急而查看下载文件的位置,这是徒劳的,请耐心等待小说下载完成再查看!另外如果你要下载之前已经下载过的小说(在此之前已经删除了原txt文件),那么你有可能会遇到"所有章节已是最新,无需下载"的情况,这时就请删除掉chapter.json,然后再次运行程序。
另:如果有带番茄svip的cookie或api,按照您的意愿投到“Issues”页中。
------------------------------------------""")
print("正在从服务器获取API列表...")
fetch_api_endpoints_from_server()
while True:
        book_id = input("Enter the novel ID (q to quit): ").strip()
if book_id.lower() == 'q':
break
        save_path = input("Save path (blank for the current directory): ").strip() or os.getcwd()
try:
Run(book_id, save_path)
except Exception as e:
print(f"运行错误: {str(e)}")
print("\n" + "="*50 + "\n")
if __name__ == "__main__":
main()