TrafficDetector / backend /tests /test_worker.py
sebastianc1233's picture
test: refactor unit tests to support async services, S3 mocking and pytest fixtures
86c1ac6
raw
history blame contribute delete
706 Bytes
import uuid
from unittest.mock import patch
from backend.app.worker import celery_process_video
def test_celery_worker_bridge():
task_id_str = str(uuid.uuid4())
input_path = "/in.mp4"
output_path = "/out.mp4"
with (
patch("backend.app.worker.asyncio.run") as mock_run,
patch("backend.app.worker.process_video_workflow") as mock_workflow,
):
result = celery_process_video(task_id_str, input_path, output_path)
assert result == "OK"
mock_run.assert_called_once()
mock_workflow.assert_called_once()
args = mock_workflow.call_args
assert isinstance(args[0][0], uuid.UUID)
assert str(args[0][0]) == task_id_str