File size: 10,463 Bytes
d10e42a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import asyncio
import traceback
import logging
from utils.logger import setup_logger
from utils.config import get_config, get_userData, reload_config, reload_userData
from core.msg_builder import build_message
from core.browser import get_browser


complates = {}

logger = setup_logger(level=logging.DEBUG)


async def retry_operation(name, operation, retries=3, delay=2, *args, **kwargs):
    """

    通用的重试逻辑

    :param name: 操作名称(用于日志记录)

    :param operation: 要执行的异步操作

    :param retries: 最大重试次数

    :param delay: 每次重试之间的延迟(秒)

    :param args: 传递给操作的参数

    :param kwargs: 传递给操作的关键字参数

    """
    for attempt in range(retries):
        try:
            return await operation(*args, **kwargs)
        except Exception as e:
            if attempt < retries - 1:
                logger.warning(f"{name} 失败,正在重试第 {attempt + 1} 次,错误:{e}")
                await asyncio.sleep(delay)
            else:
                logger.error(f"{name} 失败,已达到最大重试次数,错误:{e}")
                raise


async def scroll_and_select_user(page, username, targets):
    """尝试滚动并查找用户名"""
    # 定义目标元素和滚动容器的选择器
    friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]'
    target_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]//div[contains(@class, "semi-list-item-body semi-list-item-body-flex-start")]'
    scrollable_friends_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div'
    
    # [修改] 更加精确的状态选择器
    no_more_selector = 'xpath=//div[contains(@class, "no-more-tip-ftdJnu")]'
    loading_selector = 'xpath=//div[contains(@class, "semi-spin")]'

    logger.debug(f"账号 {username} 开始查找目标好友列表")
    logger.debug(f"账号 {username} 目标好友列表: {targets}")

    logger.debug(f"账号 {username} 点击进入好友标签页")
    # 点击好友标签页
    await page.wait_for_selector(friends_tab_selector)
    await page.locator(friends_tab_selector).click()

    logger.debug(f"账号 {username} 进入好友列表页面")

    # 确保第一个好友元素加载完成
    first_friend_selector = 'xpath=//*[@id="sub-app"]/div/div/div[2]/div[2]/div/div/div[1]/div/div/div/ul/div/div/div[1]/li/div'
    await page.wait_for_selector(first_friend_selector)
    await page.locator(first_friend_selector).click()  # 点击第一个好友,确保列表激活

    logger.debug(f"账号 {username} 已激活好友列表,开始滚动查找目标好友")

    await asyncio.sleep(2)  # 等待好友列表加载

    found_usernames = set()
    # [修改] 复制一份目标列表用于追踪进度
    remaining_targets = set(targets)

    while True:
        # 查找所有目标元素
        target_elements = await page.locator(target_selector).all()

        for element in target_elements:
            try:
                # 查找子元素 span,模糊匹配 class
                span = element.locator(
                    """xpath=.//span[contains(@class, "item-header-name-")]"""
                )
                targetName = await span.inner_text()

                if targetName in found_usernames:
                    continue  # 已处理过,跳过
                found_usernames.add(targetName)

                logger.debug(f"账号 {username} 找到好友 {targetName}")
                # 检查是否是目标用户名
                if targetName in targets:
                    await element.click()
                    logger.info(
                        f"账号 {username} 选中目标好友 {targetName} 准备开始交互"
                    )
                    yield targetName
                    
                    # [修改] 标记已找到,如果全找到了直接退出
                    if targetName in remaining_targets:
                        remaining_targets.remove(targetName)
                    if len(remaining_targets) == 0:
                        logger.info(f"账号 {username} 所有目标好友均已找到,停止搜索")
                        return
                    break
            except Exception as e:
                traceback.print_exc()
        else:
            # [修改] 状态检测逻辑
            
            # 1. 检查是否到底(没有更多了)
            if await page.locator(no_more_selector).count() > 0:
                logger.info(f"账号 {username} 检测到'没有更多了'标志,已到达底部")
                if len(remaining_targets) > 0:
                    logger.warning(f"账号 {username} 搜索结束,仍有以下好友未找到: {remaining_targets}")
                break

            # 2. 检查是否正在加载
            if await page.locator(loading_selector).count() > 0:
                logger.debug(f"账号 {username} 列表正在加载中 (Loading)...")
                await asyncio.sleep(1.5) # 给加载留点时间
                # 不 break,继续去滚动以触发后续内容

            # 3. 滚动容器
            scrollable_element = await page.locator(
                scrollable_friends_selector
            ).element_handle()
            
            if scrollable_element:
                # [修改] 加大滚动幅度
                await page.evaluate(
                    "(element) => element.scrollTop += 800", scrollable_element
                )
                logger.debug(f"账号 {username} 滚动好友列表以加载更多好友")
                await asyncio.sleep(1.5)
            else:
                logger.error(f"账号 {username} 未找到滚动容器,退出")
                break


async def do_user_task(browser, username, cookies, targets, semaphore, config):
    async with semaphore:  # 使用信号量控制并发数量
        context = await browser.new_context()  # 每个任务使用独立的上下文
        context.set_default_navigation_timeout(120000)  # 设置导航超时时间为 90 秒
        context.set_default_timeout(120000)  # 设置所有操作的默认超时时间为 120 秒

        page = await context.new_page()
        # 打开抖音创作者中心
        await retry_operation(
            "打开抖音创作者中心",
            page.goto,
            retries=3,
            delay=5,
            url="https://creator.douyin.com/",
        )
        # 注入 Cookie
        await context.add_cookies(cookies)

        # 导航到消息页面
        await retry_operation(
            "导航到消息页面",
            page.goto,
            retries=3,
            delay=5,
            url="https://creator.douyin.com/creator-micro/data/following/chat",
        )

        logger.info(f"账号 {username} 开始发送消息")
        # 滚动并选择用户
        async for _target_name in scroll_and_select_user(page, username, targets):
            logger.info(f"账号 {username} 已选中好友 {username} 发送消息")
            # 等待 chat-input-dccKiL 元素加载完成
            chat_input_selector = "xpath=//div[contains(@class, 'chat-input-dccKiL')]"
            await page.wait_for_selector(chat_input_selector)
            chat_input = page.locator(chat_input_selector)

            # 在 chat-input-dccKiL 中输入内容
            message = build_message(config=config)
            for line in message.split("\n"):
                await chat_input.type(line)  # 输入每一行
                # 如果不是最后一行,模拟 Shift+Enter 插入换行
                if line != message.split("\n")[-1]:
                    await chat_input.press("Shift+Enter")  # 模拟 Shift+Enter 插入换行

            logger.debug(
                f"账号 {username} 准备发送消息给好友 {username}:\n\t{message}"
            )
            logger.info(f"账号 {username} 给好友 {username} 发送消息完成")
            # 模拟按下回车键发送消息
            await chat_input.press("Enter")
            await asyncio.sleep(2)  # 发送完等待一会儿

        await context.close()  # 任务完成后关闭上下文


async def runTasks(config=None, userData=None):
    active_config = config if config is not None else reload_config()
    active_user_data = userData if userData is not None else reload_userData()
    playwright, browser = await get_browser()
    try:
        # 检查是否启用多任务和任务数量
        # 创建信号量以限制并发任务数量
        logger.info("开始执行任务,当前配置如下:")
        multi_task = bool(active_config.get("multiTask", True))
        task_count = int(active_config.get("taskCount", 1) or 1)
        logger.info(f"多任务模式: {multi_task}, 任务数量: {task_count}")
        logger.info(f"消息模板: {active_config.get('messageTemplate', '')}")
        logger.info(f"一言类型: {active_config.get('hitokotoTypes', [])}")
        for user in active_user_data:
            logger.info(
                f"用户: {user.get('username', '未知用户')}, 目标好友: {user.get('targets', [])}"
            )
            
        semaphore = asyncio.Semaphore(task_count if multi_task else 1)

        tasks = []
        for user in active_user_data:
            cookies = user.get("cookies", [])
            targets = user.get("targets", [])
            unique_id = user.get("unique_id", "")
            if not cookies:
                logger.warning("用户 %s 缺少 cookies,已跳过。", user.get("username", "未知用户"))
                continue
            complates[unique_id] = []  # 初始化该用户的已完成列表
            username = user.get("username", "未知用户")
            # 创建任务
            tasks.append(
                do_user_task(browser, username, cookies, targets, semaphore, active_config)
            )

        # 并发执行任务
        if tasks:
            await asyncio.gather(*tasks)
        else:
            logger.warning("没有可执行的任务(用户数据为空或均缺少 cookies)。")
    finally:
        await playwright.stop()

        # 关闭浏览器实例
        await browser.close()