| from __future__ import annotations |
|
|
| import base64 |
| import json |
| import re |
| import time |
| from pathlib import Path |
| from typing import Callable |
|
|
| import onnx_inference |
| from selenium import webdriver |
| from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException |
| from selenium.webdriver.chrome.service import Service |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.remote.webdriver import WebDriver |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.webdriver.support.ui import WebDriverWait |
|
|
| from course_catcher.config import AppConfig |
|
|
# Page the browser opens first when logging in (see CourseAutomation._login).
SCU_LOGIN_URL = "http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index"

# Course-selection page opened after a successful login.
SCU_SELECT_URL = "http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index"

# Logical tab name -> DOM id of the element that is clicked to open that tab.
TAB_IDS = dict(plan="faxk", free="zyxk")

# A result whose detail text contains any of these substrings is treated as selected.
ALREADY_SELECTED_KEYWORDS = (
    "已选",
    "已选择",
    "已修读",
)
|
|
|
|
class AutomationError(Exception):
    """Base class for every error raised by the course-selection automation."""
|
|
|
|
class CredentialsError(AutomationError):
    """The login page rejected the student id / password pair.

    Raised by the login routine when the site's error text mentions a wrong
    user name or password; the run loop turns it into a "failed" result
    instead of retrying.
    """
|
|
|
|
class SessionExpiredError(AutomationError):
    """The browser session is no longer usable and a fresh login is required.

    Raised when the course-selection page shows the login form again or when
    the selection iframe does not become available.
    """
|
|
|
|
class TemporaryAutomationError(AutomationError):
    """A failure that is expected to clear up on a later attempt.

    Used for missing page elements, repeated login failures and the
    "not a course-selection period" response of the site.
    """
|
|
|
|
class CourseAutomation:
    """Selenium worker that logs in to the academic system and submits course selections.

    An instance holds the application config, the two JavaScript snippets that
    are injected into the course-selection page, and the captcha recogniser.
    Browser sessions are created (and torn down) inside ``run_until_stopped``.
    """

    def __init__(self, config: AppConfig) -> None:
        """Load the injected scripts and the captcha model.

        All three paths are relative, so they resolve against the current
        working directory of the process.

        Args:
            config: Application settings (timeouts, retry limits, binary paths).
        """
        self.config = config
        self.select_course_js = Path("javascript/select_course.js").read_text(encoding="utf-8")
        self.check_result_js = Path("javascript/check_result.js").read_text(encoding="utf-8")
        self.captcha_solver = onnx_inference.CaptchaONNXInference(
            model_path=str(Path("ocr_provider") / "captcha_model.onnx")
        )

    def run_until_stopped(
        self,
        task_id: int,
        user_credentials: dict,
        db,
        should_stop: Callable[[], bool],
        log: Callable[[str, str], None],
    ) -> tuple[str, str]:
        """Repeat selection cycles until stopped, finished, or the credentials are rejected.

        Each iteration checks the stop flag, re-reads the pending courses,
        bumps the attempt counter, (re)creates the browser session if needed
        and runs one selection cycle.  Session and temporary errors discard
        the browser and are retried on the next iteration; this now also
        covers errors raised while logging in, which previously escaped the
        loop and aborted the task.

        Args:
            task_id: Identifier passed to ``db.increment_task_attempt``.
            user_credentials: Mapping with the keys ``id``, ``student_id`` and ``password``.
            db: Storage object providing ``list_pending_courses``,
                ``increment_task_attempt``, ``mark_course_result`` and ``get_setting_float``.
            should_stop: Polled before and after every cycle; truthy ends the task.
            log: Callback receiving ``(level, message)``.

        Returns:
            ``("stopped", "")``, ``("completed", "")`` or ``("failed", reason)``.

        Raises:
            Exception: Whatever ``_build_driver`` raises when the browser cannot be started.
        """
        driver: WebDriver | None = None
        try:
            while True:
                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""

                pending_courses = db.list_pending_courses(user_credentials["id"])
                if not pending_courses:
                    log("SUCCESS", "所有待抢课程都已完成,本轮任务结束。")
                    return "completed", ""

                db.increment_task_attempt(task_id)
                needs_login = driver is None
                if needs_login:
                    log("INFO", "正在启动浏览器会话并登录教务系统。")
                    # A browser that cannot be launched still propagates to the caller.
                    driver = self._build_driver()

                try:
                    if needs_login:
                        # Inside the try so that login / navigation failures get the
                        # same retry handling as failures during the cycle itself.
                        self._login(driver, user_credentials["student_id"], user_credentials["password"], log)
                        self._goto_select_course(driver, log)
                    self._run_single_cycle(driver, user_credentials["id"], pending_courses, db, log)
                except SessionExpiredError as exc:
                    log("WARNING", f"{exc},将重新建立会话。")
                    self._safe_quit(driver)
                    driver = None
                    time.sleep(2)
                    continue
                except TemporaryAutomationError as exc:
                    log("WARNING", f"{exc},稍后重试。")
                    self._safe_quit(driver)
                    driver = None
                except CredentialsError:
                    raise
                except Exception as exc:
                    # Top-level boundary of one cycle: report, drop the session, try again.
                    log("ERROR", f"本轮执行发生异常:{exc}")
                    self._safe_quit(driver)
                    driver = None

                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""

                time.sleep(db.get_setting_float("loop_interval_seconds", self.config.loop_interval_seconds))
        except CredentialsError as exc:
            return "failed", str(exc)
        finally:
            self._safe_quit(driver)

    @staticmethod
    def _existing_file(configured) -> str | None:
        """Return *configured* as a path string if it names an existing regular file, else ``None``."""
        if not configured:
            return None
        candidate = Path(configured)
        return str(candidate) if candidate.is_file() else None

    def _build_driver(self) -> WebDriver:
        """Start a headless Chrome session configured from ``self.config``.

        ``chrome_binary`` and ``chromedriver_binary`` are applied only when they
        point at an existing regular file; an empty setting or a directory is
        ignored and Selenium's defaults are used instead.

        Returns:
            A driver with the page-load timeout and implicit wait already set.
        """
        options = webdriver.ChromeOptions()
        for argument in (
            "--headless=new",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-blink-features=AutomationControlled",
            "--window-size=1440,1600",
            "--lang=zh-CN",
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        ):
            options.add_argument(argument)

        chrome_binary = self._existing_file(self.config.chrome_binary)
        if chrome_binary:
            options.binary_location = chrome_binary

        chromedriver_binary = self._existing_file(self.config.chromedriver_binary)
        if chromedriver_binary:
            driver = webdriver.Chrome(service=Service(executable_path=chromedriver_binary), options=options)
        else:
            driver = webdriver.Chrome(options=options)

        try:
            driver.set_page_load_timeout(self.config.request_timeout_seconds)
            # NOTE(review): this implicit wait also applies to every find_element call
            # made from the explicit WebDriverWait polls in this class; Selenium's
            # documentation advises against mixing both kinds of wait -- confirm the
            # added latency is intended.
            driver.implicitly_wait(5)
        except Exception:
            # Do not leave a Chrome process behind when configuration fails.
            self._safe_quit(driver)
            raise
        return driver

    def _login(self, driver: WebDriver, student_id: str, password: str, log: Callable[[str, str], None]) -> None:
        """Fill in and submit the login form, retrying up to ``config.login_retry_limit`` times.

        Args:
            driver: Browser session to use.
            student_id: Text typed into the account field.
            password: Text typed into the password field.
            log: Callback receiving ``(level, message)``.

        Raises:
            CredentialsError: The page reported a wrong user name or password.
            TemporaryAutomationError: A non-captcha error was shown on the second
                or a later attempt, a form element was not found, or every
                attempt failed.
        """
        for attempt in range(1, self.config.login_retry_limit + 1):
            driver.get(SCU_LOGIN_URL)
            self._wait_ready(driver)

            student_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='text']"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[1]/div/div/div[2]/div/input",
                    ),
                ],
            )
            password_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='password']"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[2]/div/div/div[2]/div/input",
                    ),
                ],
            )
            captcha_box = self._find_first(
                driver,
                [
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]//input",
                    ),
                    (By.XPATH, "//*[@id='app']//form//input[contains(@placeholder, '验证码')]"),
                ],
            )
            login_button = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//button"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[4]/div/button",
                    ),
                ],
            )

            student_box.clear()
            student_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)
            captcha_box.clear()
            captcha_box.send_keys(self._solve_captcha(driver))
            login_button.click()

            if self._wait_for_login_success(driver):
                log("SUCCESS", "教务系统登录成功。")
                return

            error_message = self._read_login_error(driver)
            if not error_message:
                log("WARNING", f"第 {attempt} 次登录失败,未捕获到明确错误提示。")
                continue

            log("WARNING", f"第 {attempt} 次登录失败:{error_message}")
            if "用户名或密码错误" in error_message or "密码错误" in error_message:
                raise CredentialsError("学生账号或密码错误,请在用户面板更新后重新启动任务。")
            # Captcha complaints are retried up to the limit; any other message
            # is given exactly one retry before giving up on this session.
            if "验证码" not in error_message and attempt >= 2:
                raise TemporaryAutomationError(error_message)

        raise TemporaryAutomationError("连续多次登录失败,可能是验证码识别失败或当前系统不可用。")

    def _wait_for_login_success(self, driver: WebDriver) -> bool:
        """Wait up to 8 seconds for the browser to land on the post-login index URL.

        Returns:
            ``True`` once the current URL is the site root or starts with the
            index URL, ``False`` if that does not happen in time.
        """

        def landed(current: WebDriver) -> bool:
            url = current.current_url
            return url in {"http://zhjw.scu.edu.cn/index", "http://zhjw.scu.edu.cn/"} or url.startswith(
                "http://zhjw.scu.edu.cn/index"
            )

        try:
            WebDriverWait(driver, 8, 0.5).until(landed)
        except TimeoutException:
            return False
        return True

    def _read_login_error(self, driver: WebDriver) -> str:
        """Return the stripped text of the first candidate message element that has text, or ``""``."""
        candidates = [
            "/html/body/div[2]",
            "//div[contains(@class, 'el-message') or contains(@class, 'message')]",
        ]
        for xpath in candidates:
            try:
                text = driver.find_element(By.XPATH, xpath).text.strip()
            except NoSuchElementException:
                continue
            if text:
                return text
        return ""

    def _solve_captcha(self, driver: WebDriver) -> str:
        """Locate the captcha image and return the recogniser's answer for it.

        The image bytes come from the ``src`` data URI when it is base64
        encoded, otherwise from a screenshot of the element.
        """
        captcha_img = self._find_first(
            driver,
            [
                (
                    By.XPATH,
                    "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]/div/div/img",
                ),
                (By.XPATH, "//*[@id='app']//form//img"),
            ],
        )
        src = captcha_img.get_attribute("src") or ""
        if "base64," in src:
            image_bytes = base64.b64decode(src.split("base64,", 1)[1])
        else:
            image_bytes = captcha_img.screenshot_as_png
        return self.captcha_solver.classification(image_bytes)

    def _goto_select_course(self, driver: WebDriver, log: Callable[[str, str], None]) -> None:
        """Open the course-selection page and check its body text.

        Raises:
            TemporaryAutomationError: The page text contains the "非选课" notice.
            SessionExpiredError: The page text contains both "登录" and "学号",
                i.e. it looks like the login form.
        """
        driver.get(SCU_SELECT_URL)
        self._wait_ready(driver)
        body_text = driver.find_element(By.TAG_NAME, "body").text
        if "非选课" in body_text:
            raise TemporaryAutomationError("当前不是可选课时段,教务系统已返回非选课提示")
        if "登录" in body_text and "学号" in body_text:
            raise SessionExpiredError("当前会话已失效,需要重新登录")
        log("INFO", "已进入选课页面。")

    def _run_single_cycle(
        self,
        driver: WebDriver,
        user_id: int,
        pending_courses: list[dict],
        db,
        log: Callable[[str, str], None],
    ) -> None:
        """Try the "plan" tab and then the "free" tab once, recording every reported result.

        The pending list is re-read from ``db`` before each tab, so
        ``pending_courses`` itself is not consulted here.  A result row with a
        missing or unparsable subject is logged and skipped so that the
        remaining rows are still recorded.

        Args:
            driver: Logged-in browser session.
            user_id: Key used for the ``db`` lookups and updates.
            pending_courses: Snapshot taken by the caller (kept for interface compatibility).
            db: Storage object (see ``run_until_stopped``).
            log: Callback receiving ``(level, message)``.
        """
        self._goto_select_course(driver, log)
        for tab_name in ("plan", "free"):
            current_pending = db.list_pending_courses(user_id)
            if not current_pending:
                return

            results = self._attempt_tab(driver, tab_name, current_pending, log)
            for result in results:
                subject = result.get("subject")
                detail = result.get("detail") or ""
                parsed = self._extract_course_pair(subject)
                if not parsed:
                    log("WARNING", f"无法从结果文本中解析课程号:{subject}")
                    continue
                course_id, course_index = parsed
                status = "selected" if self._is_selected_result(result) else "pending"
                db.mark_course_result(user_id, course_id, course_index, status, detail)
                level = "SUCCESS" if status == "selected" else "INFO"
                log(level, f"{course_id}_{course_index}: {detail}")

            if results:
                # Reload the selection page after a submission before the next tab.
                self._goto_select_course(driver, log)

    def _attempt_tab(
        self,
        driver: WebDriver,
        tab_name: str,
        courses: list[dict],
        log: Callable[[str, str], None],
    ) -> list[dict]:
        """Search every course in one tab, run the selection script, and submit if any matched.

        Args:
            driver: Browser session currently on the course-selection page.
            tab_name: Key of ``TAB_IDS`` ("plan" or "free").
            courses: Dicts with the keys ``course_id`` and ``course_index``.
            log: Callback receiving ``(level, message)``.

        Returns:
            The parsed submission results, or ``[]`` when the selection script
            never answered "yes" and nothing was submitted.
        """
        self._find_first(driver, [(By.ID, TAB_IDS[tab_name])]).click()
        self._wait_ready(driver)
        selected_any = False

        for course in courses:
            course_key = f"{course['course_id']}_{course['course_index']}"

            self._wait_for_iframe(driver)
            driver.switch_to.frame("ifra")
            try:
                course_input = self._find_first(driver, [(By.ID, "kch")], timeout=10)
                query_button = self._find_first(driver, [(By.ID, "queryButton")], timeout=10)
                course_input.clear()
                course_input.send_keys(course["course_id"])
                query_button.click()
                # The button text contains "正在" while the query is running.
                WebDriverWait(driver, 20, 0.3).until(
                    lambda current: "正在" not in current.find_element(By.ID, "queryButton").text
                )
            finally:
                driver.switch_to.default_content()

            self._wait_for_iframe(driver)
            # The injected script receives "<course_id>_<course_index>" and answers "yes" on a match.
            if driver.execute_script(self.select_course_js, course_key) == "yes":
                selected_any = True
                log("INFO", f"[{tab_name}] 已勾选课程 {course_key}。")
            else:
                log("INFO", f"[{tab_name}] 未找到课程 {course_key}。")

        if not selected_any:
            return []

        self._find_first(driver, [(By.ID, "submitButton")], timeout=10).click()
        return self._collect_results(driver)

    def _collect_results(self, driver: WebDriver) -> list[dict]:
        """Wait for the ``xkresult`` element, run the result script and return its rows.

        A string returned by the script is decoded as JSON.  An empty or
        missing return value yields ``[]`` so callers can always iterate.
        """
        WebDriverWait(driver, 20, 0.5).until(
            lambda current: current.execute_script("return document.getElementById('xkresult') !== null")
        )
        raw_result = driver.execute_script(self.check_result_js)
        if isinstance(raw_result, str):
            raw_result = json.loads(raw_result)
        return raw_result or []

    def _is_selected_result(self, result: dict) -> bool:
        """Return whether a result row counts as selected.

        True when ``result["result"]`` is truthy or the detail text contains
        one of ``ALREADY_SELECTED_KEYWORDS``.  A missing or null detail is
        treated as empty text.
        """
        if result.get("result"):
            return True
        detail = str(result.get("detail") or "")
        return any(keyword in detail for keyword in ALREADY_SELECTED_KEYWORDS)

    def _extract_course_pair(self, subject: str) -> tuple[str, str] | None:
        """Pull ``(course_id, course_index)`` out of a result subject text.

        The last ``<5+ digits>_<2 digits>`` occurrence wins.  Failing that, the
        last two digit runs are used, with the index zero-padded and cut to
        its final two characters.

        Returns:
            The pair, or ``None`` when *subject* is not a string or holds
            fewer than two digit runs.
        """
        if not isinstance(subject, str):
            return None
        strict_match = re.findall(r"(\d{5,})_(\d{2})", subject)
        if strict_match:
            return strict_match[-1]
        loose_match = re.findall(r"(\d+)", subject)
        if len(loose_match) >= 2:
            return loose_match[-2], loose_match[-1].zfill(2)[-2:]
        return None

    def _wait_for_iframe(self, driver: WebDriver) -> None:
        """Wait up to 15 seconds until the ``ifra`` iframe can be entered, then return to the top document.

        Raises:
            SessionExpiredError: The iframe did not become available in time.
        """
        try:
            WebDriverWait(driver, 15, 0.5).until(EC.frame_to_be_available_and_switch_to_it((By.ID, "ifra")))
        except TimeoutException as exc:
            raise SessionExpiredError("选课 iframe 未能正常加载") from exc
        finally:
            # The expected condition switches into the frame on success; always switch back.
            driver.switch_to.default_content()

    def _wait_ready(self, driver: WebDriver) -> None:
        """Block until ``document.readyState`` is "complete" (bounded by the request timeout)."""
        WebDriverWait(driver, self.config.request_timeout_seconds, 0.5).until(
            lambda current: current.execute_script("return document.readyState") == "complete"
        )

    def _find_first(self, driver: WebDriver, selectors: list[tuple[str, str]], timeout: int | None = None):
        """Return the first element matched by any selector, polling until *timeout*.

        Args:
            driver: Browser session (or current frame context) to search.
            selectors: ``(by, value)`` pairs tried in order on every poll.
            timeout: Seconds to keep polling; ``None`` means
                ``config.request_timeout_seconds``.  An explicit ``0`` is
                honoured rather than replaced by the default.

        Raises:
            TemporaryAutomationError: No selector matched before the timeout.
        """
        if timeout is None:
            timeout = self.config.request_timeout_seconds

        def locate(current: WebDriver):
            for by, value in selectors:
                try:
                    return current.find_element(by, value)
                except NoSuchElementException:
                    continue
            return False

        try:
            return WebDriverWait(driver, timeout, 0.5).until(locate)
        except TimeoutException as exc:
            selectors_text = ", ".join(f"{by}:{value}" for by, value in selectors)
            raise TemporaryAutomationError(f"未找到页面元素:{selectors_text}") from exc

    def _safe_quit(self, driver: WebDriver | None) -> None:
        """Close *driver* if there is one, never raising.

        This runs from ``finally`` blocks and error handlers, so any failure
        of ``quit()`` -- not only Selenium's own exception type, but also a
        connection error to an already dead driver process -- must not
        replace the result or exception that is currently in flight.
        """
        if not driver:
            return
        try:
            driver.quit()
        except Exception:
            # Deliberate best-effort cleanup: nothing useful can be done here.
            pass
|
|