File size: 8,560 Bytes
2ceecec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb05669
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import time
import random
from playwright.sync_api import Page, FrameLocator


def get_preview_frame(page: Page, logger=None) -> FrameLocator:
    """
    获取预览iframe的FrameLocator。
    """
    try:
        # 查找title为"Preview"的iframe
        frame = page.frame_locator('iframe[title="Preview"]')
        return frame
    except Exception as e:
        if logger:
            logger.warning(f"获取Preview iframe失败: {e}")
        return None


def get_ws_status(page: Page, logger=None) -> str:
    """
    获取页面中WS连接状态(在iframe内部)。
    返回: CONNECTED, IDLE, CONNECTING 或 UNKNOWN
    """
    try:
        frame = get_preview_frame(page, logger)
        if not frame:
            return "UNKNOWN"
        
        # 在iframe内查找包含 "WS:" 的状态文本元素
        # 根据截图,状态显示为 "WS: CONNECTED" 等格式
        status_element = frame.locator('text=/WS:\\s*(CONNECTED|IDLE|CONNECTING)/i').first
        if status_element.is_visible(timeout=3000):
            text = status_element.text_content()
            if text:
                if "CONNECTED" in text.upper():
                    return "CONNECTED"
                elif "IDLE" in text.upper():
                    return "IDLE"
                elif "CONNECTING" in text.upper():
                    return "CONNECTING"
        return "UNKNOWN"
    except Exception as e:
        if logger:
            logger.warning(f"获取WS状态时出错: {e}")
        return "UNKNOWN"


def click_disconnect(page: Page, logger=None) -> bool:
    """
    点击Disconnect按钮断开WS连接(在iframe内部)。
    """
    try:
        frame = get_preview_frame(page, logger)
        if not frame:
            return False
        
        disconnect_btn = frame.locator('button:has-text("Disconnect")')
        if disconnect_btn.count() > 0 and disconnect_btn.first.is_visible(timeout=3000):
            disconnect_btn.first.click(timeout=5000)
            if logger:
                logger.info("已点击 Disconnect 按钮")
            time.sleep(1)
            return True
        if logger:
            logger.warning("未找到可见的 Disconnect 按钮")
        return False
    except Exception as e:
        if logger:
            logger.warning(f"点击 Disconnect 按钮失败: {e}")
        return False


def click_connect(page: Page, logger=None) -> bool:
    """
    点击Connect按钮建立WS连接(在iframe内部)。
    """
    try:
        frame = get_preview_frame(page, logger)
        if not frame:
            return False
        
        connect_btn = frame.locator('button:has-text("Connect")')
        if connect_btn.count() > 0 and connect_btn.first.is_visible(timeout=3000):
            connect_btn.first.click(timeout=5000)
            if logger:
                logger.info("已点击 Connect 按钮")
            time.sleep(1)
            return True
        if logger:
            logger.warning("未找到可见的 Connect 按钮")
        return False
    except Exception as e:
        if logger:
            logger.warning(f"点击 Connect 按钮失败: {e}")
        return False


def wait_for_ws_connected(page: Page, logger=None, timeout: int = 30) -> bool:
    """
    等待WS状态变为CONNECTED。
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        status = get_ws_status(page, logger)
        if status == "CONNECTED":
            return True
        time.sleep(1)
    return False


def reconnect_ws(page: Page, logger=None) -> str:
    """
    执行断开再连接的流程,并返回最终WS状态。
    流程:关闭遮罩 -> Disconnect -> 等待IDLE -> Connect -> 等待CONNECTED -> 获取状态
    """
    if logger:
        logger.info("开始执行WS重连流程: Disconnect -> Connect")
    
    # 先关闭 interaction-modal 遮罩层(如果存在)
    dismiss_interaction_modal(page, logger)
    
    # 先断开连接
    click_disconnect(page, logger)
    time.sleep(2)
    
    # 检查是否变为IDLE
    status = get_ws_status(page, logger)
    if logger:
        logger.info(f"断开后WS状态: {status}")
    
    # 再连接
    click_connect(page, logger)
    time.sleep(2)
    
    # 等待连接成功
    if wait_for_ws_connected(page, logger, timeout=15):
        status = get_ws_status(page, logger)
        if logger:
            logger.info(f"重连后WS状态: {status}")
        return status
    else:
        status = get_ws_status(page, logger)
        if logger:
            logger.warning(f"WS重连超时,当前状态: {status}")
        return status


def dismiss_interaction_modal(page: Page, logger=None) -> bool:
    """
    检测并关闭 interaction-modal 遮罩层。
    通过在 iframe 区域内模拟鼠标移动来触发遮罩层关闭。
    
    返回: True 如果成功关闭遮罩,False 如果未找到遮罩或关闭失败
    """
    try:
        modal = page.locator('div.interaction-modal')
        if modal.count() == 0 or not modal.first.is_visible(timeout=500):
            return False
        
        if logger:
            logger.info("检测到 interaction-modal 遮罩层,尝试关闭...")
        
        iframe = page.locator('iframe[title="Preview"]')
        if iframe.count() > 0:
            iframe_box = iframe.first.bounding_box()
            if iframe_box:
                # 随机起点
                curr_x = iframe_box['x'] + random.randint(50, int(iframe_box['width']) - 50)
                curr_y = iframe_box['y'] + random.randint(50, int(iframe_box['height']) - 50)
                
                # 持续连续移动直到遮罩关闭,最多尝试30次
                for i in range(30):
                    # 从当前位置随机移动一段距离
                    delta_x = random.randint(-30, 30)
                    delta_y = random.randint(-20, 20)
                    curr_x = max(iframe_box['x'] + 20, min(iframe_box['x'] + iframe_box['width'] - 20, curr_x + delta_x))
                    curr_y = max(iframe_box['y'] + 20, min(iframe_box['y'] + iframe_box['height'] - 20, curr_y + delta_y))
                    
                    page.mouse.move(curr_x, curr_y)
                    time.sleep(0.05)
                    
                    # 每次移动后检查遮罩是否关闭
                    if modal.count() == 0 or not modal.first.is_visible(timeout=100):
                        if logger:
                            logger.info("已成功关闭 interaction-modal 遮罩层")
                        return True
        
        return False
    except Exception as e:
        if logger:
            logger.debug(f"关闭 interaction-modal 时出错: {e}")
        return False


def click_in_iframe(page: Page, logger=None) -> bool:
    """
    在 iframe 内随机移动鼠标并点击一次,用于保活。
    避开顶部(状态栏和按钮区域)和右侧区域。
    
    返回: True 如果成功点击,False 如果失败
    """
    try:
        iframe = page.locator('iframe[title="Preview"]')
        if iframe.count() == 0:
            return False
        
        iframe_box = iframe.first.bounding_box()
        if not iframe_box:
            return False
        
        # 安全区域:避开顶部80像素(状态栏+按钮)和右侧200像素(按钮区域)
        safe_left = iframe_box['x'] + 50
        safe_right = iframe_box['x'] + iframe_box['width'] - 200
        safe_top = iframe_box['y'] + 80
        safe_bottom = iframe_box['y'] + iframe_box['height'] - 50
        
        # 确保安全区域有效
        if safe_right <= safe_left or safe_bottom <= safe_top:
            return False
        
        # 随机起点(在安全区域内)
        curr_x = random.randint(int(safe_left), int(safe_right))
        curr_y = random.randint(int(safe_top), int(safe_bottom))
        
        # 随机移动几步(保持在安全区域内)
        for _ in range(random.randint(3, 6)):
            delta_x = random.randint(-30, 30)
            delta_y = random.randint(-20, 20)
            curr_x = max(int(safe_left), min(int(safe_right), curr_x + delta_x))
            curr_y = max(int(safe_top), min(int(safe_bottom), curr_y + delta_y))
            page.mouse.move(curr_x, curr_y)
            time.sleep(0.05)
        
        # 点击当前位置
        page.mouse.click(curr_x, curr_y)
        return True
    except Exception as e:
        if logger:
            logger.debug(f"在 iframe 内点击失败: {e}")
        return False