WitNote / tests /test_task_store_locking.py
harvesthealth's picture
Upload folder using huggingface_hub
f7044f4 verified
from __future__ import annotations
import multiprocessing as mp
import os
import time
from pathlib import Path
import pytest
from clawteam.team.models import TaskStatus
from clawteam.team.tasks import TaskStore
def _claim_task(
data_dir: str,
task_id: str,
agent_name: str,
save_delay: float,
result_queue,
) -> None:
os.environ["CLAWTEAM_DATA_DIR"] = data_dir
store = TaskStore("demo")
original_save = TaskStore._save_unlocked
def delayed_save(self, task):
if save_delay:
time.sleep(save_delay)
return original_save(self, task)
TaskStore._save_unlocked = delayed_save
try:
task = store.update(task_id, status=TaskStatus.in_progress, caller=agent_name)
result_queue.put((agent_name, "ok", task.locked_by if task else None))
except Exception as exc:
result_queue.put((agent_name, "err", type(exc).__name__))
finally:
TaskStore._save_unlocked = original_save
@pytest.mark.skipif("fork" not in mp.get_all_start_methods(), reason="requires fork start method")
def test_only_one_agent_can_claim_task_concurrently(monkeypatch, tmp_path: Path):
monkeypatch.setenv("CLAWTEAM_DATA_DIR", str(tmp_path))
store = TaskStore("demo")
task = store.create("demo task")
ctx = mp.get_context("fork")
result_queue = ctx.Queue()
proc_a = ctx.Process(
target=_claim_task,
args=(str(tmp_path), task.id, "agent-a", 0.3, result_queue),
)
proc_b = ctx.Process(
target=_claim_task,
args=(str(tmp_path), task.id, "agent-b", 0.0, result_queue),
)
proc_a.start()
time.sleep(0.05)
proc_b.start()
results = sorted(result_queue.get(timeout=10) for _ in range(2))
proc_a.join(timeout=10)
proc_b.join(timeout=10)
assert [result[1] for result in results].count("ok") == 1
assert [result[1] for result in results].count("err") == 1
assert any(result[2] == "TaskLockError" for result in results if result[1] == "err")
final_task = TaskStore("demo").get(task.id)
assert final_task is not None
assert final_task.status == TaskStatus.in_progress
assert final_task.locked_by in {"agent-a", "agent-b"}