Spaces:
Running
Running
| """Scrape real OpenAPI version histories from public Git repos. | |
| Clones each provider's openapi repo (bare, blob-filtered), walks tags/commits, | |
| extracts spec files at each version, and writes them to | |
| scenarios/layer1_real/<provider>/<version>.<ext>. | |
| Usage: | |
| python scripts/scrape_specs.py [--out scenarios/layer1_real] [--providers stripe,github,twilio,slack,openai] | |
| python scripts/scrape_specs.py --dry-run # print plan, do not clone | |
| Output layout: | |
| scenarios/layer1_real/ | |
| ├── stripe/ | |
| │ ├── _versions.json # {"provider": "stripe", "versions": {"2024-04-10": "2024-04-10.json", ...}} | |
| │ └── 2024-04-10.json | |
| ├── github/ | |
| │ ├── _versions.json | |
| │ └── 2024-09-01.yaml | |
| ├── twilio/ | |
| │ ├── _versions.json | |
| │ ├── messaging/ | |
| │ ├── api/ | |
| │ └── voice/ | |
| ├── slack/ | |
| │ ├── _versions.json | |
| │ └── 2024-01-01.json | |
| └── openai/ | |
| ├── _versions.json | |
| └── 2024-02-01.yaml | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| # --------------------------------------------------------------------------- | |
| # Provider configurations | |
| # --------------------------------------------------------------------------- | |
# Registry of scrape targets, keyed by provider name.
#
# Keys read by the code in this file:
#   url          - Git remote that gets bare-cloned.
#   spec_paths   - candidate spec file paths inside the repo.
#   max_versions - upper bound passed to the extraction helpers.
#
# "strategy", "tag_pattern", "language" and "framework" are carried as
# metadata only; nothing in this file reads them (dispatch is by provider name).
PROVIDERS: Dict[str, dict] = {
    "stripe": {
        "url": "https://github.com/stripe/openapi.git",
        "strategy": "tags",
        "spec_paths": ["openapi/spec3.sdk.json", "openapi/spec3.json"],
        "tag_pattern": r"^v\d+$",
        "language": "python",
        "framework": "stripe-python",
        "max_versions": 100,
    },
    "github": {
        "url": "https://github.com/github/rest-api-description.git",
        "strategy": "commits",
        "spec_paths": [
            "descriptions/api.github.com/api.github.com.yaml",
            "descriptions/api.github.com/api.github.com.json",
        ],
        "language": "python",
        "framework": "requests",
        "max_versions": 60,
    },
    "twilio": {
        "url": "https://github.com/twilio/twilio-oai.git",
        "strategy": "commits",
        "spec_paths": [
            "spec/yaml/twilio_messaging_v1.yaml",
            "spec/yaml/twilio_api_v2010.yaml",
        ],
        "language": "python",
        "framework": "twilio-python",
        "max_versions": 50,
    },
    "slack": {
        "url": "https://github.com/slackapi/slack-api-specs.git",
        "strategy": "commits",
        "spec_paths": [
            "web-api/slack_web_openapi_v2.json",
            "web-api/slack_web_openapi_v2_without_examples.json",
        ],
        "language": "python",
        "framework": "slack-sdk",
        "max_versions": 40,
    },
    "openai": {
        "url": "https://github.com/openai/openai-openapi.git",
        "strategy": "commits",
        "spec_paths": ["openapi.yaml"],
        "language": "python",
        "framework": "openai",
        "max_versions": 30,
    },
}
| # --------------------------------------------------------------------------- | |
| # Git helpers | |
| # --------------------------------------------------------------------------- | |
def _run(cmd: List[str], cwd: Optional[Path] = None, capture: bool = True) -> Tuple[int, str]:
    """Run *cmd* as a subprocess and return ``(returncode, stdout)``.

    Args:
        cmd: Argument vector; executed without a shell.
        cwd: Working directory for the child, or ``None`` to inherit.
        capture: When true, stdout and stderr are piped; only stdout is
            returned (stripped) and stderr is discarded. When false, both
            streams are inherited from this process and the returned text
            is the empty string.

    Returns:
        The child's exit status and its stripped stdout text.
    """
    stream = subprocess.PIPE if capture else None
    working_dir = str(cwd) if cwd else None
    completed = subprocess.run(
        cmd,
        cwd=working_dir,
        stdout=stream,
        stderr=stream,
        text=True,
    )
    # stdout is None when nothing was piped; normalise to "" before stripping.
    text_out = completed.stdout if completed.stdout else ""
    return completed.returncode, text_out.strip()
def clone_bare(url: str, dest: Path) -> bool:
    """Bare-clone *url* into *dest*, preferring a blob-less partial clone.

    An existing *dest* is taken to be a finished clone and is reused as-is.
    Otherwise a ``--filter=blob:none`` clone is attempted first; if that
    fails, a shallow ``--depth=200`` bare clone is tried instead.

    Because any existing *dest* short-circuits to "already cloned", a
    directory left behind by a failed attempt is removed both before the
    retry and after a final failure, so a broken clone is never mistaken for
    a good one on the next run.

    Args:
        url: Git remote to clone.
        dest: Target directory for the bare repository.

    Returns:
        ``True`` if *dest* already existed or a clone succeeded, else ``False``.
    """
    if dest.exists():
        print(f" [skip] already cloned at {dest}")
        return True

    print(f" [clone] {url} -> {dest}")
    rc, _ = _run(
        ["git", "clone", "--bare", "--filter=blob:none", url, str(dest)],
        capture=False,
    )
    if rc != 0:
        print(f" [warn] clone failed (rc={rc}), retrying without filter")
        # Clear any partial directory so the retry starts from a clean target.
        shutil.rmtree(dest, ignore_errors=True)
        rc, _ = _run(["git", "clone", "--bare", "--depth=200", url, str(dest)], capture=False)

    if rc != 0:
        # Do not leave debris that the exists() check above would accept later.
        shutil.rmtree(dest, ignore_errors=True)
    return rc == 0
def list_tags(repo: Path) -> List[str]:
    """Return the tag names reported by ``git tag --list`` in *repo*.

    Blank lines are dropped and names are whitespace-stripped. An empty list
    is returned when the git command exits non-zero.
    """
    status, listing = _run(["git", "tag", "--list"], cwd=repo)
    if status != 0:
        return []

    names: List[str] = []
    for raw in listing.splitlines():
        cleaned = raw.strip()
        if cleaned:
            names.append(cleaned)
    return names
def list_commits_touching(repo: Path, spec_path: str, max_n: int = 100) -> List[Tuple[str, str]]:
    """Return ``(sha, date)`` pairs for commits that touched *spec_path*.

    The pairs come from ``git log --follow --format="%H %ai"`` in the order
    git prints them; ``date`` is the second whitespace-separated field of each
    line (the ``YYYY-MM-DD`` part of the author date). At most *max_n* pairs
    are returned. A failing git command or empty output yields ``[]``.
    """
    status, log_text = _run(
        ["git", "log", "--follow", "--format=%H %ai", "--", spec_path],
        cwd=repo,
    )
    if status != 0 or not log_text:
        return []

    split_lines = (entry.split() for entry in log_text.splitlines())
    # fields[0] is the commit sha, fields[1] the YYYY-MM-DD date.
    history = [(fields[0], fields[1]) for fields in split_lines if len(fields) >= 2]
    return history[:max_n]
def show_file(repo: Path, ref: str, spec_path: str) -> Optional[bytes]:
    """Return the raw bytes of *spec_path* as stored at *ref*.

    Runs ``git show <ref>:<spec_path>`` inside *repo*. Returns ``None`` when
    git exits non-zero (for example when the path does not exist at that ref).
    """
    proc = subprocess.run(
        ["git", "show", f"{ref}:{spec_path}"],
        cwd=str(repo),
        capture_output=True,  # pipes both stdout and stderr; bytes mode
    )
    return proc.stdout if proc.returncode == 0 else None
def tag_to_date(tag: str, repo: Path) -> Optional[str]:
    """Return the ``YYYY-MM-DD`` author date of the commit *tag* points to.

    Uses ``git log -1 --format=%ai <tag>`` in *repo*. Returns ``None`` when
    git fails or prints nothing.
    """
    status, stamp = _run(
        ["git", "log", "-1", "--format=%ai", tag],
        cwd=repo,
    )
    if status != 0 or not stamp:
        return None
    # "%ai" prints "YYYY-MM-DD HH:MM:SS +ZZZZ"; keep only the date part.
    return stamp.split()[0]
| # --------------------------------------------------------------------------- | |
| # Per-provider extraction | |
| # --------------------------------------------------------------------------- | |
def extract_stripe(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]:
    """Extract the Stripe spec at each of the newest *max_versions* tags.

    Tags of the form ``v<digits>`` are ordered numerically; if none exist,
    tags shaped like ``YYYY-MM-DD`` are used instead (sorted lexically). For
    each selected tag the first path in ``PROVIDERS["stripe"]["spec_paths"]``
    that yields content is written to ``out_dir/<date><ext>``, where ``date``
    is the tag's commit date, or the tag name when no date can be resolved.

    Tags are processed oldest to newest, so when several tags share a date
    the newest one overwrites the earlier file and index entry.

    Args:
        repo: Path to the bare clone.
        out_dir: Directory receiving the spec files (created if missing).
        max_versions: Maximum number of tags to process; values <= 0 extract
            nothing.

    Returns:
        Mapping of version label (date or tag name) to written file name.
    """
    # Guard first: ``versioned[-0:]`` would otherwise select *every* tag.
    if max_versions <= 0:
        return {}

    out_dir.mkdir(parents=True, exist_ok=True)

    tags = list_tags(repo)
    # NOTE(review): PROVIDERS["stripe"]["tag_pattern"] holds the same regex but
    # is not consulted here, because the sort key assumes the "v<digits>" shape.
    versioned = sorted(
        (t for t in tags if re.match(r"^v\d+$", t)),
        key=lambda t: int(t[1:]),
    )
    if not versioned:
        # Fallback: try date-formatted tags.
        versioned = sorted(t for t in tags if re.match(r"^\d{4}-\d{2}-\d{2}$", t))

    versions_map: Dict[str, str] = {}
    spec_paths = PROVIDERS["stripe"]["spec_paths"]
    for tag in versioned[-max_versions:]:
        # Use the tag name as the version label when no date is available.
        date = tag_to_date(tag, repo) or tag
        for sp in spec_paths:
            content = show_file(repo, tag, sp)
            if content:
                fname = f"{date}{Path(sp).suffix}"
                (out_dir / fname).write_bytes(content)
                versions_map[date] = fname
                break  # first matching spec path wins for this tag
    return versions_map
def extract_via_commits(
    repo: Path,
    out_dir: Path,
    spec_paths: List[str],
    max_versions: int,
    subdir: Optional[str] = None,
) -> Dict[str, str]:
    """Extract one spec snapshot per commit date by walking commit history.

    *spec_paths* is treated as a priority-ordered list of alternatives. For
    each path, the commits returned by ``list_commits_touching`` are visited
    in order and the first commit seen for a given date supplies that date's
    snapshot. A date already covered by an earlier path is not rewritten, so
    the index never points at a different extension than the file that was
    kept, and no orphan duplicates are written. Extraction stops once
    *max_versions* dates have been collected.

    Args:
        repo: Path to the bare clone.
        out_dir: Provider output directory.
        spec_paths: Candidate spec file paths inside the repo.
        max_versions: Cap on collected dates, also passed as the per-path
            commit limit.
        subdir: Optional subdirectory of *out_dir* to write into.

    Returns:
        Mapping of ``YYYY-MM-DD`` date to written file name.
    """
    target_dir = out_dir / subdir if subdir else out_dir
    target_dir.mkdir(parents=True, exist_ok=True)

    versions_map: Dict[str, str] = {}
    for sp in spec_paths:
        if len(versions_map) >= max_versions:
            break
        ext = Path(sp).suffix
        for sha, date in list_commits_touching(repo, sp, max_versions):
            if len(versions_map) >= max_versions:
                break
            if date in versions_map:
                # Already have a snapshot for this date (same or earlier path).
                continue
            content = show_file(repo, sha, sp)
            if not content:
                continue
            fname = f"{date}{ext}"
            (target_dir / fname).write_bytes(content)
            versions_map[date] = fname
    return versions_map
def extract_twilio(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]:
    """Extract Twilio specs, one subdirectory per product line.

    Each product's spec history is extracted into ``out_dir/<product>/`` with
    an equal share (integer division) of *max_versions*. The returned index
    prefixes both keys and file names with ``<product>/``.
    """
    product_specs = (
        ("messaging", "spec/yaml/twilio_messaging_v1.yaml"),
        ("api", "spec/yaml/twilio_api_v2010.yaml"),
        ("voice", "spec/yaml/twilio_voice_v1.yaml"),
    )
    per_product_cap = max_versions // len(product_specs)

    index: Dict[str, str] = {}
    for product, spec in product_specs:
        extracted = extract_via_commits(repo, out_dir, [spec], per_product_cap, subdir=product)
        # Namespace entries by product so the provider-level index stays unique.
        index.update(
            {f"{product}/{date}": f"{product}/{fname}" for date, fname in extracted.items()}
        )
    return index
| # --------------------------------------------------------------------------- | |
| # Main orchestrator | |
| # --------------------------------------------------------------------------- | |
def scrape_provider(
    name: str,
    cfg: dict,
    temp_dir: Path,
    out_root: Path,
    dry_run: bool = False,
) -> Dict[str, str]:
    """Clone one provider's repo, extract its specs and write ``_versions.json``.

    Args:
        name: Provider key; ``"stripe"`` and ``"twilio"`` use dedicated
            extractors, every other name goes through ``extract_via_commits``.
        cfg: Provider configuration; reads ``url``, ``max_versions`` and (for
            the generic path) ``spec_paths``.
        temp_dir: Directory in which the bare clone ``<name>.git`` is placed.
        out_root: Root output directory; specs go to ``out_root/<name>``.
        dry_run: When true, only print the plan. Nothing is created on disk
            and nothing is cloned.

    Returns:
        The version index that was written, or ``{}`` for a dry run or a
        failed clone.
    """
    if dry_run:
        # Checked before any mkdir so a dry run has no filesystem side effects.
        print(f"[dry-run] {name}: would clone {cfg['url']} and extract {cfg['max_versions']} versions")
        return {}

    out_dir = out_root / name
    out_dir.mkdir(parents=True, exist_ok=True)

    repo_dir = temp_dir / f"{name}.git"
    print(f"\n[{name}] Cloning...")
    if not clone_bare(cfg["url"], repo_dir):
        print(f"[{name}] Clone failed, skipping.")
        return {}

    print(f"[{name}] Extracting specs...")
    if name == "stripe":
        versions_map = extract_stripe(repo_dir, out_dir, cfg["max_versions"])
    elif name == "twilio":
        versions_map = extract_twilio(repo_dir, out_dir, cfg["max_versions"])
    else:
        versions_map = extract_via_commits(
            repo_dir, out_dir, cfg["spec_paths"], cfg["max_versions"]
        )

    index_file = out_dir / "_versions.json"
    index_file.write_text(
        json.dumps({"provider": name, "versions": versions_map}, indent=2),
        encoding="utf-8",
    )
    print(f"[{name}] Done — {len(versions_map)} versions extracted to {out_dir}")
    return versions_map
def main() -> None:
    """Command-line entry point: parse arguments, scrape providers, print a summary.

    Behaviour notes:
      * Unknown provider names are reported and ignored; the process exits
        with status 1 if none of the requested names is known.
      * ``--dry-run`` creates no directories.
      * Without ``--temp`` a fresh system temp directory is created for the
        bare clones. It is removed afterwards unless ``--keep-temp`` is given,
        in which case its location is printed. A directory passed via
        ``--temp`` is never removed.
    """
    parser = argparse.ArgumentParser(description="Scrape real OpenAPI specs from public repos")
    parser.add_argument("--out", default="scenarios/layer1_real", help="Output directory")
    parser.add_argument("--temp", default=None, help="Temp dir for bare clones (default: system temp)")
    parser.add_argument("--providers", default=",".join(PROVIDERS.keys()),
                        help="Comma-separated provider list")
    parser.add_argument("--dry-run", action="store_true", help="Print plan without cloning")
    parser.add_argument("--keep-temp", action="store_true", help="Keep bare clone dirs after extraction")
    args = parser.parse_args()

    requested = [p.strip() for p in args.providers.split(",") if p.strip()]
    unknown = [p for p in requested if p not in PROVIDERS]
    if unknown:
        print(f"[warn] ignoring unknown providers: {unknown}")
    providers_list = [p for p in requested if p in PROVIDERS]
    if not providers_list:
        print(f"No valid providers. Available: {list(PROVIDERS.keys())}")
        sys.exit(1)

    out_root = Path(args.out)
    owns_temp_dir = False
    if args.dry_run:
        # Only used to build paths for the plan; never created.
        temp_dir = Path(args.temp or "scripts/_temp_clones")
    else:
        out_root.mkdir(parents=True, exist_ok=True)
        if args.temp is None:
            # mkdtemp rather than TemporaryDirectory: the latter's finalizer
            # deletes the directory at interpreter exit even when cleanup()
            # is never called, which would defeat --keep-temp.
            temp_dir = Path(tempfile.mkdtemp(prefix="apishift_clones_"))
            owns_temp_dir = True
        else:
            temp_dir = Path(args.temp)
            temp_dir.mkdir(parents=True, exist_ok=True)

    summary: Dict[str, int] = {}
    try:
        for name in providers_list:
            cfg = PROVIDERS[name]
            versions_map = scrape_provider(name, cfg, temp_dir, out_root, dry_run=args.dry_run)
            summary[name] = len(versions_map)
    finally:
        if owns_temp_dir:
            if args.keep_temp:
                print(f"[keep-temp] bare clones kept at {temp_dir}")
            else:
                shutil.rmtree(temp_dir, ignore_errors=True)

    print("\n=== Scrape Summary ===")
    total = 0
    for name, count in summary.items():
        print(f" {name}: {count} versions")
        total += count
    print(f" TOTAL: {total} spec versions")
    print(f" Output: {out_root.resolve()}")


if __name__ == "__main__":
    main()