""" TeamForge Git Sandbox Creates and manages isolated git repositories per episode. """ from __future__ import annotations import os import shutil import subprocess import tempfile from pathlib import Path from typing import Dict, List, Optional class GitSandbox: """ Manages an isolated git repository for one episode. Uses subprocess (not gitpython) for reliability in Docker. """ def __init__(self, base_dir: Optional[str] = None): self.base_dir = base_dir or tempfile.mkdtemp(prefix="teamforge_") self.repo_path = Path(self.base_dir) self._initialized = False # ────────────────────────────────────────── # LIFECYCLE # ────────────────────────────────────────── def init(self, initial_files: Dict[str, str]) -> None: """Initialise repo with initial_files dict {relative_path: content}.""" if self._initialized: self.teardown() self.base_dir = tempfile.mkdtemp(prefix="teamforge_") self.repo_path = Path(self.base_dir) self.repo_path.mkdir(parents=True, exist_ok=True) self._run_git("init") self._run_git("config", "user.email", "agent@teamforge.ai") self._run_git("config", "user.name", "TeamForge Agent") self._run_git("config", "commit.gpgsign", "false") # Write initial files for rel_path, content in initial_files.items(): full_path = self.repo_path / rel_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(content, encoding="utf-8") self._run_git("add", ".") self._run_git("commit", "-m", "chore: initial repo snapshot") self._initialized = True def teardown(self) -> None: if self.repo_path.exists(): shutil.rmtree(self.base_dir, ignore_errors=True) self._initialized = False # ────────────────────────────────────────── # FILE OPERATIONS # ────────────────────────────────────────── def write_file(self, rel_path: str, content: str) -> None: full_path = self.repo_path / rel_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(content, encoding="utf-8") def read_file(self, rel_path: str) -> Optional[str]: full_path = self.repo_path / rel_path if full_path.exists(): try: return full_path.read_text(encoding="utf-8") except (UnicodeDecodeError, PermissionError): return None # skip binary / unreadable files return None TEXT_EXTENSIONS = { ".py", ".md", ".txt", ".toml", ".cfg", ".ini", ".yaml", ".yml", ".json", ".rst", ".sh", ".env", ".gitignore", ".dockerignore", } SKIP_DIRS = {"__pycache__", ".git", ".pytest_cache", ".ruff_cache", "node_modules"} def list_files(self, extensions: tuple = (".py", ".md", ".txt", ".toml", ".cfg")) -> List[str]: result = [] for p in self.repo_path.rglob("*"): if p.is_file(): # Skip hidden dirs and cache dirs if any(part in self.SKIP_DIRS or part.startswith(".") for part in p.parts[len(self.repo_path.parts):]): continue if extensions is None or p.suffix in extensions: result.append(str(p.relative_to(self.repo_path))) return sorted(result) def get_all_files(self) -> Dict[str, str]: """Return all text files in the repo, skipping binaries and cache dirs.""" files = {} for rel in self.list_files(extensions=None): p = self.repo_path / rel # Only read known text extensions if p.suffix not in self.TEXT_EXTENSIONS: continue content = self.read_file(rel) if content is not None: files[rel] = content return files # ────────────────────────────────────────── # GIT OPERATIONS # ────────────────────────────────────────── def commit(self, message: str, files: Optional[List[str]] = None) -> str: """Stage files and commit. Returns commit SHA.""" if files: for f in files: self._run_git("add", f) else: self._run_git("add", ".") result = self._run_git("commit", "-m", message, check=False) if result.returncode != 0: return "" sha = self._run_git("rev-parse", "--short", "HEAD").stdout.strip() return sha def get_log(self, n: int = 10) -> List[str]: result = self._run_git( "log", f"-{n}", "--oneline", "--no-color", check=False ) if result.returncode != 0: return [] return [line for line in result.stdout.splitlines() if line.strip()] def get_diff(self) -> str: result = self._run_git("diff", "HEAD", check=False) return result.stdout[:4000] # cap to 4KB def has_changes(self) -> bool: result = self._run_git("status", "--porcelain", check=False) return bool(result.stdout.strip()) # ────────────────────────────────────────── # INTERNAL # ────────────────────────────────────────── def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: cmd = ["git", *args] return subprocess.run( cmd, cwd=str(self.repo_path), capture_output=True, text=True, check=False, # we handle errors manually )