Spaces:
Paused
Paused
| import logging | |
| import platform | |
| import select | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| from typing import List, Optional | |
| logger = logging.getLogger("CamoufoxLauncher") | |
| def is_port_in_use(port: int, host: str = "0.0.0.0") -> bool: | |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
| try: | |
| s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| s.bind((host, port)) | |
| return False | |
| except OSError: | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Unknown error checking port {port} (host {host}): {e}") | |
| return True | |
| def find_pids_on_port(port: int) -> List[int]: | |
| pids: List[int] = [] | |
| system_platform = platform.system() | |
| command = "" | |
| try: | |
| if system_platform == "Linux" or system_platform == "Darwin": | |
| command = f"lsof -ti :{port} -sTCP:LISTEN" | |
| process = subprocess.Popen( | |
| command, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| close_fds=True, | |
| ) | |
| stdout, stderr = process.communicate(timeout=5) | |
| if process.returncode == 0 and stdout: | |
| pids = [int(pid) for pid in stdout.strip().split("\n") if pid.isdigit()] | |
| elif process.returncode != 0 and ( | |
| "command not found" in stderr.lower() or "未找到命令" in stderr | |
| ): | |
| logger.error("Command 'lsof' not found. Please ensure it is installed.") | |
| elif process.returncode not in [0, 1]: # lsof returns 1 when not found | |
| logger.warning( | |
| f"Failed to execute lsof command (return code {process.returncode}): {stderr.strip()}" | |
| ) | |
| elif system_platform == "Windows": | |
| command = f'netstat -ano -p TCP | findstr "LISTENING" | findstr ":{port} "' | |
| process = subprocess.Popen( | |
| command, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| ) | |
| stdout, stderr = process.communicate(timeout=10) | |
| if process.returncode == 0 and stdout: | |
| for line in stdout.strip().split("\n"): | |
| parts = line.split() | |
| if ( | |
| len(parts) >= 4 | |
| and parts[0].upper() == "TCP" | |
| and f":{port}" in parts[1] | |
| ): | |
| if parts[-1].isdigit(): | |
| pids.append(int(parts[-1])) | |
| pids = list(set(pids)) # Deduplicate | |
| elif process.returncode not in [0, 1]: # findstr returns 1 when not found | |
| logger.warning( | |
| f"Failed to execute netstat/findstr command (return code {process.returncode}): {stderr.strip()}" | |
| ) | |
| else: | |
| logger.warning( | |
| f"Unsupported operating system '{system_platform}' for finding processes on port." | |
| ) | |
| except FileNotFoundError: | |
| cmd_name = command.split()[0] if command else "required tool" | |
| logger.error(f"Command '{cmd_name}' not found.") | |
| except subprocess.TimeoutExpired: | |
| logger.error(f"Command '{command}' timed out.") | |
| except Exception as e: | |
| logger.error(f"Error finding processes on port {port}: {e}", exc_info=True) | |
| return pids | |
| def kill_process_interactive(pid: int) -> bool: | |
| system_platform = platform.system() | |
| success = False | |
| logger.info(f"Attempting to terminate process PID: {pid}...") | |
| try: | |
| if system_platform == "Linux" or system_platform == "Darwin": | |
| result_term = subprocess.run( | |
| f"kill {pid}", | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=3, | |
| check=False, | |
| ) | |
| if result_term.returncode == 0: | |
| logger.info(f"SIGTERM signal sent to PID {pid}.") | |
| success = True | |
| else: | |
| logger.warning( | |
| f" PID {pid} SIGTERM failed: {result_term.stderr.strip() or result_term.stdout.strip()}. Trying SIGKILL..." | |
| ) | |
| result_kill = subprocess.run( | |
| f"kill -9 {pid}", | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=3, | |
| check=False, | |
| ) | |
| if result_kill.returncode == 0: | |
| logger.info(f"SIGKILL signal sent to PID {pid}.") | |
| success = True | |
| else: | |
| logger.error( | |
| f" ✗ PID {pid} SIGKILL failed: {result_kill.stderr.strip() or result_kill.stdout.strip()}." | |
| ) | |
| elif system_platform == "Windows": | |
| command_desc = f"taskkill /PID {pid} /T /F" | |
| result = subprocess.run( | |
| command_desc, | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=5, | |
| check=False, | |
| ) | |
| output = result.stdout.strip() | |
| error_output = result.stderr.strip() | |
| if result.returncode == 0 and ( | |
| "SUCCESS" in output.upper() or "成功" in output | |
| ): | |
| logger.info(f"PID {pid} terminated via taskkill /F.") | |
| success = True | |
| elif ( | |
| "could not find process" in error_output.lower() | |
| or "找不到" in error_output | |
| ): # Process may have already exited | |
| logger.info( | |
| f"PID {pid} not found when executing taskkill (may have exited)." | |
| ) | |
| success = True # Treat as success since goal is port availability | |
| else: | |
| # Count errors rather than outputting each one | |
| combined = (error_output + " " + output).strip() | |
| error_count = combined.count("ERROR:") | |
| if error_count > 0: | |
| logger.warning( | |
| f" PID {pid} taskkill /F: (suppressed {error_count} error messages)" | |
| ) | |
| else: | |
| logger.warning(f"PID {pid} taskkill /F returned non-zero status") | |
| else: | |
| logger.warning( | |
| f"Unsupported operating system '{system_platform}' for terminating processes." | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error terminating PID {pid}: {e}", exc_info=True) | |
| return success | |
| def input_with_timeout(prompt_message: str, timeout_seconds: int = 30) -> str: | |
| print(prompt_message, end="", flush=True) | |
| if sys.platform == "win32": | |
| user_input_container: List[Optional[str]] = [None] | |
| def get_input_in_thread(): | |
| try: | |
| user_input_container[0] = sys.stdin.readline().strip() | |
| except Exception: | |
| user_input_container[0] = "" # Return empty string on error | |
| input_thread = threading.Thread(target=get_input_in_thread, daemon=True) | |
| input_thread.start() | |
| input_thread.join(timeout=timeout_seconds) | |
| if input_thread.is_alive(): | |
| print("\nInput timed out. Using default value.", flush=True) | |
| return "" | |
| return user_input_container[0] if user_input_container[0] is not None else "" | |
| else: # Linux/macOS | |
| readable_fds, _, _ = select.select([sys.stdin], [], [], timeout_seconds) | |
| if readable_fds: | |
| return sys.stdin.readline().strip() | |
| else: | |
| print("\nInput timed out. Using default value.", flush=True) | |
| return "" | |
| def get_proxy_from_gsettings() -> Optional[str]: | |
| """ | |
| Retrieves the proxy settings from GSettings on Linux systems. | |
| Returns a proxy string like "http://host:port" or None. | |
| """ | |
| def _run_gsettings_command(command_parts: List[str]) -> Optional[str]: | |
| """Helper function to run gsettings command and return cleaned string output.""" | |
| try: | |
| process_result = subprocess.run( | |
| command_parts, | |
| capture_output=True, | |
| text=True, | |
| check=False, # Do not raise CalledProcessError for non-zero exit codes | |
| timeout=1, # Timeout for the subprocess call | |
| ) | |
| if process_result.returncode == 0: | |
| value = process_result.stdout.strip() | |
| if value.startswith("'") and value.endswith( | |
| "'" | |
| ): # Remove surrounding single quotes | |
| value = value[1:-1] | |
| # If after stripping quotes, value is empty, or it's a gsettings "empty" representation | |
| if not value or value == "''" or value == "@as []" or value == "[]": | |
| return None | |
| return value | |
| else: | |
| return None | |
| except subprocess.TimeoutExpired: | |
| return None | |
| except Exception: # Broad exception as per pseudocode | |
| return None | |
| proxy_mode = _run_gsettings_command( | |
| ["gsettings", "get", "org.gnome.system.proxy", "mode"] | |
| ) | |
| if proxy_mode == "manual": | |
| # Try HTTP proxy first | |
| http_host = _run_gsettings_command( | |
| ["gsettings", "get", "org.gnome.system.proxy.http", "host"] | |
| ) | |
| http_port_str = _run_gsettings_command( | |
| ["gsettings", "get", "org.gnome.system.proxy.http", "port"] | |
| ) | |
| if http_host and http_port_str: | |
| try: | |
| http_port = int(http_port_str) | |
| if http_port > 0: | |
| return f"http://{http_host}:{http_port}" | |
| except ValueError: | |
| pass # Continue to HTTPS | |
| # Try HTTPS proxy if HTTP not found or invalid | |
| https_host = _run_gsettings_command( | |
| ["gsettings", "get", "org.gnome.system.proxy.https", "host"] | |
| ) | |
| https_port_str = _run_gsettings_command( | |
| ["gsettings", "get", "org.gnome.system.proxy.https", "port"] | |
| ) | |
| if https_host and https_port_str: | |
| try: | |
| https_port = int(https_port_str) | |
| if https_port > 0: | |
| # Note: Even for HTTPS proxy settings, the scheme for Playwright/requests is usually http:// | |
| return f"http://{https_host}:{https_port}" | |
| except ValueError: | |
| pass | |
| return None | |