# SACC / course_catcher / automation.py
# cacode's picture
# Deploy updated SCU course catcher
# e28c9e4 verified
from __future__ import annotations
import base64
import json
import re
import time
from pathlib import Path
from typing import Callable
import onnx_inference
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from course_catcher.config import AppConfig
# Login page opened by `CourseAutomation._login` at the start of every login attempt.
SCU_LOGIN_URL = (
"http://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index"
)
# Course-selection page opened by `CourseAutomation._goto_select_course`.
SCU_SELECT_URL = "http://zhjw.scu.edu.cn/student/courseSelect/courseSelect/index"
# Maps the tab names iterated by `_run_single_cycle` ("plan", "free") to the
# DOM element ids that `_attempt_tab` clicks to switch tabs.
TAB_IDS = {"plan": "faxk", "free": "zyxk"}
# Substrings of a result's "detail" text that `_is_selected_result` treats as
# "the course is already held", even when the submission itself did not succeed.
# (Roughly "selected" / "already chosen" / "already taken" — translation to be confirmed.)
ALREADY_SELECTED_KEYWORDS = ("已选", "已选择", "已修读")
class AutomationError(Exception):
    """Common base class for the errors raised by the course automation."""


class CredentialsError(AutomationError):
    """The login page reported a wrong username or password.

    Raised by ``CourseAutomation._login``; ``run_until_stopped`` turns it into
    a ``("failed", message)`` result instead of retrying.
    """


class SessionExpiredError(AutomationError):
    """The browser session is no longer usable and must be rebuilt.

    Raised when the selection page shows the login form again or when the
    selection iframe never becomes available.
    """


class TemporaryAutomationError(AutomationError):
    """A failure that the run loop treats as retryable on a later cycle.

    Raised for a missing page element, repeated login failure, or the
    "not a selection period" notice.
    """
class CourseAutomation:
    """Drive a headless Chrome session that logs in and submits course selections.

    One instance holds the configuration, the two JavaScript snippets injected
    into the selection page, and the captcha recogniser. A browser session is
    created lazily by :meth:`run_until_stopped` and discarded whenever a cycle
    fails, so every retry starts from a fresh login.
    """

    def __init__(self, config: AppConfig) -> None:
        """Load the injected scripts and the captcha model.

        Args:
            config: Application settings. This class reads ``chrome_binary``,
                ``chromedriver_binary``, ``request_timeout_seconds``,
                ``login_retry_limit`` and ``loop_interval_seconds`` from it.

        Note:
            The three resource paths below are relative, so they are resolved
            against the working directory of the running process.
        """
        self.config = config
        self.select_course_js = Path("javascript/select_course.js").read_text(encoding="utf-8")
        self.check_result_js = Path("javascript/check_result.js").read_text(encoding="utf-8")
        self.captcha_solver = onnx_inference.CaptchaONNXInference(
            model_path=str(Path("ocr_provider") / "captcha_model.onnx")
        )

    def run_until_stopped(
        self,
        task_id: int,
        user_credentials: dict,
        db,
        should_stop: Callable[[], bool],
        log: Callable[[str, str], None],
    ) -> tuple[str, str]:
        """Repeat selection cycles until stopped, finished, or credentials fail.

        Args:
            task_id: Identifier passed to ``db.increment_task_attempt`` once
                per loop iteration.
            user_credentials: Mapping providing the keys ``"id"``,
                ``"student_id"`` and ``"password"``.
            db: Storage object; this method calls ``list_pending_courses``,
                ``increment_task_attempt`` and ``get_setting_float`` on it.
            should_stop: Polled at the top of each iteration and again before
                sleeping; a true value ends the task.
            log: Callback receiving ``(level, message)``.

        Returns:
            ``("stopped", "")`` when ``should_stop`` fired, ``("completed", "")``
            when no pending course is left, or ``("failed", message)`` when the
            login page rejected the credentials.

        Raises:
            Exception: Anything other than :class:`CredentialsError` raised
                while the session is being set up (driver start, login, first
                navigation) propagates to the caller.
        """
        driver: WebDriver | None = None
        try:
            while True:
                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""

                pending_courses = db.list_pending_courses(user_credentials["id"])
                if not pending_courses:
                    log("SUCCESS", "所有待抢课程都已完成,本轮任务结束。")
                    return "completed", ""

                db.increment_task_attempt(task_id)

                if driver is None:
                    # NOTE(review): session setup runs outside the retry
                    # handlers below, so a TemporaryAutomationError raised here
                    # ends the task instead of being retried. Confirm that the
                    # caller relies on this before moving it into the try.
                    log("INFO", "正在启动浏览器会话并登录教务系统。")
                    driver = self._build_driver()
                    self._login(driver, user_credentials["student_id"], user_credentials["password"], log)
                    self._goto_select_course(driver, log)

                try:
                    self._run_single_cycle(driver, user_credentials["id"], pending_courses, db, log)
                except SessionExpiredError as exc:
                    # Rebuild the session right away (short pause only),
                    # skipping the regular loop interval.
                    log("WARNING", f"{exc},将重新建立会话。")
                    self._safe_quit(driver)
                    driver = None
                    time.sleep(2)
                    continue
                except TemporaryAutomationError as exc:
                    log("WARNING", f"{exc},稍后重试。")
                    self._safe_quit(driver)
                    driver = None
                except CredentialsError:
                    # Must not be swallowed by the generic handler below.
                    raise
                except Exception as exc:  # pragma: no cover - defensive path
                    log("ERROR", f"本轮执行发生异常:{exc}")
                    self._safe_quit(driver)
                    driver = None

                if should_stop():
                    log("INFO", "收到停止请求,准备安全退出任务。")
                    return "stopped", ""
                time.sleep(db.get_setting_float("loop_interval_seconds", self.config.loop_interval_seconds))
        except CredentialsError as exc:
            return "failed", str(exc)
        finally:
            self._safe_quit(driver)

    def _build_driver(self) -> WebDriver:
        """Start a headless Chrome driver configured for the selection site.

        The configured Chrome and chromedriver binaries are used only when the
        paths exist; otherwise Selenium's defaults apply.
        """
        options = webdriver.ChromeOptions()
        options.add_argument("--headless=new")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_argument("--window-size=1440,1600")
        options.add_argument("--lang=zh-CN")
        options.add_argument(
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        )

        chrome_binary = Path(self.config.chrome_binary)
        if chrome_binary.exists():
            options.binary_location = str(chrome_binary)

        chromedriver_binary = Path(self.config.chromedriver_binary)
        if chromedriver_binary.exists():
            service = Service(executable_path=str(chromedriver_binary))
            driver = webdriver.Chrome(service=service, options=options)
        else:
            driver = webdriver.Chrome(options=options)

        try:
            driver.set_page_load_timeout(self.config.request_timeout_seconds)
            # NOTE(review): this implicit wait also applies to every
            # find_element call made inside the explicit waits of this class;
            # Selenium's documentation advises against mixing the two.
            driver.implicitly_wait(5)
        except Exception:
            # Do not leave a browser process behind if configuration fails.
            self._safe_quit(driver)
            raise
        return driver

    def _login(self, driver: WebDriver, student_id: str, password: str, log: Callable[[str, str], None]) -> None:
        """Submit the login form, retrying up to ``config.login_retry_limit`` times.

        Raises:
            CredentialsError: The page reported a wrong username or password.
            TemporaryAutomationError: A non-captcha error persisted to the
                second attempt or later, or every attempt failed.
        """
        for attempt in range(1, self.config.login_retry_limit + 1):
            driver.get(SCU_LOGIN_URL)
            self._wait_ready(driver)

            student_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='text']"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[1]/div/div/div[2]/div/input",
                    ),
                ],
            )
            password_box = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//input[@type='password']"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[2]/div/div/div[2]/div/input",
                    ),
                ],
            )
            captcha_box = self._find_first(
                driver,
                [
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]//input",
                    ),
                    (By.XPATH, "//*[@id='app']//form//input[contains(@placeholder, '验证码')]"),
                ],
            )
            login_button = self._find_first(
                driver,
                [
                    (By.XPATH, "//*[@id='app']//form//button"),
                    (
                        By.XPATH,
                        "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[4]/div/button",
                    ),
                ],
            )

            student_box.clear()
            student_box.send_keys(student_id)
            password_box.clear()
            password_box.send_keys(password)
            captcha_box.clear()
            captcha_box.send_keys(self._solve_captcha(driver))
            login_button.click()

            if self._wait_for_login_success(driver):
                log("SUCCESS", "教务系统登录成功。")
                return

            error_message = self._read_login_error(driver)
            if not error_message:
                log("WARNING", f"第 {attempt} 次登录失败,未捕获到明确错误提示。")
                continue

            log("WARNING", f"第 {attempt} 次登录失败:{error_message}")
            if "用户名或密码错误" in error_message or "密码错误" in error_message:
                raise CredentialsError("学生账号或密码错误,请在用户面板更新后重新启动任务。")
            # Captcha errors are worth another attempt; any other message
            # that persists to the second attempt is raised as-is.
            if "验证码" not in error_message and attempt >= 2:
                raise TemporaryAutomationError(error_message)

        raise TemporaryAutomationError("连续多次登录失败,可能是验证码识别失败或当前系统不可用。")

    def _wait_for_login_success(self, driver: WebDriver) -> bool:
        """Return True once the browser lands on the portal index within 8 seconds."""
        try:
            WebDriverWait(driver, 8, 0.5).until(
                lambda current: current.current_url in {"http://zhjw.scu.edu.cn/index", "http://zhjw.scu.edu.cn/"}
                or current.current_url.startswith("http://zhjw.scu.edu.cn/index")
            )
            return True
        except TimeoutException:
            return False

    def _read_login_error(self, driver: WebDriver) -> str:
        """Return the first non-empty message text found on the login page, or ``""``."""
        candidates = [
            "/html/body/div[2]",
            "//div[contains(@class, 'el-message') or contains(@class, 'message')]",
        ]
        for xpath in candidates:
            try:
                element = driver.find_element(By.XPATH, xpath)
            except NoSuchElementException:
                continue
            text = element.text.strip()
            if text:
                return text
        return ""

    def _solve_captcha(self, driver: WebDriver) -> str:
        """Locate the captcha image and return the recogniser's answer.

        An inline ``data:`` image is decoded from its base64 payload; any
        other image is captured as an element screenshot.
        """
        captcha_img = self._find_first(
            driver,
            [
                (
                    By.XPATH,
                    "//*[@id='app']/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/form/div[3]/div/div/img",
                ),
                (By.XPATH, "//*[@id='app']//form//img"),
            ],
        )
        src = captcha_img.get_attribute("src") or ""
        if "base64," in src:
            image_bytes = base64.b64decode(src.split("base64,", 1)[1])
        else:
            image_bytes = captcha_img.screenshot_as_png
        return self.captcha_solver.classification(image_bytes)

    def _goto_select_course(self, driver: WebDriver, log: Callable[[str, str], None]) -> None:
        """Open the selection page and classify what the server returned.

        Raises:
            TemporaryAutomationError: The page contains the "非选课" notice.
            SessionExpiredError: The page shows the login form instead.
        """
        driver.get(SCU_SELECT_URL)
        self._wait_ready(driver)
        body_text = driver.find_element(By.TAG_NAME, "body").text
        if "非选课" in body_text:
            raise TemporaryAutomationError("当前不是可选课时段,教务系统已返回非选课提示")
        if "登录" in body_text and "学号" in body_text:
            raise SessionExpiredError("当前会话已失效,需要重新登录")
        log("INFO", "已进入选课页面。")

    def _run_single_cycle(
        self,
        driver: WebDriver,
        user_id: int,
        pending_courses: list[dict],
        db,
        log: Callable[[str, str], None],
    ) -> None:
        """Try every tab once and record the outcome of each submitted course.

        ``pending_courses`` is kept for interface compatibility; the pending
        list is re-read from ``db`` before each tab so that courses selected
        in the first tab are not attempted again in the second.
        """
        self._goto_select_course(driver, log)
        for tab_name in ("plan", "free"):
            current_pending = db.list_pending_courses(user_id)
            if not current_pending:
                return

            results = self._attempt_tab(driver, tab_name, current_pending, log)
            for result in results:
                # Tolerate missing or null fields instead of aborting the whole
                # cycle on one malformed result entry.
                subject = str(result.get("subject") or "")
                detail = str(result.get("detail") or "")

                parsed = self._extract_course_pair(subject)
                if not parsed:
                    log("WARNING", f"无法从结果文本中解析课程号:{subject}")
                    continue

                course_id, course_index = parsed
                status = "selected" if self._is_selected_result(result) else "pending"
                db.mark_course_result(user_id, course_id, course_index, status, detail)
                level = "SUCCESS" if status == "selected" else "INFO"
                log(level, f"{course_id}_{course_index}: {detail}")

            if results:
                # A submission leaves the result view open; reload the
                # selection page before moving on to the next tab.
                self._goto_select_course(driver, log)

    def _attempt_tab(
        self,
        driver: WebDriver,
        tab_name: str,
        courses: list[dict],
        log: Callable[[str, str], None],
    ) -> list[dict]:
        """Query and tick each course in one tab, then submit if any was ticked.

        Returns:
            The parsed result entries, or ``[]`` when no course was found in
            this tab (nothing is submitted in that case).
        """
        tab_id = TAB_IDS[tab_name]
        self._find_first(driver, [(By.ID, tab_id)]).click()
        self._wait_ready(driver)

        selected_any = False
        for course in courses:
            course_key = f"{course['course_id']}_{course['course_index']}"

            self._wait_for_iframe(driver)
            driver.switch_to.frame("ifra")
            try:
                course_input = self._find_first(driver, [(By.ID, "kch")], timeout=10)
                query_button = self._find_first(driver, [(By.ID, "queryButton")], timeout=10)
                course_input.clear()
                course_input.send_keys(course["course_id"])
                query_button.click()
                # Wait until the button text no longer contains "正在"
                # (presumably the in-progress label; confirm against the page).
                WebDriverWait(driver, 20, 0.3).until(
                    lambda current: "正在" not in current.find_element(By.ID, "queryButton").text
                )
            finally:
                driver.switch_to.default_content()

            self._wait_for_iframe(driver)
            # The injected script runs in the top-level document and answers
            # "yes" when it found the requested course.
            found = driver.execute_script(self.select_course_js, course_key) == "yes"
            if found:
                selected_any = True
                log("INFO", f"[{tab_name}] 已勾选课程 {course_key}。")
            else:
                log("INFO", f"[{tab_name}] 未找到课程 {course_key}。")

        if not selected_any:
            return []

        submit_button = self._find_first(driver, [(By.ID, "submitButton")], timeout=10)
        submit_button.click()
        return self._collect_results(driver)

    def _collect_results(self, driver: WebDriver) -> list[dict]:
        """Wait for the ``xkresult`` element and return the parsed result entries.

        The result script may return either a JSON string or an already
        decoded value; a ``None`` return is normalised to an empty list so
        callers can always iterate the result.
        """
        WebDriverWait(driver, 20, 0.5).until(
            lambda current: current.execute_script("return document.getElementById('xkresult') !== null")
        )
        raw_result = driver.execute_script(self.check_result_js)
        if isinstance(raw_result, str):
            raw_result = json.loads(raw_result)
        if raw_result is None:
            return []
        return raw_result

    def _is_selected_result(self, result: dict) -> bool:
        """Return True when a result entry means the course is now held.

        That is the case when its ``"result"`` flag is truthy, or when its
        ``"detail"`` text contains one of ``ALREADY_SELECTED_KEYWORDS``.
        """
        if result.get("result"):
            return True
        # "detail" may be missing or null; treat both as empty text.
        detail = result.get("detail") or ""
        return any(keyword in detail for keyword in ALREADY_SELECTED_KEYWORDS)

    def _extract_course_pair(self, subject: str) -> tuple[str, str] | None:
        """Extract ``(course_id, course_index)`` from a result subject line.

        The last ``<5+ digits>_<2 digits>`` occurrence wins. Failing that, the
        last two digit runs are used, with the index left-padded or truncated
        to two characters. Returns ``None`` when fewer than two digit runs
        exist.
        """
        strict_match = re.findall(r"(\d{5,})_(\d{2})", subject)
        if strict_match:
            return strict_match[-1]

        loose_match = re.findall(r"(\d+)", subject)
        if len(loose_match) >= 2:
            return loose_match[-2], loose_match[-1].zfill(2)[-2:]
        return None

    def _wait_for_iframe(self, driver: WebDriver) -> None:
        """Wait up to 15 seconds for the ``ifra`` iframe, leaving focus on the top document.

        Raises:
            SessionExpiredError: The iframe did not become available in time.
        """
        try:
            WebDriverWait(driver, 15, 0.5).until(EC.frame_to_be_available_and_switch_to_it((By.ID, "ifra")))
        except TimeoutException as exc:
            raise SessionExpiredError("选课 iframe 未能正常加载") from exc
        finally:
            # The expected condition switches into the frame on success;
            # always hand control back to the top-level document.
            driver.switch_to.default_content()

    def _wait_ready(self, driver: WebDriver) -> None:
        """Block until ``document.readyState`` is ``"complete"`` or the request timeout elapses."""
        WebDriverWait(driver, self.config.request_timeout_seconds, 0.5).until(
            lambda current: current.execute_script("return document.readyState") == "complete"
        )

    def _find_first(self, driver: WebDriver, selectors: list[tuple[str, str]], timeout: int | None = None):
        """Return the first element matched by any selector, trying them in order.

        Args:
            driver: Active browser session.
            selectors: ``(by, value)`` pairs; earlier entries take priority.
            timeout: Seconds to keep polling. ``None`` means
                ``config.request_timeout_seconds``; an explicit ``0`` is
                honoured rather than replaced by the default.

        Raises:
            TemporaryAutomationError: No selector matched before the timeout.
        """
        if timeout is None:
            timeout = self.config.request_timeout_seconds

        def locate(current: WebDriver):
            for by, value in selectors:
                try:
                    return current.find_element(by, value)
                except NoSuchElementException:
                    continue
            return False

        try:
            return WebDriverWait(driver, timeout, 0.5).until(locate)
        except TimeoutException as exc:
            selectors_text = ", ".join(f"{by}:{value}" for by, value in selectors)
            raise TemporaryAutomationError(f"未找到页面元素:{selectors_text}") from exc

    def _safe_quit(self, driver: WebDriver | None) -> None:
        """Shut the browser down without ever raising.

        This runs inside ``finally`` blocks and ``except`` handlers, where an
        error from ``quit()`` would replace the task's real result. A dead
        driver process can fail with errors other than ``WebDriverException``,
        so every ordinary exception is deliberately ignored here.
        """
        if not driver:
            return
        try:
            driver.quit()
        except Exception:  # deliberate best-effort cleanup
            pass