File size: 3,654 Bytes
3193174
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Tests for src/utils/async_utils.py — run_sync, gather_with_concurrency, timeout_wrapper."""

import asyncio

import pytest

from utils.async_utils import gather_with_concurrency, run_sync, timeout_wrapper


class TestRunSync:
    def test_run_simple_coroutine(self):
        async def add(a, b):
            return a + b

        result = run_sync(add(2, 3))
        assert result == 5

    def test_run_awaitable_result(self):
        async def greeting():
            return "hello"

        assert run_sync(greeting()) == "hello"

    def test_raises_in_running_loop(self):
        async def check():
            # Inside a running loop, run_sync should raise
            with pytest.raises(RuntimeError, match="event loop"):
                run_sync(asyncio.sleep(0))

        asyncio.run(check())

    def test_run_sync_with_non_coroutine_awaitable(self):
        """Cover lines 19-22: run_sync with non-coroutine awaitable (has __await__ but not a coroutine)."""

        class EagerAwaitable:
            """Simple awaitable that immediately returns a value."""
            def __init__(self, value):
                self.value = value

            def __await__(self):
                yield from []  # immediate resolution, no suspension
                return self.value

        result = run_sync(EagerAwaitable(99))
        assert result == 99


class TestGatherWithConcurrency:
    async def test_basic_gather(self):
        async def double(x):
            return x * 2

        results = await gather_with_concurrency(3, double(1), double(2), double(3))
        assert sorted(results) == [2, 4, 6]

    async def test_concurrency_limit_one(self):
        """With limit=1, tasks run sequentially."""
        order = []

        async def task(n):
            order.append(f"start-{n}")
            await asyncio.sleep(0.01)
            order.append(f"end-{n}")
            return n

        results = await gather_with_concurrency(1, task(1), task(2), task(3))
        assert sorted(results) == [1, 2, 3]

    async def test_empty_coros(self):
        results = await gather_with_concurrency(5)
        assert results == []

    async def test_concurrency_n_larger_than_tasks(self):
        async def noop():
            return True

        results = await gather_with_concurrency(100, noop(), noop(), noop())
        assert results == [True, True, True]

    async def test_exception_propagates(self):
        async def faulty():
            raise ValueError("test error")

        with pytest.raises(ValueError, match="test error"):
            await gather_with_concurrency(2, faulty())


class TestTimeoutWrapper:
    async def test_succeeds_within_timeout(self):
        async def fast():
            await asyncio.sleep(0.01)
            return "ok"

        result = await timeout_wrapper(fast(), timeout_seconds=5.0)
        assert result == "ok"

    async def test_raises_timeout_error(self):
        async def slow():
            await asyncio.sleep(10.0)
            return "never"

        with pytest.raises(TimeoutError, match="timed out"):
            await timeout_wrapper(slow(), timeout_seconds=0.05, error_message="Operation timed out")

    async def test_custom_error_message(self):
        async def slow():
            await asyncio.sleep(10.0)

        with pytest.raises(TimeoutError, match="custom message"):
            await timeout_wrapper(slow(), timeout_seconds=0.05, error_message="custom message")

    async def test_returns_value_on_success(self):
        async def compute():
            return 42

        result = await timeout_wrapper(compute(), timeout_seconds=1.0)
        assert result == 42