apishift-env / scripts /extract_client_samples.py
yaswanth169's picture
Initial APIShift env push
3040bf7 verified
"""Extract real client code samples from official SDK repos.
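
For each provider, clones the Python SDK repository configured in SDK_REPOS
and extracts representative code samples that demonstrate real API usage
patterns. These become the client_files in real scenarios. If the clone
fails or yields fewer than two matching files, curated fallback samples are
written to the same output directory.

Usage:
    python scripts/extract_client_samples.py [--out scenarios/layer1_real]
        [--providers stripe,github,...] [--dry-run]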
"""
import argparse
import ast
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional
# Per-provider extraction settings, keyed by provider name. The key order is
# also the default value of the --providers command-line option.
#
# Fields of each entry, as used by the code in this file:
#   url          - repository URL handed to `git clone`.
#   search_paths - directories (relative to the clone root) that are searched
#                  recursively for *.py files; missing directories are skipped.
#   patterns     - regular expressions; a file qualifies when any one of them
#                  matches somewhere in its text.
#   max_files    - upper bound on the number of files copied for the provider.
#   language     - not read by any code visible in this file.
SDK_REPOS = {
    "stripe": {
        "url": "https://github.com/stripe/stripe-python.git",
        "search_paths": ["stripe/api_resources/", "examples/"],
        "patterns": [r"stripe\.", r"Invoice\.", r"Webhook\.", r"PaymentIntent\."],
        "max_files": 15,
        "language": "python",
    },
    "github": {
        "url": "https://github.com/PyGithub/PyGithub.git",
        "search_paths": ["github/", "examples/"],
        "patterns": [r"github\.Github", r"repo\.get_", r"requests\.get.*github"],
        "max_files": 10,
        "language": "python",
    },
    "twilio": {
        "url": "https://github.com/twilio/twilio-python.git",
        "search_paths": ["twilio/rest/", "examples/"],
        "patterns": [r"client\.messages", r"client\.calls", r"twilio\."],
        "max_files": 10,
        "language": "python",
    },
    "slack": {
        "url": "https://github.com/slackapi/python-slack-sdk.git",
        "search_paths": ["slack_sdk/", "slack/", "tutorial/"],
        "patterns": [r"WebClient", r"client\.chat_postMessage", r"slack\."],
        "max_files": 10,
        "language": "python",
    },
    "openai": {
        "url": "https://github.com/openai/openai-python.git",
        "search_paths": ["openai/", "examples/"],
        "patterns": [r"openai\.", r"client\.chat", r"client\.completions"],
        "max_files": 8,
        "language": "python",
    },
}
# Curated fallback samples when SDK clone fails or no matches found.
#
# Maps provider name -> {output file name: Python source text}. Each source is
# written verbatim to disk, so it must stay syntactically valid Python: nested
# blocks (function bodies, try/except) use 4-space indentation inside the
# string literals.
FALLBACK_SAMPLES: Dict[str, Dict[str, str]] = {
    "stripe": {
        "invoice_create.py": (
            "import stripe\n"
            "stripe.api_key = 'sk_test_...'\n\n"
            "def create_invoice(customer_id: str, amount: int) -> dict:\n"
            "    invoice = stripe.Invoice.create(\n"
            "        customer=customer_id,\n"
            "        auto_advance=True,\n"
            "    )\n"
            "    stripe.InvoiceItem.create(\n"
            "        customer=customer_id,\n"
            "        amount=amount,\n"
            "        currency='usd',\n"
            "        invoice=invoice.id,\n"
            "    )\n"
            "    return invoice\n"
        ),
        "webhook_handler.py": (
            "import stripe\n"
            "from flask import request, jsonify\n\n"
            "def handle_webhook(endpoint_secret: str):\n"
            "    payload = request.get_data(as_text=True)\n"
            "    sig_header = request.headers.get('Stripe-Signature')\n"
            "    try:\n"
            "        event = stripe.Webhook.construct_event(\n"
            "            payload, sig_header, endpoint_secret\n"
            "        )\n"
            "    except stripe.error.SignatureVerificationError:\n"
            "        return jsonify({'error': 'Invalid signature'}), 400\n"
            "    return jsonify({'status': 'ok'})\n"
        ),
        "payment_intent.py": (
            "import stripe\n\n"
            "def create_payment(amount: int, currency: str = 'usd') -> str:\n"
            "    intent = stripe.PaymentIntent.create(\n"
            "        amount=amount,\n"
            "        currency=currency,\n"
            "        payment_method_types=['card'],\n"
            "    )\n"
            "    return intent.client_secret\n"
        ),
    },
    "github": {
        "github_client.py": (
            "import requests\n\n"
            "BASE = 'https://api.github.com'\n\n"
            "def get_repo(owner: str, repo: str, token: str) -> dict:\n"
            "    r = requests.get(\n"
            "        f'{BASE}/repos/{owner}/{repo}',\n"
            "        headers={'Authorization': f'token {token}',\n"
            "                 'Accept': 'application/vnd.github.v3+json'},\n"
            "    )\n"
            "    r.raise_for_status()\n"
            "    return r.json()\n\n"
            "def list_issues(owner: str, repo: str, token: str) -> list:\n"
            "    r = requests.get(\n"
            "        f'{BASE}/repos/{owner}/{repo}/issues',\n"
            "        headers={'Authorization': f'token {token}'},\n"
            "    )\n"
            "    return r.json()\n"
        ),
        "release_manager.py": (
            "import requests\n\n"
            "def create_release(owner: str, repo: str, tag: str, token: str) -> dict:\n"
            "    r = requests.post(\n"
            "        f'https://api.github.com/repos/{owner}/{repo}/releases',\n"
            "        json={'tag_name': tag, 'name': tag, 'draft': False},\n"
            "        headers={'Authorization': f'token {token}'},\n"
            "    )\n"
            "    return r.json()\n"
        ),
    },
    "twilio": {
        "send_sms.py": (
            "from twilio.rest import Client\n\n"
            "def send_sms(account_sid: str, auth_token: str, to: str, from_: str, body: str) -> str:\n"
            "    client = Client(account_sid, auth_token)\n"
            "    message = client.messages.create(\n"
            "        body=body,\n"
            "        from_=from_,\n"
            "        to=to,\n"
            "    )\n"
            "    return message.sid\n"
        ),
        "voice_call.py": (
            "from twilio.rest import Client\n\n"
            "def make_call(account_sid: str, auth_token: str, to: str, from_: str, twiml_url: str) -> str:\n"
            "    client = Client(account_sid, auth_token)\n"
            "    call = client.calls.create(\n"
            "        to=to,\n"
            "        from_=from_,\n"
            "        url=twiml_url,\n"
            "    )\n"
            "    return call.sid\n"
        ),
    },
    "slack": {
        "slack_client.py": (
            "from slack_sdk import WebClient\n"
            "from slack_sdk.errors import SlackApiError\n\n"
            "def post_message(token: str, channel: str, text: str) -> dict:\n"
            "    client = WebClient(token=token)\n"
            "    try:\n"
            "        result = client.chat_postMessage(channel=channel, text=text)\n"
            "        return result.data\n"
            "    except SlackApiError as e:\n"
            "        raise RuntimeError(f'Slack error: {e.response[\"error\"]}') from e\n"
        ),
        "slack_logout.py": (
            "import requests\n\n"
            "def logout(token: str) -> bool:\n"
            "    r = requests.post(\n"
            "        'https://slack.com/api/auth.revoke',\n"
            "        headers={'Authorization': f'Bearer {token}'},\n"
            "    )\n"
            "    return r.json().get('ok', False)\n"
        ),
    },
    "openai": {
        "chat_client.py": (
            "from openai import OpenAI\n\n"
            "def chat(prompt: str, model: str = 'gpt-4') -> str:\n"
            "    client = OpenAI()\n"
            "    response = client.chat.completions.create(\n"
            "        model=model,\n"
            "        messages=[{'role': 'user', 'content': prompt}],\n"
            "    )\n"
            "    return response.choices[0].message.content\n"
        ),
    },
}
def _run(cmd: List[str], cwd: Optional[Path] = None) -> int:
result = subprocess.run(
cmd, cwd=str(cwd) if cwd else None,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return result.returncode
def _is_interesting_py(content: str, patterns: List[str]) -> bool:
for pat in patterns:
if re.search(pat, content):
return True
return False
def extract_from_clone(
    provider: str, cfg: dict, temp_dir: Path, out_dir: Path
) -> int:
    """Clone a provider's SDK repository and copy matching ``*.py`` files.

    The repository is cloned into ``temp_dir / provider``. Every directory in
    ``cfg["search_paths"]`` that exists in the clone is searched recursively;
    a file is copied into *out_dir* when it matches one of
    ``cfg["patterns"]`` and its text is between 100 and 5000 characters long.

    Args:
        provider: Provider key; used for the clone directory and log output.
        cfg: Settings with ``url``, ``search_paths``, ``patterns`` and
            ``max_files`` entries.
        temp_dir: Scratch directory that receives the clone.
        out_dir: Existing directory that receives the copied samples.

    Returns:
        Number of distinct files written to *out_dir* (at most
        ``cfg["max_files"]``), or 0 when the clone failed.
    """
    repo_dir = temp_dir / provider
    print(f" [clone] {cfg['url']} -> {repo_dir}")
    rc = _run(["git", "clone", "--depth=1", "--filter=blob:none", cfg["url"], str(repo_dir)])
    if rc != 0:
        print(f" [warn] SDK clone failed for {provider}, using fallback samples")
        return 0

    max_files = cfg["max_files"]
    # Output names written so far. Counting these (rather than write calls)
    # keeps the return value equal to the number of files actually on disk.
    written = set()
    for search_path in cfg["search_paths"]:
        if len(written) >= max_files:
            break
        search_dir = repo_dir / search_path
        if not search_dir.exists():
            continue
        # Sorted so the selected subset does not depend on directory order.
        for py_file in sorted(search_dir.rglob("*.py")):
            if len(written) >= max_files:
                break
            try:
                content = py_file.read_text(encoding="utf-8", errors="ignore")
            except (OSError, UnicodeError):
                # Unreadable entries are skipped; extraction is best-effort.
                continue
            if len(content) < 100 or len(content) > 5000:
                continue
            if not _is_interesting_py(content, cfg["patterns"]):
                continue
            name = py_file.name
            if name in written:
                # Same base name in another directory (e.g. __init__.py):
                # flatten the repo-relative path instead of overwriting.
                name = "__".join(py_file.relative_to(repo_dir).parts)
                if name in written:
                    continue
            (out_dir / name).write_text(content, encoding="utf-8")
            written.add(name)
    return len(written)
def write_fallbacks(provider: str, out_dir: Path) -> int:
    """Write the curated fallback samples for *provider* into *out_dir*.

    Each entry of ``FALLBACK_SAMPLES[provider]`` becomes one UTF-8 file named
    after its key. A provider without fallback samples writes nothing.

    Returns:
        The number of sample files written.
    """
    written = 0
    provider_samples = FALLBACK_SAMPLES.get(provider, {})
    for written, (filename, source) in enumerate(provider_samples.items(), start=1):
        target = out_dir / filename
        target.write_text(source, encoding="utf-8")
    return written
def main() -> None:
    """Command-line entry point: extract client samples for each provider.

    Options:
        --out: Root output directory; samples for a provider are written to
            ``<out>/<provider>/client_samples``.
        --providers: Comma-separated provider names. Names that are not keys
            of ``SDK_REPOS`` are reported and skipped.
        --dry-run: Only print what would be cloned; nothing is cloned and no
            directories or files are created under ``--out``.

    For each provider the SDK repository is cloned and scanned. When fewer
    than two samples are extracted, the curated fallback samples are written
    to the same directory.
    """
    parser = argparse.ArgumentParser(description="Extract real client code samples from SDK repos")
    parser.add_argument("--out", default="scenarios/layer1_real",
                        help="root output directory")
    parser.add_argument("--providers", default=",".join(SDK_REPOS.keys()),
                        help="comma-separated provider names")
    parser.add_argument("--dry-run", action="store_true",
                        help="print planned clones without writing anything")
    args = parser.parse_args()
    out_root = Path(args.out)

    requested = [p.strip() for p in args.providers.split(",") if p.strip()]
    unknown = [p for p in requested if p not in SDK_REPOS]
    if unknown:
        # Surface typos instead of silently doing nothing for them.
        print(f" [warn] ignoring unknown providers: {', '.join(unknown)}")
    providers_list = [p for p in requested if p in SDK_REPOS]

    with tempfile.TemporaryDirectory(prefix="apishift_sdk_") as tmp:
        temp_dir = Path(tmp)
        for provider in providers_list:
            cfg = SDK_REPOS[provider]
            out_dir = out_root / provider / "client_samples"
            print(f"\n[{provider}] Extracting client samples...")
            if args.dry_run:
                print(f" [dry-run] would clone {cfg['url']}")
                continue
            # Created only for real runs so --dry-run leaves the tree untouched.
            out_dir.mkdir(parents=True, exist_ok=True)
            count = extract_from_clone(provider, cfg, temp_dir, out_dir)
            if count < 2:
                fb = write_fallbacks(provider, out_dir)
                print(f" Wrote {fb} fallback samples (clone yielded {count})")
            else:
                print(f" Extracted {count} real samples from SDK repo")
    print("\n=== Client Sample Extraction Done ===")


if __name__ == "__main__":
    main()