File size: 10,463 Bytes
d10e42a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | import asyncio
import traceback
import logging
from utils.logger import setup_logger
from utils.config import get_config, get_userData, reload_config, reload_userData
from core.msg_builder import build_message
from core.browser import get_browser
complates = {}
logger = setup_logger(level=logging.DEBUG)
async def retry_operation(name, operation, retries=3, delay=2, *args, **kwargs):
"""
通用的重试逻辑
:param name: 操作名称(用于日志记录)
:param operation: 要执行的异步操作
:param retries: 最大重试次数
:param delay: 每次重试之间的延迟(秒)
:param args: 传递给操作的参数
:param kwargs: 传递给操作的关键字参数
"""
for attempt in range(retries):
try:
return await operation(*args, **kwargs)
except Exception as e:
if attempt < retries - 1:
logger.warning(f"{name} 失败,正在重试第 {attempt + 1} 次,错误:{e}")
await asyncio.sleep(delay)
else:
logger.error(f"{name} 失败,已达到最大重试次数,错误:{e}")
raise
async def scroll_and_select_user(page, username, targets):
"""尝试滚动并查找用户名"""
# 定义目标元素和滚动容器的选择器
friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]'
target_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]//div[contains(@class, "semi-list-item-body semi-list-item-body-flex-start")]'
scrollable_friends_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div'
# [修改] 更加精确的状态选择器
no_more_selector = 'xpath=//div[contains(@class, "no-more-tip-ftdJnu")]'
loading_selector = 'xpath=//div[contains(@class, "semi-spin")]'
logger.debug(f"账号 {username} 开始查找目标好友列表")
logger.debug(f"账号 {username} 目标好友列表: {targets}")
logger.debug(f"账号 {username} 点击进入好友标签页")
# 点击好友标签页
await page.wait_for_selector(friends_tab_selector)
await page.locator(friends_tab_selector).click()
logger.debug(f"账号 {username} 进入好友列表页面")
# 确保第一个好友元素加载完成
first_friend_selector = 'xpath=//*[@id="sub-app"]/div/div/div[2]/div[2]/div/div/div[1]/div/div/div/ul/div/div/div[1]/li/div'
await page.wait_for_selector(first_friend_selector)
await page.locator(first_friend_selector).click() # 点击第一个好友,确保列表激活
logger.debug(f"账号 {username} 已激活好友列表,开始滚动查找目标好友")
await asyncio.sleep(2) # 等待好友列表加载
found_usernames = set()
# [修改] 复制一份目标列表用于追踪进度
remaining_targets = set(targets)
while True:
# 查找所有目标元素
target_elements = await page.locator(target_selector).all()
for element in target_elements:
try:
# 查找子元素 span,模糊匹配 class
span = element.locator(
"""xpath=.//span[contains(@class, "item-header-name-")]"""
)
targetName = await span.inner_text()
if targetName in found_usernames:
continue # 已处理过,跳过
found_usernames.add(targetName)
logger.debug(f"账号 {username} 找到好友 {targetName}")
# 检查是否是目标用户名
if targetName in targets:
await element.click()
logger.info(
f"账号 {username} 选中目标好友 {targetName} 准备开始交互"
)
yield targetName
# [修改] 标记已找到,如果全找到了直接退出
if targetName in remaining_targets:
remaining_targets.remove(targetName)
if len(remaining_targets) == 0:
logger.info(f"账号 {username} 所有目标好友均已找到,停止搜索")
return
break
except Exception as e:
traceback.print_exc()
else:
# [修改] 状态检测逻辑
# 1. 检查是否到底(没有更多了)
if await page.locator(no_more_selector).count() > 0:
logger.info(f"账号 {username} 检测到'没有更多了'标志,已到达底部")
if len(remaining_targets) > 0:
logger.warning(f"账号 {username} 搜索结束,仍有以下好友未找到: {remaining_targets}")
break
# 2. 检查是否正在加载
if await page.locator(loading_selector).count() > 0:
logger.debug(f"账号 {username} 列表正在加载中 (Loading)...")
await asyncio.sleep(1.5) # 给加载留点时间
# 不 break,继续去滚动以触发后续内容
# 3. 滚动容器
scrollable_element = await page.locator(
scrollable_friends_selector
).element_handle()
if scrollable_element:
# [修改] 加大滚动幅度
await page.evaluate(
"(element) => element.scrollTop += 800", scrollable_element
)
logger.debug(f"账号 {username} 滚动好友列表以加载更多好友")
await asyncio.sleep(1.5)
else:
logger.error(f"账号 {username} 未找到滚动容器,退出")
break
async def do_user_task(browser, username, cookies, targets, semaphore, config):
async with semaphore: # 使用信号量控制并发数量
context = await browser.new_context() # 每个任务使用独立的上下文
context.set_default_navigation_timeout(120000) # 设置导航超时时间为 90 秒
context.set_default_timeout(120000) # 设置所有操作的默认超时时间为 120 秒
page = await context.new_page()
# 打开抖音创作者中心
await retry_operation(
"打开抖音创作者中心",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/",
)
# 注入 Cookie
await context.add_cookies(cookies)
# 导航到消息页面
await retry_operation(
"导航到消息页面",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/creator-micro/data/following/chat",
)
logger.info(f"账号 {username} 开始发送消息")
# 滚动并选择用户
async for _target_name in scroll_and_select_user(page, username, targets):
logger.info(f"账号 {username} 已选中好友 {username} 发送消息")
# 等待 chat-input-dccKiL 元素加载完成
chat_input_selector = "xpath=//div[contains(@class, 'chat-input-dccKiL')]"
await page.wait_for_selector(chat_input_selector)
chat_input = page.locator(chat_input_selector)
# 在 chat-input-dccKiL 中输入内容
message = build_message(config=config)
for line in message.split("\n"):
await chat_input.type(line) # 输入每一行
# 如果不是最后一行,模拟 Shift+Enter 插入换行
if line != message.split("\n")[-1]:
await chat_input.press("Shift+Enter") # 模拟 Shift+Enter 插入换行
logger.debug(
f"账号 {username} 准备发送消息给好友 {username}:\n\t{message}"
)
logger.info(f"账号 {username} 给好友 {username} 发送消息完成")
# 模拟按下回车键发送消息
await chat_input.press("Enter")
await asyncio.sleep(2) # 发送完等待一会儿
await context.close() # 任务完成后关闭上下文
async def runTasks(config=None, userData=None):
active_config = config if config is not None else reload_config()
active_user_data = userData if userData is not None else reload_userData()
playwright, browser = await get_browser()
try:
# 检查是否启用多任务和任务数量
# 创建信号量以限制并发任务数量
logger.info("开始执行任务,当前配置如下:")
multi_task = bool(active_config.get("multiTask", True))
task_count = int(active_config.get("taskCount", 1) or 1)
logger.info(f"多任务模式: {multi_task}, 任务数量: {task_count}")
logger.info(f"消息模板: {active_config.get('messageTemplate', '')}")
logger.info(f"一言类型: {active_config.get('hitokotoTypes', [])}")
for user in active_user_data:
logger.info(
f"用户: {user.get('username', '未知用户')}, 目标好友: {user.get('targets', [])}"
)
semaphore = asyncio.Semaphore(task_count if multi_task else 1)
tasks = []
for user in active_user_data:
cookies = user.get("cookies", [])
targets = user.get("targets", [])
unique_id = user.get("unique_id", "")
if not cookies:
logger.warning("用户 %s 缺少 cookies,已跳过。", user.get("username", "未知用户"))
continue
complates[unique_id] = [] # 初始化该用户的已完成列表
username = user.get("username", "未知用户")
# 创建任务
tasks.append(
do_user_task(browser, username, cookies, targets, semaphore, active_config)
)
# 并发执行任务
if tasks:
await asyncio.gather(*tasks)
else:
logger.warning("没有可执行的任务(用户数据为空或均缺少 cookies)。")
finally:
await playwright.stop()
# 关闭浏览器实例
await browser.close()
|