WitNote / tests /test_tasks.py
harvesthealth's picture
Upload folder using huggingface_hub
f7044f4 verified
"""Tests for clawteam.team.tasks — TaskStore CRUD + dependency tracking."""
from unittest.mock import patch
import pytest
from clawteam.team.models import TaskItem, TaskPriority, TaskStatus
from clawteam.team.tasks import TaskLockError, TaskStore
@pytest.fixture
def store(team_name):
return TaskStore(team_name)
class TestTaskCreate:
def test_create_basic(self, store):
t = store.create("Write tests", description="pytest suite")
assert t.subject == "Write tests"
assert t.description == "pytest suite"
assert t.status == TaskStatus.pending
def test_create_with_owner(self, store):
t = store.create("Fix bug", owner="alice")
assert t.owner == "alice"
def test_create_with_priority(self, store):
t = store.create("urgent item", priority=TaskPriority.urgent)
assert t.priority == TaskPriority.urgent
def test_create_with_blocked_by_sets_blocked_status(self, store):
t1 = store.create("first task")
t2 = store.create("second task", blocked_by=[t1.id])
assert t2.status == TaskStatus.blocked
assert t1.id in t2.blocked_by
def test_create_with_metadata(self, store):
t = store.create("tagged task", metadata={"priority": "high"})
assert t.metadata["priority"] == "high"
def test_create_persists_to_disk(self, store):
t = store.create("persistent")
loaded = store.get(t.id)
assert loaded is not None
assert loaded.subject == "persistent"
class TestTaskGet:
def test_get_existing(self, store):
t = store.create("exists")
got = store.get(t.id)
assert got is not None
assert got.id == t.id
def test_get_nonexistent(self, store):
assert store.get("does-not-exist") is None
class TestTaskUpdate:
def test_update_status(self, store):
t = store.create("wip")
# need to mock is_agent_alive for the lock logic
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-1")
assert updated.status == TaskStatus.in_progress
def test_update_subject_and_description(self, store):
t = store.create("old title")
updated = store.update(t.id, subject="new title", description="details")
assert updated.subject == "new title"
assert updated.description == "details"
def test_update_owner(self, store):
t = store.create("task")
updated = store.update(t.id, owner="bob")
assert updated.owner == "bob"
def test_update_priority(self, store):
t = store.create("task")
updated = store.update(t.id, priority=TaskPriority.high)
assert updated.priority == TaskPriority.high
def test_update_add_blocks(self, store):
t1 = store.create("blocker")
t2 = store.create("other")
updated = store.update(t1.id, add_blocks=[t2.id])
assert t2.id in updated.blocks
def test_update_add_blocked_by(self, store):
t1 = store.create("dep")
t2 = store.create("main")
updated = store.update(t2.id, add_blocked_by=[t1.id])
assert t1.id in updated.blocked_by
def test_update_metadata_merge(self, store):
t = store.create("m", metadata={"a": 1})
updated = store.update(t.id, metadata={"b": 2})
assert updated.metadata == {"a": 1, "b": 2}
def test_update_nonexistent_returns_none(self, store):
assert store.update("nope", status=TaskStatus.completed) is None
def test_complete_clears_lock(self, store):
t = store.create("locked")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
store.update(t.id, status=TaskStatus.in_progress, caller="agent-1")
completed = store.update(t.id, status=TaskStatus.completed)
assert completed.locked_by == ""
assert completed.locked_at == ""
def test_updated_at_changes(self, store):
t = store.create("ts-check")
original_ts = t.updated_at
updated = store.update(t.id, subject="changed")
assert updated.updated_at >= original_ts
class TestTaskList:
def test_list_all(self, store):
store.create("a")
store.create("b")
store.create("c")
tasks = store.list_tasks()
assert len(tasks) == 3
def test_list_filter_by_status(self, store):
store.create("pending-one")
t2 = store.create("blocked-one", blocked_by=["fake-dep"])
tasks = store.list_tasks(status=TaskStatus.blocked)
assert len(tasks) == 1
assert tasks[0].id == t2.id
def test_list_filter_by_owner(self, store):
store.create("alice-task", owner="alice")
store.create("bob-task", owner="bob")
tasks = store.list_tasks(owner="alice")
assert len(tasks) == 1
assert tasks[0].owner == "alice"
def test_list_empty(self, store):
assert store.list_tasks() == []
def test_list_filter_by_priority(self, store):
store.create("urgent-task", priority=TaskPriority.urgent)
store.create("low-task", priority=TaskPriority.low)
tasks = store.list_tasks(priority=TaskPriority.urgent)
assert len(tasks) == 1
assert tasks[0].priority == TaskPriority.urgent
def test_list_sort_by_priority(self, store):
store.create("medium-task")
store.create("low-task", priority=TaskPriority.low)
store.create("urgent-task", priority=TaskPriority.urgent)
store.create("high-task", priority=TaskPriority.high)
tasks = store.list_tasks(sort_by_priority=True)
assert [task.priority for task in tasks] == [
TaskPriority.urgent,
TaskPriority.high,
TaskPriority.medium,
TaskPriority.low,
]
class TestDependencyResolution:
"""When a task completes, its dependents should get unblocked."""
def test_completing_task_unblocks_dependent(self, store):
t1 = store.create("prerequisite")
t2 = store.create("depends on t1", blocked_by=[t1.id])
assert t2.status == TaskStatus.blocked
store.update(t1.id, status=TaskStatus.completed)
t2_after = store.get(t2.id)
assert t2_after.status == TaskStatus.pending
assert t1.id not in t2_after.blocked_by
def test_partial_unblock_stays_blocked(self, store):
"""If a task depends on two things, completing one shouldn't unblock it."""
t1 = store.create("dep-1")
t2 = store.create("dep-2")
t3 = store.create("needs both", blocked_by=[t1.id, t2.id])
store.update(t1.id, status=TaskStatus.completed)
t3_after = store.get(t3.id)
assert t3_after.status == TaskStatus.blocked
assert t2.id in t3_after.blocked_by
def test_full_unblock_after_all_deps_complete(self, store):
t1 = store.create("dep-1")
t2 = store.create("dep-2")
t3 = store.create("needs both", blocked_by=[t1.id, t2.id])
store.update(t1.id, status=TaskStatus.completed)
store.update(t2.id, status=TaskStatus.completed)
t3_after = store.get(t3.id)
assert t3_after.status == TaskStatus.pending
assert t3_after.blocked_by == []
class TestTaskLocking:
def test_lock_acquired_on_in_progress(self, store):
t = store.create("lockable")
# mock is_agent_alive to return None (unknown) so lock logic proceeds
with patch("clawteam.spawn.registry.is_agent_alive", return_value=None):
updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-a")
assert updated.locked_by == "agent-a"
def test_same_agent_can_relock(self, store):
t = store.create("lockable")
with patch("clawteam.spawn.registry.is_agent_alive", return_value=None):
store.update(t.id, status=TaskStatus.in_progress, caller="agent-a")
# same agent again, no error
updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-a")
assert updated.locked_by == "agent-a"
def test_different_agent_blocked_by_lock(self, store):
t = store.create("contested")
with patch("clawteam.spawn.registry.is_agent_alive", return_value=True):
store.update(t.id, status=TaskStatus.in_progress, caller="agent-a")
with pytest.raises(TaskLockError):
store.update(t.id, status=TaskStatus.in_progress, caller="agent-b")
def test_force_overrides_lock(self, store):
t = store.create("force-me")
with patch("clawteam.spawn.registry.is_agent_alive", return_value=True):
store.update(t.id, status=TaskStatus.in_progress, caller="agent-a")
updated = store.update(
t.id, status=TaskStatus.in_progress, caller="agent-b", force=True
)
assert updated.locked_by == "agent-b"
def test_dead_agent_lock_is_released(self, store):
t = store.create("stale-lock")
with patch("clawteam.spawn.registry.is_agent_alive", return_value=None):
store.update(t.id, status=TaskStatus.in_progress, caller="dead-agent")
# now dead-agent is dead, another agent should be able to take over
with patch("clawteam.spawn.registry.is_agent_alive", return_value=False):
updated = store.update(t.id, status=TaskStatus.in_progress, caller="live-agent")
assert updated.locked_by == "live-agent"
class TestDurationTracking:
"""Tests for the started_at / duration tracking feature."""
def test_started_at_set_on_in_progress(self, store):
t = store.create("timed task")
assert t.started_at == ""
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
updated = store.update(t.id, status=TaskStatus.in_progress, caller="a")
assert updated.started_at != ""
def test_started_at_not_overwritten_on_second_in_progress(self, store):
"""If a task goes in_progress twice, keep the original start time."""
t = store.create("double start")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
updated = store.update(t.id, status=TaskStatus.in_progress, caller="a")
first_start = updated.started_at
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
updated2 = store.update(t.id, status=TaskStatus.in_progress, caller="a")
assert updated2.started_at == first_start
def test_duration_computed_on_completion(self, store):
t = store.create("will complete")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
store.update(t.id, status=TaskStatus.in_progress, caller="a")
completed = store.update(t.id, status=TaskStatus.completed)
assert "duration_seconds" in completed.metadata
# duration should be non-negative (task just started moments ago)
assert completed.metadata["duration_seconds"] >= 0
def test_no_duration_without_started_at(self, store):
"""Completing a task that was never in_progress shouldn't crash."""
t = store.create("skip to done")
completed = store.update(t.id, status=TaskStatus.completed)
assert "duration_seconds" not in completed.metadata
def test_started_at_persists_through_serialization(self, store):
t = store.create("persist check")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
store.update(t.id, status=TaskStatus.in_progress, caller="a")
reloaded = store.get(t.id)
assert reloaded.started_at != ""
def test_started_at_alias(self):
"""The field should serialize as 'startedAt' (camelCase)."""
t = TaskItem(subject="alias test")
dumped = t.model_dump(by_alias=True)
assert "startedAt" in dumped
class TestGetStats:
def test_stats_empty_team(self, store):
stats = store.get_stats()
assert stats["total"] == 0
assert stats["completed"] == 0
assert stats["avg_duration_seconds"] == 0.0
def test_stats_counts(self, store):
store.create("one")
store.create("two")
t3 = store.create("three")
store.update(t3.id, status=TaskStatus.completed)
stats = store.get_stats()
assert stats["total"] == 3
assert stats["completed"] == 1
assert stats["pending"] == 2
def test_stats_with_timed_tasks(self, store):
t = store.create("timed")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
store.update(t.id, status=TaskStatus.in_progress, caller="a")
store.update(t.id, status=TaskStatus.completed)
stats = store.get_stats()
assert stats["timed_completed"] == 1
assert stats["avg_duration_seconds"] >= 0
def test_stats_avg_excludes_untimed(self, store):
"""Tasks completed without going through in_progress shouldn't affect avg."""
# one task goes through the full flow
t1 = store.create("full flow")
with patch("clawteam.team.tasks.TaskStore._acquire_lock"):
store.update(t1.id, status=TaskStatus.in_progress, caller="a")
store.update(t1.id, status=TaskStatus.completed)
# another task jumps straight to completed
t2 = store.create("shortcut")
store.update(t2.id, status=TaskStatus.completed)
stats = store.get_stats()
assert stats["completed"] == 2
assert stats["timed_completed"] == 1