# SACC-release / core / course_runner.py
# cacode's picture
# Deploy updated SCU course catcher
# e28c9e4 verified
from __future__ import annotations
import base64
import json
import re
import time
from dataclasses import dataclass
from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
import onnx_inference
import webdriver_utils
from core.config import CAPTCHA_MODEL_PATH, CHECK_RESULT_JS_PATH, CONFIG, SELECT_COURSE_JS_PATH
from core.database import Database
from core.security import decrypt_secret
# Entry point of the unified-authentication (SSO) login page.
URL_LOGIN = (
    'http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23'
    '?enterpriseId=scdx&target_url=index'
)

# Landing page of the course-selection module inside the academic system.
URL_SELECT_COURSE = 'http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index'

# A login counts as successful once the browser URL starts with one of these
# prefixes; both schemes are listed, each with and without the "/index" path.
LOGIN_SUCCESS_PREFIXES = tuple(
    f'{scheme}://zhjw.scu.edu.cn{path}'
    for scheme in ('http', 'https')
    for path in ('/index', '/')
)
class TaskStoppedError(RuntimeError):
    """Signal that a running task was asked to stop.

    Raised when the task context reports a stop request, whether it came
    from the user or from the service itself.
    """
@dataclass(frozen=True)
class CourseTarget:
    """Immutable (course id, course index) pair naming one course to grab."""

    # Course number as typed into the course-number search box.
    course_id: str
    # Section index; combined with the id as "<course_id>_<course_index>".
    course_index: str
class CourseTaskRunner:
    """Run one course-selection task end to end in a browser session.

    The runner loads the task and its user from the database, logs in to
    the unified-authentication page (solving the captcha with the ONNX
    model and falling back to a manual answer requested through the task
    context), then polls the course-selection tabs until every target
    course has been selected or the task is stopped.
    """

    # XPath of the login <form>; every login widget is located below it.
    _LOGIN_FORM_XPATH = '//*[@id="app"]/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form'
    # Login error fragments that mean the stored credentials are wrong.
    _BAD_CREDENTIAL_MARKERS = ('密码错误', '用户名或密码错误', '用户不存在')
    # Give up after this many login failures that are not captcha related.
    _MAX_OTHER_LOGIN_FAILURES = 8
    # Longest single nap inside _sleep_with_stop, i.e. stop-check granularity.
    _STOP_POLL_SECONDS = 0.2
    # "<course id>_<two-digit index>" as it appears in a result subject line.
    _COURSE_KEY_PATTERN = re.compile(r'(\d{5,})_(\d{2})')

    def __init__(self, database: Database, context):
        """Store collaborators and load the JS snippets and captcha model.

        Args:
            database: Task/user store used to read the task and report
                progress and status.
            context: Per-task context; this class uses its ``task_id``,
                ``log``, ``request_captcha`` and ``should_stop`` members.
        """
        self.database = database
        self.context = context
        # Both are created in run() and cleared again once the browser quits.
        self.driver = None
        self.web_wait: WebDriverWait | None = None
        self.select_course_js = SELECT_COURSE_JS_PATH.read_text(encoding='utf-8')
        self.check_result_js = CHECK_RESULT_JS_PATH.read_text(encoding='utf-8')
        self.captcha_solver = onnx_inference.CaptchaONNXInference(model_path=str(CAPTCHA_MODEL_PATH))

    def run(self) -> None:
        """Execute the task: load it, log in, and catch the target courses.

        Marks the task ``failed`` and returns early when its payload lists
        no courses. The browser is always shut down before returning.

        Raises:
            RuntimeError: If the task or its user cannot be found, or if
                login / course selection fails irrecoverably.
            TaskStoppedError: If a stop is requested while running.
        """
        task = self.database.get_task(self.context.task_id)
        if not task:
            raise RuntimeError('任务不存在,无法启动。')
        user = self.database.get_user_by_id(task['user_id'])
        if not user:
            raise RuntimeError('用户不存在,无法执行选课任务。')
        payload = json.loads(task['task_payload'])
        courses = [CourseTarget(**item) for item in payload.get('courses', [])]
        if not courses:
            self.database.set_task_status(self.context.task_id, 'failed', last_error='没有可执行的课程。')
            return
        plain_password = decrypt_secret(user['encrypted_password'])
        self.context.log('info', f'任务启动,目标课程 {len(courses)} 门。')
        self.context.log('info', '正在启动浏览器环境。')
        self.driver = webdriver_utils.configure_browser()
        try:
            # Built inside the try block so that a failure here still quits
            # the browser that was just launched.
            self.web_wait = WebDriverWait(self.driver, CONFIG.page_ready_timeout, 0.4)
            self._login(student_id=user['student_id'], password=plain_password)
            self._catch_courses(courses)
        finally:
            self._shutdown_browser()

    def _shutdown_browser(self) -> None:
        """Quit the browser (best effort) and drop the stale references."""
        driver = self.driver
        self.driver = None
        self.web_wait = None
        if driver:
            try:
                driver.quit()
            except WebDriverException:
                # Deliberately ignored: the session may already be gone and
                # a cleanup failure must not mask the task's real outcome.
                pass

    def _login(self, *, student_id: str, password: str) -> None:
        """Log in through the SSO page, retrying until it succeeds.

        Each attempt reloads the login page, fills in the credentials and a
        captcha answer, and submits. The captcha is answered by the ONNX
        model until ``CONFIG.captcha_auto_attempts`` captcha failures have
        accumulated; after that a manual answer is requested through the
        task context.

        Raises:
            RuntimeError: On a manual-captcha timeout, on wrong
                credentials, or after too many non-captcha failures.
            TaskStoppedError: If a stop is requested between attempts.
        """
        assert self.driver and self.web_wait
        self.context.log('info', '正在登录川大统一认证。')
        captcha_failures = 0
        other_failures = 0
        while True:
            self._raise_if_stopped()
            self.driver.get(URL_LOGIN)
            webdriver_utils.wait_for_ready(self.web_wait)
            std_box, password_box, captcha_box, login_button = self._get_login_fields()
            std_box.clear()
            std_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)
            captcha_raw, captcha_b64 = self._read_captcha_image()
            if captcha_failures >= CONFIG.captcha_auto_attempts:
                self.context.log('warning', '自动验证码识别已到阈值,等待人工输入。')
                captcha_text = self.context.request_captcha(captcha_b64)
                if not captcha_text:
                    raise RuntimeError('验证码等待超时,登录已终止。')
                self.context.log('info', '收到人工验证码,继续登录。')
            else:
                captcha_text = self.captcha_solver.classification(captcha_raw)
                self.context.log('debug', f'验证码模型输出:{captcha_text}')
            captcha_box.clear()
            captcha_box.send_keys(captcha_text)
            login_button.click()
            if self._wait_for_login_success():
                self.context.log('info', '登录成功。')
                return
            error_text = self._read_login_error_text()
            if error_text:
                self.context.log('warning', f'登录返回:{error_text}')
            else:
                self.context.log('warning', '未捕获到明确登录错误,准备重试。')
            # Wrong credentials can never succeed on retry, so abort at once.
            if any(marker in error_text for marker in self._BAD_CREDENTIAL_MARKERS):
                raise RuntimeError('学号或密码错误,请先在后台或用户页更新账号信息。')
            if '验证码' in error_text:
                captcha_failures += 1
            else:
                other_failures += 1
            if other_failures >= self._MAX_OTHER_LOGIN_FAILURES:
                raise RuntimeError('登录多次失败,当前无法稳定进入教务系统。')
            time.sleep(1.0)

    def _get_login_fields(self):
        """Locate the login widgets.

        Returns:
            A tuple ``(student id input, password input, captcha input,
            login button)`` of page elements.
        """
        assert self.driver
        form = self._LOGIN_FORM_XPATH
        return (
            self.driver.find_element(By.XPATH, f'{form}/div[1]/div/div/div[2]/div/input'),
            self.driver.find_element(By.XPATH, f'{form}/div[2]/div/div/div[2]/div/input'),
            self.driver.find_element(By.XPATH, f'{form}/div[3]/div/div/div/div/input'),
            self.driver.find_element(By.XPATH, f'{form}/div[4]/div/button'),
        )

    def _read_captcha_image(self) -> tuple[bytes, str]:
        """Read the captcha image embedded in the login form.

        Returns:
            ``(decoded image bytes, base64 text)`` taken from the part of
            the image's ``src`` attribute that follows ``base64,``.

        Raises:
            RuntimeError: If ``src`` is missing or holds no base64 payload.
        """
        assert self.driver
        image = self.driver.find_element(By.XPATH, f'{self._LOGIN_FORM_XPATH}/div[3]/div/div/img')
        # get_attribute() yields None for an absent attribute; normalise it
        # so the check below raises the intended error, not a TypeError.
        source = image.get_attribute('src') or ''
        if 'base64,' not in source:
            raise RuntimeError('未能读取验证码图片。')
        encoded = source.split('base64,', 1)[1]
        return base64.b64decode(encoded), encoded

    def _wait_for_login_success(self) -> bool:
        """Poll the current URL for a post-login address.

        Checks up to 10 times, sleeping 0.5 s after each miss.

        Returns:
            True as soon as the URL starts with one of
            ``LOGIN_SUCCESS_PREFIXES``, otherwise False.
        """
        assert self.driver
        for _ in range(10):
            # str.startswith accepts the whole tuple of prefixes.
            if self.driver.current_url.startswith(LOGIN_SUCCESS_PREFIXES):
                return True
            time.sleep(0.5)
        return False

    def _read_login_error_text(self) -> str:
        """Return the stripped text of the login error box, or '' if absent."""
        assert self.driver
        try:
            error_box = self.driver.find_element(By.XPATH, '/html/body/div[2]')
            return error_box.text.strip()
        except NoSuchElementException:
            return ''

    def _goto_select_course(self) -> None:
        """Open the course-selection page.

        Raises:
            RuntimeError: If the page body contains the "非选课" marker,
                which this code treats as being outside the selection period.
        """
        assert self.driver and self.web_wait
        self.driver.get(URL_SELECT_COURSE)
        webdriver_utils.wait_for_ready(self.web_wait)
        body_text = self.driver.find_element(By.TAG_NAME, 'body').text
        if '非选课' in body_text:
            raise RuntimeError('当前不在选课时间,教务系统返回了非选课页面。')

    def _catch_courses(self, courses: list[CourseTarget]) -> None:
        """Poll both selection tabs until every target course is selected.

        Each round visits the two tabs in order. When a round selects
        nothing, the runner waits ``CONFIG.task_poll_interval`` seconds
        before the next round. The task is marked ``success`` once no
        target remains.

        Raises:
            TaskStoppedError: If a stop is requested while polling.
        """
        remaining = {(course.course_id, course.course_index) for course in courses}
        round_index = 0
        while remaining:
            self._raise_if_stopped()
            round_index += 1
            self.context.log('info', f'开始第 {round_index} 轮检索,剩余 {len(remaining)} 门课程。')
            round_successes = 0
            for tab_name, tab_id in (('方案选课', 'faxk'), ('自由选课', 'zyxk')):
                if not remaining:
                    break
                round_successes += self._process_tab(tab_name, tab_id, remaining)
            if not remaining:
                self.context.log('info', '所有目标课程已完成。')
                self.database.set_task_status(
                    self.context.task_id,
                    'success',
                    completed_count=len(courses),
                    last_error='',
                )
                return
            if round_successes == 0:
                self.context.log('debug', f'本轮未命中课程,{CONFIG.task_poll_interval:.1f}s 后继续。')
                self._sleep_with_stop(CONFIG.task_poll_interval)

    def _process_tab(self, tab_name: str, tab_id: str, remaining: set[tuple[str, str]]) -> int:
        """Search one tab for the remaining targets and submit any hits.

        Args:
            tab_name: Tab label, used only in log messages.
            tab_id: DOM id of the tab element to click.
            remaining: Outstanding ``(course_id, course_index)`` keys;
                keys confirmed as selected are removed in place.

        Returns:
            Number of targets confirmed as selected in this tab.
        """
        assert self.driver and self.web_wait
        self._goto_select_course()
        self.driver.find_element(By.XPATH, f'//*[@id="{tab_id}"]').click()
        webdriver_utils.wait_for_ready(self.web_wait)
        selected_targets: list[tuple[str, str]] = []
        # Iterate over a snapshot of the set of outstanding targets.
        for course_id, course_index in list(remaining):
            self._raise_if_stopped()
            if self._search_and_mark_current_tab(course_id, course_index):
                selected_targets.append((course_id, course_index))
                self.context.log('info', f'{tab_name} 命中课程 {course_id}_{course_index},已勾选。')
        if not selected_targets:
            return 0
        self.driver.find_element(By.XPATH, '//*[@id="submitButton"]').click()
        results = self._read_submit_results()
        successes = 0
        for result in results:
            course_key = self._extract_course_key(result['subject'])
            if result['result'] and course_key in remaining:
                remaining.remove(course_key)
                successes += 1
                completed = self._current_completed_count(remaining)
                self.database.update_task_progress(self.context.task_id, completed)
                self.context.log('info', f"选课成功:{result['subject']}")
            elif result['result']:
                self.context.log('info', f"选课成功,但未能精确匹配目标项:{result['subject']}")
            else:
                self.context.log('warning', f"选课失败:{result['subject']},原因:{result['detail']}")
        return successes

    def _search_and_mark_current_tab(self, course_id: str, course_index: str) -> bool:
        """Query a course number in the current tab and try to tick the target.

        The query runs inside the ``ifra`` frame; the driver is always
        switched back to the top-level document afterwards.

        Returns:
            True when the select-course script returns ``'yes'`` for
            ``"<course_id>_<course_index>"``.
        """
        assert self.driver and self.web_wait
        self.driver.switch_to.frame('ifra')
        try:
            course_id_box = self.driver.find_element(By.XPATH, '//*[@id="kch"]')
            query_button = self.driver.find_element(By.XPATH, '//*[@id="queryButton"]')
            course_id_box.clear()
            course_id_box.send_keys(course_id)
            query_button.click()
            # Wait until the query button text no longer contains "正在".
            self.web_wait.until(
                lambda driver: driver.execute_script(
                    "return document.getElementById('queryButton').innerText.indexOf('正在') === -1"
                )
            )
        finally:
            self.driver.switch_to.default_content()
        self.web_wait.until(lambda driver: driver.execute_script("return document.getElementById('ifra') != null"))
        time.sleep(0.15)
        return self.driver.execute_script(self.select_course_js, f'{course_id}_{course_index}') == 'yes'

    def _read_submit_results(self) -> list[dict]:
        """Wait for the result element and return the parsed results.

        Returns:
            The JSON-decoded value produced by the check-result script.
            Callers read the ``subject``, ``result`` and ``detail`` keys
            of each entry.
        """
        assert self.driver and self.web_wait
        self.web_wait.until(
            lambda driver: driver.execute_script("return document.getElementById('xkresult') != null")
        )
        time.sleep(0.3)
        raw = self.driver.execute_script(self.check_result_js)
        return json.loads(raw)

    def _extract_course_key(self, subject: str) -> tuple[str, str]:
        """Extract ``(course_id, course_index)`` from a result subject line.

        Prefers an explicit ``<5+ digits>_<2 digits>`` token. Otherwise the
        last two digit runs are used, with the index zero-padded to two
        characters. Returns ``('', '')`` when neither form is present.
        """
        match = self._COURSE_KEY_PATTERN.search(subject)
        if match:
            return (match.group(1), match.group(2))
        numbers = re.findall(r'\d+', subject)
        if len(numbers) >= 2:
            return (numbers[-2], numbers[-1].zfill(2))
        return ('', '')

    def _current_completed_count(self, remaining: set[tuple[str, str]]) -> int:
        """Return the task's ``total_count`` minus the outstanding targets.

        The result is never negative, and is 0 when the task row is missing.
        """
        task = self.database.get_task(self.context.task_id)
        total = int(task['total_count']) if task else 0
        return max(0, total - len(remaining))

    def _sleep_with_stop(self, seconds: float) -> None:
        """Sleep for ``seconds`` while staying responsive to stop requests.

        Each nap is capped to the time left, so the call does not overshoot
        the deadline by a whole polling interval.

        Raises:
            TaskStoppedError: If a stop is requested during the wait.
        """
        deadline = time.monotonic() + seconds
        while True:
            left = deadline - time.monotonic()
            if left <= 0:
                return
            self._raise_if_stopped()
            time.sleep(min(self._STOP_POLL_SECONDS, left))

    def _raise_if_stopped(self) -> None:
        """Raise TaskStoppedError when the context reports a stop request."""
        if self.context.should_stop():
            raise TaskStoppedError()