dyspark / core /tasks.py
cacode's picture
Upload 33 files
d10e42a verified
import asyncio
import traceback
import logging
from utils.logger import setup_logger
from utils.config import get_config, get_userData, reload_config, reload_userData
from core.msg_builder import build_message
from core.browser import get_browser
complates = {}
logger = setup_logger(level=logging.DEBUG)
async def retry_operation(name, operation, retries=3, delay=2, *args, **kwargs):
"""
通用的重试逻辑
:param name: 操作名称(用于日志记录)
:param operation: 要执行的异步操作
:param retries: 最大重试次数
:param delay: 每次重试之间的延迟(秒)
:param args: 传递给操作的参数
:param kwargs: 传递给操作的关键字参数
"""
for attempt in range(retries):
try:
return await operation(*args, **kwargs)
except Exception as e:
if attempt < retries - 1:
logger.warning(f"{name} 失败,正在重试第 {attempt + 1} 次,错误:{e}")
await asyncio.sleep(delay)
else:
logger.error(f"{name} 失败,已达到最大重试次数,错误:{e}")
raise
async def scroll_and_select_user(page, username, targets):
"""尝试滚动并查找用户名"""
# 定义目标元素和滚动容器的选择器
friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]'
target_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]//div[contains(@class, "semi-list-item-body semi-list-item-body-flex-start")]'
scrollable_friends_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div'
# [修改] 更加精确的状态选择器
no_more_selector = 'xpath=//div[contains(@class, "no-more-tip-ftdJnu")]'
loading_selector = 'xpath=//div[contains(@class, "semi-spin")]'
logger.debug(f"账号 {username} 开始查找目标好友列表")
logger.debug(f"账号 {username} 目标好友列表: {targets}")
logger.debug(f"账号 {username} 点击进入好友标签页")
# 点击好友标签页
await page.wait_for_selector(friends_tab_selector)
await page.locator(friends_tab_selector).click()
logger.debug(f"账号 {username} 进入好友列表页面")
# 确保第一个好友元素加载完成
first_friend_selector = 'xpath=//*[@id="sub-app"]/div/div/div[2]/div[2]/div/div/div[1]/div/div/div/ul/div/div/div[1]/li/div'
await page.wait_for_selector(first_friend_selector)
await page.locator(first_friend_selector).click() # 点击第一个好友,确保列表激活
logger.debug(f"账号 {username} 已激活好友列表,开始滚动查找目标好友")
await asyncio.sleep(2) # 等待好友列表加载
found_usernames = set()
# [修改] 复制一份目标列表用于追踪进度
remaining_targets = set(targets)
while True:
# 查找所有目标元素
target_elements = await page.locator(target_selector).all()
for element in target_elements:
try:
# 查找子元素 span,模糊匹配 class
span = element.locator(
"""xpath=.//span[contains(@class, "item-header-name-")]"""
)
targetName = await span.inner_text()
if targetName in found_usernames:
continue # 已处理过,跳过
found_usernames.add(targetName)
logger.debug(f"账号 {username} 找到好友 {targetName}")
# 检查是否是目标用户名
if targetName in targets:
await element.click()
logger.info(
f"账号 {username} 选中目标好友 {targetName} 准备开始交互"
)
yield targetName
# [修改] 标记已找到,如果全找到了直接退出
if targetName in remaining_targets:
remaining_targets.remove(targetName)
if len(remaining_targets) == 0:
logger.info(f"账号 {username} 所有目标好友均已找到,停止搜索")
return
break
except Exception as e:
traceback.print_exc()
else:
# [修改] 状态检测逻辑
# 1. 检查是否到底(没有更多了)
if await page.locator(no_more_selector).count() > 0:
logger.info(f"账号 {username} 检测到'没有更多了'标志,已到达底部")
if len(remaining_targets) > 0:
logger.warning(f"账号 {username} 搜索结束,仍有以下好友未找到: {remaining_targets}")
break
# 2. 检查是否正在加载
if await page.locator(loading_selector).count() > 0:
logger.debug(f"账号 {username} 列表正在加载中 (Loading)...")
await asyncio.sleep(1.5) # 给加载留点时间
# 不 break,继续去滚动以触发后续内容
# 3. 滚动容器
scrollable_element = await page.locator(
scrollable_friends_selector
).element_handle()
if scrollable_element:
# [修改] 加大滚动幅度
await page.evaluate(
"(element) => element.scrollTop += 800", scrollable_element
)
logger.debug(f"账号 {username} 滚动好友列表以加载更多好友")
await asyncio.sleep(1.5)
else:
logger.error(f"账号 {username} 未找到滚动容器,退出")
break
async def do_user_task(browser, username, cookies, targets, semaphore, config):
async with semaphore: # 使用信号量控制并发数量
context = await browser.new_context() # 每个任务使用独立的上下文
context.set_default_navigation_timeout(120000) # 设置导航超时时间为 90 秒
context.set_default_timeout(120000) # 设置所有操作的默认超时时间为 120 秒
page = await context.new_page()
# 打开抖音创作者中心
await retry_operation(
"打开抖音创作者中心",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/",
)
# 注入 Cookie
await context.add_cookies(cookies)
# 导航到消息页面
await retry_operation(
"导航到消息页面",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/creator-micro/data/following/chat",
)
logger.info(f"账号 {username} 开始发送消息")
# 滚动并选择用户
async for _target_name in scroll_and_select_user(page, username, targets):
logger.info(f"账号 {username} 已选中好友 {username} 发送消息")
# 等待 chat-input-dccKiL 元素加载完成
chat_input_selector = "xpath=//div[contains(@class, 'chat-input-dccKiL')]"
await page.wait_for_selector(chat_input_selector)
chat_input = page.locator(chat_input_selector)
# 在 chat-input-dccKiL 中输入内容
message = build_message(config=config)
for line in message.split("\n"):
await chat_input.type(line) # 输入每一行
# 如果不是最后一行,模拟 Shift+Enter 插入换行
if line != message.split("\n")[-1]:
await chat_input.press("Shift+Enter") # 模拟 Shift+Enter 插入换行
logger.debug(
f"账号 {username} 准备发送消息给好友 {username}:\n\t{message}"
)
logger.info(f"账号 {username} 给好友 {username} 发送消息完成")
# 模拟按下回车键发送消息
await chat_input.press("Enter")
await asyncio.sleep(2) # 发送完等待一会儿
await context.close() # 任务完成后关闭上下文
async def runTasks(config=None, userData=None):
active_config = config if config is not None else reload_config()
active_user_data = userData if userData is not None else reload_userData()
playwright, browser = await get_browser()
try:
# 检查是否启用多任务和任务数量
# 创建信号量以限制并发任务数量
logger.info("开始执行任务,当前配置如下:")
multi_task = bool(active_config.get("multiTask", True))
task_count = int(active_config.get("taskCount", 1) or 1)
logger.info(f"多任务模式: {multi_task}, 任务数量: {task_count}")
logger.info(f"消息模板: {active_config.get('messageTemplate', '')}")
logger.info(f"一言类型: {active_config.get('hitokotoTypes', [])}")
for user in active_user_data:
logger.info(
f"用户: {user.get('username', '未知用户')}, 目标好友: {user.get('targets', [])}"
)
semaphore = asyncio.Semaphore(task_count if multi_task else 1)
tasks = []
for user in active_user_data:
cookies = user.get("cookies", [])
targets = user.get("targets", [])
unique_id = user.get("unique_id", "")
if not cookies:
logger.warning("用户 %s 缺少 cookies,已跳过。", user.get("username", "未知用户"))
continue
complates[unique_id] = [] # 初始化该用户的已完成列表
username = user.get("username", "未知用户")
# 创建任务
tasks.append(
do_user_task(browser, username, cookies, targets, semaphore, active_config)
)
# 并发执行任务
if tasks:
await asyncio.gather(*tasks)
else:
logger.warning("没有可执行的任务(用户数据为空或均缺少 cookies)。")
finally:
await playwright.stop()
# 关闭浏览器实例
await browser.close()