apishift-env / scripts /scrape_specs.py
yaswanth169's picture
Initial APIShift env push
3040bf7 verified
"""Scrape real OpenAPI version histories from public Git repos.
Clones each provider's openapi repo (bare, blob-filtered), walks tags/commits,
extracts spec files at each version, and writes them to
scenarios/layer1_real/<provider>/<version>.<ext>.
Usage:
python scripts/scrape_specs.py [--out scenarios/layer1_real] [--providers stripe,github,twilio,slack,openai]
python scripts/scrape_specs.py --dry-run # print plan, do not clone
Output layout:
scenarios/layer1_real/
├── stripe/
│ ├── _versions.json # {"provider": "stripe", "versions": {"2024-04-10": "2024-04-10.json", ...}}
│ └── 2024-04-10.json
├── github/
│ ├── _versions.json
│ └── 2024-09-01.yaml
├── twilio/
│ ├── _versions.json
│ ├── messaging/
│ ├── api/
│ └── voice/
├── slack/
│ ├── _versions.json
│ └── 2024-01-01.json
└── openai/
├── _versions.json
└── 2024-02-01.yaml
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Provider configurations
# ---------------------------------------------------------------------------
# Registry of scrapeable providers, keyed by the name accepted on the command
# line via --providers. Keys consumed by the code visible in this file:
#   url          - Git remote that gets bare-cloned for the provider.
#   spec_paths   - repo-relative spec file paths passed to the extractors.
#   max_versions - upper bound on versions requested from the extractor.
# "strategy", "tag_pattern", "language" and "framework" are not read by any
# function visible in this file; presumably they are metadata for other
# tooling — TODO confirm before removing them.
PROVIDERS: Dict[str, dict] = {
# Stripe: extracted by walking tags (see extract_stripe).
"stripe": {
"url": "https://github.com/stripe/openapi.git",
"strategy": "tags",
"spec_paths": ["openapi/spec3.sdk.json", "openapi/spec3.json"],
"tag_pattern": r"^v\d+$",
"language": "python",
"framework": "stripe-python",
"max_versions": 100,
},
# GitHub: extracted from commit history of the listed spec files.
"github": {
"url": "https://github.com/github/rest-api-description.git",
"strategy": "commits",
"spec_paths": [
"descriptions/api.github.com/api.github.com.yaml",
"descriptions/api.github.com/api.github.com.json",
],
"language": "python",
"framework": "requests",
"max_versions": 60,
},
# Twilio: handled by extract_twilio, which uses its own per-product path table.
"twilio": {
"url": "https://github.com/twilio/twilio-oai.git",
"strategy": "commits",
"spec_paths": [
"spec/yaml/twilio_messaging_v1.yaml",
"spec/yaml/twilio_api_v2010.yaml",
],
"language": "python",
"framework": "twilio-python",
"max_versions": 50,
},
# Slack: extracted from commit history of the listed spec files.
"slack": {
"url": "https://github.com/slackapi/slack-api-specs.git",
"strategy": "commits",
"spec_paths": [
"web-api/slack_web_openapi_v2.json",
"web-api/slack_web_openapi_v2_without_examples.json",
],
"language": "python",
"framework": "slack-sdk",
"max_versions": 40,
},
# OpenAI: single spec file, extracted from its commit history.
"openai": {
"url": "https://github.com/openai/openai-openapi.git",
"strategy": "commits",
"spec_paths": ["openapi.yaml"],
"language": "python",
"framework": "openai",
"max_versions": 30,
},
}
# ---------------------------------------------------------------------------
# Git helpers
# ---------------------------------------------------------------------------
def _run(cmd: List[str], cwd: Optional[Path] = None, capture: bool = True) -> Tuple[int, str]:
result = subprocess.run(
cmd, cwd=str(cwd) if cwd else None,
stdout=subprocess.PIPE if capture else None,
stderr=subprocess.PIPE if capture else None,
text=True,
)
return result.returncode, (result.stdout or "").strip()
def clone_bare(url: str, dest: Path) -> bool:
    """Create a bare clone of *url* at *dest*, reusing an existing path.

    The first attempt uses ``--filter=blob:none``; if that exits non-zero a
    second attempt is made with ``--depth=200`` and no filter.

    Args:
        url: Git remote to clone.
        dest: Target path of the bare repository.

    Returns:
        True when *dest* already exists or one of the clone attempts exits
        with status 0, otherwise False.
    """
    if dest.exists():
        # Any pre-existing path counts as a finished clone.
        print(f" [skip] already cloned at {dest}")
        return True

    print(f" [clone] {url} -> {dest}")
    base_cmd = ["git", "clone", "--bare"]
    target = [url, str(dest)]

    status, _ = _run(base_cmd + ["--filter=blob:none"] + target, capture=False)
    if status == 0:
        return True

    print(f" [warn] clone failed (rc={status}), retrying without filter")
    status, _ = _run(base_cmd + ["--depth=200"] + target, capture=False)
    return status == 0
def list_tags(repo: Path) -> List[str]:
    """Return the non-blank tag names printed by ``git tag --list`` in *repo*.

    An empty list is returned when the git command exits non-zero.
    """
    status, listing = _run(["git", "tag", "--list"], cwd=repo)
    tags: List[str] = []
    if status == 0:
        for raw in listing.splitlines():
            name = raw.strip()
            if name:
                tags.append(name)
    return tags
def list_commits_touching(repo: Path, spec_path: str, max_n: int = 100) -> List[Tuple[str, str]]:
    """List ``(sha, date)`` pairs for commits in the history of *spec_path*.

    Runs ``git log --follow`` on the path and keeps the first two
    whitespace-separated fields of every output line: the commit hash and the
    date part (``YYYY-MM-DD``) of the ``%ai`` author date.

    Args:
        repo: Repository in which to run git.
        spec_path: Repo-relative file whose history is walked.
        max_n: Number of leading entries to keep, in git log order.

    Returns:
        At most *max_n* pairs, or an empty list when git fails or prints nothing.
    """
    log_cmd = ["git", "log", "--follow", "--format=%H %ai", "--", spec_path]
    status, log_text = _run(log_cmd, cwd=repo)
    if status != 0 or not log_text:
        return []

    fields_per_line = (line.split() for line in log_text.splitlines())
    # Field 0 is the hash, field 1 the calendar date; time and offset are dropped.
    history = [(fields[0], fields[1]) for fields in fields_per_line if len(fields) >= 2]
    return history[:max_n]
def show_file(repo: Path, ref: str, spec_path: str) -> Optional[bytes]:
    """Read the raw bytes of *spec_path* as stored at *ref*.

    Args:
        repo: Repository in which to run git.
        ref: Any revision accepted by ``git show`` (tag name, commit hash, ...).
        spec_path: Repo-relative path of the file.

    Returns:
        The stdout bytes of ``git show <ref>:<spec_path>``, or None when the
        command exits non-zero.
    """
    blob_spec = f"{ref}:{spec_path}"
    proc = subprocess.run(
        ["git", "show", blob_spec],
        cwd=str(repo),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return proc.stdout if proc.returncode == 0 else None
def tag_to_date(tag: str, repo: Path) -> Optional[str]:
    """Look up the ``YYYY-MM-DD`` author date of the commit *tag* resolves to.

    Returns None when ``git log`` exits non-zero or prints nothing.
    """
    status, stamp = _run(["git", "log", "-1", "--format=%ai", tag], cwd=repo)
    if status != 0 or not stamp:
        return None
    # %ai prints "<date> <time> <offset>"; only the date is kept.
    return stamp.split(maxsplit=1)[0]
# ---------------------------------------------------------------------------
# Per-provider extraction
# ---------------------------------------------------------------------------
def extract_stripe(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]:
    """Extract the Stripe spec at each of the newest *max_versions* tags.

    Tags named ``v<number>`` are ordered numerically; if the repository has
    none, tags named ``YYYY-MM-DD`` are used instead (sorted lexically). For
    every selected tag the first path in ``PROVIDERS["stripe"]["spec_paths"]``
    that exists at that tag is written to ``out_dir/<label><ext>``, where the
    label is the tag's commit date, or the tag name if no date is available.

    Tags are processed in ascending order, so when several tags share a
    commit date the highest tag overwrites the earlier ones.

    Args:
        repo: Bare clone of the Stripe OpenAPI repository.
        out_dir: Directory receiving the spec files (created if missing).
        max_versions: Maximum number of tags to extract; values <= 0 extract
            nothing.

    Returns:
        Mapping of version label to the file name written for it.
    """
    # Guard explicitly: versioned[-0:] would select *every* tag, not none.
    if max_versions <= 0:
        return {}

    tags = list_tags(repo)
    versioned = sorted(
        (t for t in tags if re.match(r"^v\d+$", t)),
        key=lambda t: int(t[1:]),
    )
    if not versioned:
        # Fallback: date-formatted tags sort correctly as plain strings.
        versioned = sorted(t for t in tags if re.match(r"^\d{4}-\d{2}-\d{2}$", t))

    # Same guarantee extract_via_commits gives: the target directory exists.
    out_dir.mkdir(parents=True, exist_ok=True)

    versions_map: Dict[str, str] = {}
    spec_paths = PROVIDERS["stripe"]["spec_paths"]
    for tag in versioned[-max_versions:]:
        # Fall back to the tag name when the commit date cannot be resolved.
        date = tag_to_date(tag, repo) or tag
        for sp in spec_paths:
            content = show_file(repo, tag, sp)
            if content:
                fname = f"{date}{Path(sp).suffix}"
                (out_dir / fname).write_bytes(content)
                versions_map[date] = fname
                break  # spec_paths is a preference list; first hit wins
    return versions_map
def extract_via_commits(
    repo: Path,
    out_dir: Path,
    spec_paths: List[str],
    max_versions: int,
    subdir: Optional[str] = None,
) -> Dict[str, str]:
    """Extract one spec snapshot per commit date from the given spec paths.

    Each path's history is walked in ``git log`` order and the first readable
    commit of every date is written to ``<target>/<date><ext>``. Paths are
    treated as a preference list: a date already covered by an earlier path
    is not extracted again, so every file written is also present in the
    returned index. Extraction stops once *max_versions* dates are recorded,
    so the result never exceeds that cap regardless of how many paths are
    given.

    Args:
        repo: Bare clone to read from.
        out_dir: Provider output directory.
        spec_paths: Repo-relative spec files, most preferred first.
        max_versions: Maximum number of dated snapshots to produce.
        subdir: Optional sub-directory of *out_dir* to write into.

    Returns:
        Mapping of commit date (``YYYY-MM-DD``) to the file name written.
    """
    target_dir = out_dir / subdir if subdir else out_dir
    target_dir.mkdir(parents=True, exist_ok=True)

    versions_map: Dict[str, str] = {}
    for sp in spec_paths:
        if len(versions_map) >= max_versions:
            break
        ext = Path(sp).suffix
        for sha, date in list_commits_touching(repo, sp, max_versions):
            if len(versions_map) >= max_versions:
                break
            if date in versions_map:
                # Already covered by an earlier commit or a preferred path.
                continue
            content = show_file(repo, sha, sp)
            if not content:
                # Unreadable at this commit; another commit of the same date may still work.
                continue
            fname = f"{date}{ext}"
            (target_dir / fname).write_bytes(content)
            versions_map[date] = fname
    return versions_map
def extract_twilio(repo: Path, out_dir: Path, max_versions: int) -> Dict[str, str]:
    """Extract Twilio specs into one sub-directory per product line.

    The version budget is divided evenly (integer division) between the
    products, and each product's history is extracted into
    ``out_dir/<product>/``.

    Returns:
        Combined index whose keys are ``<product>/<date>`` and whose values
        are ``<product>/<file name>``.
    """
    product_specs = (
        ("messaging", "spec/yaml/twilio_messaging_v1.yaml"),
        ("api", "spec/yaml/twilio_api_v2010.yaml"),
        ("voice", "spec/yaml/twilio_voice_v1.yaml"),
    )
    per_product_cap = max_versions // len(product_specs)

    index: Dict[str, str] = {}
    for product, spec_path in product_specs:
        extracted = extract_via_commits(
            repo, out_dir, [spec_path], per_product_cap, subdir=product
        )
        # Prefix keys and file names with the product so entries from
        # different products cannot collide in the combined index.
        index.update(
            {f"{product}/{day}": f"{product}/{fname}" for day, fname in extracted.items()}
        )
    return index
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
def scrape_provider(
    name: str,
    cfg: dict,
    temp_dir: Path,
    out_root: Path,
    dry_run: bool = False,
) -> Dict[str, str]:
    """Clone one provider's repository and extract its spec versions.

    Args:
        name: Provider key; "stripe" and "twilio" use dedicated extractors,
            every other name goes through ``extract_via_commits``.
        cfg: Provider configuration providing "url", "max_versions" and, for
            the generic extractor, "spec_paths".
        temp_dir: Directory that holds the bare clone ``<name>.git``.
        out_root: Root output directory; specs go to ``out_root/<name>``.
        dry_run: When true, only print the plan. Nothing is cloned and no
            file or directory is created.

    Returns:
        Mapping of version label to written file name. Empty for a dry run
        or when cloning fails.
    """
    if dry_run:
        # Checked before any mkdir so a dry run leaves the filesystem untouched.
        print(f"[dry-run] {name}: would clone {cfg['url']} and extract {cfg['max_versions']} versions")
        return {}

    out_dir = out_root / name
    out_dir.mkdir(parents=True, exist_ok=True)

    repo_dir = temp_dir / f"{name}.git"
    print(f"\n[{name}] Cloning...")
    if not clone_bare(cfg["url"], repo_dir):
        print(f"[{name}] Clone failed, skipping.")
        return {}

    print(f"[{name}] Extracting specs...")
    if name == "stripe":
        versions_map = extract_stripe(repo_dir, out_dir, cfg["max_versions"])
    elif name == "twilio":
        versions_map = extract_twilio(repo_dir, out_dir, cfg["max_versions"])
    else:
        versions_map = extract_via_commits(
            repo_dir, out_dir, cfg["spec_paths"], cfg["max_versions"]
        )

    # Index file listing every extracted version next to the spec files.
    index_file = out_dir / "_versions.json"
    index_file.write_text(
        json.dumps({"provider": name, "versions": versions_map}, indent=2),
        encoding="utf-8",
    )
    print(f"[{name}] Done — {len(versions_map)} versions extracted to {out_dir}")
    return versions_map
def main() -> None:
    """Command-line entry point: parse arguments, scrape providers, print a summary.

    Flags:
        --out        Output root directory.
        --temp       Directory for bare clones; it is never removed by this
                     script, so existing clones in it are reused.
        --providers  Comma-separated provider names; unknown names are
                     reported and ignored.
        --dry-run    Print the plan only; no directories are created here.
        --keep-temp  Keep the automatically created clone directory.

    Exits with status 1 when no requested provider is known.
    """
    parser = argparse.ArgumentParser(description="Scrape real OpenAPI specs from public repos")
    parser.add_argument("--out", default="scenarios/layer1_real", help="Output directory")
    parser.add_argument("--temp", default=None, help="Temp dir for bare clones (default: system temp)")
    parser.add_argument("--providers", default=",".join(PROVIDERS.keys()),
                        help="Comma-separated provider list")
    parser.add_argument("--dry-run", action="store_true", help="Print plan without cloning")
    parser.add_argument("--keep-temp", action="store_true", help="Keep bare clone dirs after extraction")
    args = parser.parse_args()

    out_root = Path(args.out)

    requested = [p.strip() for p in args.providers.split(",") if p.strip()]
    unknown = [p for p in requested if p not in PROVIDERS]
    if unknown:
        # Surface typos instead of silently dropping them.
        print(f"[warn] Ignoring unknown providers: {unknown}")
    providers_list = [p for p in requested if p in PROVIDERS]
    if not providers_list:
        print(f"No valid providers. Available: {list(PROVIDERS.keys())}")
        sys.exit(1)

    temp_ctx = None
    kept_auto_temp = False
    if args.dry_run:
        # Only a placeholder path: a dry run must not create any directory.
        temp_dir = Path(args.temp or "scripts/_temp_clones")
    else:
        out_root.mkdir(parents=True, exist_ok=True)
        if args.temp is not None:
            temp_dir = Path(args.temp or "scripts/_temp_clones")
        elif args.keep_temp:
            # TemporaryDirectory deletes its directory when the object is
            # finalized or the interpreter exits, even without cleanup();
            # mkdtemp creates a directory that really persists.
            temp_dir = Path(tempfile.mkdtemp(prefix="apishift_clones_"))
            kept_auto_temp = True
        else:
            temp_ctx = tempfile.TemporaryDirectory(prefix="apishift_clones_")
            temp_dir = Path(temp_ctx.name)
        temp_dir.mkdir(parents=True, exist_ok=True)

    summary: Dict[str, int] = {}
    try:
        for name in providers_list:
            cfg = PROVIDERS[name]
            versions_map = scrape_provider(name, cfg, temp_dir, out_root, dry_run=args.dry_run)
            summary[name] = len(versions_map)
    finally:
        if temp_ctx is not None:
            temp_ctx.cleanup()

    print("\n=== Scrape Summary ===")
    total = 0
    for name, count in summary.items():
        print(f" {name}: {count} versions")
        total += count
    print(f" TOTAL: {total} spec versions")
    print(f" Output: {out_root.resolve()}")
    if kept_auto_temp:
        print(f" Clones kept at: {temp_dir}")
if __name__ == "__main__":
main()