Spaces:
Running
Running
| """Extract real client code samples from official SDK repos. | |
| For each provider, clones the official Python SDK and extracts | |
| representative code samples that demonstrate real API usage patterns. | |
| These become the client_files in real scenarios. | |
| Usage: | |
| python scripts/extract_client_samples.py [--out scenarios/layer1_real] | |
| """ | |
| import argparse | |
| import ast | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
# Per-provider extraction settings, keyed by provider name.
#   url          - git repository passed to `git clone`
#   search_paths - directories (relative to the clone root) searched recursively for *.py
#   patterns     - regexes; a file is kept when at least one of them matches its text
#   max_files    - upper bound on the number of files copied for the provider
#   language     - language tag (not read by any function visible in this file)
SDK_REPOS = dict(
    stripe=dict(
        url="https://github.com/stripe/stripe-python.git",
        search_paths=["stripe/api_resources/", "examples/"],
        patterns=[r"stripe\.", r"Invoice\.", r"Webhook\.", r"PaymentIntent\."],
        max_files=15,
        language="python",
    ),
    github=dict(
        url="https://github.com/PyGithub/PyGithub.git",
        search_paths=["github/", "examples/"],
        patterns=[r"github\.Github", r"repo\.get_", r"requests\.get.*github"],
        max_files=10,
        language="python",
    ),
    twilio=dict(
        url="https://github.com/twilio/twilio-python.git",
        search_paths=["twilio/rest/", "examples/"],
        patterns=[r"client\.messages", r"client\.calls", r"twilio\."],
        max_files=10,
        language="python",
    ),
    slack=dict(
        url="https://github.com/slackapi/python-slack-sdk.git",
        search_paths=["slack_sdk/", "slack/", "tutorial/"],
        patterns=[r"WebClient", r"client\.chat_postMessage", r"slack\."],
        max_files=10,
        language="python",
    ),
    openai=dict(
        url="https://github.com/openai/openai-python.git",
        search_paths=["openai/", "examples/"],
        patterns=[r"openai\.", r"client\.chat", r"client\.completions"],
        max_files=8,
        language="python",
    ),
)
# Curated fallback samples, written when the SDK clone fails or yields too few matches.
# Maps provider name -> {output file name -> Python source text}.
# Each sample is written verbatim to disk as a .py file, so the indentation
# inside the string literals must itself form valid Python.
FALLBACK_SAMPLES: Dict[str, Dict[str, str]] = {
    "stripe": {
        "invoice_create.py": (
            "import stripe\n"
            "stripe.api_key = 'sk_test_...'\n\n"
            "def create_invoice(customer_id: str, amount: int) -> dict:\n"
            "    invoice = stripe.Invoice.create(\n"
            "        customer=customer_id,\n"
            "        auto_advance=True,\n"
            "    )\n"
            "    stripe.InvoiceItem.create(\n"
            "        customer=customer_id,\n"
            "        amount=amount,\n"
            "        currency='usd',\n"
            "        invoice=invoice.id,\n"
            "    )\n"
            "    return invoice\n"
        ),
        "webhook_handler.py": (
            "import stripe\n"
            "from flask import request, jsonify\n\n"
            "def handle_webhook(endpoint_secret: str):\n"
            "    payload = request.get_data(as_text=True)\n"
            "    sig_header = request.headers.get('Stripe-Signature')\n"
            "    try:\n"
            "        event = stripe.Webhook.construct_event(\n"
            "            payload, sig_header, endpoint_secret\n"
            "        )\n"
            "    except stripe.error.SignatureVerificationError:\n"
            "        return jsonify({'error': 'Invalid signature'}), 400\n"
            "    return jsonify({'status': 'ok'})\n"
        ),
        "payment_intent.py": (
            "import stripe\n\n"
            "def create_payment(amount: int, currency: str = 'usd') -> str:\n"
            "    intent = stripe.PaymentIntent.create(\n"
            "        amount=amount,\n"
            "        currency=currency,\n"
            "        payment_method_types=['card'],\n"
            "    )\n"
            "    return intent.client_secret\n"
        ),
    },
    "github": {
        "github_client.py": (
            "import requests\n\n"
            "BASE = 'https://api.github.com'\n\n"
            "def get_repo(owner: str, repo: str, token: str) -> dict:\n"
            "    r = requests.get(\n"
            "        f'{BASE}/repos/{owner}/{repo}',\n"
            "        headers={'Authorization': f'token {token}',\n"
            "                 'Accept': 'application/vnd.github.v3+json'},\n"
            "    )\n"
            "    r.raise_for_status()\n"
            "    return r.json()\n\n"
            "def list_issues(owner: str, repo: str, token: str) -> list:\n"
            "    r = requests.get(\n"
            "        f'{BASE}/repos/{owner}/{repo}/issues',\n"
            "        headers={'Authorization': f'token {token}'},\n"
            "    )\n"
            "    return r.json()\n"
        ),
        "release_manager.py": (
            "import requests\n\n"
            "def create_release(owner: str, repo: str, tag: str, token: str) -> dict:\n"
            "    r = requests.post(\n"
            "        f'https://api.github.com/repos/{owner}/{repo}/releases',\n"
            "        json={'tag_name': tag, 'name': tag, 'draft': False},\n"
            "        headers={'Authorization': f'token {token}'},\n"
            "    )\n"
            "    return r.json()\n"
        ),
    },
    "twilio": {
        "send_sms.py": (
            "from twilio.rest import Client\n\n"
            "def send_sms(account_sid: str, auth_token: str, to: str, from_: str, body: str) -> str:\n"
            "    client = Client(account_sid, auth_token)\n"
            "    message = client.messages.create(\n"
            "        body=body,\n"
            "        from_=from_,\n"
            "        to=to,\n"
            "    )\n"
            "    return message.sid\n"
        ),
        "voice_call.py": (
            "from twilio.rest import Client\n\n"
            "def make_call(account_sid: str, auth_token: str, to: str, from_: str, twiml_url: str) -> str:\n"
            "    client = Client(account_sid, auth_token)\n"
            "    call = client.calls.create(\n"
            "        to=to,\n"
            "        from_=from_,\n"
            "        url=twiml_url,\n"
            "    )\n"
            "    return call.sid\n"
        ),
    },
    "slack": {
        "slack_client.py": (
            "from slack_sdk import WebClient\n"
            "from slack_sdk.errors import SlackApiError\n\n"
            "def post_message(token: str, channel: str, text: str) -> dict:\n"
            "    client = WebClient(token=token)\n"
            "    try:\n"
            "        result = client.chat_postMessage(channel=channel, text=text)\n"
            "        return result.data\n"
            "    except SlackApiError as e:\n"
            "        raise RuntimeError(f'Slack error: {e.response[\"error\"]}') from e\n"
        ),
        "slack_logout.py": (
            "import requests\n\n"
            "def logout(token: str) -> bool:\n"
            "    r = requests.post(\n"
            "        'https://slack.com/api/auth.revoke',\n"
            "        headers={'Authorization': f'Bearer {token}'},\n"
            "    )\n"
            "    return r.json().get('ok', False)\n"
        ),
    },
    "openai": {
        "chat_client.py": (
            "from openai import OpenAI\n\n"
            "def chat(prompt: str, model: str = 'gpt-4') -> str:\n"
            "    client = OpenAI()\n"
            "    response = client.chat.completions.create(\n"
            "        model=model,\n"
            "        messages=[{'role': 'user', 'content': prompt}],\n"
            "    )\n"
            "    return response.choices[0].message.content\n"
        ),
    },
}
| def _run(cmd: List[str], cwd: Optional[Path] = None) -> int: | |
| result = subprocess.run( | |
| cmd, cwd=str(cwd) if cwd else None, | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, | |
| ) | |
| return result.returncode | |
| def _is_interesting_py(content: str, patterns: List[str]) -> bool: | |
| for pat in patterns: | |
| if re.search(pat, content): | |
| return True | |
| return False | |
def extract_from_clone(
    provider: str, cfg: dict, temp_dir: Path, out_dir: Path
) -> int:
    """Shallow-clone a provider's SDK repo and copy matching ``.py`` files into *out_dir*.

    A file is copied when its text is between 100 and 5000 characters long
    and at least one regex from ``cfg["patterns"]`` matches it. At most
    ``cfg["max_files"]`` files are copied in total across all search paths.

    Args:
        provider: Provider key; used as the clone directory name under
            *temp_dir* and in log output.
        cfg: Provider settings with the keys ``url``, ``search_paths``,
            ``patterns`` and ``max_files``.
        temp_dir: Scratch directory that receives the clone.
        out_dir: Existing directory that receives the copied files.

    Returns:
        The number of distinct files written to *out_dir*; 0 when the clone
        fails.
    """
    repo_dir = temp_dir / provider
    print(f" [clone] {cfg['url']} -> {repo_dir}")
    rc = _run(["git", "clone", "--depth=1", "--filter=blob:none", cfg["url"], str(repo_dir)])
    if rc != 0:
        print(f" [warn] SDK clone failed for {provider}, using fallback samples")
        return 0

    max_files = cfg["max_files"]
    # Output file name -> source file it was copied from. Tracking this keeps
    # the returned count equal to the number of files actually on disk.
    written: Dict[str, Path] = {}
    for search_path in cfg["search_paths"]:
        if len(written) >= max_files:
            break
        search_dir = repo_dir / search_path
        if not search_dir.exists():
            continue
        # rglob yields in arbitrary order; sorting makes the selected subset
        # reproducible from run to run.
        for py_file in sorted(search_dir.rglob("*.py")):
            if len(written) >= max_files:
                break
            try:
                content = py_file.read_text(encoding="utf-8", errors="ignore")
            except OSError:
                continue
            # Cheap length filter first, regex scan second.
            if len(content) < 100 or len(content) > 5000:
                continue
            if not _is_interesting_py(content, cfg["patterns"]):
                continue
            name = py_file.name
            if name in written:
                if written[name] == py_file:
                    # Same file reached again through overlapping search paths.
                    continue
                # A different file with the same base name (e.g. __init__.py):
                # fall back to a name built from its repo-relative path so it
                # does not overwrite the earlier sample.
                name = "__".join(py_file.relative_to(repo_dir).parts)
                if name in written:
                    continue
            (out_dir / name).write_text(content, encoding="utf-8")
            written[name] = py_file
    return len(written)
def write_fallbacks(provider: str, out_dir: Path) -> int:
    """Write the curated fallback samples for *provider* into *out_dir*.

    Each entry of ``FALLBACK_SAMPLES[provider]`` becomes one UTF-8 file named
    after its key. A provider with no curated samples writes nothing.

    Returns:
        The number of sample files written.
    """
    curated = FALLBACK_SAMPLES.get(provider, {})
    written = 0
    for filename in curated:
        target = out_dir / filename
        target.write_text(curated[filename], encoding="utf-8")
        written += 1
    return written
def main() -> None:
    """Command-line entry point: extract client samples for each requested provider.

    Options:
        --out        Root output directory (default ``scenarios/layer1_real``);
                     samples go to ``<out>/<provider>/client_samples``.
        --providers  Comma-separated provider names (default: every key of
                     ``SDK_REPOS``). Unknown names are reported on stderr and
                     skipped.
        --dry-run    Only print what would be cloned; nothing is cloned and no
                     output directory is created.

    For each provider the SDK repo is cloned and scanned; when fewer than two
    samples are extracted, the curated fallback samples are written as well.
    """
    parser = argparse.ArgumentParser(description="Extract real client code samples from SDK repos")
    parser.add_argument("--out", default="scenarios/layer1_real")
    parser.add_argument("--providers", default=",".join(SDK_REPOS.keys()))
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    out_root = Path(args.out)
    requested = [p.strip() for p in args.providers.split(",") if p.strip()]
    providers_list = [p for p in requested if p in SDK_REPOS]
    unknown = [p for p in requested if p not in SDK_REPOS]
    if unknown:
        # Report typos instead of silently doing nothing for them.
        print(f"[warn] ignoring unknown provider(s): {', '.join(unknown)}", file=sys.stderr)

    with tempfile.TemporaryDirectory(prefix="apishift_sdk_") as tmp:
        temp_dir = Path(tmp)
        for provider in providers_list:
            cfg = SDK_REPOS[provider]
            out_dir = out_root / provider / "client_samples"
            print(f"\n[{provider}] Extracting client samples...")
            if args.dry_run:
                print(f" [dry-run] would clone {cfg['url']}")
                continue
            # Create the output directory only on a real run, so that
            # --dry-run leaves the filesystem untouched.
            out_dir.mkdir(parents=True, exist_ok=True)
            count = extract_from_clone(provider, cfg, temp_dir, out_dir)
            if count < 2:
                fb = write_fallbacks(provider, out_dir)
                print(f" Wrote {fb} fallback samples (clone yielded {count})")
            else:
                print(f" Extracted {count} real samples from SDK repo")
    print("\n=== Client Sample Extraction Done ===")


if __name__ == "__main__":
    main()