Spaces:
Paused
Paused
File size: 8,972 Bytes
a5784e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | """
Integration tests for lock behavior and concurrency control.
These tests verify that the lock hierarchy (processing_lock > model_switching_lock >
params_cache_lock) works correctly with REAL asyncio.Lock instances, catching
race conditions that mocked tests cannot detect.
"""
import asyncio
import pytest
@pytest.mark.integration
@pytest.mark.asyncio
async def test_processing_lock_prevents_concurrent_execution(real_server_state):
"""
Verify that processing_lock prevents concurrent browser access.
This test uses REAL asyncio.Lock to ensure that two "requests" cannot
execute simultaneously, preventing race conditions in browser automation.
"""
execution_log = []
lock = real_server_state.processing_lock
async def simulate_request_processing(request_id: str, delay: float):
"""Simulate processing a request with the processing lock."""
execution_log.append(f"{request_id}_start")
async with lock:
execution_log.append(f"{request_id}_acquired_lock")
# Simulate some async work (browser interaction)
await asyncio.sleep(delay)
execution_log.append(f"{request_id}_releasing_lock")
execution_log.append(f"{request_id}_done")
# Start two tasks concurrently
task1 = asyncio.create_task(simulate_request_processing("req1", 0.1))
task2 = asyncio.create_task(simulate_request_processing("req2", 0.05))
await asyncio.gather(task1, task2)
# Verify execution order - req1 should complete entirely before req2 starts
# or vice versa (depends on which acquires lock first)
req1_acquired_idx = execution_log.index("req1_acquired_lock")
req1_releasing_idx = execution_log.index("req1_releasing_lock")
req2_acquired_idx = execution_log.index("req2_acquired_lock")
req2_releasing_idx = execution_log.index("req2_releasing_lock")
# One request must fully complete lock section before other enters
req1_finished_before_req2_started = req1_releasing_idx < req2_acquired_idx
req2_finished_before_req1_started = req2_releasing_idx < req1_acquired_idx
assert req1_finished_before_req2_started or req2_finished_before_req1_started, (
f"Lock did not prevent concurrent execution. Log: {execution_log}"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_model_switching_lock_serializes_switches(real_server_state):
"""
Verify that model_switching_lock prevents concurrent model changes.
This test ensures that when multiple requests try to switch models,
they are serialized properly and state remains consistent.
"""
lock = real_server_state.model_switching_lock
switch_order = []
async def simulate_model_switch(request_id: str, target_model: str):
"""Simulate switching to a model."""
async with lock:
switch_order.append(f"{request_id}_start")
# Simulate model switch operation
await asyncio.sleep(0.05)
real_server_state.current_ai_studio_model_id = target_model
switch_order.append(f"{request_id}_done_{target_model}")
# Start three concurrent model switch attempts
tasks = [
asyncio.create_task(simulate_model_switch("req1", "gemini-1.5-pro")),
asyncio.create_task(simulate_model_switch("req2", "gemini-1.5-flash")),
asyncio.create_task(simulate_model_switch("req3", "gemini-1.0-pro")),
]
await asyncio.gather(*tasks)
# Verify all switches completed
assert len(switch_order) == 6 # 3 start + 3 done
# Verify switches were serialized (each start followed by its done before next start)
for i in range(0, len(switch_order), 2):
start = switch_order[i]
done = switch_order[i + 1]
# Each start should be immediately followed by its done
req_id = start.split("_")[0]
assert done.startswith(f"{req_id}_done"), (
f"Switches not serialized: {switch_order}"
)
# Verify final state is one of the target models
assert real_server_state.current_ai_studio_model_id in [
"gemini-1.5-pro",
"gemini-1.5-flash",
"gemini-1.0-pro",
]
@pytest.mark.integration
@pytest.mark.asyncio
async def test_lock_hierarchy_no_deadlock(real_server_state):
"""
Verify that lock hierarchy (processing > model_switching > params_cache)
prevents deadlocks when nested.
This test ensures that locks can be safely nested in the correct order.
"""
processing_lock = real_server_state.processing_lock
model_lock = real_server_state.model_switching_lock
params_lock = real_server_state.params_cache_lock
execution_log = []
async def nested_lock_operation(operation_id: str):
"""Simulate operation that needs multiple locks in hierarchy order."""
async with processing_lock:
execution_log.append(f"{operation_id}_processing_acquired")
async with model_lock:
execution_log.append(f"{operation_id}_model_acquired")
async with params_lock:
execution_log.append(f"{operation_id}_params_acquired")
await asyncio.sleep(0.01) # Simulate work
execution_log.append(f"{operation_id}_params_released")
execution_log.append(f"{operation_id}_model_released")
execution_log.append(f"{operation_id}_processing_released")
# Run nested lock operations
# If deadlock occurs, this will timeout (pytest-timeout will catch it)
await nested_lock_operation("op1")
await nested_lock_operation("op2")
# Verify both operations completed successfully
assert "op1_params_acquired" in execution_log
assert "op2_params_acquired" in execution_log
assert len(execution_log) == 12 # 6 events per operation
@pytest.mark.integration
@pytest.mark.asyncio
async def test_lock_release_on_exception(real_server_state):
"""
Verify that locks are properly released even when exceptions occur.
This is critical for preventing deadlocks in production.
"""
lock = real_server_state.processing_lock
execution_log = []
async def failing_operation(op_id: str):
"""Operation that acquires lock then fails."""
try:
async with lock:
execution_log.append(f"{op_id}_acquired")
raise ValueError(f"Simulated error in {op_id}")
except ValueError:
execution_log.append(f"{op_id}_caught_exception")
async def normal_operation(op_id: str):
"""Normal operation that acquires lock."""
async with lock:
execution_log.append(f"{op_id}_acquired")
await asyncio.sleep(0.01)
execution_log.append(f"{op_id}_done")
# First operation fails but should release lock
await failing_operation("fail_op")
# Second operation should be able to acquire lock (not deadlocked)
await normal_operation("success_op")
# Verify both operations ran
assert "fail_op_acquired" in execution_log
assert "fail_op_caught_exception" in execution_log
assert "success_op_acquired" in execution_log
assert "success_op_done" in execution_log
@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_queue_and_lock_access(real_server_state):
"""
Verify that queue and lock work correctly together under concurrent load.
This simulates the real scenario where multiple requests are queued and
processed sequentially due to processing_lock.
"""
queue = real_server_state.request_queue
lock = real_server_state.processing_lock
processed_order = []
async def producer(num_items: int):
"""Add items to queue."""
for i in range(num_items):
await queue.put({"id": i, "data": f"item_{i}"})
await asyncio.sleep(0.01) # Simulate arrival rate
async def consumer(consumer_id: str):
"""Process items from queue with lock."""
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=0.5)
except asyncio.TimeoutError:
break
async with lock:
# Simulate processing
await asyncio.sleep(0.02)
processed_order.append((consumer_id, item["id"]))
queue.task_done()
# Start producer and two consumers concurrently
producer_task = asyncio.create_task(producer(5))
consumer1_task = asyncio.create_task(consumer("c1"))
consumer2_task = asyncio.create_task(consumer("c2"))
await producer_task
await asyncio.gather(consumer1_task, consumer2_task)
# Verify all items were processed
assert len(processed_order) == 5
# Verify all items 0-4 were processed
processed_ids = [item_id for _, item_id in processed_order]
assert sorted(processed_ids) == [0, 1, 2, 3, 4]
# Verify queue is empty
assert queue.empty()
|