|
|
import time |
|
|
import random |
|
|
from playwright.sync_api import Page, FrameLocator |
|
|
|
|
|
|
|
|
def get_preview_frame(page: Page, logger=None) -> FrameLocator:
    """Return a FrameLocator for the preview iframe.

    The iframe is addressed with the selector ``iframe[title="Preview"]``.

    Args:
        page: Page on which the frame locator is created.
        logger: Optional logger; receives a warning if creating the
            locator raises.

    Returns:
        The FrameLocator, or None when an exception was raised while
        creating it.
    """
    try:
        preview = page.frame_locator('iframe[title="Preview"]')
    except Exception as e:
        if logger:
            logger.warning(f"获取Preview iframe失败: {e}")
        return None
    return preview
|
|
|
|
|
|
|
|
def get_ws_status(page: Page, logger=None) -> str:
    """Read the WS connection status shown inside the preview iframe.

    Looks for the first element in the preview frame whose text matches
    ``WS: <STATE>`` (case-insensitive) and, if it is visible, maps its
    text to one of the known states.

    Args:
        page: Page containing the preview iframe.
        logger: Optional logger; receives a warning if any step raises.

    Returns:
        "CONNECTED", "IDLE" or "CONNECTING" when the upper-cased element
        text contains that word (tested in this order), otherwise
        "UNKNOWN". "UNKNOWN" is also returned when the frame is
        unavailable, the element is not visible, its text is empty, or an
        exception occurs.
    """
    try:
        preview = get_preview_frame(page, logger)
        if not preview:
            return "UNKNOWN"

        indicator = preview.locator('text=/WS:\\s*(CONNECTED|IDLE|CONNECTING)/i').first
        if not indicator.is_visible(timeout=3000):
            return "UNKNOWN"

        raw_text = indicator.text_content()
        if not raw_text:
            return "UNKNOWN"

        upper_text = raw_text.upper()
        # Order matters: it mirrors the priority of the substring checks.
        for state in ("CONNECTED", "IDLE", "CONNECTING"):
            if state in upper_text:
                return state
        return "UNKNOWN"
    except Exception as e:
        if logger:
            logger.warning(f"获取WS状态时出错: {e}")
        return "UNKNOWN"
|
|
|
|
|
|
|
|
def click_disconnect(page: Page, logger=None) -> bool:
    """Click the Disconnect button inside the preview iframe.

    Args:
        page: Page containing the preview iframe.
        logger: Optional logger for info/warning messages.

    Returns:
        True if a Disconnect button was found, reported visible and
        clicked (followed by a one-second pause). False if the frame is
        unavailable, no visible button exists, or an exception occurs.
    """
    try:
        preview = get_preview_frame(page, logger)
        if not preview:
            return False

        buttons = preview.locator('button:has-text("Disconnect")')
        # Guard clause: nothing to click when no match exists or the first
        # match is not visible.
        if buttons.count() == 0 or not buttons.first.is_visible(timeout=3000):
            if logger:
                logger.warning("未找到可见的 Disconnect 按钮")
            return False

        buttons.first.click(timeout=5000)
        if logger:
            logger.info("已点击 Disconnect 按钮")
        # Short pause after the click before reporting success.
        time.sleep(1)
        return True
    except Exception as e:
        if logger:
            logger.warning(f"点击 Disconnect 按钮失败: {e}")
        return False
|
|
|
|
|
|
|
|
def click_connect(page: Page, logger=None) -> bool:
    """Click the Connect button inside the preview iframe.

    Args:
        page: Page containing the preview iframe.
        logger: Optional logger for info/warning messages.

    Returns:
        True if a Connect button was found, reported visible and clicked
        (followed by a one-second pause). False if the frame is
        unavailable, no visible button exists, or an exception occurs.
    """
    try:
        frame = get_preview_frame(page, logger)
        if not frame:
            return False

        # Playwright's :has-text() is a case-insensitive substring match,
        # so a bare "Connect" also matches the "Disconnect" button and
        # `.first` could pick the wrong one. Exclude it explicitly.
        connect_btn = frame.locator(
            'button:has-text("Connect"):not(:has-text("Disconnect"))'
        )
        if connect_btn.count() > 0 and connect_btn.first.is_visible(timeout=3000):
            connect_btn.first.click(timeout=5000)
            if logger:
                logger.info("已点击 Connect 按钮")
            # Short pause after the click before reporting success.
            time.sleep(1)
            return True

        if logger:
            logger.warning("未找到可见的 Connect 按钮")
        return False
    except Exception as e:
        if logger:
            logger.warning(f"点击 Connect 按钮失败: {e}")
        return False
|
|
|
|
|
|
|
|
def wait_for_ws_connected(page: Page, logger=None, timeout: int = 30) -> bool:
    """Poll the WS status until it becomes CONNECTED or the timeout expires.

    Args:
        page: Page containing the preview iframe.
        logger: Optional logger forwarded to ``get_ws_status``.
        timeout: Maximum time to wait, in seconds. A value <= 0 returns
            False without polling.

    Returns:
        True as soon as ``get_ws_status`` reports "CONNECTED", False if
        the deadline passes first.
    """
    # A monotonic clock is immune to wall-clock adjustments (NTP, DST,
    # manual changes) that could otherwise shorten or extend the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_ws_status(page, logger) == "CONNECTED":
            return True
        # Poll roughly once per second, but never sleep past the deadline.
        time.sleep(max(0.0, min(1.0, deadline - time.monotonic())))
    return False
|
|
|
|
|
|
|
|
def reconnect_ws(page: Page, logger=None) -> str:
    """Run a disconnect-then-connect cycle and return the final WS status.

    Steps: dismiss the interaction overlay, click Disconnect, pause two
    seconds and log the status, click Connect, pause two seconds, wait up
    to 15 seconds for CONNECTED, then read the status once more.

    The results of the individual clicks are not checked; the flow always
    runs to the end.

    Args:
        page: Page containing the preview iframe.
        logger: Optional logger for progress messages.

    Returns:
        The status reported by ``get_ws_status`` at the end of the flow.
    """
    if logger:
        logger.info("开始执行WS重连流程: Disconnect -> Connect")

    dismiss_interaction_modal(page, logger)

    # Disconnect, then give the page a moment before sampling the status.
    click_disconnect(page, logger)
    time.sleep(2)
    status_after_disconnect = get_ws_status(page, logger)
    if logger:
        logger.info(f"断开后WS状态: {status_after_disconnect}")

    click_connect(page, logger)
    time.sleep(2)

    connected = wait_for_ws_connected(page, logger, timeout=15)
    # The status is re-read in both outcomes; only the log level differs.
    final_status = get_ws_status(page, logger)
    if logger:
        if connected:
            logger.info(f"重连后WS状态: {final_status}")
        else:
            logger.warning(f"WS重连超时,当前状态: {final_status}")
    return final_status
|
|
|
|
|
|
|
|
def _inset_range(origin: float, size: float, margin: float) -> tuple:
    """Return (low, high) of a span inset by margin, or its midpoint twice if too small."""
    low = origin + margin
    high = origin + size - margin
    if low > high:
        middle = origin + size / 2
        return middle, middle
    return low, high


def _random_start(origin: float, size: float, margin: int) -> float:
    """Pick a random coordinate at least margin inside the span, or its midpoint if too small."""
    span = int(size)
    if span >= 2 * margin:
        return origin + random.randint(margin, span - margin)
    return origin + size / 2


def dismiss_interaction_modal(page: Page, logger=None) -> bool:
    """Detect the ``div.interaction-modal`` overlay and try to dismiss it.

    When the overlay is visible, the mouse is moved in 30 small random
    steps over the preview iframe, after which the overlay's visibility
    is checked again.

    Iframes too small for the usual margins are handled by falling back
    to the iframe centre instead of raising from ``random.randint``.

    Args:
        page: Page that may show the overlay and hosts the preview iframe.
        logger: Optional logger for info/debug messages.

    Returns:
        True if the overlay was visible and is gone after the mouse
        movement. False if no visible overlay was found, the iframe or
        its bounding box is missing, the overlay is still visible, or an
        exception occurs.
    """
    start_margin = 50  # keep the starting point away from the edges
    move_margin = 20   # movement may drift closer to the edges than the start
    move_count = 30
    try:
        modal = page.locator('div.interaction-modal')
        if modal.count() == 0 or not modal.first.is_visible(timeout=500):
            return False

        if logger:
            logger.info("检测到 interaction-modal 遮罩层,尝试关闭...")

        iframe = page.locator('iframe[title="Preview"]')
        if iframe.count() == 0:
            return False

        iframe_box = iframe.first.bounding_box()
        if not iframe_box:
            return False

        x_low, x_high = _inset_range(iframe_box['x'], iframe_box['width'], move_margin)
        y_low, y_high = _inset_range(iframe_box['y'], iframe_box['height'], move_margin)

        curr_x = _random_start(iframe_box['x'], iframe_box['width'], start_margin)
        curr_y = _random_start(iframe_box['y'], iframe_box['height'], start_margin)

        for _ in range(move_count):
            delta_x = random.randint(-30, 30)
            delta_y = random.randint(-20, 20)
            # Random walk, clamped so the pointer stays over the iframe.
            curr_x = max(x_low, min(x_high, curr_x + delta_x))
            curr_y = max(y_low, min(y_high, curr_y + delta_y))

            page.mouse.move(curr_x, curr_y)
            time.sleep(0.05)

        if modal.count() == 0 or not modal.first.is_visible(timeout=100):
            if logger:
                logger.info("已成功关闭 interaction-modal 遮罩层")
            return True

        return False
    except Exception as e:
        if logger:
            logger.debug(f"关闭 interaction-modal 时出错: {e}")
        return False
|
|
|
|
|
|
|
|
def click_in_iframe(page: Page, logger=None) -> bool:
    """Move the mouse randomly inside the preview iframe and click once.

    Used as a keep-alive. The click is confined to a region that leaves
    out 80 px at the top, 200 px on the right, and 50 px on the left and
    bottom of the iframe's bounding box.

    Args:
        page: Page hosting the preview iframe.
        logger: Optional logger; receives a debug message on failure.

    Returns:
        True if the click was issued. False if the iframe or its bounding
        box is missing, the safe region is empty, or an exception occurs.
    """
    try:
        preview = page.locator('iframe[title="Preview"]')
        if preview.count() == 0:
            return False

        box = preview.first.bounding_box()
        if not box:
            return False

        left_edge = box['x'] + 50
        right_edge = box['x'] + box['width'] - 200
        top_edge = box['y'] + 80
        bottom_edge = box['y'] + box['height'] - 50

        # Bail out when the iframe is too small to contain a safe region.
        if right_edge <= left_edge or bottom_edge <= top_edge:
            return False

        x_min, x_max = int(left_edge), int(right_edge)
        y_min, y_max = int(top_edge), int(bottom_edge)

        pos_x = random.randint(x_min, x_max)
        pos_y = random.randint(y_min, y_max)

        step_count = random.randint(3, 6)
        for _ in range(step_count):
            step_x = random.randint(-30, 30)
            step_y = random.randint(-20, 20)
            # Keep each step inside the safe region.
            pos_x = max(x_min, min(x_max, pos_x + step_x))
            pos_y = max(y_min, min(y_max, pos_y + step_y))
            page.mouse.move(pos_x, pos_y)
            time.sleep(0.05)

        page.mouse.click(pos_x, pos_y)
        return True
    except Exception as e:
        if logger:
            logger.debug(f"在 iframe 内点击失败: {e}")
        return False
|
|
|