Spaces:
Paused
Paused
| from __future__ import annotations | |
| import multiprocessing as mp | |
| import os | |
| import time | |
| from pathlib import Path | |
| import pytest | |
| from clawteam.team.models import TaskStatus | |
| from clawteam.team.tasks import TaskStore | |
| def _claim_task( | |
| data_dir: str, | |
| task_id: str, | |
| agent_name: str, | |
| save_delay: float, | |
| result_queue, | |
| ) -> None: | |
| os.environ["CLAWTEAM_DATA_DIR"] = data_dir | |
| store = TaskStore("demo") | |
| original_save = TaskStore._save_unlocked | |
| def delayed_save(self, task): | |
| if save_delay: | |
| time.sleep(save_delay) | |
| return original_save(self, task) | |
| TaskStore._save_unlocked = delayed_save | |
| try: | |
| task = store.update(task_id, status=TaskStatus.in_progress, caller=agent_name) | |
| result_queue.put((agent_name, "ok", task.locked_by if task else None)) | |
| except Exception as exc: | |
| result_queue.put((agent_name, "err", type(exc).__name__)) | |
| finally: | |
| TaskStore._save_unlocked = original_save | |
| def test_only_one_agent_can_claim_task_concurrently(monkeypatch, tmp_path: Path): | |
| monkeypatch.setenv("CLAWTEAM_DATA_DIR", str(tmp_path)) | |
| store = TaskStore("demo") | |
| task = store.create("demo task") | |
| ctx = mp.get_context("fork") | |
| result_queue = ctx.Queue() | |
| proc_a = ctx.Process( | |
| target=_claim_task, | |
| args=(str(tmp_path), task.id, "agent-a", 0.3, result_queue), | |
| ) | |
| proc_b = ctx.Process( | |
| target=_claim_task, | |
| args=(str(tmp_path), task.id, "agent-b", 0.0, result_queue), | |
| ) | |
| proc_a.start() | |
| time.sleep(0.05) | |
| proc_b.start() | |
| results = sorted(result_queue.get(timeout=10) for _ in range(2)) | |
| proc_a.join(timeout=10) | |
| proc_b.join(timeout=10) | |
| assert [result[1] for result in results].count("ok") == 1 | |
| assert [result[1] for result in results].count("err") == 1 | |
| assert any(result[2] == "TaskLockError" for result in results if result[1] == "err") | |
| final_task = TaskStore("demo").get(task.id) | |
| assert final_task is not None | |
| assert final_task.status == TaskStatus.in_progress | |
| assert final_task.locked_by in {"agent-a", "agent-b"} | |