migratron / code_migration /repo_manager.py
amrithanandini's picture
fixing dockerfile
364b897
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Repository cloning, patching, and workspace management."""
from __future__ import annotations
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from .dataset_loader import Task
class RepoManager:
    """Clone repos, apply patches, build Docker images, manage temp dirs.

    The main entry point is :meth:`setup_workspace`, which returns the path
    of a freshly prepared temporary checkout; pass that path to
    :meth:`cleanup` when done with it.
    """

    # Command words in a setup script that must be redirected to the venv.
    # The negative look-behind keeps us from matching inside an existing path
    # (e.g. ``/usr/bin/pip install``) or a longer word (e.g. ``ipython ``).
    _PIP_INSTALL_RE = re.compile(r"(?<![\w./-])pip3? install")
    _PYTHON_RE = re.compile(r"(?<![\w./-])python3? ")

    def setup_workspace(self, task: Task) -> str:
        """Clone repo at commit, apply patches, set up environment.

        If Docker is available, builds a Docker image.
        Otherwise, creates a venv and runs the setup script directly.

        Args:
            task: Task describing the repository (``repo_url``,
                ``commit_hash``, ``repo_name``), its optional ``patch`` /
                ``test_patch`` and its ``dockerfile``.

        Returns:
            The path to the temporary workspace directory.

        Raises:
            subprocess.CalledProcessError: If cloning, checkout, patching or
                venv creation fails.
            RuntimeError: If the Docker image build fails.
        """
        workspace_dir = tempfile.mkdtemp(prefix="code_migration_")
        try:
            self._populate_workspace(workspace_dir, task)
        except BaseException:
            # The caller never receives the path when setup fails, so nobody
            # else could clean it up: remove it here, then propagate.
            shutil.rmtree(workspace_dir, ignore_errors=True)
            raise
        return workspace_dir

    def _populate_workspace(self, workspace_dir: str, task: Task) -> None:
        """Fill an empty workspace: clone, checkout, patch, build environment."""
        # Clone
        subprocess.run(
            ["git", "clone", "--quiet", task.repo_url, workspace_dir],
            check=True,
            capture_output=True,
            text=True,
        )
        # Checkout commit
        subprocess.run(
            ["git", "checkout", "--quiet", task.commit_hash],
            cwd=workspace_dir,
            check=True,
            capture_output=True,
            text=True,
        )
        # Apply init patch, then test patch (each is optional)
        if task.patch:
            self.apply_patch(workspace_dir, task.patch)
        if task.test_patch:
            self.apply_patch(workspace_dir, task.test_patch)

        if self._docker_available():
            # Image names are lower-cased here; "/" is replaced with "__".
            image_name = task.repo_name.replace("/", "__").lower() + "_new"
            # Only touch the Dockerfile when it is actually going to be used,
            # so tasks without one can still take the venv path below.
            fixed_dockerfile = self._fix_dockerfile(task.dockerfile, task.repo_name)
            self.build_docker_image(workspace_dir, image_name, fixed_dockerfile)
        else:
            # No Docker — create a venv and run setup script directly
            print("[NO-DOCKER] Setting up venv and installing dependencies...", flush=True)
            self._setup_venv(workspace_dir, task)

    @staticmethod
    def _docker_available() -> bool:
        """Return True if a ``docker`` binary exists and ``docker info`` succeeds."""
        if shutil.which("docker") is None:
            return False
        try:
            subprocess.run(["docker", "info"], capture_output=True, timeout=5, check=True)
        except (subprocess.SubprocessError, OSError):
            # Non-zero exit, timeout, or the binary could not be executed.
            return False
        return True

    @staticmethod
    def _rewrite_setup_command(line: str, venv_pip: str, venv_python: str) -> str | None:
        """Redirect a setup-script line to the venv's pip / python.

        Args:
            line: A stripped, non-empty line from the setup script.
            venv_pip: Path of the venv's ``pip`` executable.
            venv_python: Path of the venv's ``python`` executable.

        Returns:
            The shell command to run, or ``None`` when the line is not a
            ``pip``/``pip3 install`` or ``python``/``python3`` command and
            should be skipped.
        """
        import shlex

        # Substitution is done in a single regex pass on purpose: chaining
        # str.replace("pip3 install", ...).replace("pip install", ...) would
        # re-match the "pip install" at the end of the path just inserted
        # and duplicate the venv prefix.
        if line.startswith(("pip install", "pip3 install")):
            replacement = f"{shlex.quote(venv_pip)} install"
            return RepoManager._PIP_INSTALL_RE.sub(lambda _m: replacement, line)
        if line.startswith(("python ", "python3 ")):
            replacement = f"{shlex.quote(venv_python)} "
            return RepoManager._PYTHON_RE.sub(lambda _m: replacement, line)
        return None

    @staticmethod
    def _setup_venv(workspace_dir: str, task: Task) -> None:
        """Create a venv in the workspace and run the setup script.

        Looks for ``setup_<owner>__<repo>.sh`` in the workspace and runs its
        ``pip install`` / ``python`` lines with the venv's executables; other
        lines are skipped. When no such script exists, falls back to
        installing ``requirements.txt`` if present. Failures of individual
        install commands only produce a warning.

        Raises:
            subprocess.CalledProcessError: If the venv itself cannot be created.
            subprocess.TimeoutExpired: If a command exceeds its timeout.
        """
        import sys

        venv_dir = Path(workspace_dir) / ".venv"
        # Create venv
        subprocess.run(
            [sys.executable, "-m", "venv", str(venv_dir)],
            check=True, capture_output=True, text=True,
        )
        venv_pip = str(venv_dir / "bin" / "pip")
        venv_python = str(venv_dir / "bin" / "python")

        # Upgrade pip (best effort: the return code is deliberately ignored)
        subprocess.run(
            [venv_pip, "install", "--upgrade", "pip", "setuptools", "wheel"],
            capture_output=True, text=True, cwd=workspace_dir, timeout=120,
        )

        # Find and run the setup script
        escaped = task.repo_name.replace("/", "__")
        setup_script = Path(workspace_dir) / f"setup_{escaped}.sh"
        if setup_script.exists():
            with open(setup_script, encoding="utf-8", errors="replace") as f:
                for raw_line in f:
                    line = raw_line.strip()
                    # Skip blanks, comments and shell options such as `set -e`.
                    if not line or line.startswith(("#", "set ")):
                        continue
                    cmd = RepoManager._rewrite_setup_command(line, venv_pip, venv_python)
                    if cmd is None:
                        continue
                    print(f"[NO-DOCKER] Running: {cmd}", flush=True)
                    # shell=True because the line is shell syntax taken
                    # verbatim from the task's setup script.
                    result = subprocess.run(
                        cmd, shell=True, cwd=workspace_dir,
                        capture_output=True, text=True, timeout=300,
                    )
                    if result.returncode != 0:
                        print(f"[NO-DOCKER] Warning: {cmd} failed: {result.stderr[-200:]}", flush=True)
            return

        # Fallback: try installing requirements.txt if it exists
        req_file = Path(workspace_dir) / "requirements.txt"
        if req_file.exists():
            print("[NO-DOCKER] pip install -r requirements.txt", flush=True)
            subprocess.run(
                [venv_pip, "install", "-r", str(req_file)],
                cwd=workspace_dir, capture_output=True, text=True, timeout=300,
            )

    def cleanup(self, workspace_dir: str) -> None:
        """Remove the temporary workspace directory.

        Best effort: filesystem errors are ignored and an empty path is a
        no-op.
        """
        if not workspace_dir:
            return
        shutil.rmtree(workspace_dir, ignore_errors=True)

    @staticmethod
    def apply_patch(workspace_dir: str, patch_content: str) -> None:
        """Apply a unified diff patch to the workspace.

        TimeMachine-bench patches use bare filenames (no a/ b/ prefix),
        so we use -p0.

        Raises:
            subprocess.CalledProcessError: If ``patch`` exits non-zero; the
                captured stdout/stderr are available on the exception.
        """
        subprocess.run(
            ["patch", "-p0", "-d", workspace_dir],
            input=patch_content,
            text=True,
            check=True,
            capture_output=True,
        )

    @staticmethod
    def _fix_dockerfile(dockerfile_content: str, repo_name: str) -> str:
        """Fix the dataset Dockerfile to work in our workspace layout.

        Changes:
        1. Replace mirror.gcr.io/python:X.Y.Z with python:X.Y.Z (Docker Hub)
        2. Remove PIP_INDEX_URL pointing to localhost:5000 (use regular PyPI)
        3. Remove PIP_TRUSTED_HOST=localhost
        4. Replace COPY <escaped_repo_name> . with COPY . . (our workspace IS the repo)

        Args:
            dockerfile_content: Text of the Dockerfile from the dataset.
            repo_name: Repository name; "/" is replaced with "__" to form
                the directory name looked for in COPY lines.

        Returns:
            The rewritten Dockerfile, lines joined with "\\n" (no trailing
            newline).
        """
        escaped_name = repo_name.replace("/", "__")
        fixed = []
        for line in dockerfile_content.splitlines():
            # Fix base image: mirror.gcr.io/python:X.Y.Z -> python:X.Y.Z
            if line.startswith("FROM mirror.gcr.io/"):
                line = line.replace("FROM mirror.gcr.io/", "FROM ")
            # Drop any line mentioning the pypi-timemachine snapshot index
            # or its trusted-host setting.
            if "PIP_INDEX_URL" in line or "PIP_TRUSTED_HOST" in line:
                continue
            # Fix COPY: the workspace root IS the repo, not a subdirectory
            if line.strip().startswith("COPY") and escaped_name in line:
                line = "COPY . ."
            fixed.append(line)
        return "\n".join(fixed)

    @staticmethod
    def build_docker_image(
        workspace_dir: str, image_name: str, dockerfile_content: str
    ) -> None:
        """Write Dockerfile to workspace and build the Docker image.

        Any ``Dockerfile`` already at the workspace root is overwritten.

        Raises:
            RuntimeError: If ``docker build`` exits non-zero; the message
                carries the last 30 lines of the build output.
        """
        dockerfile_path = Path(workspace_dir) / "Dockerfile"
        dockerfile_path.write_text(dockerfile_content, encoding="utf-8")
        print(f"[DOCKER] Building image {image_name}...", flush=True)
        print(f"[DOCKER] Dockerfile content:\n{dockerfile_content}", flush=True)
        result = subprocess.run(
            ["docker", "build", "-t", image_name, workspace_dir],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # Show last 30 lines of build output for debugging
            stderr = result.stderr or ""
            stdout = result.stdout or ""
            output = (stdout + "\n" + stderr).strip()
            last_lines = "\n".join(output.splitlines()[-30:])
            raise RuntimeError(
                f"Docker build failed (exit {result.returncode}):\n{last_lines}"
            )