AIstudioProxyAPI / tests /conftest.py
peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
14.1 kB
import asyncio
import sys
import types
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# Note: Session-level cleanup hooks removed as they cause recursion issues on Windows
# Rely on pytest-asyncio's natural cleanup and explicit fixture teardown instead
@pytest.fixture(autouse=True)
def global_mock_error_snapshots():
"""
Global autouse fixture to prevent tests from creating real error snapshots.
This prevents the tests from flooding the errors_py directory with
test-generated error snapshots. The mock is applied at the lowest level
(debug_utils.save_error_snapshot_enhanced) to catch all calls.
"""
with patch(
"browser_utils.debug_utils.save_error_snapshot_enhanced",
new_callable=AsyncMock,
):
yield
@pytest.fixture(autouse=True)
def reset_global_state():
"""Reset GlobalState and ServerState before each test to ensure isolation."""
from api_utils.server_state import state
from config.global_state import GlobalState
GlobalState.reset_quota_status()
GlobalState.IS_RECOVERING = False
GlobalState.DEPLOYMENT_EMERGENCY_MODE = False
GlobalState.CURRENT_STREAM_REQ_ID = None
GlobalState.queued_request_count = 0
GlobalState.AUTH_ROTATION_LOCK.set() # Allow requests by default
GlobalState.QUOTA_EXCEEDED_EVENT.clear()
GlobalState.rotation_complete_event.clear()
GlobalState.RECOVERY_EVENT.set()
GlobalState.IS_SHUTTING_DOWN.clear()
state.reset()
yield
@pytest.fixture(autouse=True)
def mock_server_module():
"""Mock the server module to prevent import errors and provide global state."""
module_name = "server"
# Create a mock module
mock_module = types.ModuleType(module_name)
# Set up required attributes (using setattr for dynamic module attributes)
setattr(mock_module, "logger", MagicMock())
setattr(mock_module, "request_queue", asyncio.Queue())
setattr(mock_module, "processing_lock", asyncio.Lock())
setattr(mock_module, "model_switching_lock", asyncio.Lock())
setattr(mock_module, "params_cache_lock", asyncio.Lock())
# page_instance needs is_closed() as sync method
page_mock = AsyncMock()
page_mock.is_closed = MagicMock(return_value=False)
setattr(mock_module, "page_instance", page_mock)
# browser_instance needs is_connected() as sync method
browser_mock = AsyncMock()
browser_mock.is_connected = MagicMock(return_value=True)
setattr(mock_module, "browser_instance", browser_mock)
setattr(mock_module, "parsed_model_list", [])
setattr(mock_module, "log_ws_manager", MagicMock())
setattr(mock_module, "STREAM_QUEUE", MagicMock())
setattr(mock_module, "STREAM_PROCESS", AsyncMock())
setattr(mock_module, "PLAYWRIGHT_PROXY_SETTINGS", {})
setattr(mock_module, "is_initializing", False)
setattr(mock_module, "is_page_ready", True)
setattr(mock_module, "is_browser_connected", True)
setattr(mock_module, "model_list_fetch_event", asyncio.Event())
setattr(mock_module, "worker_task", MagicMock())
setattr(mock_module, "queue_worker", AsyncMock())
setattr(mock_module, "playwright_manager", AsyncMock())
setattr(mock_module, "global_model_list_raw_json", None)
setattr(mock_module, "current_ai_studio_model_id", None)
setattr(mock_module, "is_playwright_ready", True)
setattr(mock_module, "excluded_model_ids", [])
setattr(mock_module, "console_logs", [])
setattr(mock_module, "network_log", {"requests": [], "responses": []})
# Save original if it exists
original_module = sys.modules.get(module_name)
# Inject mock
sys.modules[module_name] = mock_module
yield mock_module
# Restore original or clean up
if original_module:
sys.modules[module_name] = original_module
else:
sys.modules.pop(module_name, None)
@pytest.fixture
def mock_env(monkeypatch):
"""Mock environment variables."""
monkeypatch.setenv("LAUNCH_MODE", "test")
monkeypatch.setenv("STREAM_PORT", "0")
monkeypatch.setenv("PORT", "2048")
@pytest.fixture
def mock_page():
"""Mock Playwright Page object with proper locator setup.
NOTE: page.locator() is SYNC and returns a Locator object.
Locator methods like .click(), .hover(), .wait_for() are ASYNC.
Locator chaining methods like .get_by_label(), .get_by_role() are SYNC.
"""
page = AsyncMock()
page.goto = AsyncMock()
page.wait_for_selector = AsyncMock()
page.click = AsyncMock()
page.fill = AsyncMock()
page.evaluate = AsyncMock()
# Create a default locator with proper async/sync method setup
default_locator = MagicMock()
# Async action methods
default_locator.hover = AsyncMock()
default_locator.click = AsyncMock()
default_locator.fill = AsyncMock()
default_locator.wait_for = AsyncMock()
default_locator.inner_text = AsyncMock()
default_locator.text_content = AsyncMock()
default_locator.get_attribute = AsyncMock()
default_locator.input_value = AsyncMock()
default_locator.is_visible = AsyncMock()
default_locator.is_disabled = AsyncMock()
default_locator.count = AsyncMock(return_value=1) # Default to element exists
# Sync chaining methods (these should return new locators but we'll keep it simple)
default_locator.get_by_label = MagicMock(return_value=default_locator)
default_locator.get_by_role = MagicMock(return_value=default_locator)
default_locator.locator = MagicMock(return_value=default_locator)
# .first and .last properties return the same locator
default_locator.first = default_locator
default_locator.last = default_locator
# page.locator() is SYNC (returns Locator immediately) - use return_value so tests can override
page.locator = MagicMock(return_value=default_locator)
# page.get_by_role() is also SYNC
page.get_by_role = MagicMock(return_value=default_locator)
# page.is_closed() is SYNC in Playwright
page.is_closed = MagicMock(return_value=False)
# page.on() and page.remove_listener() are SYNC
page.on = MagicMock()
page.remove_listener = MagicMock()
return page
@pytest.fixture
def mock_browser_context(mock_page):
"""Mock Playwright BrowserContext."""
context = AsyncMock()
context.new_page.return_value = mock_page
return context
@pytest.fixture
def mock_browser(mock_browser_context):
"""Mock Playwright Browser."""
browser = AsyncMock()
browser.new_context.return_value = mock_browser_context
return browser
@pytest.fixture
def mock_playwright(mock_browser):
"""Mock Playwright object."""
playwright = AsyncMock()
playwright.chromium.launch.return_value = mock_browser
playwright.firefox.launch.return_value = mock_browser
return playwright
# ==================== New Fixtures for Improved Testing ====================
@pytest.fixture
def mock_playwright_stack():
"""
Factory fixture providing consistent Playwright mock stack.
Returns a tuple of (playwright, browser, context, page) with all common
methods pre-configured. This replaces the need to use multiple separate
fixtures (mock_playwright, mock_browser, mock_browser_context, mock_page).
Example:
def test_something(mock_playwright_stack):
playwright, browser, context, page = mock_playwright_stack
# Use page.goto, page.click, etc. - all pre-configured
"""
from playwright.async_api import Page as AsyncPage
# Create mock page with all common methods
page = AsyncMock(spec=AsyncPage)
page.goto = AsyncMock()
page.wait_for_selector = AsyncMock()
page.wait_for_load_state = AsyncMock()
page.click = AsyncMock()
page.fill = AsyncMock()
page.evaluate = AsyncMock(return_value="[]")
page.locator = MagicMock(return_value=MagicMock())
page.query_selector = AsyncMock(return_value=MagicMock())
page.query_selector_all = AsyncMock(return_value=[])
page.content = AsyncMock(return_value="<html></html>")
page.url = "https://aistudio.google.com/app/prompts/new_chat"
# Create mock context
context = AsyncMock()
context.new_page = AsyncMock(return_value=page)
context.close = AsyncMock()
# Create mock browser
browser = AsyncMock()
browser.new_context = AsyncMock(return_value=context)
browser.close = AsyncMock()
# Create mock playwright
playwright = AsyncMock()
playwright.firefox.launch = AsyncMock(return_value=browser)
playwright.chromium.launch = AsyncMock(return_value=browser)
return playwright, browser, context, page
@pytest.fixture
def make_chat_request():
"""
Factory fixture for creating ChatCompletionRequest instances with custom parameters.
This allows tests to create multiple request objects with different configurations
without repeating boilerplate code.
Args:
model: Model ID (default: "gemini-1.5-pro")
stream: Whether to stream response (default: False)
**kwargs: Additional fields to override (temperature, max_tokens, etc.)
Returns:
Function that creates ChatCompletionRequest instances
Example:
def test_streaming(make_chat_request):
request1 = make_chat_request(stream=True)
request2 = make_chat_request(model="gemini-1.5-flash", temperature=0.5)
"""
from models import ChatCompletionRequest, Message
def _make(model: str = "gemini-1.5-pro", stream: bool = False, **kwargs):
default_request = {
"model": model,
"messages": [Message(role="user", content="Test message")],
"stream": stream,
"temperature": 1.0,
"max_tokens": 8192,
}
return ChatCompletionRequest(**{**default_request, **kwargs})
return _make
@pytest.fixture
def make_request_context(mock_playwright_stack):
"""
Factory fixture for creating request context dictionaries with customizable fields.
This provides a realistic RequestContext with sane defaults that can be overridden
for specific test scenarios. Uses mock browser from mock_playwright_stack.
Args:
**overrides: Fields to override in the context
Returns:
Function that creates RequestContext dicts
Example:
def test_context(make_request_context):
ctx = make_request_context(req_id="custom-id", is_page_ready=False)
assert ctx["req_id"] == "custom-id"
"""
import logging
_, _, _, page = mock_playwright_stack
def _make(**overrides):
# Import here to avoid circular imports
from api_utils.server_state import state
default_context = {
"req_id": "test-req",
"page": page,
"logger": logging.getLogger("test"),
"is_page_ready": True,
"parsed_model_list": [],
"current_ai_studio_model_id": "gemini-1.5-pro",
"model_switching_lock": state.model_switching_lock,
"params_cache_lock": state.params_cache_lock,
"page_params_cache": {},
# Additional RequestContext fields (from context_types.py)
"is_streaming": False,
"model_actually_switched": False,
"requested_model": "gemini-1.5-pro",
"model_id_to_use": None,
"needs_model_switching": False,
}
return {**default_context, **overrides}
return _make
@pytest.fixture
def real_locks_mock_browser():
"""
Hybrid fixture providing real asyncio primitives + mock browser boundaries.
Use this for tests that need real lock/queue behavior but don't need a real browser.
This is useful for testing concurrency without the overhead of integration tests.
Provides:
- REAL asyncio.Lock instances (processing_lock, model_switching_lock, params_cache_lock)
- REAL asyncio.Queue (request_queue)
- MOCK browser/page (external I/O boundaries)
Use when:
- Testing lock contention and mutual exclusion
- Testing queue processing without full integration
- Need real async behavior but not real browser
Don't use when:
- Testing pure logic (use regular fixtures)
- Testing full request flow (use real_server_state from integration conftest)
Example:
async def test_lock_behavior(real_locks_mock_browser):
async with real_locks_mock_browser.processing_lock:
# This actually blocks other tasks
await some_operation()
"""
from api_utils.server_state import state
# Reset state to clean slate
state.reset()
# Create REAL asyncio primitives
state.processing_lock = asyncio.Lock()
state.model_switching_lock = asyncio.Lock()
state.params_cache_lock = asyncio.Lock()
state.request_queue = asyncio.Queue()
# Mock only external boundaries (browser/page - these are I/O)
mock_page = AsyncMock()
mock_page.goto = AsyncMock()
mock_page.wait_for_selector = AsyncMock()
mock_page.click = AsyncMock()
mock_page.fill = AsyncMock()
mock_page.evaluate = AsyncMock()
mock_page.locator = MagicMock(return_value=MagicMock())
mock_page.is_closed = MagicMock(return_value=False) # Page is open
mock_browser = AsyncMock()
mock_browser.close = AsyncMock()
state.page_instance = mock_page
state.browser_instance = mock_browser
state.is_page_ready = True
state.is_browser_connected = True
yield state
# Cleanup: Clear queue and reset state
while not state.request_queue.empty():
try:
state.request_queue.get_nowait()
state.request_queue.task_done()
except asyncio.QueueEmpty:
break
state.reset()
def pytest_sessionfinish(session, exitstatus):
"""
Clean up any remaining multiprocessing children to prevent hangs.
This is especially important in CI environments.
"""
import multiprocessing
for child in multiprocessing.active_children():
child.terminate()
child.join(timeout=1.0)