"""Selenium-driven course-selection task runner.

Logs in through the SSO page at ``URL_LOGIN``, then repeatedly searches the
course-selection tabs for the requested courses and submits any that can be
ticked, until every target is selected or the task is stopped.
"""
from __future__ import annotations

import base64
import json
import re
import time
from dataclasses import dataclass

from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait

import onnx_inference
import webdriver_utils
from core.config import CAPTCHA_MODEL_PATH, CHECK_RESULT_JS_PATH, CONFIG, SELECT_COURSE_JS_PATH
from core.database import Database
from core.security import decrypt_secret

URL_LOGIN = 'http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index'
URL_SELECT_COURSE = 'http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index'
LOGIN_SUCCESS_PREFIXES = (
    'http://zhjw.scu.edu.cn/index',
    'http://zhjw.scu.edu.cn/',
    'https://zhjw.scu.edu.cn/index',
    'https://zhjw.scu.edu.cn/',
)

# Common XPath prefix of the login <form>; every login control hangs off it.
_LOGIN_FORM_XPATH = '//*[@id="app"]/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form'

# "<course id>_<two-digit index>" as looked for in a submit-result subject.
_COURSE_KEY_RE = re.compile(r'(\d{5,})_(\d{2})')

# Number of non-captcha login failures tolerated before giving up.
_MAX_OTHER_LOGIN_FAILURES = 8


class TaskStoppedError(RuntimeError):
    """Raised when a running task is stopped by the user or the service."""


@dataclass(frozen=True)
class CourseTarget:
    """One course to select, identified by course id and course index."""

    course_id: str
    course_index: str


class CourseTaskRunner:
    """Run a single course-selection task in a browser session.

    ``database`` supplies task/user rows and receives status updates;
    ``context`` supplies ``task_id``, ``log``, ``request_captcha`` and
    ``should_stop`` (these are the only members this class uses).
    """

    def __init__(self, database: Database, context):
        """Store collaborators, load the helper JS files and the captcha model."""
        self.database = database
        self.context = context
        self.driver = None
        self.web_wait: WebDriverWait | None = None
        self.select_course_js = SELECT_COURSE_JS_PATH.read_text(encoding='utf-8')
        self.check_result_js = CHECK_RESULT_JS_PATH.read_text(encoding='utf-8')
        self.captcha_solver = onnx_inference.CaptchaONNXInference(model_path=str(CAPTCHA_MODEL_PATH))

    def run(self) -> None:
        """Execute the task: load it, log in, then select the courses.

        Raises:
            RuntimeError: if the task or its user row is missing, or if
                login / course selection fails.
            TaskStoppedError: if the context reports a stop request.
        """
        task = self.database.get_task(self.context.task_id)
        if not task:
            raise RuntimeError('任务不存在,无法启动。')

        user = self.database.get_user_by_id(task['user_id'])
        if not user:
            raise RuntimeError('用户不存在,无法执行选课任务。')

        payload = json.loads(task['task_payload'])
        courses = [CourseTarget(**item) for item in payload.get('courses', [])]
        if not courses:
            # Nothing to do: record the failure instead of starting a browser.
            self.database.set_task_status(self.context.task_id, 'failed', last_error='没有可执行的课程。')
            return

        plain_password = decrypt_secret(user['encrypted_password'])
        self.context.log('info', f'任务启动,目标课程 {len(courses)} 门。')
        self.context.log('info', '正在启动浏览器环境。')
        self.driver = webdriver_utils.configure_browser()
        # Third positional argument is WebDriverWait's poll frequency (seconds).
        self.web_wait = WebDriverWait(self.driver, CONFIG.page_ready_timeout, 0.4)
        try:
            self._login(student_id=user['student_id'], password=plain_password)
            self._catch_courses(courses)
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except WebDriverException:
                    # Best-effort cleanup: a failing quit must not mask the
                    # outcome (or exception) of the task itself.
                    pass

    def _login(self, *, student_id: str, password: str) -> None:
        """Log in via the SSO page, retrying until success or a fatal error.

        The captcha is solved by the ONNX model until
        ``CONFIG.captcha_auto_attempts`` captcha failures have accumulated;
        after that the captcha is requested from the context instead.

        Raises:
            RuntimeError: on wrong credentials, captcha request timeout, or
                too many non-captcha failures.
            TaskStoppedError: if the context reports a stop request.
        """
        assert self.driver and self.web_wait
        self.context.log('info', '正在登录川大统一认证。')
        captcha_failures = 0
        other_failures = 0

        while True:
            self._raise_if_stopped()
            self.driver.get(URL_LOGIN)
            webdriver_utils.wait_for_ready(self.web_wait)

            std_box, password_box, captcha_box, login_button = self._get_login_fields()
            std_box.clear()
            std_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)

            captcha_raw, captcha_b64 = self._read_captcha_image()
            if captcha_failures >= CONFIG.captcha_auto_attempts:
                self.context.log('warning', '自动验证码识别已到阈值,等待人工输入。')
                captcha_text = self.context.request_captcha(captcha_b64)
                if not captcha_text:
                    raise RuntimeError('验证码等待超时,登录已终止。')
                self.context.log('info', '收到人工验证码,继续登录。')
            else:
                captcha_text = self.captcha_solver.classification(captcha_raw)
                self.context.log('debug', f'验证码模型输出:{captcha_text}')

            captcha_box.clear()
            captcha_box.send_keys(captcha_text)
            login_button.click()

            if self._wait_for_login_success():
                self.context.log('info', '登录成功。')
                return

            error_text = self._read_login_error_text()
            if error_text:
                self.context.log('warning', f'登录返回:{error_text}')
            else:
                self.context.log('warning', '未捕获到明确登录错误,准备重试。')

            # Credential errors cannot be fixed by retrying.
            if '密码错误' in error_text or '用户名或密码错误' in error_text or '用户不存在' in error_text:
                raise RuntimeError('学号或密码错误,请先在后台或用户页更新账号信息。')

            if '验证码' in error_text:
                captcha_failures += 1
            else:
                other_failures += 1

            if other_failures >= _MAX_OTHER_LOGIN_FAILURES:
                raise RuntimeError('登录多次失败,当前无法稳定进入教务系统。')

            time.sleep(1.0)

    def _get_login_fields(self):
        """Return the (student id, password, captcha, login button) elements."""
        assert self.driver
        return (
            self.driver.find_element(
                By.XPATH,
                _LOGIN_FORM_XPATH + '/div[1]/div/div/div[2]/div/input',
            ),
            self.driver.find_element(
                By.XPATH,
                _LOGIN_FORM_XPATH + '/div[2]/div/div/div[2]/div/input',
            ),
            self.driver.find_element(
                By.XPATH,
                _LOGIN_FORM_XPATH + '/div[3]/div/div/div/div/input',
            ),
            self.driver.find_element(
                By.XPATH,
                _LOGIN_FORM_XPATH + '/div[4]/div/button',
            ),
        )

    def _read_captcha_image(self) -> tuple[bytes, str]:
        """Read the captcha ``<img>`` and return ``(decoded bytes, base64 text)``.

        Raises:
            RuntimeError: if the image ``src`` is missing or is not a
                base64 data URI.
        """
        assert self.driver
        image = self.driver.find_element(
            By.XPATH,
            _LOGIN_FORM_XPATH + '/div[3]/div/div/img',
        )
        # get_attribute returns None when the attribute is absent; normalise
        # to '' so the check below raises the intended RuntimeError instead
        # of a TypeError.
        source = image.get_attribute('src') or ''
        if 'base64,' not in source:
            raise RuntimeError('未能读取验证码图片。')
        encoded = source.split('base64,', 1)[1]
        return base64.b64decode(encoded), encoded

    def _wait_for_login_success(self) -> bool:
        """Poll the current URL (10 tries, 0.5 s apart) for a post-login prefix."""
        assert self.driver
        for _ in range(10):
            # str.startswith accepts a tuple of candidate prefixes.
            if self.driver.current_url.startswith(LOGIN_SUCCESS_PREFIXES):
                return True
            time.sleep(0.5)
        return False

    def _read_login_error_text(self) -> str:
        """Return the stripped text of ``/html/body/div[2]``, or '' if absent."""
        assert self.driver
        try:
            error_box = self.driver.find_element(By.XPATH, '/html/body/div[2]')
            return error_box.text.strip()
        except NoSuchElementException:
            return ''

    def _goto_select_course(self) -> None:
        """Open the course-selection page.

        Raises:
            RuntimeError: if the page body contains the "not in selection
                period" marker text.
        """
        assert self.driver and self.web_wait
        self.driver.get(URL_SELECT_COURSE)
        webdriver_utils.wait_for_ready(self.web_wait)
        body_text = self.driver.find_element(By.TAG_NAME, 'body').text
        if '非选课' in body_text:
            raise RuntimeError('当前不在选课时间,教务系统返回了非选课页面。')

    def _catch_courses(self, courses: list[CourseTarget]) -> None:
        """Loop over both selection tabs until every target course is taken.

        On completion the task is marked ``'success'`` in the database.

        Raises:
            TaskStoppedError: if the context reports a stop request.
        """
        remaining = {(course.course_id, course.course_index) for course in courses}
        round_index = 0

        while remaining:
            self._raise_if_stopped()
            round_index += 1
            self.context.log('info', f'开始第 {round_index} 轮检索,剩余 {len(remaining)} 门课程。')

            round_successes = 0
            for tab_name, tab_id in (('方案选课', 'faxk'), ('自由选课', 'zyxk')):
                if not remaining:
                    break
                round_successes += self._process_tab(tab_name, tab_id, remaining)

            if not remaining:
                self.context.log('info', '所有目标课程已完成。')
                self.database.set_task_status(
                    self.context.task_id,
                    'success',
                    completed_count=len(courses),
                    last_error='',
                )
                return

            if round_successes == 0:
                self.context.log('debug', f'本轮未命中课程,{CONFIG.task_poll_interval:.1f}s 后继续。')
            # NOTE(review): the original indentation was lost; the pause is
            # assumed to apply to every round (not only empty ones) so the
            # site is never polled back-to-back. Confirm against history.
            self._sleep_with_stop(CONFIG.task_poll_interval)

    def _process_tab(self, tab_name: str, tab_id: str, remaining: set[tuple[str, str]]) -> int:
        """Search one tab for all remaining courses and submit the hits.

        Successfully selected courses are removed from ``remaining`` in
        place and the task progress is updated in the database.

        Returns:
            The number of target courses confirmed as selected in this tab.
        """
        assert self.driver and self.web_wait
        self._goto_select_course()
        self.driver.find_element(By.XPATH, f'//*[@id="{tab_id}"]').click()
        webdriver_utils.wait_for_ready(self.web_wait)

        selected_targets: list[tuple[str, str]] = []
        # Iterate over a snapshot: `remaining` is mutated further below.
        for course_id, course_index in list(remaining):
            self._raise_if_stopped()
            if self._search_and_mark_current_tab(course_id, course_index):
                selected_targets.append((course_id, course_index))
                self.context.log('info', f'{tab_name} 命中课程 {course_id}_{course_index},已勾选。')

        if not selected_targets:
            return 0

        self.driver.find_element(By.XPATH, '//*[@id="submitButton"]').click()
        results = self._read_submit_results()

        successes = 0
        for result in results:
            course_key = self._extract_course_key(result['subject'])
            if result['result'] and course_key in remaining:
                remaining.remove(course_key)
                successes += 1
                completed = self._current_completed_count(remaining)
                self.database.update_task_progress(self.context.task_id, completed)
                self.context.log('info', f"选课成功:{result['subject']}")
            elif result['result']:
                self.context.log('info', f"选课成功,但未能精确匹配目标项:{result['subject']}")
            else:
                self.context.log('warning', f"选课失败:{result['subject']},原因:{result['detail']}")
        return successes

    def _search_and_mark_current_tab(self, course_id: str, course_index: str) -> bool:
        """Query ``course_id`` inside the ``ifra`` frame and try to tick it.

        Returns:
            True when the select-course script returns ``'yes'`` for
            ``"<course_id>_<course_index>"``.
        """
        assert self.driver and self.web_wait
        self.driver.switch_to.frame('ifra')
        try:
            course_id_box = self.driver.find_element(By.XPATH, '//*[@id="kch"]')
            query_button = self.driver.find_element(By.XPATH, '//*[@id="queryButton"]')
            course_id_box.clear()
            course_id_box.send_keys(course_id)
            query_button.click()
            # Wait until the query button text no longer contains '正在'.
            self.web_wait.until(
                lambda driver: driver.execute_script(
                    "return document.getElementById('queryButton').innerText.indexOf('正在') === -1"
                )
            )
        finally:
            # Always leave the frame so later lookups start from the top document.
            self.driver.switch_to.default_content()

        # NOTE(review): original indentation was lost; this wait is assumed to
        # follow the try/finally rather than live inside `finally`.
        self.web_wait.until(lambda driver: driver.execute_script("return document.getElementById('ifra') != null"))
        time.sleep(0.15)
        return self.driver.execute_script(self.select_course_js, f'{course_id}_{course_index}') == 'yes'

    def _read_submit_results(self) -> list[dict]:
        """Wait for the ``xkresult`` element, then return the parsed result list.

        The check-result script is expected to return a JSON string; each
        entry is read by the caller via the ``subject``, ``result`` and
        ``detail`` keys.
        """
        assert self.driver and self.web_wait
        self.web_wait.until(
            lambda driver: driver.execute_script("return document.getElementById('xkresult') != null")
        )
        time.sleep(0.3)
        raw = self.driver.execute_script(self.check_result_js)
        return json.loads(raw)

    def _extract_course_key(self, subject: str) -> tuple[str, str]:
        """Extract ``(course_id, course_index)`` from a result subject string.

        Prefers an explicit ``<5+ digits>_<2 digits>`` match; otherwise falls
        back to the last two digit runs (index zero-padded to two digits).
        Returns ``('', '')`` when neither form is found.
        """
        match = _COURSE_KEY_RE.search(subject)
        if match:
            return (match.group(1), match.group(2))

        numbers = re.findall(r'\d+', subject)
        if len(numbers) >= 2:
            return (numbers[-2], numbers[-1].zfill(2))
        return ('', '')

    def _current_completed_count(self, remaining: set[tuple[str, str]]) -> int:
        """Return the task's ``total_count`` minus ``len(remaining)``, floored at 0."""
        task = self.database.get_task(self.context.task_id)
        total = int(task['total_count']) if task else 0
        return max(0, total - len(remaining))

    def _sleep_with_stop(self, seconds: float) -> None:
        """Sleep for ``seconds`` in short steps, aborting on a stop request.

        Raises:
            TaskStoppedError: if the context reports a stop request.
        """
        deadline = time.monotonic() + seconds
        while True:
            left = deadline - time.monotonic()
            if left <= 0:
                return
            self._raise_if_stopped()
            # Clamp the final step so the total wait does not overshoot.
            time.sleep(min(0.2, left))

    def _raise_if_stopped(self) -> None:
        """Raise ``TaskStoppedError`` when the context asks the task to stop."""
        if self.context.should_stop():
            raise TaskStoppedError()