"""Scrape real OpenAPI version histories from public Git repos. Clones each provider's openapi repo (bare, blob-filtered), walks tags/commits, extracts spec files at each version, and writes them to scenarios/layer1_real//.. Usage: python scripts/scrape_specs.py [--out scenarios/layer1_real] [--providers stripe,github,twilio,slack,openai] python scripts/scrape_specs.py --dry-run # print plan, do not clone Output layout: scenarios/layer1_real/ ├── stripe/ │ ├── _versions.json # {"2024-04-10": "2024-04-10.json", ...} │ └── 2024-04-10.json ├── github/ │ ├── _versions.json │ └── 2024-09-01.yaml ├── twilio/ │ ├── _versions.json │ ├── messaging/ │ └── voice/ ├── slack/ │ ├── _versions.json │ └── 2024-01-01.json └── openai/ ├── _versions.json └── 2024-02-01.yaml """ import argparse import json import os import re import shutil import subprocess import sys import tempfile from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple # --------------------------------------------------------------------------- # Provider configurations # --------------------------------------------------------------------------- PROVIDERS: Dict[str, dict] = { "stripe": { "url": "https://github.com/stripe/openapi.git", "strategy": "tags", "spec_paths": ["openapi/spec3.sdk.json", "openapi/spec3.json"], "tag_pattern": r"^v\d+$", "language": "python", "framework": "stripe-python", "max_versions": 100, }, "github": { "url": "https://github.com/github/rest-api-description.git", "strategy": "commits", "spec_paths": [ "descriptions/api.github.com/api.github.com.yaml", "descriptions/api.github.com/api.github.com.json", ], "language": "python", "framework": "requests", "max_versions": 60, }, "twilio": { "url": "https://github.com/twilio/twilio-oai.git", "strategy": "commits", "spec_paths": [ "spec/yaml/twilio_messaging_v1.yaml", "spec/yaml/twilio_api_v2010.yaml", ], "language": "python", "framework": "twilio-python", "max_versions": 50, }, "slack": { "url": "https://github.com/slackapi/slack-api-specs.git", "strategy": "commits", "spec_paths": [ "web-api/slack_web_openapi_v2.json", "web-api/slack_web_openapi_v2_without_examples.json", ], "language": "python", "framework": "slack-sdk", "max_versions": 40, }, "openai": { "url": "https://github.com/openai/openai-openapi.git", "strategy": "commits", "spec_paths": ["openapi.yaml"], "language": "python", "framework": "openai", "max_versions": 30, }, } # --------------------------------------------------------------------------- # Git helpers # --------------------------------------------------------------------------- def _run(cmd: List[str], cwd: Optional[Path] = None, capture: bool = True) -> Tuple[int, str]: result = subprocess.run( cmd, cwd=str(cwd) if cwd else None, stdout=subprocess.PIPE if capture else None, stderr=subprocess.PIPE if capture else None, text=True, ) return result.returncode, (result.stdout or "").strip() def clone_bare(url: str, dest: Path) -> bool: """Clone bare with blob:none filter (fast, no working tree).""" if dest.exists(): print(f" [skip] already cloned at {dest}") return True print(f" [clone] {url} -> {dest}") rc, _ = _run( ["git", "clone", "--bare", "--filter=blob:none", url, str(dest)], capture=False, ) if rc != 0: print(f" [warn] clone failed (rc={rc}), retrying without filter") rc, _ = _run(["git", "clone", "--bare", "--depth=200", url, str(dest)], capture=False) return rc == 0 def list_tags(repo: Path) -> List[str]: rc, out = _run(["git", "tag", "--list"], cwd=repo) if rc != 0: return [] return [t.strip() for t in out.splitlines() if t.strip()] def list_commits_touching(repo: Path, spec_path: str, max_n: int = 100) -> List[Tuple[str, str]]: """Return list of (sha, date_str) for commits that touched spec_path.""" rc, out = _run( ["git", "log", "--follow", "--format=%H %ai", "--", spec_path], cwd=repo, ) if rc != 0 or not out: return [] results = [] for line in out.splitlines(): parts = line.split() if len(parts) >= 2: sha = parts[0] date = parts[1] # YYYY-MM-DD results.append((sha, date)) return results[:max_n] def show_file(repo: Path, ref: str, spec_path: str) -> Optional[bytes]: """Extract file content at a specific ref.""" result = subprocess.run( ["git", "show", f"{ref}:{spec_path}"], cwd=str(repo), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if result.returncode != 0: return None return result.stdout def tag_to_date(tag: str, repo: Path) -> Optional[str]: """Try to get the commit date for a tag.""" rc, out = _run( ["git", "log", "-1", "--format=%ai", tag], cwd=repo, ) if rc == 0 and out: return out.split()[0] # YYYY-MM-DD return None # --------------------------------------------------------------------------- # Per-provider extraction # --------------------------------------------------------------------------- def extract_stripe(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]: """Walk stripe tags (v700, v701, ...), extract spec3.sdk.json.""" tags = list_tags(repo) # Filter to versioned tags versioned = sorted( [t for t in tags if re.match(r"^v\d+$", t)], key=lambda t: int(t[1:]), ) if not versioned: # Fallback: try date-formatted tags versioned = sorted([t for t in tags if re.match(r"^\d{4}-\d{2}-\d{2}$", t)]) versions_map: Dict[str, str] = {} spec_paths = PROVIDERS["stripe"]["spec_paths"] for tag in versioned[-max_versions:]: date = tag_to_date(tag, repo) if not date: date = tag # use tag name as version label for sp in spec_paths: content = show_file(repo, tag, sp) if content: ext = Path(sp).suffix fname = f"{date}{ext}" (out_dir / fname).write_bytes(content) versions_map[date] = fname break return versions_map def extract_via_commits( repo: Path, out_dir: Path, spec_paths: List[str], max_versions: int, subdir: Optional[str] = None, ) -> Dict[str, str]: """Walk commit history for each spec_path, dedup by date.""" target_dir = out_dir / subdir if subdir else out_dir target_dir.mkdir(parents=True, exist_ok=True) versions_map: Dict[str, str] = {} for sp in spec_paths: commits = list_commits_touching(repo, sp, max_versions) if not commits: continue seen_dates: Dict[str, str] = {} for sha, date in commits: if date in seen_dates: continue content = show_file(repo, sha, sp) if not content: continue ext = Path(sp).suffix fname = f"{date}{ext}" (target_dir / fname).write_bytes(content) seen_dates[date] = fname versions_map.update(seen_dates) if len(versions_map) >= max_versions: break return versions_map def extract_twilio(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]: """Twilio has multiple product lines; organize by product.""" spec_files = { "messaging": "spec/yaml/twilio_messaging_v1.yaml", "api": "spec/yaml/twilio_api_v2010.yaml", "voice": "spec/yaml/twilio_voice_v1.yaml", } combined: Dict[str, str] = {} for product, sp in spec_files.items(): sub_map = extract_via_commits(repo, out_dir, [sp], max_versions // len(spec_files), subdir=product) # Also write to the provider root for easier indexing for date, fname in sub_map.items(): combined[f"{product}/{date}"] = f"{product}/{fname}" return combined # --------------------------------------------------------------------------- # Main orchestrator # --------------------------------------------------------------------------- def scrape_provider( name: str, cfg: dict, temp_dir: Path, out_root: Path, dry_run: bool = False, ) -> Dict[str, str]: out_dir = out_root / name out_dir.mkdir(parents=True, exist_ok=True) if dry_run: print(f"[dry-run] {name}: would clone {cfg['url']} and extract {cfg['max_versions']} versions") return {} repo_dir = temp_dir / f"{name}.git" print(f"\n[{name}] Cloning...") if not clone_bare(cfg["url"], repo_dir): print(f"[{name}] Clone failed, skipping.") return {} print(f"[{name}] Extracting specs...") if name == "stripe": versions_map = extract_stripe(repo_dir, out_dir, cfg["max_versions"]) elif name == "twilio": versions_map = extract_twilio(repo_dir, out_dir, cfg["max_versions"]) else: versions_map = extract_via_commits( repo_dir, out_dir, cfg["spec_paths"], cfg["max_versions"] ) index_file = out_dir / "_versions.json" index_file.write_text( json.dumps({"provider": name, "versions": versions_map}, indent=2), encoding="utf-8", ) print(f"[{name}] Done — {len(versions_map)} versions extracted to {out_dir}") return versions_map def main() -> None: parser = argparse.ArgumentParser(description="Scrape real OpenAPI specs from public repos") parser.add_argument("--out", default="scenarios/layer1_real", help="Output directory") parser.add_argument("--temp", default=None, help="Temp dir for bare clones (default: system temp)") parser.add_argument("--providers", default=",".join(PROVIDERS.keys()), help="Comma-separated provider list") parser.add_argument("--dry-run", action="store_true", help="Print plan without cloning") parser.add_argument("--keep-temp", action="store_true", help="Keep bare clone dirs after extraction") args = parser.parse_args() out_root = Path(args.out) out_root.mkdir(parents=True, exist_ok=True) providers_list = [p.strip() for p in args.providers.split(",") if p.strip() in PROVIDERS] if not providers_list: print(f"No valid providers. Available: {list(PROVIDERS.keys())}") sys.exit(1) use_temp_dir = args.temp is None and not args.dry_run temp_ctx = tempfile.TemporaryDirectory(prefix="apishift_clones_") if use_temp_dir else None temp_dir = Path(temp_ctx.name) if temp_ctx else Path(args.temp or "scripts/_temp_clones") temp_dir.mkdir(parents=True, exist_ok=True) summary: Dict[str, int] = {} try: for name in providers_list: cfg = PROVIDERS[name] versions_map = scrape_provider(name, cfg, temp_dir, out_root, dry_run=args.dry_run) summary[name] = len(versions_map) finally: if temp_ctx and not args.keep_temp: temp_ctx.cleanup() print("\n=== Scrape Summary ===") total = 0 for name, count in summary.items(): print(f" {name}: {count} versions") total += count print(f" TOTAL: {total} spec versions") print(f" Output: {out_root.resolve()}") if __name__ == "__main__": main()