| """GitHub repository fetch/cache utilities for NeuroCaster.""" |
|
|
| from __future__ import annotations |
|
|
| import os |
| import shutil |
| import subprocess |
| import tarfile |
| import tempfile |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| import httpx |
|
|
|
|
def fetch_github_repo(repo_spec: Dict[str, Any], shared_root: Path) -> Dict[str, Any]:
    """Fetch a GitHub repository into ``<shared_root>/repos`` or reuse a cached copy.

    The checkout lives in a directory named ``<owner>__<repo>__<ref>`` (the
    ref is sanitised with ``_safe_ref``).  If that path already exists it is
    reused untouched.  Otherwise the repository is cloned with ``git`` when a
    ``git`` executable is found on ``PATH``, and downloaded as a tarball when
    it is not.

    A new checkout is written to a private staging directory next to the cache
    entry and renamed into place only after the fetch succeeded, so a failed
    or interrupted fetch does not leave a partial directory that later calls
    would report as a cache hit.

    NOTE(review): no allowlist check happens in this function; presumably the
    caller validates ``repo_spec`` beforehand -- confirm.

    Args:
        repo_spec: Mapping holding the repository under ``"url"`` or ``"repo"``
            and optionally the ref under ``"ref"`` or ``"branch"`` (defaults
            to ``"main"``).
        shared_root: Directory that contains the ``repos`` cache directory.

    Returns:
        A metadata dict with the keys ``source``, ``owner``, ``repo``, ``url``,
        ``ref``, ``local_path`` and ``cache_hit``.

    Raises:
        ValueError: If the spec names no repository or it cannot be parsed.
        Exception: Whatever the clone / download helper raises on failure.
    """
    source = str(repo_spec.get("url") or repo_spec.get("repo") or "")
    ref = str(repo_spec.get("ref") or repo_spec.get("branch") or "main")
    if not source:
        raise ValueError("repo spec must include 'url' or 'repo'")

    owner, repo = _parse_owner_repo(source)
    cache_dir = shared_root / "repos" / f"{owner}__{repo}__{_safe_ref(ref)}"
    # Evaluate once so the reported flag and the branch taken cannot disagree.
    cache_hit = cache_dir.exists()
    metadata = {
        "source": "github",
        "owner": owner,
        "repo": repo,
        "url": f"https://github.com/{owner}/{repo}",
        "ref": ref,
        "local_path": str(cache_dir),
        "cache_hit": cache_hit,
    }
    if cache_hit:
        return metadata

    cache_dir.parent.mkdir(parents=True, exist_ok=True)
    # Stage inside the cache's parent so the final rename stays on one
    # filesystem and is therefore a single atomic operation.
    staging_root = Path(tempfile.mkdtemp(prefix=".fetch-", dir=cache_dir.parent))
    try:
        # The helpers expect a target path that does not exist yet.
        checkout = staging_root / "checkout"
        if shutil.which("git"):
            _clone_with_git(owner, repo, ref, checkout)
        else:
            _download_archive(owner, repo, ref, checkout)
        try:
            os.replace(checkout, cache_dir)
        except OSError:
            # A concurrent fetch may have published the same entry first; its
            # copy is complete, so ours is simply discarded below.
            if not cache_dir.exists():
                raise
    finally:
        shutil.rmtree(staging_root, ignore_errors=True)
    return metadata
|
|
|
|
def ensure_local_repo(repo_path: Path, shared_root: Path) -> Dict[str, Any]:
    """Describe a repository that already lives on the local filesystem.

    Nothing is fetched and nothing on disk is inspected: the metadata is
    derived purely from ``repo_path``.  ``shared_root`` is accepted but not
    used by this function.

    Args:
        repo_path: Location of the local repository.
        shared_root: Unused here.

    Returns:
        A metadata dict with the keys ``source``, ``owner``, ``repo``, ``url``,
        ``ref``, ``local_path`` and ``cache_hit`` (always ``True``).
    """
    location = str(repo_path)
    info: Dict[str, Any] = dict(
        source="local",
        owner="local",
        repo=repo_path.name,
        url=location,
        ref="local",
        local_path=location,
        cache_hit=True,
    )
    return info
|
|
|
|
| def _parse_owner_repo(source: str) -> tuple[str, str]: |
| cleaned = source.removesuffix(".git").rstrip("/") |
| if cleaned.startswith("https://github.com/"): |
| cleaned = cleaned.removeprefix("https://github.com/") |
| parts = cleaned.split("/") |
| if len(parts) < 2: |
| raise ValueError(f"Invalid GitHub repository spec: {source}") |
| return parts[-2], parts[-1] |
|
|
|
|
def _clone_with_git(owner: str, repo: str, ref: str, output_dir: Path) -> None:
    """Shallow-clone ``owner/repo`` at ``ref`` into ``output_dir`` using ``git``.

    ``ref`` is handed to ``git clone --branch``, so it has to be a branch or
    tag name; a bare commit SHA is not accepted by that option.

    When ``GITHUB_TOKEN`` is set it is sent as an HTTP ``Authorization``
    header through a one-off ``-c http.<url>.extraHeader`` setting instead of
    being embedded in the clone URL.  The clone URL is what git stores as the
    remote in the checkout's ``.git/config``, so keeping it token-free keeps
    the credential out of the cached repository.

    Args:
        owner: GitHub account or organisation name.
        repo: Repository name.
        ref: Branch or tag to check out.
        output_dir: Destination directory for the clone.

    Raises:
        subprocess.CalledProcessError: If ``git`` exits with a non-zero
            status.  The credential is redacted from the reported command.
    """
    import base64  # local import: only needed for the auth header

    url = f"https://github.com/{owner}/{repo}.git"
    env = os.environ.copy()
    # Fail fast instead of blocking on an interactive credential prompt;
    # an explicit setting from the caller's environment still wins.
    env.setdefault("GIT_TERMINAL_PROMPT", "0")

    command = ["git"]
    auth_setting = None
    token = env.get("GITHUB_TOKEN")
    if token:
        basic = base64.b64encode(f"x-access-token:{token}".encode()).decode("ascii")
        auth_setting = f"http.https://github.com/.extraHeader=Authorization: Basic {basic}"
        command += ["-c", auth_setting]
    # "--" ends option parsing so neither positional can be read as a flag.
    command += ["clone", "--depth", "1", "--branch", ref, "--", url, str(output_dir)]

    try:
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env=env,
        )
    except subprocess.CalledProcessError as exc:
        # Re-raise without the credential so it cannot end up in logs or
        # tracebacks; "from None" also drops the unredacted original.
        redacted = [
            "http.https://github.com/.extraHeader=<redacted>" if arg == auth_setting else arg
            for arg in command
        ]
        raise subprocess.CalledProcessError(
            exc.returncode, redacted, output=exc.output, stderr=exc.stderr
        ) from None
|
|
|
|
def _download_archive(owner: str, repo: str, ref: str, output_dir: Path) -> None:
    """Download ``owner/repo`` at ``ref`` as a tarball and unpack it to ``output_dir``.

    The branch archive (``refs/heads/<ref>``) is requested first.  If GitHub
    answers 404 the tag archive (``refs/tags/<ref>``) is tried, so tags work
    here as they do with ``git clone --branch``.  When ``GITHUB_TOKEN`` is set
    it is sent as a bearer token.

    The archive is unpacked in a temporary directory and its top-level
    directory is then moved to ``output_dir``.

    Args:
        owner: GitHub account or organisation name.
        repo: Repository name.
        ref: Branch or tag name.
        output_dir: Destination path; expected not to exist yet, because
            ``shutil.move`` would otherwise move the checkout *into* it.

    Raises:
        httpx.HTTPStatusError: If the archive cannot be downloaded.
        RuntimeError: If the archive contains no top-level directory.
    """
    headers = {}
    token = os.getenv("GITHUB_TOKEN")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    candidate_urls = [
        f"https://github.com/{owner}/{repo}/archive/refs/heads/{ref}.tar.gz",
        f"https://github.com/{owner}/{repo}/archive/refs/tags/{ref}.tar.gz",
    ]
    with tempfile.TemporaryDirectory() as tmp:
        archive = Path(tmp) / "repo.tar.gz"
        # Unpack into a dedicated directory so the archive file itself is
        # never mistaken for content.
        unpack_dir = Path(tmp) / "unpacked"
        unpack_dir.mkdir()

        for attempt, archive_url in enumerate(candidate_urls, start=1):
            with httpx.stream(
                "GET", archive_url, headers=headers, follow_redirects=True, timeout=60.0
            ) as response:
                # Not a branch: fall through to the next candidate.  The last
                # candidate's error is the one that gets reported.
                if response.status_code == 404 and attempt < len(candidate_urls):
                    continue
                response.raise_for_status()
                with archive.open("wb") as handle:
                    for chunk in response.iter_bytes():
                        handle.write(chunk)
            break

        with tarfile.open(archive) as tar:
            if hasattr(tarfile, "data_filter"):
                # Rejects absolute paths, "../" escapes and links that point
                # outside the destination.
                tar.extractall(unpack_dir, filter="data")
            else:
                # Interpreters without extraction filters: legacy behaviour.
                tar.extractall(unpack_dir)

        top_level = sorted(path for path in unpack_dir.iterdir() if path.is_dir())
        if not top_level:
            raise RuntimeError(
                f"Archive for {owner}/{repo}@{ref} contains no top-level directory"
            )
        shutil.move(str(top_level[0]), str(output_dir))
|
|
|
|
| def _safe_ref(ref: str) -> str: |
| return "".join(char if char.isalnum() or char in {"-", "_", "."} else "_" for char in ref) |
|
|