| from __future__ import annotations
|
|
|
| import base64
|
| import json
|
| import re
|
| import time
|
| from dataclasses import dataclass
|
|
|
| from selenium.common.exceptions import NoSuchElementException, WebDriverException
|
| from selenium.webdriver.common.by import By
|
| from selenium.webdriver.support.wait import WebDriverWait
|
|
|
| import onnx_inference
|
| import webdriver_utils
|
| from core.config import CAPTCHA_MODEL_PATH, CHECK_RESULT_JS_PATH, CONFIG, SELECT_COURSE_JS_PATH
|
| from core.database import Database
|
| from core.security import decrypt_secret
|
|
|
|
|
# Host of the academic-affairs site; shared by the course-selection URL and
# by every accepted post-login landing prefix below.
_ZHJW_HOST = 'zhjw.scu.edu.cn'

# Page the login loop opens before filling in the credential form.
# NOTE(review): this is plain http — confirm whether the site offers https.
URL_LOGIN = 'http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index'

# Page opened before each course-selection tab is processed.
URL_SELECT_COURSE = f'http://{_ZHJW_HOST}/student/courseSelect/courseSelect/index'

# A login attempt counts as successful once the browser's current URL starts
# with one of these prefixes. Order: http before https, '/index' before '/'.
LOGIN_SUCCESS_PREFIXES = tuple(
    f'{scheme}://{_ZHJW_HOST}{path}'
    for scheme in ('http', 'https')
    for path in ('/index', '/')
)
|
|
|
|
|
class TaskStoppedError(RuntimeError):
    """Signal that a running task was halted by the user or by the service.

    Raised from the runner's stop checks so that the in-progress login or
    course-search loop unwinds instead of continuing to drive the browser.
    """
|
|
|
|
|
@dataclass(frozen=True)
class CourseTarget:
    """One course the task should select, built from a task-payload entry.

    Instances are immutable and hashable; the runner tracks outstanding
    targets as ``(course_id, course_index)`` tuples.
    """

    # Typed into the course-number search box when looking the course up.
    course_id: str
    # Second half of the '<course_id>_<course_index>' key handed to the
    # selection script and matched against submit results.
    course_index: str
|
|
|
|
|
class CourseTaskRunner:
    """Run one course-selection task in a Selenium-driven browser.

    ``run`` loads the task and its user from ``database``, signs in through
    the login page (trying the ONNX captcha model first and asking
    ``context`` for a human answer once the automatic attempts are used up),
    then keeps searching the course-selection tabs and submitting matches
    until every target is selected or ``context.should_stop()`` turns true.

    ``context`` must provide ``task_id``, ``log(level, message)``,
    ``request_captcha(image_b64)`` and ``should_stop()``; those are the only
    members this class touches.
    """

    # All login widgets live under the same form node; keep the long prefix
    # in one place so a page-layout change needs a single edit.
    _LOGIN_FORM_XPATH = '//*[@id="app"]/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form'
    _STUDENT_ID_XPATH = _LOGIN_FORM_XPATH + '/div[1]/div/div/div[2]/div/input'
    _PASSWORD_XPATH = _LOGIN_FORM_XPATH + '/div[2]/div/div/div[2]/div/input'
    _CAPTCHA_INPUT_XPATH = _LOGIN_FORM_XPATH + '/div[3]/div/div/div/div/input'
    _CAPTCHA_IMAGE_XPATH = _LOGIN_FORM_XPATH + '/div[3]/div/div/img'
    _LOGIN_BUTTON_XPATH = _LOGIN_FORM_XPATH + '/div[4]/div/button'

    # Substrings of the login error text that mean the credentials themselves
    # are wrong, so retrying cannot help.
    _CREDENTIAL_ERROR_MARKERS = ('密码错误', '用户名或密码错误', '用户不存在')
    # Non-captcha login failures tolerated before giving up.
    _MAX_OTHER_LOGIN_FAILURES = 8
    # Pause between two login attempts, in seconds.
    _LOGIN_RETRY_DELAY = 1.0
    # After clicking "login", poll the URL this many times at this interval.
    _LOGIN_POLL_ATTEMPTS = 10
    _LOGIN_POLL_INTERVAL = 0.5
    # Longest uninterrupted sleep inside ``_sleep_with_stop``, in seconds.
    _STOP_POLL_SLICE = 0.2

    # (display name, DOM id) of the tabs searched in every round, in order.
    _COURSE_TABS = (('方案选课', 'faxk'), ('自由选课', 'zyxk'))

    # '<course id>_<two-digit index>' as it appears in a result subject.
    _COURSE_KEY_RE = re.compile(r'(\d{5,})_(\d{2})')
    _DIGITS_RE = re.compile(r'\d+')

    def __init__(self, database: Database, context):
        """Store collaborators and load the page scripts and captcha model.

        Args:
            database: Task/user store; see the methods called on it below.
            context: Per-task handle used for logging, stop requests and
                manual captcha prompts.
        """
        self.database = database
        self.context = context
        # Browser handles are created in ``run`` and released when it ends.
        self.driver = None
        self.web_wait: WebDriverWait | None = None
        self.select_course_js = SELECT_COURSE_JS_PATH.read_text(encoding='utf-8')
        self.check_result_js = CHECK_RESULT_JS_PATH.read_text(encoding='utf-8')
        self.captcha_solver = onnx_inference.CaptchaONNXInference(model_path=str(CAPTCHA_MODEL_PATH))

    def run(self) -> None:
        """Execute the task identified by ``context.task_id`` end to end.

        Marks the task ``failed`` and returns early when its payload lists no
        courses. Otherwise starts a browser, logs in and loops until all
        targets are selected. The browser is always shut down on exit.

        Raises:
            RuntimeError: The task or its user is missing, or login /
                course-selection reported an unrecoverable condition.
            TaskStoppedError: A stop was requested while running.
        """
        task = self.database.get_task(self.context.task_id)
        if not task:
            raise RuntimeError('任务不存在,无法启动。')
        user = self.database.get_user_by_id(task['user_id'])
        if not user:
            raise RuntimeError('用户不存在,无法执行选课任务。')

        payload = json.loads(task['task_payload'])
        courses = [CourseTarget(**item) for item in payload.get('courses', [])]
        if not courses:
            self.database.set_task_status(self.context.task_id, 'failed', last_error='没有可执行的课程。')
            return

        plain_password = decrypt_secret(user['encrypted_password'])
        self.context.log('info', f'任务启动,目标课程 {len(courses)} 门。')
        self.context.log('info', '正在启动浏览器环境。')

        # The browser is created inside the ``try`` so that a failure while
        # building the wait helper cannot leave a browser process behind.
        try:
            self.driver = webdriver_utils.configure_browser()
            self.web_wait = WebDriverWait(self.driver, CONFIG.page_ready_timeout, 0.4)
            self._login(student_id=user['student_id'], password=plain_password)
            self._catch_courses(courses)
        finally:
            self._quit_browser()

    def _quit_browser(self) -> None:
        """Shut the browser down (best effort) and drop the stale handles."""
        driver, self.driver, self.web_wait = self.driver, None, None
        if driver is None:
            return
        try:
            driver.quit()
        except WebDriverException:
            # Deliberately ignored: the session may already be gone, and a
            # teardown error must not mask whatever ended the run.
            pass

    def _login(self, *, student_id: str, password: str) -> None:
        """Submit the login form until the browser lands on a success URL.

        Each attempt reloads the login page, fills in the credentials and a
        captcha answer, and waits for the redirect. Failures whose message
        mentions the captcha count towards switching to manual captcha
        entry; every other failure counts towards the overall retry limit.

        Raises:
            RuntimeError: Credentials were rejected, the manual captcha
                request returned nothing, the captcha image was unreadable,
                or too many non-captcha failures occurred.
            TaskStoppedError: A stop was requested between attempts.
        """
        assert self.driver and self.web_wait
        self.context.log('info', '正在登录川大统一认证。')
        captcha_failures = 0
        other_failures = 0

        while True:
            self._raise_if_stopped()
            self.driver.get(URL_LOGIN)
            webdriver_utils.wait_for_ready(self.web_wait)
            std_box, password_box, captcha_box, login_button = self._get_login_fields()
            std_box.clear()
            std_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)

            captcha_raw, captcha_b64 = self._read_captcha_image()
            captcha_text = self._obtain_captcha_text(
                captcha_raw,
                captcha_b64,
                manual=captcha_failures >= CONFIG.captcha_auto_attempts,
            )

            captcha_box.clear()
            captcha_box.send_keys(captcha_text)
            login_button.click()

            if self._wait_for_login_success():
                self.context.log('info', '登录成功。')
                return

            error_text = self._read_login_error_text()
            if error_text:
                self.context.log('warning', f'登录返回:{error_text}')
            else:
                self.context.log('warning', '未捕获到明确登录错误,准备重试。')

            if any(marker in error_text for marker in self._CREDENTIAL_ERROR_MARKERS):
                raise RuntimeError('学号或密码错误,请先在后台或用户页更新账号信息。')

            if '验证码' in error_text:
                captcha_failures += 1
            else:
                other_failures += 1

            if other_failures >= self._MAX_OTHER_LOGIN_FAILURES:
                raise RuntimeError('登录多次失败,当前无法稳定进入教务系统。')

            # Stop-aware pause so a stop request is honoured during the wait.
            self._sleep_with_stop(self._LOGIN_RETRY_DELAY)

    def _obtain_captcha_text(self, captcha_raw: bytes, captcha_b64: str, *, manual: bool) -> str:
        """Return the captcha answer from the model or, if ``manual``, a human.

        Raises:
            RuntimeError: The manual request came back empty.
        """
        if not manual:
            captcha_text = self.captcha_solver.classification(captcha_raw)
            self.context.log('debug', f'验证码模型输出:{captcha_text}')
            return captcha_text

        self.context.log('warning', '自动验证码识别已到阈值,等待人工输入。')
        captcha_text = self.context.request_captcha(captcha_b64)
        if not captcha_text:
            raise RuntimeError('验证码等待超时,登录已终止。')
        self.context.log('info', '收到人工验证码,继续登录。')
        return captcha_text

    def _get_login_fields(self):
        """Locate the login widgets.

        Returns:
            ``(student_id_input, password_input, captcha_input, login_button)``
            as Selenium elements. ``find_element`` raises if one is missing.
        """
        assert self.driver
        return (
            self.driver.find_element(By.XPATH, self._STUDENT_ID_XPATH),
            self.driver.find_element(By.XPATH, self._PASSWORD_XPATH),
            self.driver.find_element(By.XPATH, self._CAPTCHA_INPUT_XPATH),
            self.driver.find_element(By.XPATH, self._LOGIN_BUTTON_XPATH),
        )

    def _read_captcha_image(self) -> tuple[bytes, str]:
        """Read the captcha ``<img>`` and return ``(raw_bytes, base64_text)``.

        Raises:
            RuntimeError: The image has no inline base64 ``src``.
        """
        assert self.driver
        image = self.driver.find_element(By.XPATH, self._CAPTCHA_IMAGE_XPATH)
        return self._decode_captcha_src(image.get_attribute('src'))

    @staticmethod
    def _decode_captcha_src(source: str | None) -> tuple[bytes, str]:
        """Split an inline ``...base64,<data>`` image source and decode it.

        Args:
            source: Value of the image's ``src`` attribute; ``None`` when the
                attribute is absent.

        Returns:
            The decoded image bytes and the base64 text they came from.

        Raises:
            RuntimeError: ``source`` is missing or carries no base64 payload.
        """
        # ``get_attribute`` yields None for a missing attribute; treat that
        # like any other unreadable image instead of failing with TypeError.
        if not source or 'base64,' not in source:
            raise RuntimeError('未能读取验证码图片。')
        encoded = source.split('base64,', 1)[1]
        return base64.b64decode(encoded), encoded

    def _wait_for_login_success(self) -> bool:
        """Poll the current URL; True once it matches a success prefix."""
        assert self.driver
        for _ in range(self._LOGIN_POLL_ATTEMPTS):
            if self.driver.current_url.startswith(LOGIN_SUCCESS_PREFIXES):
                return True
            time.sleep(self._LOGIN_POLL_INTERVAL)
        return False

    def _read_login_error_text(self) -> str:
        """Return the stripped text of the page's error box, or '' if absent."""
        assert self.driver
        try:
            error_box = self.driver.find_element(By.XPATH, '/html/body/div[2]')
            return error_box.text.strip()
        except NoSuchElementException:
            return ''

    def _goto_select_course(self) -> None:
        """Open the course-selection page.

        Raises:
            RuntimeError: The page body says selection is not currently open.
        """
        assert self.driver and self.web_wait
        self.driver.get(URL_SELECT_COURSE)
        webdriver_utils.wait_for_ready(self.web_wait)
        body_text = self.driver.find_element(By.TAG_NAME, 'body').text
        if '非选课' in body_text:
            raise RuntimeError('当前不在选课时间,教务系统返回了非选课页面。')

    def _catch_courses(self, courses: list[CourseTarget]) -> None:
        """Loop over the tabs until every target course has been selected.

        A round visits each tab in ``_COURSE_TABS``. When a round selects
        nothing, the loop waits ``CONFIG.task_poll_interval`` seconds before
        the next one. On completion the task status is set to ``success``.

        Raises:
            TaskStoppedError: A stop was requested.
        """
        remaining = {(course.course_id, course.course_index) for course in courses}
        round_index = 0
        while remaining:
            self._raise_if_stopped()
            round_index += 1
            self.context.log('info', f'开始第 {round_index} 轮检索,剩余 {len(remaining)} 门课程。')
            round_successes = 0
            for tab_name, tab_id in self._COURSE_TABS:
                if not remaining:
                    break
                round_successes += self._process_tab(tab_name, tab_id, remaining)

            if not remaining:
                self.context.log('info', '所有目标课程已完成。')
                self.database.set_task_status(
                    self.context.task_id,
                    'success',
                    completed_count=len(courses),
                    last_error='',
                )
                return

            if round_successes == 0:
                self.context.log('debug', f'本轮未命中课程,{CONFIG.task_poll_interval:.1f}s 后继续。')
                self._sleep_with_stop(CONFIG.task_poll_interval)

    def _process_tab(self, tab_name: str, tab_id: str, remaining: set[tuple[str, str]]) -> int:
        """Search one tab for every outstanding target and submit the hits.

        Args:
            tab_name: Human-readable tab name, used only in log lines.
            tab_id: DOM id of the tab element to click.
            remaining: Outstanding ``(course_id, course_index)`` keys;
                confirmed successes are removed from it in place.

        Returns:
            Number of targets the submit results confirmed as selected.
        """
        assert self.driver and self.web_wait
        self._goto_select_course()
        self.driver.find_element(By.XPATH, f'//*[@id="{tab_id}"]').click()
        webdriver_utils.wait_for_ready(self.web_wait)

        selected_targets: list[tuple[str, str]] = []
        # Sorted so the search order is reproducible between runs.
        for course_id, course_index in sorted(remaining):
            self._raise_if_stopped()
            if self._search_and_mark_current_tab(course_id, course_index):
                selected_targets.append((course_id, course_index))
                self.context.log('info', f'{tab_name} 命中课程 {course_id}_{course_index},已勾选。')

        if not selected_targets:
            return 0

        self.driver.find_element(By.XPATH, '//*[@id="submitButton"]').click()
        results = self._read_submit_results()
        successes = 0
        for result in results:
            course_key = self._extract_course_key(result['subject'])
            if result['result'] and course_key in remaining:
                remaining.remove(course_key)
                successes += 1
                completed = self._current_completed_count(remaining)
                self.database.update_task_progress(self.context.task_id, completed)
                self.context.log('info', f"选课成功:{result['subject']}")
            elif result['result']:
                # Reported as selected, but the subject text could not be
                # mapped back to a target, so ``remaining`` is left untouched.
                self.context.log('info', f"选课成功,但未能精确匹配目标项:{result['subject']}")
            else:
                self.context.log('warning', f"选课失败:{result['subject']},原因:{result['detail']}")
        return successes

    def _search_and_mark_current_tab(self, course_id: str, course_index: str) -> bool:
        """Query ``course_id`` in the current tab and run the selection script.

        Returns:
            True when the selection script returns ``'yes'`` for the key
            ``'<course_id>_<course_index>'`` (presumably meaning the row was
            found and ticked — the script itself is loaded from disk).
        """
        assert self.driver and self.web_wait
        # The search form lives inside the 'ifra' frame.
        self.driver.switch_to.frame('ifra')
        try:
            course_id_box = self.driver.find_element(By.XPATH, '//*[@id="kch"]')
            query_button = self.driver.find_element(By.XPATH, '//*[@id="queryButton"]')
            course_id_box.clear()
            course_id_box.send_keys(course_id)
            query_button.click()
            # Wait until the query button's label no longer contains '正在'.
            self.web_wait.until(
                lambda driver: driver.execute_script(
                    "return document.getElementById('queryButton').innerText.indexOf('正在') === -1"
                )
            )
        finally:
            # Always return to the top document, even if the query failed.
            self.driver.switch_to.default_content()

        self.web_wait.until(lambda driver: driver.execute_script("return document.getElementById('ifra') != null"))
        time.sleep(0.15)
        return self.driver.execute_script(self.select_course_js, f'{course_id}_{course_index}') == 'yes'

    def _read_submit_results(self) -> list[dict]:
        """Wait for the result element and return the parsed result records.

        The result script is expected to return a JSON string; each record is
        read by the caller through its ``subject``, ``result`` and ``detail``
        keys.
        """
        assert self.driver and self.web_wait
        self.web_wait.until(
            lambda driver: driver.execute_script("return document.getElementById('xkresult') != null")
        )
        time.sleep(0.3)
        raw = self.driver.execute_script(self.check_result_js)
        return json.loads(raw)

    def _extract_course_key(self, subject: str) -> tuple[str, str]:
        """Derive ``(course_id, course_index)`` from a result subject string.

        Prefers an explicit ``<5+ digits>_<2 digits>`` token. Otherwise falls
        back to the last two digit runs, zero-padding the index to two
        characters. Returns ``('', '')`` when neither form is present.
        """
        match = self._COURSE_KEY_RE.search(subject)
        if match:
            return (match.group(1), match.group(2))
        numbers = self._DIGITS_RE.findall(subject)
        if len(numbers) >= 2:
            return (numbers[-2], numbers[-1].zfill(2))
        return ('', '')

    def _current_completed_count(self, remaining: set[tuple[str, str]]) -> int:
        """Return the task's ``total_count`` minus outstanding targets (min 0).

        The total is re-read from the database; a missing task counts as 0.
        """
        task = self.database.get_task(self.context.task_id)
        total = int(task['total_count']) if task else 0
        return max(0, total - len(remaining))

    def _sleep_with_stop(self, seconds: float) -> None:
        """Sleep for ``seconds`` while polling the stop flag.

        The stop flag is checked before every slice and once more when the
        deadline is reached, so even a zero-length wait honours a stop
        request. Slices are clamped to the time left, so the call does not
        overshoot the deadline by a whole slice.

        Raises:
            TaskStoppedError: A stop was requested during the wait.
        """
        deadline = time.monotonic() + seconds
        while True:
            self._raise_if_stopped()
            time_left = deadline - time.monotonic()
            if time_left <= 0:
                return
            time.sleep(min(self._STOP_POLL_SLICE, time_left))

    def _raise_if_stopped(self) -> None:
        """Raise ``TaskStoppedError`` when the context reports a stop request."""
        if self.context.should_stop():
            raise TaskStoppedError()
|
|
|