| |
"""
batch_convert.py -- Batch convert skills >180 lines using micro-skill-pipeline logic.

Implements the /skill-convert classification spec from stevesolun/micro-skills:
  - "Do this" instructions -> Build step (03-build.md)
  - "Check/avoid/ensure" instructions -> YES/NO gate questions (check-gates.md)
  - Reference data (tables, lists, examples) -> Separate reference files
  - Context/scope instructions -> Scope step (01-scope.md)
  - Each pipeline file <= 40 lines
  - Build step splits into 03a, 03b, ... if >40 lines

Usage:
    python batch_convert.py --scan ~/.claude/skills [--min-lines 180] [--dry-run]
    python batch_convert.py --file ~/.claude/skills/fastapi-pro/SKILL.md
"""
|
|
| import argparse |
| import hashlib |
| import json |
| import os |
| import re |
| import shutil |
| import sys |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
|
|
| from ctx.utils._fs_utils import atomic_write_text as _atomic_write_text |
|
|
| from ctx_config import cfg |
|
|
# Module-level settings, evaluated once at import time.
# MIN_LINES mirrors cfg.line_threshold; it is not referenced by the code
# visible in this file (convert_skill and main read cfg.line_threshold directly).
MIN_LINES = cfg.line_threshold
# Upper bound on lines per generated stage file; used by convert_skill when
# chunking the build stage and by _write_stage when sharding oversized files.
MAX_STAGE_LINES = cfg.max_stage_lines
# Current UTC date as YYYY-MM-DD, captured at import time.
# NOTE(review): not referenced by the code visible here -- confirm external use.
TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
|
|
|
| def _line_count(content: str) -> int: |
| return len(content.splitlines()) |
|
|
|
|
| |
|
|
# Keyword patterns consumed by classify_section. Each pattern is matched
# case-insensitively against a section's header plus body, and the number of
# matches is turned into a weighted score for the corresponding stage.

# Words suggesting the section describes scope, context, or prerequisites.
SCOPE_KEYWORDS = re.compile(
    r"\b(scope|constraint|prerequisite|precondition|before you|context|when to use|"
    r"trigger|activation|description|overview|purpose|applies when|input|requirements?)\b",
    re.IGNORECASE,
)
# Words suggesting the section describes planning or design.
PLAN_KEYWORDS = re.compile(
    r"\b(plan|approach|strategy|design|architecture|workflow|steps overview|"
    r"phases?|methodology|algorithm|decision|trade-?off|alternative)\b",
    re.IGNORECASE,
)
# Words suggesting the section contains rules or checks (gate material).
GATE_KEYWORDS = re.compile(
    r"\b(check|ensure|avoid|never|must not|must always|do not|don't|verify|validate|"
    r"confirm|assert|require|guard|prevent|warning|caution|important|rule|"
    r"forbidden|prohibited|mandatory|critical|always|quality|review|audit)\b",
    re.IGNORECASE,
)
# Words suggesting the section describes output or hand-off.
DELIVER_KEYWORDS = re.compile(
    r"\b(deliver|output|present|finalize|format|handoff|hand off|return|report|"
    r"summary|cleanup|clean up|result|template|example output|response format)\b",
    re.IGNORECASE,
)
# Structural hints that a section is reference data. Alternatives, in order:
#   1. a line containing at least three pipe characters (markdown table row),
#   2. a fenced code block with 60 or more characters between the fences,
#   3. the word "example" followed later on the same line by a colon,
#   4. a bullet whose text starts with inline code followed by "-" or ":".
REFERENCE_INDICATORS = re.compile(
    r"(\|.*\|.*\|)|"
    r"(```[\s\S]{60,}```)|"
    r"(\bexample\b.*:)|"
    r"(^\s*[-*]\s+`[^`]+`\s*[-:])",
    re.IGNORECASE | re.MULTILINE,
)
|
|
# (needle, replacement) pairs applied in order by defang_dangerous_markdown
# using plain str.replace. Most replacements differ from their needle only by
# an invisible character embedded inside the token
# (NOTE(review): looks like U+200B ZERO WIDTH SPACE -- confirm), so the text
# renders the same but no longer matches the literal command string.
# NOTE(review): as shown here, the three PHP-tag pairs have identical needle
# and replacement, which makes them no-ops -- confirm whether an escaped form
# was lost when this file was extracted.
DANGEROUS_MARKDOWN_REPLACEMENTS: tuple[tuple[str, str], ...] = (
    ("<?php", "<?php"),
    ("<?PHP", "<?PHP"),
    ("<?=", "<?="),
    ("$(", "$​("),
    ("curl http://", "cu​rl http://"),
    ("curl https://", "cu​rl https://"),
    ("wget http://", "wg​et http://"),
    ("wget https://", "wg​et https://"),
    ("bash -i", "ba​sh -i"),
    ("/bin/bash", "/bin/ba​sh"),
    ("/bin/sh", "/bin/​sh"),
    ("/dev/tcp", "/dev/​tcp"),
    ("| bash", "| ba​sh"),
    ("|/bin/sh", "|/bin/​sh"),
    ("|nc ", "|n​c "),
    ("nc -e", "nc ​-e"),
    ("rm /tmp", "r​m /tmp"),
    ("mkfifo", "mk​fifo"),
    ("cat /tmp", "ca​t /tmp"),
    ("cat /etc/passwd", "ca​t /etc/passwd"),
    (
        "type C:\\Windows\\System32\\config\\sam",
        "ty​pe C:\\Windows\\System32\\config\\sam",
    ),
    ("head -n", "he​ad -n"),
    ("base64 -d", "base64 ​-d"),
    ("system(", "system​("),
    ("exec(", "exec​("),
    ("os.system", "os.​system"),
    ("New-Object", "New​-Object"),
    ("GetStream", "Get​Stream"),
    ("Out-String", "Out​-String"),
    ("iex ", "i​ex "),
    ("System.Net.Sockets.TCPClient", "System.Net.Sockets.TCP​Client"),
    ("System.Text.ASCIIEncoding", "System.Text.ASCII​Encoding"),
)
# (compiled pattern, replacement) pairs applied after the literal pairs above;
# used where the needle must match case-insensitively on word boundaries.
DANGEROUS_MARKDOWN_REGEX_REPLACEMENTS: tuple[tuple[re.Pattern[str], str], ...] = (
    (re.compile(r"\bpowershell\b", re.IGNORECASE), "power​shell"),
)
|
|
|
|
def defang_dangerous_markdown(text: str) -> str:
    """Return ``text`` with executable-looking snippets rewritten.

    Applies every literal pair from DANGEROUS_MARKDOWN_REPLACEMENTS in order,
    then every compiled pattern from DANGEROUS_MARKDOWN_REGEX_REPLACEMENTS.
    """
    defanged = text
    # Literal substitutions first, in table order.
    for literal, safe_form in DANGEROUS_MARKDOWN_REPLACEMENTS:
        defanged = defanged.replace(literal, safe_form)
    # Then the regex-based substitutions.
    for regex, safe_form in DANGEROUS_MARKDOWN_REGEX_REPLACEMENTS:
        defanged = regex.sub(safe_form, defanged)
    return defanged
|
|
|
|
def classify_section(header: str, body: str) -> str:
    """Pick the pipeline stage a markdown section belongs to.

    Returns one of "reference", "scope", "plan", "gate", "deliver" or "build".
    """
    # Bodies longer than 10 lines that look like tables, long code fences,
    # examples or code-bullet lists are reference material regardless of keywords.
    if len(body.split("\n")) > 10 and REFERENCE_INDICATORS.search(body):
        return "reference"

    haystack = f"{header}\n{body}"
    # (stage, pattern, weight); insertion order decides ties in max() below.
    weighted_patterns = (
        ("scope", SCOPE_KEYWORDS, 2),
        ("plan", PLAN_KEYWORDS, 2),
        ("gate", GATE_KEYWORDS, 3),
        ("deliver", DELIVER_KEYWORDS, 2),
    )
    scores = {
        stage: len(pattern.findall(haystack)) * weight
        for stage, pattern, weight in weighted_patterns
    }
    # "build" is the fallback: it wins only when no keyword matched at all.
    scores["build"] = 1

    # A frontmatter-style "---" block inside the body counts strongly as scope.
    if re.search(r"^---\n.*?^---", body, re.MULTILINE | re.DOTALL):
        scores["scope"] += 5

    return max(scores, key=scores.get)
|
|
|
|
def extract_gate_questions(text: str) -> list[str]:
    """Extract YES/NO gate questions from instruction text.

    Each non-blank line is handled by the first rule that applies:

    * a line ending in "?" is kept as a question after removing leading list
      markers (bullets, emphasis asterisks, and a "1." / "1)" style number);
    * "- avoid/don't/do not/never X" becomes "Is the output free of X? YES/NO";
    * "- ensure/always/must/require X" becomes "Does the output X? YES/NO";
    * "- check/verify/validate/confirm [that] X" becomes
      "Has X been verified? YES/NO".

    Lines matching none of the rules are ignored.

    Args:
        text: Markdown text, typically the body of a section classified as a gate.

    Returns:
        The generated questions in source order.
    """
    # (pattern, text before the captured phrase, text after it), tried in order.
    imperative_rules = (
        (r"^[-*]\s*(?:avoid|don'?t|do not|never)\s+(.+)", "Is the output free of ", "? YES/NO"),
        (r"^[-*]\s*(?:ensure|always|must|require)\s+(.+)", "Does the output ", "? YES/NO"),
        (
            r"^[-*]\s*(?:check|verify|validate|confirm)\s+(?:that\s+)?(.+)",
            "Has ",
            " been verified? YES/NO",
        ),
    )

    questions: list[str] = []
    for raw_line in text.split("\n"):
        line_stripped = raw_line.strip()
        if not line_stripped:
            continue

        if line_stripped.endswith("?"):
            # Remove only genuine list markers. A character-set lstrip would also
            # eat digits and dots that belong to the question itself
            # (e.g. "2FA enabled?" or ".NET installed?").
            q = re.sub(r"^[-*\s]*(?:\d+[.)]\s+)?[-*\s]*", "", line_stripped)
            if q:
                questions.append(q)
            continue

        for pattern, prefix, suffix in imperative_rules:
            m = re.match(pattern, line_stripped, re.IGNORECASE)
            if m:
                thing = m.group(1).rstrip(".")
                questions.append(f"{prefix}{thing}{suffix}")
                break

    return questions
|
|
|
|
def parse_sections(content: str) -> tuple[list[dict], str]:
    """Split a markdown document into header-delimited sections.

    A leading ``---`` ... ``---`` frontmatter block is removed first and
    returned separately. The rest is split at level 1-3 headers (``#``,
    ``##``, ``###``). Lines inside fenced code blocks are never treated as
    headers, so ``# comment`` lines in shell or Python snippets stay inside
    the section that contains the fence.

    Args:
        content: Full markdown text, optionally starting with frontmatter.

    Returns:
        ``(sections, frontmatter)``. Each section is a dict with ``"header"``
        (the raw header line, or ``""`` for text before the first header) and
        ``"body"`` (the stripped text up to the next header). ``frontmatter``
        is the matched block including its delimiters, or ``""`` if absent.
    """
    frontmatter = ""
    remainder = content
    fm_match = re.match(r"^---\n(.*?)\n---\n?", content, re.DOTALL)
    if fm_match:
        frontmatter = fm_match.group(0)
        remainder = content[fm_match.end():]

    sections: list[dict] = []
    current_header = ""
    current_body_lines: list[str] = []
    # Marker (e.g. "```" or "~~~~") of the fence currently open, if any.
    open_fence: str | None = None

    for line in remainder.split("\n"):
        stripped = line.strip()

        if open_fence is not None:
            # A fence closes on a line made only of the fence character and
            # at least as long as the opening marker.
            if stripped.startswith(open_fence) and not stripped.strip(open_fence[0]):
                open_fence = None
            current_body_lines.append(line)
            continue

        fence = re.match(r"(`{3,}|~{3,})(.*)$", stripped)
        if fence:
            marker, info = fence.groups()
            # "```code```" on a single line is inline code, not a fence opener.
            if not (marker[0] == "`" and "`" in info):
                open_fence = marker

        if re.match(r"^#{1,3}\s+", line):
            # Flush the section collected so far before starting a new one.
            if current_header or current_body_lines:
                sections.append({
                    "header": current_header,
                    "body": "\n".join(current_body_lines).strip(),
                })
            current_header = line
            current_body_lines = []
        else:
            current_body_lines.append(line)

    # Flush the trailing section.
    if current_header or current_body_lines:
        sections.append({
            "header": current_header,
            "body": "\n".join(current_body_lines).strip(),
        })

    return sections, frontmatter
|
|
|
|
def split_into_chunks(text: str, max_lines: int) -> list[str]:
    """Break ``text`` into pieces of at most ``max_lines`` lines.

    Text that already fits is returned unchanged as a single-element list.
    Otherwise, whenever the pending piece reaches ``max_lines`` lines, it is
    cut just after the last blank line found among its final nine lines, or
    at the full length when none of them is blank. Every emitted piece is
    stripped of surrounding whitespace.
    """
    all_lines = text.split("\n")
    if len(all_lines) <= max_lines:
        return [text]

    pieces: list[str] = []
    pending: list[str] = []
    for row in all_lines:
        pending.append(row)
        if len(pending) < max_lines:
            continue

        size = len(pending)
        lookback_floor = max(0, size - 10)
        # Scan backwards for a blank line to cut after; fall back to a hard cut.
        cut = next(
            (
                idx + 1
                for idx in range(size - 1, lookback_floor, -1)
                if not pending[idx].strip()
            ),
            size,
        )
        pieces.append("\n".join(pending[:cut]).strip())
        pending = pending[cut:]

    if pending:
        pieces.append("\n".join(pending).strip())

    return pieces
|
|
|
|
def build_chunk_filename(index: int) -> str:
    """Name the build-stage file for the zero-based chunk ``index``.

    The first 26 chunks get a lowercase letter suffix (``03a-build.md`` ...
    ``03z-build.md``); later chunks get a one-based, zero-padded number
    (``03-027-build.md``).

    Raises:
        ValueError: if ``index`` is negative.
    """
    if index < 0:
        raise ValueError("chunk index must be non-negative")
    if index >= 26:
        return f"03-{index + 1:03d}-build.md"
    letter = "abcdefghijklmnopqrstuvwxyz"[index]
    return f"03{letter}-build.md"
|
|
|
|
| def _fixed_line_chunks(text: str, max_lines: int) -> list[str]: |
| """Split markdown into hard line-count chunks.""" |
| lines = text.strip().split("\n") |
| return [ |
| "\n".join(lines[i:i + max_lines]).strip() |
| for i in range(0, len(lines), max_lines) |
| ] |
|
|
|
|
| def _stage_shard_path(path: Path, index: int) -> Path: |
| """Return a shard path that sorts next to its stage index file.""" |
| if index < 0: |
| raise ValueError("chunk index must be non-negative") |
| stem = path.stem |
| suffix = path.suffix |
| match = re.match(r"^(\d+)(.*)$", stem) |
| if match: |
| prefix, rest = match.groups() |
| if index < 26: |
| return path.with_name(f"{prefix}{chr(ord('a') + index)}{rest}{suffix}") |
| return path.with_name(f"{prefix}-{index + 1:03d}{rest}{suffix}") |
| return path.with_name(f"{stem}-{index + 1:03d}{suffix}") |
|
|
|
|
| |
|
|
def convert_skill(
    skill_path: Path | str,
    output_dir: Path | str | None = None,
    line_threshold: int | None = None,
    *,
    source_content: str | None = None,
    skill_name: str | None = None,
    preserve_original: bool = True,
) -> dict:
    """Convert a single skill file into a micro-skill pipeline.

    The skill is parsed into sections, each section is classified into a
    pipeline stage, and the stages are written as files under
    ``<output_dir>/references`` together with ``check-gates.md``,
    ``failure-log.md``, ``original-hash.txt`` and a new orchestrator
    ``SKILL.md`` in ``output_dir``.

    If output_dir is None, converts in-place (same directory as the skill).
    Callers that already have trusted in-memory content can pass
    source_content and preserve_original=False to avoid writing raw upstream
    bodies as temporary SKILL.md files.

    Args:
        skill_path: Path of the source SKILL.md. Only read when
            ``source_content`` is None; its parent directory provides the
            default skill name and the default output directory.
        output_dir: Directory that receives the generated files.
        line_threshold: Skills with this many lines or fewer are skipped.
            Defaults to ``cfg.line_threshold``.
        source_content: Skill text to convert instead of reading ``skill_path``.
        skill_name: Name used in generated files; defaults to the name of
            ``skill_path``'s parent directory.
        preserve_original: When True, keep a ``SKILL.md.original`` copy of the
            source in ``output_dir`` unless one already exists.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the skill is at or
        below the threshold; otherwise a stats dict with ``"status":
        "converted"`` plus ``skill``, ``original_lines``, ``pipeline_files``,
        ``gate_questions``, ``max_file_lines``, ``build_splits`` and
        ``reference_files``.
    """
    skill_path = Path(skill_path)
    if output_dir is not None:
        output_dir = Path(output_dir)
    content = (
        source_content
        if source_content is not None
        else skill_path.read_text(encoding="utf-8", errors="replace")
    )
    line_count = _line_count(content)
    threshold = cfg.line_threshold if line_threshold is None else line_threshold

    if line_count <= threshold:
        return {"status": "skipped", "reason": f"{line_count} lines <= {threshold}"}

    # Fingerprint of the source text, written to original-hash.txt below.
    source_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()

    skill_name = skill_name or skill_path.parent.name
    if output_dir is None:
        output_dir = skill_path.parent

    refs_dir = output_dir / "references"
    refs_dir.mkdir(parents=True, exist_ok=True)

    sections, frontmatter = parse_sections(content)

    # Buckets for the classified sections.
    scope_parts = []
    plan_parts = []
    build_parts = []
    gate_parts = []
    deliver_parts = []
    reference_parts = []
    all_gate_questions = []

    # Reuse the description from the source frontmatter when there is one.
    desc_match = re.search(r"description:\s*[\"']?(.+?)[\"']?\s*$", frontmatter, re.MULTILINE)
    skill_description = desc_match.group(1) if desc_match else f"Converted from {skill_name} SKILL.md"

    for section in sections:
        category = classify_section(section["header"], section["body"])
        combined = f"{section['header']}\n{section['body']}".strip()

        if category == "scope":
            scope_parts.append(combined)
        elif category == "plan":
            plan_parts.append(combined)
        elif category == "gate":
            gate_parts.append(combined)
            # Gate sections also contribute YES/NO questions from their body.
            questions = extract_gate_questions(section["body"])
            all_gate_questions.extend(questions)
        elif category == "deliver":
            deliver_parts.append(combined)
        elif category == "reference":
            reference_parts.append(combined)
        else:
            build_parts.append(combined)

    # Every stage file must exist, so fill empty stages with a stock stub.
    if not scope_parts:
        scope_parts.append(f"# Step 1: Scope\n\nExtract constraints from the request for {skill_name}.")
    if not plan_parts:
        plan_parts.append("# Step 2: Plan\n\nDesign the approach. Map components to constraints.")
    if not build_parts:
        build_parts.append("# Step 3: Build\n\nExecute the plan, building each component in order.")
    if not deliver_parts:
        deliver_parts.append("# Step 5: Deliver\n\nFinalize and present the output.")

    # Second chance: rescan gate sections including their header lines.
    if not all_gate_questions and gate_parts:
        for gp in gate_parts:
            qs = extract_gate_questions(gp)
            all_gate_questions.extend(qs)

    # Last resort: generic questions so check-gates.md is never empty.
    if not all_gate_questions:
        all_gate_questions = [
            f"Does the output follow all constraints specified in the {skill_name} skill? YES/NO",
            "Is every element purposeful (no dead code, no placeholder text)? YES/NO",
            "Is the output usable as-is with no manual fixes needed? YES/NO",
        ]
    # Only the first 20 questions are written and reported.
    gate_questions = all_gate_questions[:20]

    # Keep a copy of the source before SKILL.md is overwritten; an existing
    # backup is never replaced.
    if preserve_original:
        original_path = output_dir / "SKILL.md.original"
        if not original_path.exists():
            if source_content is None:
                shutil.copy2(skill_path, original_path)
            else:
                original_path.write_text(source_content, encoding="utf-8")

    # --- Stage 1: scope ---
    scope_text = "\n\n".join(scope_parts)
    scope_text = _ensure_header(scope_text, "# Step 1: Scope")
    scope_text += "\n\n## Gate\n\n- Can I state the deliverable in one sentence? YES/NO\n- Have I listed at least one explicit constraint? YES/NO\n- Do I know what inputs I'm working with? YES/NO\n\nAll YES = proceed. Any NO = ask the user one clarifying question."
    _write_stage(refs_dir / "01-scope.md", scope_text)

    # --- Stage 2: plan ---
    plan_text = "\n\n".join(plan_parts)
    plan_text = _ensure_header(plan_text, "# Step 2: Plan")
    plan_text += "\n\n## Gate\n\n- Does every constraint from Step 1 map to at least one component? YES/NO\n- Is the build order explicit? YES/NO\n- Have I checked the failure log? YES/NO\n\nAll YES = proceed. Any NO = revise."
    _write_stage(refs_dir / "02-plan.md", plan_text)

    # --- Stage 3: build (split into 03a, 03b, ... when too long) ---
    build_text = "\n\n".join(build_parts)
    build_text = _ensure_header(build_text, "# Step 3: Build")
    build_text += "\n\n## Gate\n\n- Have all components from the plan been built? YES/NO\n- Did every component pass its micro-check? YES/NO\n- Does the assembled output match the deliverable from Step 1? YES/NO\n\nAll YES = proceed. Any NO = rebuild the failing component."
    build_chunks = split_into_chunks(build_text, MAX_STAGE_LINES)
    build_files = []
    if len(build_chunks) == 1:
        _write_stage(refs_dir / "03-build.md", build_chunks[0])
        build_files.append("references/03-build.md")
    else:
        for i, chunk in enumerate(build_chunks):
            fname = build_chunk_filename(i)
            _write_stage(refs_dir / fname, chunk)
            build_files.append(f"references/{fname}")

    # --- Stage 4: check (fixed text; domain questions live in check-gates.md) ---
    check_text = "# Step 4: Check\n\nHard gate. Assume there are problems. Find them.\nAnswer every question YES or NO. \"Mostly yes\" = NO.\n\n"
    check_text += "## Universal Checks\n\n"
    check_text += "1. Does the output match the deliverable from Step 1? YES/NO\n"
    check_text += "2. Are all constraints satisfied? YES/NO\n"
    check_text += "3. Does every element serve a purpose? YES/NO\n"
    check_text += "4. Is the output usable as-is with no manual fixes? YES/NO\n"
    check_text += "5. If code: does it run without errors? YES/NO\n\n"
    check_text += "## Domain Checks\n\nLoad `check-gates.md` and answer every question there.\n\n"
    check_text += "## Failure Log\n\n6. Re-read `failure-log.md`. Does the output violate any pattern? YES/NO\n\n"
    check_text += "## On Failure\n\n- For each NO: state what is wrong in one sentence.\n- Fix each issue.\n- Re-run this entire checklist.\n- After passing: append a one-line pattern to `failure-log.md`."
    _write_stage(refs_dir / "04-check.md", check_text)

    # --- Stage 5: deliver ---
    deliver_text = "\n\n".join(deliver_parts)
    deliver_text = _ensure_header(deliver_text, "# Step 5: Deliver")
    deliver_text += "\n\n## Gate\n\n- Is the output in its final location? YES/NO\n- Is the summary concise (under 5 sentences)? YES/NO\n- Are all temp artifacts cleaned up? YES/NO\n\nAll YES = done."
    _write_stage(refs_dir / "05-deliver.md", deliver_text)

    # --- Reference sections, one file each ---
    ref_file_list = []
    for i, ref in enumerate(reference_parts):
        fname = f"ref-{i + 1:02d}.md"
        _write_stage(refs_dir / fname, ref)
        ref_file_list.append(f"references/{fname}")

    # --- Domain gate questions ---
    gates_text = f"# Domain Gate Questions -- {skill_name}\n\nAnswer each YES or NO. Any NO = fix before proceeding.\n\n"
    for i, q in enumerate(gate_questions, 1):
        gates_text += f"{i}. {q}\n"
    (output_dir / "check-gates.md").write_text(
        defang_dangerous_markdown(gates_text),
        encoding="utf-8",
    )

    # --- Empty failure log and source fingerprint ---
    failure_text = "# Failure Log\nOne-line patterns learned from past mistakes.\n"
    (output_dir / "failure-log.md").write_text(failure_text, encoding="utf-8")

    (output_dir / "original-hash.txt").write_text(source_hash + "\n", encoding="utf-8")

    # --- Orchestrator SKILL.md ---
    if len(build_files) == 1:
        build_ref_str = f"Read `{build_files[0]}`."
    else:
        build_ref_str = " then ".join(f"`{f}`" for f in build_files)
        build_ref_str = f"Read {build_ref_str}."

    orchestrator = f"""---
name: {skill_name}
description: "{skill_description}"
---

# {skill_name}

When this skill triggers, execute the following gated pipeline.
One step at a time. Do NOT skip ahead.

## Pipeline

1. **Scope** -- Read `references/01-scope.md`. Extract constraints from the request.
2. **Plan** -- Read `references/02-plan.md`. Design the approach. Map components.
3. **Build** -- {build_ref_str} Execute with micro-checks per component.
4. **Check** -- Read `references/04-check.md`. Answer every gate question YES or NO. Any NO = fix.
5. **Deliver** -- Read `references/05-deliver.md`. Finalize and present.

## Failure Log

Read `failure-log.md` before starting. Every pattern is a mandatory constraint.

## Rules

- Read each reference file when you reach that step, not all at once.
- Step 4 (Check) is the hard gate. "Mostly yes" counts as NO.
- On Check failure: fix, re-run full checklist, append pattern to `failure-log.md`.
"""

    _atomic_write_text(output_dir / "SKILL.md", orchestrator.strip() + "\n")

    # --- Stats over everything that now makes up the pipeline ---
    all_pipeline_files = list(refs_dir.glob("*.md")) + [
        output_dir / "SKILL.md",
        output_dir / "check-gates.md",
        output_dir / "failure-log.md",
    ]
    max_lines = 0
    total_files = len(all_pipeline_files)
    for f in all_pipeline_files:
        if f.exists():
            try:
                # Count with _line_count, the same way original_lines is
                # measured; splitting on "\n" would report one line too many
                # for every newline-terminated file.
                lc = _line_count(f.read_text(encoding="utf-8", errors="replace"))
            except OSError:
                continue
            max_lines = max(max_lines, lc)

    return {
        "status": "converted",
        "skill": skill_name,
        "original_lines": line_count,
        "pipeline_files": total_files,
        "gate_questions": len(gate_questions),
        "max_file_lines": max_lines,
        "build_splits": len(build_files),
        "reference_files": len(ref_file_list),
    }
|
|
|
|
| def _ensure_header(text: str, default_header: str) -> str: |
| """Ensure text starts with a markdown header.""" |
| if not text.strip().startswith("#"): |
| return f"{default_header}\n\n{text}" |
| return text |
|
|
|
|
def _write_stage(path: Path, text: str) -> None:
    """Write one stage file, sharding it when it exceeds MAX_STAGE_LINES.

    The text is defanged and stripped first. If it fits, it is written to
    ``path`` with a trailing newline. Otherwise it is cut into fixed-size
    shards written next to ``path``, and ``path`` itself becomes a short
    index naming the first and last shard.
    """
    safe_text = defang_dangerous_markdown(text).strip()
    if len(safe_text.split("\n")) <= MAX_STAGE_LINES:
        path.write_text(safe_text + "\n", encoding="utf-8")
        return

    shard_paths = []
    for position, piece in enumerate(_fixed_line_chunks(safe_text, MAX_STAGE_LINES)):
        target = _stage_shard_path(path, position)
        target.write_text(piece + "\n", encoding="utf-8")
        shard_paths.append(target)

    # The original stage path is kept as an index pointing at the shards.
    index_rows = [
        f"# {path.stem}",
        "",
        f"This generated stage was split into {len(shard_paths)} shards to keep "
        f"each file under {MAX_STAGE_LINES} lines.",
        "Read the shard files in sorted filename order.",
        "",
        f"First shard: `{shard_paths[0].name}`",
        f"Last shard: `{shard_paths[-1].name}`",
        "",
    ]
    path.write_text("\n".join(index_rows), encoding="utf-8")
|
|
|
|
| |
|
|
def main() -> None:
    """Command-line entry point: convert one skill file or scan directories.

    With ``--file`` a single skill is converted and its stats printed as JSON.
    With ``--scan`` (plus optional ``--extra-dirs``) every ``SKILL.md`` found
    recursively that is longer than ``--min-lines`` and has no
    ``SKILL.md.original`` beside it is converted; ``--dry-run`` only lists
    the 20 longest candidates. Exits with status 1 when neither mode is
    given or the ``--file`` path does not exist.
    """
    parser = argparse.ArgumentParser(description="Batch convert skills to micro-skill pipeline")
    parser.add_argument("--scan", help="Directory to scan for SKILL.md files")
    parser.add_argument("--file", help="Single SKILL.md file to convert")
    parser.add_argument("--min-lines", type=int, default=cfg.line_threshold, help=f"Minimum lines to convert (default: {cfg.line_threshold})")
    parser.add_argument("--dry-run", action="store_true", help="Just count, don't convert")
    parser.add_argument("--extra-dirs", nargs="*", help="Additional directories to scan")
    args = parser.parse_args()

    min_lines_val = args.min_lines

    if args.file:
        # Expand "~" the same way the scan directories are expanded below, so
        # a quoted or unexpanded "~/..." argument still resolves.
        path = Path(os.path.expanduser(args.file))
        if not path.exists():
            print(f"File not found: {path}", file=sys.stderr)
            sys.exit(1)
        result = convert_skill(path, line_threshold=min_lines_val)
        print(json.dumps(result, indent=2))
        return

    if not args.scan:
        print("Error: --scan DIR or --file PATH required", file=sys.stderr)
        sys.exit(1)

    # Collect every directory to scan.
    scan_dirs = [Path(os.path.expanduser(args.scan))]
    if args.extra_dirs:
        for d in args.extra_dirs:
            scan_dirs.append(Path(os.path.expanduser(d)))

    skill_files = []
    for scan_dir in scan_dirs:
        if not scan_dir.exists():
            print(f"Warning: {scan_dir} does not exist, skipping", file=sys.stderr)
            continue
        for skill_md in scan_dir.rglob("SKILL.md"):
            # A SKILL.md.original beside the file marks it as already converted.
            if (skill_md.parent / "SKILL.md.original").exists():
                continue
            try:
                line_count = _line_count(skill_md.read_text(encoding="utf-8", errors="replace"))
                if line_count > min_lines_val:
                    skill_files.append((skill_md, line_count))
            except Exception as exc:
                # Best effort: one unreadable file must not abort the scan.
                print(f"Warning: failed to read skill file {skill_md}: {exc}", file=sys.stderr)

    print(f"Found {len(skill_files)} skills > {min_lines_val} lines")

    if args.dry_run:
        # Show the 20 longest candidates, longest first.
        for sf, lc in sorted(skill_files, key=lambda x: -x[1])[:20]:
            print(f" {lc:5d} lines {sf.parent.name}")
        if len(skill_files) > 20:
            print(f" ... and {len(skill_files) - 20} more")
        return

    # Convert everything found, tallying the outcomes.
    converted = 0
    errors = 0
    skipped = 0
    for i, (sf, _lc) in enumerate(skill_files):
        try:
            result = convert_skill(sf, line_threshold=min_lines_val)
            if result["status"] == "converted":
                converted += 1
                # Progress line every 50th file to keep the output short.
                if (i + 1) % 50 == 0:
                    print(f" [{i + 1}/{len(skill_files)}] converted: {result['skill']} ({result['original_lines']} -> {result['pipeline_files']} files, {result['gate_questions']} gates)")
            else:
                skipped += 1
        except Exception as e:
            # Top-level boundary: report the failure and continue with the rest.
            errors += 1
            print(f" ERROR: {sf.parent.name}: {e}", file=sys.stderr)

    print(f"\nDone: {converted} converted, {skipped} skipped, {errors} errors")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|