File size: 2,539 Bytes
"""
Helper utilities for the ChipForge server environment.
"""
import json
import subprocess
from pathlib import Path
from typing import Any, Dict, List
from .constants import TASKS_DIR, TOOL_TIMEOUT
def discover_tasks() -> List[Path]:
    """Find every task directory below TASKS_DIR.

    A task directory is any directory, at any depth under TASKS_DIR, that
    directly contains a regular file named ``task.json``.

    Returns:
        The matching directories in sorted order, or an empty list when
        TASKS_DIR is not an existing directory.
    """
    if not TASKS_DIR.is_dir():
        return []

    task_dirs = [
        manifest.parent
        for manifest in TASKS_DIR.rglob("task.json")
        if manifest.is_file() and manifest.parent.is_dir()
    ]
    task_dirs.sort()
    return task_dirs
def categorize_tasks(tasks: List[Path]) -> Dict[str, List[Path]]:
    """Group task directories by the difficulty declared in their task.json.

    Each task's ``task.json`` is read and its ``"difficulty"`` value is
    compared, ignoring case and surrounding whitespace, against the known
    categories ``"easy"``, ``"medium"`` and ``"hard"``.

    Categorization is best-effort and never raises for bad metadata: a task
    is filed under ``"medium"`` when its task.json is missing, unreadable,
    not valid JSON, not a JSON object, or when the difficulty is absent,
    not a string, or not one of the known categories.

    Args:
        tasks: Task directories, each expected to contain a ``task.json``.

    Returns:
        A dict with exactly the keys ``"easy"``, ``"medium"`` and ``"hard"``;
        each value lists the matching tasks in their input order.
    """
    categorized_tasks: Dict[str, List[Path]] = {"easy": [], "medium": [], "hard": []}

    for task in tasks:
        try:
            # JSON is UTF-8 by specification; do not depend on the locale's
            # default encoding, which differs between platforms.
            with open(Path(task) / "task.json", "r", encoding="utf-8") as f:
                meta = json.load(f)
        except (OSError, ValueError, RecursionError):
            # OSError: missing/unreadable file. ValueError: invalid JSON or
            # undecodable bytes. RecursionError: pathologically nested JSON.
            meta = None

        bucket = "medium"  # fallback for anything we cannot classify
        if isinstance(meta, dict):
            declared = meta.get("difficulty")
            if isinstance(declared, str):
                label = declared.strip().lower()
                if label in categorized_tasks:
                    bucket = label

        categorized_tasks[bucket].append(task)

    return categorized_tasks
def run_tool(cmd: List[str], cwd: str, timeout: int = TOOL_TIMEOUT) -> Dict[str, Any]:
    """Run an external command and capture its stdout, stderr and return code.

    The command is executed directly from the argument list; no shell is
    involved, so shell syntax (pipes, globs, redirection) is not interpreted.

    Failures to run the command are reported through the result dict rather
    than raised: a timeout, a missing executable, or any other OS-level
    launch error yields empty ``stdout``, an explanatory ``stderr`` message
    and a ``returncode`` of -1.

    Args:
        cmd: Program and arguments, one list element per argument.
        cwd: Working directory in which to run the command.
        timeout: Maximum run time in seconds before the command is aborted.

    Returns:
        A dict with the keys ``"stdout"`` (str), ``"stderr"`` (str) and
        ``"returncode"`` (int).
    """
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            # Tool output is not guaranteed to be valid text in the locale's
            # encoding; substitute undecodable bytes instead of raising.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {
            "stdout": "",
            "stderr": f"Command timed out after {timeout}s",
            "returncode": -1,
        }
    except FileNotFoundError as e:
        return {
            "stdout": "",
            "stderr": f"Tool not found: {e}",
            "returncode": -1,
        }
    except OSError as e:
        # Other launch failures, e.g. PermissionError (file not executable)
        # or NotADirectoryError (cwd is not a directory).
        return {
            "stdout": "",
            "stderr": f"Failed to run tool: {e}",
            "returncode": -1,
        }

    return {
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode,
    }
def extract_error_summary(stderr: str, stdout: str = "") -> str:
    """Pick a single line that best summarizes a tool failure.

    The lines of ``stderr`` followed by those of ``stdout`` are scanned in
    order; the first non-blank line containing one of the keywords
    "error", "syntax", "warning", "latch" or "fail" (case-insensitive) wins.
    If no line matches, the first non-blank line of ``stderr`` is used.

    Args:
        stderr: Captured standard error of the tool.
        stdout: Captured standard output of the tool.

    Returns:
        The chosen line, stripped of surrounding whitespace and truncated to
        200 characters, or an empty string when nothing suitable is found.
    """
    keywords = ("error", "syntax", "warning", "latch", "fail")

    # First pass: look for a keyword hit anywhere in stderr, then stdout.
    for raw in (stderr + "\n" + stdout).splitlines():
        candidate = raw.strip()
        if candidate and any(word in candidate.lower() for word in keywords):
            return candidate[:200]

    # Second pass: no keyword found, fall back to the first real stderr line.
    stderr_lines = (raw.strip() for raw in stderr.splitlines())
    return next((text[:200] for text in stderr_lines if text), "")
|