# goofish/src/scraper.py
import asyncio
import json
import os
import random
from datetime import datetime
from typing import Optional
from urllib.parse import urlencode
from playwright.async_api import (
Response,
TimeoutError as PlaywrightTimeoutError,
async_playwright,
)
from src.ai_handler import (
download_all_images,
get_ai_analysis,
send_ntfy_notification,
cleanup_task_images,
)
from src.config import (
    AI_DEBUG_MODE,
    API_URL_PATTERN,
    DETAIL_API_URL_PATTERN,
    LOGIN_IS_EDGE,
    RUN_HEADLESS,
    RUNNING_IN_DOCKER,
    SKIP_AI_ANALYSIS,
    STATE_FILE,
)
from src.parsers import (
_parse_search_results_json,
_parse_user_items_data,
calculate_reputation_from_ratings,
parse_ratings_data,
parse_user_head_data,
)
from src.utils import (
format_registration_days,
get_link_unique_key,
random_sleep,
safe_get,
save_to_jsonl,
log_time,
)
from src.rotation import RotationPool, load_state_files, parse_proxy_pool, RotationItem
class RiskControlError(Exception):
    """Raised when a Goofish anti-bot / risk-control check blocks the session."""
def _as_bool(value, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "y", "on"}
def _as_int(value, default: int) -> int:
if value is None:
return default
try:
return int(value)
except (TypeError, ValueError):
return default
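# Rotation settings are read from the task config first, then fall back to
# environment variables. An illustrative (hypothetical) task_config snippet:
#   "account_rotation": {"enabled": true, "mode": "on_failure", "retry_limit": 3},
#   "proxy_rotation":   {"enabled": true, "proxy_pool": "http://127.0.0.1:8080,http://127.0.0.1:8081"}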
def _get_rotation_settings(task_config: dict) -> dict:
account_cfg = task_config.get("account_rotation") or {}
proxy_cfg = task_config.get("proxy_rotation") or {}
account_enabled = _as_bool(account_cfg.get("enabled"), _as_bool(os.getenv("ACCOUNT_ROTATION_ENABLED"), False))
account_mode = (account_cfg.get("mode") or os.getenv("ACCOUNT_ROTATION_MODE", "per_task")).lower()
account_state_dir = account_cfg.get("state_dir") or os.getenv("ACCOUNT_STATE_DIR", "state")
account_retry_limit = _as_int(account_cfg.get("retry_limit"), _as_int(os.getenv("ACCOUNT_ROTATION_RETRY_LIMIT"), 2))
account_blacklist_ttl = _as_int(account_cfg.get("blacklist_ttl_sec"), _as_int(os.getenv("ACCOUNT_BLACKLIST_TTL"), 300))
proxy_enabled = _as_bool(proxy_cfg.get("enabled"), _as_bool(os.getenv("PROXY_ROTATION_ENABLED"), False))
proxy_mode = (proxy_cfg.get("mode") or os.getenv("PROXY_ROTATION_MODE", "per_task")).lower()
proxy_pool = proxy_cfg.get("proxy_pool") or os.getenv("PROXY_POOL", "")
proxy_retry_limit = _as_int(proxy_cfg.get("retry_limit"), _as_int(os.getenv("PROXY_ROTATION_RETRY_LIMIT"), 2))
proxy_blacklist_ttl = _as_int(proxy_cfg.get("blacklist_ttl_sec"), _as_int(os.getenv("PROXY_BLACKLIST_TTL"), 300))
return {
"account_enabled": account_enabled,
"account_mode": account_mode,
"account_state_dir": account_state_dir,
"account_retry_limit": max(1, account_retry_limit),
"account_blacklist_ttl": max(0, account_blacklist_ttl),
"proxy_enabled": proxy_enabled,
"proxy_mode": proxy_mode,
"proxy_pool": proxy_pool,
"proxy_retry_limit": max(1, proxy_retry_limit),
"proxy_blacklist_ttl": max(0, proxy_blacklist_ttl),
}
async def scrape_user_profile(context, user_id: str) -> dict:
"""
【新版】访问指定用户的个人主页,按顺序采集其摘要信息、完整的商品列表和完整的评价列表。
"""
print(f" -> 开始采集用户ID: {user_id} 的完整信息...")
profile_data = {}
page = await context.new_page()
    # Prepare a Future and data containers for the async capture tasks
head_api_future = asyncio.get_event_loop().create_future()
all_items, all_ratings = [], []
stop_item_scrolling, stop_rating_scrolling = asyncio.Event(), asyncio.Event()
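    # Capture pattern: the response listener below fills the Future/lists as the
    # page fires its APIs, while scrolling (further down) drives pagination; the
    # stop events fire once an API response reports there is no next page.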
    async def handle_response(response: Response):
        # Capture the profile header summary API
        if "mtop.idle.web.user.page.head" in response.url and not head_api_future.done():
            try:
                head_api_future.set_result(await response.json())
                print("   [API captured] user header info... OK")
            except Exception as e:
                if not head_api_future.done():
                    head_api_future.set_exception(e)
        # Capture the item-list API
        elif "mtop.idle.web.xyh.item.list" in response.url:
            try:
                data = await response.json()
                all_items.extend(data.get('data', {}).get('cardList', []))
                print(f"   [API captured] item list... {len(all_items)} items so far")
                if not data.get('data', {}).get('nextPage', True):
                    stop_item_scrolling.set()
            except Exception:
                # Any parse error ends the scroll loop rather than hanging it
                stop_item_scrolling.set()
        # Capture the ratings-list API
        elif "mtop.idle.web.trade.rate.list" in response.url:
            try:
                data = await response.json()
                all_ratings.extend(data.get('data', {}).get('cardList', []))
                print(f"   [API captured] ratings list... {len(all_ratings)} entries so far")
                if not data.get('data', {}).get('nextPage', True):
                    stop_rating_scrolling.set()
            except Exception:
                stop_rating_scrolling.set()
page.on("response", handle_response)
try:
        # --- Task 1: navigate and collect the header info ---
        await page.goto(f"https://www.goofish.com/personal?userId={user_id}", wait_until="domcontentloaded", timeout=20000)
        head_data = await asyncio.wait_for(head_api_future, timeout=15)
        profile_data = await parse_user_head_data(head_data)
        # --- Task 2: scroll to load all of the user's items (default tab) ---
        print("   [Collection] Gathering the user's item list...")
        await random_sleep(2, 4)  # wait for the first page of the item API
while not stop_item_scrolling.is_set():
await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
try:
await asyncio.wait_for(stop_item_scrolling.wait(), timeout=8)
except asyncio.TimeoutError:
print(" [滚动超时] 商品列表可能已加载完毕。")
break
profile_data["卖家发布的商品列表"] = await _parse_user_items_data(all_items)
        # --- Task 3: click over to and collect all ratings ---
        print("   [Collection] Gathering the user's ratings list...")
rating_tab_locator = page.locator("//div[text()='信用及评价']/ancestor::li")
if await rating_tab_locator.count() > 0:
await rating_tab_locator.click()
            await random_sleep(3, 5)  # wait for the first page of the ratings API
while not stop_rating_scrolling.is_set():
await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
try:
await asyncio.wait_for(stop_rating_scrolling.wait(), timeout=8)
except asyncio.TimeoutError:
print(" [滚动超时] 评价列表可能已加载完毕。")
break
profile_data['卖家收到的评价列表'] = await parse_ratings_data(all_ratings)
reputation_stats = await calculate_reputation_from_ratings(all_ratings)
profile_data.update(reputation_stats)
else:
print(" [警告] 未找到评价选项卡,跳过评价采集。")
except Exception as e:
print(f" [错误] 采集用户 {user_id} 信息时发生错误: {e}")
finally:
page.remove_listener("response", handle_response)
await page.close()
print(f" -> 用户 {user_id} 信息采集完成。")
return profile_data
async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
"""
【核心执行器】
根据单个任务配置,异步爬取闲鱼商品数据,并对每个新发现的商品进行实时的、独立的AI分析和通知。
"""
keyword = task_config['keyword']
max_pages = task_config.get('max_pages', 1)
personal_only = task_config.get('personal_only', False)
min_price = task_config.get('min_price')
max_price = task_config.get('max_price')
ai_prompt_text = task_config.get('ai_prompt_text', '')
free_shipping = task_config.get('free_shipping', False)
raw_new_publish = task_config.get('new_publish_option') or ''
new_publish_option = raw_new_publish.strip()
if new_publish_option == '__none__':
new_publish_option = ''
region_filter = (task_config.get('region') or '').strip()
processed_links = set()
output_filename = os.path.join("jsonl", f"{keyword.replace(' ', '_')}_full_data.jsonl")
if os.path.exists(output_filename):
print(f"LOG: 发现已存在文件 {output_filename},正在加载历史记录以去重...")
try:
with open(output_filename, 'r', encoding='utf-8') as f:
for line in f:
try:
record = json.loads(line)
link = record.get('商品信息', {}).get('商品链接', '')
if link:
processed_links.add(get_link_unique_key(link))
except json.JSONDecodeError:
print(f" [警告] 文件中有一行无法解析为JSON,已跳过。")
print(f"LOG: 加载完成,已记录 {len(processed_links)} 个已处理过的商品。")
except IOError as e:
print(f" [警告] 读取历史文件时发生错误: {e}")
else:
print(f"LOG: 输出文件 {output_filename} 不存在,将创建新文件。")
rotation_settings = _get_rotation_settings(task_config)
forced_account = task_config.get("account_state_file") or None
if isinstance(forced_account, str) and not forced_account.strip():
forced_account = None
if forced_account:
rotation_settings["account_enabled"] = False
account_items = load_state_files(rotation_settings["account_state_dir"])
if not forced_account and os.path.exists(STATE_FILE):
account_items = [STATE_FILE]
if not forced_account and not os.path.exists(STATE_FILE) and account_items:
rotation_settings["account_enabled"] = True
account_pool = RotationPool(account_items, rotation_settings["account_blacklist_ttl"], "account")
proxy_pool = RotationPool(parse_proxy_pool(rotation_settings["proxy_pool"]), rotation_settings["proxy_blacklist_ttl"], "proxy")
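    # RotationPool entries marked bad via mark_bad() (see the retry loop at the
    # bottom) are skipped by pick_random(); the *_blacklist_ttl settings above
    # presumably control how long such entries stay blacklisted.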
selected_account: Optional[RotationItem] = None
selected_proxy: Optional[RotationItem] = None
def _select_account(force_new: bool = False) -> Optional[RotationItem]:
nonlocal selected_account
if forced_account:
return RotationItem(value=forced_account)
if not rotation_settings["account_enabled"]:
if os.path.exists(STATE_FILE):
return RotationItem(value=STATE_FILE)
return None
if rotation_settings["account_mode"] == "per_task" and selected_account and not force_new:
return selected_account
picked = account_pool.pick_random()
return picked or selected_account
def _select_proxy(force_new: bool = False) -> Optional[RotationItem]:
nonlocal selected_proxy
if not rotation_settings["proxy_enabled"]:
return None
if rotation_settings["proxy_mode"] == "per_task" and selected_proxy and not force_new:
return selected_proxy
picked = proxy_pool.pick_random()
return picked or selected_proxy
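    # One end-to-end scrape attempt bound to a single login state file and an
    # optional proxy. Raises RiskControlError when an anti-bot check is hit so
    # the rotation loop below can blacklist the account/proxy pair and retry.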
async def _run_scrape_attempt(state_file: str, proxy_server: Optional[str]) -> int:
processed_item_count = 0
stop_scraping = False
if not os.path.exists(state_file):
            raise FileNotFoundError(f"Login state file not found: {state_file}")
async with async_playwright() as p:
            # Anti-detection launch flags
launch_args = [
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-web-security',
'--disable-features=IsolateOrigins,site-per-process'
]
launch_kwargs = {"headless": RUN_HEADLESS, "args": launch_args}
if proxy_server:
launch_kwargs["proxy"] = {"server": proxy_server}
if LOGIN_IS_EDGE:
launch_kwargs["channel"] = "msedge"
else:
if not RUNNING_IN_DOCKER:
launch_kwargs["channel"] = "chrome"
browser = await p.chromium.launch(**launch_kwargs)
            # Emulate a mobile device (matches real Chrome mobile emulation).
            # HAR analysis showed the real browser presents an Android profile.
context = await browser.new_context(
storage_state=state_file,
user_agent="Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Mobile Safari/537.36",
                viewport={'width': 412, 'height': 915},  # Pixel 5 dimensions
device_scale_factor=2.625,
is_mobile=True,
has_touch=True,
locale='zh-CN',
timezone_id='Asia/Shanghai',
permissions=['geolocation'],
geolocation={'longitude': 121.4737, 'latitude': 31.2304},
color_scheme='light'
)
            # Extra anti-detection script (simulate a real mobile device)
            await context.add_init_script("""
                // Remove the webdriver flag
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                // Mimic a real mobile device's navigator properties
                Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
                Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en-US', 'en']});
                // Add a chrome object
                window.chrome = {runtime: {}, loadTimes: function() {}, csi: function() {}};
                // Simulate touch support
                Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 5});
                // Override permissions.query (avoid exposing automation)
                const originalQuery = window.navigator.permissions.query;
                window.navigator.permissions.query = (parameters) => (
                    parameters.name === 'notifications' ?
                        Promise.resolve({state: Notification.permission}) :
                        originalQuery(parameters)
                );
            """)
page = await context.new_page()
try:
                # Step 0 - act like a real user: visit the homepage first (an important anti-detection measure)
                log_time("Step 0 - visiting the homepage like a real user...")
                await page.goto("https://www.goofish.com/", wait_until="domcontentloaded", timeout=30000)
                log_time("[Anti-bot] Lingering on the homepage to simulate browsing...")
                await random_sleep(1, 2)
                # Simulate some random scrolling (touch scrolling on mobile)
                await page.evaluate("window.scrollBy(0, Math.random() * 500 + 200)")
                await random_sleep(1, 2)
                log_time("Step 1 - navigating to the search results page...")
                # Build the search URL with the 'q' parameter, URL-encoded
                params = {'q': keyword}
                search_url = f"https://www.goofish.com/search?{urlencode(params)}"
                log_time(f"Target URL: {search_url}")
                # Use expect_response to capture the initial search API data while navigating
                async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=30000) as response_info:
                    await page.goto(search_url, wait_until="domcontentloaded", timeout=60000)
                initial_response = await response_info.value
                # Wait for a key filter element to confirm the results page loaded
                await page.wait_for_selector('text=新发布', timeout=15000)
                # Simulate a real user's initial pause and skim after the page loads
                log_time("[Anti-bot] Simulating the user looking over the page...")
                await random_sleep(1, 3)
                # --- Check for anti-bot verification overlays ---
                baxia_dialog = page.locator("div.baxia-dialog-mask")
                middleware_widget = page.locator("div.J_MIDDLEWARE_FRAME_WIDGET")
                try:
                    # Wait up to 2s for the dialog; if it appears, we are blocked.
                    await baxia_dialog.wait_for(state='visible', timeout=2000)
                    print("\n==================== CRITICAL BLOCK DETECTED ====================")
                    print("Goofish anti-bot verification dialog detected (baxia-dialog); cannot continue.")
                    print("This usually means the script ran too frequently or was flagged as a bot.")
                    print("Suggestions:")
                    print("1. Pause the script for a while before retrying.")
                    print("2. (Recommended) Set RUN_HEADLESS=false in the .env file; headed mode helps evade detection.")
                    print(f"Task '{keyword}' will abort here.")
                    print("===================================================================")
                    raise RiskControlError("baxia-dialog")
                except PlaywrightTimeoutError:
                    # No dialog within 2s - the normal case; continue.
                    pass
                # Check for the J_MIDDLEWARE_FRAME_WIDGET overlay
                try:
                    await middleware_widget.wait_for(state='visible', timeout=2000)
                    print("\n==================== CRITICAL BLOCK DETECTED ====================")
                    print("Goofish anti-bot verification overlay detected (J_MIDDLEWARE_FRAME_WIDGET); cannot continue.")
                    print("This usually means the script ran too frequently or was flagged as a bot.")
                    print("Suggestions:")
                    print("1. Pause the script for a while before retrying.")
                    print("2. (Recommended) Refresh the login state file and make sure the session is still valid.")
                    print("3. Lower the task frequency to avoid being flagged as a bot.")
                    print(f"Task '{keyword}' will abort here.")
                    print("===================================================================")
                    raise RiskControlError("J_MIDDLEWARE_FRAME_WIDGET")
                except PlaywrightTimeoutError:
                    # No overlay within 2s - the normal case; continue.
                    pass
                try:
                    await page.click("div[class*='closeIconBg']", timeout=3000)
                    print("LOG: Closed the promo popup.")
                except PlaywrightTimeoutError:
                    print("LOG: No promo popup detected.")
final_response = None
log_time("步骤 2 - 应用筛选条件...")
                if new_publish_option:
                    try:
                        await page.click('text=新发布')
                        await random_sleep(1, 2)  # was (1.5, 2.5)
                        async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
                            await page.click(f"text={new_publish_option}")
                            # Longer wait after re-sorting
                            await random_sleep(2, 4)  # was (3, 5)
                        final_response = await response_info.value
                    except PlaywrightTimeoutError:
                        log_time(f"'Newly published' filter '{new_publish_option}' request timed out; continuing.")
                    except Exception as e:
                        print(f"LOG: Failed to apply the 'newly published' filter: {e}")
if personal_only:
async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
await page.click('text=个人闲置')
                        # Randomized wait instead of a fixed sleep
                        await random_sleep(2, 4)  # was asyncio.sleep(5)
final_response = await response_info.value
if free_shipping:
try:
async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
await page.click('text=包邮')
await random_sleep(2, 4)
final_response = await response_info.value
except PlaywrightTimeoutError:
log_time("包邮筛选请求超时,继续执行。")
except Exception as e:
print(f"LOG: 应用包邮筛选失败: {e}")
if region_filter:
try:
area_trigger = page.get_by_text("区域", exact=True)
if await area_trigger.count():
await area_trigger.first.click()
await random_sleep(1.5, 2)
popover_candidates = page.locator("div.ant-popover")
popover = popover_candidates.filter(has=page.locator(".areaWrap--FaZHsn8E, [class*='areaWrap']")).last
if not await popover.count():
popover = popover_candidates.filter(has=page.get_by_text("重新定位")).last
if not await popover.count():
popover = popover_candidates.filter(has=page.get_by_text("查看")).last
if not await popover.count():
print("LOG: 未找到区域弹窗,跳过区域筛选。")
raise PlaywrightTimeoutError("region-popover-not-found")
await popover.wait_for(state="visible", timeout=5000)
                            # Column container: its first-level children are the province/city/district
                            # columns; avoid hard dependence on exact class names for robustness
area_wrap = popover.locator(".areaWrap--FaZHsn8E, [class*='areaWrap']").first
await area_wrap.wait_for(state="visible", timeout=3000)
columns = area_wrap.locator(":scope > div")
col_prov = columns.nth(0)
col_city = columns.nth(1)
col_dist = columns.nth(2)
                            region_parts = [p.strip() for p in region_filter.split('/') if p.strip()]
                            async def _click_in_column(column_locator, text_value: str, desc: str) -> None:
                                option = column_locator.locator(".provItem--QAdOx8nD", has_text=text_value).first
                                if await option.count():
                                    await option.click()
                                    await random_sleep(1.5, 2)
                                    try:
                                        await option.wait_for(state="attached", timeout=1500)
                                        await option.wait_for(state="visible", timeout=1500)
                                    except PlaywrightTimeoutError:
                                        pass
                                else:
                                    print(f"LOG: {desc} '{text_value}' not found; skipping.")
                            if len(region_parts) >= 1:
                                await _click_in_column(col_prov, region_parts[0], "province")
                                await random_sleep(1, 2)
                            if len(region_parts) >= 2:
                                await _click_in_column(col_city, region_parts[1], "city")
                                await random_sleep(1, 2)
                            if len(region_parts) >= 3:
                                await _click_in_column(col_dist, region_parts[2], "district")
                                await random_sleep(1, 2)
search_btn = popover.locator("div.searchBtn--Ic6RKcAb").first
if await search_btn.count():
try:
async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
await search_btn.click()
await random_sleep(2, 3)
final_response = await response_info.value
except PlaywrightTimeoutError:
log_time("区域筛选提交超时,继续执行。")
else:
print("LOG: 未找到区域弹窗的“查看XX件宝贝”按钮,跳过提交。")
else:
print("LOG: 未找到区域筛选触发器。")
except PlaywrightTimeoutError:
log_time(f"区域筛选 '{region_filter}' 请求超时,继续执行。")
except Exception as e:
print(f"LOG: 应用区域筛选 '{region_filter}' 失败: {e}")
                if min_price or max_price:
                    price_container = page.locator('div[class*="search-price-input-container"]').first
                    if await price_container.is_visible():
                        if min_price:
                            await price_container.get_by_placeholder("¥").first.fill(str(min_price))
                            # Randomized wait instead of a fixed sleep
                            await random_sleep(1, 2.5)  # was asyncio.sleep(5)
                        if max_price:
                            await price_container.get_by_placeholder("¥").nth(1).fill(str(max_price))
                            await random_sleep(1, 2.5)  # was asyncio.sleep(5)
                        async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
                            await page.keyboard.press('Tab')
                            # Longer wait after confirming the price
                            await random_sleep(2, 4)  # was asyncio.sleep(5)
                        final_response = await response_info.value
                    else:
                        print("LOG: Warning - price input container not found.")
log_time("所有筛选已完成,开始处理商品列表...")
current_response = final_response if final_response and final_response.ok else initial_response
                for page_num in range(1, max_pages + 1):
                    if stop_scraping:
                        break
                    log_time(f"Processing page {page_num}/{max_pages} ...")
                    if page_num > 1:
                        # Find the enabled "next page" button. Goofish disables it by adding a
                        # 'disabled' class name rather than using the disabled attribute.
                        next_btn = page.locator("[class*='search-pagination-arrow-right']:not([class*='disabled'])")
                        if not await next_btn.count():
                            log_time("Reached the last page; no enabled 'next page' button found. Stopping pagination.")
                            break
                        try:
                            async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
                                await next_btn.click()
                                # Longer wait after turning the page
                                await random_sleep(2, 5)  # was (1.5, 3.5)
                            current_response = await response_info.value
                        except PlaywrightTimeoutError:
                            log_time(f"Timed out paging to page {page_num}; stopping pagination.")
                            break
                    if not (current_response and current_response.ok):
                        log_time(f"Page {page_num} response is invalid; skipping.")
                        continue
                    basic_items = await _parse_search_results_json(await current_response.json(), f"page {page_num}")
                    if not basic_items:
                        break
                    total_items_on_page = len(basic_items)
                    for i, item_data in enumerate(basic_items, 1):
                        if debug_limit > 0 and processed_item_count >= debug_limit:
                            log_time(f"Debug limit ({debug_limit}) reached; stopping new item fetches.")
                            stop_scraping = True
                            break
                        unique_key = get_link_unique_key(item_data["商品链接"])
                        if unique_key in processed_links:
                            log_time(f"[Page progress {i}/{total_items_on_page}] Item '{item_data['商品标题'][:20]}...' already recorded; skipping.")
                            continue
                        log_time(f"[Page progress {i}/{total_items_on_page}] New item found; fetching details: {item_data['商品标题'][:30]}...")
                        # Pause before opening the detail page, as if the user lingered on the list
                        await random_sleep(2, 4)
detail_page = await context.new_page()
try:
async with detail_page.expect_response(lambda r: DETAIL_API_URL_PATTERN in r.url, timeout=25000) as detail_info:
await detail_page.goto(item_data["商品链接"], wait_until="domcontentloaded", timeout=25000)
detail_response = await detail_info.value
if detail_response.ok:
detail_json = await detail_response.json()
ret_string = str(await safe_get(detail_json, 'ret', default=[]))
if "FAIL_SYS_USER_VALIDATE" in ret_string:
print("\n==================== CRITICAL BLOCK DETECTED ====================")
print("检测到闲鱼反爬虫验证 (FAIL_SYS_USER_VALIDATE),程序将终止。")
long_sleep_duration = random.randint(3, 60)
print(f"为避免账户风险,将执行一次长时间休眠 ({long_sleep_duration} 秒) 后再退出...")
await asyncio.sleep(long_sleep_duration)
print("长时间休眠结束,现在将安全退出。")
print("===================================================================")
raise RiskControlError("FAIL_SYS_USER_VALIDATE")
                                # Parse the detail data and update item_data
                                item_do = await safe_get(detail_json, 'data', 'itemDO', default={})
                                seller_do = await safe_get(detail_json, 'data', 'sellerDO', default={})
                                reg_days_raw = await safe_get(seller_do, 'userRegDay', default=0)
                                registration_duration_text = format_registration_days(reg_days_raw)
                                # 1. Extract the seller's Zhima (Sesame) Credit level
                                zhima_credit_text = await safe_get(seller_do, 'zhimaLevelInfo', 'levelName')
                                # 2. Extract the item's full image list
                                image_infos = await safe_get(item_do, 'imageInfos', default=[])
                                if image_infos:
                                    # Collect every valid image URL
                                    all_image_urls = [img.get('url') for img in image_infos if img.get('url')]
                                    if all_image_urls:
                                        # Store the full image list in a new field, replacing the old single link
                                        item_data['商品图片列表'] = all_image_urls
                                        # (Optional) still keep the main image link, just in case
                                        item_data['商品主图链接'] = all_image_urls[0]
item_data['“想要”人数'] = await safe_get(item_do, 'wantCnt', default=item_data.get('“想要”人数', 'NaN'))
item_data['浏览量'] = await safe_get(item_do, 'browseCnt', default='-')
                                # ...[more fields parsed from the detail page can be added here]...
                                # Collect the seller's profile via the core helper
                                user_profile_data = {}
                                user_id = await safe_get(seller_do, 'sellerId')
                                if user_id:
                                    user_profile_data = await scrape_user_profile(context, str(user_id))
                                else:
                                    print("   [Warning] Could not obtain the seller ID from the detail API.")
                                user_profile_data['卖家芝麻信用'] = zhima_credit_text
                                user_profile_data['卖家注册时长'] = registration_duration_text
                                # Build the base record
final_record = {
"爬取时间": datetime.now().isoformat(),
"搜索关键字": keyword,
"任务名称": task_config.get('task_name', 'Untitled Task'),
"商品信息": item_data,
"卖家信息": user_profile_data
}
# --- START: Real-time AI Analysis & Notification ---
                                # Check whether to skip AI analysis and notify directly
                                if SKIP_AI_ANALYSIS:
                                    log_time("SKIP_AI_ANALYSIS is set; skipping AI analysis and sending the notification directly...")
                                    # Download the images
                                    image_urls = item_data.get('商品图片列表', [])
                                    downloaded_image_paths = await download_all_images(item_data['商品ID'], image_urls, task_config.get('task_name', 'default'))
                                    # Delete the downloaded image files to save space
                                    for img_path in downloaded_image_paths:
                                        try:
                                            if os.path.exists(img_path):
                                                os.remove(img_path)
                                                print(f"   [Images] Deleted temporary image file: {img_path}")
                                        except Exception as e:
                                            print(f"   [Images] Error deleting image file: {e}")
                                    # Notify directly, treating every item as recommended
                                    log_time("Item skipped AI analysis; sending the notification...")
                                    await send_ntfy_notification(item_data, "AI analysis skipped; notifying directly")
                                else:
                                    log_time(f"Starting real-time AI analysis for item #{item_data['商品ID']}...")
                                    # 1. Download images
                                    image_urls = item_data.get('商品图片列表', [])
                                    downloaded_image_paths = await download_all_images(item_data['商品ID'], image_urls, task_config.get('task_name', 'default'))
                                    # 2. Get the AI analysis
                                    ai_analysis_result = None
                                    if ai_prompt_text:
                                        try:
                                            # Pass the whole record to the AI so it has the fullest possible context
                                            ai_analysis_result = await get_ai_analysis(final_record, downloaded_image_paths, prompt_text=ai_prompt_text)
                                            if ai_analysis_result:
                                                final_record['ai_analysis'] = ai_analysis_result
                                                log_time(f"AI analysis finished. Recommended: {ai_analysis_result.get('is_recommended')}")
                                            else:
                                                final_record['ai_analysis'] = {'error': 'AI analysis returned None after retries.'}
                                        except Exception as e:
                                            print(f"   -> Fatal error during AI analysis: {e}")
                                            final_record['ai_analysis'] = {'error': str(e)}
                                    else:
                                        print("   -> No AI prompt configured for this task; skipping analysis.")
                                    # Delete the downloaded image files to save space
                                    for img_path in downloaded_image_paths:
                                        try:
                                            if os.path.exists(img_path):
                                                os.remove(img_path)
                                                print(f"   [Images] Deleted temporary image file: {img_path}")
                                        except Exception as e:
                                            print(f"   [Images] Error deleting image file: {e}")
                                    # 3. Send a notification if the item is recommended
                                    if ai_analysis_result and ai_analysis_result.get('is_recommended'):
                                        log_time("Item recommended by the AI; sending the notification...")
                                        await send_ntfy_notification(item_data, ai_analysis_result.get("reason", "N/A"))
# --- END: Real-time AI Analysis & Notification ---
                                # 4. Save the complete record, including the AI result
                                await save_to_jsonl(final_record, keyword)
                                processed_links.add(unique_key)
                                processed_item_count += 1
                                log_time(f"Item pipeline finished. {processed_item_count} new items processed so far.")
                                # Main randomized per-item delay, simulating a user's browsing interval
                                log_time("[Anti-bot] Running the main randomized delay to mimic a browsing pause...")
                                await random_sleep(5, 10)
else:
print(f" 错误: 获取商品详情API响应失败,状态码: {detail_response.status}")
if AI_DEBUG_MODE:
print(f"--- [DETAIL DEBUG] FAILED RESPONSE from {item_data['商品链接']} ---")
try:
print(await detail_response.text())
except Exception as e:
print(f"无法读取响应内容: {e}")
print("----------------------------------------------------")
                        except PlaywrightTimeoutError:
                            print("   Error: timed out opening the detail page or waiting for the API response.")
                        except Exception as e:
                            print(f"   Error: unexpected error while processing the item details: {e}")
                        finally:
                            await detail_page.close()
                            # Brief settling pause after closing the detail page
                            await random_sleep(2, 4)  # was (1, 2.5)
                    # After all items on a page are handled, take a longer "rest" before turning the page
                    if not stop_scraping and page_num < max_pages:
                        print(f"--- Page {page_num} done; preparing to turn the page. Taking a longer between-page rest... ---")
                        await random_sleep(10, 15)
            except PlaywrightTimeoutError as e:
                print(f"\nTimeout error: a page element or network response did not appear in time.\n{e}")
                raise
            except asyncio.CancelledError:
                log_time("Cancellation received; terminating the current scrape task...")
                raise
            except Exception as e:
                if type(e).__name__ == "TargetClosedError":
                    log_time("Browser already closed; ignoring the trailing exception (the task was likely stopped).")
                    return processed_item_count
                print(f"\nUnexpected error during scraping: {e}")
                raise
            finally:
                log_time("Task finished; the browser will close automatically in 5 seconds...")
                await asyncio.sleep(5)
                if debug_limit:
                    input("Press Enter to close the browser...")
                await browser.close()
return processed_item_count
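    # Rotation loop: each attempt runs a full scrape with the currently selected
    # account/proxy; on RiskControlError (or any other failure) in on_failure
    # mode, the pair is blacklisted and a fresh one is picked, up to attempt_limit.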
processed_item_count = 0
attempt_limit = max(rotation_settings["account_retry_limit"], rotation_settings["proxy_retry_limit"], 1)
last_error = ""
for attempt in range(1, attempt_limit + 1):
if attempt == 1:
selected_account = _select_account()
selected_proxy = _select_proxy()
else:
if rotation_settings["account_enabled"] and rotation_settings["account_mode"] == "on_failure":
account_pool.mark_bad(selected_account, last_error)
selected_account = _select_account(force_new=True)
if rotation_settings["proxy_enabled"] and rotation_settings["proxy_mode"] == "on_failure":
proxy_pool.mark_bad(selected_proxy, last_error)
selected_proxy = _select_proxy(force_new=True)
if rotation_settings["account_enabled"] and not selected_account:
print("未找到可用的登录状态文件,无法继续执行任务。")
break
if not rotation_settings["account_enabled"] and not selected_account:
print("未找到可用的登录状态文件,无法继续执行任务。")
break
if rotation_settings["proxy_enabled"] and not selected_proxy:
print("未找到可用的代理地址,无法继续执行任务。")
break
state_path = selected_account.value if selected_account else STATE_FILE
proxy_server = selected_proxy.value if selected_proxy else None
if rotation_settings["account_enabled"]:
print(f"账号轮换:使用登录状态 {state_path}")
if rotation_settings["proxy_enabled"] and proxy_server:
print(f"IP 轮换:使用代理 {proxy_server}")
try:
processed_item_count += await _run_scrape_attempt(state_path, proxy_server)
break
except RiskControlError as e:
last_error = str(e)
print(f"检测到风控或验证触发: {e}")
if attempt < attempt_limit:
print("将尝试轮换账号/IP 后重试...")
except Exception as e:
last_error = f"{type(e).__name__}: {e}"
print(f"本次尝试失败: {last_error}")
if attempt < attempt_limit:
print("将尝试轮换账号/IP 后重试...")
# 清理任务图片目录
cleanup_task_images(task_config.get('task_name', 'default'))
return processed_item_count
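# Minimal illustrative entry point (a sketch: it assumes a valid login state
# file already exists, and the task fields here are hypothetical examples).
if __name__ == "__main__":
    example_task = {
        "task_name": "demo",
        "keyword": "iphone",       # search keyword (hypothetical)
        "max_pages": 1,
        "personal_only": True,
        "ai_prompt_text": "",      # empty -> per-item AI analysis is skipped
    }
    asyncio.run(scrape_xianyu(example_task, debug_limit=1))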