Spaces:
Paused
Paused
| """Tests for clawteam.team.tasks — TaskStore CRUD + dependency tracking.""" | |
| from unittest.mock import patch | |
| import pytest | |
| from clawteam.team.models import TaskItem, TaskPriority, TaskStatus | |
| from clawteam.team.tasks import TaskLockError, TaskStore | |
| def store(team_name): | |
| return TaskStore(team_name) | |
| class TestTaskCreate: | |
| def test_create_basic(self, store): | |
| t = store.create("Write tests", description="pytest suite") | |
| assert t.subject == "Write tests" | |
| assert t.description == "pytest suite" | |
| assert t.status == TaskStatus.pending | |
| def test_create_with_owner(self, store): | |
| t = store.create("Fix bug", owner="alice") | |
| assert t.owner == "alice" | |
| def test_create_with_priority(self, store): | |
| t = store.create("urgent item", priority=TaskPriority.urgent) | |
| assert t.priority == TaskPriority.urgent | |
| def test_create_with_blocked_by_sets_blocked_status(self, store): | |
| t1 = store.create("first task") | |
| t2 = store.create("second task", blocked_by=[t1.id]) | |
| assert t2.status == TaskStatus.blocked | |
| assert t1.id in t2.blocked_by | |
| def test_create_with_metadata(self, store): | |
| t = store.create("tagged task", metadata={"priority": "high"}) | |
| assert t.metadata["priority"] == "high" | |
| def test_create_persists_to_disk(self, store): | |
| t = store.create("persistent") | |
| loaded = store.get(t.id) | |
| assert loaded is not None | |
| assert loaded.subject == "persistent" | |
| class TestTaskGet: | |
| def test_get_existing(self, store): | |
| t = store.create("exists") | |
| got = store.get(t.id) | |
| assert got is not None | |
| assert got.id == t.id | |
| def test_get_nonexistent(self, store): | |
| assert store.get("does-not-exist") is None | |
| class TestTaskUpdate: | |
| def test_update_status(self, store): | |
| t = store.create("wip") | |
| # need to mock is_agent_alive for the lock logic | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-1") | |
| assert updated.status == TaskStatus.in_progress | |
| def test_update_subject_and_description(self, store): | |
| t = store.create("old title") | |
| updated = store.update(t.id, subject="new title", description="details") | |
| assert updated.subject == "new title" | |
| assert updated.description == "details" | |
| def test_update_owner(self, store): | |
| t = store.create("task") | |
| updated = store.update(t.id, owner="bob") | |
| assert updated.owner == "bob" | |
| def test_update_priority(self, store): | |
| t = store.create("task") | |
| updated = store.update(t.id, priority=TaskPriority.high) | |
| assert updated.priority == TaskPriority.high | |
| def test_update_add_blocks(self, store): | |
| t1 = store.create("blocker") | |
| t2 = store.create("other") | |
| updated = store.update(t1.id, add_blocks=[t2.id]) | |
| assert t2.id in updated.blocks | |
| def test_update_add_blocked_by(self, store): | |
| t1 = store.create("dep") | |
| t2 = store.create("main") | |
| updated = store.update(t2.id, add_blocked_by=[t1.id]) | |
| assert t1.id in updated.blocked_by | |
| def test_update_metadata_merge(self, store): | |
| t = store.create("m", metadata={"a": 1}) | |
| updated = store.update(t.id, metadata={"b": 2}) | |
| assert updated.metadata == {"a": 1, "b": 2} | |
| def test_update_nonexistent_returns_none(self, store): | |
| assert store.update("nope", status=TaskStatus.completed) is None | |
| def test_complete_clears_lock(self, store): | |
| t = store.create("locked") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="agent-1") | |
| completed = store.update(t.id, status=TaskStatus.completed) | |
| assert completed.locked_by == "" | |
| assert completed.locked_at == "" | |
| def test_updated_at_changes(self, store): | |
| t = store.create("ts-check") | |
| original_ts = t.updated_at | |
| updated = store.update(t.id, subject="changed") | |
| assert updated.updated_at >= original_ts | |
| class TestTaskList: | |
| def test_list_all(self, store): | |
| store.create("a") | |
| store.create("b") | |
| store.create("c") | |
| tasks = store.list_tasks() | |
| assert len(tasks) == 3 | |
| def test_list_filter_by_status(self, store): | |
| store.create("pending-one") | |
| t2 = store.create("blocked-one", blocked_by=["fake-dep"]) | |
| tasks = store.list_tasks(status=TaskStatus.blocked) | |
| assert len(tasks) == 1 | |
| assert tasks[0].id == t2.id | |
| def test_list_filter_by_owner(self, store): | |
| store.create("alice-task", owner="alice") | |
| store.create("bob-task", owner="bob") | |
| tasks = store.list_tasks(owner="alice") | |
| assert len(tasks) == 1 | |
| assert tasks[0].owner == "alice" | |
| def test_list_empty(self, store): | |
| assert store.list_tasks() == [] | |
| def test_list_filter_by_priority(self, store): | |
| store.create("urgent-task", priority=TaskPriority.urgent) | |
| store.create("low-task", priority=TaskPriority.low) | |
| tasks = store.list_tasks(priority=TaskPriority.urgent) | |
| assert len(tasks) == 1 | |
| assert tasks[0].priority == TaskPriority.urgent | |
| def test_list_sort_by_priority(self, store): | |
| store.create("medium-task") | |
| store.create("low-task", priority=TaskPriority.low) | |
| store.create("urgent-task", priority=TaskPriority.urgent) | |
| store.create("high-task", priority=TaskPriority.high) | |
| tasks = store.list_tasks(sort_by_priority=True) | |
| assert [task.priority for task in tasks] == [ | |
| TaskPriority.urgent, | |
| TaskPriority.high, | |
| TaskPriority.medium, | |
| TaskPriority.low, | |
| ] | |
| class TestDependencyResolution: | |
| """When a task completes, its dependents should get unblocked.""" | |
| def test_completing_task_unblocks_dependent(self, store): | |
| t1 = store.create("prerequisite") | |
| t2 = store.create("depends on t1", blocked_by=[t1.id]) | |
| assert t2.status == TaskStatus.blocked | |
| store.update(t1.id, status=TaskStatus.completed) | |
| t2_after = store.get(t2.id) | |
| assert t2_after.status == TaskStatus.pending | |
| assert t1.id not in t2_after.blocked_by | |
| def test_partial_unblock_stays_blocked(self, store): | |
| """If a task depends on two things, completing one shouldn't unblock it.""" | |
| t1 = store.create("dep-1") | |
| t2 = store.create("dep-2") | |
| t3 = store.create("needs both", blocked_by=[t1.id, t2.id]) | |
| store.update(t1.id, status=TaskStatus.completed) | |
| t3_after = store.get(t3.id) | |
| assert t3_after.status == TaskStatus.blocked | |
| assert t2.id in t3_after.blocked_by | |
| def test_full_unblock_after_all_deps_complete(self, store): | |
| t1 = store.create("dep-1") | |
| t2 = store.create("dep-2") | |
| t3 = store.create("needs both", blocked_by=[t1.id, t2.id]) | |
| store.update(t1.id, status=TaskStatus.completed) | |
| store.update(t2.id, status=TaskStatus.completed) | |
| t3_after = store.get(t3.id) | |
| assert t3_after.status == TaskStatus.pending | |
| assert t3_after.blocked_by == [] | |
| class TestTaskLocking: | |
| def test_lock_acquired_on_in_progress(self, store): | |
| t = store.create("lockable") | |
| # mock is_agent_alive to return None (unknown) so lock logic proceeds | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=None): | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-a") | |
| assert updated.locked_by == "agent-a" | |
| def test_same_agent_can_relock(self, store): | |
| t = store.create("lockable") | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=None): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="agent-a") | |
| # same agent again, no error | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="agent-a") | |
| assert updated.locked_by == "agent-a" | |
| def test_different_agent_blocked_by_lock(self, store): | |
| t = store.create("contested") | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=True): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="agent-a") | |
| with pytest.raises(TaskLockError): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="agent-b") | |
| def test_force_overrides_lock(self, store): | |
| t = store.create("force-me") | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=True): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="agent-a") | |
| updated = store.update( | |
| t.id, status=TaskStatus.in_progress, caller="agent-b", force=True | |
| ) | |
| assert updated.locked_by == "agent-b" | |
| def test_dead_agent_lock_is_released(self, store): | |
| t = store.create("stale-lock") | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=None): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="dead-agent") | |
| # now dead-agent is dead, another agent should be able to take over | |
| with patch("clawteam.spawn.registry.is_agent_alive", return_value=False): | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="live-agent") | |
| assert updated.locked_by == "live-agent" | |
| class TestDurationTracking: | |
| """Tests for the started_at / duration tracking feature.""" | |
| def test_started_at_set_on_in_progress(self, store): | |
| t = store.create("timed task") | |
| assert t.started_at == "" | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| assert updated.started_at != "" | |
| def test_started_at_not_overwritten_on_second_in_progress(self, store): | |
| """If a task goes in_progress twice, keep the original start time.""" | |
| t = store.create("double start") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| updated = store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| first_start = updated.started_at | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| updated2 = store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| assert updated2.started_at == first_start | |
| def test_duration_computed_on_completion(self, store): | |
| t = store.create("will complete") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| completed = store.update(t.id, status=TaskStatus.completed) | |
| assert "duration_seconds" in completed.metadata | |
| # duration should be non-negative (task just started moments ago) | |
| assert completed.metadata["duration_seconds"] >= 0 | |
| def test_no_duration_without_started_at(self, store): | |
| """Completing a task that was never in_progress shouldn't crash.""" | |
| t = store.create("skip to done") | |
| completed = store.update(t.id, status=TaskStatus.completed) | |
| assert "duration_seconds" not in completed.metadata | |
| def test_started_at_persists_through_serialization(self, store): | |
| t = store.create("persist check") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| reloaded = store.get(t.id) | |
| assert reloaded.started_at != "" | |
| def test_started_at_alias(self): | |
| """The field should serialize as 'startedAt' (camelCase).""" | |
| t = TaskItem(subject="alias test") | |
| dumped = t.model_dump(by_alias=True) | |
| assert "startedAt" in dumped | |
| class TestGetStats: | |
| def test_stats_empty_team(self, store): | |
| stats = store.get_stats() | |
| assert stats["total"] == 0 | |
| assert stats["completed"] == 0 | |
| assert stats["avg_duration_seconds"] == 0.0 | |
| def test_stats_counts(self, store): | |
| store.create("one") | |
| store.create("two") | |
| t3 = store.create("three") | |
| store.update(t3.id, status=TaskStatus.completed) | |
| stats = store.get_stats() | |
| assert stats["total"] == 3 | |
| assert stats["completed"] == 1 | |
| assert stats["pending"] == 2 | |
| def test_stats_with_timed_tasks(self, store): | |
| t = store.create("timed") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| store.update(t.id, status=TaskStatus.in_progress, caller="a") | |
| store.update(t.id, status=TaskStatus.completed) | |
| stats = store.get_stats() | |
| assert stats["timed_completed"] == 1 | |
| assert stats["avg_duration_seconds"] >= 0 | |
| def test_stats_avg_excludes_untimed(self, store): | |
| """Tasks completed without going through in_progress shouldn't affect avg.""" | |
| # one task goes through the full flow | |
| t1 = store.create("full flow") | |
| with patch("clawteam.team.tasks.TaskStore._acquire_lock"): | |
| store.update(t1.id, status=TaskStatus.in_progress, caller="a") | |
| store.update(t1.id, status=TaskStatus.completed) | |
| # another task jumps straight to completed | |
| t2 = store.create("shortcut") | |
| store.update(t2.id, status=TaskStatus.completed) | |
| stats = store.get_stats() | |
| assert stats["completed"] == 2 | |
| assert stats["timed_completed"] == 1 | |