Spaces:
Paused
Paused
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import re | |
| import time | |
| from pathlib import Path | |
| from typing import Callable | |
| import onnx_inference | |
| from selenium import webdriver | |
| from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.remote.webdriver import WebDriver | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from course_catcher.config import AppConfig | |
# Single sign-on page that CourseAutomation._login opens to authenticate.
SCU_LOGIN_URL = (
    "http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23"
    "?enterpriseId=scdx&target_url=index"
)

# Course-selection page that CourseAutomation._goto_select_course opens.
SCU_SELECT_URL = "http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index"

# Logical tab name -> DOM id of the tab element clicked on the selection page.
TAB_IDS = {
    "plan": "faxk",
    "free": "zyxk",
}

# Substrings of a result's detail text that count as "course already held".
ALREADY_SELECTED_KEYWORDS = (
    "已选",
    "已选择",
    "已修读",
)
class AutomationError(Exception):
    """Base class for the errors raised by the course-selection automation."""
class CredentialsError(AutomationError):
    """The login page reported a wrong username or password.

    Raised by ``CourseAutomation._login``; ``run_until_stopped`` turns it into
    a ``("failed", message)`` result instead of retrying.
    """
class SessionExpiredError(AutomationError):
    """The browser session is no longer usable and a fresh login is required.

    Raised when the selection page shows the login form again or when the
    course-selection iframe does not become available in time.
    """
class TemporaryAutomationError(AutomationError):
    """A transient failure that the run loop answers by retrying later.

    Examples in this module: an expected page element was not found, the
    system reports that course selection is not open, or login kept failing.
    """
class CourseAutomation:
    """Selenium-driven worker that logs in to the SCU academic system and keeps
    trying to select a user's pending courses.

    One instance holds the application config, the two JavaScript snippets that
    are injected into the selection page, and the captcha recogniser. The public
    entry point is :meth:`run_until_stopped`; every other method is a private
    step of that loop.
    """

    def __init__(self, config: AppConfig) -> None:
        """Store *config* and load the injected scripts and the captcha model.

        The script and model paths are relative, so they are resolved against
        the current working directory of the process.
        """
        self.config = config
        self.select_course_js = Path("javascript/select_course.js").read_text(encoding="utf-8")
        self.check_result_js = Path("javascript/check_result.js").read_text(encoding="utf-8")
        self.captcha_solver = onnx_inference.CaptchaONNXInference(
            model_path=str(Path("ocr_provider") / "captcha_model.onnx")
        )

    def run_until_stopped(
        self,
        task_id: int,
        user_credentials: dict,
        db,
        should_stop: Callable[[], bool],
        log: Callable[[str, str], None],
    ) -> tuple[str, str]:
        """Run selection cycles until stopped, finished, or credentials fail.

        Args:
            task_id: Identifier passed to ``db.increment_task_attempt``.
            user_credentials: Mapping read for the keys ``"id"``,
                ``"student_id"`` and ``"password"``.
            db: Storage object; this method calls ``list_pending_courses``,
                ``increment_task_attempt`` and ``get_setting_float`` on it.
            should_stop: Polled at the top of every iteration and again before
                sleeping; a true value ends the loop.
            log: Callback receiving ``(level, message)``.

        Returns:
            ``("stopped", "")`` when ``should_stop()`` became true,
            ``("completed", "")`` when no pending course is left, or
            ``("failed", message)`` when login reported bad credentials.

        Session setup (browser start, login, opening the selection page) runs
        inside the same ``try`` as the cycle itself, so a transient failure
        while logging in is retried exactly like a failure in the middle of a
        cycle instead of escaping the loop. The browser is always closed on
        exit.
        """
        driver: WebDriver | None = None
        try:
            while True:
                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""

                pending_courses = db.list_pending_courses(user_credentials["id"])
                if not pending_courses:
                    log("SUCCESS", "所有待抢课程都已完成,本轮任务结束。")
                    return "completed", ""

                db.increment_task_attempt(task_id)
                try:
                    if driver is None:
                        log("INFO", "正在启动浏览器会话并登录教务系统。")
                        # Bind the driver before logging in so that a failed
                        # login still leaves something for _safe_quit to close.
                        driver = self._build_driver()
                        self._login(
                            driver,
                            user_credentials["student_id"],
                            user_credentials["password"],
                            log,
                        )
                        self._goto_select_course(driver, log)
                    self._run_single_cycle(driver, user_credentials["id"], pending_courses, db, log)
                except SessionExpiredError as exc:
                    log("WARNING", f"{exc},将重新建立会话。")
                    self._safe_quit(driver)
                    driver = None
                    # Short fixed pause, then log in again without waiting for
                    # the regular loop interval.
                    time.sleep(2)
                    continue
                except TemporaryAutomationError as exc:
                    log("WARNING", f"{exc},稍后重试。")
                    self._safe_quit(driver)
                    driver = None
                except CredentialsError:
                    # Retrying cannot help; handled by the outer try.
                    raise
                except Exception as exc:  # pragma: no cover - defensive path
                    log("ERROR", f"本轮执行发生异常:{exc}")
                    self._safe_quit(driver)
                    driver = None

                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""
                time.sleep(db.get_setting_float("loop_interval_seconds", self.config.loop_interval_seconds))
        except CredentialsError as exc:
            return "failed", str(exc)
        finally:
            self._safe_quit(driver)

    def _build_driver(self) -> WebDriver:
        """Create a headless Chrome driver configured from ``self.config``.

        The configured Chrome and chromedriver paths are used only when they
        point at an existing file; otherwise Selenium's own discovery is used.
        """
        options = webdriver.ChromeOptions()
        for argument in (
            "--headless=new",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-blink-features=AutomationControlled",
            "--window-size=1440,1600",
            "--lang=zh-CN",
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        ):
            options.add_argument(argument)

        # is_file() rather than exists(): an empty setting becomes Path(""),
        # i.e. the current directory, which "exists" but is not a binary.
        chrome_binary = Path(self.config.chrome_binary)
        if chrome_binary.is_file():
            options.binary_location = str(chrome_binary)

        chromedriver_binary = Path(self.config.chromedriver_binary)
        if chromedriver_binary.is_file():
            service = Service(executable_path=str(chromedriver_binary))
            driver = webdriver.Chrome(service=service, options=options)
        else:
            driver = webdriver.Chrome(options=options)

        driver.set_page_load_timeout(self.config.request_timeout_seconds)
        # NOTE(review): Selenium's documentation advises against combining an
        # implicit wait with explicit WebDriverWait polling; with this setting
        # each failed find_element call may block for up to 5 seconds.
        driver.implicitly_wait(5)
        return driver

    def _login(self, driver: WebDriver, student_id: str, password: str, log: Callable[[str, str], None]) -> None:
        """Fill in and submit the login form, retrying up to the configured limit.

        Raises:
            CredentialsError: The page reported a wrong username or password.
            TemporaryAutomationError: A non-captcha error was shown on the
                second or a later attempt, or every attempt failed.
        """
        form_root = "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form"
        for attempt in range(1, self.config.login_retry_limit + 1):
            driver.get(SCU_LOGIN_URL)
            self._wait_ready(driver)

            # Each field is located by a generic selector and by the absolute
            # path inside the form; the first selector that matches wins.
            student_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='text']"),
                    (By.XPATH, f"{form_root}/div[1]/div/div/div[2]/div/input"),
                ],
            )
            password_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='password']"),
                    (By.XPATH, f"{form_root}/div[2]/div/div/div[2]/div/input"),
                ],
            )
            captcha_box = self._find_first(
                driver,
                [
                    (By.XPATH, f"{form_root}/div[3]//input"),
                    (By.XPATH, "//*[@id='app']//form//input[contains(@placeholder, '验证码')]"),
                ],
            )
            login_button = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//button"),
                    (By.XPATH, f"{form_root}/div[4]/div/button"),
                ],
            )

            student_box.clear()
            student_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)
            captcha_box.clear()
            captcha_box.send_keys(self._solve_captcha(driver))
            login_button.click()

            if self._wait_for_login_success(driver):
                log("SUCCESS", "教务系统登录成功。")
                return

            error_message = self._read_login_error(driver)
            if not error_message:
                log("WARNING", f"第 {attempt} 次登录失败,未捕获到明确错误提示。")
                continue

            log("WARNING", f"第 {attempt} 次登录失败:{error_message}")
            if "用户名或密码错误" in error_message or "密码错误" in error_message:
                raise CredentialsError("学生账号或密码错误,请在用户面板更新后重新启动任务。")
            # Captcha errors are simply retried; any other message is given
            # one extra attempt before it is reported as a temporary failure.
            if "验证码" not in error_message and attempt >= 2:
                raise TemporaryAutomationError(error_message)

        raise TemporaryAutomationError("连续多次登录失败,可能是验证码识别失败或当前系统不可用。")

    def _wait_for_login_success(self, driver: WebDriver) -> bool:
        """Return True if the browser reaches the portal index within 8 seconds."""

        def on_portal(current: WebDriver) -> bool:
            url = current.current_url
            return url == "http://zhjw.scu.edu.cn/" or url.startswith("http://zhjw.scu.edu.cn/index")

        try:
            WebDriverWait(driver, 8, 0.5).until(on_portal)
        except TimeoutException:
            return False
        return True

    def _read_login_error(self, driver: WebDriver) -> str:
        """Return the first non-empty error text found on the login page, or ""."""
        candidates = (
            "/html/body/div[2]",
            "//div[contains(@class, 'el-message') or contains(@class, 'message')]",
        )
        for xpath in candidates:
            try:
                text = driver.find_element(By.XPATH, xpath).text.strip()
            except NoSuchElementException:
                continue
            if text:
                return text
        return ""

    def _solve_captcha(self, driver: WebDriver) -> str:
        """Locate the captcha image and return the solver's reading of it.

        The image bytes come from the ``src`` data URI when it carries base64
        data, otherwise from a screenshot of the element.
        """
        captcha_img = self._find_first(
            driver,
            [
                (
                    By.XPATH,
                    "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]/div/div/img",
                ),
                (By.XPATH, "//*[@id='app']//form//img"),
            ],
        )
        src = captcha_img.get_attribute("src") or ""
        _, marker, payload = src.partition("base64,")
        image_bytes = base64.b64decode(payload) if marker else captcha_img.screenshot_as_png
        return self.captcha_solver.classification(image_bytes)

    def _goto_select_course(self, driver: WebDriver, log: Callable[[str, str], None]) -> None:
        """Open the course-selection page and check that it is usable.

        Raises:
            TemporaryAutomationError: The page text contains "非选课".
            SessionExpiredError: The page text contains both "登录" and "学号".
        """
        driver.get(SCU_SELECT_URL)
        self._wait_ready(driver)
        body_text = driver.find_element(By.TAG_NAME, "body").text
        if "非选课" in body_text:
            raise TemporaryAutomationError("当前不是可选课时段,教务系统已返回非选课提示")
        if "登录" in body_text and "学号" in body_text:
            raise SessionExpiredError("当前会话已失效,需要重新登录")
        log("INFO", "已进入选课页面。")

    def _run_single_cycle(
        self,
        driver: WebDriver,
        user_id: int,
        pending_courses: list[dict],
        db,
        log: Callable[[str, str], None],
    ) -> None:
        """Try the "plan" tab, then the "free" tab, and record every result.

        The pending list is re-read from *db* before each tab, so courses
        selected on the first tab are not attempted again on the second;
        *pending_courses* itself is not consulted.
        """
        self._goto_select_course(driver, log)
        for tab_name in ("plan", "free"):
            current_pending = db.list_pending_courses(user_id)
            if not current_pending:
                return

            results = self._attempt_tab(driver, tab_name, current_pending, log)
            for result in results:
                parsed = self._extract_course_pair(result["subject"])
                if not parsed:
                    log("WARNING", f"无法从结果文本中解析课程号:{result['subject']}")
                    continue
                course_id, course_index = parsed
                selected = self._is_selected_result(result)
                status = "selected" if selected else "pending"
                db.mark_course_result(user_id, course_id, course_index, status, result["detail"])
                log("SUCCESS" if selected else "INFO", f"{course_id}_{course_index}: {result['detail']}")

            if results:
                # A submission happened on this tab; reload the selection page
                # before moving on.
                self._goto_select_course(driver, log)

    def _attempt_tab(
        self,
        driver: WebDriver,
        tab_name: str,
        courses: list[dict],
        log: Callable[[str, str], None],
    ) -> list[dict]:
        """Search for each course on one tab, tick the matches and submit.

        Returns the parsed submission results, or an empty list when no course
        was ticked and nothing was submitted.
        """
        self._find_first(driver, [(By.ID, TAB_IDS[tab_name])]).click()
        self._wait_ready(driver)

        selected_any = False
        for course in courses:
            course_key = f"{course['course_id']}_{course['course_index']}"

            # The search form lives inside the "ifra" iframe.
            self._wait_for_iframe(driver)
            driver.switch_to.frame("ifra")
            try:
                course_input = self._find_first(driver, [(By.ID, "kch")], timeout=10)
                query_button = self._find_first(driver, [(By.ID, "queryButton")], timeout=10)
                course_input.clear()
                course_input.send_keys(course["course_id"])
                query_button.click()
                # Wait until the button text no longer contains "正在".
                WebDriverWait(driver, 20, 0.3).until(
                    lambda current: "正在" not in current.find_element(By.ID, "queryButton").text
                )
            finally:
                driver.switch_to.default_content()

            self._wait_for_iframe(driver)
            # The injected script answers "yes" when it found the course.
            if driver.execute_script(self.select_course_js, course_key) == "yes":
                selected_any = True
                log("INFO", f"[{tab_name}] 已勾选课程 {course_key}。")
            else:
                log("INFO", f"[{tab_name}] 未找到课程 {course_key}。")

        if not selected_any:
            return []

        self._find_first(driver, [(By.ID, "submitButton")], timeout=10).click()
        return self._collect_results(driver)

    def _collect_results(self, driver: WebDriver) -> list[dict]:
        """Wait for the ``xkresult`` element and return the parsed result rows.

        The result script may return a JSON string or an already-decoded
        value; an empty or null result is returned as an empty list so that
        callers can always iterate.
        """
        WebDriverWait(driver, 20, 0.5).until(
            lambda current: current.execute_script("return document.getElementById('xkresult') !== null")
        )
        raw_result = driver.execute_script(self.check_result_js)
        if isinstance(raw_result, str):
            raw_result = json.loads(raw_result)
        return raw_result or []

    def _is_selected_result(self, result: dict) -> bool:
        """Return True when *result* means the course is now (or already) held.

        That is the case when ``result["result"]`` is truthy or when the detail
        text contains one of ``ALREADY_SELECTED_KEYWORDS``. A missing, empty or
        ``None`` detail counts as "no keyword".
        """
        if result.get("result"):
            return True
        detail = result.get("detail") or ""
        if not detail:
            return False
        return any(keyword in detail for keyword in ALREADY_SELECTED_KEYWORDS)

    def _extract_course_pair(self, subject: str) -> tuple[str, str] | None:
        """Extract ``(course_id, course_index)`` from a result subject line.

        The last ``<5+ digits>_<2 digits>`` occurrence is preferred. Failing
        that, the last two digit runs are used, with the index normalised to
        exactly two characters. Returns None when neither form is present.
        """
        strict_match = re.findall(r"(\d{5,})_(\d{2})", subject)
        if strict_match:
            return strict_match[-1]

        digit_runs = re.findall(r"(\d+)", subject)
        if len(digit_runs) < 2:
            return None
        course_id, raw_index = digit_runs[-2:]
        return course_id, raw_index.zfill(2)[-2:]

    def _wait_for_iframe(self, driver: WebDriver) -> None:
        """Wait up to 15 seconds for the ``ifra`` iframe to become available.

        The driver is always switched back to the top-level document before
        returning or raising.

        Raises:
            SessionExpiredError: The iframe did not become available in time.
        """
        try:
            WebDriverWait(driver, 15, 0.5).until(EC.frame_to_be_available_and_switch_to_it((By.ID, "ifra")))
        except TimeoutException as exc:
            raise SessionExpiredError("选课 iframe 未能正常加载") from exc
        finally:
            driver.switch_to.default_content()

    def _wait_ready(self, driver: WebDriver) -> None:
        """Block until ``document.readyState`` is ``"complete"``."""
        WebDriverWait(driver, self.config.request_timeout_seconds, 0.5).until(
            lambda current: current.execute_script("return document.readyState") == "complete"
        )

    def _find_first(self, driver: WebDriver, selectors: list[tuple[str, str]], timeout: int | None = None):
        """Return the first element matched by any of *selectors*.

        Args:
            driver: Driver (or frame context) to search in.
            selectors: ``(by, value)`` pairs tried in order on every poll.
            timeout: Seconds to keep polling; ``None`` means the configured
                request timeout. An explicit ``0`` is honoured.

        Raises:
            TemporaryAutomationError: No selector matched before the timeout.
        """
        if timeout is None:
            timeout = self.config.request_timeout_seconds

        def locate(current: WebDriver):
            for by, value in selectors:
                try:
                    return current.find_element(by, value)
                except NoSuchElementException:
                    continue
            # A falsy value tells WebDriverWait to poll again.
            return False

        try:
            return WebDriverWait(driver, timeout, 0.5).until(locate)
        except TimeoutException as exc:
            selectors_text = ", ".join(f"{by}:{value}" for by, value in selectors)
            raise TemporaryAutomationError(f"未找到页面元素:{selectors_text}") from exc

    def _safe_quit(self, driver: WebDriver | None) -> None:
        """Quit *driver* if there is one, ignoring WebDriver errors on the way."""
        if not driver:
            return
        try:
            driver.quit()
        except WebDriverException:
            # Best-effort cleanup: the session may already be gone.
            pass