# test / executor.py
# Jack-Khuu
# Demo
# 88a1dd2
import json
import os
import re
import shutil
import subprocess
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
@dataclass
class KernelRequest:
    """Input handed to an executor: which skill to load and what to ask for.

    Fields are positional in this order: ``skill_path``, then ``prompt``.
    """

    skill_path: str  # path to SKILL.md
    prompt: str  # user's kernel generation prompt
@dataclass
class KernelResponse:
streaming_output: str = "" # intermediate output (from assistant events)
final_output: str = "" # final result (from result event only)
log: str = "" # full log of all events
file_tree: str = "" # text-based tree of generated files
files: dict[str, str] | None = None # relative_path -> content
error: str | None = None # error message if failed
class Executor(ABC):
    """Interface for backends that run a :class:`KernelRequest`.

    Subclasses must implement both a blocking and a streaming entry point.
    """

    @abstractmethod
    def execute(self, request: KernelRequest) -> KernelResponse:
        """Run *request* to completion and return one final response."""

    @abstractmethod
    def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]:
        """Run *request*, yielding progressively updated responses."""
def _scan_generated_files(tmpdir: str) -> tuple[str, dict[str, str]]:
    """Collect the files Claude created under *tmpdir*.

    Files under the top-level ``.claude/`` directory (the skill scaffolding
    this module installs) are skipped. Files are visited in sorted path
    order and read as UTF-8; any file that cannot be read or decoded is
    recorded as ``"<binary or unreadable>"``.

    Returns:
        ``(tree_string, {relative_path: file_content})``. When nothing was
        generated, ``("(no files generated)", {})``.
    """
    root = Path(tmpdir)
    files: dict[str, str] = {}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        rel = path.relative_to(root)
        # Skip our scaffolding
        if rel.parts[0] == ".claude":
            continue
        try:
            # Explicit encoding: the locale default varies across platforms.
            files[str(rel)] = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            files[str(rel)] = "<binary or unreadable>"

    if not files:
        return "(no files generated)", {}

    # Flat text tree: every entry gets a branch connector except the last.
    names = list(files)
    last = len(names) - 1
    lines = [
        f"{'└── ' if i == last else '├── '}{name}"
        for i, name in enumerate(names)
    ]
    return "\n".join(lines), files
def _parse_frontmatter(content: str) -> dict[str, str]:
    """Extract flat ``key: value`` fields from a SKILL.md YAML frontmatter.

    Only the simple single-line ``key: value`` form is understood (no
    nesting, lists, or multi-line scalars). Blank lines and ``#`` comment
    lines are ignored. The line is split at the first ``:``, so values may
    themselves contain colons (e.g. URLs). One pair of matching surrounding
    quotes (``"..."`` or ``'...'``) is removed from each value; quotes
    inside a value are preserved.

    Returns an empty dict when *content* does not open with a ``---`` block.
    """
    match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
    if not match:
        return {}

    fields: dict[str, str] = {}
    for line in match.group(1).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or ":" not in stripped:
            continue
        key, _, value = stripped.partition(":")
        value = value.strip()
        # Unquote only a matched pair, so `He said "hi"` keeps its last quote.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        fields[key.strip()] = value
    return fields
class LocalCLIExecutor(Executor):
    """Runs the `claude` CLI with native skill discovery via a temp directory.

    Copies the SKILL.md and all files listed in its sibling manifest.txt into
    a temporary ``.claude/skills/<name>/`` directory, then runs ``claude -p``
    from there so the skill is discovered natively without touching the real
    project's ``.claude/`` directory.
    """

    _CLI_NOT_FOUND = "'claude' CLI not found. Make sure it is installed and on your PATH."

    def __init__(self, project_dir: str | None = None, timeout: float = 120):
        """
        Args:
            project_dir: Base directory used to resolve relative skill paths;
                defaults to the current working directory.
            timeout: Seconds :meth:`execute` waits for the CLI before giving up.
        """
        self.project_dir = project_dir or os.getcwd()
        self.timeout = timeout
        # Temp dir of the last completed stream() run. It is kept alive so the
        # UI can still reference generated files, and removed on the next run.
        self._prev_tmpdir: str | None = None

    def _load_skill(self, skill_path: str) -> tuple[str | None, Path, str, dict[str, str]]:
        """Read a SKILL.md file and parse its frontmatter.

        Relative *skill_path* values are resolved against ``self.project_dir``.

        Returns:
            ``(error_or_none, skill_dir, file_content, frontmatter_dict)``.
            On error the last three items are empty placeholders.
        """
        src = Path(skill_path)
        if not src.is_absolute():
            src = Path(self.project_dir) / src
        if not src.exists():
            return f"SKILL.md not found at: {src}", Path(), "", {}
        try:
            content = src.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            return f"Failed to read SKILL.md: {e}", Path(), "", {}
        return None, src.parent, content, _parse_frontmatter(content)

    def _install_skill_files(self, skill_src_dir: Path, dest_dir: Path) -> str | None:
        """Copy every entry listed in ``manifest.txt`` into *dest_dir*.

        Each non-blank, non-``#`` manifest line is a path relative to
        *skill_src_dir* and is copied to the same relative path under
        *dest_dir*. ``SKILL.md`` is skipped (the caller already wrote it),
        entries that do not exist are skipped, and directories are copied
        recursively. Without a manifest only SKILL.md is installed.

        Returns an error string on failure, ``None`` on success.
        """
        manifest = skill_src_dir / "manifest.txt"
        if not manifest.exists():
            return None
        try:
            lines = manifest.read_text(encoding="utf-8").splitlines()
        except (OSError, UnicodeDecodeError) as e:
            return f"Failed to read manifest.txt: {e}"

        for line in lines:
            entry = line.strip()
            if not entry or entry.startswith("#") or entry == "SKILL.md":
                continue
            rel = Path(os.path.normpath(entry))
            # An absolute path or a leading `..` would read from outside the
            # skill directory and write outside the temp skill directory.
            if rel.is_absolute() or rel.drive or rel.parts[:1] == ("..",):
                return f"Manifest entry escapes the skill directory: {entry}"
            if not rel.parts:  # e.g. "." -- nothing to copy
                continue
            src_file = skill_src_dir / rel
            dest_file = dest_dir / rel
            if not src_file.exists():
                continue
            try:
                if src_file.is_dir():
                    shutil.copytree(src_file, dest_file, dirs_exist_ok=True)
                else:
                    dest_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src_file, dest_file)
            except OSError as e:  # shutil.Error is an OSError subclass
                return f"Failed to copy {entry}: {e}"
        return None

    @staticmethod
    def _skill_dir_name(name: str) -> str:
        """Return *name* made safe to use as a single directory component."""
        cleaned = name.replace("/", "-").replace("\\", "-").strip()
        return "skill" if cleaned in ("", ".", "..") else cleaned

    def _prepare(self, request: KernelRequest) -> tuple[str | None, str, dict[str, str], str]:
        """Set up a fresh temp dir containing the skill.

        The previous run's kept temp dir is removed first.

        Returns:
            ``(error, skill_name, frontmatter, tmpdir_path)``. On error the
            temp dir (if any) has already been removed and the other items
            are empty.
        """
        # Clean up previous run's temp dir
        if self._prev_tmpdir:
            shutil.rmtree(self._prev_tmpdir, ignore_errors=True)
            self._prev_tmpdir = None

        err, skill_src_dir, skill_content, frontmatter = self._load_skill(request.skill_path)
        if err:
            return err, "", {}, ""

        # The frontmatter name becomes a path component; keep it to one level.
        skill_name = self._skill_dir_name(frontmatter.get("name", "skill"))
        tmpdir = tempfile.mkdtemp()
        skill_dest = Path(tmpdir) / ".claude" / "skills" / skill_name
        try:
            skill_dest.mkdir(parents=True)
            (skill_dest / "SKILL.md").write_text(skill_content, encoding="utf-8")
        except OSError as e:
            shutil.rmtree(tmpdir, ignore_errors=True)
            return f"Failed to set up skill directory: {e}", "", {}, ""

        err = self._install_skill_files(skill_src_dir, skill_dest)
        if err:
            shutil.rmtree(tmpdir, ignore_errors=True)
            return err, "", {}, ""
        return None, skill_name, frontmatter, tmpdir

    def _build_cmd(self, request: KernelRequest, frontmatter: dict[str, str], streaming: bool = False) -> list[str]:
        """Build the `claude` argv.

        Passes the frontmatter's comma-separated ``allowed-tools`` (if any)
        via ``--allowedTools``. With *streaming*, requests stream-json output.
        """
        cmd = ["claude"]
        if streaming:
            cmd.extend(["--output-format", "stream-json", "--verbose"])
        allowed_tools = frontmatter.get("allowed-tools")
        if allowed_tools:
            # Drop blanks produced by trailing/double commas.
            tools = [t.strip() for t in allowed_tools.split(",") if t.strip()]
            if tools:
                cmd.extend(["--allowedTools", *tools])
        # -p and prompt go last so flags aren't misinterpreted
        cmd.extend(["-p", request.prompt])
        return cmd

    def execute(self, request: KernelRequest) -> KernelResponse:
        """Run the CLI to completion and return its stdout plus generated files.

        The temp dir is always removed before returning.
        """
        err, _skill_name, frontmatter, tmpdir = self._prepare(request)
        if err:
            return KernelResponse(error=err)

        cmd = self._build_cmd(request, frontmatter)
        try:
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                    cwd=tmpdir,
                )
            except FileNotFoundError:
                return KernelResponse(error=self._CLI_NOT_FOUND)
            except subprocess.TimeoutExpired:
                return KernelResponse(
                    error=f"Claude CLI timed out after {self.timeout} seconds.",
                )
            except OSError as e:
                return KernelResponse(error=f"Failed to start Claude CLI: {e}")

            if result.returncode != 0:
                return KernelResponse(
                    error=f"Claude CLI exited with code {result.returncode}: {result.stderr}",
                )
            # Scan before the finally-clause deletes the temp dir.
            file_tree, files = _scan_generated_files(tmpdir)
            return KernelResponse(final_output=result.stdout, file_tree=file_tree, files=files)
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)

    @staticmethod
    def _describe_system_event(event: dict) -> str:
        """Return the log line for a stream-json ``system`` event."""
        subtype = event.get("subtype", "")
        hook = event.get("hook_name", "?")
        if subtype == "init":
            return "[init] Session started"
        if subtype == "hook_started":
            return f"[hook] {hook} started"
        if subtype == "hook_response":
            return f"[hook] {hook} → {event.get('outcome', '?')}"
        return f"[system] {subtype}"

    @staticmethod
    def _assistant_text(event: dict) -> str:
        """Concatenate the text blocks of a stream-json ``assistant`` event."""
        message = event.get("message") or {}
        content = message.get("content") or []
        if isinstance(content, str):
            return content
        return "".join(
            block.get("text", "")
            for block in content
            if isinstance(block, dict) and block.get("type") == "text"
        )

    def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]:
        """Run the CLI in stream-json mode, yielding cumulative snapshots.

        Each yielded response carries everything accumulated so far. The last
        one also carries the generated files and, if the CLI failed, the
        error. On normal completion the temp dir is kept (and recorded in
        ``_prev_tmpdir``) so the UI can reference files. Otherwise, including
        when the consumer stops iterating early, it is removed.
        """
        err, _skill_name, frontmatter, tmpdir = self._prepare(request)
        if err:
            yield KernelResponse(error=err)
            return

        cmd = self._build_cmd(request, frontmatter, streaming=True)
        # stderr goes to an unbounded temp file instead of a pipe. Nothing
        # reads stderr until the child exits, so a full pipe would block the
        # child while we block on stdout (deadlock).
        stderr_file = tempfile.TemporaryFile()
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                text=True,
                bufsize=1,  # line-buffered
                cwd=tmpdir,
            )
        except OSError as e:
            stderr_file.close()
            shutil.rmtree(tmpdir, ignore_errors=True)
            if isinstance(e, FileNotFoundError):
                yield KernelResponse(error=self._CLI_NOT_FOUND)
            else:
                yield KernelResponse(error=f"Failed to start Claude CLI: {e}")
            return

        streaming = ""
        final = ""
        error: str | None = None
        log_lines: list[str] = []
        keep_tmpdir = False

        def snapshot(**extra) -> KernelResponse:
            # Reads the *current* values of the accumulators above.
            return KernelResponse(
                streaming_output=streaming,
                final_output=final,
                log="\n".join(log_lines),
                **extra,
            )

        try:
            # Use readline() instead of iterating proc.stdout to avoid
            # read-ahead buffering delaying events. readline() returns ""
            # only at EOF, which ends the loop without busy-polling.
            for raw_line in iter(proc.stdout.readline, ""):
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(event, dict):
                    continue

                etype = event.get("type")
                if etype == "system":
                    # Hooks, init, and other session events.
                    log_lines.append(self._describe_system_event(event))
                    yield snapshot()
                elif etype == "assistant":
                    # Assistant message chunks -- intermediate output.
                    text = self._assistant_text(event)
                    if text:
                        streaming += text + "\n"
                        log_lines.append(f"[assistant] Received {len(text)} chars")
                        yield snapshot()
                elif etype == "result":
                    # Final result -- definitive output.
                    if event.get("is_error"):
                        log_lines.append(f"[error] {event.get('result', 'unknown error')}")
                    elif event.get("result"):
                        final = event["result"]
                    # `or 0` also covers keys present with a null value.
                    duration = event.get("duration_ms") or 0
                    cost = event.get("total_cost_usd") or 0
                    log_lines.append(f"[done] {duration}ms, ${cost:.4f}")
                    yield snapshot()

            proc.wait()
            if proc.returncode != 0:
                stderr_file.seek(0)
                stderr = stderr_file.read().decode("utf-8", errors="replace")
                log_lines.append(f"[error] exit code {proc.returncode}")
                error = f"Claude CLI exited with code {proc.returncode}: {stderr}"
                yield snapshot(error=error)

            # Scan for generated files and yield the final response.
            file_tree, files = _scan_generated_files(tmpdir)
            if files:
                log_lines.append(f"[files] {len(files)} file(s) generated")
            # Hand the temp dir off before the last yield: the consumer may
            # stop iterating right after receiving it.
            keep_tmpdir = True
            self._prev_tmpdir = tmpdir
            # Carry the error forward so the last snapshot still reports it.
            yield snapshot(file_tree=file_tree, files=files, error=error)
        finally:
            if proc.poll() is None:
                proc.kill()
                proc.wait()  # reap the killed child
            if proc.stdout is not None:
                proc.stdout.close()
            stderr_file.close()
            if not keep_tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)