from __future__ import annotations

import contextlib
import select
import shutil
import socket
import threading
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import unquote, urlsplit

from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.wait import WebDriverWait
|
|
|
|
# User-agent string handed to Chrome through the --user-agent switch.
DEFAULT_USER_AGENT = " ".join(
    (
        "Mozilla/5.0 (X11; Linux x86_64)",
        "AppleWebKit/537.36",
        "(KHTML, like Gecko)",
        "Chrome/124.0.0.0",
        "Safari/537.36",
    )
)
|
|
|
|
def _get_login_2fa_bypass_script() -> str:
    """Return JavaScript that makes two API responses report two-factor login as off.

    The returned source is a self-invoking function. It wraps ``window.fetch``
    and ``XMLHttpRequest.prototype.open``, and redefines the XHR
    ``responseText`` / ``response`` getters, so that JSON coming back from a
    URL containing ``/api/bff/v1.2/2factor/select`` or
    ``/api/bff/v1.2/commons/user_setting_info`` has
    ``data.formDto.userTwoFactory`` / ``data.user2factor`` changed from
    ``true`` to ``false``. Responses for other URLs, bodies that fail to parse
    as JSON, and XHR responses that are not complete with status 200 are
    passed through unchanged.
    """
    return """(() => {
  const RULES = [
    {
      url: "/api/bff/v1.2/2factor/select",
      modify(json) {
        if (json?.data?.formDto?.userTwoFactory !== true) return false;
        json.data.formDto.userTwoFactory = false;
        return true;
      }
    },
    {
      url: "/api/bff/v1.2/commons/user_setting_info",
      modify(json) {
        if (json?.data?.user2factor !== true) return false;
        json.data.user2factor = false;
        return true;
      }
    }
  ];

  function findRule(url) {
    return RULES.find((rule) => String(url || '').includes(rule.url));
  }

  function applyModify(json, url) {
    const rule = findRule(url);
    return rule ? rule.modify(json) : false;
  }

  const origFetch = window.fetch;
  window.fetch = function(input, init) {
    const url = typeof input === 'string'
      ? input
      : (input instanceof URL ? input.href : (input && input.url ? input.url : ''));
    return origFetch.call(window, input, init).then(function(response) {
      if (!findRule(url)) return response;
      return response.clone().json().then(function(json) {
        if (!applyModify(json, url)) return response;
        return new Response(JSON.stringify(json), {
          status: response.status,
          statusText: response.statusText,
          headers: response.headers
        });
      }).catch(function() {
        return response;
      });
    });
  };

  const origOpen = XMLHttpRequest.prototype.open;
  XMLHttpRequest.prototype.open = function(method, url, async, username, password) {
    this.__saccLogin2faUrl = typeof url === 'string' ? url : (url && url.href ? url.href : '');
    return origOpen.call(this, method, url, async !== false, username, password);
  };

  const textDescriptor = Object.getOwnPropertyDescriptor(XMLHttpRequest.prototype, 'responseText');
  if (textDescriptor?.get) {
    const origTextGetter = textDescriptor.get;
    Object.defineProperty(XMLHttpRequest.prototype, 'responseText', {
      get: function() {
        const value = origTextGetter.call(this);
        if (this.readyState !== 4 || this.status !== 200) return value;
        const url = this.__saccLogin2faUrl || '';
        if (!url || !findRule(url)) return value;
        try {
          const json = JSON.parse(value);
          if (applyModify(json, url)) return JSON.stringify(json);
        } catch (error) {}
        return value;
      },
      configurable: true,
      enumerable: true
    });
  }

  const respDescriptor = Object.getOwnPropertyDescriptor(XMLHttpRequest.prototype, 'response');
  if (respDescriptor?.get) {
    const origRespGetter = respDescriptor.get;
    Object.defineProperty(XMLHttpRequest.prototype, 'response', {
      get: function() {
        const value = origRespGetter.call(this);
        if (this.readyState !== 4 || this.status !== 200) return value;
        const url = this.__saccLogin2faUrl || '';
        if (!url || !findRule(url)) return value;
        try {
          const obj = typeof value === 'string' ? JSON.parse(value) : value;
          if (applyModify(obj, url)) {
            return this.responseType === 'json' ? obj : JSON.stringify(obj);
          }
        } catch (error) {}
        return value;
      },
      configurable: true,
      enumerable: true
    });
  }
})();"""
|
|
|
|
def _get_password_popup_blocker_script() -> str:
    """Return JavaScript that skips the password-expiry page and strips password modals.

    The returned source is a self-invoking function with two parts:

    * When the page URL contains ``id.scu.edu.cn`` it re-checks the URL after
      ``history.pushState`` / ``replaceState`` calls, on ``popstate`` and
      ``hashchange`` events, and on DOM mutations. If the URL matches both
      ``modifyPassword`` and ``needModifyPasswordOfPwdExpire`` it restores the
      patched hooks and replaces the location with the fixed SSO URL embedded
      in the script.
    * On every page it removes ``.modal-dialog`` elements whose text matches
      the script's change-password keywords or that contain the ``#oldPass`` /
      ``#newPass1`` / ``#newPass2`` fields (removing the enclosing ``.modal``
      when there is one), removes every ``.modal-backdrop`` node, and repeats
      this on each DOM mutation.
    """
    return """(() => {
  const REDIRECT_URL = 'https://id.scu.edu.cn/enduser/sp/sso/scdxplugin_jwt23?enterpriseId=scdx&target_url=index';
  let stopped = false;
  let observer = null;
  let origPushState = null;
  let origReplaceState = null;

  function removeAllListeners() {
    stopped = true;
    try { if (origPushState) history.pushState = origPushState; } catch (error) {}
    try { if (origReplaceState) history.replaceState = origReplaceState; } catch (error) {}
    try { window.removeEventListener('popstate', checkAndRedirect); } catch (error) {}
    try { window.removeEventListener('hashchange', checkAndRedirect); } catch (error) {}
    try { if (observer) observer.disconnect(); } catch (error) {}
  }

  function redirectToSso() {
    removeAllListeners();
    try { window.location.replace(REDIRECT_URL); } catch (error) { window.location.href = REDIRECT_URL; }
  }

  function checkAndRedirect() {
    if (stopped) return;
    try {
      const href = location.href || '';
      if (!href.includes('id.scu.edu.cn')) return;
      if (href.includes('zhjw.scu.edu.cn')) {
        removeAllListeners();
        return;
      }
      if (/modifyPassword/.test(href) && /needModifyPasswordOfPwdExpire/.test(href)) {
        redirectToSso();
      }
    } catch (error) {}
  }

  if ((location.href || '').includes('id.scu.edu.cn')) {
    checkAndRedirect();
    origPushState = history.pushState;
    origReplaceState = history.replaceState;
    history.pushState = function() {
      const result = origPushState.apply(this, arguments);
      try { setTimeout(checkAndRedirect, 50); } catch (error) {}
      return result;
    };
    history.replaceState = function() {
      const result = origReplaceState.apply(this, arguments);
      try { setTimeout(checkAndRedirect, 50); } catch (error) {}
      return result;
    };
    window.addEventListener('popstate', checkAndRedirect);
    window.addEventListener('hashchange', checkAndRedirect);
    try {
      observer = new MutationObserver(checkAndRedirect);
      observer.observe(document, { subtree: true, childList: true });
    } catch (error) {}
  }

  function removeIfPasswordModal(modalDialog) {
    try {
      const text = modalDialog.textContent || '';
      const hasKeywords = /修改密码|更改密码/.test(text);
      const hasPasswordFields = Boolean(
        modalDialog.querySelector &&
        (modalDialog.querySelector('#oldPass') ||
        modalDialog.querySelector('#newPass1') ||
        modalDialog.querySelector('#newPass2'))
      );
      if (!hasKeywords && !hasPasswordFields) return;
      const container = (modalDialog.closest && modalDialog.closest('.modal')) || modalDialog.parentElement || modalDialog;
      if (container && typeof container.remove === 'function') {
        container.remove();
      }
    } catch (error) {}
  }

  function removePasswordArtifacts() {
    try {
      document.querySelectorAll('.modal-dialog').forEach(removeIfPasswordModal);
      document.querySelectorAll('.modal-backdrop.fade.in, .modal-backdrop').forEach((node) => {
        try { node.remove(); } catch (error) {}
      });
    } catch (error) {}
  }

  removePasswordArtifacts();
  try {
    const modalObserver = new MutationObserver(removePasswordArtifacts);
    modalObserver.observe(document, { subtree: true, childList: true });
  } catch (error) {}
})();"""
|
|
|
|
| @dataclass(slots=True) |
| class BrowserProxyConfig: |
| scheme: str |
| host: str |
| port: int |
| username: str = "" |
| password: str = "" |
|
|
| @property |
| def requires_auth_relay(self) -> bool: |
| return self.scheme in {"socks5", "socks5h"} and bool(self.username or self.password) |
|
|
| @property |
| def display_label(self) -> str: |
| return f"{self.scheme}://{self.host}:{self.port}" |
|
|
|
|
class Socks5AuthRelay:
    """Local SOCKS5 relay that authenticates to an upstream SOCKS5 proxy.

    The relay listens on an ephemeral 127.0.0.1 port and accepts SOCKS5
    CONNECT requests without authentication. For each client it opens a
    connection to ``proxy``, performs the method negotiation (and the
    username/password sub-negotiation when credentials are configured),
    replays the client's CONNECT request, and then copies bytes in both
    directions until either side closes.
    """

    # Replies sent to the local client: version 5, status, reserved,
    # ATYP=IPv4, bound address 0.0.0.0, bound port 0.
    _REPLY_SUCCEEDED = b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00"
    _REPLY_GENERAL_FAILURE = b"\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00"

    def __init__(self, proxy: BrowserProxyConfig) -> None:
        """Store the upstream proxy settings; no socket is opened until ``start``."""
        self.proxy = proxy
        self.port = 0
        self._server: socket.socket | None = None
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        """Bind a loopback listener, record its port, and start the accept thread.

        Raises:
            OSError: if the listener cannot be created; the socket is closed
                before the error propagates.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("127.0.0.1", 0))
            server.listen(64)
            # Short accept timeout so the loop notices the stop event promptly.
            server.settimeout(0.5)
        except OSError:
            server.close()
            raise
        self._server = server
        self.port = int(server.getsockname()[1])
        self._thread = threading.Thread(
            target=self._accept_loop,
            name=f"sacc-socks5-relay-{self.port}",
            daemon=True,
        )
        self._thread.start()

    def close(self) -> None:
        """Signal the worker loops to stop, close the listener, and briefly join."""
        self._stop_event.set()
        if self._server is not None:
            with contextlib.suppress(OSError):
                self._server.close()
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=1)

    def _accept_loop(self) -> None:
        """Accept clients until stopped, handling each one on a daemon thread."""
        server = self._server
        if server is None:
            return
        while not self._stop_event.is_set():
            try:
                client, _addr = server.accept()
            except socket.timeout:
                continue
            except OSError:
                # Either the listener was closed by close() or accept() keeps
                # failing; wait briefly instead of spinning at full speed.
                if self._stop_event.wait(0.05):
                    break
                continue
            threading.Thread(target=self._handle_client, args=(client,), daemon=True).start()

    def _handle_client(self, client: socket.socket) -> None:
        """Serve one client: negotiate, open the upstream tunnel, then relay."""
        upstream: socket.socket | None = None
        try:
            client.settimeout(20)
            request = self._read_client_connect_request(client)
            if request is None:
                return
            try:
                upstream = socket.create_connection((self.proxy.host, self.proxy.port), timeout=20)
                upstream.settimeout(20)
                tunnel_ready = self._open_upstream_tunnel(upstream, request)
            except OSError:
                # Upstream unreachable or it dropped mid-handshake: the client
                # is still waiting for a reply, so report a failure below.
                tunnel_ready = False
            if not tunnel_ready:
                self._send_client_failure(client)
                return
            client.sendall(self._REPLY_SUCCEEDED)
            self._relay(client, upstream)
        except OSError:
            return
        finally:
            for sock in (client, upstream):
                if sock is not None:
                    with contextlib.suppress(OSError):
                        sock.close()

    def _open_upstream_tunnel(self, upstream: socket.socket, request: bytes) -> bool:
        """Authenticate upstream, replay ``request``, and report whether it succeeded."""
        if not self._authenticate_upstream(upstream):
            return False
        upstream.sendall(request)
        reply = self._read_socks5_response(upstream)
        # Byte 1 of a SOCKS5 reply is the status; zero means "succeeded".
        return bool(reply) and len(reply) >= 2 and reply[1] == 0

    def _read_client_connect_request(self, client: socket.socket) -> bytes | None:
        """Run the client-side negotiation and return the raw CONNECT request.

        Returns ``None`` when the greeting is not SOCKS5, the client does not
        offer the "no authentication" method, the command is not CONNECT, or
        the request is truncated or uses an unknown address type.
        """
        greeting = self._recv_exact(client, 2)
        if not greeting or greeting[0] != 5:
            return None
        methods = self._recv_exact(client, greeting[1])
        if methods is None:
            return None
        if 0 not in methods:
            # The relay only speaks "no authentication" locally; 0xFF tells
            # the client that none of its offered methods is acceptable.
            client.sendall(b"\x05\xff")
            return None
        client.sendall(b"\x05\x00")
        request_header = self._recv_exact(client, 4)
        if not request_header or request_header[0] != 5 or request_header[1] != 1:
            self._send_client_failure(client)
            return None
        address = self._read_socks5_address(client, request_header[3])
        port = None if address is None else self._recv_exact(client, 2)
        if address is None or port is None:
            self._send_client_failure(client)
            return None
        return request_header + address + port

    def _authenticate_upstream(self, upstream: socket.socket) -> bool:
        """Negotiate an auth method with the upstream proxy and log in if needed.

        Returns ``False`` when a credential exceeds 255 encoded bytes or the
        upstream does not answer with the expected method/status bytes.
        """
        username = self.proxy.username.encode("utf-8")
        password = self.proxy.password.encode("utf-8")
        # Each credential is sent behind a single length byte.
        if len(username) > 255 or len(password) > 255:
            return False
        if username or password:
            upstream.sendall(b"\x05\x01\x02")
            if self._recv_exact(upstream, 2) != b"\x05\x02":
                return False
            upstream.sendall(b"\x01" + bytes([len(username)]) + username + bytes([len(password)]) + password)
            return self._recv_exact(upstream, 2) == b"\x01\x00"
        upstream.sendall(b"\x05\x01\x00")
        return self._recv_exact(upstream, 2) == b"\x05\x00"

    def _read_socks5_response(self, sock: socket.socket) -> bytes | None:
        """Read one complete SOCKS5 reply, or ``None`` if it is truncated or malformed."""
        header = self._recv_exact(sock, 4)
        if not header:
            return None
        address = self._read_socks5_address(sock, header[3])
        if address is None:
            return None
        port = self._recv_exact(sock, 2)
        if port is None:
            return None
        return header + address + port

    def _read_socks5_address(self, sock: socket.socket, address_type: int) -> bytes | None:
        """Read the address field for ``address_type`` (1 IPv4, 3 domain, 4 IPv6).

        Domain names are returned together with their leading length byte.
        Unknown address types and short reads yield ``None``.
        """
        if address_type == 1:
            return self._recv_exact(sock, 4)
        if address_type == 3:
            length = self._recv_exact(sock, 1)
            if not length:
                return None
            domain = self._recv_exact(sock, length[0])
            return None if domain is None else length + domain
        if address_type == 4:
            return self._recv_exact(sock, 16)
        return None

    @staticmethod
    def _recv_exact(sock: socket.socket, size: int) -> bytes | None:
        """Receive exactly ``size`` bytes, or ``None`` if the peer closes first."""
        data = bytearray()
        while len(data) < size:
            chunk = sock.recv(size - len(data))
            if not chunk:
                return None
            data.extend(chunk)
        return bytes(data)

    @staticmethod
    def _send_client_failure(client: socket.socket) -> None:
        """Best-effort "general failure" reply; send errors are ignored."""
        with contextlib.suppress(OSError):
            client.sendall(b"\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00")

    def _relay(self, client: socket.socket, upstream: socket.socket) -> None:
        """Copy bytes both ways until a side closes, errors, or the relay stops."""
        client.settimeout(None)
        upstream.settimeout(None)
        sockets = [client, upstream]
        while not self._stop_event.is_set():
            # The 30 s timeout only bounds how long a stop request can go unnoticed.
            readable, _writable, errored = select.select(sockets, [], sockets, 30)
            if errored:
                return
            if not readable:
                continue
            for source in readable:
                target = upstream if source is client else client
                data = source.recv(65536)
                if not data:
                    return
                target.sendall(data)
|
|
|
|
def parse_browser_proxy(proxy_url: str) -> BrowserProxyConfig | None:
    """Parse a proxy URL into a ``BrowserProxyConfig``.

    Args:
        proxy_url: URL such as ``socks5://user:pass@host:1080``. A value
            without a scheme (``host:1080`` or ``//host:1080``) is treated as
            SOCKS5. Empty or whitespace-only input means "no proxy".

    Returns:
        The parsed configuration with percent-decoded credentials, or
        ``None`` when ``proxy_url`` is empty.

    Raises:
        ValueError: if the scheme is not socks5/socks5h/http/https, or the
            URL lacks a host or a usable port.
    """
    raw_url = str(proxy_url or "").strip()
    if not raw_url:
        return None
    # urlsplit only recognises a network location after "//"; a bare
    # "host:port" would be read as scheme "host" (or as a path), so give
    # scheme-less input the default scheme before parsing.
    if raw_url.startswith("//"):
        raw_url = f"socks5:{raw_url}"
    elif "://" not in raw_url:
        raw_url = f"socks5://{raw_url}"
    parts = urlsplit(raw_url)
    scheme = (parts.scheme or "socks5").lower()
    if scheme not in {"socks5", "socks5h", "http", "https"}:
        raise ValueError(f"Unsupported browser proxy scheme: {scheme}")
    if not parts.hostname or not parts.port:
        raise ValueError("Browser proxy URL must include host and port.")
    return BrowserProxyConfig(
        scheme=scheme,
        host=parts.hostname,
        port=int(parts.port),
        username=unquote(parts.username or ""),
        password=unquote(parts.password or ""),
    )
|
|
|
|
def probe_browser_proxy(
    proxy_url: str,
    *,
    target_host: str = "id.scu.edu.cn",
    target_port: int = 80,
    timeout_seconds: float = 8.0,
) -> tuple[bool, str]:
    """Check whether the configured proxy is usable and describe the outcome.

    An empty ``proxy_url`` counts as success ("proxy disabled"). HTTP(S)
    proxies only get a TCP reachability check. SOCKS5 proxies are asked to
    CONNECT to ``target_host:target_port`` after authentication.

    Returns:
        ``(ok, message)`` where ``message`` is a human-readable summary.
    """
    proxy = parse_browser_proxy(proxy_url)
    if proxy is None:
        return True, "proxy disabled"
    is_socks = proxy.scheme in {"socks5", "socks5h"}
    if not is_socks:
        return _probe_tcp_proxy(proxy, timeout_seconds=timeout_seconds)

    endpoint = (proxy.host, proxy.port)
    try:
        with socket.create_connection(endpoint, timeout=timeout_seconds) as connection:
            connection.settimeout(timeout_seconds)
            if not _authenticate_socks5(connection, proxy):
                return False, f"{proxy.display_label} authentication failed"
            connection.sendall(_build_socks5_connect_request(proxy, target_host, int(target_port)))
            reply = _read_socks5_response(connection)
            if not reply or len(reply) < 2:
                return False, f"{proxy.display_label} did not return a SOCKS5 response"
            # Second byte of the reply is the SOCKS5 status; zero is success.
            status = reply[1]
            if status != 0:
                return False, f"{proxy.display_label} returned SOCKS5 status {status} for {target_host}:{target_port}"
            return True, f"{proxy.display_label} can reach {target_host}:{target_port}"
    except OSError as exc:
        return False, f"{proxy.display_label} probe failed: {exc}"
|
|
|
|
def _probe_tcp_proxy(proxy: BrowserProxyConfig, *, timeout_seconds: float) -> tuple[bool, str]:
    """Report whether a plain TCP connection to the proxy endpoint can be opened."""
    label = proxy.display_label
    endpoint = (proxy.host, proxy.port)
    try:
        with socket.create_connection(endpoint, timeout=timeout_seconds):
            pass  # Connecting is the whole probe; nothing is sent.
    except OSError as exc:
        return False, f"{label} TCP probe failed: {exc}"
    return True, f"{label} TCP endpoint is reachable"
|
|
|
|
def _authenticate_socks5(sock: socket.socket, proxy: BrowserProxyConfig) -> bool:
    """Perform SOCKS5 method negotiation on ``sock`` and log in if credentials exist.

    Returns ``False`` when a credential is longer than 255 encoded bytes or
    the proxy does not answer with the expected method/status bytes.
    """
    user = proxy.username.encode("utf-8")
    secret = proxy.password.encode("utf-8")
    # Each credential travels behind a one-byte length prefix.
    if max(len(user), len(secret)) > 255:
        return False

    recv_exact = Socks5AuthRelay._recv_exact
    if not (user or secret):
        # No credentials: offer only the "no authentication" method.
        sock.sendall(b"\x05\x01\x00")
        return recv_exact(sock, 2) == b"\x05\x00"

    sock.sendall(b"\x05\x01\x02")
    if recv_exact(sock, 2) != b"\x05\x02":
        return False
    login = b"".join((b"\x01", bytes([len(user)]), user, bytes([len(secret)]), secret))
    sock.sendall(login)
    return recv_exact(sock, 2) == b"\x01\x00"
|
|
|
|
def _build_socks5_connect_request(proxy: BrowserProxyConfig, target_host: str, target_port: int) -> bytes:
    """Build a SOCKS5 CONNECT request for ``target_host:target_port``.

    With scheme ``socks5`` the host is resolved locally and sent as an IPv4
    address, falling back to the domain form if resolution fails. Any other
    scheme sends the IDNA-encoded host name for the proxy to resolve.
    """
    address: bytes | None = None
    if proxy.scheme == "socks5":
        try:
            address = b"\x01" + socket.inet_aton(socket.gethostbyname(target_host))
        except OSError:
            address = None
    if address is None:
        hostname = target_host.encode("idna")
        address = b"\x03" + bytes([len(hostname)]) + hostname
    port_field = int(target_port).to_bytes(2, "big")
    return b"".join((b"\x05\x01\x00", address, port_field))
|
|
|
|
def _read_socks5_response(sock: socket.socket) -> bytes | None:
    """Read one SOCKS5 reply from ``sock``.

    Returns the raw reply (4-byte header, address field, 2-byte port), or
    ``None`` if the stream ends early or the address type is not 1, 3 or 4.
    """
    recv_exact = Socks5AuthRelay._recv_exact
    header = recv_exact(sock, 4)
    if not header:
        return None

    address_type = header[3]
    fixed_sizes = {1: 4, 4: 16}  # IPv4 and IPv6 address lengths in bytes
    address: bytes | None = None
    if address_type in fixed_sizes:
        address = recv_exact(sock, fixed_sizes[address_type])
    elif address_type == 3:
        # Domain name: one length byte followed by that many bytes.
        length = recv_exact(sock, 1)
        if length:
            domain = recv_exact(sock, length[0])
            if domain is not None:
                address = length + domain

    port = recv_exact(sock, 2)
    if address is None or port is None:
        return None
    return header + address + port
|
|
|
|
def configure_browser(
    *,
    chrome_binary: str,
    chromedriver_path: str,
    page_timeout: int = 40,
    proxy_url: str = "",
    enable_login_2fa_bypass: bool = False,
) -> WebDriver:
    """Launch headless Chrome with the project's flags, proxy and injected scripts.

    Args:
        chrome_binary: Path assigned to the Chrome options' binary location.
        chromedriver_path: Driver executable path or command name. If it is
            neither an existing path nor found on ``PATH``, the default
            ``ChromeService()`` is used instead.
        page_timeout: Page-load timeout in seconds; the script timeout is
            capped at 20 seconds.
        proxy_url: Optional proxy URL. SOCKS5 proxies with credentials are
            reached through a local ``Socks5AuthRelay``.
        enable_login_2fa_bypass: Also inject the 2FA bypass script.

    Returns:
        The configured driver. Any relay that was started is stored on it as
        ``_sacc_proxy_relay`` so it can be closed together with the driver.

    Raises:
        ValueError: if ``proxy_url`` cannot be parsed.
        Exception: whatever driver start-up or configuration raises; the
            browser and the relay are shut down before it propagates.
    """
    options = webdriver.ChromeOptions()
    options.binary_location = chrome_binary
    options.page_load_strategy = "eager"
    chrome_flags = (
        "--headless=new",
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-blink-features=AutomationControlled",
        "--disable-background-networking",
        "--disable-background-timer-throttling",
        "--disable-backgrounding-occluded-windows",
        "--disable-renderer-backgrounding",
        "--disable-extensions",
        "--disable-default-apps",
        "--no-first-run",
        "--no-default-browser-check",
        "--mute-audio",
        "--window-size=1440,1280",
        "--lang=zh-CN",
        f"--user-agent={DEFAULT_USER_AGENT}",
        "--remote-debugging-pipe",
    )
    for flag in chrome_flags:
        options.add_argument(flag)
    options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    options.add_experimental_option("useAutomationExtension", False)
    options.set_capability("goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"})

    proxy_relay: Socks5AuthRelay | None = None
    proxy = parse_browser_proxy(proxy_url)
    if proxy is not None:
        if proxy.requires_auth_relay:
            # The credentials are handled by a local relay and Chrome is
            # pointed at its unauthenticated loopback port.
            proxy_relay = Socks5AuthRelay(proxy)
            proxy_relay.start()
            options.add_argument(f"--proxy-server=socks5://127.0.0.1:{proxy_relay.port}")
        else:
            options.add_argument(f"--proxy-server={proxy.scheme}://{proxy.host}:{proxy.port}")

    driver: WebDriver | None = None
    try:
        resolved_driver_path = str(chromedriver_path or "").strip()
        if resolved_driver_path and not Path(resolved_driver_path).exists() and shutil.which(resolved_driver_path) is None:
            resolved_driver_path = ""
        service = ChromeService(executable_path=resolved_driver_path) if resolved_driver_path else ChromeService()
        driver = webdriver.Chrome(service=service, options=options)
        driver._sacc_proxy_relay = proxy_relay
        driver.set_page_load_timeout(page_timeout)
        driver.set_script_timeout(min(page_timeout, 20))
        driver.implicitly_wait(6)
        try:
            driver.execute_cdp_cmd("Network.enable", {})
        except WebDriverException:
            pass  # Best effort: a failure here does not abort setup.
        driver.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {
                "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
            },
        )
        driver.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {"source": _get_password_popup_blocker_script()},
        )
        if enable_login_2fa_bypass:
            driver.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {"source": _get_login_2fa_bypass_script()},
            )
    except Exception:
        # Do not leave a Chrome process or a relay thread behind when
        # start-up or configuration fails part-way through.
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                pass  # The original error is the one the caller needs to see.
        if proxy_relay is not None:
            proxy_relay.close()
        raise
    return driver
|
|
|
|
def quit_browser(driver: WebDriver) -> None:
    """Quit ``driver`` and close the proxy relay attached to it, if any.

    The relay is closed even when ``driver.quit()`` raises.
    """
    relay = getattr(driver, "_sacc_proxy_relay", None)
    if relay is None:
        driver.quit()
        return
    try:
        driver.quit()
    finally:
        relay.close()
|
|
|
|
def open_with_recovery(driver: WebDriver, url: str) -> bool:
    """Navigate to ``url``; on a page-load timeout, stop loading and carry on.

    Returns:
        ``True`` if the load timed out (after attempting ``window.stop()``),
        ``False`` if navigation completed normally.
    """
    try:
        driver.get(url)
    except TimeoutException:
        try:
            driver.execute_script("window.stop();")
        except WebDriverException:
            pass  # Stopping is best effort; the timeout is still reported.
        return True
    else:
        return False
|
|
|
|
def wait_for_ready(driver_wait: WebDriverWait, *, allow_interactive: bool = True) -> str:
    """Wait until ``document.readyState`` is acceptable and return that state.

    Args:
        driver_wait: The wait object used to poll the page.
        allow_interactive: Accept ``"interactive"`` as well as ``"complete"``.

    Returns:
        The ready state that satisfied the wait.

    Raises:
        Whatever ``driver_wait.until`` raises when the condition is not met
        in time (the message passed is "The target page did not finish
        loading in time.").
    """
    acceptable_states = {"complete", "interactive"} if allow_interactive else {"complete"}

    def _acceptable_ready_state(web_driver):
        # Hand the observed state back through until() rather than re-reading
        # it afterwards, when the page may already have moved on.
        state = web_driver.execute_script("return document.readyState")
        return str(state) if state in acceptable_states else False

    return str(
        driver_wait.until(
            _acceptable_ready_state,
            "The target page did not finish loading in time.",
        )
    )
|
|
|