DGX_AI / codeforge /sandbox /tools.py
vasiuuu's picture
Initial commit for CodeForge GRPO training
acf77ab
from __future__ import annotations
import json
import re
import shutil
from typing import TYPE_CHECKING
from codeforge.sandbox.models import ParsedResult, ToolResult
if TYPE_CHECKING:
from pathlib import Path
DEFAULT_TOOLS: tuple[str, ...] = ("ruff", "mypy", "pytest", "imports")
_MYPY_ERROR_RE = re.compile(r"Found (\d+) errors?")
def argv_for(name: str, project_dir: Path) -> list[str]:
"""Return the argv for running a tool. Runner sets cwd=project_dir."""
del project_dir
if name == "ruff":
return ["ruff", "check", "--output-format", "json", "."]
if name == "mypy":
return ["mypy", "--no-incremental", "--strict", "."]
if name == "pytest":
return ["pytest", "-q", "--tb=line", "--no-header"]
if name == "pip-audit":
return ["pip-audit", "--format", "json"]
msg = f"unknown tool: {name}"
raise ValueError(msg)
def parse(name: str, tool_result: ToolResult) -> ParsedResult:
if name == "ruff":
return _parse_ruff(tool_result)
if name == "mypy":
return _parse_mypy(tool_result)
if name == "pytest":
return _parse_pytest(tool_result)
if name == "pip-audit":
return _parse_pip_audit(tool_result)
return ParsedResult(ok=tool_result.exit_code == 0, count=0, details={})
def _parse_ruff(tr: ToolResult) -> ParsedResult:
try:
items = json.loads(tr.stdout or "[]")
except json.JSONDecodeError:
return ParsedResult(ok=False, count=0, details={"parse_error": tr.stdout[:500]})
count = len(items) if isinstance(items, list) else 0
return ParsedResult(
ok=count == 0 and tr.exit_code == 0,
count=count,
details={"violations": items[:20]},
)
def _parse_mypy(tr: ToolResult) -> ParsedResult:
if tr.exit_code == 0 and "Success" in tr.stdout:
return ParsedResult(ok=True, count=0, details={})
m = _MYPY_ERROR_RE.search(tr.stdout)
count = int(m.group(1)) if m else 0
return ParsedResult(ok=False, count=count, details={"tail": tr.stdout[-500:]})
def _parse_pytest(tr: ToolResult) -> ParsedResult:
ok = tr.exit_code == 0
return ParsedResult(ok=ok, count=0 if ok else 1, details={"tail": tr.stdout[-500:]})
def _parse_pip_audit(tr: ToolResult) -> ParsedResult:
if tr.exit_code == 0:
return ParsedResult(ok=True, count=0, details={})
try:
data = json.loads(tr.stdout or "{}")
except json.JSONDecodeError:
data = {}
count = len(data.get("vulnerabilities", []))
return ParsedResult(ok=count == 0, count=count, details={"tail": tr.stdout[-500:]})
def is_available(name: str) -> bool:
if name == "imports":
return True
return shutil.which(name) is not None