Spaces:
Sleeping
Sleeping
File size: 8,560 Bytes
2ceecec cb05669 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
import time
import random
from playwright.sync_api import Page, FrameLocator
def get_preview_frame(page: Page, logger=None) -> FrameLocator:
"""
获取预览iframe的FrameLocator。
"""
try:
# 查找title为"Preview"的iframe
frame = page.frame_locator('iframe[title="Preview"]')
return frame
except Exception as e:
if logger:
logger.warning(f"获取Preview iframe失败: {e}")
return None
def get_ws_status(page: Page, logger=None) -> str:
"""
获取页面中WS连接状态(在iframe内部)。
返回: CONNECTED, IDLE, CONNECTING 或 UNKNOWN
"""
try:
frame = get_preview_frame(page, logger)
if not frame:
return "UNKNOWN"
# 在iframe内查找包含 "WS:" 的状态文本元素
# 根据截图,状态显示为 "WS: CONNECTED" 等格式
status_element = frame.locator('text=/WS:\\s*(CONNECTED|IDLE|CONNECTING)/i').first
if status_element.is_visible(timeout=3000):
text = status_element.text_content()
if text:
if "CONNECTED" in text.upper():
return "CONNECTED"
elif "IDLE" in text.upper():
return "IDLE"
elif "CONNECTING" in text.upper():
return "CONNECTING"
return "UNKNOWN"
except Exception as e:
if logger:
logger.warning(f"获取WS状态时出错: {e}")
return "UNKNOWN"
def click_disconnect(page: Page, logger=None) -> bool:
"""
点击Disconnect按钮断开WS连接(在iframe内部)。
"""
try:
frame = get_preview_frame(page, logger)
if not frame:
return False
disconnect_btn = frame.locator('button:has-text("Disconnect")')
if disconnect_btn.count() > 0 and disconnect_btn.first.is_visible(timeout=3000):
disconnect_btn.first.click(timeout=5000)
if logger:
logger.info("已点击 Disconnect 按钮")
time.sleep(1)
return True
if logger:
logger.warning("未找到可见的 Disconnect 按钮")
return False
except Exception as e:
if logger:
logger.warning(f"点击 Disconnect 按钮失败: {e}")
return False
def click_connect(page: Page, logger=None) -> bool:
"""
点击Connect按钮建立WS连接(在iframe内部)。
"""
try:
frame = get_preview_frame(page, logger)
if not frame:
return False
connect_btn = frame.locator('button:has-text("Connect")')
if connect_btn.count() > 0 and connect_btn.first.is_visible(timeout=3000):
connect_btn.first.click(timeout=5000)
if logger:
logger.info("已点击 Connect 按钮")
time.sleep(1)
return True
if logger:
logger.warning("未找到可见的 Connect 按钮")
return False
except Exception as e:
if logger:
logger.warning(f"点击 Connect 按钮失败: {e}")
return False
def wait_for_ws_connected(page: Page, logger=None, timeout: int = 30) -> bool:
"""
等待WS状态变为CONNECTED。
"""
start_time = time.time()
while time.time() - start_time < timeout:
status = get_ws_status(page, logger)
if status == "CONNECTED":
return True
time.sleep(1)
return False
def reconnect_ws(page: Page, logger=None) -> str:
"""
执行断开再连接的流程,并返回最终WS状态。
流程:关闭遮罩 -> Disconnect -> 等待IDLE -> Connect -> 等待CONNECTED -> 获取状态
"""
if logger:
logger.info("开始执行WS重连流程: Disconnect -> Connect")
# 先关闭 interaction-modal 遮罩层(如果存在)
dismiss_interaction_modal(page, logger)
# 先断开连接
click_disconnect(page, logger)
time.sleep(2)
# 检查是否变为IDLE
status = get_ws_status(page, logger)
if logger:
logger.info(f"断开后WS状态: {status}")
# 再连接
click_connect(page, logger)
time.sleep(2)
# 等待连接成功
if wait_for_ws_connected(page, logger, timeout=15):
status = get_ws_status(page, logger)
if logger:
logger.info(f"重连后WS状态: {status}")
return status
else:
status = get_ws_status(page, logger)
if logger:
logger.warning(f"WS重连超时,当前状态: {status}")
return status
def dismiss_interaction_modal(page: Page, logger=None) -> bool:
"""
检测并关闭 interaction-modal 遮罩层。
通过在 iframe 区域内模拟鼠标移动来触发遮罩层关闭。
返回: True 如果成功关闭遮罩,False 如果未找到遮罩或关闭失败
"""
try:
modal = page.locator('div.interaction-modal')
if modal.count() == 0 or not modal.first.is_visible(timeout=500):
return False
if logger:
logger.info("检测到 interaction-modal 遮罩层,尝试关闭...")
iframe = page.locator('iframe[title="Preview"]')
if iframe.count() > 0:
iframe_box = iframe.first.bounding_box()
if iframe_box:
# 随机起点
curr_x = iframe_box['x'] + random.randint(50, int(iframe_box['width']) - 50)
curr_y = iframe_box['y'] + random.randint(50, int(iframe_box['height']) - 50)
# 持续连续移动直到遮罩关闭,最多尝试30次
for i in range(30):
# 从当前位置随机移动一段距离
delta_x = random.randint(-30, 30)
delta_y = random.randint(-20, 20)
curr_x = max(iframe_box['x'] + 20, min(iframe_box['x'] + iframe_box['width'] - 20, curr_x + delta_x))
curr_y = max(iframe_box['y'] + 20, min(iframe_box['y'] + iframe_box['height'] - 20, curr_y + delta_y))
page.mouse.move(curr_x, curr_y)
time.sleep(0.05)
# 每次移动后检查遮罩是否关闭
if modal.count() == 0 or not modal.first.is_visible(timeout=100):
if logger:
logger.info("已成功关闭 interaction-modal 遮罩层")
return True
return False
except Exception as e:
if logger:
logger.debug(f"关闭 interaction-modal 时出错: {e}")
return False
def click_in_iframe(page: Page, logger=None) -> bool:
"""
在 iframe 内随机移动鼠标并点击一次,用于保活。
避开顶部(状态栏和按钮区域)和右侧区域。
返回: True 如果成功点击,False 如果失败
"""
try:
iframe = page.locator('iframe[title="Preview"]')
if iframe.count() == 0:
return False
iframe_box = iframe.first.bounding_box()
if not iframe_box:
return False
# 安全区域:避开顶部80像素(状态栏+按钮)和右侧200像素(按钮区域)
safe_left = iframe_box['x'] + 50
safe_right = iframe_box['x'] + iframe_box['width'] - 200
safe_top = iframe_box['y'] + 80
safe_bottom = iframe_box['y'] + iframe_box['height'] - 50
# 确保安全区域有效
if safe_right <= safe_left or safe_bottom <= safe_top:
return False
# 随机起点(在安全区域内)
curr_x = random.randint(int(safe_left), int(safe_right))
curr_y = random.randint(int(safe_top), int(safe_bottom))
# 随机移动几步(保持在安全区域内)
for _ in range(random.randint(3, 6)):
delta_x = random.randint(-30, 30)
delta_y = random.randint(-20, 20)
curr_x = max(int(safe_left), min(int(safe_right), curr_x + delta_x))
curr_y = max(int(safe_top), min(int(safe_bottom), curr_y + delta_y))
page.mouse.move(curr_x, curr_y)
time.sleep(0.05)
# 点击当前位置
page.mouse.click(curr_x, curr_y)
return True
except Exception as e:
if logger:
logger.debug(f"在 iframe 内点击失败: {e}")
return False
|