Spaces:
Running
Running
File size: 3,930 Bytes
"""Tests for AnalysisService — callbacks, concurrency, and orchestration."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from services.analysis_service import AnalysisService, _on_task_done
class TestOnTaskDone:
    """Bug #1 regression tests: _on_task_done must report failed tasks via _mark_failed."""

    @staticmethod
    def _patched_mark_failed():
        """Return a patcher that swaps the module-level _mark_failed for an AsyncMock."""
        return patch("services.analysis_service._mark_failed", new_callable=AsyncMock)

    @pytest.mark.asyncio
    async def test_exception_marks_job_failed(self):
        """A background task that raises must lead to _mark_failed(job_id, message)."""

        async def explode():
            raise RuntimeError("boom")

        crashed = asyncio.create_task(explode())
        # Yield once so the task gets to run and record its exception.
        await asyncio.sleep(0)

        with self._patched_mark_failed() as marker:
            _on_task_done(crashed, job_id="job-123")
            # The callback is expected to schedule the call on the loop
            # (ensure_future, per the original note); give it one tick.
            await asyncio.sleep(0)

        marker.assert_called_once_with("job-123", "boom")

    @pytest.mark.asyncio
    async def test_cancelled_task_marks_job_failed(self):
        """A cancelled background task must lead to _mark_failed with a cancellation message."""
        import contextlib

        async def never_finishes():
            await asyncio.sleep(999)

        cancelled = asyncio.create_task(never_finishes())
        cancelled.cancel()
        # Awaiting drives the task into its final (cancelled) state; the
        # resulting CancelledError is expected and deliberately discarded.
        with contextlib.suppress(asyncio.CancelledError):
            await cancelled

        with self._patched_mark_failed() as marker:
            _on_task_done(cancelled, job_id="job-456")
            await asyncio.sleep(0)

        marker.assert_called_once_with("job-456", "Task was cancelled")

    @pytest.mark.asyncio
    async def test_successful_task_does_not_mark_failed(self):
        """A background task that completes normally must never trigger _mark_failed."""

        async def succeed():
            return "done"

        finished = asyncio.create_task(succeed())
        await finished

        with self._patched_mark_failed() as marker:
            _on_task_done(finished, job_id="job-789")
            await asyncio.sleep(0)

        marker.assert_not_called()
class TestAnalysisServiceConcurrency:
    """Verify that the semaphore limits concurrent analysis jobs."""

    def test_semaphore_initialized_with_max_concurrent(self):
        """An explicit max_concurrent is reflected in the semaphore's counter."""
        converter = MagicMock()
        service = AnalysisService(converter=converter, max_concurrent=5)
        # NOTE(review): reads the private `_value` counter of the semaphore;
        # presumably an asyncio.Semaphore — confirm against the service.
        assert service._semaphore._value == 5

    def test_default_max_concurrent(self):
        """Without max_concurrent the semaphore counter is expected to be 3."""
        converter = MagicMock()
        service = AnalysisService(converter=converter)
        assert service._semaphore._value == 3

    @pytest.mark.asyncio
    async def test_semaphore_limits_parallel_jobs(self):
        """Only max_concurrent jobs should run in parallel; others must wait."""
        call_order: list[str] = []
        blocker = asyncio.Event()
        converter = MagicMock()
        service = AnalysisService(converter=converter, max_concurrent=1)

        async def fake_inner(self, *args, **kwargs):
            # Stand-in for the real job body: record entry, park until the
            # test releases the blocker, then record exit.
            call_order.append("start")
            await blocker.wait()
            call_order.append("end")

        with patch.object(AnalysisService, "_run_analysis_inner", fake_inner):
            t1 = asyncio.create_task(service._run_analysis("j1", "/f", "f.pdf"))
            t2 = asyncio.create_task(service._run_analysis("j2", "/f", "f.pdf"))
            try:
                # Give both tasks time to reach the semaphore / fake body.
                await asyncio.sleep(0.05)

                # With max_concurrent=1, only one task should have started
                assert call_order.count("start") == 1

                blocker.set()
                await asyncio.gather(t1, t2)
            finally:
                # Always release the blocker and drain both tasks while the
                # patch is still active. Otherwise a failed assertion above
                # would leave them parked on `blocker.wait()` forever and leak
                # pending tasks into event-loop teardown. Exceptions are
                # collected rather than raised so they cannot mask the
                # original failure.
                blocker.set()
                await asyncio.gather(t1, t2, return_exceptions=True)

        # Both should have completed
        assert call_order.count("start") == 2
        assert call_order.count("end") == 2
|