import json import os import re import shutil import subprocess import tempfile from abc import ABC, abstractmethod from collections.abc import Generator from dataclasses import dataclass from pathlib import Path @dataclass class KernelRequest: skill_path: str # path to SKILL.md prompt: str # user's kernel generation prompt @dataclass class KernelResponse: streaming_output: str = "" # intermediate output (from assistant events) final_output: str = "" # final result (from result event only) log: str = "" # full log of all events file_tree: str = "" # text-based tree of generated files files: dict[str, str] | None = None # relative_path -> content error: str | None = None # error message if failed class Executor(ABC): @abstractmethod def execute(self, request: KernelRequest) -> KernelResponse: ... @abstractmethod def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]: ... def _scan_generated_files(tmpdir: str) -> tuple[str, dict[str, str]]: """Walk tmpdir for files Claude created, skipping .claude/ scaffolding. Returns (tree_string, {relative_path: file_content}). """ root = Path(tmpdir) files: dict[str, str] = {} for path in sorted(root.rglob("*")): if not path.is_file(): continue rel = path.relative_to(root) # Skip our scaffolding if rel.parts[0] == ".claude": continue try: files[str(rel)] = path.read_text() except (OSError, UnicodeDecodeError): files[str(rel)] = "" if not files: return "(no files generated)", {} # Build a simple text tree lines = [] names = list(files.keys()) for i, name in enumerate(names): connector = "└── " if i == len(names) - 1 else "├── " lines.append(f"{connector}{name}") return "\n".join(lines), files def _parse_frontmatter(content: str) -> dict[str, str]: """Extract YAML frontmatter fields from a SKILL.md file.""" match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL) if not match: return {} fields = {} for line in match.group(1).splitlines(): if ":" in line: key, _, value = line.partition(":") fields[key.strip()] = value.strip().strip('"') return fields class LocalCLIExecutor(Executor): """Runs `claude` CLI with native skill discovery via a temp directory. Copies the SKILL.md and all files listed in manifest.txt into a temporary .claude/skills// directory, then runs `claude -p` from there so the skill is discovered natively without touching the real project's .claude/ directory. """ def __init__(self, project_dir: str | None = None): self.project_dir = project_dir or os.getcwd() self._prev_tmpdir: str | None = None def _load_skill(self, skill_path: str) -> tuple[str | None, Path, str, dict[str, str]]: """Read skill file and parse its frontmatter. Returns (error_or_none, skill_dir, file_content, frontmatter_dict). """ src = Path(skill_path) if not src.is_absolute(): src = Path(self.project_dir) / src if not src.exists(): return f"SKILL.md not found at: {src}", Path(), "", {} try: content = src.read_text() except OSError as e: return f"Failed to read SKILL.md: {e}", Path(), "", {} frontmatter = _parse_frontmatter(content) return None, src.parent, content, frontmatter def _install_skill_files(self, skill_src_dir: Path, dest_dir: Path) -> str | None: """Copy all files listed in manifest.txt into the dest skill directory. If no manifest.txt exists, only the SKILL.md (already written by caller) is present. Returns an error string on failure, None on success. """ manifest = skill_src_dir / "manifest.txt" if not manifest.exists(): return None try: lines = manifest.read_text().splitlines() except OSError as e: return f"Failed to read manifest.txt: {e}" for line in lines: entry = line.strip() if not entry or entry.startswith("#"): continue if entry == "SKILL.md": continue src_file = skill_src_dir / entry dest_file = dest_dir / entry if not src_file.exists(): continue try: dest_file.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src_file, dest_file) except OSError as e: return f"Failed to copy {entry}: {e}" return None def _prepare(self, request: KernelRequest) -> tuple[str | None, str, dict[str, str], str]: """Set up temp dir with skill files. Returns (error, skill_name, frontmatter, tmpdir_path).""" # Clean up previous run's temp dir if self._prev_tmpdir: shutil.rmtree(self._prev_tmpdir, ignore_errors=True) self._prev_tmpdir = None err, skill_src_dir, skill_content, frontmatter = self._load_skill(request.skill_path) if err: return err, "", {}, "" skill_name = frontmatter.get("name", "skill") tmpdir = tempfile.mkdtemp() skill_dest = Path(tmpdir) / ".claude" / "skills" / skill_name skill_dest.mkdir(parents=True) (skill_dest / "SKILL.md").write_text(skill_content) err = self._install_skill_files(skill_src_dir, skill_dest) if err: shutil.rmtree(tmpdir, ignore_errors=True) return err, "", {}, "" return None, skill_name, frontmatter, tmpdir def _build_cmd(self, request: KernelRequest, frontmatter: dict[str, str], streaming: bool = False) -> list[str]: cmd = ["claude"] if streaming: cmd.extend(["--output-format", "stream-json", "--verbose"]) allowed_tools = frontmatter.get("allowed-tools") if allowed_tools: tools = [t.strip() for t in allowed_tools.split(",")] cmd.extend(["--allowedTools"] + tools) # -p and prompt go last so flags aren't misinterpreted cmd.extend(["-p", request.prompt]) return cmd def execute(self, request: KernelRequest) -> KernelResponse: err, skill_name, frontmatter, tmpdir = self._prepare(request) if err: return KernelResponse(error=err) cmd = self._build_cmd(request, frontmatter) try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=120, cwd=tmpdir, ) except FileNotFoundError: return KernelResponse( error="'claude' CLI not found. Make sure it is installed and on your PATH.", ) except subprocess.TimeoutExpired: return KernelResponse( error="Claude CLI timed out after 120 seconds.", ) finally: shutil.rmtree(tmpdir, ignore_errors=True) if result.returncode != 0: return KernelResponse( error=f"Claude CLI exited with code {result.returncode}: {result.stderr}", ) return KernelResponse(final_output=result.stdout) def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]: err, skill_name, frontmatter, tmpdir = self._prepare(request) if err: yield KernelResponse(error=err) return cmd = self._build_cmd(request, frontmatter, streaming=True) try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # line-buffered cwd=tmpdir, ) except FileNotFoundError: shutil.rmtree(tmpdir, ignore_errors=True) yield KernelResponse( error="'claude' CLI not found. Make sure it is installed and on your PATH.", ) return streaming = "" final = "" log_lines: list[str] = [] try: # Use readline() instead of iterating proc.stdout to avoid # Python's internal read-ahead buffer which blocks streaming. while True: line = proc.stdout.readline() if not line and proc.poll() is not None: break line = line.strip() if not line: continue try: event = json.loads(line) except json.JSONDecodeError: continue etype = event.get("type") # Log system events (hooks, init) if etype == "system": subtype = event.get("subtype", "") if subtype == "init": log_lines.append("[init] Session started") elif subtype == "hook_started": log_lines.append(f"[hook] {event.get('hook_name', '?')} started") elif subtype == "hook_response": outcome = event.get("outcome", "?") log_lines.append(f"[hook] {event.get('hook_name', '?')} → {outcome}") else: log_lines.append(f"[system] {subtype}") yield KernelResponse( streaming_output=streaming, final_output=final, log="\n".join(log_lines), ) # Assistant message chunks — intermediate output elif etype == "assistant": content = event.get("message", {}).get("content", []) text = "".join( block["text"] for block in content if block.get("type") == "text" ) if text: streaming += text + "\n" log_lines.append(f"[assistant] Received {len(text)} chars") yield KernelResponse( streaming_output=streaming, final_output=final, log="\n".join(log_lines), ) # Final result — definitive output elif etype == "result": if event.get("is_error"): log_lines.append(f"[error] {event.get('result', 'unknown error')}") else: result_text = event.get("result", "") if result_text: final = result_text duration = event.get("duration_ms", 0) cost = event.get("total_cost_usd", 0) log_lines.append(f"[done] {duration}ms, ${cost:.4f}") yield KernelResponse( streaming_output=streaming, final_output=final, log="\n".join(log_lines), ) proc.wait() if proc.returncode != 0: stderr = proc.stderr.read() log_lines.append(f"[error] exit code {proc.returncode}") yield KernelResponse( streaming_output=streaming, final_output=final, log="\n".join(log_lines), error=f"Claude CLI exited with code {proc.returncode}: {stderr}", ) # Scan for generated files and yield final response file_tree, files = _scan_generated_files(tmpdir) if files: log_lines.append(f"[files] {len(files)} file(s) generated") yield KernelResponse( streaming_output=streaming, final_output=final, log="\n".join(log_lines), file_tree=file_tree, files=files, ) # Keep tmpdir alive so the UI can reference files self._prev_tmpdir = tmpdir finally: if proc.poll() is None: proc.kill()