"""Tests for AnalysisService — callbacks, concurrency, and orchestration.""" from __future__ import annotations import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from services.analysis_service import AnalysisService, _on_task_done class TestOnTaskDone: """Bug #1: _on_task_done must call _mark_failed when the task raises.""" @pytest.mark.asyncio async def test_exception_marks_job_failed(self): """When a background task raises, the job should be marked FAILED.""" job_id = "job-123" # Create a task that raises async def failing_task(): raise RuntimeError("boom") task = asyncio.create_task(failing_task()) await asyncio.sleep(0) # let the task fail with patch("services.analysis_service._mark_failed", new_callable=AsyncMock) as mock_mark: _on_task_done(task, job_id=job_id) # ensure_future schedules it; give the event loop a tick await asyncio.sleep(0) mock_mark.assert_called_once_with(job_id, "boom") @pytest.mark.asyncio async def test_cancelled_task_marks_job_failed(self): """When a background task is cancelled, the job should be marked FAILED.""" job_id = "job-456" async def slow_task(): await asyncio.sleep(999) import contextlib task = asyncio.create_task(slow_task()) task.cancel() with contextlib.suppress(asyncio.CancelledError): await task with patch("services.analysis_service._mark_failed", new_callable=AsyncMock) as mock_mark: _on_task_done(task, job_id=job_id) await asyncio.sleep(0) mock_mark.assert_called_once_with(job_id, "Task was cancelled") @pytest.mark.asyncio async def test_successful_task_does_not_mark_failed(self): """When a background task succeeds, _mark_failed should not be called.""" job_id = "job-789" async def ok_task(): return "done" task = asyncio.create_task(ok_task()) await task with patch("services.analysis_service._mark_failed", new_callable=AsyncMock) as mock_mark: _on_task_done(task, job_id=job_id) await asyncio.sleep(0) mock_mark.assert_not_called() class TestAnalysisServiceConcurrency: """Verify that the semaphore limits concurrent analysis jobs.""" def test_semaphore_initialized_with_max_concurrent(self): converter = MagicMock() service = AnalysisService(converter=converter, max_concurrent=5) assert service._semaphore._value == 5 def test_default_max_concurrent(self): converter = MagicMock() service = AnalysisService(converter=converter) assert service._semaphore._value == 3 @pytest.mark.asyncio async def test_semaphore_limits_parallel_jobs(self): """Only max_concurrent jobs should run in parallel; others must wait.""" call_order: list[str] = [] blocker = asyncio.Event() converter = MagicMock() service = AnalysisService(converter=converter, max_concurrent=1) async def fake_inner(self, *args, **kwargs): call_order.append("start") await blocker.wait() call_order.append("end") with patch.object(AnalysisService, "_run_analysis_inner", fake_inner): t1 = asyncio.create_task(service._run_analysis("j1", "/f", "f.pdf")) t2 = asyncio.create_task(service._run_analysis("j2", "/f", "f.pdf")) await asyncio.sleep(0.05) # With max_concurrent=1, only one task should have started assert call_order.count("start") == 1 blocker.set() await asyncio.gather(t1, t2) # Both should have completed assert call_order.count("start") == 2 assert call_order.count("end") == 2