Spaces:
Paused
Paused
| """ | |
| Tests for Ports Configuration API Router | |
| Covers: GET/POST /api/ports/config, GET /api/ports/status, POST /api/ports/kill | |
| """ | |
| import json | |
| from pathlib import Path | |
| from unittest.mock import MagicMock, patch | |
| import pytest | |
| from fastapi import FastAPI | |
| from fastapi.testclient import TestClient | |
| from api_utils.routers.ports import ( | |
| KillRequest, | |
| PortConfig, | |
| PortStatus, | |
| ProcessInfo, | |
| _find_processes_on_port, | |
| _get_process_name, | |
| router, | |
| ) | |
| def app() -> FastAPI: | |
| """Create test FastAPI app with ports router.""" | |
| app = FastAPI() | |
| app.include_router(router) | |
| return app | |
| def client(app: FastAPI) -> TestClient: | |
| """Create test client.""" | |
| return TestClient(app) | |
| class TestPortModels: | |
| """Tests for port models.""" | |
| def test_port_config_model(self) -> None: | |
| """Test PortConfig model with defaults.""" | |
| config = PortConfig() | |
| assert config.fastapi_port == 2048 | |
| assert config.camoufox_debug_port == 9222 | |
| assert config.stream_proxy_port == 3120 | |
| assert config.stream_proxy_enabled is True | |
| def test_port_config_validation(self) -> None: | |
| """Test PortConfig validates port ranges.""" | |
| with pytest.raises(ValueError): | |
| PortConfig(fastapi_port=80) # Below 1024 | |
| # Valid range | |
| config = PortConfig(fastapi_port=8080) | |
| assert config.fastapi_port == 8080 | |
| def test_process_info_model(self) -> None: | |
| """Test ProcessInfo model.""" | |
| proc = ProcessInfo(pid=1234, name="python") | |
| assert proc.pid == 1234 | |
| assert proc.name == "python" | |
| def test_port_status_model(self) -> None: | |
| """Test PortStatus model.""" | |
| status = PortStatus( | |
| port=8080, | |
| port_type="FastAPI", | |
| in_use=True, | |
| processes=[ProcessInfo(pid=1234, name="python")], | |
| ) | |
| assert status.in_use is True | |
| assert len(status.processes) == 1 | |
| def test_kill_request_model(self) -> None: | |
| """Test KillRequest model.""" | |
| req = KillRequest(pid=1234, confirm=True) | |
| assert req.pid == 1234 | |
| assert req.confirm is True | |
| class TestGetPortConfig: | |
| """Tests for GET /api/ports/config endpoint.""" | |
| def test_get_port_config_defaults(self, client: TestClient, tmp_path: Path) -> None: | |
| """Test getting config when no file exists.""" | |
| with patch( | |
| "api_utils.routers.ports._PORTS_CONFIG_FILE", tmp_path / "nonexistent.json" | |
| ): | |
| response = client.get("/api/ports/config") | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert "fastapi_port" in data | |
| assert "camoufox_debug_port" in data | |
| def test_get_port_config_from_file( | |
| self, client: TestClient, tmp_path: Path | |
| ) -> None: | |
| """Test getting config from saved file.""" | |
| config_file = tmp_path / "ports_config.json" | |
| config_file.write_text( | |
| json.dumps( | |
| { | |
| "fastapi_port": 9999, | |
| "camoufox_debug_port": 9223, | |
| "stream_proxy_port": 3121, | |
| "stream_proxy_enabled": False, | |
| } | |
| ) | |
| ) | |
| with patch("api_utils.routers.ports._PORTS_CONFIG_FILE", config_file): | |
| response = client.get("/api/ports/config") | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["fastapi_port"] == 9999 | |
| class TestUpdatePortConfig: | |
| """Tests for POST /api/ports/config endpoint.""" | |
| def test_update_port_config(self, client: TestClient, tmp_path: Path) -> None: | |
| """Test updating port configuration.""" | |
| config_file = tmp_path / "ports_config.json" | |
| with patch("api_utils.routers.ports._PORTS_CONFIG_FILE", config_file): | |
| response = client.post( | |
| "/api/ports/config", | |
| json={ | |
| "fastapi_port": 8080, | |
| "camoufox_debug_port": 9000, | |
| "stream_proxy_port": 3200, | |
| "stream_proxy_enabled": True, | |
| }, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["success"] is True | |
| assert data["config"]["fastapi_port"] == 8080 | |
| assert "restart" in data["message"].lower() | |
| def test_update_port_config_invalid(self, client: TestClient) -> None: | |
| """Test updating with invalid port.""" | |
| response = client.post( | |
| "/api/ports/config", | |
| json={ | |
| "fastapi_port": 80, # Invalid | |
| "camoufox_debug_port": 9222, | |
| "stream_proxy_port": 3120, | |
| "stream_proxy_enabled": True, | |
| }, | |
| ) | |
| assert response.status_code == 422 | |
| class TestPortStatus: | |
| """Tests for GET /api/ports/status endpoint.""" | |
| def test_get_port_status(self, client: TestClient) -> None: | |
| """Test getting port status.""" | |
| with patch("api_utils.routers.ports._find_processes_on_port", return_value=[]): | |
| response = client.get("/api/ports/status") | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert "ports" in data | |
| assert len(data["ports"]) >= 2 # At least FastAPI and Camoufox | |
| def test_get_port_status_with_processes(self, client: TestClient) -> None: | |
| """Test port status shows active processes.""" | |
| mock_processes = [ProcessInfo(pid=1234, name="python")] | |
| with patch( | |
| "api_utils.routers.ports._find_processes_on_port", | |
| return_value=mock_processes, | |
| ): | |
| response = client.get("/api/ports/status") | |
| assert response.status_code == 200 | |
| data = response.json() | |
| # At least one port should show in_use | |
| in_use_ports = [p for p in data["ports"] if p["in_use"]] | |
| assert len(in_use_ports) > 0 | |
| class TestKillProcess: | |
| """Tests for POST /api/ports/kill endpoint.""" | |
| def test_kill_process_requires_confirmation(self, client: TestClient) -> None: | |
| """Test kill endpoint requires confirmation.""" | |
| response = client.post( | |
| "/api/ports/kill", | |
| json={"pid": 1234, "confirm": False}, | |
| ) | |
| assert response.status_code == 400 | |
| def test_kill_process_with_confirmation(self, client: TestClient) -> None: | |
| """Test kill process with confirmation when PID is on a tracked port.""" | |
| # Must mock _find_processes_on_port to return a process with matching PID | |
| mock_processes = [ProcessInfo(pid=1234, name="python")] | |
| with ( | |
| patch( | |
| "api_utils.routers.ports._find_processes_on_port", | |
| return_value=mock_processes, | |
| ), | |
| patch( | |
| "api_utils.routers.ports._kill_process", | |
| return_value=(True, "Process terminated"), | |
| ), | |
| ): | |
| response = client.post( | |
| "/api/ports/kill", | |
| json={"pid": 1234, "confirm": True}, | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["success"] is True | |
| def test_kill_process_unauthorized_pid(self, client: TestClient) -> None: | |
| """Test kill process fails for PID not on tracked ports (security check).""" | |
| # Mock _find_processes_on_port to return empty (PID not on any port) | |
| with patch( | |
| "api_utils.routers.ports._find_processes_on_port", | |
| return_value=[], | |
| ): | |
| response = client.post( | |
| "/api/ports/kill", | |
| json={"pid": 9999, "confirm": True}, | |
| ) | |
| assert response.status_code == 403 | |
| data = response.json() | |
| assert "9999" in data["detail"] | |
| def test_kill_process_failure(self, client: TestClient) -> None: | |
| """Test kill process failure when kill actually fails.""" | |
| # Mock: PID is on tracked port, but kill fails | |
| mock_processes = [ProcessInfo(pid=1234, name="python")] | |
| with ( | |
| patch( | |
| "api_utils.routers.ports._find_processes_on_port", | |
| return_value=mock_processes, | |
| ), | |
| patch( | |
| "api_utils.routers.ports._kill_process", | |
| return_value=(False, "Failed to terminate process"), | |
| ), | |
| ): | |
| response = client.post( | |
| "/api/ports/kill", | |
| json={"pid": 1234, "confirm": True}, | |
| ) | |
| assert response.status_code == 500 | |
| data = response.json() | |
| assert data["success"] is False | |
| class TestHelperFunctions: | |
| """Tests for helper functions.""" | |
| def test_find_processes_on_port_empty(self) -> None: | |
| """Test finding processes on unused port.""" | |
| with patch("subprocess.run") as mock_run: | |
| mock_run.return_value = MagicMock(returncode=1, stdout="") | |
| result = _find_processes_on_port(59999) # Unlikely to be in use | |
| assert isinstance(result, list) | |
| def test_get_process_name_unknown(self) -> None: | |
| """Test getting name for non-existent PID.""" | |
| with patch("subprocess.run") as mock_run: | |
| mock_run.return_value = MagicMock(returncode=1, stdout="") | |
| name = _get_process_name(999999) # Non-existent PID | |
| assert name == "Unknown" | |
| def test_find_processes_on_port_linux(self) -> None: | |
| """Test finding processes on Linux.""" | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run") as mock_run, | |
| ): | |
| mock_run.return_value = MagicMock(returncode=0, stdout="1234\n5678\n") | |
| with patch.object( | |
| __import__("api_utils.routers.ports", fromlist=["_get_process_name"]), | |
| "_get_process_name", | |
| return_value="python", | |
| ): | |
| result = _find_processes_on_port(8080) | |
| assert isinstance(result, list) | |
| def test_get_process_name_linux(self) -> None: | |
| """Test getting process name on Linux.""" | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run") as mock_run, | |
| ): | |
| mock_run.return_value = MagicMock(returncode=0, stdout="python3\n") | |
| name = _get_process_name(1234) | |
| assert name == "python3" | |
| def test_get_process_name_darwin(self) -> None: | |
| """Test getting process name on macOS.""" | |
| with ( | |
| patch("platform.system", return_value="Darwin"), | |
| patch("subprocess.run") as mock_run, | |
| ): | |
| mock_run.return_value = MagicMock(returncode=0, stdout="node\n") | |
| name = _get_process_name(1234) | |
| assert name == "node" | |
| def test_find_processes_on_port_exception(self) -> None: | |
| """Test finding processes handles exceptions.""" | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run", side_effect=Exception("fail")), | |
| ): | |
| result = _find_processes_on_port(8080) | |
| assert result == [] | |
| class TestKillProcessHelper: | |
| """Tests for _kill_process helper function.""" | |
| def test_kill_process_linux_success(self) -> None: | |
| """Test killing process on Linux with SIGTERM.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run") as mock_run, | |
| ): | |
| # First call is SIGTERM, second is check (process gone) | |
| mock_run.side_effect = [ | |
| MagicMock(returncode=0), # SIGTERM | |
| MagicMock(returncode=1), # Check shows process gone | |
| ] | |
| with patch("time.sleep"): | |
| success, msg = _kill_process(1234) | |
| assert success is True | |
| def test_kill_process_linux_force_kill(self) -> None: | |
| """Test killing process on Linux requires SIGKILL.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run") as mock_run, | |
| ): | |
| # SIGTERM doesn't work, SIGKILL does | |
| mock_run.side_effect = [ | |
| MagicMock(returncode=0), # SIGTERM | |
| MagicMock(returncode=0), # Check shows still alive | |
| MagicMock(returncode=0), # SIGKILL | |
| MagicMock(returncode=1), # Check shows process gone | |
| ] | |
| with patch("time.sleep"): | |
| success, msg = _kill_process(1234) | |
| assert success is True | |
| assert "SIGKILL" in msg or "force" in msg.lower() | |
| def test_kill_process_unsupported_os(self) -> None: | |
| """Test killing process on unsupported OS.""" | |
| from api_utils.routers.ports import _kill_process | |
| with patch("platform.system", return_value="FreeBSD"): | |
| success, msg = _kill_process(1234) | |
| assert success is False | |
| def test_kill_process_exception(self) -> None: | |
| """Test killing process handles exceptions.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("platform.system", return_value="Linux"), | |
| patch("subprocess.run", side_effect=Exception("permission denied")), | |
| ): | |
| success, msg = _kill_process(1234) | |
| assert success is False | |
| assert "permission denied" in msg.lower() or "error" in msg.lower() | |
| class TestWindowsPlatform: | |
| """Tests for Windows-specific code paths.""" | |
| def test_find_processes_windows_success(self) -> None: | |
| """Test _find_processes_on_port on Windows with successful netstat parsing.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| patch( | |
| "api_utils.routers.ports._get_process_name", return_value="python.exe" | |
| ), | |
| ): | |
| # Simulate netstat -ano output (actual netstat format without header) | |
| mock_run.return_value = MagicMock( | |
| returncode=0, | |
| stdout=( | |
| "TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 1234\n" | |
| "TCP 0.0.0.0:443 0.0.0.0:0 LISTENING 5678\n" | |
| ), | |
| ) | |
| result = _find_processes_on_port(8080) | |
| assert len(result) == 1 | |
| assert result[0].pid == 1234 | |
| assert result[0].name == "python.exe" | |
| def test_find_processes_windows_no_match(self) -> None: | |
| """Test _find_processes_on_port on Windows when port not found.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| mock_run.return_value = MagicMock( | |
| returncode=0, | |
| stdout=( | |
| " Proto Local Address Foreign Address State PID\n" | |
| " TCP 0.0.0.0:443 0.0.0.0:0 LISTENING 5678\n" | |
| ), | |
| ) | |
| result = _find_processes_on_port(8080) | |
| assert len(result) == 0 | |
| def test_find_processes_windows_multiple(self) -> None: | |
| """Test _find_processes_on_port on Windows with same port multiple PIDs.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| patch("api_utils.routers.ports._get_process_name", return_value="node.exe"), | |
| ): | |
| mock_run.return_value = MagicMock( | |
| returncode=0, | |
| stdout=( | |
| " Proto Local Address Foreign Address State PID\n" | |
| " TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 1234\n" | |
| " TCP 127.0.0.1:8080 0.0.0.0:0 LISTENING 1234\n" | |
| ), | |
| ) | |
| result = _find_processes_on_port(8080) | |
| # Should deduplicate by PID | |
| assert len(result) == 1 | |
| def test_get_process_name_windows_success(self) -> None: | |
| """Test _get_process_name on Windows with tasklist.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| # tasklist /NH /FO CSV output format | |
| mock_run.return_value = MagicMock( | |
| returncode=0, stdout='"python.exe","1234","Console","1","10,000 K"\n' | |
| ) | |
| name = _get_process_name(1234) | |
| assert name == "python.exe" | |
| def test_get_process_name_windows_empty(self) -> None: | |
| """Test _get_process_name on Windows with empty result.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| mock_run.return_value = MagicMock(returncode=0, stdout="") | |
| name = _get_process_name(1234) | |
| assert name == "Unknown" | |
| def test_get_process_name_windows_failure(self) -> None: | |
| """Test _get_process_name on Windows with non-zero return code.""" | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| mock_run.return_value = MagicMock(returncode=1, stdout="") | |
| name = _get_process_name(1234) | |
| assert name == "Unknown" | |
| def test_kill_process_windows_success(self) -> None: | |
| """Test _kill_process on Windows with taskkill success.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| mock_run.return_value = MagicMock(returncode=0, stderr="") | |
| success, msg = _kill_process(1234) | |
| assert success is True | |
| assert "1234" in msg | |
| def test_kill_process_windows_failure(self) -> None: | |
| """Test _kill_process on Windows with taskkill failure.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Windows"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| patch( | |
| "api_utils.routers.ports.subprocess.CREATE_NO_WINDOW", | |
| 0x08000000, | |
| create=True, | |
| ), | |
| ): | |
| mock_run.return_value = MagicMock( | |
| returncode=1, stderr="Access denied or process not found" | |
| ) | |
| success, msg = _kill_process(1234) | |
| assert success is False | |
| assert "1234" in msg or "Failed" in msg or "Unable" in msg | |
| def test_kill_process_linux_cannot_kill(self) -> None: | |
| """Test _kill_process on Linux when process survives SIGKILL.""" | |
| from api_utils.routers.ports import _kill_process | |
| with ( | |
| patch("api_utils.routers.ports.platform.system", return_value="Linux"), | |
| patch("api_utils.routers.ports.subprocess.run") as mock_run, | |
| ): | |
| # Process survives both SIGTERM and SIGKILL | |
| mock_run.side_effect = [ | |
| MagicMock(returncode=0), # SIGTERM | |
| MagicMock(returncode=0), # Check shows still alive | |
| MagicMock(returncode=0), # SIGKILL | |
| MagicMock(returncode=0), # Check shows still alive | |
| ] | |
| with patch("time.sleep"): | |
| success, msg = _kill_process(1234) | |
| assert success is False | |
| assert "Unable" in msg or "1234" in msg | |