Spaces:
Sleeping
Sleeping
| # openapi_loader.py | |
| from __future__ import annotations | |
| import json, os, re | |
| from typing import Any, Dict, Tuple, Optional | |
| from urllib.parse import urlparse | |
| import requests | |
| import yaml | |
| import gradio as gr | |
| import inspect, keyword, base64 | |
| from openapi_spec_validator import validate_spec | |
| Json = Dict[str, Any] | |
| _VAR_RE = re.compile(r"\{([^}]+)\}") | |
| _PY_IDENT = re.compile(r"^[A-Za-z_]\w*$") | |
| _PY_IDENT = re.compile(r"^[A-Za-z_]\w*$") | |
| def _is_valid_identifier(name: str) -> bool: | |
| return bool(_PY_IDENT.match(name)) and not keyword.iskeyword(name) | |
def sanitize_header_params(spec: Json) -> Tuple[Json, list]:
    """Strip header parameters whose names are not valid Python identifiers.

    Names such as ``If-Modified-Since`` break the dynamic function signatures
    built by ``gr.load_openapi()``. Both path-level and operation-level
    ``parameters`` lists are filtered.

    Args:
        spec: Parsed OpenAPI document. It is modified in place.

    Returns:
        ``(spec, dropped)`` where ``spec`` is the same (mutated) object and
        ``dropped`` lists the removed header names in encounter order
        (duplicates included).
    """
    http_methods = frozenset(
        ("get", "put", "post", "delete", "patch", "head", "options", "trace")
    )
    dropped = []

    def _should_drop(param) -> bool:
        # Entries that are not mappings (malformed specs) are kept untouched
        # for whatever consumes the spec next.
        if not isinstance(param, dict) or param.get("in") != "header":
            return False
        name = param.get("name", "")
        # A non-string name cannot be judged here; keep the entry as-is.
        return isinstance(name, str) and not _is_valid_identifier(name)

    def _clean_params(params):
        if not isinstance(params, list):
            return params
        kept = []
        for param in params:
            if _should_drop(param):
                dropped.append(param.get("name", ""))
            else:
                kept.append(param)
        return kept

    paths = spec.get("paths") or {}
    for item in paths.values():
        if not isinstance(item, dict):
            continue
        # Path-level parameters.
        if "parameters" in item:
            item["parameters"] = _clean_params(item["parameters"])
        # Operation-level parameters. Keys are type-checked first because a
        # YAML document can yield non-string keys, on which .lower() would fail.
        for method, op in item.items():
            if not isinstance(method, str) or method.lower() not in http_methods:
                continue
            if isinstance(op, dict) and "parameters" in op:
                op["parameters"] = _clean_params(op["parameters"])
    return spec, dropped
| def _is_url(s: str) -> bool: | |
| try: | |
| u = urlparse(s) | |
| return bool(u.scheme and u.netloc) | |
| except Exception: | |
| return False | |
def load_spec(spec_source: str) -> Tuple[Json, str]:
    """Read an OpenAPI document from a URL or a local file.

    The text is parsed as JSON first; if that fails it is parsed as YAML via
    ``yaml.safe_load``.

    Args:
        spec_source: URL (fetched with a 30 second timeout) or a filesystem
            path (read as UTF-8).

    Returns:
        ``(spec, origin)`` where ``origin`` is the URL as given, or the
        absolute path of the local file.

    Raises:
        ValueError: The document is not a mapping, or lacks an ``openapi`` key.
    """
    if not _is_url(spec_source):
        with open(spec_source, "r", encoding="utf-8") as handle:
            raw = handle.read()
        origin = os.path.abspath(spec_source)
    else:
        response = requests.get(spec_source, timeout=30)
        response.raise_for_status()
        raw = response.text
        origin = spec_source

    try:
        document = json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON; fall back to YAML.
        document = yaml.safe_load(raw)

    if not isinstance(document, dict):
        raise ValueError("Loaded spec is not a JSON/YAML object.")
    if "openapi" not in document:
        raise ValueError("Spec must be OpenAPI 3.x (missing 'openapi' key).")
    return document, origin
def normalize_security(spec: Json) -> Json:
    """Ensure the spec has a top-level ``security`` requirement when it can.

    If ``components.securitySchemes`` declares at least one scheme and the
    top-level ``security`` entry is missing or empty, the first declared
    scheme is installed as the default requirement (with no scopes).

    The previous implementation used ``dict.setdefault``, which does nothing
    when the key already exists, so a present-but-empty ``security`` value
    (``[]`` or ``null``) was never replaced even though the guard explicitly
    covered that case.

    Args:
        spec: Parsed OpenAPI document. It is modified in place.

    Returns:
        The same ``spec`` object.
    """
    components = spec.get("components") or {}
    schemes = components.get("securitySchemes") or {}
    if schemes and not spec.get("security"):
        first_key = next(iter(schemes))
        spec["security"] = [{first_key: []}]
    return spec
| def _resolve_server_base_url(spec: Json) -> Optional[str]: | |
| servers = spec.get("servers") or [] | |
| if not isinstance(servers, list) or not servers: | |
| return None | |
| server = servers[0] or {} | |
| url = server.get("url") | |
| if not url: | |
| return None | |
| variables = server.get("variables", {}) or {} | |
| def _sub(m: re.Match) -> str: | |
| name = m.group(1) | |
| meta = variables.get(name, {}) or {} | |
| if "default" in meta and meta["default"] is not None: | |
| return str(meta["default"]) | |
| enum = meta.get("enum") or [] | |
| return str(enum[0]) if enum else "" | |
| return _VAR_RE.sub(_sub, url).rstrip("/") | |
def _follow_redirects(url: str) -> str:
    """Return the URL that *url* ultimately resolves to, without a trailing slash.

    A HEAD request is tried first. A GET is issued instead when HEAD answers
    405, or answers any 4xx/5xx without having followed a redirect. Any
    failure along the way falls back to the input URL (also stripped of
    trailing slashes).
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
        code = response.status_code
        head_not_allowed = code == 405
        error_without_redirect = 400 <= code < 600 and not response.history
        if head_not_allowed or error_without_redirect:
            response = requests.get(
                url,
                allow_redirects=True,
                timeout=10,
                headers={"Accept": "application/json"},
            )
        # After a redirect chain, the response's own URL is the final location.
        resolved = getattr(response, "url", None) or url
        return str(resolved).rstrip("/")
    except Exception:
        # Best effort only: keep using the address we were given.
        return url.rstrip("/")
def parse_security_schemes(spec: Json) -> Dict[str, Dict[str, Any]]:
    """Summarise ``components.securitySchemes`` into a uniform shape.

    Returns:
        A mapping of scheme name to a dict with the keys ``type``, ``in``,
        ``name``, ``scheme``, ``bearerFormat`` and ``flows``. Note that
        ``type`` and ``scheme`` are lowercased (so ``apiKey`` becomes
        ``apikey``); a missing ``flows`` becomes ``{}``; the other fields
        are passed through as found (``None`` when absent).
    """
    declared = (spec.get("components") or {}).get("securitySchemes") or {}
    return {
        scheme_name: {
            "type": (definition.get("type") or "").lower(),
            "in": definition.get("in"),          # apiKey location
            "name": definition.get("name"),      # apiKey parameter name
            "scheme": (definition.get("scheme") or "").lower(),
            "bearerFormat": definition.get("bearerFormat"),
            "flows": definition.get("flows") or {},
        }
        for scheme_name, definition in declared.items()
    }
def best_default_security(spec: Json) -> Optional[str]:
    """Pick the security scheme name to preselect.

    The first scheme named in the first top-level ``security`` requirement
    wins. Failing that, the first scheme reported by
    ``parse_security_schemes`` is used, and ``None`` when there is none.
    """
    requirements = spec.get("security") or []
    if isinstance(requirements, list) and requirements:
        head = requirements[0] or {}
        if isinstance(head, dict) and head:
            return next(iter(head))

    # No usable top-level requirement: fall back to the declared schemes.
    declared = parse_security_schemes(spec)
    if declared:
        return next(iter(declared))
    return None
def _safe_json(text):
    """Parse *text* as JSON; return None when it is blank or not valid JSON."""
    if not text or not text.strip():
        return None
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        return None


def _apply_auth_context(ctx, query, headers):
    """Merge the Auth panel's credentials into *query* / *headers* in place.

    ``ctx`` is the dict produced by the "Apply auth" handler; its ``mode`` is
    one of ``"apiKey"``, ``"http_bearer"``, ``"http_basic"``, ``"oauth2"`` or
    ``None`` (nothing to apply). Bearer-style tokens never replace an
    ``Authorization`` header the caller supplied explicitly.
    """
    ctx = ctx or {}
    mode = ctx.get("mode")

    if mode == "apiKey":
        api_key = ctx.get("apiKey") or {}
        location = api_key.get("in") or "header"
        name = api_key.get("name") or "X-API-Key"
        value = api_key.get("value")
        if not value:
            return
        if location == "query":
            query[name] = value
        elif location == "cookie":
            # A cookie credential travels in the Cookie header as name=value,
            # not as a header named after the cookie.
            existing = next((k for k in headers if str(k).lower() == "cookie"), None)
            pair = f"{name}={value}"
            if existing is not None and headers[existing]:
                headers[existing] = f"{headers[existing]}; {pair}"
            else:
                headers[existing or "Cookie"] = pair
        else:
            headers[name] = value
    elif mode in ("http_bearer", "oauth2"):
        section = "bearer" if mode == "http_bearer" else "oauth2"
        token = (ctx.get(section) or {}).get("token")
        has_authorization = any(str(k).lower() == "authorization" for k in headers)
        if token and not has_authorization:
            headers["Authorization"] = f"Bearer {token}"
    elif mode == "http_basic":
        basic = ctx.get("basic") or {}
        user = basic.get("user") or ""
        password = basic.get("pass") or ""
        encoded = base64.b64encode(f"{user}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {encoded}"


def _format_response(resp):
    """Render a ``requests`` response as ``(status line, body text, headers JSON)``."""
    try:
        body_out = json.dumps(resp.json(), indent=2, ensure_ascii=False)
    except ValueError:
        # Body is not JSON; show it verbatim.
        body_out = resp.text
    headers_out = json.dumps(dict(resp.headers), indent=2)
    return f"{resp.status_code} {resp.reason}", body_out, headers_out


def _read_upload(file_obj):
    """Return ``(filename, bytes)`` for an uploaded file, or None if unsupported.

    Accepts a filesystem path, a file-like object exposing ``read()``, or an
    object whose ``.name`` is an existing path (``.orig_name`` is preferred
    for the reported filename when present).
    """
    if isinstance(file_obj, str) and os.path.exists(file_obj):
        with open(file_obj, "rb") as f:
            return os.path.basename(file_obj), f.read()
    if hasattr(file_obj, "read"):
        filename = os.path.basename(getattr(file_obj, "name", "upload.bin"))
        return filename, file_obj.read()
    path = getattr(file_obj, "name", None)
    if isinstance(path, str) and os.path.exists(path):
        filename = os.path.basename(getattr(file_obj, "orig_name", path))
        with open(path, "rb") as f:
            return filename, f.read()
    return None


def build_gradio_app_from_spec(spec: Json, base_url: Optional[str] = None,
                               bearer_token: Optional[str] = None) -> gr.Blocks:
    """Build the explorer UI for an OpenAPI spec.

    The returned Blocks contains a spec validator, an auth panel, the UI
    generated by ``gr.load_openapi``, a free-form request panel (endpoint
    ``proxy_request``) and a multipart upload panel (endpoint
    ``proxy_upload``).

    Args:
        spec: Parsed OpenAPI document.
        base_url: Target API root. When empty, it is taken from the spec's
            ``servers`` entry. Redirects are followed before use.
        bearer_token: Optional token forwarded to ``gr.load_openapi`` when the
            installed Gradio exposes a keyword for it.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).

    Raises:
        ValueError: No base URL was given and none could be derived.
    """
    if not base_url:
        base_url = _resolve_server_base_url(spec)
    if not base_url:
        raise ValueError(
            "No base_url provided and none found in spec.servers[].url. "
            "Pass --base-url or add a 'servers' entry to the spec."
        )
    base_url = _follow_redirects(base_url)

    # --- Version-adaptive load_openapi (auth token if supported) ---
    # Remember WHICH keyword exists so the call uses that exact name.
    auth_kwarg = None
    try:
        load_params = inspect.signature(gr.load_openapi).parameters
        auth_kwarg = next(
            (n for n in ("auth_token", "bearer_token") if n in load_params), None
        )
    except (AttributeError, TypeError, ValueError):
        pass

    loaded_app = None
    if bearer_token and auth_kwarg:
        try:
            loaded_app = gr.load_openapi(spec, base_url, **{auth_kwarg: bearer_token})
        except TypeError:
            print("[AnyAPI→MCP] This Gradio version doesn’t accept an auth token in load_openapi(); proceeding without it.")
            auth_kwarg = None  # keeps the banner below truthful
    if loaded_app is None:
        loaded_app = gr.load_openapi(spec, base_url)
    supports_auth = auth_kwarg is not None

    # --- Build the Blocks UI shell ---
    schemes = parse_security_schemes(spec)
    default_scheme = best_default_security(spec)
    banner_auth = ("Bearer (provided)" if (bearer_token and supports_auth) else
                   ("Bearer (ignored by this Gradio version)" if bearer_token else "None"))

    # NOTE(review): the source this was recovered from had lost its
    # indentation; the widget nesting below is reconstructed — confirm layout.
    with gr.Blocks(fill_height=True) as demo:
        gr.Markdown("### AnyAPI→MCP Factory — OpenAPI Explorer")
        gr.Markdown(f"**Base URL:** `{base_url}`  \n**Auth:** {banner_auth}")

        # 1) Spec validator (local)
        def validate_btn():
            try:
                validate_spec(spec)
                return "OpenAPI spec: ✅ valid"
            except Exception as e:
                return f"OpenAPI spec: ❌ {type(e).__name__}: {e}"

        val_btn = gr.Button("Validate spec")
        val_out = gr.Textbox(label="Validation result", interactive=False)
        val_btn.click(fn=validate_btn, outputs=val_out)

        # 2) Auth state shared with the request handlers.
        # NOTE(review): mode starts as None, so a bearer_token passed to this
        # function is not applied to proxy requests until the user presses
        # "Apply auth" — confirm whether that is intended.
        auth_ctx = gr.State({
            "mode": None,  # "apiKey" | "http_bearer" | "http_basic" | "oauth2"
            "apiKey": {"in": None, "name": None, "value": None},
            "bearer": {"token": bearer_token or None},
            "basic": {"user": None, "pass": None},
            "oauth2": {"token": None},
        })

        # === Auth (minimal) ===
        with gr.Accordion("Auth", open=True):
            scheme_names = list(schemes) or ["<none>"]
            # The top-level `security` entry may name a scheme that is not
            # declared under components; only preselect a real choice.
            initial_scheme = (default_scheme if default_scheme in scheme_names
                              else scheme_names[0])
            scheme_dd = gr.Dropdown(
                scheme_names,
                value=initial_scheme,
                label="Select a security scheme",
            )
            api_in_choices = ["header", "query", "cookie"]
            initial_in = (schemes.get(initial_scheme) or {}).get("in")
            with gr.Row():
                api_in = gr.Dropdown(
                    api_in_choices,
                    value=initial_in if initial_in in api_in_choices else "header",
                    label="apiKey.in",
                )
                api_name = gr.Textbox(label="apiKey.name", interactive=True, placeholder="X-API-Key")
                api_value = gr.Textbox(label="apiKey.value (secret)", type="password")
            with gr.Row():
                bearer_inp = gr.Textbox(label="Bearer token (Authorization: Bearer ...)", type="password")
            with gr.Row():
                basic_user = gr.Textbox(label="Basic user")
                basic_pass = gr.Textbox(label="Basic password", type="password")

            def on_scheme_change(sel):
                data = schemes.get(sel, {}) if sel and sel in schemes else {}
                # Only prefill apiKey.in; leave name editable
                return data.get("in") or "header"

            scheme_dd.change(on_scheme_change, inputs=[scheme_dd], outputs=[api_in])

            apply_btn = gr.Button("Apply auth")
            apply_out = gr.Textbox(label="Auth status", interactive=False)

            def apply_auth(sel, api_in_v, api_name_v, api_value_v, bearer_v, basic_u, basic_p):
                ctx = {
                    "mode": None,
                    "apiKey": {"in": None, "name": None, "value": None},
                    "bearer": {"token": None},
                    "basic": {"user": None, "pass": None},
                    "oauth2": {"token": None},
                }
                declared = schemes.get(sel) or {}
                stype = (declared.get("type") or "").lower()
                scheme_name = (declared.get("scheme") or "").lower()
                # parse_security_schemes() lowercases `type`, so the spec's
                # "apiKey" arrives here as "apikey".
                if stype == "apikey":
                    ctx["mode"] = "apiKey"
                    ctx["apiKey"] = {
                        "in": api_in_v or declared.get("in") or "header",
                        "name": api_name_v or declared.get("name") or "X-API-Key",
                        "value": api_value_v,
                    }
                elif stype == "http" and scheme_name == "bearer":
                    ctx["mode"] = "http_bearer"
                    ctx["bearer"]["token"] = bearer_v
                elif stype == "http" and scheme_name == "basic":
                    ctx["mode"] = "http_basic"
                    ctx["basic"] = {"user": basic_u, "pass": basic_p}

                # Fallbacks if the spec doesn't define a (supported) scheme
                if ctx["mode"] is None:
                    if api_value_v and (api_in_v or api_name_v):
                        ctx["mode"] = "apiKey"
                        ctx["apiKey"] = {
                            "in": api_in_v or "header",
                            "name": api_name_v or "X-API-Key",
                            "value": api_value_v,
                        }
                    elif bearer_v:
                        ctx["mode"] = "http_bearer"
                        ctx["bearer"]["token"] = bearer_v
                    elif basic_u or basic_p:
                        ctx["mode"] = "http_basic"
                        ctx["basic"] = {"user": basic_u, "pass": basic_p}
                return ctx, f"Applied auth mode: {ctx['mode']}"

            apply_btn.click(
                fn=apply_auth,
                inputs=[scheme_dd, api_in, api_name, api_value, bearer_inp, basic_user, basic_pass],
                outputs=[auth_ctx, apply_out],
            )
        # === end Auth ===

        # 4) Render auto-generated API UI
        loaded_app.render()

        # 5) Advanced proxy (reads auth_ctx)
        def proxy_request(
            method: str, path: str, query_json: str, headers_json: str, body_json: str,
            basic_user_v: str, basic_pass_v: str, api_key_header: str, api_key_value_v: str, _ctx
        ) -> tuple[str, str, str]:
            """Send an arbitrary HTTP request to the API behind this app.

            Args:
                method: HTTP method name.
                path: Path relative to the app's base URL.
                query_json: Query parameters as a JSON object string.
                headers_json: Request headers as a JSON object string.
                body_json: Request body; sent as JSON when it parses, else raw.
                basic_user_v: Basic-auth user (overrides the Auth panel).
                basic_pass_v: Basic-auth password (overrides the Auth panel).
                api_key_header: Header name for an API key override.
                api_key_value_v: Header value for an API key override.

            Returns:
                Status line, response body, and response headers as JSON text.
            """
            url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
            q = _safe_json(query_json) or {}
            h = _safe_json(headers_json) or {}

            # Apply Auth panel context first
            _apply_auth_context(_ctx, q, h)

            # Then apply panel overrides (panel wins)
            if basic_user_v or basic_pass_v:
                token = base64.b64encode(
                    f"{basic_user_v or ''}:{basic_pass_v or ''}".encode()
                ).decode()
                h["Authorization"] = f"Basic {token}"
            if api_key_header and api_key_value_v:
                h[api_key_header] = api_key_value_v

            # Body handling: valid JSON goes out as JSON, anything else verbatim
            kwargs = {"params": q, "headers": h, "timeout": 30}
            json_body = _safe_json(body_json)
            if json_body is not None:
                kwargs["json"] = json_body
            elif body_json and body_json.strip():
                kwargs["data"] = body_json

            try:
                resp = requests.request(method.upper(), url, **kwargs)
            except requests.RequestException as e:
                # Same reporting shape as the upload panel uses for failures.
                return "000 Request Error", str(e), "{}"
            return _format_response(resp)

        with gr.Accordion("Advanced request (API-Key / Basic / custom headers)", open=False):
            with gr.Row():
                method = gr.Dropdown(["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"], value="GET", label="Method")
                path = gr.Textbox(label="Path (relative to base_url)", value="/get", scale=2)
            with gr.Row():
                query = gr.Textbox(label="Query (JSON)", placeholder='{"foo":"bar"}')
                headers_box = gr.Textbox(label="Headers (JSON)", placeholder='{"X-Custom-Header":"123"}')
            body = gr.Textbox(label="Body (JSON or raw text)", lines=4)
            with gr.Row():
                api_key_header = gr.Textbox(label="API-Key Header (e.g., X-API-Key)")
                api_key_value = gr.Textbox(label="API-Key Value", type="password")
            with gr.Row():
                # Distinct names so the Auth panel's widgets are not shadowed.
                adv_basic_user = gr.Textbox(label="Basic user")
                adv_basic_pass = gr.Textbox(label="Basic password", type="password")
            run = gr.Button("Send")
            status = gr.Textbox(label="Status")
            body_out = gr.Textbox(label="Response Body", lines=12)
            headers_out = gr.Textbox(label="Response Headers", lines=8)
            run.click(
                proxy_request,
                inputs=[method, path, query, headers_box, body, adv_basic_user,
                        adv_basic_pass, api_key_header, api_key_value, auth_ctx],
                outputs=[status, body_out, headers_out],
                api_name="proxy_request",
            )

        # === File Upload (multipart/form-data) ===
        with gr.Accordion("File upload (multipart/form-data)", open=False):
            with gr.Row():
                up_method = gr.Dropdown(
                    ["POST", "PUT", "PATCH"],
                    value="POST",
                    label="Method",
                )
                up_path = gr.Textbox(
                    label="Path (relative to base_url)",
                    value="/post",
                    scale=2,
                )
            with gr.Row():
                up_query = gr.Textbox(
                    label="Query (JSON)",
                    placeholder='{"tag":"demo"}',
                )
                up_headers = gr.Textbox(
                    label="Headers (JSON)",
                    placeholder='{"X-Custom":"123"}',
                )
            with gr.Row():
                file_field = gr.Textbox(
                    label="Form field name for file",
                    placeholder="file",
                    value="file",
                )
                file_input = gr.File(
                    label="Choose file to upload",
                    file_count="single",
                    type="filepath",
                )
            up_send = gr.Button("Upload")
            up_status = gr.Textbox(label="Status")
            up_body_out = gr.Textbox(label="Response Body", lines=12)
            up_headers_out = gr.Textbox(label="Response Headers", lines=8)

            def upload_request(method, path, query_json, headers_json, field_name, file_obj, _ctx):
                """Upload one file as multipart/form-data to the API behind this app.

                Args:
                    method: HTTP method name.
                    path: Path relative to the app's base URL.
                    query_json: Query parameters as a JSON object string.
                    headers_json: Request headers as a JSON object string.
                    field_name: Multipart form field name (defaults to "file").
                    file_obj: The file to send.

                Returns:
                    Status line, response body, and response headers as JSON text.
                """
                url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
                q = _safe_json(query_json) or {}
                h = _safe_json(headers_json) or {}

                # Apply Auth panel context
                _apply_auth_context(_ctx, q, h)

                if file_obj is None:
                    return "400 Bad Request", "No file provided", "{}"
                payload = _read_upload(file_obj)
                if payload is None:
                    return "400 Bad Request", f"Unsupported file object: {type(file_obj).__name__}", "{}"

                files = {(field_name or "file"): payload}
                try:
                    resp = requests.request(
                        method.upper(),
                        url,
                        params=q,
                        headers=h,
                        files=files,
                        data={},
                        timeout=60,
                    )
                    return _format_response(resp)
                except Exception as e:
                    return "000 Upload Error", str(e), "{}"

            up_send.click(
                upload_request,
                inputs=[up_method, up_path, up_query, up_headers, file_field, file_input, auth_ctx],
                outputs=[up_status, up_body_out, up_headers_out],
                api_name="proxy_upload",  # also exposed as MCP tool
            )
    return demo
def prepare_app(spec_source: str, base_url: Optional[str] = None,
                bearer_token: Optional[str] = None) -> Tuple[gr.Blocks, str]:
    """Load a spec, normalise it, and build the explorer app for it.

    Pipeline: ``load_spec`` -> ``normalize_security`` ->
    ``sanitize_header_params`` -> ``build_gradio_app_from_spec``. Header
    parameters removed by sanitising are reported on stdout.

    Returns:
        ``(demo, origin)``: the Blocks app and the origin string reported by
        ``load_spec``.
    """
    loaded, origin = load_spec(spec_source)
    cleaned, dropped = sanitize_header_params(normalize_security(loaded))
    if dropped:
        print(f"[AnyAPI→MCP] Dropped non-Pythonic header params: {sorted(set(dropped))}")
    return (
        build_gradio_app_from_spec(cleaned, base_url=base_url, bearer_token=bearer_token),
        origin,
    )