git_env-v2-1-0 / server /git_task_environment.py
burtenshaw's picture
burtenshaw HF Staff
Upload folder using huggingface_hub
b0de8ff verified
#!/usr/bin/env python3
"""
Git Task Environment - Optimized for task-based isolation.
This module provides an optimized Git environment for scenarios where:
- Multiple tasks share the same base repository states
- Tasks need fast reset() to reproducible states
- Each task has an isolated workspace
- A shared Gitea service provides repository storage
"""
import uuid
from openenv.core.env_server import Action, Environment, Observation
from openenv.core.tools.git_server_client import GitServerClient
from ..models import GitAction, GitObservation, GitState
class GitTaskEnvironment(Environment):
    """
    Git Environment optimized for task-based isolation.

    This environment connects to a shared Gitea service and provides:
    - Fast reset() via git operations (no server restart)
    - Isolated workspace per environment instance
    - Shared repository cache across tasks
    - Reproducible base states from specific commits

    Architecture (outermost to innermost):
        1. Shared Gitea Service (external)
        2. GitTaskEnvironment instances (many)
        3. Isolated workspaces (/workspace)

    Args:
        gitea_url: URL of shared Gitea service (e.g., "http://gitea:3000")
        username: Gitea username for authentication (required)
        password: Gitea password for authentication (required)
        workspace_dir: Directory for git operations (default: /workspace)
        task_repos: Dict mapping task names to (repo_name, commit) tuples
            for pre-configuring task base states

    Example (Basic):
        >>> env = GitTaskEnvironment(
        ...     gitea_url="http://localhost:3000",
        ...     username="user",
        ...     password="secret",
        ... )
        >>> obs = env.reset()
        >>> # Clone and work (GitAction comes from this package's models module)
        >>> obs = env.step(GitAction(action_type="clone_repo", repo_name="my-repo"))
        >>> obs = env.step(GitAction(action_type="execute_git_command", command="status", working_dir="my-repo"))

    Example (Task-based):
        >>> # Pre-configure tasks with specific repo states
        >>> env = GitTaskEnvironment(
        ...     gitea_url="http://localhost:3000",
        ...     username="user",
        ...     password="secret",
        ...     task_repos={
        ...         "task1": ("my-repo", "abc123"),  # Specific commit
        ...         "task2": ("my-repo", "def456"),  # Different commit
        ...     },
        ... )
        >>> # Reset to task1 base state
        >>> obs = env.reset(task_id="task1")  # Fast! Just git reset
        >>> # Work on task...
        >>> # Reset to task2 base state
        >>> obs = env.reset(task_id="task2")  # Fast reset to different state
    """

    def __init__(
        self,
        gitea_url: str,
        username: str,
        password: str,
        workspace_dir: str = "/workspace",
        task_repos: dict[str, tuple[str, str]] | None = None,
    ):
        """
        Initialize Git Task Environment.

        Builds the Git server client, creates the initial state, and records
        whether the client reported the Gitea server as ready. A server that
        is not ready only produces a printed warning; construction still
        succeeds.

        Args:
            gitea_url: URL of the shared Gitea service.
            username: Gitea username passed to the client.
            password: Gitea password passed to the client.
            workspace_dir: Directory used for git operations.
            task_repos: Optional mapping of task id to (repo_name, commit).
                ``None`` is treated as "no pre-configured tasks".
        """
        super().__init__()
        self.workspace_dir = workspace_dir
        self.task_repos = task_repos or {}

        # Initialize Git server client (connects to external Gitea)
        self._git_client = GitServerClient(
            gitea_url=gitea_url,
            username=username,
            password=password,
            workspace_dir=workspace_dir,
        )

        # Initialize state
        self._state = GitState(workspace_path=workspace_dir)
        self._current_task_id: str | None = None

        # Ask the client whether Gitea is ready and record the answer as a
        # strict bool, warning (but not failing) when it is not.
        self._state.gitea_ready = bool(self._git_client.wait_for_ready())
        if not self._state.gitea_ready:
            print("Warning: Gitea server not ready")

    def reset(self, task_id: str | None = None) -> Observation:
        """
        Reset environment to clean state.

        This is optimized for task-based workflows:
        - If task_id specified and configured: fast reset to that task's base state
        - If workspace exists: git reset --hard (very fast, <1s)
        - Otherwise: clone from Gitea (slower, ~5-10s)

        The episode state is always replaced with a fresh ``GitState`` (new
        UUID episode id, step count 0). ``task_id`` is remembered as the
        current task even when it is not present in ``task_repos`` or when
        the task setup fails.

        Args:
            task_id: Optional task identifier for task-specific base states.
                A task id that is not a key of ``task_repos`` is not an
                error: the method falls through to the default "ready"
                observation without touching any repository.

        Returns:
            Initial observation indicating environment is ready, or a
            ``success=False`` observation carrying the error text when the
            task's repository could not be reset or cloned.
        """
        # Initialize fresh state
        self._state = GitState(
            episode_id=str(uuid.uuid4()),
            step_count=0,
            gitea_ready=self._git_client.is_ready,
            workspace_path=self.workspace_dir,
        )
        self._current_task_id = task_id

        # If task_id provided and configured, set up task base state
        if task_id and task_id in self.task_repos:
            repo_name, commit = self.task_repos[task_id]
            try:
                if self._git_client.workspace_exists(repo_name):
                    # Fast path: workspace exists, just reset
                    self._git_client.reset_workspace(repo_name, commit)
                    message = f"Reset to task '{task_id}' base state (repo: {repo_name}@{commit})"
                else:
                    # Slower path: clone fresh
                    self._git_client.clone_to_workspace(repo_name, commit=commit)
                    message = f"Initialized task '{task_id}' (repo: {repo_name}@{commit})"

                current_commit = self._git_client.get_current_commit(repo_name)
                return GitObservation(
                    success=True,
                    message=message,
                    output=f"Workspace: {self.workspace_dir}/{repo_name}\nCommit: {current_commit}\nTask: {task_id}",
                )
            except Exception as e:
                # Client failures are reported through the observation rather
                # than raised, so callers always get an Observation back.
                return GitObservation(
                    success=False,
                    message=f"Failed to reset task '{task_id}'",
                    error=str(e),
                )

        # Default reset: just ready state, no pre-configured repos
        return GitObservation(
            success=True,
            message="Git task environment ready.",
            output=f"Workspace: {self.workspace_dir}\nGitea: {self._git_client.gitea_url}\nUse GitAction with action_type='clone_repo' to clone repositories.",
        )

    def step(self, action: Action) -> Observation:
        """
        Execute a Git action and return observation.

        Supported action types:
        - "clone_repo": Clone repository to workspace
        - "execute_git_command": Execute git command
        - "list_repos": List available repositories

        The step counter is incremented for every ``GitAction`` received,
        including ones whose ``action_type`` is not supported.

        Args:
            action: GitAction to execute

        Returns:
            GitObservation with execution results. Unsupported action types
            and handler failures are reported as ``success=False``
            observations.

        Raises:
            ValueError: If ``action`` is not a ``GitAction`` instance.
        """
        if not isinstance(action, GitAction):
            raise ValueError(f"Expected GitAction, got {type(action)}")

        # Update step count
        self._state.step_count += 1

        # Route to appropriate handler based on action_type
        try:
            if action.action_type == "clone_repo":
                return self._handle_clone_repo(action)
            elif action.action_type == "list_repos":
                return self._handle_list_repos(action)
            elif action.action_type == "execute_git_command":
                return self._handle_git_command(action)
            else:
                # Report the rejected action_type itself: the Python class
                # name says nothing about which action type was refused.
                return GitObservation(
                    success=False,
                    message=f"Action not supported in task mode: {action.action_type}",
                    error="Use shared Gitea for repository migration/creation",
                )
        except Exception as e:
            return GitObservation(
                success=False, message=f"Action failed: {str(e)}", error=str(e)
            )

    def _handle_clone_repo(self, action: GitAction) -> GitObservation:
        """
        Handle repository clone action.

        Clones ``action.repo_name`` into ``action.target_dir``. The ref to
        check out is "main" unless the current task is configured for this
        same repository, in which case the task's commit is used.

        Args:
            action: Action providing ``repo_name`` and ``target_dir``.

        Returns:
            Observation with the clone path and the requested ref on
            success, or a ``success=False`` observation with the error text.
        """
        try:
            # Determine commit to use
            commit = "main"  # Default

            # If this repo is part of current task config, use that commit
            task_id = self._current_task_id
            if task_id and task_id in self.task_repos:
                task_repo, task_commit = self.task_repos[task_id]
                if task_repo == action.repo_name:
                    commit = task_commit

            clone_path = self._git_client.clone_to_workspace(
                action.repo_name, action.target_dir, commit=commit
            )
            return GitObservation(
                success=True,
                message=f"Successfully cloned {action.repo_name}",
                output=f"Cloned to: {clone_path}\nCommit: {commit}",
            )
        except Exception as e:
            return GitObservation(
                success=False,
                message=f"Failed to clone repository: {action.repo_name}",
                error=str(e),
            )

    def _handle_list_repos(self, action: GitAction) -> GitObservation:
        """
        Handle list repositories action.

        Args:
            action: The triggering action; its fields are not read here.

        Returns:
            Observation whose ``output`` is a human-readable listing (name,
            clone URL and, when present, description of each repository) and
            whose ``repos`` field carries the raw entries returned by the
            client. On failure, a ``success=False`` observation with the
            error text.
        """
        try:
            repos = self._git_client.list_repositories()

            # Format output
            if not repos:
                output = "No repositories available."
            else:
                lines = ["Available repositories:"]
                for repo in repos:
                    lines.append(f" - {repo['name']}: {repo['clone_url']}")
                    if repo.get("description"):
                        lines.append(f" {repo['description']}")
                # Every line, including the last, ends with a newline.
                output = "\n".join(lines) + "\n"

            return GitObservation(
                success=True,
                message=f"Found {len(repos)} repositories",
                output=output,
                repos=repos,
            )
        except Exception as e:
            return GitObservation(
                success=False, message="Failed to list repositories", error=str(e)
            )

    def _handle_git_command(self, action: GitAction) -> GitObservation:
        """
        Handle git command execution action.

        Args:
            action: Action providing ``command`` and ``working_dir``.

        Returns:
            Observation whose ``success`` reflects a zero exit code, with
            the command's stdout in ``output`` and stderr in ``error``. If
            the client raises, a ``success=False`` observation with the
            error text.
        """
        try:
            exit_code, stdout, stderr = self._git_client.execute_git_command(
                action.command, action.working_dir
            )
            success = exit_code == 0
            message = f"Git command {'succeeded' if success else 'failed'}"
            return GitObservation(
                success=success, message=message, output=stdout, error=stderr
            )
        except Exception as e:
            return GitObservation(
                success=False,
                message=f"Failed to execute git command: {action.command}",
                error=str(e),
            )

    @property
    def state(self) -> GitState:
        """Get current environment state."""
        return self._state