Spaces:
Paused
Paused
| from __future__ import annotations | |
| import threading | |
| import unittest | |
| from pathlib import Path | |
| from types import SimpleNamespace | |
| from unittest.mock import patch | |
| from selenium.common.exceptions import TimeoutException, WebDriverException | |
| from core.course_bot import CourseBot, RecoverableAutomationError | |
| from core.db import Database | |
| from tests.helpers import workspace_tempdir | |
class FakeDriver:
    """Minimal browser stand-in that only remembers whether it was shut down."""

    def __init__(self) -> None:
        # Flipped to True by quit(); tests read it to confirm cleanup happened.
        self.quit_called: bool = False

    def quit(self) -> None:
        """Record the shutdown request instead of closing a real browser."""
        self.quit_called = True
class FakeActionDriver:
    """Driver double whose ``execute_script`` replays scripted outcomes.

    Attributes are public so tests can inspect them:
      * ``script_results`` - outcomes still waiting to be replayed.
      * ``script_calls``   - every script passed in, in call order.
      * ``stop_called``    - whether a ``window.stop();`` script was seen.
    """

    def __init__(self, script_results: list[object] | None = None) -> None:
        # Copy the input so replaying outcomes never mutates the caller's list.
        self.script_results = [] if not script_results else list(script_results)
        self.script_calls: list[str] = []
        self.stop_called = False

    def execute_script(self, script: str, *_args):
        """Log *script* and produce the next scripted outcome.

        A script that is exactly ``window.stop();`` (ignoring surrounding
        whitespace) sets ``stop_called`` and returns ``None`` without using
        up a scripted outcome.  Otherwise the oldest queued outcome is
        consumed: ``Exception`` instances are raised, any other value is
        returned.  With nothing queued, ``"scheduled-js-click"`` is returned.
        Extra positional arguments are accepted and ignored.
        """
        self.script_calls.append(script)

        is_stop_request = script.strip() == "window.stop();"
        if is_stop_request:
            self.stop_called = True
            return None

        if not self.script_results:
            return "scheduled-js-click"

        outcome = self.script_results[0]
        del self.script_results[0]
        if isinstance(outcome, Exception):
            raise outcome
        return outcome
class FakeLoginOutcomeDriver:
    """Driver double that replays a scripted sequence of ``current_url`` reads.

    Each entry of ``url_steps`` is consumed by one read of ``current_url``:
    an ``Exception`` instance is raised, anything else is returned as ``str``.
    Once the script is exhausted, reads return the empty string.
    """

    def __init__(self, url_steps: list[object]) -> None:
        # Copy so the caller's list is not consumed as steps are replayed.
        self.url_steps = list(url_steps)

    @property
    def current_url(self) -> str:
        """Return (or raise) the next scripted step.

        Exposed as a property because Selenium's ``WebDriver.current_url`` is
        a property: code that reads ``driver.current_url`` as an attribute
        would otherwise receive a bound method and never consume a step.

        Raises:
            Exception: whichever exception instance is next in ``url_steps``.
        """
        if not self.url_steps:
            return ""
        step = self.url_steps.pop(0)
        if isinstance(step, Exception):
            raise step
        return str(step)

    def execute_script(self, _script: str, *_args) -> str:
        """Ignore the script and any extra arguments; always return ``""``.

        Extra positional arguments are accepted to match the
        ``execute_script(script, *args)`` shape used by the other fake driver.
        """
        return ""
class FakeButton:
    """Clickable element double that counts clicks and can fail on demand."""

    def __init__(self, *, click_exception: Exception | None = None) -> None:
        # When set, every click() raises this exception after being counted.
        self.click_exception = click_exception
        self.click_count = 0

    def click(self) -> None:
        """Count the click, then raise the configured exception if there is one."""
        self.click_count = self.click_count + 1
        failure = self.click_exception
        if failure is None:
            return
        raise failure
class CourseBotTests(unittest.TestCase):
    """Exercise CourseBot against fake drivers and patched collaborators."""

    def _make_store(self, temp_dir: Path) -> tuple[Database, dict]:
        """Build an initialised database in *temp_dir* with one user and one course.

        Returns:
            The store and the user record fetched back from it.
        """
        db_path = temp_dir / "test.db"
        store = Database(db_path, default_parallel_limit=2)
        store.init_db()
        user_id = store.create_user("2023000000001", "encrypted", "Test User")
        store.add_course(user_id, "free", "1001001", "01")
        user = store.get_user(user_id)
        # Verify the seed data before any bot is built on top of it.
        self.assertIsNotNone(user)
        return store, user

    def _make_config(self) -> SimpleNamespace:
        """Return the configuration namespace shared by every test."""
        settings = {
            "login_retry_limit": 2,
            "poll_interval_seconds": 0,
            "task_backoff_seconds": 0,
            "browser_page_timeout": 1,
            "selenium_error_limit": 2,
            "selenium_restart_limit": 3,
            "submit_captcha_retry_limit": 3,
            "chrome_binary": "/usr/bin/chromium",
            "chromedriver_path": "/usr/bin/chromedriver",
        }
        return SimpleNamespace(**settings)

    def _make_bot(self, store: Database, user: dict, config: SimpleNamespace, logs: list[tuple[str, str]]) -> CourseBot:
        """Construct a CourseBot whose captcha solver class is patched out.

        The patched solver's ``classification`` always answers ``"1234"``, and
        every ``(level, message)`` the bot logs is appended to *logs*.
        """

        def record(level, message):
            logs.append((level, message))

        solver_patch = patch("core.course_bot.onnx_inference.CaptchaONNXInference")
        with solver_patch as solver_cls:
            solver_cls.return_value = SimpleNamespace(classification=lambda _image: "1234")
            bot = CourseBot(
                config=config,
                store=store,
                task_id=1,
                user=user,
                password="pw",
                logger=record,
            )
        return bot

    def _new_bot_fixture(self, temp_dir: Path) -> tuple[CourseBot, SimpleNamespace, list[tuple[str, str]]]:
        """Assemble the store, config, log sink and bot each test starts from.

        Returns:
            ``(bot, config, logs)`` where *logs* collects the bot's log calls.
        """
        store, user = self._make_store(temp_dir)
        config = self._make_config()
        logs: list[tuple[str, str]] = []
        bot = self._make_bot(store, user, config, logs)
        return bot, config, logs

    def test_restart_browser_after_repeated_session_errors(self) -> None:
        """run() fails after creating, and quitting, ``selenium_restart_limit`` browsers.

        ``_goto_select_course`` is patched to raise RecoverableAutomationError
        on every call, and each configured browser is a FakeDriver.
        """
        with workspace_tempdir("course-bot-") as temp_dir:
            drivers: list[FakeDriver] = []

            def fake_configure_browser(**_kwargs):
                drivers.append(FakeDriver())
                return drivers[-1]

            bot, config, logs = self._new_bot_fixture(temp_dir)
            stop_event = threading.Event()

            browser_patch = patch(
                "core.course_bot.webdriver_utils.configure_browser",
                side_effect=fake_configure_browser,
            )
            login_patch = patch.object(CourseBot, "_login", return_value=None)
            goto_patch = patch.object(
                CourseBot,
                "_goto_select_course",
                side_effect=RecoverableAutomationError("page unavailable"),
            )
            with browser_patch:
                with login_patch:
                    with goto_patch:
                        result = bot.run(stop_event)

            self.assertEqual(result.status, "failed")
            # "\u4efb\u52a1\u7ec8\u6b62" reads "task terminated".
            self.assertIn("\u4efb\u52a1\u7ec8\u6b62", result.error)
            self.assertEqual(len(drivers), config.selenium_restart_limit)
            self.assertTrue(all(driver.quit_called for driver in drivers))
            # Either wording is accepted: "rebuild browser" / "rebuild Selenium session".
            messages = [message for _level, message in logs]
            self.assertTrue(
                any(
                    "\u91cd\u5efa\u6d4f\u89c8\u5668" in text or "\u91cd\u5efa Selenium \u4f1a\u8bdd" in text
                    for text in messages
                )
            )

    def test_submit_captcha_flow_uses_ocr_branch(self) -> None:
        """A "captcha" then "result" submit state invokes the captcha solver once.

        Also checks that the fake button is never clicked natively and that
        both the captcha and the submit log messages were emitted.
        """
        with workspace_tempdir("submit-captcha-") as temp_dir:
            bot, _config, logs = self._new_bot_fixture(temp_dir)
            fake_button = FakeButton()
            fake_driver = FakeActionDriver(["scheduled-js-click"])
            fake_results = [{"result": True, "subject": "1001001_01", "detail": ""}]

            find_patch = patch.object(bot, "_find", return_value=fake_button)
            state_patch = patch.object(
                bot,
                "_wait_for_submit_state",
                side_effect=["captcha", "result"],
            )
            solve_patch = patch.object(
                bot,
                "_solve_visible_submit_captcha",
                return_value=True,
            )
            read_patch = patch.object(bot, "_read_result_page", return_value=fake_results)
            with find_patch, state_patch:
                with solve_patch as solve_mock, read_patch:
                    results = bot._submit_with_optional_captcha(fake_driver, object(), "1001001_01")

            self.assertEqual(results, fake_results)
            self.assertEqual(fake_button.click_count, 0)
            solve_mock.assert_called_once()
            messages = [message for _level, message in logs]
            # "\u68c0\u6d4b\u5230\u9a8c\u8bc1\u7801" reads "captcha detected".
            self.assertTrue(any("\u68c0\u6d4b\u5230\u9a8c\u8bc1\u7801" in text for text in messages))
            # "\u5df2\u89e6\u53d1\u63d0\u4ea4" reads "submit triggered".
            self.assertTrue(any("\u5df2\u89e6\u53d1\u63d0\u4ea4" in text for text in messages))

    def test_trigger_non_blocking_action_prefers_js_submit(self) -> None:
        """When the script call succeeds, its result is returned and no native click occurs."""
        with workspace_tempdir("non-blocking-js-") as temp_dir:
            bot, _config, _logs = self._new_bot_fixture(temp_dir)
            driver = FakeActionDriver(["scheduled-requestSubmit"])
            button = FakeButton()

            method = bot._trigger_non_blocking_action(
                driver,
                button,
                label="login-form",
                allow_form_submit=True,
            )

            self.assertEqual(method, "scheduled-requestSubmit")
            self.assertEqual(button.click_count, 0)
            self.assertFalse(driver.stop_called)

    def test_trigger_non_blocking_action_recovers_native_timeout(self) -> None:
        """A failing script plus a timing-out native click yields "native-click-timeout".

        The fake driver must also have received ``window.stop();`` and a log
        message mentioning ``window.stop()`` must have been recorded.
        """
        with workspace_tempdir("non-blocking-timeout-") as temp_dir:
            bot, _config, logs = self._new_bot_fixture(temp_dir)
            driver = FakeActionDriver([WebDriverException("js failed")])
            button = FakeButton(click_exception=TimeoutException("renderer timeout"))

            method = bot._trigger_non_blocking_action(
                driver,
                button,
                label="login-form",
                allow_form_submit=True,
            )

            self.assertEqual(method, "native-click-timeout")
            self.assertEqual(button.click_count, 1)
            self.assertTrue(driver.stop_called)
            messages = [message for _level, message in logs]
            self.assertTrue(any("window.stop()" in text for text in messages))

    def test_wait_for_login_outcome_ignores_transient_driver_errors(self) -> None:
        """A WebDriverException scripted before the index URL still ends in "success"."""
        with workspace_tempdir("login-outcome-") as temp_dir:
            bot, _config, _logs = self._new_bot_fixture(temp_dir)
            url_script = [
                WebDriverException("navigation in progress"),
                "https://zhjw.scu.edu.cn/index",
            ]
            driver = FakeLoginOutcomeDriver(url_script)

            # time.sleep is patched out so the wait does not actually pause.
            with patch("core.course_bot.time.sleep", return_value=None):
                state, error_message = bot._wait_for_login_outcome(driver, timeout_seconds=1)

            self.assertEqual(state, "success")
            self.assertEqual(error_message, "")
# Run the tests through unittest's runner when this file is executed directly.
if __name__ == "__main__":
    unittest.main()