workflow-orchestrator / tests /test_dag_executor.py
kartikmandar's picture
fix: close Phase 0/1 gaps β€” conftest.py, critical path computation
e5619df
"""Tests for DAG executor β€” dependency resolution, status transitions, validation."""
import pytest
from server.dag_executor import DAGExecutor
from server.task_registry import get_task
class TestDAGExecutorBasic:
def test_initial_ready_easy(self) -> None:
"""Easy task: only technical_design has no dependencies, so it's ready."""
config = get_task("easy")
dag = DAGExecutor(config.subtask_definitions)
ready = dag.get_ready_subtasks()
assert ready == ["technical_design"]
def test_initial_ready_medium(self) -> None:
"""Medium task: only checkout_code has no dependencies."""
config = get_task("medium")
dag = DAGExecutor(config.subtask_definitions)
assert dag.get_ready_subtasks() == ["checkout_code"]
def test_initial_ready_hard(self) -> None:
"""Hard task: only alert_triage has no dependencies."""
config = get_task("hard")
dag = DAGExecutor(config.subtask_definitions)
assert dag.get_ready_subtasks() == ["alert_triage"]
class TestDAGExecutorTransitions:
def _make_easy_dag(self) -> DAGExecutor:
return DAGExecutor(get_task("easy").subtask_definitions)
def test_delegate_and_complete(self) -> None:
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
assert dag.get_subtask_status("technical_design") == "in_progress"
assert "technical_design" in dag.get_in_progress_subtasks()
dag.complete("technical_design", "Design done")
assert dag.get_subtask_status("technical_design") == "completed"
assert dag.get_completed_outputs()["technical_design"] == "Design done"
def test_dependency_cascade(self) -> None:
"""Completing technical_design makes implement_backend ready."""
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
dag.complete("technical_design", "done")
dag.update_ready_statuses()
assert "implement_backend" in dag.get_ready_subtasks()
def test_parallelism_opportunity(self) -> None:
"""After implement_backend, both implement_frontend and write_tests become ready."""
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
dag.complete("technical_design", "done")
dag.update_ready_statuses()
dag.delegate("implement_backend", "backend_dev")
dag.complete("implement_backend", "done")
dag.update_ready_statuses()
ready = dag.get_ready_subtasks()
assert "implement_frontend" in ready
assert "write_tests" in ready
def test_fan_in(self) -> None:
"""run_tests needs BOTH implement_frontend and write_tests completed."""
dag = self._make_easy_dag()
# Walk through to fan-in point
for sid, agent in [
("technical_design", "tech_lead"),
("implement_backend", "backend_dev"),
]:
dag.delegate(sid, agent)
dag.complete(sid, "done")
dag.update_ready_statuses()
# Complete only one of the parallel tasks
dag.delegate("implement_frontend", "frontend_dev")
dag.complete("implement_frontend", "done")
dag.update_ready_statuses()
assert "run_tests" not in dag.get_ready_subtasks()
# Complete the other
dag.delegate("write_tests", "backend_dev")
dag.complete("write_tests", "done")
dag.update_ready_statuses()
assert "run_tests" in dag.get_ready_subtasks()
def test_fail_and_retry(self) -> None:
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
dag.fail("technical_design", "Something went wrong")
assert dag.get_subtask_status("technical_design") == "failed"
assert dag.get_subtask_attempt_count("technical_design") == 1
dag.retry("technical_design", "tech_lead")
assert dag.get_subtask_status("technical_design") == "in_progress"
def test_abort(self) -> None:
dag = self._make_easy_dag()
dag.abort("technical_design")
assert dag.get_subtask_status("technical_design") == "failed"
def test_cannot_delegate_pending(self) -> None:
dag = self._make_easy_dag()
with pytest.raises(ValueError, match="not.*ready"):
dag.delegate("implement_backend", "backend_dev")
def test_cannot_retry_completed(self) -> None:
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
dag.complete("technical_design", "done")
with pytest.raises(ValueError, match="not.*failed"):
dag.retry("technical_design", "tech_lead")
def test_cannot_abort_completed(self) -> None:
dag = self._make_easy_dag()
dag.delegate("technical_design", "tech_lead")
dag.complete("technical_design", "done")
with pytest.raises(ValueError, match="already completed"):
dag.abort("technical_design")
def test_unknown_subtask_raises(self) -> None:
dag = self._make_easy_dag()
with pytest.raises(KeyError):
dag.delegate("nonexistent", "tech_lead")
class TestDAGExecutorFullWalkthrough:
def test_easy_task_sequential(self) -> None:
"""Walk through the entire easy task sequentially and verify completion."""
dag = DAGExecutor(get_task("easy").subtask_definitions)
sequence = [
("technical_design", "tech_lead"),
("implement_backend", "backend_dev"),
("implement_frontend", "frontend_dev"),
("write_tests", "qa_engineer"),
("run_tests", "qa_engineer"),
("review_and_merge", "tech_lead"),
]
for sid, agent in sequence:
dag.update_ready_statuses()
assert sid in dag.get_ready_subtasks(), f"{sid} should be ready"
dag.delegate(sid, agent)
dag.complete(sid, f"{sid} done")
assert dag.is_all_completed()
assert len(dag.get_completed_outputs()) == 6
class TestCriticalPath:
def test_linear_dag_default_durations(self) -> None:
"""A→B→C with default durations (1 each) has critical path = 3."""
dag = DAGExecutor([
{"id": "a", "type": "x", "dependencies": []},
{"id": "b", "type": "x", "dependencies": ["a"]},
{"id": "c", "type": "x", "dependencies": ["b"]},
])
assert dag.compute_critical_path_length() == 3
def test_diamond_dag_takes_longest_branch(self) -> None:
"""A→[B,C]→D: if B takes 3 ticks and C takes 1, critical path = 1+3+1 = 5."""
dag = DAGExecutor([
{"id": "a", "type": "x", "dependencies": []},
{"id": "b", "type": "x", "dependencies": ["a"]},
{"id": "c", "type": "x", "dependencies": ["a"]},
{"id": "d", "type": "x", "dependencies": ["b", "c"]},
])
durations = {"a": 1, "b": 3, "c": 1, "d": 1}
assert dag.compute_critical_path_length(durations) == 5
def test_easy_task_critical_path(self) -> None:
"""Easy task: 6 nodes, all speed=1. Critical path = longest chain length."""
# Longest chain: technical_design β†’ implement_backend β†’ implement_frontend β†’ run_tests β†’ review_and_merge = 5
# (write_tests is parallel to implement_frontend, shorter branch)
dag = DAGExecutor(get_task("easy").subtask_definitions)
assert dag.compute_critical_path_length() == 5
def test_medium_task_critical_path(self) -> None:
"""Medium task: 9-node pipeline with fan-out. All durations=1 β†’ longest chain = 7."""
# checkout β†’ (longest of lint/tests/scan) β†’ build β†’ push β†’ staging β†’ smoke β†’ prod
# The fan-out [lint, unit_tests, security_scan] are parallel, each takes 1 tick
# So critical path = 7 (the 7-node main chain through any fan-out branch)
dag = DAGExecutor(get_task("medium").subtask_definitions)
assert dag.compute_critical_path_length() == 7
class TestDAGValidation:
def test_cycle_detection(self) -> None:
"""DAG with a cycle should raise ValueError."""
cyclic = [
{"id": "a", "type": "x", "dependencies": ["b"]},
{"id": "b", "type": "x", "dependencies": ["a"]},
]
with pytest.raises(ValueError, match="cycle"):
DAGExecutor(cyclic)
def test_unknown_dependency_raises(self) -> None:
with pytest.raises(ValueError, match="unknown subtask"):
DAGExecutor([
{"id": "a", "type": "x", "dependencies": ["nonexistent"]},
])
def test_subtask_infos_export(self) -> None:
dag = DAGExecutor(get_task("easy").subtask_definitions)
infos = dag.get_subtask_infos()
assert len(infos) == 6
assert all(hasattr(info, "id") for info in infos)
ready_count = sum(1 for info in infos if info.status == "ready")
assert ready_count == 1 # Only technical_design