| """ |
| GitHub State Manager for MCPMark |
| ================================= |
| |
| This module handles GitHub repository state management for consistent task evaluation. |
| Manages test repositories, branches, and cleanup after evaluation. |
| """ |
|
|
| import requests |
| from typing import Optional, List, Union |
| from pathlib import Path |
|
|
| from src.base.state_manager import BaseStateManager, InitialStateInfo |
| from src.base.task_manager import BaseTask |
| from src.logger import get_logger |
| from src.mcp_services.github.token_pool import GitHubTokenPool |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| class GitHubStateManager(BaseStateManager): |
| """ |
| Manages GitHub repository state for task evaluation. |
| """ |
|
|
def __init__(
    self,
    github_token: Union[str, List[str]],
    eval_org: str = "mcpmark-eval",
    templates_root: str = "./github_state",
):
    """
    Initialize GitHub state manager.

    Builds the token pool and an authenticated ``requests`` session, then
    verifies the current token against ``GET /user`` and probes the
    evaluation organisation.

    Args:
        github_token: GitHub Personal Access Token(s). Can be a single token
            string or a list of tokens for round-robin usage.
        eval_org: Organisation / user used to host **ephemeral evaluation
            repositories**. Replaced by the authenticated user's login when
            ``GET /orgs/{eval_org}`` does not answer 200.
        templates_root: Directory holding the repository templates; it is
            user-expanded and resolved to an absolute path.

    Raises:
        RuntimeError: If the token is rejected or any other part of the
            authentication check fails. The underlying error is chained as
            the cause.
    """
    super().__init__(service_name="github")

    # (owner, repo_name) pairs of imported repositories; clean_up() deletes them.
    self._repos_to_cleanup: list[tuple[str, str]] = []

    # Always go through a token pool, even for a single token, so the rest
    # of the class has a single code path.
    if isinstance(github_token, str):
        self.token_pool = GitHubTokenPool([github_token])
        self.github_token = github_token
    else:
        self.token_pool = GitHubTokenPool(github_token)
        self.github_token = self.token_pool.get_current_token()

    self.eval_org = eval_org
    self.templates_root = Path(templates_root).expanduser().resolve()

    # Shared session; the Authorization header is added by
    # _update_session_token().
    self.session = requests.Session()
    self.session.headers.update(
        {
            "Accept": "application/vnd.github.v3+json",
            "X-GitHub-Api-Version": "2022-11-28",
            "User-Agent": "MCPMark/1.0",
        }
    )

    try:
        self._update_session_token()

        response = self.session.get("https://api.github.com/user")
        if response.status_code != 200:
            raise ValueError(
                f"Invalid GitHub token: {response.status_code} {response.text}"
            )

        user_info = response.json()
        logger.info(f"GitHub authenticated as: {user_info['login']}")
        logger.info(f"Using token pool with {self.token_pool.pool_size} token(s)")

        if self.eval_org:
            org_response = self.session.get(
                f"https://api.github.com/orgs/{self.eval_org}"
            )
            if org_response.status_code == 200:
                logger.info(f"Using evaluation organisation: {self.eval_org}")
            else:
                logger.warning(
                    f"Evaluation organisation {self.eval_org} not accessible, using user account"
                )
                # Fall back to the personal account of the token owner.
                self.eval_org = user_info["login"]

        logger.info("GitHub state manager initialized successfully")

    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error.
        raise RuntimeError(f"GitHub initialization failed: {e}") from e

    # Task category -> template directory name.
    self.initial_state_mapping = {
        "build_your_own_x": "codecrafters-io-build-your-own-x",
        "missing-semester": "missing-semester-missing-semester",
        "mcpmark-cicd": "zjwu0522-mcpmark-cicd",
        "harmony": "openai-harmony",
        "claude-code": "anthropics-claude-code",
        "easyr1": "hiyouga-EasyR1",
    }

    # Template directory name -> downloadable archive of that template.
    self.github_template_url_mapping = {
        "codecrafters-io-build-your-own-x": "https://storage.mcpmark.ai/github/codecrafters-io-build-your-own-x.zip",
        "missing-semester-missing-semester": "https://storage.mcpmark.ai/github/missing-semester-missing-semester.zip",
        "zjwu0522-mcpmark-cicd": "https://storage.mcpmark.ai/github/zjwu0522-mcpmark-cicd.zip",
        "openai-harmony": "https://storage.mcpmark.ai/github/openai-harmony.zip",
        "anthropics-claude-code": "https://storage.mcpmark.ai/github/anthropics-claude-code.zip",
        "hiyouga-EasyR1": "https://storage.mcpmark.ai/github/hiyouga-EasyR1.zip",
    }
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
def _import_template_repo(
    self, template_dir: Path, owner: str, private: bool = True
) -> str:
    """Import a repository from a local template directory to GitHub.

    The template directory must contain ``meta.json``, ``pulls.json``,
    ``issues.json`` and a git checkout under ``repo/``. The import:

    1. creates the target repository (replacing one with the same name),
    2. disables GitHub Actions before any history is pushed,
    3. pushes the git history (mirror push, falling back to per-ref pushes),
    4. removes ``.github`` from the checkout and pushes that commit,
    5. re-creates issues, pull requests and their comments with ``@``
       mentions obfuscated,
    6. re-enables Actions for the ``mcpmark-cicd`` template and turns off
       repository notifications.

    Args:
        template_dir: Local directory of the exported template.
        owner: User or organisation that will own the new repository.
        private: Must be ``True``; public imports are refused.

    Returns:
        The ``html_url`` of the created repository.

    Raises:
        RuntimeError: If ``private`` is false, the repository cannot be
            created, or a push fails.
    """
    import json
    import shutil
    import subprocess
    import time

    # Refuse public imports up front so that nothing is created (or an
    # existing repository deleted) for a request that is rejected anyway.
    if not private:
        error_msg = (
            "ERROR: Cannot import template to a public repository.\n\n"
            "Reason: The template contains @ mentions of real GitHub users from the original\n"
            "repository. Importing to a public repository would send notifications to these\n"
            "users, which is disruptive and inappropriate.\n\n"
            "Solution: Set private=True when calling _import_template_repo()."
        )
        logger.error(error_msg)
        raise RuntimeError(error_msg)

    def _scrub(output, token: str) -> str:
        """Return git output as text with the access token masked."""
        if isinstance(output, bytes):
            output = output.decode(errors="ignore")
        output = output or ""
        return output.replace(token, "***") if token else output

    def _list_refs(repo_dir: str) -> list[str]:
        """List every ref name present in the local checkout."""
        result = subprocess.run(
            ["git", "-C", repo_dir, "for-each-ref", "--format=%(refname)"],
            check=True,
            capture_output=True,
            text=True,
        )
        return result.stdout.strip().splitlines()

    def _push_repo(
        repo_path: Path, repo_owner: str, repo_name: str, required_refs: list[str]
    ):
        """Push repo to GitHub: try mirror, else per-ref."""
        token = self.github_token
        dst_url = f"https://x-access-token:{token}@github.com/{repo_owner}/{repo_name}.git"

        try:
            subprocess.run(
                ["git", "-C", str(repo_path), "push", "--mirror", dst_url],
                check=True,
                capture_output=True,
            )
            return
        except subprocess.CalledProcessError as err:
            logger.warning(
                "| [push] Mirror push failed – falling back: %s",
                _scrub(err.stderr, token),
            )

        refs = required_refs or _list_refs(str(repo_path))
        for ref in refs:
            for attempt in range(3):
                try:
                    subprocess.run(
                        [
                            "git",
                            "-C",
                            str(repo_path),
                            "push",
                            dst_url,
                            f"{ref}:{ref}",
                        ],
                        check=True,
                        capture_output=True,
                    )
                    break
                except subprocess.CalledProcessError as ref_err:
                    if attempt == 2:
                        # `from None`: the CalledProcessError text contains
                        # the full command line, including the token URL.
                        raise RuntimeError(
                            f"Failed to push ref {ref}: {_scrub(ref_err.stderr, token)}"
                        ) from None
                    # Linear back-off between attempts: 2s, then 4s.
                    time.sleep(2 * (attempt + 1))

    # ------------------------------------------------------------------
    # Template metadata
    # ------------------------------------------------------------------
    meta = json.loads((template_dir / "meta.json").read_text(encoding="utf-8"))
    repo_name: str = meta["repo"]
    pr_head_refs = meta.get("pr_head_refs", [])
    default_branch = meta.get("default_branch", "main")

    pulls_data = json.loads((template_dir / "pulls.json").read_text(encoding="utf-8"))
    fork_branches = [
        pr["local_branch"]
        for pr in pulls_data
        if pr.get("is_from_fork") and "local_branch" in pr
    ]
    # Refs pushed one by one if the mirror push is rejected.
    needed_refs = (
        [f"refs/heads/{default_branch}"]
        + [f"refs/heads/{h}" for h in pr_head_refs]
        + [f"refs/heads/{b}" for b in fork_branches]
    )

    # ------------------------------------------------------------------
    # Create the target repository
    # ------------------------------------------------------------------
    create_payload = {
        "name": repo_name,
        "description": f"Restored template repo {repo_name}",
        "private": private,
        "auto_init": False,
        "has_issues": True,
        "has_projects": True,
        "has_wiki": False,
        "default_branch": default_branch,
    }

    # Personal repositories and organisation repositories use different
    # creation endpoints.
    auth_user = self._get_authenticated_user()
    create_url = (
        "https://api.github.com/user/repos"
        if owner == auth_user
        else f"https://api.github.com/orgs/{owner}/repos"
    )

    resp = self._request_with_retry("POST", create_url, json=create_payload)
    if resp.status_code == 422 and "name already exists" in resp.text:
        # A repository with this name is already there: drop it and retry.
        self._delete_repository(owner, repo_name)
        resp = self._request_with_retry("POST", create_url, json=create_payload)

    if resp.status_code not in (200, 201):
        raise RuntimeError(f"Failed to create repo: {resp.status_code} {resp.text}")

    html_url = resp.json()["html_url"]
    logger.info("| [import] Target repository created: %s", html_url)

    # Actions are switched off before the history (and its workflows) is pushed.
    logger.info(
        "| [import] Disabling GitHub Actions immediately after repo creation..."
    )
    self._disable_github_actions(owner, repo_name)

    # ------------------------------------------------------------------
    # Push git history
    # ------------------------------------------------------------------
    repo_path = template_dir / "repo"

    logger.info("| [import] Pushing git history …")
    _push_repo(repo_path, owner, repo_name, needed_refs)

    # NOTE: this edits and commits inside the local template checkout, so the
    # template on disk no longer has a .github directory afterwards.
    github_dir = repo_path / ".github"
    if github_dir.exists():
        logger.info("| [import] Removing .github directory after push …")
        shutil.rmtree(github_dir)

        subprocess.run(
            ["git", "-C", str(repo_path), "add", "-A"],
            check=True,
            capture_output=True,
        )
        # No check=True: the commit exits non-zero when there is nothing to commit.
        subprocess.run(
            [
                "git",
                "-C",
                str(repo_path),
                "commit",
                "-m",
                "Remove .github directory",
            ],
            capture_output=True,
        )

        token = self.github_token
        dst_url = (
            f"https://x-access-token:{token}@github.com/{owner}/{repo_name}.git"
        )
        try:
            subprocess.run(
                ["git", "-C", str(repo_path), "push", dst_url],
                check=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as push_err:
            # Re-raise without the original exception: its message embeds the
            # command line and therefore the token.
            raise RuntimeError(
                f"Failed to push .github removal: {_scrub(push_err.stderr, token)}"
            ) from None

    # ------------------------------------------------------------------
    # Issues, pull requests and comments
    # ------------------------------------------------------------------
    def _create_comment(issue_number: int, body: str):
        """Post one comment on an issue or pull request."""
        self._request_with_retry(
            "POST",
            f"https://api.github.com/repos/{owner}/{repo_name}/issues/{issue_number}/comments",
            json={"body": body},
        )

    def _attributed(comment: dict, suffix: str = "") -> str:
        """Prefix a comment with its original author and obfuscate mentions."""
        return self._obfuscate_mentions(
            f"*Original author: @{comment['user']}*{suffix}\n\n{comment['body']}"
        )

    def _create_issue(item: dict) -> Optional[int]:
        """Create an issue (closing it if needed); return its number or None."""
        data = {
            "title": item["title"],
            "body": self._obfuscate_mentions(item.get("body", "")),
            "labels": item.get("labels", []),
        }
        r = self._request_with_retry(
            "POST",
            f"https://api.github.com/repos/{owner}/{repo_name}/issues",
            json=data,
        )
        if r.status_code not in (200, 201):
            return None
        new_no = r.json()["number"]
        if item.get("state") == "closed":
            self._request_with_retry(
                "PATCH",
                f"https://api.github.com/repos/{owner}/{repo_name}/issues/{new_no}",
                json={"state": "closed"},
            )
        return new_no

    def _create_pull(pr_itm: dict) -> Optional[int]:
        """Create a pull request; return its number or None on failure."""
        body = self._obfuscate_mentions(pr_itm.get("body", ""))
        if pr_itm.get("is_from_fork", False):
            fork_note = f"\n\n---\n_This PR was originally from a fork: **{pr_itm.get('fork_owner')}/{pr_itm.get('fork_repo')}** (branch: `{pr_itm['head']}`)_"
            # With an empty body the two leading newlines are dropped.
            body = body + fork_note if body else fork_note[2:]
        payload = {
            "title": pr_itm["title"],
            "body": body,
            "head": pr_itm.get("local_branch", pr_itm["head"]),
            "base": pr_itm["base"],
        }
        r = self._request_with_retry(
            "POST",
            f"https://api.github.com/repos/{owner}/{repo_name}/pulls",
            json=payload,
        )
        if r.status_code not in (200, 201):
            return None
        return r.json()["number"]

    issues_data = json.loads((template_dir / "issues.json").read_text(encoding="utf-8"))
    created_issues = 0
    logger.info("| [phase] Re-creating issues …")
    for itm in issues_data:
        new_no = _create_issue(itm)
        if new_no:
            created_issues += 1
            for c in itm.get("comments", []):
                _create_comment(new_no, _attributed(c))
    logger.info(
        "| [phase] Created %d out of %d issues", created_issues, len(issues_data)
    )

    logger.info("| [phase] Re-creating pull requests …")
    created_prs = 0
    skipped_prs = 0
    for pr in pulls_data:
        new_pr_no = _create_pull(pr)
        if new_pr_no:
            created_prs += 1
            for c in pr.get("comments", []):
                _create_comment(new_pr_no, _attributed(c))
            # Review comments are posted as ordinary comments, tagged "(review)".
            for rc in pr.get("review_comments", []):
                _create_comment(new_pr_no, _attributed(rc, " (review)"))
        else:
            skipped_prs += 1
    logger.info(
        "| [phase] Created %d PRs, skipped %d PRs", created_prs, skipped_prs
    )

    # Only the CI/CD template gets Actions back.
    if "mcpmark-cicd" in template_dir.name:
        logger.info("| [import] Re-enabling GitHub Actions for CI/CD repository…")
        self._enable_github_actions(owner, repo_name)

    logger.info("| [import] Disabling repository notifications …")
    self._disable_repository_notifications(owner, repo_name)

    logger.info("| [import] Repository import complete: %s", html_url)
    return html_url
|
|
| |
| |
| |
|
|
def _create_initial_state(self, task: "BaseTask") -> Optional[InitialStateInfo]:
    """
    Prepare the GitHub repository a task is evaluated against.

    Resolves the template for the task's category, downloads it when it is
    not available locally, imports it as a new repository and records that
    repository for later deletion.

    Args:
        task: Task whose ``name``, ``category_id`` and ``task_id`` are read.

    Returns:
        Description of the created repository, or ``None`` when any step
        fails (the error is logged, not raised).
    """
    try:
        logger.info(f"| Setting up GitHub state for task: {task.name}")

        template_name = self.select_initial_state_for_task(task.category_id)
        if template_name is None:
            raise RuntimeError(
                f"No template configured for task category: {task.category_id}"
            )

        template_dir = (self.templates_root / template_name).resolve()
        if not template_dir.exists():
            logger.warning(
                "| Template directory %s not found locally, attempting to download from CDN",
                template_dir,
            )
            downloaded = self._download_and_extract_github_template(template_name)
            if not downloaded:
                logger.error(
                    "| Failed to download template %s from CDN", template_name
                )
                return None
            logger.info("| Template %s downloaded successfully", template_name)

        logger.info(f"| Importing repository template from {template_dir} …")
        owner = self.eval_org or self._get_authenticated_user()

        # NOTE(review): the CI/CD template is imported with private=False, yet
        # _import_template_repo raises for every non-private import, so this
        # path currently always ends in the except branch below. Confirm the
        # intended visibility for mcpmark-cicd.
        make_private = "mcpmark-cicd" not in template_name
        repo_url = self._import_template_repo(template_dir, owner, make_private)

        # The repository name is the last path segment of its URL.
        repo_name = repo_url.rstrip("/").split("/")[-1]
        self._repos_to_cleanup.append((owner, repo_name))

        state_metadata = {
            "owner": owner,
            "repo_name": repo_name,
            "category": task.category_id,
            "task_id": task.task_id,
        }
        return InitialStateInfo(
            state_id=f"{owner}/{repo_name}",
            state_url=repo_url,
            metadata=state_metadata,
        )

    except Exception as e:
        logger.error(f"| GitHub setup failed for {task.name}: {e}")
        return None
|
|
| |
| |
| |
|
|
def _store_initial_state_info(self, task, state_info: InitialStateInfo) -> None:
    """Copy the repository URL onto the task, if the task has that attribute."""
    if not hasattr(task, "repository_url"):
        return
    task.repository_url = state_info.state_url
|
|
| def _cleanup_task_initial_state(self, task) -> bool: |
| """No-op – cleanup is handled by self.clean_up which deletes imported repos.""" |
| return True |
|
|
| def _cleanup_single_resource(self, resource) -> bool: |
| """No-op – we don't use BaseStateManager's tracked_resources anymore.""" |
| return True |
|
|
| |
def clean_up(self, task=None, **kwargs) -> bool:
    """Delete every repository recorded during task setup.

    Each deletion is attempted independently; a failure is logged and does
    not stop the remaining deletions. The record is emptied afterwards,
    including entries whose deletion failed.

    Returns:
        ``True`` when all deletions succeeded, ``False`` otherwise.
    """
    all_deleted = True
    for repo_owner, name in self._repos_to_cleanup:
        try:
            self._delete_repository(repo_owner, name)
        except Exception as err:
            logger.error(
                "| Failed to delete repository %s/%s: %s", repo_owner, name, err
            )
            all_deleted = False
        else:
            logger.info("| Deleted repository: %s/%s", repo_owner, name)

    self._repos_to_cleanup.clear()
    return all_deleted
|
|
| |
| |
| |
|
|
def _delete_repository(self, owner: str, repo_name: str):
    """Delete ``owner/repo_name`` on GitHub (irreversible).

    Raises:
        Exception: When GitHub answers with anything other than 200 or 204.
    """
    response = self.session.delete(
        f"https://api.github.com/repos/{owner}/{repo_name}"
    )

    if response.status_code in (200, 204):
        logger.info(f"| Successfully deleted repository {owner}/{repo_name}")
        return

    logger.warning(
        f"| Failed to delete repository {owner}/{repo_name}: {response.text}"
    )
    raise Exception(
        f"| Failed to delete repository {owner}/{repo_name}: {response.status_code} {response.text}"
    )
|
|
| def _obfuscate_mentions(self, text: str) -> str: |
| """ |
| Obfuscate @ mentions to prevent notifications to real users. |
| |
| Replaces @username with @username_XXXX (random suffix) to ensure the mentioned |
| user does not exist on GitHub. This prevents notification spam when importing |
| templates that contain @ mentions from original repositories. |
| |
| Args: |
| text: The text content that may contain @ mentions |
| |
| Returns: |
| Text with obfuscated @ mentions |
| """ |
| import re |
| import random |
| import string |
|
|
| if not text: |
| return text |
|
|
| |
| |
| |
| pattern = r"(?<![a-zA-Z0-9])@([a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?)" |
|
|
| def replace_mention(match): |
| username = match.group(1) |
| |
| suffix = "".join( |
| random.choices(string.ascii_lowercase + string.digits, k=4) |
| ) |
| return f"@{username}_{suffix}" |
|
|
| return re.sub(pattern, replace_mention, text) |
|
|
| |
| |
| |
|
|
| def _get_authenticated_user(self) -> str: |
| """Return cached authenticated username or fetch once from GitHub.""" |
| if hasattr(self, "_auth_user") and self._auth_user: |
| return self._auth_user |
|
|
| response = self.session.get("https://api.github.com/user") |
| if response.status_code == 200: |
| self._auth_user = response.json()["login"] |
| return self._auth_user |
| return None |
|
|
| |
| |
| |
| def _update_session_token(self): |
| """Update the session Authorization header with the current token.""" |
| current_token = self.token_pool.get_current_token() |
| self.session.headers.update({"Authorization": f"Bearer {current_token}"}) |
| |
| self.github_token = current_token |
|
|
def _rotate_token(self):
    """Advance the pool to its next token and use it for the session."""
    token = self.token_pool.get_next_token()
    self.session.headers["Authorization"] = f"Bearer {token}"
    # Kept in sync for code that reads the raw token.
    self.github_token = token
    logger.debug("| Rotated to next token in pool")
|
|
| |
| |
| |
| def _request_with_retry( |
| self, |
| method: str, |
| url: str, |
| *, |
| max_retries: int = 2, |
| sleep_seconds: int = 120, |
| **kwargs, |
| ): |
| """Send a GitHub API request with basic rate-limit handling and token rotation. |
| |
| If a request receives HTTP 403 (rate limit): |
| 1. First try rotating to the next token in the pool |
| 2. If still rate limited, sleep and retry |
| 3. After max_retries are exhausted, raise RuntimeError |
| """ |
| import time |
|
|
| attempt = 0 |
| tokens_tried = 0 |
|
|
| while True: |
| |
| self._update_session_token() |
|
|
| resp = self.session.request(method, url, **kwargs) |
| |
| if resp.status_code != 403: |
| return resp |
|
|
| |
| |
| if ( |
| self.token_pool.pool_size > 1 |
| and tokens_tried < self.token_pool.pool_size |
| ): |
| logger.warning( |
| "| GitHub API rate limit encountered. Rotating to next token (tried %d/%d tokens)", |
| tokens_tried + 1, |
| self.token_pool.pool_size, |
| ) |
| self._rotate_token() |
| tokens_tried += 1 |
| continue |
|
|
| |
| if attempt >= max_retries: |
| raise RuntimeError( |
| f"GitHub API rate limited after {attempt + 1} attempts with {self.token_pool.pool_size} token(s): {resp.status_code} {resp.text}" |
| ) |
|
|
| logger.warning( |
| "| All tokens rate limited (attempt %d/%d). Sleeping %d seconds before retrying …", |
| attempt + 1, |
| max_retries + 1, |
| sleep_seconds, |
| ) |
| time.sleep(sleep_seconds) |
| attempt += 1 |
| tokens_tried = 0 |
|
|
| |
| |
| |
|
|
| |
def select_initial_state_for_task(self, task_category: str) -> Optional[str]:
    """Look up the template name for a task category.

    The category is tried as given, then with hyphens turned into
    underscores, then with underscores turned into hyphens; the first
    variant that has a non-empty mapping wins.

    Returns:
        The template name, or ``None`` for an empty category or when no
        variant is mapped.
    """
    if not task_category:
        return None

    variants = (
        task_category,
        task_category.replace("-", "_"),
        task_category.replace("_", "-"),
    )

    # dict.fromkeys drops duplicate variants while keeping their order.
    for key in dict.fromkeys(variants):
        template = self.initial_state_mapping.get(key)
        if not template:
            continue
        if key != task_category:
            logger.debug(
                "| Resolved GitHub template for %s via alias %s -> %s",
                task_category,
                key,
                template,
            )
        return template

    return None
|
|
def extract_repo_info_from_url(self, repo_url: str) -> tuple[str, str]:
    """Extract owner and repo name from a GitHub URL.

    Accepts ``https://github.com/owner/repo[...]``, the same without a
    scheme (``github.com/owner/repo``) and the scp-style SSH form
    (``git@github.com:owner/repo.git``). A trailing ``.git`` on the
    repository name is removed.

    Args:
        repo_url: URL of the repository or of a page inside it.

    Returns:
        ``(owner, repo_name)``.

    Raises:
        ValueError: If the URL does not mention github.com or does not
            contain both an owner and a repository segment.
    """
    try:
        from urllib.parse import urlparse

        if "github.com" in repo_url:
            candidate = repo_url.strip()
            if "://" not in candidate:
                # Without a scheme urlparse puts the host into the path, which
                # would shift every segment by one.
                host_part = candidate.split("/", 1)[0]
                if "@" in host_part and ":" in host_part:
                    # scp-style: user@host:owner/repo -> ssh://user@host/owner/repo
                    candidate = "ssh://" + candidate.replace(":", "/", 1)
                else:
                    candidate = "https://" + candidate

            parts = [seg for seg in urlparse(candidate).path.split("/") if seg]
            if len(parts) >= 2:
                owner, repo = parts[0], parts[1]
                if repo.endswith(".git"):
                    repo = repo[: -len(".git")]
                if repo:
                    return owner, repo

        raise ValueError(f"Invalid GitHub URL format: {repo_url}")

    except Exception as e:
        logger.error(f"| Failed to extract repo info from URL {repo_url}: {e}")
        raise
|
|
def get_service_config_for_agent(self) -> dict:
    """
    Build the service configuration handed to the agent / MCP server.

    The token currently stored in ``self.github_token`` is used as is; no
    token rotation happens here.

    Returns:
        ``{"github_token": <token>}`` when a token is set, otherwise an
        empty dictionary.
    """
    if not self.github_token:
        return {}
    return {"github_token": self.github_token}
|
|
def set_verification_environment(self, messages_path: str = None) -> None:
    """
    Export the environment variables read by verification scripts.

    ``MCP_GITHUB_TOKEN`` is always set to the pool's current token;
    ``MCP_MESSAGES`` is set only when a messages path is given.

    Args:
        messages_path: Optional path to messages.json file for verification
    """
    import os

    env = os.environ

    if messages_path:
        env["MCP_MESSAGES"] = str(messages_path)

    # Read from the pool so verification uses the token that is current now.
    env["MCP_GITHUB_TOKEN"] = self.token_pool.get_current_token()
    logger.info("| Set MCP_GITHUB_TOKEN for verification scripts")
|
|
def _enable_github_actions(self, owner: str, repo_name: str):
    """Turn GitHub Actions on for ``owner/repo_name``, allowing all actions.

    Best effort: a failed request or an unexpected status is logged and
    never raised.
    """
    try:
        endpoint = (
            f"https://api.github.com/repos/{owner}/{repo_name}/actions/permissions"
        )
        settings = {"enabled": True, "allowed_actions": "all"}
        response = self.session.put(endpoint, json=settings)

        if response.status_code not in (200, 204):
            logger.warning(
                "| Failed to enable GitHub Actions: %s %s",
                response.status_code,
                response.text,
            )
            return

        logger.info(
            "| Successfully enabled GitHub Actions for %s/%s", owner, repo_name
        )

    except Exception as e:
        logger.error("| Failed to enable GitHub Actions: %s", e)
|
|
def _disable_github_actions(self, owner: str, repo_name: str):
    """Turn GitHub Actions off for ``owner/repo_name``.

    Best effort: a failed request or an unexpected status is logged and
    never raised.
    """
    try:
        endpoint = (
            f"https://api.github.com/repos/{owner}/{repo_name}/actions/permissions"
        )
        response = self.session.put(endpoint, json={"enabled": False})

        if response.status_code not in (200, 204):
            logger.warning(
                "| Failed to disable GitHub Actions: %s %s",
                response.status_code,
                response.text,
            )
            return

        logger.info(
            "| Successfully disabled GitHub Actions for %s/%s", owner, repo_name
        )

    except Exception as e:
        logger.error("| Failed to disable GitHub Actions: %s", e)
|
|
def _disable_repository_notifications(self, owner: str, repo_name: str):
    """Set the repository subscription to ignored for the current token's user.

    Best effort: a 403 is only logged at debug level, other failures are
    logged as warnings or errors, and nothing is raised.
    """
    try:
        endpoint = f"https://api.github.com/repos/{owner}/{repo_name}/subscription"
        response = self.session.put(
            endpoint, json={"subscribed": False, "ignored": True}
        )
        status = response.status_code

        if status in (200, 201):
            logger.info(
                "| Successfully disabled notifications for %s/%s", owner, repo_name
            )
            return

        if status == 403:
            # Treated as harmless: the token may simply lack the scope.
            logger.debug(
                "| Cannot disable notifications for %s/%s (token lacks notifications scope - this is OK)",
                owner,
                repo_name,
            )
            return

        logger.warning(
            "| Failed to disable repository notifications: %s %s",
            response.status_code,
            response.text,
        )

    except Exception as e:
        logger.error("| Failed to disable repository notifications: %s", e)
|
|
def _download_and_extract_github_template(self, template_name: str) -> bool:
    """
    Download a GitHub template archive and unpack it into ``templates_root``.

    The archive is fetched with ``wget`` (falling back to ``curl``) and
    unpacked with ``unzip``; according to the original author this keeps
    file timestamps and is simpler than Python's ``zipfile``. The URL from
    ``github_template_url_mapping`` can be overridden with the
    ``GITHUB_TEMPLATE_URL`` environment variable.

    Args:
        template_name: Name of the template to download (e.g., "anthropics-claude-code")

    Returns:
        bool: True if the template directory exists after extraction; False
        on any failure (failures are logged, not raised).
    """
    try:
        import os
        import shutil
        import subprocess
        import tempfile

        if template_name not in self.github_template_url_mapping:
            logger.error(f"| No URL mapping found for template: {template_name}")
            return False

        # The environment variable, when set, wins over the built-in URL.
        template_url = os.getenv(
            "GITHUB_TEMPLATE_URL", self.github_template_url_mapping[template_name]
        )

        logger.info(f"| ○ Downloading GitHub template from: {template_url}")

        with tempfile.TemporaryDirectory() as temp_dir:
            zip_path = Path(temp_dir) / "github_template.zip"

            logger.info("| ○ Downloading GitHub template zip file...")
            try:
                # Same commands on every platform: wget first, curl when wget
                # is missing or fails.
                try:
                    subprocess.run(
                        ["wget", "-O", str(zip_path), template_url],
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                except (subprocess.CalledProcessError, FileNotFoundError):
                    # --fail makes curl exit non-zero on HTTP errors instead
                    # of saving the error page as the archive.
                    subprocess.run(
                        ["curl", "--fail", "-L", "-o", str(zip_path), template_url],
                        capture_output=True,
                        text=True,
                        check=True,
                    )

                logger.info("| ✓ Download completed successfully")
            except Exception as e:
                logger.error(f"| Download failed: {e}")
                return False

            logger.info("| ○ Extracting GitHub template...")
            try:
                # -o: overwrite existing files without prompting.
                subprocess.run(
                    ["unzip", "-o", str(zip_path), "-d", str(self.templates_root)],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                logger.info("| ✓ Extraction completed successfully")
            except Exception as e:
                logger.error(f"| Extraction failed: {e}")
                return False

            # Archives created on macOS carry a __MACOSX metadata folder.
            macosx_path = self.templates_root / "__MACOSX"
            if macosx_path.exists():
                logger.info("| ○ Cleaning up macOS metadata...")
                try:
                    shutil.rmtree(macosx_path)
                    logger.info("| ✓ Removed __MACOSX folder")
                except Exception as e:
                    logger.warning(f"| Failed to remove __MACOSX folder: {e}")

            # The archive must unpack to a directory named after the template.
            template_path = self.templates_root / template_name
            if not template_path.exists():
                logger.error(
                    f"| Extracted template directory not found at expected path: {template_path}"
                )
                return False

            logger.info(
                f"| ✓ Successfully downloaded and extracted GitHub template to: {template_path}"
            )
            return True

    except Exception as e:
        logger.error(f"| Failed to download and extract GitHub template: {e}")
        return False
|
|