File size: 8,972 Bytes
a5784e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""
Integration tests for lock behavior and concurrency control.

These tests verify that the lock hierarchy (processing_lock > model_switching_lock >
params_cache_lock) works correctly with REAL asyncio.Lock instances, catching
race conditions that mocked tests cannot detect.
"""

import asyncio

import pytest


@pytest.mark.integration
@pytest.mark.asyncio
async def test_processing_lock_prevents_concurrent_execution(real_server_state):
    """
    Check that processing_lock keeps two locked sections from overlapping.

    Two simulated requests contend for the lock taken from the fixture
    (expected to be a real asyncio.Lock). Each request records events around
    its locked section; the recorded order must show that one request left
    the section before the other one entered it.
    """
    processing_lock = real_server_state.processing_lock
    events = []

    async def simulate_request_processing(request_id: str, delay: float):
        """Record the lifecycle events of one request around the locked section."""
        events.append(f"{request_id}_start")

        async with processing_lock:
            events.append(f"{request_id}_acquired_lock")
            # Stand-in for the asynchronous work done while holding the lock.
            await asyncio.sleep(delay)
            events.append(f"{request_id}_releasing_lock")

        events.append(f"{request_id}_done")

    # Launch both requests so that they compete for the lock.
    pending = [
        asyncio.create_task(simulate_request_processing("req1", 0.1)),
        asyncio.create_task(simulate_request_processing("req2", 0.05)),
    ]
    await asyncio.gather(*pending)

    # For each request, locate where it entered and where it left the section.
    spans = {
        request_id: (
            events.index(f"{request_id}_acquired_lock"),
            events.index(f"{request_id}_releasing_lock"),
        )
        for request_id in ("req1", "req2")
    }
    (req1_entered, req1_left), (req2_entered, req2_left) = spans["req1"], spans["req2"]

    # Either order is acceptable; what matters is that the sections do not overlap.
    sections_disjoint = req1_left < req2_entered or req2_left < req1_entered

    assert sections_disjoint, (
        f"Lock did not prevent concurrent execution. Log: {events}"
    )


@pytest.mark.integration
@pytest.mark.asyncio
async def test_model_switching_lock_serializes_switches(real_server_state):
    """
    Check that model_switching_lock serializes concurrent model switches.

    Three simulated requests each switch to a different model while holding
    the lock. The event log must consist of start/done pairs belonging to the
    same request, and the model id left on the fixture must be one of the
    requested targets.
    """
    switch_lock = real_server_state.model_switching_lock
    requested_models = {
        "req1": "gemini-1.5-pro",
        "req2": "gemini-1.5-flash",
        "req3": "gemini-1.0-pro",
    }
    events = []

    async def simulate_model_switch(request_id: str, target_model: str):
        """Perform one simulated switch while holding the switching lock."""
        async with switch_lock:
            events.append(f"{request_id}_start")
            # Stand-in for the asynchronous switch operation.
            await asyncio.sleep(0.05)
            real_server_state.current_ai_studio_model_id = target_model
            events.append(f"{request_id}_done_{target_model}")

    # Launch all switch attempts at once so they contend for the lock.
    pending = [
        asyncio.create_task(simulate_model_switch(request_id, model))
        for request_id, model in requested_models.items()
    ]
    await asyncio.gather(*pending)

    # Every switch logs exactly one start and one done event.
    assert len(events) == 6  # 3 start + 3 done

    # Walk the log pairwise: each start must be directly followed by the
    # done event of the same request, otherwise two switches interleaved.
    for start_event, done_event in zip(events[::2], events[1::2]):
        owner = start_event.split("_")[0]
        assert done_event.startswith(f"{owner}_done"), (
            f"Switches not serialized: {events}"
        )

    # Whichever switch ran last, the stored id must be one of the targets.
    assert real_server_state.current_ai_studio_model_id in list(
        requested_models.values()
    )


@pytest.mark.integration
@pytest.mark.asyncio
async def test_lock_hierarchy_no_deadlock(real_server_state):
    """
    Verify that lock hierarchy (processing > model_switching > params_cache)
    prevents deadlocks when nested.

    The operations are started concurrently so that they genuinely contend
    for the locks; awaiting them one after another could never deadlock and
    would prove nothing. The whole run is bounded by ``asyncio.wait_for`` so
    that a deadlock fails the test with a timeout instead of hanging it.
    """
    processing_lock = real_server_state.processing_lock
    model_lock = real_server_state.model_switching_lock
    params_lock = real_server_state.params_cache_lock

    execution_log = []

    # Event suffixes in the order one operation is expected to log them.
    event_suffixes = (
        "processing_acquired",
        "model_acquired",
        "params_acquired",
        "params_released",
        "model_released",
        "processing_released",
    )

    async def nested_lock_operation(operation_id: str):
        """Simulate operation that needs multiple locks in hierarchy order."""
        async with processing_lock:
            execution_log.append(f"{operation_id}_processing_acquired")

            async with model_lock:
                execution_log.append(f"{operation_id}_model_acquired")

                async with params_lock:
                    execution_log.append(f"{operation_id}_params_acquired")
                    await asyncio.sleep(0.01)  # Simulate work
                    execution_log.append(f"{operation_id}_params_released")

                execution_log.append(f"{operation_id}_model_released")

            execution_log.append(f"{operation_id}_processing_released")

    operation_ids = ("op1", "op2")

    # Run the operations concurrently; a deadlock surfaces as a timeout error
    # raised by wait_for rather than relying on an external timeout plugin.
    await asyncio.wait_for(
        asyncio.gather(*(nested_lock_operation(op_id) for op_id in operation_ids)),
        timeout=5.0,
    )

    # Verify both operations completed successfully
    assert "op1_params_acquired" in execution_log
    assert "op2_params_acquired" in execution_log
    assert len(execution_log) == 12  # 6 events per operation

    # The outermost lock is held for the whole operation, so each operation's
    # six events must appear as one uninterrupted run in hierarchy order.
    for op_id in operation_ids:
        expected = [f"{op_id}_{suffix}" for suffix in event_suffixes]
        first = execution_log.index(expected[0])
        assert execution_log[first:first + len(expected)] == expected, (
            f"Operations interleaved inside the lock hierarchy: {execution_log}"
        )


@pytest.mark.integration
@pytest.mark.asyncio
async def test_lock_release_on_exception(real_server_state):
    """
    Verify that locks are properly released even when exceptions occur.

    This is critical for preventing deadlocks in production.

    The lock state is checked directly after the failing operation, and the
    follow-up acquisition is bounded by a timeout, so a lock that stays held
    makes the test fail quickly instead of hanging.
    """
    lock = real_server_state.processing_lock
    execution_log = []

    async def failing_operation(op_id: str):
        """Operation that acquires lock then fails."""
        try:
            async with lock:
                execution_log.append(f"{op_id}_acquired")
                raise ValueError(f"Simulated error in {op_id}")
        except ValueError:
            execution_log.append(f"{op_id}_caught_exception")

    async def normal_operation(op_id: str):
        """Normal operation that acquires lock."""
        async with lock:
            execution_log.append(f"{op_id}_acquired")
            await asyncio.sleep(0.01)
            execution_log.append(f"{op_id}_done")

    # First operation fails but should release lock
    await failing_operation("fail_op")
    assert not lock.locked(), "Lock still held after exception in locked section"

    # Second operation should be able to acquire lock (not deadlocked).
    # The timeout turns a stuck lock into a test failure rather than a hang.
    await asyncio.wait_for(normal_operation("success_op"), timeout=1.0)
    assert not lock.locked(), "Lock still held after normal operation"

    # Verify both operations ran, in exactly this order
    assert execution_log == [
        "fail_op_acquired",
        "fail_op_caught_exception",
        "success_op_acquired",
        "success_op_done",
    ]


@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_queue_and_lock_access(real_server_state):
    """
    Check that the request queue and processing_lock cooperate under load.

    One producer enqueues five items while two consumers pull from the same
    queue and handle each item while holding the lock. Afterwards every item
    must have been handled exactly once and the queue must be drained.
    """
    request_queue = real_server_state.request_queue
    processing_lock = real_server_state.processing_lock
    handled = []

    async def producer(num_items: int):
        """Enqueue ``num_items`` items, pausing briefly between them."""
        for idx in range(num_items):
            await request_queue.put({"id": idx, "data": f"item_{idx}"})
            await asyncio.sleep(0.01)  # Spacing between arrivals

    async def consumer(consumer_id: str):
        """Handle queued items under the lock until the queue stays idle."""
        while True:
            try:
                item = await asyncio.wait_for(request_queue.get(), timeout=0.5)
            except asyncio.TimeoutError:
                # Nothing arrived within the idle window: this consumer is done.
                return

            async with processing_lock:
                # Stand-in for the work done on each item.
                await asyncio.sleep(0.02)
                handled.append((consumer_id, item["id"]))
                request_queue.task_done()

    # The producer and both consumers run side by side.
    producer_task = asyncio.create_task(producer(5))
    consumer_tasks = [
        asyncio.create_task(consumer(name)) for name in ("c1", "c2")
    ]

    await producer_task
    await asyncio.gather(*consumer_tasks)

    # Five items went in, so five must have been handled.
    assert len(handled) == 5

    # Each of the ids 0-4 shows up exactly once, regardless of which consumer took it.
    assert sorted(item_id for _, item_id in handled) == [0, 1, 2, 3, 4]

    # Nothing may be left behind in the queue.
    assert request_queue.empty()