from __future__ import annotations import json import subprocess import tempfile import unittest from pathlib import Path ROOT = Path(__file__).resolve().parents[1] SCRIPT = ROOT / "scripts" / "run-task-batches.ps1" DEFAULT_CONFIG = ROOT / "config" / "task-batch-runner.json" def run_script(config: dict) -> subprocess.CompletedProcess[str]: with tempfile.TemporaryDirectory() as tmp: config_path = Path(tmp) / "task-batch-runner.json" config = {"loadLocalConfig": False, **config} config_path.write_text(json.dumps(config), encoding="utf-8") return subprocess.run( [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(SCRIPT), "-ConfigPath", str(config_path), "-DryRun", "-PlanJson", ], cwd=ROOT, text=True, encoding="utf-8", errors="replace", capture_output=True, check=False, ) class TaskBatchRunnerTests(unittest.TestCase): def test_dry_run_builds_pipeline_command_with_max_concurrency(self) -> None: result = run_script( { "tasks": ["a", "b", "c", "d", "e", "f", "g"], "batchSize": 3, "runsDir": "output/custom-runs", "maxRounds": 4, "timeoutSeconds": 99, "timestampPrefix": "unit", } ) self.assertEqual(result.returncode, 0, result.stderr) plan = json.loads(result.stdout) self.assertEqual(plan["tasks"], ["a", "b", "c", "d", "e", "f", "g"]) self.assertEqual(plan["maxConcurrentTasks"], 3) self.assertIs(plan["continueOnFailure"], True) command = plan["command"] for task in ["a", "b", "c", "d", "e", "f", "g"]: self.assertIn(f"'--task' '{task}'", command) self.assertIn("'--concurrency' '3'", command) self.assertIn("'--runs-dir' 'output/custom-runs'", command) self.assertIn("'--max-rounds' '4'", command) self.assertIn("'--timeout-seconds' '99'", command) def test_rejects_empty_task_list(self) -> None: result = run_script({"tasks": [], "batchSize": 3}) self.assertNotEqual(result.returncode, 0) self.assertIn("tasks", result.stderr.lower()) def test_mri_example_config_runs_requested_tasks_in_one_batch(self) -> None: result = run_script( { "tasks": ["mri_sense", "mri_tv"], "batchSize": 3, "timestampPrefix": "mri_sense_tv_rerun", } ) self.assertEqual(result.returncode, 0, result.stderr) plan = json.loads(result.stdout) self.assertEqual(plan["tasks"], ["mri_sense", "mri_tv"]) self.assertEqual(plan["maxConcurrentTasks"], 3) self.assertIn("'--task' 'mri_sense'", plan["command"]) self.assertIn("'--task' 'mri_tv'", plan["command"]) def test_default_config_is_valid_task_set(self) -> None: config = json.loads(DEFAULT_CONFIG.read_text(encoding="utf-8")) self.assertIsInstance(config.get("tasks"), list) self.assertGreater(len(config["tasks"]), 0) self.assertTrue(all(isinstance(task, str) and task.strip() for task in config["tasks"])) self.assertEqual(config.get("batchSize"), 3) self.assertIs(config.get("continueOnFailure", True), True) def test_default_config_dry_run_matches_configured_pipeline(self) -> None: config = json.loads(DEFAULT_CONFIG.read_text(encoding="utf-8")) tasks = config["tasks"] batch_size = config.get("batchSize", 3) result = subprocess.run( [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(SCRIPT), "-DryRun", "-PlanJson", ], cwd=ROOT, text=True, encoding="utf-8", errors="replace", capture_output=True, check=False, ) self.assertEqual(result.returncode, 0, result.stderr) plan = json.loads(result.stdout) self.assertEqual(plan["tasks"], tasks) self.assertEqual(plan["maxConcurrentTasks"], batch_size) self.assertIs(plan["continueOnFailure"], True) self.assertIn(f"'--task' '{tasks[0]}'", plan["command"]) self.assertIn(f"'--task' '{tasks[-1]}'", plan["command"]) if __name__ == "__main__": unittest.main()