| from __future__ import annotations |
|
|
| import base64 |
| import json |
| import re |
| import time |
| import urllib.error |
| import urllib.request |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Callable |
|
|
| from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.remote.webdriver import WebDriver |
| from selenium.webdriver.remote.webelement import WebElement |
| from selenium.webdriver.support.wait import WebDriverWait |
|
|
| import onnx_inference |
| import webdriver_utils |
| from core.db import Database |
|
|
|
|
# Entry point of the sso login flow on id.scu.edu.cn, and the zhjw pages the bot works with.
URL_LOGIN = "http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index"
URL_SELECT_COURSE = "http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index"
URL_CURRICULUM_CALLBACK = "http://zhjw.scu.edu.cn/student/courseSelect/thisSemesterCurriculum/callback"

# A current URL starting with any of these prefixes is treated as "logged in".
LOGIN_SUCCESS_PREFIXES = (
    "http://zhjw.scu.edu.cn/index",
    "http://zhjw.scu.edu.cn/",
    "https://zhjw.scu.edu.cn/index",
    "https://zhjw.scu.edu.cn/",
)

# Course category key -> display label and the DOM id of the tab that opens it.
CATEGORY_META = {
    "plan": {"label": "方案选课", "tab_id": "faxk"},
    "free": {"label": "自由选课", "tab_id": "zyxk"},
}

# Absolute path of the login <form>; shared by the positional login selectors below.
_LOGIN_FORM_XPATH = "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form"
# XPath 1.0 has no lower-case(); translate() is the usual case-folding substitute.
_XPATH_LOWERCASE = "translate({},'ABCDEFGHIJKLMNOPQRSTUVWXYZ','abcdefghijklmnopqrstuvwxyz')"

# Every selector list is tried in order; the first displayed match wins.
LOGIN_STUDENT_SELECTORS = [
    (By.XPATH, "//*[@id='app']//form//input[@type='text']"),
    (By.XPATH, _LOGIN_FORM_XPATH + "/div[1]/div/div/div[2]/div/input"),
]
LOGIN_PASSWORD_SELECTORS = [
    (By.XPATH, "//*[@id='app']//form//input[@type='password']"),
    (By.XPATH, _LOGIN_FORM_XPATH + "/div[2]/div/div/div[2]/div/input"),
]
LOGIN_CAPTCHA_INPUT_SELECTORS = [
    (By.XPATH, _LOGIN_FORM_XPATH + "/div[3]//input"),
    (By.XPATH, "//*[@id='app']//form//input[contains(@placeholder, '验证码')]"),
]
LOGIN_CAPTCHA_IMAGE_SELECTORS = [
    (By.XPATH, _LOGIN_FORM_XPATH + "/div[3]//img"),
    (By.XPATH, "//*[@id='app']//form//img"),
]
LOGIN_BUTTON_SELECTORS = [
    (By.XPATH, "//*[@id='app']//form//button"),
    (By.XPATH, _LOGIN_FORM_XPATH + "/div[4]/div/button"),
]

# Selectors for the captcha that may pop up when a course selection is submitted.
SUBMIT_CAPTCHA_IMAGE_SELECTORS = [
    (
        By.XPATH,
        "//div[contains(@class,'dialog') or contains(@class,'modal') or contains(@class,'popup')]"
        f"//img[contains(@src,'base64') or contains({_XPATH_LOWERCASE.format('@src')},'captcha')]",
    ),
    (By.XPATH, "//img[contains(@src,'base64')]"),
    (
        By.XPATH,
        f"//img[contains({_XPATH_LOWERCASE.format('@alt')},'captcha')"
        f" or contains({_XPATH_LOWERCASE.format('@class')},'captcha')]",
    ),
]
SUBMIT_CAPTCHA_INPUT_SELECTORS = [
    (By.XPATH, "//input[contains(@placeholder,'验证码')]"),
    (By.XPATH, f"//input[contains({_XPATH_LOWERCASE.format('@id')},'captcha')]"),
    (By.XPATH, f"//input[contains({_XPATH_LOWERCASE.format('@name')},'captcha')]"),
]
SUBMIT_CAPTCHA_BUTTON_SELECTORS = [
    (By.XPATH, "//button[contains(.,'确定') or contains(.,'提交') or contains(.,'确认') or contains(.,'验证')]"),
    (
        By.XPATH,
        "//input[(@type='button' or @type='submit') and (contains(@value,'确定')"
        " or contains(@value,'提交') or contains(@value,'确认') or contains(@value,'验证'))]",
    ),
]
|
|
|
|
class FatalCredentialsError(Exception):
    """Raised when the login page reports a wrong student id or password.

    Retrying cannot help in this situation, so ``CourseBot.run`` ends the task
    with a ``failed`` result as soon as it sees this exception.
    """
|
|
|
|
class RecoverableAutomationError(Exception):
    """Raised for browser or page failures that are worth retrying.

    ``CourseBot.run`` counts these per Selenium session, retries after a
    back-off, and rebuilds the browser once the per-session limit is reached.
    """
|
|
|
|
| @dataclass(slots=True) |
| class TaskResult: |
| status: str |
| error: str = "" |
|
|
|
|
class CourseBot:
    """Selenium worker that keeps trying to enrol one user in their queued courses.

    The bot logs in (solving the login captcha with an ONNX OCR model), opens the
    course-selection page, searches every queued course, ticks it through an
    injected JavaScript snippet, submits the selection (solving a second captcha
    when one appears) and removes satisfied courses from the store.  Errors are
    counted per browser session; after ``config.selenium_error_limit`` errors the
    browser is rebuilt, and after ``config.selenium_restart_limit`` rebuilds the
    task fails.

    Course and user rows coming from ``store`` are used as dicts; the code reads
    the keys ``id``, ``category``, ``course_id``, ``course_index`` (courses) and
    ``id``, ``student_id``, ``refresh_interval_seconds`` (user).
    """

    # Number of OCR passes over one captcha before giving up on it.
    _CAPTCHA_OCR_ATTEMPTS = 2
    # The timetable cross-check runs on every N-th selection attempt.
    _CURRICULUM_PROBE_INTERVAL = 20
    # Longest uninterrupted sleep, so a stop request is noticed within this time.
    _CANCEL_POLL_SECONDS = 1.0

    def __init__(
        self,
        *,
        config,
        store: Database,
        task_id: int,
        user: dict,
        password: str,
        logger: Callable[[str, str], None],
    ) -> None:
        """Store collaborators and load the JavaScript helpers and the OCR model.

        Args:
            config: Settings object; the attributes read in this class are
                ``chrome_binary``, ``chromedriver_path``, ``browser_page_timeout``,
                ``selenium_error_limit``, ``selenium_restart_limit``,
                ``task_backoff_seconds``, ``login_retry_limit``,
                ``submit_captcha_retry_limit`` and ``poll_interval_seconds``.
            store: Persistence layer used to list/remove courses and count attempts.
            task_id: Identifier passed to ``store.increment_task_attempts``.
            user: Row of the user the bot acts for.
            password: Plain-text password typed into the login form.
            logger: Callable invoked as ``logger(level, message)``.
        """
        self.config = config
        self.store = store
        self.task_id = task_id
        self.user = user
        self.password = password
        self.logger = logger
        self.root_dir = Path(__file__).resolve().parent.parent
        script_dir = self.root_dir / "javascript"
        self.select_course_js = (script_dir / "select_course.js").read_text(encoding="utf-8")
        self.check_result_js = (script_dir / "check_result.js").read_text(encoding="utf-8")
        self.captcha_solver = onnx_inference.CaptchaONNXInference(
            model_path=str(self.root_dir / "ocr_provider" / "captcha_model.onnx")
        )
        # Running total of selection attempts made by this bot instance.
        self.selection_attempts = 0

    def run(self, stop_event) -> TaskResult:
        """Run the enrolment loop until done, failed, or asked to stop.

        Args:
            stop_event: Object exposing ``is_set()``; a true value ends the loop.

        Returns:
            ``TaskResult`` with status ``"completed"`` (queue empty), ``"failed"``
            (bad credentials or the session restart budget is exhausted) or
            ``"stopped"`` (``stop_event`` was set).
        """
        exhausted_sessions = 0
        while not stop_event.is_set():
            if not self.store.list_courses_for_user(self.user["id"]):
                self.logger("INFO", "当前没有待抢课程,任务自动结束。")
                return TaskResult(status="completed")

            driver: WebDriver | None = None
            session_ready = False
            session_error_count = 0
            try:
                self.logger(
                    "INFO",
                    f"启动新的 Selenium 会话。累计已重建 {exhausted_sessions}/{self.config.selenium_restart_limit} 次。",
                )
                driver = webdriver_utils.configure_browser(
                    chrome_binary=self.config.chrome_binary,
                    chromedriver_path=self.config.chromedriver_path,
                    page_timeout=self.config.browser_page_timeout,
                )
                wait = WebDriverWait(driver, self.config.browser_page_timeout, 0.5)

                while not stop_event.is_set():
                    try:
                        if not session_ready:
                            self._login(driver, wait)
                            session_ready = True

                        current_courses = self.store.list_courses_for_user(self.user["id"])
                        if not current_courses:
                            self.logger("INFO", "所有目标课程都已经从队列中移除,任务完成。")
                            return TaskResult(status="completed")

                        if not self._goto_select_course(driver, wait):
                            # Being outside the selection window is not an error.
                            session_error_count = 0
                            self._sleep_with_cancel(
                                stop_event,
                                self._current_poll_interval_seconds(),
                                "当前不是选课时段,等待下一轮检查。",
                            )
                            continue

                        attempts, successes, all_done = self._run_selection_round(
                            driver, wait, stop_event, current_courses
                        )
                        if attempts == 0:
                            self.logger("INFO", "没有可执行的课程项目,下一轮将自动重试。")
                        elif successes == 0:
                            self.logger("INFO", "本轮没有抢到目标课程,准备稍后重试。")
                        else:
                            self.logger("INFO", f"本轮处理 {attempts} 门课程,成功更新 {successes} 门。")

                        # A full round without an exception clears both error budgets.
                        session_error_count = 0
                        if exhausted_sessions:
                            self.logger("INFO", "当前 Selenium 会话已恢复稳定,浏览器重建错误计数已清零。")
                            exhausted_sessions = 0

                        if all_done or not self.store.list_courses_for_user(self.user["id"]):
                            self.logger("INFO", "全部课程均已处理完成。")
                            return TaskResult(status="completed")

                        self._sleep_with_cancel(
                            stop_event,
                            self._current_poll_interval_seconds(),
                            "等待下一轮刷新。",
                        )
                    except FatalCredentialsError as exc:
                        self.logger("ERROR", str(exc))
                        return TaskResult(status="failed", error=str(exc))
                    except Exception as exc:
                        # Recoverable and unexpected errors share the same accounting;
                        # only the log wording differs.
                        recoverable = isinstance(exc, RecoverableAutomationError)
                        session_ready = False
                        session_error_count += 1
                        error_kind = "可恢复错误" if recoverable else "未知错误"
                        self.logger(
                            "WARNING",
                            f"当前 Selenium 会话第 {session_error_count}/{self.config.selenium_error_limit} 次{error_kind}: {exc}",
                        )
                        if session_error_count < self.config.selenium_error_limit:
                            self._sleep_with_cancel(
                                stop_event,
                                self.config.task_backoff_seconds,
                                "将在当前 Selenium 会话内继续重试" if recoverable else "当前会话发生未知错误,稍后重试",
                            )
                            continue

                        exhausted_sessions += 1
                        if exhausted_sessions >= self.config.selenium_restart_limit:
                            message = (
                                f"Selenium 会话连续达到错误上限并已重建 {exhausted_sessions} 次,任务终止。"
                                f"最后错误: {exc}"
                            )
                            self.logger("ERROR", message)
                            return TaskResult(status="failed", error=message)

                        if recoverable:
                            headline = f"当前 Selenium 会话连续错误达到 {self.config.selenium_error_limit} 次,准备重建浏览器。"
                        else:
                            headline = "未知错误累计达到上限,准备重建 Selenium 会话。"
                        self.logger(
                            "WARNING",
                            f"{headline}已耗尽 {exhausted_sessions}/{self.config.selenium_restart_limit} 个会话。",
                        )
                        break
            finally:
                if driver is not None:
                    try:
                        driver.quit()
                    except Exception:
                        # Best effort: the browser may already be gone.
                        pass

        self.logger("INFO", "收到停止信号,任务已结束。")
        return TaskResult(status="stopped")

    def _run_selection_round(
        self,
        driver: WebDriver,
        wait: WebDriverWait,
        stop_event,
        courses: list[dict],
    ) -> tuple[int, int, bool]:
        """Attempt every queued course once, grouped by category.

        Returns:
            ``(attempts, successes, all_done)`` where ``all_done`` is true when the
            queue became empty during the round.
        """
        grouped: dict[str, list[dict]] = {category: [] for category in CATEGORY_META}
        for course in courses:
            category = course["category"]
            bucket = grouped.get(category)
            if bucket is None:
                # An unknown category has no tab to open; skip it instead of
                # failing the whole round with a bare KeyError.
                self.logger("WARNING", f"课程 {course['course_id']} 的类别未知: {category!r},已跳过。")
                continue
            bucket.append(course)

        attempts = 0
        successes = 0
        for items in grouped.values():
            for course in items:
                if stop_event.is_set():
                    return attempts, successes, False
                if not self._is_course_still_queued(course):
                    continue

                attempts += 1
                attempt_number = self._register_course_attempt()
                if self._attempt_single_course(driver, wait, course, attempt_number=attempt_number):
                    successes += 1
                self._maybe_probe_current_curriculum(driver, attempt_number=attempt_number)
                if not self.store.list_courses_for_user(self.user["id"]):
                    return attempts, successes, True
        return attempts, successes, False

    def _login(self, driver: WebDriver, wait: WebDriverWait) -> None:
        """Log in through the login form, retrying up to ``config.login_retry_limit`` times.

        Raises:
            FatalCredentialsError: The page reported wrong credentials.
            RecoverableAutomationError: Every attempt failed for another reason.
        """
        retry_limit = self.config.login_retry_limit
        for attempt in range(1, retry_limit + 1):
            self._open_page(driver, wait, URL_LOGIN, f"登录页(第 {attempt} 次)", log_on_success=True)

            std_id_box = self._find_first_visible(driver, LOGIN_STUDENT_SELECTORS, "登录学号输入框", timeout=6)
            password_box = self._find_first_visible(driver, LOGIN_PASSWORD_SELECTORS, "登录密码输入框", timeout=6)
            captcha_box = self._find_first_visible(driver, LOGIN_CAPTCHA_INPUT_SELECTORS, "登录验证码输入框", timeout=6)
            login_button = self._find_first_visible(driver, LOGIN_BUTTON_SELECTORS, "登录按钮", timeout=6)
            captcha_image = self._find_first_visible(driver, LOGIN_CAPTCHA_IMAGE_SELECTORS, "登录验证码图片", timeout=6)

            captcha_text = self._solve_captcha_text(captcha_image, scene="登录")
            self.logger("INFO", f"登录尝试 {attempt}/{retry_limit},验证码 OCR 输出: {captcha_text}")

            for box, value in (
                (std_id_box, self.user["student_id"]),
                (password_box, self.password),
                (captcha_box, captcha_text),
            ):
                box.clear()
                box.send_keys(value)
            self.logger("INFO", f"登录尝试 {attempt}/{retry_limit},准备提交登录表单。")
            submit_method = self._trigger_non_blocking_action(
                driver,
                login_button,
                label="登录表单",
                allow_form_submit=True,
            )
            self.logger("INFO", f"登录表单已触发提交,方式={submit_method},开始等待登录结果。")

            state, error_message = self._wait_for_login_outcome(driver, timeout_seconds=10)
            if state == "success":
                self.logger("INFO", f"登录成功,耗费 {attempt} 次尝试。")
                return

            if any(token in error_message for token in ("用户名或密码错误", "密码错误", "账号或密码错误", "用户不存在")):
                raise FatalCredentialsError("学号或密码错误,任务已停止,请在面板中更新后重新启动。")

            if error_message:
                self.logger("WARNING", f"登录失败,第 {attempt} 次尝试: {error_message}")
            else:
                self.logger(
                    "WARNING",
                    f"登录失败,第 {attempt} 次尝试,未读取到明确错误提示。{self._page_snapshot(driver, include_body=True)}",
                )

            # Pause only between attempts; there is nothing to wait for after the last one.
            if attempt < retry_limit:
                time.sleep(0.6)

        raise RecoverableAutomationError("连续多次登录失败,可能是验证码识别失败、页面异常或系统暂时不可用。")

    def _goto_select_course(self, driver: WebDriver, wait: WebDriverWait) -> bool:
        """Open the course-selection page.

        Returns:
            ``False`` when the page text says selection is currently closed,
            ``True`` otherwise.

        Raises:
            RecoverableAutomationError: The session expired or the page failed to load.
        """
        self._open_page(driver, wait, URL_SELECT_COURSE, "选课页")
        if self._is_session_expired(driver):
            raise RecoverableAutomationError("检测到登录会话已失效,准备重新登录。")

        body_text = self._safe_body_text(driver)
        if "非选课" in body_text or "未到选课时间" in body_text:
            self.logger("INFO", body_text.strip() or "当前不是选课时段。")
            return False
        return True

    def _attempt_single_course(self, driver: WebDriver, wait: WebDriverWait, course: dict, *, attempt_number: int) -> bool:
        """Search, tick and submit one course.

        Returns:
            ``True`` when the course was obtained (or is reported as already
            selected) and has been removed from the queue, ``False`` otherwise.

        Raises:
            RecoverableAutomationError: The search timed out, the tick script
                failed, or the submission could not be completed.
        """
        category_name = CATEGORY_META[course["category"]]["label"]
        course_key = f'{course["course_id"]}_{course["course_index"]}'
        self.logger("INFO", f"开始尝试 {category_name} {course_key}。累计选课尝试 {attempt_number} 次。")

        if not self._goto_select_course(driver, wait):
            self.logger("INFO", f"跳过 {course_key},当前系统暂时不在可选课页面。")
            return False
        self._open_category_tab(driver, wait, course["category"])

        try:
            # The search form lives inside the "ifra" iframe.
            wait.until(lambda current_driver: current_driver.find_element(By.ID, "ifra"))
            driver.switch_to.frame("ifra")
            course_id_box = self._find(driver, By.ID, "kch")
            search_button = self._find(driver, By.ID, "queryButton")
            course_id_box.clear()
            course_id_box.send_keys(course["course_id"])
            search_button.click()
            # Wait until the button caption no longer contains '正在'.
            wait.until(
                lambda current_driver: current_driver.execute_script(
                    "return document.getElementById('queryButton').innerText.indexOf('正在') === -1"
                )
            )
        except TimeoutException as exc:
            raise RecoverableAutomationError(
                f"课程查询超时,页面可能暂时无响应。{self._page_snapshot(driver, include_body=True)}"
            ) from exc
        finally:
            driver.switch_to.default_content()

        time.sleep(0.2)
        try:
            found_target = driver.execute_script(self.select_course_js, course_key) == "yes"
        except WebDriverException as exc:
            raise RecoverableAutomationError(f"执行课程勾选脚本失败。{self._page_snapshot(driver, include_body=True)}") from exc
        if not found_target:
            self.logger("INFO", f"本轮未找到目标课程 {course_key}。")
            return False

        self.logger("INFO", f"已勾选目标课程 {course_key},准备提交。")
        results = self._submit_with_optional_captcha(driver, wait, course_key)
        if not results:
            self.logger("WARNING", f"提交 {course_key} 后没有读取到结果列表。")
            return False

        satisfied = False
        for result in results:
            detail = (result.get("detail") or "").strip()
            subject = (result.get("subject") or course_key).strip()
            if result.get("result"):
                self.logger("SUCCESS", f"成功抢到课程: {subject}")
                satisfied = True
            elif any(token in detail for token in ("已选", "已选择", "已经选", "已在已选课程")):
                self.logger("INFO", f"课程已在系统中存在,自动从队列移除: {subject}")
                satisfied = True
            else:
                self.logger("WARNING", f"课程 {subject} 抢课失败: {detail or '未知原因'}")

        if not satisfied:
            return False
        self.store.remove_course_by_identity(
            self.user["id"],
            course["category"],
            course["course_id"],
            course["course_index"],
        )
        return True

    def _register_course_attempt(self) -> int:
        """Count one more selection attempt locally and in the store; return the local total."""
        self.selection_attempts += 1
        self.store.increment_task_attempts(self.task_id)
        return self.selection_attempts

    def _maybe_probe_current_curriculum(self, driver: WebDriver, *, attempt_number: int) -> None:
        """Cross-check the timetable on every ``_CURRICULUM_PROBE_INTERVAL``-th attempt.

        Failures of the probe are logged and swallowed so they never interrupt
        the selection loop.
        """
        if attempt_number <= 0 or attempt_number % self._CURRICULUM_PROBE_INTERVAL != 0:
            return
        self.logger("INFO", f"累计选课尝试 {attempt_number} 次,开始回查本学期课表。")
        try:
            matched_courses = self._reconcile_selected_courses_via_callback(driver)
        except Exception as exc:
            self.logger("WARNING", f"回查本学期课表失败: {exc}")
            return

        if matched_courses:
            self.logger("INFO", f"回查完成,新增确认 {len(matched_courses)} 门已在课表中的课程。")
        else:
            self.logger("INFO", "回查完成,暂未发现新增已选课程。")

    def _reconcile_selected_courses_via_callback(self, driver: WebDriver) -> list[dict]:
        """Fetch the timetable payload and remove queued courses that already appear in it."""
        payload = self._fetch_current_curriculum_payload(driver)
        return self._reconcile_selected_courses_from_payload(payload)

    def _fetch_current_curriculum_payload(self, driver: WebDriver) -> dict:
        """Request the curriculum callback over HTTP, reusing the browser's cookies.

        Returns:
            The decoded JSON object.

        Raises:
            RecoverableAutomationError: Cookies are unavailable, the request
                failed, or the response is not a JSON object.
        """
        try:
            cookies = driver.get_cookies()
        except WebDriverException as exc:
            raise RecoverableAutomationError(f"读取浏览器会话 Cookie 失败: {exc}") from exc

        cookie_header = "; ".join(
            f"{cookie['name']}={cookie['value']}"
            for cookie in cookies or ()
            if cookie.get("name") and cookie.get("value")
        )
        if not cookie_header:
            raise RecoverableAutomationError("当前浏览器会话没有可用 Cookie,无法回查本学期课表。")

        try:
            referer = driver.current_url or URL_SELECT_COURSE
        except WebDriverException:
            referer = URL_SELECT_COURSE

        request = urllib.request.Request(
            URL_CURRICULUM_CALLBACK,
            headers={
                "Accept": "application/json, text/plain, */*",
                "Cookie": cookie_header,
                "Referer": referer,
                "User-Agent": webdriver_utils.DEFAULT_USER_AGENT,
                "X-Requested-With": "XMLHttpRequest",
            },
        )
        try:
            with urllib.request.urlopen(request, timeout=20) as response:
                status = getattr(response, "status", 200)
                body = response.read().decode("utf-8", errors="replace")
        except (urllib.error.URLError, OSError) as exc:
            # URLError covers HTTP errors and connection failures; plain OSError
            # covers socket timeouts/resets that surface while reading the body.
            raise RecoverableAutomationError(f"访问本学期课表回查接口失败: {exc}") from exc

        # Defensive: the default opener already raises for 4xx/5xx responses.
        if int(status) >= 400:
            raise RecoverableAutomationError(f"本学期课表回查接口返回异常状态码: {status}")
        try:
            payload = json.loads(body)
        except json.JSONDecodeError as exc:
            snippet = body.strip().replace("\n", " ")[:180]
            raise RecoverableAutomationError(f"本学期课表回查接口返回了非 JSON 内容: {snippet}") from exc
        if not isinstance(payload, dict):
            raise RecoverableAutomationError("本学期课表回查接口返回的数据结构不是对象。")
        return payload

    def _reconcile_selected_courses_from_payload(self, payload: dict) -> list[dict]:
        """Remove queued courses that the payload lists as selected.

        Returns:
            One dict per removed course with the keys ``course_key``,
            ``course_name`` and ``status_name``.
        """
        selected_map = self._extract_curriculum_course_entries(payload)
        if not selected_map:
            return []

        matched_courses: list[dict] = []
        for course in self.store.list_courses_for_user(self.user["id"]):
            course_key = self._normalize_course_identity(course["course_id"], course["course_index"])
            detail = selected_map.get(course_key)
            if not detail:
                continue

            self.store.remove_course_by_identity(
                self.user["id"],
                course["category"],
                course["course_id"],
                course["course_index"],
            )
            course_name = str(detail.get("courseName") or course_key).strip()
            status_name = str(
                detail.get("selectCourseStatusName") or detail.get("selectCourseStatusCode") or "已在课表中"
            ).strip()
            self.logger(
                "SUCCESS",
                f"回查确认课程已在本学期课表中: {course_name} ({course_key}),状态={status_name},已自动从队列移除。",
            )
            matched_courses.append({"course_key": course_key, "course_name": course_name, "status_name": status_name})
        return matched_courses

    def _extract_curriculum_course_entries(self, payload: dict) -> dict[str, dict]:
        """Build a ``"COURSEID_INDEX" -> detail`` map from ``payload["xkxx"]``.

        ``xkxx`` is expected to be a list of dicts; a single dict is accepted as
        well, and any other shape yields an empty map instead of raising.
        """
        entries = payload.get("xkxx")
        if isinstance(entries, dict):
            entries = [entries]
        elif not isinstance(entries, (list, tuple)):
            return {}

        selected_map: dict[str, dict] = {}
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            for raw_key, detail in entry.items():
                if not isinstance(detail, dict):
                    continue
                normalized_key = self._normalize_callback_course_key(raw_key, detail)
                if normalized_key:
                    selected_map[normalized_key] = detail
        return selected_map

    @staticmethod
    def _normalize_course_identity(course_id: str, course_index: str) -> str:
        """Return ``"<ID>_<INDEX>"`` upper-cased and stripped, or ``""`` if either part is empty."""
        normalized_course_id = str(course_id or "").strip().upper()
        normalized_course_index = str(course_index or "").strip().upper()
        if normalized_course_id and normalized_course_index:
            return f"{normalized_course_id}_{normalized_course_index}"
        return ""

    def _normalize_callback_course_key(self, raw_key: str, detail: dict | None = None) -> str:
        """Derive the normalized course key for one callback entry.

        The entry key is split at its last underscore first; if that yields
        nothing usable, the nested ``detail["id"]`` object is consulted.
        """
        raw_text = str(raw_key or "").strip()
        if "_" in raw_text:
            course_id, course_index = raw_text.rsplit("_", 1)
            normalized = self._normalize_course_identity(course_id, course_index)
            if normalized:
                return normalized

        detail_id = detail.get("id") if isinstance(detail, dict) else None
        if not isinstance(detail_id, dict):
            return ""
        # "coure…" is the spelling of the field names this code reads; do not "fix" it.
        return self._normalize_course_identity(
            str(detail_id.get("coureNumber") or ""),
            str(detail_id.get("coureSequenceNumber") or ""),
        )

    def _is_course_still_queued(self, course: dict) -> bool:
        """Return whether the course row is still present in the user's queue."""
        queued = self.store.list_courses_for_user(self.user["id"])
        return any(item["id"] == course["id"] for item in queued)

    def _current_poll_interval_seconds(self) -> int:
        """Reload the user row and return its refresh interval clamped to 1..120 seconds.

        Falls back to ``config.poll_interval_seconds`` when the user has no
        usable value.
        """
        latest_user = self.store.get_user(self.user["id"])
        if latest_user is not None:
            self.user = latest_user
        try:
            interval = int(self.user.get("refresh_interval_seconds") or self.config.poll_interval_seconds)
        except (TypeError, ValueError, AttributeError):
            interval = int(self.config.poll_interval_seconds)
        return max(1, min(120, interval))

    def _submit_with_optional_captcha(self, driver: WebDriver, wait: WebDriverWait, course_key: str) -> list[dict]:
        """Submit the ticked course and return the parsed result rows.

        A captcha that appears after submitting is solved automatically, up to
        ``config.submit_captcha_retry_limit`` rounds.

        Raises:
            RecoverableAutomationError: The captcha could not be handled or no
                result appeared within the retry budget.
        """
        submit_button = self._find(driver, By.ID, "submitButton")
        submit_method = self._trigger_non_blocking_action(
            driver,
            submit_button,
            label=f"课程提交按钮 {course_key}",
            allow_form_submit=True,
        )
        self.logger("INFO", f"课程 {course_key} 已触发提交,方式={submit_method}。")

        for attempt in range(1, self.config.submit_captcha_retry_limit + 1):
            state = self._wait_for_submit_state(driver, wait)
            if state == "result":
                return self._read_result_page(driver, wait)
            if state == "captcha":
                self.logger("INFO", f"提交 {course_key} 时检测到验证码,第 {attempt} 次自动识别。")
                if not self._solve_visible_submit_captcha(driver):
                    raise RecoverableAutomationError("提交选课时检测到验证码,但未能自动完成识别与提交。")
                continue
            self.logger("WARNING", f"提交 {course_key} 后页面未返回结果,也未检测到验证码。")

        raise RecoverableAutomationError("提交选课后连续多次遇到验证码或未返回结果。")

    def _wait_for_submit_state(self, driver: WebDriver, wait: WebDriverWait) -> str:
        """Wait until the page shows a result table or a captcha prompt.

        Returns:
            ``"result"``, ``"captcha"``, or ``"pending"`` when the wait timed out.
        """
        script = """
            const visible = (el) => {
                if (!el) return false;
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
            };
            if (document.querySelector('#xkresult tbody tr')) return 'result';
            const input = Array.from(document.querySelectorAll('input')).find((el) => {
                const text = `${el.placeholder || ''} ${el.id || ''} ${el.name || ''} ${el.className || ''}`;
                return visible(el) && /验证码|captcha/i.test(text);
            });
            const img = Array.from(document.querySelectorAll('img')).find((el) => {
                const text = `${el.src || ''} ${el.id || ''} ${el.alt || ''} ${el.className || ''}`;
                return visible(el) && (/captcha/i.test(text) || (el.src || '').includes('base64') || (el.naturalWidth >= 50 && el.naturalWidth <= 240 && el.naturalHeight >= 20 && el.naturalHeight <= 120));
            });
            const button = Array.from(document.querySelectorAll('button, input[type="button"], input[type="submit"]')).find((el) => {
                const text = `${el.innerText || ''} ${el.value || ''}`;
                return visible(el) && /确定|提交|确认|验证/.test(text);
            });
            if (input && img && button) return 'captcha';
            return 'pending';
        """

        def settled_state(current_driver):
            # Hand the observed state back through wait.until so the script is
            # not run a second time (which could observe a different state).
            state = current_driver.execute_script(script)
            return str(state) if state != "pending" else False

        try:
            return wait.until(settled_state)
        except TimeoutException:
            return "pending"

    def _solve_visible_submit_captcha(self, driver: WebDriver) -> bool:
        """OCR and submit the captcha shown after a course submission.

        Returns:
            ``False`` when the image, the input or the confirm button is missing,
            ``True`` once the confirmation has been triggered.
        """
        image = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_IMAGE_SELECTORS, timeout=3)
        input_box = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_INPUT_SELECTORS, timeout=3)
        button = self._find_first_visible_optional(driver, SUBMIT_CAPTCHA_BUTTON_SELECTORS, timeout=3)
        if not image or not input_box or not button:
            return False

        captcha_text = self._solve_captcha_text(image, scene="提交")
        self.logger("INFO", f"提交验证码 OCR 输出: {captcha_text}")
        input_box.clear()
        input_box.send_keys(captcha_text)
        submit_method = self._trigger_non_blocking_action(
            driver,
            button,
            label="提交验证码确认按钮",
            allow_form_submit=True,
        )
        self.logger("INFO", f"提交验证码确认已触发,方式={submit_method}。")
        time.sleep(1.0)
        return True

    def _open_category_tab(self, driver: WebDriver, wait: WebDriverWait, category: str) -> None:
        """Click the tab of the given category and wait for the page to settle."""
        tab_id = CATEGORY_META[category]["tab_id"]
        self._find(driver, By.ID, tab_id).click()
        webdriver_utils.wait_for_ready(wait, allow_interactive=True)
        time.sleep(0.2)

    def _read_result_page(self, driver: WebDriver, wait: WebDriverWait) -> list[dict]:
        """Wait for the result table and parse it with the injected result script.

        Raises:
            RecoverableAutomationError: Anything went wrong while waiting or parsing.
        """
        try:
            webdriver_utils.wait_for_ready(wait, allow_interactive=True)
            wait.until(
                lambda current_driver: current_driver.execute_script(
                    """
                    const node = document.querySelector('#xkresult tbody tr');
                    return Boolean(node);
                    """
                )
            )
            return json.loads(driver.execute_script(self.check_result_js))
        except Exception as exc:
            raise RecoverableAutomationError(
                f"读取选课结果失败,页面结构可能发生变化。{self._page_snapshot(driver, include_body=True)}"
            ) from exc

    def _open_page(
        self,
        driver: WebDriver,
        wait: WebDriverWait,
        url: str,
        label: str,
        *,
        allow_interactive: bool = True,
        log_on_success: bool = False,
    ) -> None:
        """Navigate to ``url`` and wait for the document to become ready.

        Args:
            label: Human-readable page name used in log messages.
            allow_interactive: Forwarded to ``webdriver_utils.wait_for_ready``.
            log_on_success: Log a snapshot even when the load did not time out.

        Raises:
            RecoverableAutomationError: The ready-state wait timed out.
        """
        timed_out = webdriver_utils.open_with_recovery(driver, url)
        if timed_out:
            self.logger("WARNING", f"{label} 页面加载超时,尝试停止页面并继续执行。")
        try:
            ready_state = webdriver_utils.wait_for_ready(wait, allow_interactive=allow_interactive)
        except TimeoutException as exc:
            raise RecoverableAutomationError(f"{label} 页面加载失败。{self._page_snapshot(driver, include_body=True)}") from exc
        if log_on_success or timed_out:
            self.logger("INFO", f"{label} 页面已打开,readyState={ready_state}。{self._page_snapshot(driver, include_body=timed_out)}")

    def _wait_for_login_outcome(self, driver: WebDriver, timeout_seconds: int = 10) -> tuple[str, str]:
        """Poll until the login succeeds, an error text shows up, or time runs out.

        Returns:
            ``("success", "")``, ``("error", message)`` or ``("unknown", message)``
            where the last message may be empty.
        """
        deadline = time.monotonic() + max(1, timeout_seconds)
        while time.monotonic() < deadline:
            try:
                current_url = driver.current_url or ""
            except WebDriverException:
                # The browser may be mid-navigation; try again shortly.
                time.sleep(0.3)
                continue
            if self._is_login_success_url(current_url):
                return "success", ""
            error_text = self._read_login_error(driver)
            if error_text:
                return "error", error_text
            time.sleep(0.4)
        return "unknown", self._read_login_error(driver)

    def _read_login_error(self, driver: WebDriver) -> str:
        """Return the first visible error-looking text on the login page, or ``""``.

        NOTE(review): the fallback scan accepts any visible span/div whose text
        mentions 验证码/密码/用户/账号, which may also match static form labels —
        confirm against the real page.
        """
        script = """
            const visible = (node) => {
                if (!node) return false;
                const style = window.getComputedStyle(node);
                const rect = node.getBoundingClientRect();
                return style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 0 && rect.height > 0;
            };
            const selectors = [
                '.el-message',
                '[role="alert"]',
                '.message',
                '.toast',
                '.el-form-item__error',
                '.error'
            ];
            for (const selector of selectors) {
                for (const node of Array.from(document.querySelectorAll(selector))) {
                    const text = (node.innerText || '').trim();
                    if (visible(node) && text) {
                        return text;
                    }
                }
            }
            const nodes = Array.from(document.querySelectorAll('body span, body div'));
            for (const node of nodes) {
                const text = (node.innerText || '').trim();
                if (!text || !visible(node)) continue;
                if (/验证码|密码|用户|账号/.test(text)) return text;
            }
            return '';
        """
        try:
            raw_message = driver.execute_script(script) or ""
        except WebDriverException:
            return ""
        return re.sub(r"\s+", " ", str(raw_message)).strip()

    def _trigger_non_blocking_action(
        self,
        driver: WebDriver,
        element: WebElement,
        *,
        label: str,
        allow_form_submit: bool = False,
    ) -> str:
        """Activate ``element`` without blocking on the navigation it causes.

        The click/submit is scheduled through ``setTimeout`` inside the page so
        the script call returns immediately; a native Selenium click is the
        fallback.

        Returns:
            A short tag naming the mechanism that was used.

        Raises:
            RecoverableAutomationError: The native fallback click failed as well.
        """
        script = """
            const target = arguments[0];
            const allowFormSubmit = Boolean(arguments[1]);
            const form = allowFormSubmit && target ? (target.form || target.closest('form')) : null;
            let method = 'unavailable';

            if (form && typeof form.requestSubmit === 'function') {
                method = 'scheduled-requestSubmit';
            } else if (target && typeof target.click === 'function') {
                method = 'scheduled-js-click';
            } else if (target) {
                method = 'scheduled-dispatch-click';
            } else if (form && typeof form.submit === 'function') {
                method = 'scheduled-form-submit';
            }

            if (!target && !form) {
                return 'unavailable';
            }

            window.setTimeout(() => {
                const dispatchFallback = (node) => {
                    if (!node) return false;
                    ['pointerdown', 'mousedown', 'mouseup', 'click'].forEach((type) => {
                        node.dispatchEvent(new MouseEvent(type, {
                            bubbles: true,
                            cancelable: true,
                            view: window,
                        }));
                    });
                    return true;
                };

                try {
                    if (target && typeof target.scrollIntoView === 'function') {
                        target.scrollIntoView({block: 'center', inline: 'center'});
                    }
                } catch (error) {}

                try {
                    if (form && typeof form.requestSubmit === 'function') {
                        form.requestSubmit(target || undefined);
                        return;
                    }
                } catch (error) {}

                try {
                    if (target && typeof target.click === 'function') {
                        target.click();
                        return;
                    }
                } catch (error) {}

                try {
                    if (dispatchFallback(target)) {
                        return;
                    }
                } catch (error) {}

                try {
                    if (form && typeof form.submit === 'function') {
                        form.submit();
                    }
                } catch (error) {}
            }, 0);

            return method;
        """
        try:
            method = str(driver.execute_script(script, element, allow_form_submit) or "").strip()
            if method and method != "unavailable":
                return method
            self.logger("WARNING", f"{label} 的 JS 非阻塞提交未找到可用方式,回退到原生点击。")
        except Exception as exc:
            self.logger("WARNING", f"{label} 的 JS 非阻塞提交失败,回退到原生点击。原因: {exc}")

        try:
            element.click()
        except TimeoutException:
            # The click went through but the page did not finish loading in time.
            self.logger("WARNING", f"{label} 的原生点击触发后页面响应超时,已执行 window.stop() 并继续等待。")
            self._stop_loading(driver)
            return "native-click-timeout"
        except WebDriverException as exc:
            raise RecoverableAutomationError(f"{label} 触发失败: {exc}") from exc
        return "native-click"

    @staticmethod
    def _stop_loading(driver: WebDriver) -> None:
        """Ask the page to stop loading; failures are ignored on purpose."""
        try:
            driver.execute_script("window.stop();")
        except Exception:
            pass

    def _solve_captcha_text(self, image_element: WebElement, *, scene: str) -> str:
        """OCR the captcha image and return its alphanumeric text.

        A pass yielding at least four characters is returned at once (cut to
        four).  Between passes the image is clicked to request a new captcha.
        After the final pass the image is left untouched, so a three-character
        fallback still belongs to the captcha that is currently displayed.

        Raises:
            RecoverableAutomationError: No pass produced at least three characters.
        """
        last_candidate = ""
        total_passes = self._CAPTCHA_OCR_ATTEMPTS
        for attempt in range(1, total_passes + 1):
            raw_text = self.captcha_solver.classification(self._extract_image_bytes(image_element))
            normalized = re.sub(r"[^0-9A-Za-z]", "", str(raw_text or ""))
            if len(normalized) >= 4:
                return normalized[:4]
            last_candidate = normalized
            self.logger("WARNING", f"{scene}验证码 OCR 输出异常,第 {attempt} 次结果: {raw_text!r}")
            if attempt == total_passes:
                # Refreshing now would invalidate the text we may still return.
                break
            try:
                image_element.click()
                time.sleep(0.4)
            except Exception:
                # Best effort: if the refresh click fails, re-read the same image.
                pass
        if len(last_candidate) >= 3:
            return last_candidate
        raise RecoverableAutomationError(f"{scene}验证码 OCR 未能识别出有效内容。")

    def _is_login_success_url(self, url: str) -> bool:
        """Return whether ``url`` indicates that the browser left the login site for zhjw."""
        if not url:
            return False
        if url.startswith(LOGIN_SUCCESS_PREFIXES):
            return True
        return "zhjw.scu.edu.cn" in url and "id.scu.edu.cn" not in url

    def _is_session_expired(self, driver: WebDriver) -> bool:
        """Return whether the browser was bounced back to the login site or shows a password box."""
        current_url = driver.current_url or ""
        if "id.scu.edu.cn" in current_url:
            return True
        password_box = self._find_first_visible_optional(driver, LOGIN_PASSWORD_SELECTORS, timeout=1)
        return password_box is not None

    def _safe_body_text(self, driver: WebDriver) -> str:
        """Return the visible text of ``<body>``, or ``""`` when it cannot be read."""
        try:
            return self._find(driver, By.TAG_NAME, "body").text or ""
        except (RecoverableAutomationError, WebDriverException):
            # Reading .text can fail too (e.g. the element went stale mid-navigation).
            return ""

    def _page_snapshot(self, driver: WebDriver, *, include_body: bool = False) -> str:
        """Return a short, space-prefixed description of the current page for log messages.

        This helper is called while error messages are being built, so it never
        raises: when the page cannot be inspected it degrades to the URL, and to
        ``-`` when even that is unavailable.
        """
        script = """
            const body = document.body ? (document.body.innerText || '') : '';
            return JSON.stringify({
                url: window.location.href || '',
                title: document.title || '',
                readyState: document.readyState || '',
                body: body.replace(/\\s+/g, ' ').trim().slice(0, 180)
            });
        """
        try:
            raw = driver.execute_script(script)
            data = json.loads(raw) if isinstance(raw, str) else raw
        except Exception:
            data = None

        if not isinstance(data, dict):
            try:
                current_url = driver.current_url or ""
            except Exception:
                current_url = ""
            return f" url={current_url or '-'}"

        parts = [
            f"url={data.get('url') or '-'}",
            f"title={data.get('title') or '-'}",
            f"readyState={data.get('readyState') or '-'}",
        ]
        body_excerpt = str(data.get("body") or "").strip()
        if include_body and body_excerpt:
            parts.append(f"body={body_excerpt}")
        return " " + " | ".join(parts)

    def _extract_image_bytes(self, image_element: WebElement) -> bytes:
        """Return the image bytes of a captcha element.

        An inline base64 ``src`` is decoded directly; when it is absent, empty or
        malformed the element screenshot is used instead.
        """
        source = image_element.get_attribute("src") or ""
        if "base64," in source:
            encoded = source.split("base64,", 1)[1]
            try:
                decoded = base64.b64decode(encoded)
            except ValueError:
                # binascii.Error (bad padding/characters) is a ValueError subclass.
                decoded = b""
            if decoded:
                return decoded
        return image_element.screenshot_as_png

    def _find_first_visible(self, driver: WebDriver, selectors: list[tuple[str, str]], label: str, timeout: int = 0):
        """Like ``_find_first_visible_optional`` but raise when nothing is found.

        Raises:
            RecoverableAutomationError: No selector matched a displayed element.
        """
        element = self._find_first_visible_optional(driver, selectors, timeout=timeout)
        if element is None:
            raise RecoverableAutomationError(f"页面元素未找到: {label}。{self._page_snapshot(driver, include_body=True)}")
        return element

    @staticmethod
    def _find_first_visible_optional(driver: WebDriver, selectors: list[tuple[str, str]], timeout: int = 0):
        """Return the first displayed element matching any selector, or ``None``.

        Selectors are tried in order and the scan repeats every 0.2 s until
        ``timeout`` seconds have passed; with ``timeout <= 0`` it runs once.
        """
        deadline = time.monotonic() + max(0, timeout)
        while True:
            for by, value in selectors:
                try:
                    candidates = driver.find_elements(by, value)
                except WebDriverException:
                    continue
                for candidate in candidates:
                    try:
                        if candidate.is_displayed():
                            return candidate
                    except WebDriverException:
                        # Typically a stale element; look at the next one.
                        continue
            if timeout <= 0 or time.monotonic() >= deadline:
                return None
            time.sleep(0.2)

    @staticmethod
    def _find(driver: WebDriver, by: str, value: str):
        """Find one element, translating Selenium failures into ``RecoverableAutomationError``."""
        try:
            return driver.find_element(by, value)
        except NoSuchElementException as exc:
            raise RecoverableAutomationError(f"页面元素未找到: {value}") from exc
        except WebDriverException as exc:
            raise RecoverableAutomationError(f"浏览器操作失败: {value}") from exc

    def _sleep_with_cancel(self, stop_event, seconds: float, reason: str) -> None:
        """Sleep for ``seconds`` while checking ``stop_event`` at least once per second.

        Works with fractional durations; non-positive durations return
        immediately without logging.
        """
        if seconds <= 0:
            return
        self.logger("INFO", f"{reason} 大约 {seconds} 秒。")
        deadline = time.monotonic() + seconds
        while not stop_event.is_set():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(self._CANCEL_POLL_SECONDS, remaining))
|
|