Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from abc import ABC, abstractmethod | |
| from collections.abc import Generator | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
@dataclass
class KernelRequest:
    """Input for one executor run: which skill to use and what to ask for.

    Declared as a dataclass so the annotated fields below produce an
    ``__init__`` that accepts them positionally or by keyword.
    """

    skill_path: str  # path to SKILL.md
    prompt: str  # user's kernel generation prompt
| class KernelResponse: | |
| streaming_output: str = "" # intermediate output (from assistant events) | |
| final_output: str = "" # final result (from result event only) | |
| log: str = "" # full log of all events | |
| file_tree: str = "" # text-based tree of generated files | |
| files: dict[str, str] | None = None # relative_path -> content | |
| error: str | None = None # error message if failed | |
class Executor(ABC):
    """Interface for backends that run a kernel-generation request.

    Both methods are abstract: a concrete executor must provide a blocking
    ``execute`` and an incremental ``stream``.
    """

    @abstractmethod
    def execute(self, request: KernelRequest) -> KernelResponse:
        """Run *request* to completion and return a single response."""
        ...

    @abstractmethod
    def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]:
        """Run *request*, yielding a response each time there is progress."""
        ...
def _scan_generated_files(tmpdir: str) -> tuple[str, dict[str, str]]:
    """Collect the files found under *tmpdir*, skipping ``.claude/`` scaffolding.

    Files are visited in sorted path order and read as UTF-8 text. A file
    that cannot be read or decoded is still listed, with the placeholder
    content ``"<binary or unreadable>"``.

    Returns:
        ``(tree_string, {relative_path: file_content})``. When nothing is
        found the result is ``("(no files generated)", {})``.
    """
    root = Path(tmpdir)
    files: dict[str, str] = {}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        rel = path.relative_to(root)
        # Skip our scaffolding
        if rel.parts[0] == ".claude":
            continue
        try:
            # Explicit encoding so the result does not depend on the locale.
            files[str(rel)] = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            files[str(rel)] = "<binary or unreadable>"

    if not files:
        return "(no files generated)", {}

    # Build a simple flat text tree: every entry but the last gets a "tee"
    # connector, the last one gets the closing "corner".
    names = list(files)
    lines = [f"├── {name}" for name in names[:-1]]
    lines.append(f"└── {names[-1]}")
    return "\n".join(lines), files
def _parse_frontmatter(content: str) -> dict[str, str]:
    """Extract top-level ``key: value`` fields from a SKILL.md frontmatter block.

    This is a deliberately small line-based reader, not a YAML parser: the
    text between the leading ``---`` line and the next ``---`` is split into
    lines, and each line is split at its first ``:``.

    * Lines without ``:``, lines with an empty key, and ``#`` comment lines
      are ignored.
    * Values are stripped of surrounding whitespace, then of a matching pair
      of single quotes, or otherwise of any leading/trailing double quotes.

    Returns an empty dict when *content* does not start with a frontmatter
    block.
    """
    match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
    if not match:
        return {}

    fields: dict[str, str] = {}
    for line in match.group(1).splitlines():
        if line.lstrip().startswith("#"):
            continue
        key, sep, value = line.partition(":")
        key = key.strip()
        if not sep or not key:
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] == "'":
            # YAML single-quoted scalar: drop exactly one quote from each end.
            value = value[1:-1]
        else:
            value = value.strip('"')
        fields[key] = value
    return fields
class LocalCLIExecutor(Executor):
    """Runs the `claude` CLI with native skill discovery via a temp directory.

    Copies the SKILL.md and all files listed in manifest.txt into a
    temporary .claude/skills/<name>/ directory, then runs `claude -p`
    from there so the skill is discovered natively without touching
    the real project's .claude/ directory.
    """

    # Wall-clock limit, in seconds, applied to the blocking execute() call.
    EXECUTE_TIMEOUT_SECONDS = 120

    def __init__(self, project_dir: str | None = None):
        """Remember the directory used to resolve relative skill paths.

        Args:
            project_dir: Base directory for relative ``skill_path`` values;
                defaults to the current working directory.
        """
        self.project_dir = project_dir or os.getcwd()
        # Temp dir left behind by the last stream() call; removed by the
        # next _prepare().
        self._prev_tmpdir: str | None = None

    def _load_skill(self, skill_path: str) -> tuple[str | None, Path, str, dict[str, str]]:
        """Read the skill file and parse its frontmatter.

        Returns (error_or_none, skill_dir, file_content, frontmatter_dict).
        On error the last three items are empty placeholders.
        """
        src = Path(skill_path)
        if not src.is_absolute():
            src = Path(self.project_dir) / src
        if not src.exists():
            return f"SKILL.md not found at: {src}", Path(), "", {}
        try:
            content = src.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            return f"Failed to read SKILL.md: {e}", Path(), "", {}
        return None, src.parent, content, _parse_frontmatter(content)

    def _install_skill_files(self, skill_src_dir: Path, dest_dir: Path) -> str | None:
        """Copy all files listed in manifest.txt into the dest skill directory.

        If no manifest.txt exists, only the SKILL.md (already written by the
        caller) is present. Blank lines, ``#`` comments, the ``SKILL.md``
        entry and entries whose source is missing are skipped. An entry that
        would be written outside *dest_dir* is rejected.

        Returns an error string on failure, None on success.
        """
        manifest = skill_src_dir / "manifest.txt"
        if not manifest.exists():
            return None
        try:
            lines = manifest.read_text(encoding="utf-8").splitlines()
        except (OSError, UnicodeDecodeError) as e:
            return f"Failed to read manifest.txt: {e}"

        dest_root = dest_dir.resolve()
        for line in lines:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            if entry == "SKILL.md":
                continue
            src_file = skill_src_dir / entry
            if not src_file.exists():
                continue
            # Absolute paths and ".." segments must not let a manifest write
            # outside the temporary skill directory.
            dest_file = (dest_dir / entry).resolve()
            if not dest_file.is_relative_to(dest_root):
                return f"Manifest entry escapes skill directory: {entry}"
            try:
                dest_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_file, dest_file)
            except OSError as e:
                return f"Failed to copy {entry}: {e}"
        return None

    def _prepare(self, request: KernelRequest) -> tuple[str | None, str, dict[str, str], str]:
        """Set up a temp dir with the skill files.

        Returns (error, skill_name, frontmatter, tmpdir_path). On error the
        temp dir (if one was created) is removed again and the remaining
        items are empty placeholders.
        """
        # Clean up previous run's temp dir
        if self._prev_tmpdir:
            shutil.rmtree(self._prev_tmpdir, ignore_errors=True)
            self._prev_tmpdir = None

        err, skill_src_dir, skill_content, frontmatter = self._load_skill(request.skill_path)
        if err:
            return err, "", {}, ""

        # The name becomes a directory name, so it must be one plain path
        # component; an empty name falls back to "skill".
        skill_name = frontmatter.get("name") or "skill"
        if skill_name in {".", ".."} or Path(skill_name).name != skill_name:
            return f"Invalid skill name in frontmatter: {skill_name!r}", "", {}, ""

        try:
            tmpdir = tempfile.mkdtemp()
        except OSError as e:
            return f"Failed to create temp directory: {e}", "", {}, ""

        skill_dest = Path(tmpdir) / ".claude" / "skills" / skill_name
        try:
            skill_dest.mkdir(parents=True)
            (skill_dest / "SKILL.md").write_text(skill_content, encoding="utf-8")
        except (OSError, ValueError) as e:
            err = f"Failed to write skill files: {e}"
        else:
            err = self._install_skill_files(skill_src_dir, skill_dest)

        if err:
            shutil.rmtree(tmpdir, ignore_errors=True)
            return err, "", {}, ""
        return None, skill_name, frontmatter, tmpdir

    def _build_cmd(self, request: KernelRequest, frontmatter: dict[str, str], streaming: bool = False) -> list[str]:
        """Build the `claude` argv for *request*.

        ``streaming=True`` adds ``--output-format stream-json --verbose``.
        A comma-separated ``allowed-tools`` frontmatter field is passed on as
        ``--allowedTools``; empty items (e.g. from a trailing comma) are
        dropped.
        """
        cmd = ["claude"]
        if streaming:
            cmd.extend(["--output-format", "stream-json", "--verbose"])
        allowed_tools = frontmatter.get("allowed-tools")
        if allowed_tools:
            tools = [t.strip() for t in allowed_tools.split(",") if t.strip()]
            if tools:
                cmd.extend(["--allowedTools", *tools])
        # -p and prompt go last so flags aren't misinterpreted
        cmd.extend(["-p", request.prompt])
        return cmd

    def execute(self, request: KernelRequest) -> KernelResponse:
        """Run the CLI to completion and return one response.

        The temp dir is always removed before returning. On success the
        response carries stdout as ``final_output`` plus the files found in
        the temp dir; every failure is reported through ``error``.
        """
        err, _skill_name, frontmatter, tmpdir = self._prepare(request)
        if err:
            return KernelResponse(error=err)

        cmd = self._build_cmd(request, frontmatter)
        timeout = self.EXECUTE_TIMEOUT_SECONDS
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=tmpdir,
            )
        except FileNotFoundError:
            return KernelResponse(
                error="'claude' CLI not found. Make sure it is installed and on your PATH.",
            )
        except subprocess.TimeoutExpired:
            return KernelResponse(
                error=f"Claude CLI timed out after {timeout} seconds.",
            )
        else:
            # Read generated files while the temp dir still exists.
            file_tree, files = _scan_generated_files(tmpdir)
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)

        if result.returncode != 0:
            return KernelResponse(
                error=f"Claude CLI exited with code {result.returncode}: {result.stderr}",
            )
        return KernelResponse(final_output=result.stdout, file_tree=file_tree, files=files)

    def stream(self, request: KernelRequest) -> Generator[KernelResponse, None, None]:
        """Run the CLI in stream-json mode, yielding cumulative snapshots.

        Each yielded response carries everything gathered so far. The last
        one also carries the generated files and, if the CLI exited
        non-zero, the error.

        The temp dir is kept after the run and removed by the next
        ``_prepare()``. This also holds when the consumer stops iterating
        early or an exception escapes; the child process is then killed and
        reaped.
        """
        err, _skill_name, frontmatter, tmpdir = self._prepare(request)
        if err:
            yield KernelResponse(error=err)
            return

        cmd = self._build_cmd(request, frontmatter, streaming=True)
        # stderr goes to a temp file rather than a pipe: nothing reads it
        # while stdout is being consumed, and a full pipe would block the
        # child (and therefore this loop).
        with tempfile.TemporaryFile() as stderr_file:
            try:
                proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=stderr_file,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    bufsize=1,  # line-buffered
                    cwd=tmpdir,
                )
            except FileNotFoundError:
                shutil.rmtree(tmpdir, ignore_errors=True)
                yield KernelResponse(
                    error="'claude' CLI not found. Make sure it is installed and on your PATH.",
                )
                return

            try:
                yield from self._stream_events(proc, stderr_file, tmpdir)
            finally:
                if proc.poll() is None:
                    proc.kill()
                proc.wait()  # reap the child so it does not linger as a zombie
                if proc.stdout is not None:
                    proc.stdout.close()
                # Keep tmpdir alive so the UI can reference files; the next
                # _prepare() removes it.
                self._prev_tmpdir = tmpdir

    def _stream_events(self, proc: subprocess.Popen, stderr_file, tmpdir: str) -> Generator[KernelResponse, None, None]:
        """Translate the child's JSON event lines into response snapshots.

        *stderr_file* is the binary file the child's stderr was redirected
        to; it is only read if the process exits with a non-zero code.
        """
        streaming = ""
        final = ""
        log_lines: list[str] = []

        def snapshot(**extra) -> KernelResponse:
            return KernelResponse(
                streaming_output=streaming,
                final_output=final,
                log="\n".join(log_lines),
                **extra,
            )

        # Use readline() instead of iterating proc.stdout to avoid
        # Python's internal read-ahead buffer which blocks streaming.
        # readline() returns "" only at end of stream.
        for raw in iter(proc.stdout.readline, ""):
            line = raw.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(event, dict):
                continue

            etype = event.get("type")
            if etype == "system":
                # Log system events (hooks, init)
                log_lines.append(self._describe_system_event(event))
                yield snapshot()
            elif etype == "assistant":
                # Assistant message chunks: intermediate output
                text = self._assistant_text(event)
                if text:
                    streaming += text + "\n"
                    log_lines.append(f"[assistant] Received {len(text)} chars")
                    yield snapshot()
            elif etype == "result":
                # Final result: definitive output
                if event.get("is_error"):
                    log_lines.append(f"[error] {event.get('result', 'unknown error')}")
                else:
                    result_text = event.get("result", "")
                    if result_text:
                        final = result_text
                # "or 0" also covers explicit nulls, which ":.4f" rejects.
                duration = event.get("duration_ms") or 0
                cost = event.get("total_cost_usd") or 0
                log_lines.append(f"[done] {duration}ms, ${cost:.4f}")
                yield snapshot()

        proc.wait()
        error: str | None = None
        if proc.returncode != 0:
            stderr_file.seek(0)
            stderr = stderr_file.read().decode("utf-8", errors="replace")
            log_lines.append(f"[error] exit code {proc.returncode}")
            error = f"Claude CLI exited with code {proc.returncode}: {stderr}"
            yield snapshot(error=error)

        # Scan for generated files and yield the final response. The error
        # is repeated so the last snapshot does not hide a failure.
        file_tree, files = _scan_generated_files(tmpdir)
        if files:
            log_lines.append(f"[files] {len(files)} file(s) generated")
        yield snapshot(file_tree=file_tree, files=files, error=error)

    @staticmethod
    def _describe_system_event(event: dict) -> str:
        """Return the log line for a ``system`` event, based on its subtype."""
        subtype = event.get("subtype", "")
        if subtype == "init":
            return "[init] Session started"
        if subtype == "hook_started":
            return f"[hook] {event.get('hook_name', '?')} started"
        if subtype == "hook_response":
            outcome = event.get("outcome", "?")
            return f"[hook] {event.get('hook_name', '?')} → {outcome}"
        return f"[system] {subtype}"

    @staticmethod
    def _assistant_text(event: dict) -> str:
        """Concatenate the ``text`` blocks of an ``assistant`` event.

        Anything that does not have the expected shape (message not a dict,
        content not a list, block not a dict or without text) contributes
        nothing instead of raising.
        """
        message = event.get("message")
        if not isinstance(message, dict):
            return ""
        content = message.get("content")
        if not isinstance(content, list):
            return ""
        return "".join(
            str(block.get("text", ""))
            for block in content
            if isinstance(block, dict) and block.get("type") == "text"
        )