Spaces:
Paused
Paused
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import re | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Callable | |
| from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.remote.webdriver import WebDriver | |
| from selenium.webdriver.remote.webelement import WebElement | |
| from selenium.webdriver.support.wait import WebDriverWait | |
| import onnx_inference | |
| import webdriver_utils | |
| from core.db import Database | |
| URL_LOGIN = "http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index" | |
| URL_SELECT_COURSE = "http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index" | |
| LOGIN_SUCCESS_PREFIXES = ( | |
| "http://zhjw.scu.edu.cn/index", | |
| "http://zhjw.scu.edu.cn/", | |
| "https://zhjw.scu.edu.cn/index", | |
| "https://zhjw.scu.edu.cn/", | |
| ) | |
| CATEGORY_META = { | |
| "plan": {"label": "方案选课", "tab_id": "faxk"}, | |
| "free": {"label": "自由选课", "tab_id": "zyxk"}, | |
| } | |
| LOGIN_STUDENT_SELECTORS = [ | |
| (By.XPATH, "//*[@id='app']//form//input[@type='text']"), | |
| (By.XPATH, "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[1]/div/div/div[2]/div/input"), | |
| ] | |
| LOGIN_PASSWORD_SELECTORS = [ | |
| (By.XPATH, "//*[@id='app']//form//input[@type='password']"), | |
| (By.XPATH, "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[2]/div/div/div[2]/div/input"), | |
| ] | |
| LOGIN_CAPTCHA_INPUT_SELECTORS = [ | |
| (By.XPATH, "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]//input"), | |
| (By.XPATH, "//*[@id='app']//form//input[contains(@placeholder, '验证码')]"), | |
| ] | |
| LOGIN_CAPTCHA_IMAGE_SELECTORS = [ | |
| (By.XPATH, "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]//img"), | |
| (By.XPATH, "//*[@id='app']//form//img"), | |
| ] | |
| LOGIN_BUTTON_SELECTORS = [ | |
| (By.XPATH, "//*[@id='app']//form//button"), | |
| (By.XPATH, "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[4]/div/button"), | |
| ] | |
| SUBMIT_CAPTCHA_IMAGE_SELECTORS = [ | |
| (By.XPATH, "//div[contains(@class,'dialog') or contains(@class,'modal') or contains(@class,'popup')]//img[contains(@src,'base64') or contains(translate(@src,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'captcha')]"), | |
| (By.XPATH, "//img[contains(@src,'base64')]"), | |
| (By.XPATH, "//img[contains(translate(@alt,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'captcha') or contains(translate(@class,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'captcha')]"), | |
| ] | |
| SUBMIT_CAPTCHA_INPUT_SELECTORS = [ | |
| (By.XPATH, "//input[contains(@placeholder,'验证码')]"), | |
| (By.XPATH, "//input[contains(translate(@id,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'captcha')]"), | |
| (By.XPATH, "//input[contains(translate(@name,'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz'),'captcha')]"), | |
| ] | |
| SUBMIT_CAPTCHA_BUTTON_SELECTORS = [ | |
| (By.XPATH, "//button[contains(.,'确定') or contains(.,'提交') or contains(.,'确认') or contains(.,'验证')]"), | |
| (By.XPATH, "//input[(@type='button' or @type='submit') and (contains(@value,'确定') or contains(@value,'提交') or contains(@value,'确认') or contains(@value,'验证'))]"), | |
| ] | |
| class FatalCredentialsError(Exception): | |
| pass | |
| class RecoverableAutomationError(Exception): | |
| pass | |
| class TaskResult: | |
| status: str | |
| error: str = "" | |
| class CourseBot: | |
| def __init__( | |
| self, | |
| *, | |
| config, | |
| store: Database, | |
| task_id: int, | |
| user: dict, | |
| password: str, | |
| logger: Callable[[str, str], None], | |
| ) -> None: | |
| self.config = config | |
| self.store = store | |
| self.task_id = task_id | |
| self.user = user | |
| self.password = password | |
| self.logger = logger | |
| self.root_dir = Path(__file__).resolve().parent.parent | |
| self.select_course_js = (self.root_dir / "javascript" / "select_course.js").read_text(encoding="utf-8") | |
| self.check_result_js = (self.root_dir / "javascript" / "check_result.js").read_text(encoding="utf-8") | |
| self.captcha_solver = onnx_inference.CaptchaONNXInference( | |
| model_path=str(self.root_dir / "ocr_provider" / "captcha_model.onnx") | |
| ) | |
| def run(self, stop_event) -> TaskResult: | |
| exhausted_sessions = 0 | |
| while not stop_event.is_set(): | |
| courses = self.store.list_courses_for_user(self.user["id"]) | |
| if not courses: | |
| self.logger("INFO", "当前没有待抢课程,任务自动结束。") | |
| return TaskResult(status="completed") | |
| driver: WebDriver | None = None | |
| session_ready = False | |
| session_error_count = 0 | |
| try: | |
| self.logger( | |
| "INFO", | |
| f"启动新的 Selenium 会话。累计已重建 {exhausted_sessions}/{self.config.selenium_restart_limit} 次。", | |
| ) | |
| driver = webdriver_utils.configure_browser( | |
| chrome_binary=self.config.chrome_binary, | |
| chromedriver_path=self.config.chromedriver_path, | |
| page_timeout=self.config.browser_page_timeout, | |
| ) | |
| wait = WebDriverWait(driver, self.config.browser_page_timeout, 0.5) | |
| while not stop_event.is_set(): | |
| try: | |
| if not session_ready: | |
| self._login(driver, wait) | |
| session_ready = True | |
| current_courses = self.store.list_courses_for_user(self.user["id"]) | |
| if not current_courses: | |
| self.logger("INFO", "所有目标课程都已经从队列中移除,任务完成。") | |
| return TaskResult(status="completed") | |
| if not self._goto_select_course(driver, wait): | |
| session_error_count = 0 | |
| self._sleep_with_cancel(stop_event, self.config.poll_interval_seconds, "当前不是选课时段,等待下一轮检查。") | |
| continue | |
| attempts = 0 | |
| successes = 0 | |
| grouped_courses = {"plan": [], "free": []} | |
| for course in current_courses: | |
| grouped_courses[course["category"]].append(course) | |
| for category, items in grouped_courses.items(): | |
| if not items: | |
| continue | |
| for course in items: | |
| if stop_event.is_set(): | |
| break | |
| attempts += 1 | |
| if self._attempt_single_course(driver, wait, course): | |
| successes += 1 | |
| if attempts == 0: | |
| self.logger("INFO", "没有可执行的课程项目,下一轮将自动重试。") | |
| elif successes == 0: | |
| self.logger("INFO", "本轮没有抢到目标课程,准备稍后重试。") | |
| else: | |
| self.logger("INFO", f"本轮处理 {attempts} 门课程,成功更新 {successes} 门。") | |
| session_error_count = 0 | |
| if exhausted_sessions: | |
| self.logger("INFO", "当前 Selenium 会话已恢复稳定,浏览器重建错误计数已清零。") | |
| exhausted_sessions = 0 | |
| if not self.store.list_courses_for_user(self.user["id"]): | |
| self.logger("INFO", "全部课程均已处理完成。") | |
| return TaskResult(status="completed") | |
| self._sleep_with_cancel(stop_event, self.config.poll_interval_seconds, "等待下一轮刷新。") | |
| except FatalCredentialsError as exc: | |
| self.logger("ERROR", str(exc)) | |
| return TaskResult(status="failed", error=str(exc)) | |
| except RecoverableAutomationError as exc: | |
| session_ready = False | |
| session_error_count += 1 | |
| self.logger( | |
| "WARNING", | |
| f"当前 Selenium 会话第 {session_error_count}/{self.config.selenium_error_limit} 次可恢复错误: {exc}", | |
| ) | |
| if session_error_count < self.config.selenium_error_limit: | |
| self._sleep_with_cancel( | |
| stop_event, | |
| self.config.task_backoff_seconds, | |
| "将在当前 Selenium 会话内继续重试", | |
| ) | |
| continue | |
| exhausted_sessions += 1 | |
| if exhausted_sessions >= self.config.selenium_restart_limit: | |
| message = ( | |
| f"Selenium 会话连续达到错误上限并已重建 {exhausted_sessions} 次,任务终止。" | |
| f"最后错误: {exc}" | |
| ) | |
| self.logger("ERROR", message) | |
| return TaskResult(status="failed", error=message) | |
| self.logger( | |
| "WARNING", | |
| f"当前 Selenium 会话连续错误达到 {self.config.selenium_error_limit} 次,准备重建浏览器。" | |
| f"已耗尽 {exhausted_sessions}/{self.config.selenium_restart_limit} 个会话。", | |
| ) | |
| break | |
| except Exception as exc: # pragma: no cover - defensive fallback | |
| session_ready = False | |
| session_error_count += 1 | |
| self.logger( | |
| "WARNING", | |
| f"当前 Selenium 会话第 {session_error_count}/{self.config.selenium_error_limit} 次未知错误: {exc}", | |
| ) | |
| if session_error_count < self.config.selenium_error_limit: | |
| self._sleep_with_cancel( | |
| stop_event, | |
| self.config.task_backoff_seconds, | |
| "当前会话发生未知错误,稍后重试", | |
| ) | |
| continue | |
| exhausted_sessions += 1 | |
| if exhausted_sessions >= self.config.selenium_restart_limit: | |
| message = ( | |
| f"Selenium 会话连续达到错误上限并已重建 {exhausted_sessions} 次,任务终止。" | |
| f"最后错误: {exc}" | |
| ) | |
| self.logger("ERROR", message) | |
| return TaskResult(status="failed", error=message) | |
| self.logger( | |
| "WARNING", | |
| f"未知错误累计达到上限,准备重建 Selenium 会话。" | |
| f"已耗尽 {exhausted_sessions}/{self.config.selenium_restart_limit} 个会话。", | |
| ) | |
| break | |
| finally: | |
| if driver is not None: | |
| try: | |
| driver.quit() | |
| except Exception: | |
| pass | |
| self.logger("INFO", "收到停止信号,任务已结束。") | |
| return TaskResult(status="stopped") | |
| def _login(self, driver: WebDriver, wait: WebDriverWait) -> None: | |
| for attempt in range(1, self.config.login_retry_limit + 1): | |
| self._open_page(driver, wait, URL_LOGIN, f"登录页(第 {attempt} 次)", log_on_success=True) | |
| std_id_box = self._find_first_visible(driver, LOGIN_STUDENT_SELECTORS, "登录学号输入框", timeout=6) | |
| password_box = self._find_first_visible(driver, LOGIN_PASSWORD_SELECTORS, "登录密码输入框", timeout=6) | |
| captcha_box = self._find_first_visible(driver, LOGIN_CAPTCHA_INPUT_SELECTORS, "登录验证码输入框", timeout=6) | |
| login_button = self._find_first_visible(driver, LOGIN_BUTTON_SELECTORS, "登录按钮", timeout=6) | |
| captcha_image = self._find_first_visible(driver, LOGIN_CAPTCHA_IMAGE_SELECTORS, "登录验证码图片", timeout=6) | |
| captcha_text = self._solve_captcha_text(captcha_image, scene="登录") | |
| self.logger("INFO", f"登录尝试 {attempt}/{self.config.login_retry_limit},验证码 OCR 输出: {captcha_text}") | |
| std_id_box.clear() | |
| std_id_box.send_keys(self.user["student_id"]) | |
| password_box.clear() | |
| password_box.send_keys(self.password) | |
| captcha_box.clear() | |
| captcha_box.send_keys(captcha_text) | |
| self.logger("INFO", f"登录尝试 {attempt}/{self.config.login_retry_limit},准备提交登录表单。") | |
| submit_method = self._trigger_non_blocking_action( | |
| driver, | |
| login_button, | |
| label="登录表单", | |
| allow_form_submit=True, | |
| ) | |
| self.logger("INFO", f"登录表单已触发提交,方式={submit_method},开始等待登录结果。") | |
| state, error_message = self._wait_for_login_outcome(driver, timeout_seconds=10) | |
| if state == "success": | |
| self.logger("INFO", f"登录成功,耗费 {attempt} 次尝试。") | |
| return | |
| if any(token in error_message for token in ("用户名或密码错误", "密码错误", "账号或密码错误", "用户不存在")): | |
| raise FatalCredentialsError("学号或密码错误,任务已停止,请在面板中更新后重新启动。") | |
| if error_message: | |
| self.logger("WARNING", f"登录失败,第 {attempt} 次尝试: {error_message}") | |
| else: | |
| self.logger( | |
| "WARNING", | |
| f"登录失败,第 {attempt} 次尝试,未读取到明确错误提示。{self._page_snapshot(driver, include_body=True)}", | |
| ) | |
| time.sleep(0.6) | |
| raise RecoverableAutomationError("连续多次登录失败,可能是验证码识别失败、页面异常或系统暂时不可用。") | |
| def _goto_select_course(self, driver: WebDriver, wait: WebDriverWait) -> bool: | |
| self._open_page(driver, wait, URL_SELECT_COURSE, "选课页") | |
| if self._is_session_expired(driver): | |
| raise RecoverableAutomationError("检测到登录会话已失效,准备重新登录。") | |
| body_text = self._safe_body_text(driver) | |
| if "非选课" in body_text or "未到选课时间" in body_text: | |
| self.logger("INFO", body_text.strip() or "当前不是选课时段。") | |
| return False | |
| return True | |
| def _attempt_single_course(self, driver: WebDriver, wait: WebDriverWait, course: dict) -> bool: | |
| category_name = CATEGORY_META[course["category"]]["label"] | |
| course_key = f'{course["course_id"]}_{course["course_index"]}' | |
| self.logger("INFO", f"开始尝试 {category_name} {course_key}。") | |
| if not self._goto_select_course(driver, wait): | |
| self.logger("INFO", f"跳过 {course_key},当前系统暂时不在可选课页面。") | |
| return False | |
| self._open_category_tab(driver, wait, course["category"]) | |
| try: | |
| wait.until(lambda current_driver: current_driver.find_element(By.ID, "ifra")) | |
| driver.switch_to.frame("ifra") | |
| course_id_box = self._find(driver, By.ID, "kch") | |
| search_button = self._find(driver, By.ID, "queryButton") | |
| course_id_box.clear() | |
| course_id_box.send_keys(course["course_id"]) | |
| search_button.click() | |
| wait.until( | |
| lambda current_driver: current_driver.execute_script( | |
| "return document.getElementById('queryButton').innerText.indexOf('正在') === -1" | |
| ) | |
| ) | |
| except TimeoutException as exc: | |
| raise RecoverableAutomationError( | |
| f"课程查询超时,页面可能暂时无响应。{self._page_snapshot(driver, include_body=True)}" | |
| ) from exc | |
| finally: | |
| driver.switch_to.default_content() | |
| time.sleep(0.2) | |
| try: | |
| found_target = driver.execute_script(self.select_course_js, course_key) == "yes" | |
| except WebDriverException as exc: | |
| raise RecoverableAutomationError(f"执行课程勾选脚本失败。{self._page_snapshot(driver, include_body=True)}") from exc | |
| if not found_target: | |
| self.logger("INFO", f"本轮未找到目标课程 {course_key}。") | |
| return False | |
| self.logger("INFO", f"已勾选目标课程 {course_key},准备提交。") | |
| results = self._submit_with_optional_captcha(driver, wait, course_key) | |
| if not results: | |
| self.logger("WARNING", f"提交 {course_key} 后没有读取到结果列表。") | |
| return False | |
| satisfied = False | |
| for result in results: | |
| detail = (result.get("detail") or "").strip() | |
| subject = (result.get("subject") or course_key).strip() | |
| if result.get("result"): | |
| self.logger("SUCCESS", f"成功抢到课程: {subject}") | |
| satisfied = True | |
| elif any(token in detail for token in ("已选", "已选择", "已经选", "已在已选课程")): | |
| self.logger("INFO", f"课程已在系统中存在,自动从队列移除: {subject}") | |
| satisfied = True | |
| else: | |
| self.logger("WARNING", f"课程 {subject} 抢课失败: {detail or '未知原因'}") | |
| if satisfied: | |
| self.store.remove_course_by_identity( | |
| self.user["id"], | |
| course["category"], | |
| course["course_id"], | |
| course["course_index"], | |
| ) | |
| return True | |
| return False | |
| def _submit_with_optional_captcha(self, driver: WebDriver, wait: WebDriverWait, course_key: str) -> list[dict]: | |
| submit_button = self._find(driver, By.ID, "submitButton") | |
| submit_method = self._trigger_non_blocking_action( | |
| driver, | |
| submit_button, | |
| label=f"课程提交按钮 {course_key}", | |
| allow_form_submit=True, | |
| ) | |
| self.logger("INFO", f"课程 {course_key} 已触发提交,方式={submit_method}。") | |
| for attempt in range(1, self.config.submit_captcha_retry_limit + 1): | |
| state = self._wait_for_submit_state(driver, wait) | |
| if state == "result": | |
| return self._read_result_page(driver, wait) | |
| if state == "captcha": | |
| self.logger("INFO", f"提交 {course_key} 时检测到验证码,第 {attempt} 次自动识别。") | |
| if not self._solve_visible_submit_captcha(driver): | |
| raise RecoverableAutomationError("提交选课时检测到验证码,但未能自动完成识别与提交。") | |
| continue | |
| self.logger("WARNING", f"提交 {course_key} 后页面未返回结果,也未检测到验证码。") | |
| raise RecoverableAutomationError("提交选课后连续多次遇到验证码或未返回结果。") | |
| def _wait_for_submit_state(self, driver: WebDriver, wait: WebDriverWait) -> str: | |
| script = """ | |
| const visible = (el) => { | |
| if (!el) return false; | |
| const style = window.getComputedStyle(el); | |
| const rect = el.getBoundingClientRect(); | |
| return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0; | |
| }; | |
| if (document.querySelector('#xkresult tbody tr')) return 'result'; | |
| const input = Array.from(document.querySelectorAll('input')).find((el) => { | |
| const text = `${el.placeholder || ''} ${el.id || ''} ${el.name || ''} ${el.className || ''}`; | |
| return visible(el) && /验证码|captcha/i.test(text); | |
| }); | |
| const img = Array.from(document.querySelectorAll('img')).find((el) => { | |
| const text = `${el.src || ''} ${el.id || ''} ${el.alt || ''} ${el.className || ''}`; | |
| return visible(el) && (/captcha/i.test(text) || (el.src || '').includes('base64') || (el.naturalWidth >= 50 && el.naturalWidth <= 240 && el.naturalHeight >= 20 && el.naturalHeight <= 120)); | |
| }); | |
| const button = Array.from(document.querySelectorAll('button, input[type="button"], input[type="submit"]')).find((el) => { | |
| const text = `${el.innerText || ''} ${el.value || ''}`; | |
| return visible(el) && /确定|提交|确认|验证/.test(text); | |
| }); | |
| if (input && img && button) return 'captcha'; | |
| return 'pending'; | |
| """ | |
| try: | |
| wait.until(lambda current_driver: current_driver.execute_script(script) != "pending") | |
| except TimeoutException: | |
| return "pending" | |
| return str(driver.execute_script(script)) | |
| def _solve_visible_submit_captcha(self, driver: WebDriver) -> bool: | |
| image = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_IMAGE_SELECTORS, timeout=3) | |
| input_box = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_INPUT_SELECTORS, timeout=3) | |
| button = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_BUTTON_SELECTORS, timeout=3) | |
| if not image or not input_box or not button: | |
| return False | |
| captcha_text = self._solve_captcha_text(image, scene="提交") | |
| self.logger("INFO", f"提交验证码 OCR 输出: {captcha_text}") | |
| input_box.clear() | |
| input_box.send_keys(captcha_text) | |
| submit_method = self._trigger_non_blocking_action( | |
| driver, | |
| button, | |
| label="提交验证码确认按钮", | |
| allow_form_submit=True, | |
| ) | |
| self.logger("INFO", f"提交验证码确认已触发,方式={submit_method}。") | |
| time.sleep(1.0) | |
| return True | |
| def _open_category_tab(self, driver: WebDriver, wait: WebDriverWait, category: str) -> None: | |
| tab_id = CATEGORY_META[category]["tab_id"] | |
| tab = self._find(driver, By.ID, tab_id) | |
| tab.click() | |
| webdriver_utils.wait_for_ready(wait, allow_interactive=True) | |
| time.sleep(0.2) | |
| def _read_result_page(self, driver: WebDriver, wait: WebDriverWait) -> list[dict]: | |
| try: | |
| webdriver_utils.wait_for_ready(wait, allow_interactive=True) | |
| wait.until( | |
| lambda current_driver: current_driver.execute_script( | |
| """ | |
| const node = document.querySelector('#xkresult tbody tr'); | |
| return Boolean(node); | |
| """ | |
| ) | |
| ) | |
| return json.loads(driver.execute_script(self.check_result_js)) | |
| except Exception as exc: | |
| raise RecoverableAutomationError( | |
| f"读取选课结果失败,页面结构可能发生变化。{self._page_snapshot(driver, include_body=True)}" | |
| ) from exc | |
| def _open_page( | |
| self, | |
| driver: WebDriver, | |
| wait: WebDriverWait, | |
| url: str, | |
| label: str, | |
| *, | |
| allow_interactive: bool = True, | |
| log_on_success: bool = False, | |
| ) -> None: | |
| timed_out = webdriver_utils.open_with_recovery(driver, url) | |
| if timed_out: | |
| self.logger("WARNING", f"{label} 页面加载超时,尝试停止页面并继续执行。") | |
| try: | |
| ready_state = webdriver_utils.wait_for_ready(wait, allow_interactive=allow_interactive) | |
| except TimeoutException as exc: | |
| raise RecoverableAutomationError(f"{label} 页面加载失败。{self._page_snapshot(driver, include_body=True)}") from exc | |
| if log_on_success or timed_out: | |
| self.logger("INFO", f"{label} 页面已打开,readyState={ready_state}。{self._page_snapshot(driver, include_body=timed_out)}") | |
| def _wait_for_login_outcome(self, driver: WebDriver, timeout_seconds: int = 10) -> tuple[str, str]: | |
| deadline = time.monotonic() + max(1, timeout_seconds) | |
| last_error = "" | |
| while time.monotonic() < deadline: | |
| try: | |
| current_url = driver.current_url or "" | |
| except WebDriverException: | |
| time.sleep(0.3) | |
| continue | |
| if self._is_login_success_url(current_url): | |
| return "success", "" | |
| last_error = self._read_login_error(driver) | |
| if last_error: | |
| return "error", last_error | |
| time.sleep(0.4) | |
| return "unknown", self._read_login_error(driver) | |
| def _read_login_error(self, driver: WebDriver) -> str: | |
| script = """ | |
| const visible = (node) => { | |
| if (!node) return false; | |
| const style = window.getComputedStyle(node); | |
| const rect = node.getBoundingClientRect(); | |
| return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0; | |
| }; | |
| const selectors = [ | |
| '.el-message', | |
| '[role="alert"]', | |
| '.message', | |
| '.toast', | |
| '.el-form-item__error', | |
| '.error' | |
| ]; | |
| for (const selector of selectors) { | |
| for (const node of Array.from(document.querySelectorAll(selector))) { | |
| const text = (node.innerText || '').trim(); | |
| if (visible(node) && text) { | |
| return text; | |
| } | |
| } | |
| } | |
| const nodes = Array.from(document.querySelectorAll('body span, body div')); | |
| for (const node of nodes) { | |
| const text = (node.innerText || '').trim(); | |
| if (!text || !visible(node)) continue; | |
| if (/验证码|密码|用户|账号/.test(text)) return text; | |
| } | |
| return ''; | |
| """ | |
| try: | |
| raw_message = driver.execute_script(script) or "" | |
| except WebDriverException: | |
| return "" | |
| return re.sub(r"\s+", " ", str(raw_message)).strip() | |
| def _trigger_non_blocking_action( | |
| self, | |
| driver: WebDriver, | |
| element: WebElement, | |
| *, | |
| label: str, | |
| allow_form_submit: bool = False, | |
| ) -> str: | |
| script = """ | |
| const target = arguments[0]; | |
| const allowFormSubmit = Boolean(arguments[1]); | |
| const form = allowFormSubmit && target ? (target.form || target.closest('form')) : null; | |
| let method = 'unavailable'; | |
| if (form && typeof form.requestSubmit === 'function') { | |
| method = 'scheduled-requestSubmit'; | |
| } else if (target && typeof target.click === 'function') { | |
| method = 'scheduled-js-click'; | |
| } else if (target) { | |
| method = 'scheduled-dispatch-click'; | |
| } else if (form && typeof form.submit === 'function') { | |
| method = 'scheduled-form-submit'; | |
| } | |
| if (!target && !form) { | |
| return 'unavailable'; | |
| } | |
| window.setTimeout(() => { | |
| const dispatchFallback = (node) => { | |
| if (!node) return false; | |
| ['pointerdown', 'mousedown', 'mouseup', 'click'].forEach((type) => { | |
| node.dispatchEvent(new MouseEvent(type, { | |
| bubbles: true, | |
| cancelable: true, | |
| view: window, | |
| })); | |
| }); | |
| return true; | |
| }; | |
| try { | |
| if (target && typeof target.scrollIntoView === 'function') { | |
| target.scrollIntoView({block: 'center', inline: 'center'}); | |
| } | |
| } catch (error) {} | |
| try { | |
| if (form && typeof form.requestSubmit === 'function') { | |
| form.requestSubmit(target || undefined); | |
| return; | |
| } | |
| } catch (error) {} | |
| try { | |
| if (target && typeof target.click === 'function') { | |
| target.click(); | |
| return; | |
| } | |
| } catch (error) {} | |
| try { | |
| if (dispatchFallback(target)) { | |
| return; | |
| } | |
| } catch (error) {} | |
| try { | |
| if (form && typeof form.submit === 'function') { | |
| form.submit(); | |
| } | |
| } catch (error) {} | |
| }, 0); | |
| return method; | |
| """ | |
| try: | |
| method = str(driver.execute_script(script, element, allow_form_submit) or "").strip() | |
| if method and method != "unavailable": | |
| return method | |
| self.logger("WARNING", f"{label} 的 JS 非阻塞提交未找到可用方式,回退到原生点击。") | |
| except Exception as exc: | |
| self.logger("WARNING", f"{label} 的 JS 非阻塞提交失败,回退到原生点击。原因: {exc}") | |
| try: | |
| element.click() | |
| return "native-click" | |
| except TimeoutException: | |
| self.logger("WARNING", f"{label} 的原生点击触发后页面响应超时,已执行 window.stop() 并继续等待。") | |
| self._stop_loading(driver) | |
| return "native-click-timeout" | |
| except WebDriverException as exc: | |
| raise RecoverableAutomationError(f"{label} 触发失败: {exc}") from exc | |
| def _stop_loading(driver: WebDriver) -> None: | |
| try: | |
| driver.execute_script("window.stop();") | |
| except Exception: | |
| pass | |
| def _solve_captcha_text(self, image_element: WebElement, *, scene: str) -> str: | |
| last_candidate = "" | |
| for attempt in range(1, 3): | |
| raw_text = self.captcha_solver.classification(self._extract_image_bytes(image_element)) | |
| normalized = re.sub(r"[^0-9A-Za-z]", "", str(raw_text or "")).strip() | |
| if len(normalized) >= 4: | |
| return normalized[:4] | |
| last_candidate = normalized | |
| self.logger("WARNING", f"{scene}验证码 OCR 输出异常,第 {attempt} 次结果: {raw_text!r}") | |
| try: | |
| image_element.click() | |
| time.sleep(0.4) | |
| except Exception: | |
| pass | |
| if len(last_candidate) >= 3: | |
| return last_candidate[:4] | |
| raise RecoverableAutomationError(f"{scene}验证码 OCR 未能识别出有效内容。") | |
| def _is_login_success_url(self, url: str) -> bool: | |
| if not url: | |
| return False | |
| if any(url.startswith(prefix) for prefix in LOGIN_SUCCESS_PREFIXES): | |
| return True | |
| return "zhjw.scu.edu.cn" in url and "id.scu.edu.cn" not in url | |
| def _is_session_expired(self, driver: WebDriver) -> bool: | |
| current_url = driver.current_url or "" | |
| if "id.scu.edu.cn" in current_url: | |
| return True | |
| password_box = self._find_first_visible_optional(driver, LOGIN_PASSWORD_SELECTORS, timeout=1) | |
| return password_box is not None | |
| def _safe_body_text(self, driver: WebDriver) -> str: | |
| try: | |
| body = self._find(driver, By.TAG_NAME, "body") | |
| return body.text or "" | |
| except RecoverableAutomationError: | |
| return "" | |
| def _page_snapshot(self, driver: WebDriver, *, include_body: bool = False) -> str: | |
| script = """ | |
| const body = document.body ? (document.body.innerText || '') : ''; | |
| return JSON.stringify({ | |
| url: window.location.href || '', | |
| title: document.title || '', | |
| readyState: document.readyState || '', | |
| body: body.replace(/\\s+/g, ' ').trim().slice(0, 180) | |
| }); | |
| """ | |
| try: | |
| raw = driver.execute_script(script) | |
| data = json.loads(raw) if isinstance(raw, str) else raw | |
| except Exception: | |
| current_url = getattr(driver, "current_url", "") or "" | |
| return f" url={current_url or '-'}" | |
| parts = [ | |
| f"url={data.get('url') or '-'}", | |
| f"title={data.get('title') or '-'}", | |
| f"readyState={data.get('readyState') or '-'}", | |
| ] | |
| body_excerpt = (data.get("body") or "").strip() | |
| if include_body and body_excerpt: | |
| parts.append(f"body={body_excerpt}") | |
| return " " + " | ".join(parts) | |
| def _extract_image_bytes(self, image_element: WebElement) -> bytes: | |
| source = image_element.get_attribute("src") or "" | |
| if "base64," in source: | |
| return base64.b64decode(source.split("base64,", 1)[1]) | |
| return image_element.screenshot_as_png | |
| def _find_first_visible(self, driver: WebDriver, selectors: list[tuple[str, str]], label: str, timeout: int = 0): | |
| element = self._find_first_visible_optional(driver, selectors, timeout=timeout) | |
| if element is None: | |
| raise RecoverableAutomationError(f"页面元素未找到: {label}。{self._page_snapshot(driver, include_body=True)}") | |
| return element | |
| def _find_first_visible_optional(driver: WebDriver, selectors: list[tuple[str, str]], timeout: int = 0): | |
| deadline = time.monotonic() + max(0, timeout) | |
| while True: | |
| for by, value in selectors: | |
| try: | |
| elements = driver.find_elements(by, value) | |
| except WebDriverException: | |
| continue | |
| for element in elements: | |
| try: | |
| if element.is_displayed(): | |
| return element | |
| except WebDriverException: | |
| continue | |
| if timeout <= 0 or time.monotonic() >= deadline: | |
| return None | |
| time.sleep(0.2) | |
| def _find(driver: WebDriver, by: str, value: str): | |
| try: | |
| return driver.find_element(by, value) | |
| except NoSuchElementException as exc: | |
| raise RecoverableAutomationError(f"页面元素未找到: {value}") from exc | |
| except WebDriverException as exc: | |
| raise RecoverableAutomationError(f"浏览器操作失败: {value}") from exc | |
| def _sleep_with_cancel(self, stop_event, seconds: int, reason: str) -> None: | |
| if seconds <= 0: | |
| return | |
| self.logger("INFO", f"{reason} 大约 {seconds} 秒。") | |
| for _ in range(seconds): | |
| if stop_event.is_set(): | |
| return | |
| time.sleep(1) | |