mcpmark / src /mcp_services /github /github_state_manager.py
haochengsama's picture
Add files using upload-large-folder tool
a2ec7b6 verified
Raw
History Blame Contribute Delete
40.3 kB
"""
GitHub State Manager for MCPMark
=================================
This module handles GitHub repository state management for consistent task evaluation.
Manages test repositories, branches, and cleanup after evaluation.
"""
import requests
from typing import Optional, List, Union
from pathlib import Path
from src.base.state_manager import BaseStateManager, InitialStateInfo
from src.base.task_manager import BaseTask
from src.logger import get_logger
from src.mcp_services.github.token_pool import GitHubTokenPool
logger = get_logger(__name__)
class GitHubStateManager(BaseStateManager):
"""
Manages GitHub repository state for task evaluation.
"""
def __init__(
self,
github_token: Union[str, List[str]],
# Name of the evaluation organisation / user where temporary test repositories are created
eval_org: str = "mcpmark-eval",
# Local directory that stores *exported* repository templates (produced by repo_exporter.py)
templates_root: str = "./github_state",
):
"""
Initialize GitHub state manager.
Args:
github_token: GitHub Personal Access Token(s). Can be a single token string or a list of tokens for round-robin usage.
eval_org: Organisation / user used to host **ephemeral evaluation repositories**.
"""
super().__init__(service_name="github")
# Track repos created via template import so we can delete them afterwards
self._repos_to_cleanup: list[tuple[str, str]] = [] # (owner, repo_name)
# Initialize token pool
if isinstance(github_token, str):
# Single token - create pool with one token
self.token_pool = GitHubTokenPool([github_token])
self.github_token = github_token # Keep for backward compatibility
else:
# Multiple tokens - use token pool
self.token_pool = GitHubTokenPool(github_token)
self.github_token = (
self.token_pool.get_current_token()
) # For backward compatibility
# Store evaluation context (consistent naming)
self.eval_org = eval_org # evaluation organisation / user
# Local path that contains exported repository templates
self.templates_root = Path(templates_root).expanduser().resolve()
# Set up HTTP session for GitHub API
self.session = requests.Session()
# Note: We'll update the Authorization header before each request
self.session.headers.update(
{
"Accept": "application/vnd.github.v3+json",
"X-GitHub-Api-Version": "2022-11-28",
"User-Agent": "MCPMark/1.0",
}
)
# Validate GitHub configuration during initialization
try:
# Set initial token for validation
self._update_session_token()
response = self.session.get("https://api.github.com/user")
if response.status_code != 200:
raise ValueError(
f"Invalid GitHub token: {response.status_code} {response.text}"
)
user_info = response.json()
logger.info(f"GitHub authenticated as: {user_info['login']}")
logger.info(f"Using token pool with {self.token_pool.pool_size} token(s)")
# Check if evaluation organisation exists (optional)
if self.eval_org:
org_response = self.session.get(
f"https://api.github.com/orgs/{self.eval_org}"
)
if org_response.status_code == 200:
logger.info(f"Using evaluation organisation: {self.eval_org}")
else:
logger.warning(
f"Evaluation organisation {self.eval_org} not accessible, using user account"
)
# Fall back to user account
self.eval_org = user_info["login"]
logger.info("GitHub state manager initialized successfully")
except Exception as e:
raise RuntimeError(f"GitHub initialization failed: {e}")
# Initial state mapping - categories to initial state repositories
self.initial_state_mapping = {
"build_your_own_x": "codecrafters-io-build-your-own-x",
"missing-semester": "missing-semester-missing-semester",
"mcpmark-cicd": "zjwu0522-mcpmark-cicd",
"harmony": "openai-harmony",
"claude-code": "anthropics-claude-code",
"easyr1": "hiyouga-EasyR1",
}
# CDN URL mapping for downloading GitHub templates
self.github_template_url_mapping = {
"codecrafters-io-build-your-own-x": "https://storage.mcpmark.ai/github/codecrafters-io-build-your-own-x.zip",
"missing-semester-missing-semester": "https://storage.mcpmark.ai/github/missing-semester-missing-semester.zip",
"zjwu0522-mcpmark-cicd": "https://storage.mcpmark.ai/github/zjwu0522-mcpmark-cicd.zip",
"openai-harmony": "https://storage.mcpmark.ai/github/openai-harmony.zip",
"anthropics-claude-code": "https://storage.mcpmark.ai/github/anthropics-claude-code.zip",
"hiyouga-EasyR1": "https://storage.mcpmark.ai/github/hiyouga-EasyR1.zip",
}
# =========================================================================
# Core Template Methods (Required by BaseStateManager)
# =========================================================================
# ---------------------------------------------------------------------
# Internal helper – template importer (replicates repo_importer logic)
# ---------------------------------------------------------------------
def _import_template_repo(
self, template_dir: Path, owner: str, private: bool = True
) -> str:
"""Import repository from local template directory to GitHub (simplified)."""
import json
import subprocess
import time
# ------------------------------------------------------------------
# Helper functions (stripped-down versions of repo_importer utilities)
# ------------------------------------------------------------------
def _list_refs(repo_dir: str) -> list[str]:
result = subprocess.run(
["git", "-C", repo_dir, "for-each-ref", "--format=%(refname)"],
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip().splitlines()
def _push_repo(
repo_path: Path, repo_owner: str, repo_name: str, required_refs: list[str]
):
"""Push repo to GitHub: try mirror, else per-ref."""
token = self.github_token
dst_url = f"https://x-access-token:{token}@github.com/{repo_owner}/{repo_name}.git"
try:
subprocess.run(
["git", "-C", str(repo_path), "push", "--mirror", dst_url],
check=True,
capture_output=True,
)
return
except subprocess.CalledProcessError as err:
logger.warning(
"| [push] Mirror push failed – falling back: %s",
err.stderr.decode(errors="ignore"),
)
refs = required_refs or _list_refs(str(repo_path))
for ref in refs:
for attempt in range(3):
try:
subprocess.run(
[
"git",
"-C",
str(repo_path),
"push",
dst_url,
f"{ref}:{ref}",
],
check=True,
capture_output=True,
)
break
except subprocess.CalledProcessError as ref_err:
if attempt == 2:
raise RuntimeError(
f"Failed to push ref {ref}: {ref_err.stderr}"
) from ref_err
time.sleep(2 * (attempt + 1))
# ------------------------------------------------------------------
# Phase 0 – read template metadata
# ------------------------------------------------------------------
meta = json.loads((template_dir / "meta.json").read_text())
repo_name: str = meta["repo"]
pr_head_refs = meta.get("pr_head_refs", [])
default_branch = meta.get("default_branch", "main")
pulls_data = json.loads((template_dir / "pulls.json").read_text())
fork_branches = [
pr["local_branch"]
for pr in pulls_data
if pr.get("is_from_fork") and "local_branch" in pr
]
needed_refs = (
[f"refs/heads/{default_branch}"]
+ [f"refs/heads/{h}" for h in pr_head_refs]
+ [f"refs/heads/{b}" for b in fork_branches]
)
# ------------------------------------------------------------------
# Phase 1 – create empty repo under owner
# ------------------------------------------------------------------
create_payload = {
"name": repo_name,
"description": f"Restored template repo {repo_name}",
"private": private,
"auto_init": False,
"has_issues": True,
"has_projects": True,
"has_wiki": False,
"default_branch": default_branch, # Set the correct default branch
}
auth_user = self._get_authenticated_user()
create_url = (
"https://api.github.com/user/repos"
if owner == auth_user
else f"https://api.github.com/orgs/{owner}/repos"
)
resp = self._request_with_retry("POST", create_url, json=create_payload)
if resp.status_code == 422 and "name already exists" in resp.text:
# Attempt to delete and recreate
self._delete_repository(owner, repo_name)
resp = self._request_with_retry("POST", create_url, json=create_payload)
if resp.status_code not in (200, 201):
raise RuntimeError(f"Failed to create repo: {resp.status_code} {resp.text}")
html_url = resp.json()["html_url"]
logger.info("| [import] Target repository created: %s", html_url)
# Safety check: Prevent importing to public repositories
# Public repos would send @ mention notifications to real users, causing spam
if not private:
error_msg = (
"ERROR: Cannot import template to a public repository.\n\n"
"Reason: The template contains @ mentions of real GitHub users from the original\n"
"repository. Importing to a public repository would send notifications to these\n"
"users, which is disruptive and inappropriate.\n\n"
"Solution: Set private=True when calling _import_template_repo()."
)
logger.error(error_msg)
# Clean up the created repo before raising
self._delete_repository(owner, repo_name)
raise RuntimeError(error_msg)
# Immediately disable GitHub Actions for ALL repositories to prevent any accidental triggers
# We'll re-enable it later only for mcpmark-cicd
logger.info(
"| [import] Disabling GitHub Actions immediately after repo creation..."
)
self._disable_github_actions(owner, repo_name)
# ------------------------------------------------------------------
# Phase 2 – push git history
# ------------------------------------------------------------------
repo_path = template_dir / "repo"
logger.info("| [import] Pushing git history …")
_push_repo(repo_path, owner, repo_name, needed_refs)
# Remove .github directory after pushing with a new commit
import shutil
github_dir = repo_path / ".github"
if github_dir.exists():
logger.info("| [import] Removing .github directory after push …")
shutil.rmtree(github_dir)
# Commit the deletion
subprocess.run(
["git", "-C", str(repo_path), "add", "-A"],
check=True,
capture_output=True,
)
subprocess.run(
[
"git",
"-C",
str(repo_path),
"commit",
"-m",
"Remove .github directory",
],
capture_output=True,
)
# Push the new commit
token = self.github_token
dst_url = (
f"https://x-access-token:{token}@github.com/{owner}/{repo_name}.git"
)
subprocess.run(
["git", "-C", str(repo_path), "push", dst_url],
check=True,
capture_output=True,
)
# ------------------------------------------------------------------
# Phase 3 – recreate issues & PRs
# ------------------------------------------------------------------
def _create_comment(issue_number: int, body: str):
self._request_with_retry(
"POST",
f"https://api.github.com/repos/{owner}/{repo_name}/issues/{issue_number}/comments",
json={"body": body},
)
def _create_issue(item: dict) -> Optional[int]:
data = {
"title": item["title"],
"body": self._obfuscate_mentions(item.get("body", "")),
"labels": item.get("labels", []),
}
r = self._request_with_retry(
"POST",
f"https://api.github.com/repos/{owner}/{repo_name}/issues",
json=data,
)
if r.status_code not in (200, 201):
return None
new_no = r.json()["number"]
if item.get("state") == "closed":
self._request_with_retry(
"PATCH",
f"https://api.github.com/repos/{owner}/{repo_name}/issues/{new_no}",
json={"state": "closed"},
)
return new_no
def _create_pull(pr_itm: dict) -> Optional[int]:
body = self._obfuscate_mentions(pr_itm.get("body", ""))
if pr_itm.get("is_from_fork", False):
fork_note = f"\n\n---\n_This PR was originally from a fork: **{pr_itm.get('fork_owner')}/{pr_itm.get('fork_repo')}** (branch: `{pr_itm['head']}`)_"
body = body + fork_note if body else fork_note[2:]
payload = {
"title": pr_itm["title"],
"body": body,
"head": pr_itm.get("local_branch", pr_itm["head"]),
"base": pr_itm["base"],
}
r = self._request_with_retry(
"POST",
f"https://api.github.com/repos/{owner}/{repo_name}/pulls",
json=payload,
)
if r.status_code not in (200, 201):
return None
return r.json()["number"]
# Issues
issues_data = json.loads((template_dir / "issues.json").read_text())
created_issues = 0
logger.info("| [phase] Re-creating issues …")
for itm in issues_data:
new_no = _create_issue(itm)
if new_no:
created_issues += 1
for c in itm.get("comments", []):
_create_comment(
new_no,
self._obfuscate_mentions(
f"*Original author: @{c['user']}*\n\n{c['body']}"
),
)
logger.info(
"| [phase] Created %d out of %d issues", created_issues, len(issues_data)
)
# Pull requests
logger.info("| [phase] Re-creating pull requests …")
created_prs = 0
skipped_prs = 0
for pr in pulls_data:
new_pr_no = _create_pull(pr)
if new_pr_no:
created_prs += 1
for c in pr.get("comments", []):
_create_comment(
new_pr_no,
self._obfuscate_mentions(
f"*Original author: @{c['user']}*\n\n{c['body']}"
),
)
for rc in pr.get("review_comments", []):
_create_comment(
new_pr_no,
self._obfuscate_mentions(
f"*Original author: @{rc['user']}* (review)\n\n{rc['body']}"
),
)
else:
skipped_prs += 1
logger.info(
"| [phase] Created %d PRs, skipped %d PRs", created_prs, skipped_prs
)
# Re-enable GitHub Actions ONLY for mcpmark-cicd repository
# All other repos remain disabled (as set immediately after creation)
if "mcpmark-cicd" in template_dir.name:
logger.info("| [import] Re-enabling GitHub Actions for CI/CD repository…")
self._enable_github_actions(owner, repo_name)
# Disable notifications to prevent email spam
logger.info("| [import] Disabling repository notifications …")
self._disable_repository_notifications(owner, repo_name)
logger.info("| [import] Repository import complete: %s", html_url)
return html_url
# ---------------------------------------------------------------------
# Public – create initial state using local template import
# ---------------------------------------------------------------------
def _create_initial_state(self, task: "BaseTask") -> Optional[InitialStateInfo]:
"""
Set up GitHub environment for a specific task.
This may involve:
1. Creating/forking test repositories
2. Setting up branches
3. Creating issues or PRs if needed
"""
try:
logger.info(f"| Setting up GitHub state for task: {task.name}")
template_name = self.select_initial_state_for_task(task.category_id)
if template_name is None:
raise RuntimeError(
f"No template configured for task category: {task.category_id}"
)
template_dir = (self.templates_root / template_name).resolve()
if not template_dir.exists():
logger.warning(
"| Template directory %s not found locally, attempting to download from CDN",
template_dir,
)
if not self._download_and_extract_github_template(template_name):
logger.error(
"| Failed to download template %s from CDN", template_name
)
return None
logger.info("| Template %s downloaded successfully", template_name)
logger.info(f"| Importing repository template from {template_dir} …")
owner = self.eval_org if self.eval_org else self._get_authenticated_user()
if "mcpmark-cicd" in template_name:
repo_url = self._import_template_repo(template_dir, owner, False)
else:
repo_url = self._import_template_repo(template_dir, owner, True)
# Record for cleanup later
repo_name = repo_url.rstrip("/").split("/")[-1]
self._repos_to_cleanup.append((owner, repo_name))
# Build InitialStateInfo
return InitialStateInfo(
state_id=f"{owner}/{repo_name}",
state_url=repo_url,
metadata={
"owner": owner,
"repo_name": repo_name,
"category": task.category_id,
"task_id": task.task_id,
},
)
except Exception as e:
logger.error(f"| GitHub setup failed for {task.name}: {e}")
return None
# ---------------------------------------------------------------------
# BaseStateManager required hooks
# ---------------------------------------------------------------------
def _store_initial_state_info(self, task, state_info: InitialStateInfo) -> None: # type: ignore[override]
if hasattr(task, "repository_url"):
task.repository_url = state_info.state_url
def _cleanup_task_initial_state(self, task) -> bool: # type: ignore[override]
"""No-op – cleanup is handled by self.clean_up which deletes imported repos."""
return True
def _cleanup_single_resource(self, resource) -> bool: # type: ignore[override]
"""No-op – we don't use BaseStateManager's tracked_resources anymore."""
return True
# ---------------------------------------------------------------------
def clean_up(self, task=None, **kwargs) -> bool:
"""Delete repositories that were imported for tasks."""
success = True
for owner, repo_name in self._repos_to_cleanup:
try:
self._delete_repository(owner, repo_name)
logger.info("| Deleted repository: %s/%s", owner, repo_name)
except Exception as err:
logger.error(
"| Failed to delete repository %s/%s: %s", owner, repo_name, err
)
success = False
self._repos_to_cleanup.clear()
return success
# =========================================================================
# Repository Creation and Setup Operations
# =========================================================================
def _delete_repository(self, owner: str, repo_name: str):
"""Delete a repository (use with caution)."""
delete_url = f"https://api.github.com/repos/{owner}/{repo_name}"
response = self.session.delete(delete_url)
if response.status_code not in [200, 204]:
logger.warning(
f"| Failed to delete repository {owner}/{repo_name}: {response.text}"
)
raise Exception(
f"| Failed to delete repository {owner}/{repo_name}: {response.status_code} {response.text}"
)
else:
logger.info(f"| Successfully deleted repository {owner}/{repo_name}")
def _obfuscate_mentions(self, text: str) -> str:
"""
Obfuscate @ mentions to prevent notifications to real users.
Replaces @username with @username_XXXX (random suffix) to ensure the mentioned
user does not exist on GitHub. This prevents notification spam when importing
templates that contain @ mentions from original repositories.
Args:
text: The text content that may contain @ mentions
Returns:
Text with obfuscated @ mentions
"""
import re
import random
import string
if not text:
return text
# Pattern matches @username (GitHub usernames: alphanumeric, hyphens, max 39 chars)
# Negative lookbehind (?<![a-zA-Z0-9]) ensures @ is not preceded by alphanumeric,
# which excludes emails like user@example.com
pattern = r"(?<![a-zA-Z0-9])@([a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?)"
def replace_mention(match):
username = match.group(1)
# Generate random 4-char suffix
suffix = "".join(
random.choices(string.ascii_lowercase + string.digits, k=4)
)
return f"@{username}_{suffix}"
return re.sub(pattern, replace_mention, text)
# ---------------------------------------------------------------------
# Helper utilities (organisation vs user)
# ---------------------------------------------------------------------
def _get_authenticated_user(self) -> str:
"""Return cached authenticated username or fetch once from GitHub."""
if hasattr(self, "_auth_user") and self._auth_user:
return self._auth_user
response = self.session.get("https://api.github.com/user")
if response.status_code == 200:
self._auth_user = response.json()["login"]
return self._auth_user
return None
# ---------------------------------------------------------------------
# Token management helpers
# ---------------------------------------------------------------------
def _update_session_token(self):
"""Update the session Authorization header with the current token."""
current_token = self.token_pool.get_current_token()
self.session.headers.update({"Authorization": f"Bearer {current_token}"})
# Update backward compatibility attribute
self.github_token = current_token
def _rotate_token(self):
"""Rotate to the next token in the pool and update session."""
next_token = self.token_pool.get_next_token()
self.session.headers.update({"Authorization": f"Bearer {next_token}"})
# Update backward compatibility attribute
self.github_token = next_token
logger.debug(f"| Rotated to next token in pool")
# ---------------------------------------------------------------------
# Generic request helper with rate-limit (403) retry handling
# ---------------------------------------------------------------------
def _request_with_retry(
self,
method: str,
url: str,
*,
max_retries: int = 2,
sleep_seconds: int = 120,
**kwargs,
):
"""Send a GitHub API request with basic rate-limit handling and token rotation.
If a request receives HTTP 403 (rate limit):
1. First try rotating to the next token in the pool
2. If still rate limited, sleep and retry
3. After max_retries are exhausted, raise RuntimeError
"""
import time # local import to avoid adding global dependency
attempt = 0
tokens_tried = 0
while True:
# Ensure we have the current token set
self._update_session_token()
resp = self.session.request(method, url, **kwargs)
# Successful or non-rate-limited response – return immediately
if resp.status_code != 403:
return resp
# 403 – very likely rate-limited
# First, try rotating tokens if we have multiple
if (
self.token_pool.pool_size > 1
and tokens_tried < self.token_pool.pool_size
):
logger.warning(
"| GitHub API rate limit encountered. Rotating to next token (tried %d/%d tokens)",
tokens_tried + 1,
self.token_pool.pool_size,
)
self._rotate_token()
tokens_tried += 1
continue
# All tokens exhausted or single token, resort to sleep/retry
if attempt >= max_retries:
raise RuntimeError(
f"GitHub API rate limited after {attempt + 1} attempts with {self.token_pool.pool_size} token(s): {resp.status_code} {resp.text}"
)
logger.warning(
"| All tokens rate limited (attempt %d/%d). Sleeping %d seconds before retrying …",
attempt + 1,
max_retries + 1,
sleep_seconds,
)
time.sleep(sleep_seconds)
attempt += 1
tokens_tried = 0 # Reset token counter for next attempt
# =========================================================================
# Initial State Selection and Repository Creation
# =========================================================================
# Initial state for each task category is resolved via self.initial_state_mapping
def select_initial_state_for_task(self, task_category: str) -> Optional[str]:
"""Resolve template name for a task category with light normalization."""
if not task_category:
return None
candidate_keys = []
candidate_keys.append(task_category)
# Allow users to swap between hyphen/underscore naming conventions.
hyphen_to_underscore = task_category.replace("-", "_")
if hyphen_to_underscore not in candidate_keys:
candidate_keys.append(hyphen_to_underscore)
underscore_to_hyphen = task_category.replace("_", "-")
if underscore_to_hyphen not in candidate_keys:
candidate_keys.append(underscore_to_hyphen)
for key in candidate_keys:
template = self.initial_state_mapping.get(key)
if template:
if key != task_category:
logger.debug(
"| Resolved GitHub template for %s via alias %s -> %s",
task_category,
key,
template,
)
return template
return None
def extract_repo_info_from_url(self, repo_url: str) -> tuple[str, str]:
"""Extract owner and repo name from GitHub URL."""
try:
from urllib.parse import urlparse
# Support https://github.com/owner/repo format
if "github.com" in repo_url:
path = urlparse(repo_url).path.strip("/")
parts = path.split("/")
if len(parts) >= 2:
return parts[0], parts[1]
raise ValueError(f"Invalid GitHub URL format: {repo_url}")
except Exception as e:
logger.error(f"| Failed to extract repo info from URL {repo_url}: {e}")
raise
def get_service_config_for_agent(self) -> dict:
"""
Get service-specific configuration for agent execution.
Rotates to the next token in the pool before returning config
to distribute load across tokens.
Returns:
Dictionary containing configuration needed by the agent/MCP server
"""
service_config = {}
# Add GitHub token if available
if self.github_token:
service_config["github_token"] = self.github_token
return service_config
def set_verification_environment(self, messages_path: str = None) -> None:
"""
Set GitHub-specific environment variables for verification scripts.
This ensures verification scripts use the same token as the current
agent execution, maintaining consistency across the evaluation flow.
Args:
messages_path: Optional path to messages.json file for verification
"""
import os
# Set common MCP_MESSAGES if provided
if messages_path:
os.environ["MCP_MESSAGES"] = str(messages_path)
# Set GitHub-specific token
current_token = self.token_pool.get_current_token()
os.environ["MCP_GITHUB_TOKEN"] = current_token
logger.info("| Set MCP_GITHUB_TOKEN for verification scripts")
def _enable_github_actions(self, owner: str, repo_name: str):
"""Enable GitHub Actions for the repository using REST API."""
try:
# Enable GitHub Actions
url = (
f"https://api.github.com/repos/{owner}/{repo_name}/actions/permissions"
)
response = self.session.put(
url, json={"enabled": True, "allowed_actions": "all"}
)
if response.status_code in [200, 204]:
logger.info(
"| Successfully enabled GitHub Actions for %s/%s", owner, repo_name
)
else:
logger.warning(
"| Failed to enable GitHub Actions: %s %s",
response.status_code,
response.text,
)
except Exception as e:
logger.error("| Failed to enable GitHub Actions: %s", e)
def _disable_github_actions(self, owner: str, repo_name: str):
"""Disable GitHub Actions for the repository using REST API."""
try:
# Disable GitHub Actions
url = (
f"https://api.github.com/repos/{owner}/{repo_name}/actions/permissions"
)
response = self.session.put(url, json={"enabled": False})
if response.status_code in [200, 204]:
logger.info(
"| Successfully disabled GitHub Actions for %s/%s", owner, repo_name
)
else:
logger.warning(
"| Failed to disable GitHub Actions: %s %s",
response.status_code,
response.text,
)
except Exception as e:
logger.error("| Failed to disable GitHub Actions: %s", e)
def _disable_repository_notifications(self, owner: str, repo_name: str):
"""Disable repository notifications to prevent email spam."""
try:
# Set repository notification subscription to ignore
url = f"https://api.github.com/repos/{owner}/{repo_name}/subscription"
response = self.session.put(
url, json={"subscribed": False, "ignored": True}
)
if response.status_code in [200, 201]:
logger.info(
"| Successfully disabled notifications for %s/%s", owner, repo_name
)
elif response.status_code == 403:
# This is expected if the token doesn't have notifications scope
logger.debug(
"| Cannot disable notifications for %s/%s (token lacks notifications scope - this is OK)",
owner,
repo_name,
)
else:
logger.warning(
"| Failed to disable repository notifications: %s %s",
response.status_code,
response.text,
)
except Exception as e:
logger.error("| Failed to disable repository notifications: %s", e)
def _download_and_extract_github_template(self, template_name: str) -> bool:
"""
Download and extract GitHub template from CDN using wget and unzip commands.
This approach preserves original file timestamps and is simpler than Python zipfile.
Args:
template_name: Name of the template to download (e.g., "anthropics-claude-code")
Returns:
bool: True if download and extraction successful
"""
try:
import subprocess
import sys
import tempfile
import shutil
import os
# Get the URL from mapping
if template_name not in self.github_template_url_mapping:
logger.error(f"| No URL mapping found for template: {template_name}")
return False
template_url = self.github_template_url_mapping[template_name]
# Allow override via environment variable
template_url = os.getenv("GITHUB_TEMPLATE_URL", template_url)
logger.info(f"| â—‹ Downloading GitHub template from: {template_url}")
# Create a temporary directory for the download
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
zip_path = temp_path / "github_template.zip"
# Step 1: Download using wget/curl
logger.info("| â—‹ Downloading GitHub template zip file...")
try:
# Use wget if available, otherwise fall back to curl
if sys.platform == "win32":
# Windows: try wget, fall back to curl
try:
result = subprocess.run(
["wget", "-O", str(zip_path), template_url],
capture_output=True,
text=True,
check=True,
)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fall back to curl
result = subprocess.run(
["curl", "-L", "-o", str(zip_path), template_url],
capture_output=True,
text=True,
check=True,
)
else:
# Unix-like systems: try wget, fall back to curl
try:
result = subprocess.run(
["wget", "-O", str(zip_path), template_url],
capture_output=True,
text=True,
check=True,
)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fall back to curl
result = subprocess.run(
["curl", "-L", "-o", str(zip_path), template_url],
capture_output=True,
text=True,
check=True,
)
logger.info("| ✓ Download completed successfully")
except Exception as e:
logger.error(f"| Download failed: {e}")
return False
# Step 2: Extract using unzip
logger.info("| â—‹ Extracting GitHub template...")
try:
# Extract to templates root directory
result = subprocess.run(
["unzip", "-o", str(zip_path), "-d", str(self.templates_root)],
capture_output=True,
text=True,
check=True,
)
logger.info("| ✓ Extraction completed successfully")
except Exception as e:
logger.error(f"| Extraction failed: {e}")
return False
# Step 3: Remove __MACOSX folder if it exists
macosx_path = self.templates_root / "__MACOSX"
if macosx_path.exists():
logger.info("| â—‹ Cleaning up macOS metadata...")
try:
shutil.rmtree(macosx_path)
logger.info("| ✓ Removed __MACOSX folder")
except Exception as e:
logger.warning(f"| Failed to remove __MACOSX folder: {e}")
# Verify the extracted template directory exists
template_path = self.templates_root / template_name
if not template_path.exists():
logger.error(
f"| Extracted template directory not found at expected path: {template_path}"
)
return False
logger.info(
f"| ✓ Successfully downloaded and extracted GitHub template to: {template_path}"
)
return True
except Exception as e:
logger.error(f"| Failed to download and extract GitHub template: {e}")
return False