peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
21.5 kB
"""
Tests for Ports Configuration API Router
Covers: GET/POST /api/ports/config, GET /api/ports/status, POST /api/ports/kill
"""
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from api_utils.routers.ports import (
KillRequest,
PortConfig,
PortStatus,
ProcessInfo,
_find_processes_on_port,
_get_process_name,
router,
)
@pytest.fixture
def app() -> FastAPI:
"""Create test FastAPI app with ports router."""
app = FastAPI()
app.include_router(router)
return app
@pytest.fixture
def client(app: FastAPI) -> TestClient:
"""Create test client."""
return TestClient(app)
class TestPortModels:
"""Tests for port models."""
def test_port_config_model(self) -> None:
"""Test PortConfig model with defaults."""
config = PortConfig()
assert config.fastapi_port == 2048
assert config.camoufox_debug_port == 9222
assert config.stream_proxy_port == 3120
assert config.stream_proxy_enabled is True
def test_port_config_validation(self) -> None:
"""Test PortConfig validates port ranges."""
with pytest.raises(ValueError):
PortConfig(fastapi_port=80) # Below 1024
# Valid range
config = PortConfig(fastapi_port=8080)
assert config.fastapi_port == 8080
def test_process_info_model(self) -> None:
"""Test ProcessInfo model."""
proc = ProcessInfo(pid=1234, name="python")
assert proc.pid == 1234
assert proc.name == "python"
def test_port_status_model(self) -> None:
"""Test PortStatus model."""
status = PortStatus(
port=8080,
port_type="FastAPI",
in_use=True,
processes=[ProcessInfo(pid=1234, name="python")],
)
assert status.in_use is True
assert len(status.processes) == 1
def test_kill_request_model(self) -> None:
"""Test KillRequest model."""
req = KillRequest(pid=1234, confirm=True)
assert req.pid == 1234
assert req.confirm is True
class TestGetPortConfig:
"""Tests for GET /api/ports/config endpoint."""
def test_get_port_config_defaults(self, client: TestClient, tmp_path: Path) -> None:
"""Test getting config when no file exists."""
with patch(
"api_utils.routers.ports._PORTS_CONFIG_FILE", tmp_path / "nonexistent.json"
):
response = client.get("/api/ports/config")
assert response.status_code == 200
data = response.json()
assert "fastapi_port" in data
assert "camoufox_debug_port" in data
def test_get_port_config_from_file(
self, client: TestClient, tmp_path: Path
) -> None:
"""Test getting config from saved file."""
config_file = tmp_path / "ports_config.json"
config_file.write_text(
json.dumps(
{
"fastapi_port": 9999,
"camoufox_debug_port": 9223,
"stream_proxy_port": 3121,
"stream_proxy_enabled": False,
}
)
)
with patch("api_utils.routers.ports._PORTS_CONFIG_FILE", config_file):
response = client.get("/api/ports/config")
assert response.status_code == 200
data = response.json()
assert data["fastapi_port"] == 9999
class TestUpdatePortConfig:
"""Tests for POST /api/ports/config endpoint."""
def test_update_port_config(self, client: TestClient, tmp_path: Path) -> None:
"""Test updating port configuration."""
config_file = tmp_path / "ports_config.json"
with patch("api_utils.routers.ports._PORTS_CONFIG_FILE", config_file):
response = client.post(
"/api/ports/config",
json={
"fastapi_port": 8080,
"camoufox_debug_port": 9000,
"stream_proxy_port": 3200,
"stream_proxy_enabled": True,
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["config"]["fastapi_port"] == 8080
assert "restart" in data["message"].lower()
def test_update_port_config_invalid(self, client: TestClient) -> None:
"""Test updating with invalid port."""
response = client.post(
"/api/ports/config",
json={
"fastapi_port": 80, # Invalid
"camoufox_debug_port": 9222,
"stream_proxy_port": 3120,
"stream_proxy_enabled": True,
},
)
assert response.status_code == 422
class TestPortStatus:
"""Tests for GET /api/ports/status endpoint."""
def test_get_port_status(self, client: TestClient) -> None:
"""Test getting port status."""
with patch("api_utils.routers.ports._find_processes_on_port", return_value=[]):
response = client.get("/api/ports/status")
assert response.status_code == 200
data = response.json()
assert "ports" in data
assert len(data["ports"]) >= 2 # At least FastAPI and Camoufox
def test_get_port_status_with_processes(self, client: TestClient) -> None:
"""Test port status shows active processes."""
mock_processes = [ProcessInfo(pid=1234, name="python")]
with patch(
"api_utils.routers.ports._find_processes_on_port",
return_value=mock_processes,
):
response = client.get("/api/ports/status")
assert response.status_code == 200
data = response.json()
# At least one port should show in_use
in_use_ports = [p for p in data["ports"] if p["in_use"]]
assert len(in_use_ports) > 0
class TestKillProcess:
"""Tests for POST /api/ports/kill endpoint."""
def test_kill_process_requires_confirmation(self, client: TestClient) -> None:
"""Test kill endpoint requires confirmation."""
response = client.post(
"/api/ports/kill",
json={"pid": 1234, "confirm": False},
)
assert response.status_code == 400
def test_kill_process_with_confirmation(self, client: TestClient) -> None:
"""Test kill process with confirmation when PID is on a tracked port."""
# Must mock _find_processes_on_port to return a process with matching PID
mock_processes = [ProcessInfo(pid=1234, name="python")]
with (
patch(
"api_utils.routers.ports._find_processes_on_port",
return_value=mock_processes,
),
patch(
"api_utils.routers.ports._kill_process",
return_value=(True, "Process terminated"),
),
):
response = client.post(
"/api/ports/kill",
json={"pid": 1234, "confirm": True},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_kill_process_unauthorized_pid(self, client: TestClient) -> None:
"""Test kill process fails for PID not on tracked ports (security check)."""
# Mock _find_processes_on_port to return empty (PID not on any port)
with patch(
"api_utils.routers.ports._find_processes_on_port",
return_value=[],
):
response = client.post(
"/api/ports/kill",
json={"pid": 9999, "confirm": True},
)
assert response.status_code == 403
data = response.json()
assert "9999" in data["detail"]
def test_kill_process_failure(self, client: TestClient) -> None:
"""Test kill process failure when kill actually fails."""
# Mock: PID is on tracked port, but kill fails
mock_processes = [ProcessInfo(pid=1234, name="python")]
with (
patch(
"api_utils.routers.ports._find_processes_on_port",
return_value=mock_processes,
),
patch(
"api_utils.routers.ports._kill_process",
return_value=(False, "Failed to terminate process"),
),
):
response = client.post(
"/api/ports/kill",
json={"pid": 1234, "confirm": True},
)
assert response.status_code == 500
data = response.json()
assert data["success"] is False
class TestHelperFunctions:
"""Tests for helper functions."""
def test_find_processes_on_port_empty(self) -> None:
"""Test finding processes on unused port."""
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="")
result = _find_processes_on_port(59999) # Unlikely to be in use
assert isinstance(result, list)
def test_get_process_name_unknown(self) -> None:
"""Test getting name for non-existent PID."""
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="")
name = _get_process_name(999999) # Non-existent PID
assert name == "Unknown"
def test_find_processes_on_port_linux(self) -> None:
"""Test finding processes on Linux."""
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run") as mock_run,
):
mock_run.return_value = MagicMock(returncode=0, stdout="1234\n5678\n")
with patch.object(
__import__("api_utils.routers.ports", fromlist=["_get_process_name"]),
"_get_process_name",
return_value="python",
):
result = _find_processes_on_port(8080)
assert isinstance(result, list)
def test_get_process_name_linux(self) -> None:
"""Test getting process name on Linux."""
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run") as mock_run,
):
mock_run.return_value = MagicMock(returncode=0, stdout="python3\n")
name = _get_process_name(1234)
assert name == "python3"
def test_get_process_name_darwin(self) -> None:
"""Test getting process name on macOS."""
with (
patch("platform.system", return_value="Darwin"),
patch("subprocess.run") as mock_run,
):
mock_run.return_value = MagicMock(returncode=0, stdout="node\n")
name = _get_process_name(1234)
assert name == "node"
def test_find_processes_on_port_exception(self) -> None:
"""Test finding processes handles exceptions."""
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run", side_effect=Exception("fail")),
):
result = _find_processes_on_port(8080)
assert result == []
class TestKillProcessHelper:
"""Tests for _kill_process helper function."""
def test_kill_process_linux_success(self) -> None:
"""Test killing process on Linux with SIGTERM."""
from api_utils.routers.ports import _kill_process
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run") as mock_run,
):
# First call is SIGTERM, second is check (process gone)
mock_run.side_effect = [
MagicMock(returncode=0), # SIGTERM
MagicMock(returncode=1), # Check shows process gone
]
with patch("time.sleep"):
success, msg = _kill_process(1234)
assert success is True
def test_kill_process_linux_force_kill(self) -> None:
"""Test killing process on Linux requires SIGKILL."""
from api_utils.routers.ports import _kill_process
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run") as mock_run,
):
# SIGTERM doesn't work, SIGKILL does
mock_run.side_effect = [
MagicMock(returncode=0), # SIGTERM
MagicMock(returncode=0), # Check shows still alive
MagicMock(returncode=0), # SIGKILL
MagicMock(returncode=1), # Check shows process gone
]
with patch("time.sleep"):
success, msg = _kill_process(1234)
assert success is True
assert "SIGKILL" in msg or "force" in msg.lower()
def test_kill_process_unsupported_os(self) -> None:
"""Test killing process on unsupported OS."""
from api_utils.routers.ports import _kill_process
with patch("platform.system", return_value="FreeBSD"):
success, msg = _kill_process(1234)
assert success is False
def test_kill_process_exception(self) -> None:
"""Test killing process handles exceptions."""
from api_utils.routers.ports import _kill_process
with (
patch("platform.system", return_value="Linux"),
patch("subprocess.run", side_effect=Exception("permission denied")),
):
success, msg = _kill_process(1234)
assert success is False
assert "permission denied" in msg.lower() or "error" in msg.lower()
class TestWindowsPlatform:
"""Tests for Windows-specific code paths."""
def test_find_processes_windows_success(self) -> None:
"""Test _find_processes_on_port on Windows with successful netstat parsing."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
patch(
"api_utils.routers.ports._get_process_name", return_value="python.exe"
),
):
# Simulate netstat -ano output (actual netstat format without header)
mock_run.return_value = MagicMock(
returncode=0,
stdout=(
"TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 1234\n"
"TCP 0.0.0.0:443 0.0.0.0:0 LISTENING 5678\n"
),
)
result = _find_processes_on_port(8080)
assert len(result) == 1
assert result[0].pid == 1234
assert result[0].name == "python.exe"
def test_find_processes_windows_no_match(self) -> None:
"""Test _find_processes_on_port on Windows when port not found."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
mock_run.return_value = MagicMock(
returncode=0,
stdout=(
" Proto Local Address Foreign Address State PID\n"
" TCP 0.0.0.0:443 0.0.0.0:0 LISTENING 5678\n"
),
)
result = _find_processes_on_port(8080)
assert len(result) == 0
def test_find_processes_windows_multiple(self) -> None:
"""Test _find_processes_on_port on Windows with same port multiple PIDs."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
patch("api_utils.routers.ports._get_process_name", return_value="node.exe"),
):
mock_run.return_value = MagicMock(
returncode=0,
stdout=(
" Proto Local Address Foreign Address State PID\n"
" TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 1234\n"
" TCP 127.0.0.1:8080 0.0.0.0:0 LISTENING 1234\n"
),
)
result = _find_processes_on_port(8080)
# Should deduplicate by PID
assert len(result) == 1
def test_get_process_name_windows_success(self) -> None:
"""Test _get_process_name on Windows with tasklist."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
# tasklist /NH /FO CSV output format
mock_run.return_value = MagicMock(
returncode=0, stdout='"python.exe","1234","Console","1","10,000 K"\n'
)
name = _get_process_name(1234)
assert name == "python.exe"
def test_get_process_name_windows_empty(self) -> None:
"""Test _get_process_name on Windows with empty result."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
mock_run.return_value = MagicMock(returncode=0, stdout="")
name = _get_process_name(1234)
assert name == "Unknown"
def test_get_process_name_windows_failure(self) -> None:
"""Test _get_process_name on Windows with non-zero return code."""
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
mock_run.return_value = MagicMock(returncode=1, stdout="")
name = _get_process_name(1234)
assert name == "Unknown"
def test_kill_process_windows_success(self) -> None:
"""Test _kill_process on Windows with taskkill success."""
from api_utils.routers.ports import _kill_process
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
mock_run.return_value = MagicMock(returncode=0, stderr="")
success, msg = _kill_process(1234)
assert success is True
assert "1234" in msg
def test_kill_process_windows_failure(self) -> None:
"""Test _kill_process on Windows with taskkill failure."""
from api_utils.routers.ports import _kill_process
with (
patch("api_utils.routers.ports.platform.system", return_value="Windows"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
patch(
"api_utils.routers.ports.subprocess.CREATE_NO_WINDOW",
0x08000000,
create=True,
),
):
mock_run.return_value = MagicMock(
returncode=1, stderr="Access denied or process not found"
)
success, msg = _kill_process(1234)
assert success is False
assert "1234" in msg or "Failed" in msg or "Unable" in msg
def test_kill_process_linux_cannot_kill(self) -> None:
"""Test _kill_process on Linux when process survives SIGKILL."""
from api_utils.routers.ports import _kill_process
with (
patch("api_utils.routers.ports.platform.system", return_value="Linux"),
patch("api_utils.routers.ports.subprocess.run") as mock_run,
):
# Process survives both SIGTERM and SIGKILL
mock_run.side_effect = [
MagicMock(returncode=0), # SIGTERM
MagicMock(returncode=0), # Check shows still alive
MagicMock(returncode=0), # SIGKILL
MagicMock(returncode=0), # Check shows still alive
]
with patch("time.sleep"):
success, msg = _kill_process(1234)
assert success is False
assert "Unable" in msg or "1234" in msg