File size: 3,930 Bytes
5539271
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Tests for AnalysisService — callbacks, concurrency, and orchestration."""

from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from services.analysis_service import AnalysisService, _on_task_done


class TestOnTaskDone:
    """Bug #1 regression tests: ``_on_task_done`` must report task failures via ``_mark_failed``."""

    # Dotted path of the coroutine function each test replaces with an AsyncMock.
    _MARK_FAILED_PATH = "services.analysis_service._mark_failed"

    @pytest.mark.asyncio
    async def test_exception_marks_job_failed(self):
        """A finished task holding an exception leads to ``_mark_failed(job_id, "boom")``."""
        failing_job = "job-123"

        async def explode():
            raise RuntimeError("boom")

        crashed = asyncio.create_task(explode())
        # A single loop iteration is enough for the coroutine to run and raise.
        await asyncio.sleep(0)

        with patch(self._MARK_FAILED_PATH, new_callable=AsyncMock) as mark_failed:
            _on_task_done(crashed, job_id=failing_job)
            # The callback schedules the coroutine instead of awaiting it,
            # so yield once to let the scheduled call actually run.
            await asyncio.sleep(0)

        mark_failed.assert_called_once_with(failing_job, "boom")

    @pytest.mark.asyncio
    async def test_cancelled_task_marks_job_failed(self):
        """A cancelled task leads to ``_mark_failed(job_id, "Task was cancelled")``."""
        import contextlib

        cancelled_job = "job-456"

        async def never_finishes():
            await asyncio.sleep(999)

        doomed = asyncio.create_task(never_finishes())
        doomed.cancel()
        # Awaiting the task drives the cancellation to completion; the
        # resulting CancelledError is expected and deliberately discarded.
        with contextlib.suppress(asyncio.CancelledError):
            await doomed

        with patch(self._MARK_FAILED_PATH, new_callable=AsyncMock) as mark_failed:
            _on_task_done(doomed, job_id=cancelled_job)
            await asyncio.sleep(0)

        mark_failed.assert_called_once_with(cancelled_job, "Task was cancelled")

    @pytest.mark.asyncio
    async def test_successful_task_does_not_mark_failed(self):
        """A task that completed normally must never trigger ``_mark_failed``."""
        healthy_job = "job-789"

        async def succeed():
            return "done"

        finished = asyncio.create_task(succeed())
        await finished

        with patch(self._MARK_FAILED_PATH, new_callable=AsyncMock) as mark_failed:
            _on_task_done(finished, job_id=healthy_job)
            # Same tick as the failure tests, so a wrongly scheduled call
            # would have had the chance to run before the assertion.
            await asyncio.sleep(0)

        mark_failed.assert_not_called()


class TestAnalysisServiceConcurrency:
    """Verify that the semaphore limits concurrent analysis jobs."""

    def test_semaphore_initialized_with_max_concurrent(self):
        """An explicit ``max_concurrent`` is expected to become the semaphore's value."""
        converter = MagicMock()
        service = AnalysisService(converter=converter, max_concurrent=5)
        # NOTE(review): ``_value`` is a private attribute (presumably of an
        # asyncio.Semaphore); confirm there is no public way to read the limit.
        assert service._semaphore._value == 5

    def test_default_max_concurrent(self):
        """Without ``max_concurrent`` the semaphore is expected to start at 3."""
        converter = MagicMock()
        service = AnalysisService(converter=converter)
        assert service._semaphore._value == 3

    @pytest.mark.asyncio
    async def test_semaphore_limits_parallel_jobs(self):
        """Only max_concurrent jobs should run in parallel; others must wait."""
        call_order: list[str] = []
        blocker = asyncio.Event()

        converter = MagicMock()
        service = AnalysisService(converter=converter, max_concurrent=1)

        async def fake_inner(self, *args, **kwargs):
            # Stand-in for the real work: record entry, park on the gate,
            # record exit once the test opens the gate.
            call_order.append("start")
            await blocker.wait()
            call_order.append("end")

        with patch.object(AnalysisService, "_run_analysis_inner", fake_inner):
            t1 = asyncio.create_task(service._run_analysis("j1", "/f", "f.pdf"))
            t2 = asyncio.create_task(service._run_analysis("j2", "/f", "f.pdf"))
            try:
                # Give both tasks time to reach the gate if nothing held them back.
                await asyncio.sleep(0.05)

                # With max_concurrent=1, only one task should have started
                assert call_order.count("start") == 1
            finally:
                # Always open the gate and drain both tasks. Without this, a
                # failing assertion above would leave them parked on
                # ``blocker`` forever and leak two pending tasks into the
                # event loop's teardown.
                blocker.set()
                await asyncio.gather(t1, t2)

            # Both should have completed
            assert call_order.count("start") == 2
            assert call_order.count("end") == 2