Spaces:
Sleeping
Sleeping
| """ | |
| TeamForge Git Sandbox | |
| Creates and manages isolated git repositories per episode. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| class GitSandbox: | |
| """ | |
| Manages an isolated git repository for one episode. | |
| Uses subprocess (not gitpython) for reliability in Docker. | |
| """ | |
| def __init__(self, base_dir: Optional[str] = None): | |
| self.base_dir = base_dir or tempfile.mkdtemp(prefix="teamforge_") | |
| self.repo_path = Path(self.base_dir) | |
| self._initialized = False | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # LIFECYCLE | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| def init(self, initial_files: Dict[str, str]) -> None: | |
| """Initialise repo with initial_files dict {relative_path: content}.""" | |
| if self._initialized: | |
| self.teardown() | |
| self.base_dir = tempfile.mkdtemp(prefix="teamforge_") | |
| self.repo_path = Path(self.base_dir) | |
| self.repo_path.mkdir(parents=True, exist_ok=True) | |
| self._run_git("init") | |
| self._run_git("config", "user.email", "agent@teamforge.ai") | |
| self._run_git("config", "user.name", "TeamForge Agent") | |
| self._run_git("config", "commit.gpgsign", "false") | |
| # Write initial files | |
| for rel_path, content in initial_files.items(): | |
| full_path = self.repo_path / rel_path | |
| full_path.parent.mkdir(parents=True, exist_ok=True) | |
| full_path.write_text(content, encoding="utf-8") | |
| self._run_git("add", ".") | |
| self._run_git("commit", "-m", "chore: initial repo snapshot") | |
| self._initialized = True | |
| def teardown(self) -> None: | |
| if self.repo_path.exists(): | |
| shutil.rmtree(self.base_dir, ignore_errors=True) | |
| self._initialized = False | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # FILE OPERATIONS | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| def write_file(self, rel_path: str, content: str) -> None: | |
| full_path = self.repo_path / rel_path | |
| full_path.parent.mkdir(parents=True, exist_ok=True) | |
| full_path.write_text(content, encoding="utf-8") | |
| def read_file(self, rel_path: str) -> Optional[str]: | |
| full_path = self.repo_path / rel_path | |
| if full_path.exists(): | |
| try: | |
| return full_path.read_text(encoding="utf-8") | |
| except (UnicodeDecodeError, PermissionError): | |
| return None # skip binary / unreadable files | |
| return None | |
| TEXT_EXTENSIONS = { | |
| ".py", ".md", ".txt", ".toml", ".cfg", ".ini", ".yaml", ".yml", | |
| ".json", ".rst", ".sh", ".env", ".gitignore", ".dockerignore", | |
| } | |
| SKIP_DIRS = {"__pycache__", ".git", ".pytest_cache", ".ruff_cache", "node_modules"} | |
| def list_files(self, extensions: tuple = (".py", ".md", ".txt", ".toml", ".cfg")) -> List[str]: | |
| result = [] | |
| for p in self.repo_path.rglob("*"): | |
| if p.is_file(): | |
| # Skip hidden dirs and cache dirs | |
| if any(part in self.SKIP_DIRS or part.startswith(".") for part in p.parts[len(self.repo_path.parts):]): | |
| continue | |
| if extensions is None or p.suffix in extensions: | |
| result.append(str(p.relative_to(self.repo_path))) | |
| return sorted(result) | |
| def get_all_files(self) -> Dict[str, str]: | |
| """Return all text files in the repo, skipping binaries and cache dirs.""" | |
| files = {} | |
| for rel in self.list_files(extensions=None): | |
| p = self.repo_path / rel | |
| # Only read known text extensions | |
| if p.suffix not in self.TEXT_EXTENSIONS: | |
| continue | |
| content = self.read_file(rel) | |
| if content is not None: | |
| files[rel] = content | |
| return files | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # GIT OPERATIONS | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| def commit(self, message: str, files: Optional[List[str]] = None) -> str: | |
| """Stage files and commit. Returns commit SHA.""" | |
| if files: | |
| for f in files: | |
| self._run_git("add", f) | |
| else: | |
| self._run_git("add", ".") | |
| result = self._run_git("commit", "-m", message, check=False) | |
| if result.returncode != 0: | |
| return "" | |
| sha = self._run_git("rev-parse", "--short", "HEAD").stdout.strip() | |
| return sha | |
| def get_log(self, n: int = 10) -> List[str]: | |
| result = self._run_git( | |
| "log", f"-{n}", "--oneline", "--no-color", check=False | |
| ) | |
| if result.returncode != 0: | |
| return [] | |
| return [line for line in result.stdout.splitlines() if line.strip()] | |
| def get_diff(self) -> str: | |
| result = self._run_git("diff", "HEAD", check=False) | |
| return result.stdout[:4000] # cap to 4KB | |
| def has_changes(self) -> bool: | |
| result = self._run_git("status", "--porcelain", check=False) | |
| return bool(result.stdout.strip()) | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| # INTERNAL | |
| # ββββββββββββββββββββββββββββββββββββββββββ | |
| def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: | |
| cmd = ["git", *args] | |
| return subprocess.run( | |
| cmd, | |
| cwd=str(self.repo_path), | |
| capture_output=True, | |
| text=True, | |
| check=False, # we handle errors manually | |
| ) | |