| from __future__ import annotations |
|
|
| import threading |
| import unittest |
| from pathlib import Path |
| from types import SimpleNamespace |
| from unittest.mock import patch |
|
|
| from selenium.common.exceptions import TimeoutException, WebDriverException |
|
|
| from core.course_bot import CourseBot, RecoverableAutomationError |
| from core.db import Database |
| from tests.helpers import workspace_tempdir |
|
|
|
|
class FakeDriver:
    """Browser stand-in that only remembers whether ``quit`` was requested."""

    # Starts False; flipped by quit() so tests can verify the driver was released.
    quit_called: bool

    def __init__(self) -> None:
        self.quit_called = False

    def quit(self) -> None:
        """Record the shutdown request instead of closing a real browser."""
        self.quit_called = True
|
|
|
|
class FakeActionDriver:
    """Driver double whose ``execute_script`` replays a queue of canned outcomes.

    Attributes are plain and public so tests can inspect them:
    ``script_results`` is the remaining queue, ``script_calls`` is every script
    received so far, and ``stop_called`` reports whether ``window.stop();`` ran.
    """

    def __init__(self, script_results: list[object] | None = None) -> None:
        # Copy the input so replaying outcomes never consumes the caller's list.
        self.script_results = list(script_results) if script_results else []
        self.script_calls: list[str] = []
        self.stop_called = False

    def execute_script(self, script: str, *_args):
        """Record *script* and produce the next scripted outcome.

        A ``window.stop();`` script (ignoring surrounding whitespace) sets
        ``stop_called`` and returns ``None`` without consuming the queue.
        Otherwise the next queued item is raised if it is an ``Exception``
        instance, or returned as-is. An empty queue yields
        ``"scheduled-js-click"``. Extra positional arguments are ignored.
        """
        self.script_calls.append(script)

        if script.strip() == "window.stop();":
            self.stop_called = True
            return None

        if not self.script_results:
            return "scheduled-js-click"

        outcome = self.script_results.pop(0)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome
|
|
|
|
class FakeLoginOutcomeDriver:
    """Driver double that replays a scripted sequence of ``current_url`` reads.

    Each access to ``current_url`` consumes one entry of ``url_steps``:
    ``Exception`` instances are raised, any other value is returned as ``str``.
    Once the sequence is exhausted the URL reads as an empty string.
    """

    def __init__(self, url_steps: list[object]) -> None:
        # Copy so consuming steps never mutates the list owned by the caller.
        self.url_steps = list(url_steps)

    @property
    def current_url(self) -> str:
        """Return (or raise) the next scripted step; ``""`` when none remain."""
        if self.url_steps:
            value = self.url_steps.pop(0)
            if isinstance(value, Exception):
                raise value
            return str(value)
        return ""

    def execute_script(self, _script: str, *_args) -> str:
        """Ignore the script and always return an empty string.

        Extra positional arguments are accepted and discarded, matching the
        ``execute_script(script, *args)`` shape used by ``FakeActionDriver``,
        so a call that passes script arguments does not fail inside the fake.
        """
        return ""
|
|
|
|
class FakeButton:
    """Clickable element double that counts clicks and can fail on demand."""

    def __init__(self, *, click_exception: Exception | None = None) -> None:
        # When set, every click() raises this exception after being counted.
        self.click_exception = click_exception
        self.click_count = 0

    def click(self) -> None:
        """Count the click, then raise ``click_exception`` if one was configured."""
        self.click_count += 1
        failure = self.click_exception
        if failure is None:
            return
        raise failure
|
|
|
|
class CourseBotTests(unittest.TestCase):
    """Exercise ``CourseBot`` recovery and submit helpers against in-memory fakes."""

    def _make_store(self, temp_dir: Path) -> tuple[Database, dict]:
        """Create an initialised database under *temp_dir* with one user and one course.

        Returns:
            The database together with the user record read back from it.
        """
        database = Database(temp_dir / "test.db", default_parallel_limit=2)
        database.init_db()
        new_user_id = database.create_user("2023000000001", "encrypted", "Test User")
        database.add_course(new_user_id, "free", "1001001", "01")
        stored_user = database.get_user(new_user_id)
        self.assertIsNotNone(stored_user)
        return database, stored_user

    def _make_config(self) -> SimpleNamespace:
        """Return a config namespace with zero delays and small retry limits."""
        settings = {
            "login_retry_limit": 2,
            "poll_interval_seconds": 0,
            "task_backoff_seconds": 0,
            "browser_page_timeout": 1,
            "selenium_error_limit": 2,
            "selenium_restart_limit": 3,
            "submit_captcha_retry_limit": 3,
            "chrome_binary": "/usr/bin/chromium",
            "chromedriver_path": "/usr/bin/chromedriver",
        }
        return SimpleNamespace(**settings)

    def _make_bot(self, store: Database, user: dict, config: SimpleNamespace, logs: list[tuple[str, str]]) -> CourseBot:
        """Build a ``CourseBot`` whose captcha solver is stubbed and whose logger appends to *logs*."""

        def record(level, message):
            logs.append((level, message))

        # The stub solver answers "1234" for any image.
        solver_stub = SimpleNamespace(classification=lambda _image: "1234")
        with patch("core.course_bot.onnx_inference.CaptchaONNXInference") as solver_cls:
            solver_cls.return_value = solver_stub
            return CourseBot(
                config=config,
                store=store,
                task_id=1,
                user=user,
                password="pw",
                logger=record,
            )

    def _assert_logged(self, logs: list[tuple[str, str]], *fragments: str) -> None:
        """Assert that at least one captured log message contains any of *fragments*."""
        self.assertTrue(
            any(fragment in message for _level, message in logs for fragment in fragments)
        )

    def test_restart_browser_after_repeated_session_errors(self) -> None:
        """``run`` fails after creating ``selenium_restart_limit`` drivers, each of them quit."""
        with workspace_tempdir("course-bot-") as temp_dir:
            store, user = self._make_store(temp_dir)
            config = self._make_config()
            logs: list[tuple[str, str]] = []
            drivers: list[FakeDriver] = []

            def fake_configure_browser(**_kwargs):
                # Keep every driver handed out so the test can inspect them afterwards.
                drivers.append(FakeDriver())
                return drivers[-1]

            bot = self._make_bot(store, user, config, logs)
            stop_event = threading.Event()

            browser_patch = patch(
                "core.course_bot.webdriver_utils.configure_browser",
                side_effect=fake_configure_browser,
            )
            login_patch = patch.object(CourseBot, "_login", return_value=None)
            # Navigation to the course page fails on every attempt.
            navigation_patch = patch.object(
                CourseBot,
                "_goto_select_course",
                side_effect=RecoverableAutomationError("page unavailable"),
            )
            with browser_patch, login_patch, navigation_patch:
                result = bot.run(stop_event)

            self.assertEqual(result.status, "failed")
            # Escaped text reads "task terminated".
            self.assertIn("\u4efb\u52a1\u7ec8\u6b62", result.error)
            self.assertEqual(len(drivers), config.selenium_restart_limit)
            self.assertTrue(all(driver.quit_called for driver in drivers))
            # Either "rebuild browser" or "rebuild Selenium session" must have been logged.
            self._assert_logged(logs, "\u91cd\u5efa\u6d4f\u89c8\u5668", "\u91cd\u5efa Selenium \u4f1a\u8bdd")

    def test_submit_captcha_flow_uses_ocr_branch(self) -> None:
        """A "captcha" submit state followed by "result" solves once and returns the result page."""
        with workspace_tempdir("submit-captcha-") as temp_dir:
            store, user = self._make_store(temp_dir)
            config = self._make_config()
            logs: list[tuple[str, str]] = []
            bot = self._make_bot(store, user, config, logs)
            fake_button = FakeButton()
            fake_driver = FakeActionDriver(["scheduled-js-click"])
            fake_results = [{"result": True, "subject": "1001001_01", "detail": ""}]

            find_patch = patch.object(bot, "_find", return_value=fake_button)
            state_patch = patch.object(bot, "_wait_for_submit_state", side_effect=["captcha", "result"])
            solve_patch = patch.object(bot, "_solve_visible_submit_captcha", return_value=True)
            read_patch = patch.object(bot, "_read_result_page", return_value=fake_results)
            with find_patch, state_patch, solve_patch as solve_mock, read_patch:
                results = bot._submit_with_optional_captcha(fake_driver, object(), "1001001_01")

            self.assertEqual(results, fake_results)
            # The native click must not have been used.
            self.assertEqual(fake_button.click_count, 0)
            solve_mock.assert_called_once()
            # Escaped texts read "captcha detected" and "submit triggered".
            self._assert_logged(logs, "\u68c0\u6d4b\u5230\u9a8c\u8bc1\u7801")
            self._assert_logged(logs, "\u5df2\u89e6\u53d1\u63d0\u4ea4")

    def test_trigger_non_blocking_action_prefers_js_submit(self) -> None:
        """A successful script result is returned with no native click and no ``window.stop()``."""
        with workspace_tempdir("non-blocking-js-") as temp_dir:
            store, user = self._make_store(temp_dir)
            config = self._make_config()
            logs: list[tuple[str, str]] = []
            bot = self._make_bot(store, user, config, logs)
            driver = FakeActionDriver(["scheduled-requestSubmit"])
            button = FakeButton()

            chosen_method = bot._trigger_non_blocking_action(
                driver,
                button,
                label="login-form",
                allow_form_submit=True,
            )

            self.assertEqual(chosen_method, "scheduled-requestSubmit")
            self.assertEqual(button.click_count, 0)
            self.assertFalse(driver.stop_called)

    def test_trigger_non_blocking_action_recovers_native_timeout(self) -> None:
        """A failing script plus a timed-out native click yields "native-click-timeout" and a stop."""
        with workspace_tempdir("non-blocking-timeout-") as temp_dir:
            store, user = self._make_store(temp_dir)
            config = self._make_config()
            logs: list[tuple[str, str]] = []
            bot = self._make_bot(store, user, config, logs)
            # The script path raises, and the native click then times out.
            driver = FakeActionDriver([WebDriverException("js failed")])
            button = FakeButton(click_exception=TimeoutException("renderer timeout"))

            chosen_method = bot._trigger_non_blocking_action(
                driver,
                button,
                label="login-form",
                allow_form_submit=True,
            )

            self.assertEqual(chosen_method, "native-click-timeout")
            self.assertEqual(button.click_count, 1)
            self.assertTrue(driver.stop_called)
            self._assert_logged(logs, "window.stop()")

    def test_wait_for_login_outcome_ignores_transient_driver_errors(self) -> None:
        """One ``WebDriverException`` on ``current_url`` is tolerated before the index URL succeeds."""
        with workspace_tempdir("login-outcome-") as temp_dir:
            store, user = self._make_store(temp_dir)
            config = self._make_config()
            logs: list[tuple[str, str]] = []
            bot = self._make_bot(store, user, config, logs)
            url_steps = [
                WebDriverException("navigation in progress"),
                "https://zhjw.scu.edu.cn/index",
            ]
            driver = FakeLoginOutcomeDriver(url_steps)

            # Neutralise sleeping so polling does not slow the test down.
            with patch("core.course_bot.time.sleep", return_value=None):
                state, error_message = bot._wait_for_login_outcome(driver, timeout_seconds=1)

            self.assertEqual(state, "success")
            self.assertEqual(error_message, "")
|
|
|
|
# Allow running this test module directly as a script, without a separate test runner.
if __name__ == "__main__":
    unittest.main()
|
|