teamforge / sandbox /git_sandbox.py
Your Name
fix: add FastAPI REST endpoints for OpenEnv validator
637f42c
"""
TeamForge Git Sandbox
Creates and manages isolated git repositories per episode.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional
class GitSandbox:
"""
Manages an isolated git repository for one episode.
Uses subprocess (not gitpython) for reliability in Docker.
"""
def __init__(self, base_dir: Optional[str] = None):
self.base_dir = base_dir or tempfile.mkdtemp(prefix="teamforge_")
self.repo_path = Path(self.base_dir)
self._initialized = False
# ──────────────────────────────────────────
# LIFECYCLE
# ──────────────────────────────────────────
def init(self, initial_files: Dict[str, str]) -> None:
"""Initialise repo with initial_files dict {relative_path: content}."""
if self._initialized:
self.teardown()
self.base_dir = tempfile.mkdtemp(prefix="teamforge_")
self.repo_path = Path(self.base_dir)
self.repo_path.mkdir(parents=True, exist_ok=True)
self._run_git("init")
self._run_git("config", "user.email", "agent@teamforge.ai")
self._run_git("config", "user.name", "TeamForge Agent")
self._run_git("config", "commit.gpgsign", "false")
# Write initial files
for rel_path, content in initial_files.items():
full_path = self.repo_path / rel_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(content, encoding="utf-8")
self._run_git("add", ".")
self._run_git("commit", "-m", "chore: initial repo snapshot")
self._initialized = True
def teardown(self) -> None:
if self.repo_path.exists():
shutil.rmtree(self.base_dir, ignore_errors=True)
self._initialized = False
# ──────────────────────────────────────────
# FILE OPERATIONS
# ──────────────────────────────────────────
def write_file(self, rel_path: str, content: str) -> None:
full_path = self.repo_path / rel_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(content, encoding="utf-8")
def read_file(self, rel_path: str) -> Optional[str]:
full_path = self.repo_path / rel_path
if full_path.exists():
try:
return full_path.read_text(encoding="utf-8")
except (UnicodeDecodeError, PermissionError):
return None # skip binary / unreadable files
return None
TEXT_EXTENSIONS = {
".py", ".md", ".txt", ".toml", ".cfg", ".ini", ".yaml", ".yml",
".json", ".rst", ".sh", ".env", ".gitignore", ".dockerignore",
}
SKIP_DIRS = {"__pycache__", ".git", ".pytest_cache", ".ruff_cache", "node_modules"}
def list_files(self, extensions: tuple = (".py", ".md", ".txt", ".toml", ".cfg")) -> List[str]:
result = []
for p in self.repo_path.rglob("*"):
if p.is_file():
# Skip hidden dirs and cache dirs
if any(part in self.SKIP_DIRS or part.startswith(".") for part in p.parts[len(self.repo_path.parts):]):
continue
if extensions is None or p.suffix in extensions:
result.append(str(p.relative_to(self.repo_path)))
return sorted(result)
def get_all_files(self) -> Dict[str, str]:
"""Return all text files in the repo, skipping binaries and cache dirs."""
files = {}
for rel in self.list_files(extensions=None):
p = self.repo_path / rel
# Only read known text extensions
if p.suffix not in self.TEXT_EXTENSIONS:
continue
content = self.read_file(rel)
if content is not None:
files[rel] = content
return files
# ──────────────────────────────────────────
# GIT OPERATIONS
# ──────────────────────────────────────────
def commit(self, message: str, files: Optional[List[str]] = None) -> str:
"""Stage files and commit. Returns commit SHA."""
if files:
for f in files:
self._run_git("add", f)
else:
self._run_git("add", ".")
result = self._run_git("commit", "-m", message, check=False)
if result.returncode != 0:
return ""
sha = self._run_git("rev-parse", "--short", "HEAD").stdout.strip()
return sha
def get_log(self, n: int = 10) -> List[str]:
result = self._run_git(
"log", f"-{n}", "--oneline", "--no-color", check=False
)
if result.returncode != 0:
return []
return [line for line in result.stdout.splitlines() if line.strip()]
def get_diff(self) -> str:
result = self._run_git("diff", "HEAD", check=False)
return result.stdout[:4000] # cap to 4KB
def has_changes(self) -> bool:
result = self._run_git("status", "--porcelain", check=False)
return bool(result.stdout.strip())
# ──────────────────────────────────────────
# INTERNAL
# ──────────────────────────────────────────
def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
cmd = ["git", *args]
return subprocess.run(
cmd,
cwd=str(self.repo_path),
capture_output=True,
text=True,
check=False, # we handle errors manually
)