Spaces:
Paused
Paused
| """ | |
| Integration tests for lock behavior and concurrency control. | |
| These tests verify that the lock hierarchy (processing_lock > model_switching_lock > | |
| params_cache_lock) works correctly with REAL asyncio.Lock instances, catching | |
| race conditions that mocked tests cannot detect. | |
| """ | |
| import asyncio | |
| import pytest | |
| async def test_processing_lock_prevents_concurrent_execution(real_server_state): | |
| """ | |
| Verify that processing_lock prevents concurrent browser access. | |
| This test uses REAL asyncio.Lock to ensure that two "requests" cannot | |
| execute simultaneously, preventing race conditions in browser automation. | |
| """ | |
| execution_log = [] | |
| lock = real_server_state.processing_lock | |
| async def simulate_request_processing(request_id: str, delay: float): | |
| """Simulate processing a request with the processing lock.""" | |
| execution_log.append(f"{request_id}_start") | |
| async with lock: | |
| execution_log.append(f"{request_id}_acquired_lock") | |
| # Simulate some async work (browser interaction) | |
| await asyncio.sleep(delay) | |
| execution_log.append(f"{request_id}_releasing_lock") | |
| execution_log.append(f"{request_id}_done") | |
| # Start two tasks concurrently | |
| task1 = asyncio.create_task(simulate_request_processing("req1", 0.1)) | |
| task2 = asyncio.create_task(simulate_request_processing("req2", 0.05)) | |
| await asyncio.gather(task1, task2) | |
| # Verify execution order - req1 should complete entirely before req2 starts | |
| # or vice versa (depends on which acquires lock first) | |
| req1_acquired_idx = execution_log.index("req1_acquired_lock") | |
| req1_releasing_idx = execution_log.index("req1_releasing_lock") | |
| req2_acquired_idx = execution_log.index("req2_acquired_lock") | |
| req2_releasing_idx = execution_log.index("req2_releasing_lock") | |
| # One request must fully complete lock section before other enters | |
| req1_finished_before_req2_started = req1_releasing_idx < req2_acquired_idx | |
| req2_finished_before_req1_started = req2_releasing_idx < req1_acquired_idx | |
| assert req1_finished_before_req2_started or req2_finished_before_req1_started, ( | |
| f"Lock did not prevent concurrent execution. Log: {execution_log}" | |
| ) | |
| async def test_model_switching_lock_serializes_switches(real_server_state): | |
| """ | |
| Verify that model_switching_lock prevents concurrent model changes. | |
| This test ensures that when multiple requests try to switch models, | |
| they are serialized properly and state remains consistent. | |
| """ | |
| lock = real_server_state.model_switching_lock | |
| switch_order = [] | |
| async def simulate_model_switch(request_id: str, target_model: str): | |
| """Simulate switching to a model.""" | |
| async with lock: | |
| switch_order.append(f"{request_id}_start") | |
| # Simulate model switch operation | |
| await asyncio.sleep(0.05) | |
| real_server_state.current_ai_studio_model_id = target_model | |
| switch_order.append(f"{request_id}_done_{target_model}") | |
| # Start three concurrent model switch attempts | |
| tasks = [ | |
| asyncio.create_task(simulate_model_switch("req1", "gemini-1.5-pro")), | |
| asyncio.create_task(simulate_model_switch("req2", "gemini-1.5-flash")), | |
| asyncio.create_task(simulate_model_switch("req3", "gemini-1.0-pro")), | |
| ] | |
| await asyncio.gather(*tasks) | |
| # Verify all switches completed | |
| assert len(switch_order) == 6 # 3 start + 3 done | |
| # Verify switches were serialized (each start followed by its done before next start) | |
| for i in range(0, len(switch_order), 2): | |
| start = switch_order[i] | |
| done = switch_order[i + 1] | |
| # Each start should be immediately followed by its done | |
| req_id = start.split("_")[0] | |
| assert done.startswith(f"{req_id}_done"), ( | |
| f"Switches not serialized: {switch_order}" | |
| ) | |
| # Verify final state is one of the target models | |
| assert real_server_state.current_ai_studio_model_id in [ | |
| "gemini-1.5-pro", | |
| "gemini-1.5-flash", | |
| "gemini-1.0-pro", | |
| ] | |
| async def test_lock_hierarchy_no_deadlock(real_server_state): | |
| """ | |
| Verify that lock hierarchy (processing > model_switching > params_cache) | |
| prevents deadlocks when nested. | |
| This test ensures that locks can be safely nested in the correct order. | |
| """ | |
| processing_lock = real_server_state.processing_lock | |
| model_lock = real_server_state.model_switching_lock | |
| params_lock = real_server_state.params_cache_lock | |
| execution_log = [] | |
| async def nested_lock_operation(operation_id: str): | |
| """Simulate operation that needs multiple locks in hierarchy order.""" | |
| async with processing_lock: | |
| execution_log.append(f"{operation_id}_processing_acquired") | |
| async with model_lock: | |
| execution_log.append(f"{operation_id}_model_acquired") | |
| async with params_lock: | |
| execution_log.append(f"{operation_id}_params_acquired") | |
| await asyncio.sleep(0.01) # Simulate work | |
| execution_log.append(f"{operation_id}_params_released") | |
| execution_log.append(f"{operation_id}_model_released") | |
| execution_log.append(f"{operation_id}_processing_released") | |
| # Run nested lock operations | |
| # If deadlock occurs, this will timeout (pytest-timeout will catch it) | |
| await nested_lock_operation("op1") | |
| await nested_lock_operation("op2") | |
| # Verify both operations completed successfully | |
| assert "op1_params_acquired" in execution_log | |
| assert "op2_params_acquired" in execution_log | |
| assert len(execution_log) == 12 # 6 events per operation | |
| async def test_lock_release_on_exception(real_server_state): | |
| """ | |
| Verify that locks are properly released even when exceptions occur. | |
| This is critical for preventing deadlocks in production. | |
| """ | |
| lock = real_server_state.processing_lock | |
| execution_log = [] | |
| async def failing_operation(op_id: str): | |
| """Operation that acquires lock then fails.""" | |
| try: | |
| async with lock: | |
| execution_log.append(f"{op_id}_acquired") | |
| raise ValueError(f"Simulated error in {op_id}") | |
| except ValueError: | |
| execution_log.append(f"{op_id}_caught_exception") | |
| async def normal_operation(op_id: str): | |
| """Normal operation that acquires lock.""" | |
| async with lock: | |
| execution_log.append(f"{op_id}_acquired") | |
| await asyncio.sleep(0.01) | |
| execution_log.append(f"{op_id}_done") | |
| # First operation fails but should release lock | |
| await failing_operation("fail_op") | |
| # Second operation should be able to acquire lock (not deadlocked) | |
| await normal_operation("success_op") | |
| # Verify both operations ran | |
| assert "fail_op_acquired" in execution_log | |
| assert "fail_op_caught_exception" in execution_log | |
| assert "success_op_acquired" in execution_log | |
| assert "success_op_done" in execution_log | |
| async def test_concurrent_queue_and_lock_access(real_server_state): | |
| """ | |
| Verify that queue and lock work correctly together under concurrent load. | |
| This simulates the real scenario where multiple requests are queued and | |
| processed sequentially due to processing_lock. | |
| """ | |
| queue = real_server_state.request_queue | |
| lock = real_server_state.processing_lock | |
| processed_order = [] | |
| async def producer(num_items: int): | |
| """Add items to queue.""" | |
| for i in range(num_items): | |
| await queue.put({"id": i, "data": f"item_{i}"}) | |
| await asyncio.sleep(0.01) # Simulate arrival rate | |
| async def consumer(consumer_id: str): | |
| """Process items from queue with lock.""" | |
| while True: | |
| try: | |
| item = await asyncio.wait_for(queue.get(), timeout=0.5) | |
| except asyncio.TimeoutError: | |
| break | |
| async with lock: | |
| # Simulate processing | |
| await asyncio.sleep(0.02) | |
| processed_order.append((consumer_id, item["id"])) | |
| queue.task_done() | |
| # Start producer and two consumers concurrently | |
| producer_task = asyncio.create_task(producer(5)) | |
| consumer1_task = asyncio.create_task(consumer("c1")) | |
| consumer2_task = asyncio.create_task(consumer("c2")) | |
| await producer_task | |
| await asyncio.gather(consumer1_task, consumer2_task) | |
| # Verify all items were processed | |
| assert len(processed_order) == 5 | |
| # Verify all items 0-4 were processed | |
| processed_ids = [item_id for _, item_id in processed_order] | |
| assert sorted(processed_ids) == [0, 1, 2, 3, 4] | |
| # Verify queue is empty | |
| assert queue.empty() | |