# Spaces: Paused
from __future__ import annotations

import base64
import json
import re
import time
from dataclasses import dataclass

from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait

import onnx_inference
import webdriver_utils
from core.config import CAPTCHA_MODEL_PATH, CHECK_RESULT_JS_PATH, CONFIG, SELECT_COURSE_JS_PATH
from core.database import Database
from core.security import decrypt_secret
# Entry point of the single-sign-on flow; a successful login redirects to the
# academic-affairs site (zhjw.scu.edu.cn).
URL_LOGIN = 'http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index'

# Course-selection page that is reloaded before each tab is processed.
URL_SELECT_COURSE = 'http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index'

# The browser is considered logged in once its current URL starts with any of
# these prefixes (both http and https variants are accepted).
LOGIN_SUCCESS_PREFIXES = (
    'http://zhjw.scu.edu.cn/index',
    'http://zhjw.scu.edu.cn/',
    'https://zhjw.scu.edu.cn/index',
    'https://zhjw.scu.edu.cn/',
)
class TaskStoppedError(RuntimeError):
    """Raised when a running task is stopped by the user or the service."""
@dataclass
class CourseTarget:
    """One course section to select, built from a task payload entry.

    Instances are created with keyword arguments (``CourseTarget(**item)``),
    so the decorator-generated ``__init__`` is required.
    """

    # Course number typed into the course-number search box.
    course_id: str
    # Section index; joined to ``course_id`` as ``"<course_id>_<course_index>"``
    # when a search result is marked for selection.
    course_index: str
class CourseTaskRunner:
    """Run one course-selection task in a browser session.

    The runner loads the task and its user from the database, logs in through
    the SSO page (solving the captcha with an ONNX model and falling back to a
    human-provided answer), then repeatedly searches both course-selection tabs
    until every target course has been selected or the task is stopped.

    ``context`` is expected to provide ``task_id``, ``log(level, message)``,
    ``should_stop()`` and ``request_captcha(image_b64)``; those are the only
    members this class uses.
    """

    # Login is abandoned after this many failed attempts that were not
    # attributed to the captcha.
    MAX_OTHER_LOGIN_FAILURES = 8
    # Substrings of the login error box that mean the credentials are wrong,
    # in which case retrying is pointless.
    _BAD_CREDENTIAL_MARKERS = ('密码错误', '用户名或密码错误', '用户不存在')
    # Longest uninterrupted sleep while waiting, so stop requests are noticed quickly.
    _STOP_POLL_INTERVAL = 0.2
    # Common XPath prefix of every element inside the SSO login form.
    _LOGIN_FORM_XPATH = '//*[@id="app"]/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form'
    # Matches "<course_id>_<index>" inside a result subject line.
    _COURSE_KEY_RE = re.compile(r'(\d{5,})_(\d{2})')

    def __init__(self, database: Database, context):
        """Store collaborators, read both helper scripts and build the captcha solver."""
        self.database = database
        self.context = context
        self.driver = None
        self.web_wait: WebDriverWait | None = None
        self.select_course_js = SELECT_COURSE_JS_PATH.read_text(encoding='utf-8')
        self.check_result_js = CHECK_RESULT_JS_PATH.read_text(encoding='utf-8')
        self.captcha_solver = onnx_inference.CaptchaONNXInference(model_path=str(CAPTCHA_MODEL_PATH))

    def run(self) -> None:
        """Execute the task identified by ``self.context.task_id``.

        Raises:
            RuntimeError: if the task or its user does not exist, or if login
                or course selection fails irrecoverably.
            TaskStoppedError: if the context requests a stop while running.
        """
        task = self.database.get_task(self.context.task_id)
        if not task:
            raise RuntimeError('任务不存在,无法启动。')
        user = self.database.get_user_by_id(task['user_id'])
        if not user:
            raise RuntimeError('用户不存在,无法执行选课任务。')

        payload = json.loads(task['task_payload'])
        courses = [CourseTarget(**item) for item in payload.get('courses', [])]
        if not courses:
            self.database.set_task_status(self.context.task_id, 'failed', last_error='没有可执行的课程。')
            return

        plain_password = decrypt_secret(user['encrypted_password'])
        self.context.log('info', f'任务启动,目标课程 {len(courses)} 门。')
        self.context.log('info', '正在启动浏览器环境。')
        self.driver = webdriver_utils.configure_browser()
        try:
            # Built inside the try block so the browser is still closed if
            # anything after its creation fails.
            self.web_wait = WebDriverWait(self.driver, CONFIG.page_ready_timeout, 0.4)
            self._login(student_id=user['student_id'], password=plain_password)
            self._catch_courses(courses)
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except WebDriverException:
                    # Best-effort shutdown: the session may already be gone.
                    pass
            # Drop the references so a closed session can never be reused.
            self.driver = None
            self.web_wait = None

    def _login(self, *, student_id: str, password: str) -> None:
        """Log in through the SSO page, retrying until success or a fatal error.

        Each attempt reloads the login page, fills in the credentials and a
        captcha answer, and then inspects the outcome. Captcha failures are
        counted separately; once they reach ``CONFIG.captcha_auto_attempts``
        the captcha is requested from a human via the context.

        Raises:
            RuntimeError: on wrong credentials, on a captcha request that
                returns nothing, or after ``MAX_OTHER_LOGIN_FAILURES``
                non-captcha failures.
            TaskStoppedError: if a stop is requested between attempts.
        """
        assert self.driver and self.web_wait
        self.context.log('info', '正在登录川大统一认证。')
        captcha_failures = 0
        other_failures = 0
        while True:
            self._raise_if_stopped()
            self.driver.get(URL_LOGIN)
            webdriver_utils.wait_for_ready(self.web_wait)

            std_box, password_box, captcha_box, login_button = self._get_login_fields()
            std_box.clear()
            std_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)

            captcha_raw, captcha_b64 = self._read_captcha_image()
            captcha_text = self._obtain_captcha_text(captcha_raw, captcha_b64, captcha_failures)
            captcha_box.clear()
            captcha_box.send_keys(captcha_text)
            login_button.click()

            if self._wait_for_login_success():
                self.context.log('info', '登录成功。')
                return

            error_text = self._read_login_error_text()
            if error_text:
                self.context.log('warning', f'登录返回:{error_text}')
            else:
                self.context.log('warning', '未捕获到明确登录错误,准备重试。')

            if any(marker in error_text for marker in self._BAD_CREDENTIAL_MARKERS):
                raise RuntimeError('学号或密码错误,请先在后台或用户页更新账号信息。')
            if '验证码' in error_text:
                captcha_failures += 1
            else:
                other_failures += 1
                if other_failures >= self.MAX_OTHER_LOGIN_FAILURES:
                    raise RuntimeError('登录多次失败,当前无法稳定进入教务系统。')
            # Short pause between attempts; stop-aware like every other wait.
            self._sleep_with_stop(1.0)

    def _obtain_captcha_text(self, captcha_raw: bytes, captcha_b64: str, captcha_failures: int) -> str:
        """Return the captcha answer from the model, or from a human after too many failures.

        Raises:
            RuntimeError: if the human captcha request yields an empty answer.
        """
        if captcha_failures >= CONFIG.captcha_auto_attempts:
            self.context.log('warning', '自动验证码识别已到阈值,等待人工输入。')
            captcha_text = self.context.request_captcha(captcha_b64)
            if not captcha_text:
                raise RuntimeError('验证码等待超时,登录已终止。')
            self.context.log('info', '收到人工验证码,继续登录。')
            return captcha_text

        captcha_text = self.captcha_solver.classification(captcha_raw)
        self.context.log('debug', f'验证码模型输出:{captcha_text}')
        return captcha_text

    def _get_login_fields(self):
        """Return ``(student_id_input, password_input, captcha_input, login_button)`` elements."""
        assert self.driver
        relative_paths = (
            '/div[1]/div/div/div[2]/div/input',
            '/div[2]/div/div/div[2]/div/input',
            '/div[3]/div/div/div/div/input',
            '/div[4]/div/button',
        )
        return tuple(
            self.driver.find_element(By.XPATH, self._LOGIN_FORM_XPATH + relative_path)
            for relative_path in relative_paths
        )

    def _read_captcha_image(self) -> tuple[bytes, str]:
        """Return the captcha image as ``(decoded_bytes, base64_text)``.

        Raises:
            RuntimeError: if the image has no inline base64 ``src``.
        """
        assert self.driver
        image = self.driver.find_element(By.XPATH, self._LOGIN_FORM_XPATH + '/div[3]/div/div/img')
        # get_attribute() returns None when the attribute is absent; normalise
        # it so the missing-image case raises the intended RuntimeError.
        source = image.get_attribute('src') or ''
        if 'base64,' not in source:
            raise RuntimeError('未能读取验证码图片。')
        encoded = source.split('base64,', 1)[1]
        return base64.b64decode(encoded), encoded

    def _wait_for_login_success(self) -> bool:
        """Poll the current URL up to 10 times, 0.5 s apart, for a post-login prefix."""
        assert self.driver
        for _ in range(10):
            if self.driver.current_url.startswith(LOGIN_SUCCESS_PREFIXES):
                return True
            time.sleep(0.5)
        return False

    def _read_login_error_text(self) -> str:
        """Return the stripped text of the login error box, or ``''`` if it is absent."""
        assert self.driver
        try:
            error_box = self.driver.find_element(By.XPATH, '/html/body/div[2]')
        except NoSuchElementException:
            return ''
        return error_box.text.strip()

    def _goto_select_course(self) -> None:
        """Open the course-selection page.

        Raises:
            RuntimeError: if the page body contains the "not in selection
                period" marker text.
        """
        assert self.driver and self.web_wait
        self.driver.get(URL_SELECT_COURSE)
        webdriver_utils.wait_for_ready(self.web_wait)
        body_text = self.driver.find_element(By.TAG_NAME, 'body').text
        if '非选课' in body_text:
            raise RuntimeError('当前不在选课时间,教务系统返回了非选课页面。')

    def _catch_courses(self, courses: list[CourseTarget]) -> None:
        """Loop over both selection tabs until every target course is selected.

        Marks the task ``'success'`` in the database once nothing remains.

        Raises:
            TaskStoppedError: if a stop is requested during a round or a wait.
        """
        remaining = {(course.course_id, course.course_index) for course in courses}
        round_index = 0
        while remaining:
            self._raise_if_stopped()
            round_index += 1
            self.context.log('info', f'开始第 {round_index} 轮检索,剩余 {len(remaining)} 门课程。')

            round_successes = 0
            for tab_name, tab_id in (('方案选课', 'faxk'), ('自由选课', 'zyxk')):
                if not remaining:
                    break
                round_successes += self._process_tab(tab_name, tab_id, remaining)

            if not remaining:
                self.context.log('info', '所有目标课程已完成。')
                self.database.set_task_status(
                    self.context.task_id,
                    'success',
                    completed_count=len(courses),
                    last_error='',
                )
                return

            if round_successes == 0:
                self.context.log('debug', f'本轮未命中课程,{CONFIG.task_poll_interval:.1f}s 后继续。')
            # Pace every round so the site is not polled in a tight loop.
            # NOTE(review): confirm the wait is meant to apply after partially
            # successful rounds as well.
            self._sleep_with_stop(CONFIG.task_poll_interval)

    def _process_tab(self, tab_name: str, tab_id: str, remaining: set[tuple[str, str]]) -> int:
        """Search one tab for every remaining course, submit the hits, and record results.

        Successfully selected courses are removed from ``remaining`` in place
        and the task progress is updated in the database.

        Returns:
            The number of target courses selected in this tab.
        """
        assert self.driver and self.web_wait
        self._goto_select_course()
        self.driver.find_element(By.XPATH, f'//*[@id="{tab_id}"]').click()
        webdriver_utils.wait_for_ready(self.web_wait)

        selected_targets: list[tuple[str, str]] = []
        # Iterate over a snapshot: ``remaining`` is mutated further down.
        for course_id, course_index in list(remaining):
            self._raise_if_stopped()
            if self._search_and_mark_current_tab(course_id, course_index):
                selected_targets.append((course_id, course_index))
                self.context.log('info', f'{tab_name} 命中课程 {course_id}_{course_index},已勾选。')
        if not selected_targets:
            return 0

        self.driver.find_element(By.XPATH, '//*[@id="submitButton"]').click()
        results = self._read_submit_results()

        successes = 0
        for result in results:
            course_key = self._extract_course_key(result['subject'])
            if result['result'] and course_key in remaining:
                remaining.remove(course_key)
                successes += 1
                completed = self._current_completed_count(remaining)
                self.database.update_task_progress(self.context.task_id, completed)
                self.context.log('info', f"选课成功:{result['subject']}")
            elif result['result']:
                self.context.log('info', f"选课成功,但未能精确匹配目标项:{result['subject']}")
            else:
                self.context.log('warning', f"选课失败:{result['subject']},原因:{result['detail']}")
        return successes

    def _search_and_mark_current_tab(self, course_id: str, course_index: str) -> bool:
        """Query ``course_id`` in the result iframe and try to tick the wanted section.

        Returns:
            ``True`` when the selection script reports ``'yes'`` for
            ``"<course_id>_<course_index>"``.
        """
        assert self.driver and self.web_wait
        self.driver.switch_to.frame('ifra')
        try:
            course_id_box = self.driver.find_element(By.XPATH, '//*[@id="kch"]')
            query_button = self.driver.find_element(By.XPATH, '//*[@id="queryButton"]')
            course_id_box.clear()
            course_id_box.send_keys(course_id)
            query_button.click()
            # Wait until the query button text no longer contains "正在".
            self.web_wait.until(
                lambda driver: driver.execute_script(
                    "return document.getElementById('queryButton').innerText.indexOf('正在') === -1"
                )
            )
        finally:
            # Always leave the iframe, even if the query failed.
            self.driver.switch_to.default_content()

        self.web_wait.until(lambda driver: driver.execute_script("return document.getElementById('ifra') != null"))
        time.sleep(0.15)
        return self.driver.execute_script(self.select_course_js, f'{course_id}_{course_index}') == 'yes'

    def _read_submit_results(self) -> list[dict]:
        """Wait for the ``xkresult`` element and return the JSON-decoded output of the result script."""
        assert self.driver and self.web_wait
        self.web_wait.until(
            lambda driver: driver.execute_script("return document.getElementById('xkresult') != null")
        )
        time.sleep(0.3)
        raw = self.driver.execute_script(self.check_result_js)
        return json.loads(raw)

    def _extract_course_key(self, subject: str) -> tuple[str, str]:
        """Extract ``(course_id, course_index)`` from a result subject line.

        Prefers an explicit ``<5+ digits>_<2 digits>`` token; otherwise falls
        back to the last two digit runs (zero-padding the index to two
        characters). Returns ``('', '')`` when neither form is found.
        """
        match = self._COURSE_KEY_RE.search(subject)
        if match:
            return (match.group(1), match.group(2))
        numbers = re.findall(r'\d+', subject)
        if len(numbers) >= 2:
            return (numbers[-2], numbers[-1].zfill(2))
        return ('', '')

    def _current_completed_count(self, remaining: set[tuple[str, str]]) -> int:
        """Return the task's ``total_count`` minus the courses still remaining, floored at 0."""
        task = self.database.get_task(self.context.task_id)
        total = int(task['total_count']) if task else 0
        return max(0, total - len(remaining))

    def _sleep_with_stop(self, seconds: float) -> None:
        """Sleep for ``seconds`` in short slices, raising as soon as a stop is requested.

        The final slice is shortened so the total wait does not overshoot the
        requested duration. A non-positive duration returns immediately.

        Raises:
            TaskStoppedError: if the context reports a stop during the wait.
        """
        deadline = time.monotonic() + seconds
        while True:
            time_left = deadline - time.monotonic()
            if time_left <= 0:
                return
            self._raise_if_stopped()
            time.sleep(min(self._STOP_POLL_INTERVAL, time_left))

    def _raise_if_stopped(self) -> None:
        """Raise ``TaskStoppedError`` if the context reports a stop request."""
        if self.context.should_stop():
            raise TaskStoppedError()