# NOTE: The lines originally here were residue from the web file viewer this
# source was copied from (Spaces status "Sleeping", file size 20,703 bytes,
# revision cd9f831, and a gutter of line numbers 1-429); they are not code.
import requests
import json
import yaml
import argparse
import os
from urllib.parse import urljoin, urlparse
# The minimal parser is defined unconditionally: main() instantiates
# MinimalApiParser by name for URL specs even when ApiSchemaGeneratorV5 was
# imported successfully, so the class must always exist at module level.
class MinimalApiParser:
    """Simplified OpenAPI loader providing only what this script needs.

    It loads a JSON or YAML specification from a local path or an HTTP(S)
    URL and exposes the ``servers``, ``security_schemes``,
    ``global_security`` and ``paths`` sections as attributes.
    """

    # Seconds to wait for a remote specification before giving up, so an
    # unresponsive host cannot hang the script forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_spec_url: str):
        """Remember where the specification lives; nothing is loaded yet."""
        self.api_spec_url = api_spec_url
        self.api_spec = None
        self.servers = []
        self.security_schemes = {}
        self.global_security = []
        self.paths = {}

    def fetch_api_spec(self):
        """Load and parse the specification into ``self.api_spec``.

        Returns:
            True when the content was read and parsed, False otherwise (the
            error is printed rather than raised).
        """
        try:
            if self.api_spec_url.startswith(('http://', 'https://')):
                response = requests.get(self.api_spec_url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                content = response.text
            else:
                with open(self.api_spec_url, 'r', encoding='utf-8') as f:
                    content = f.read()
            # Try JSON first; anything that is not valid JSON is handed to
            # the (safe) YAML loader.
            try:
                self.api_spec = json.loads(content)
            except json.JSONDecodeError:
                self.api_spec = yaml.safe_load(content)
            return True
        except Exception as e:
            # Boundary handler: network, file-system and parse errors are all
            # reported the same way and turned into a False return value.
            print(f"Error fetching/parsing API specification: {e}")
            return False

    def extract_data(self):
        """Populate the public attributes from the parsed specification.

        Fetches the specification first if that has not happened yet.

        Returns:
            True on success; False when the specification could not be
            loaded or its top level is not a mapping.
        """
        if not self.api_spec and not self.fetch_api_spec():
            return False
        if not isinstance(self.api_spec, dict):
            # e.g. an empty YAML file (None) or a top-level list.
            print("Error fetching/parsing API specification: top-level document is not a mapping.")
            return False
        # `or` guards against sections that are present but null in the spec.
        components = self.api_spec.get('components') or {}
        self.servers = self.api_spec.get('servers') or []
        self.security_schemes = components.get('securitySchemes') or {}
        self.global_security = self.api_spec.get('security') or []
        self.paths = self.api_spec.get('paths') or {}
        return True


# Attempt to import ApiSchemaGeneratorV5; fall back to the minimal parser.
try:
    from api_schema_generatorV5 import ApiSchemaGeneratorV5
except ImportError:
    print("Warning: api_schema_generatorV5.py not found. Using minimal local parser.")
    ApiSchemaGeneratorV5 = MinimalApiParser  # Use the minimal parser
def get_input(prompt_message, default_value=None):
    """Prompt the user on stdin, optionally offering a default.

    Args:
        prompt_message: Text shown to the user (": " is appended).
        default_value: Value returned when the user just presses Enter.
            ``None`` and ``""`` mean "no default"; every other value,
            including falsy ones such as ``0`` or ``False``, is shown in
            brackets and used as the default.

    Returns:
        The text typed by the user, or ``default_value`` (unchanged, so not
        necessarily a string) when a default exists and nothing was typed.
    """
    # Compare against None/"" explicitly: a plain truthiness test would
    # silently drop legitimate defaults like 0 or False.
    if default_value is None or default_value == "":
        return input(f"{prompt_message}: ")
    return input(f"{prompt_message} [{default_value}]: ") or default_value
def get_base_url(servers, api_spec_path):
    """Ask the user for the base API URL, suggesting one from the spec.

    Args:
        servers: The specification's ``servers`` list (may be empty or None).
            Entries without a usable ``url`` string are ignored.
        api_spec_path: Path or URL the specification was loaded from; only
            used to decide whether the "no servers" warning is printed.

    Returns:
        The base URL entered by the user, or the suggested URL if the user
        accepted the default.
    """
    # Collect every usable URL first so that a malformed server entry
    # (missing or non-string 'url') cannot raise while searching.
    candidate_urls = [
        server['url']
        for server in (servers or [])
        if isinstance(server, dict) and isinstance(server.get('url'), str) and server['url']
    ]

    # Prefer HTTPS if available, otherwise fall back to the first usable URL.
    suggested_url = next((url for url in candidate_urls if url.startswith('https://')), None)
    if suggested_url is None and candidate_urls:
        suggested_url = candidate_urls[0]

    if suggested_url:
        suggested_url = suggested_url.rstrip('/')
        print(f"A server URL was found in the API specification: {suggested_url}")
        return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)", default_value=suggested_url)

    if os.path.exists(api_spec_path):  # Check if it's a local file to provide context for the warning
        print("Warning: No 'servers' block found in the API specification.")
    return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)")
def get_auth_details(security_schemes, active_security_requirements):
    """Determine the authentication method and prompt the user for credentials.

    The first *named* scheme among the active security requirements is used.
    Empty requirement objects (``{}``, which OpenAPI uses to mark
    authentication as optional) are skipped; if only empty requirements are
    present the call proceeds without authentication.

    Args:
        security_schemes: Mapping of scheme name to scheme definition
            (``components.securitySchemes``).
        active_security_requirements: List of security requirement objects.

    Returns:
        A dict describing the chosen authentication. It is empty when no
        authentication applies or the scheme is unknown/unsupported.
        Otherwise it has a ``type`` key (``apiKey``, ``http`` or ``oauth2``)
        plus the credentials gathered for that type.
    """
    auth_config = {}
    security_schemes = security_schemes or {}

    # First scheme name of the first non-empty requirement, if any.
    first_req_name = next(
        (
            name
            for requirement in (active_security_requirements or [])
            if requirement
            for name in requirement
        ),
        None,
    )
    if first_req_name is None:
        print("No active security requirements found for this endpoint/API. Proceeding without authentication.")
        return auth_config

    if first_req_name not in security_schemes:
        print(f"Warning: Security scheme '{first_req_name}' not defined in components.securitySchemes.")
        return auth_config

    scheme = security_schemes[first_req_name]
    auth_type = scheme.get('type')
    print(f"\n--- Authentication Required: {first_req_name} ({auth_type}) ---")

    # NOTE(review): secrets are read with the normal echoing prompt; switching
    # to a non-echoing prompt would need a new import and is left to the owner.
    if auth_type == 'apiKey':
        auth_config['type'] = 'apiKey'
        auth_config['name'] = scheme.get('name')
        auth_config['in'] = scheme.get('in')
        auth_config['value'] = get_input(f"Enter API Key for '{auth_config['name']}' (in {auth_config['in']})")
    elif auth_type == 'http':
        http_scheme = (scheme.get('scheme') or '').lower()
        auth_config['type'] = 'http'
        auth_config['scheme'] = http_scheme
        if http_scheme == 'basic':
            auth_config['username'] = get_input("Enter Basic Auth Username")
            auth_config['password'] = get_input("Enter Basic Auth Password", "")  # nosec B105
        elif http_scheme == 'bearer':
            auth_config['token'] = get_input("Enter Bearer Token")
        else:
            print(f"Unsupported HTTP scheme: {http_scheme}")
    elif auth_type == 'oauth2':
        auth_config['type'] = 'oauth2'
        # Simplified: only the clientCredentials flow is supported.
        flows = scheme.get('flows') or {}
        if 'clientCredentials' in flows:
            auth_config['flow'] = 'clientCredentials'
            cc_flow = flows['clientCredentials'] or {}
            auth_config['token_url'] = get_input("Enter OAuth2 Token URL", cc_flow.get('tokenUrl'))
            auth_config['client_id'] = get_input("Enter OAuth2 Client ID")
            auth_config['client_secret'] = get_input("Enter OAuth2 Client Secret", "")  # nosec B105
            token_data = {
                'grant_type': 'client_credentials',
                'client_id': auth_config['client_id'],
                'client_secret': auth_config['client_secret'],
            }
            try:
                print(f"Attempting to fetch OAuth2 token from {auth_config['token_url']}...")
                token_res = requests.post(auth_config['token_url'], data=token_data, timeout=10)
                token_res.raise_for_status()
                token_payload = token_res.json()
                # A token endpoint could answer with something other than a
                # JSON object; treat that as "no token" rather than crashing.
                auth_config['token'] = (
                    token_payload.get('access_token') if isinstance(token_payload, dict) else None
                )
                if auth_config['token']:
                    print("OAuth2 token obtained successfully.")
                else:
                    print("Failed to obtain OAuth2 token. Check credentials and token URL.")
                    print("Response:", token_res.text)
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a response body that is not valid JSON.
                print(f"Error obtaining OAuth2 token: {e}")
        else:
            print("Unsupported OAuth2 flow. Only clientCredentials supported in this script.")
    else:
        print(f"Unsupported security scheme type: {auth_type}")
    return auth_config
def select_endpoints(paths):
    """List the callable endpoints and let the user choose which to call.

    Args:
        paths: Mapping of path template to path item, where a path item maps
            HTTP method names to operation objects. Non-mapping path items
            and non-operation keys (``parameters``, ``summary``, ``$ref`` ...)
            are ignored.

    Returns:
        A list of dicts with keys ``path``, ``method`` (upper-case) and
        ``details`` (the operation object), in the order the user entered
        them. Empty when nothing is callable or nothing valid was selected.
    """
    http_methods = {'get', 'post', 'put', 'delete', 'patch', 'options', 'head', 'trace'}

    print("\n--- Available Endpoints ---")
    endpoint_options = []
    for path, methods in (paths or {}).items():
        if not isinstance(methods, dict):
            continue  # e.g. a path declared with an empty (null) body
        for method, details in methods.items():
            # Skip parameters, $ref etc. at this level, and malformed operations.
            if str(method).lower() not in http_methods or not isinstance(details, dict):
                continue
            summary = details.get('summary', 'No summary')
            endpoint_options.append({
                'path': path,
                'method': str(method).upper(),
                'details': details,
            })
            print(f"{len(endpoint_options)}. {str(method).upper()} {path} - {summary}")

    if not endpoint_options:
        print("No callable endpoints found in the specification.")
        return []

    # get_input appends ": " itself, so the prompt must not end with one.
    selected_indices_str = get_input("Enter numbers of endpoints to call (comma-separated, e.g., 1,3)")

    selected_endpoints = []
    # Validate each entry on its own so one typo (or a trailing comma) does
    # not throw away the rest of the selection.
    for token in (selected_indices_str or "").split(','):
        token = token.strip()
        if not token:
            continue
        try:
            index = int(token) - 1
        except ValueError:
            print(f"Warning: Invalid endpoint number '{token}' skipped.")
            continue
        if 0 <= index < len(endpoint_options):
            selected_endpoints.append(endpoint_options[index])
        else:
            print(f"Warning: Invalid endpoint number {index + 1} skipped.")

    if not selected_endpoints:
        print("Invalid input for endpoint selection.")
    return selected_endpoints
def _apply_auth(auth_details, headers, params, cookies):
    """Add credentials to headers/params/cookies; return a Basic-auth tuple or None."""
    if not auth_details:
        return None
    auth_type = auth_details.get('type')
    if auth_type == 'apiKey':
        # OpenAPI allows API keys in a header, the query string or a cookie.
        target = {'header': headers, 'query': params, 'cookie': cookies}.get(auth_details.get('in'))
        if target is not None and auth_details.get('name'):
            target[auth_details['name']] = auth_details.get('value', '')
    elif auth_type == 'http':
        http_scheme = auth_details.get('scheme')
        if http_scheme == 'basic' and auth_details.get('username') is not None:
            # Basic auth is handled by requests' `auth` parameter.
            return (auth_details['username'], auth_details.get('password', ''))
        if http_scheme == 'bearer' and auth_details.get('token'):
            headers['Authorization'] = f"Bearer {auth_details['token']}"
    elif auth_type == 'oauth2' and auth_details.get('token'):
        headers['Authorization'] = f"Bearer {auth_details['token']}"
    return None


def _prompt_for_parameters(parameter_specs, headers, params, cookies):
    """Prompt for each declared parameter; fill headers/params/cookies, return path values."""
    path_params = {}
    for param_spec in parameter_specs or []:
        if '$ref' in param_spec:
            # $ref resolution needs the full spec, which this function does
            # not receive, so referenced parameters are reported and skipped.
            ref_path = str(param_spec['$ref']).split('/')
            if ref_path[:3] == ['#', 'components', 'parameters']:
                print(f"Skipping parameter with $ref: {param_spec['$ref']} (full $ref resolution for params not in minimal script)")
            else:
                print(f"Skipping parameter with unrecognized $ref: {param_spec['$ref']}")
            continue

        param_name = param_spec.get('name')
        param_in = param_spec.get('in')
        param_required = param_spec.get('required', False)
        param_schema = param_spec.get('schema') or {}
        param_type = param_schema.get('type', 'string')
        param_description = param_spec.get('description', '')

        prompt_msg = f"Enter value for {param_in} parameter '{param_name}' ({param_type})"
        if param_description:
            prompt_msg += f" ({param_description})"
        if param_required:
            prompt_msg += " (required)"

        user_value = get_input(prompt_msg, param_schema.get('default'))
        # "Provided" means anything but None/"" so that defaults such as 0
        # or False still count as values.
        provided = user_value is not None and user_value != ""
        if not provided:
            if not param_required:
                continue
            # Required but empty: still send it and let the API validate.
            print(f"Warning: Required parameter '{param_name}' not provided.")

        if param_in == 'path':
            path_params[param_name] = user_value
        elif param_in == 'query':
            params[param_name] = user_value
        elif param_in == 'header':
            headers[param_name] = str(user_value)  # header values must be strings
        elif param_in == 'cookie':
            cookies[param_name] = str(user_value)
    return path_params


def _prompt_for_request_body(request_body_spec, headers):
    """Prompt for a request body; return ``(json_payload, data)`` (either may be None)."""
    if not request_body_spec:
        return None, None
    if '$ref' in request_body_spec:
        # Referenced request bodies cannot be resolved without the full spec.
        ref_path = str(request_body_spec['$ref']).split('/')
        if ref_path[:3] == ['#', 'components', 'requestBodies']:
            print(f"Skipping requestBody with $ref: {request_body_spec['$ref']} (full $ref resolution not in minimal script)")
        else:
            print(f"Skipping requestBody with unrecognized $ref: {request_body_spec['$ref']}")
        return None, None

    content_types = request_body_spec.get('content')
    if not content_types:
        return None, None

    if 'application/json' in content_types:
        headers['Content-Type'] = 'application/json'
        # Potentially build JSON based on schema; for now, raw JSON input.
        print("This endpoint expects a JSON request body.")
        json_spec = content_types['application/json'] or {}
        print(f"Schema hint: {json.dumps(json_spec.get('schema', {}), indent=2)}")
        body_str = get_input("Enter JSON body as a single line string (or leave empty)")
        if not body_str:
            return None, None
        try:
            return json.loads(body_str), None
        except json.JSONDecodeError:
            print("Invalid JSON provided for request body. Sending as raw string if possible or failing.")
            return None, body_str  # Fallback for malformed JSON, might fail

    if 'application/x-www-form-urlencoded' in content_types:
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
        print("This endpoint expects form data. Enter key=value pairs, one per line. End with an empty line.")
        form_data = {}
        while True:
            line = get_input("key=value (or empty to finish)")
            if not line:
                break
            if '=' in line:
                key, value = line.split('=', 1)
                form_data[key.strip()] = value.strip()
            else:
                print("Invalid format. Use key=value.")
        return None, (form_data or None)

    print(f"Unsupported request body content type: {next(iter(content_types))}. Please handle manually.")
    return None, None


def _print_response(response):
    """Print the status and body of *response* (pretty JSON when possible)."""
    print(f"\nResponse Status: {response.status_code}")
    content_type = response.headers.get('Content-Type', '')
    if 'application/json' in content_type:
        try:
            parsed = response.json()
        except ValueError:
            # ValueError is the common base of the JSON decode errors that
            # response.json() is expected to raise.
            print("Response Content (not valid JSON):")
            print(response.text)
        else:
            print("Response JSON:")
            print(json.dumps(parsed, indent=2))
    else:
        print("Response Content:")
        # Long non-JSON bodies are truncated to keep the console readable.
        print(response.text[:500] + "..." if len(response.text) > 500 else response.text)


def make_api_call(base_url, endpoint_info, auth_details, security_schemes):
    """Interactively build and send one API request, then print the response.

    Args:
        base_url: Base URL of the API; the endpoint path is appended to it.
        endpoint_info: Dict with ``path`` (template), ``method`` (upper-case
            HTTP verb) and ``details`` (the OpenAPI operation object).
        auth_details: Authentication config as returned by
            ``get_auth_details``; may be empty or None for no authentication.
        security_schemes: Currently unused. Authentication is determined once
            by the caller and not re-evaluated per endpoint.

    Returns:
        None. Request and response details are printed; transport errors are
        reported as "API call failed" instead of being raised.
    """
    from urllib.parse import quote  # local import: not in the file's import list

    path = endpoint_info['path']
    method = endpoint_info['method']
    details = endpoint_info['details']
    print(f"\n--- Calling: {method} {path} ---")

    headers = {'Accept': 'application/json'}
    params = {}
    cookies = {}

    current_auth = _apply_auth(auth_details, headers, params, cookies)
    path_params = _prompt_for_parameters(details.get('parameters'), headers, params, cookies)

    # Substitute path parameters, percent-encoding each value so spaces,
    # slashes etc. cannot corrupt the URL structure.
    request_path = path
    for p_name, p_val in path_params.items():
        request_path = request_path.replace(f"{{{p_name}}}", quote(str(p_val), safe=''))
    # Plain concatenation instead of urljoin: urljoin drops the base URL when
    # the first path segment contains ':' (it is parsed as a URL scheme).
    full_url = f"{base_url.rstrip('/')}/{request_path.lstrip('/')}"

    # Handle request body for POST, PUT, PATCH.
    json_payload, data = None, None
    if method in ('POST', 'PUT', 'PATCH'):
        json_payload, data = _prompt_for_request_body(details.get('requestBody'), headers)

    # Make the request.
    try:
        print(f"Requesting: {method} {full_url}")
        print(f"Headers: {headers}")
        if params:
            print(f"Query Params: {params}")
        if json_payload is not None:
            print(f"JSON Payload: {json.dumps(json_payload)}")
        if data:
            print(f"Form Data: {data}")
        response = requests.request(
            method,
            full_url,
            headers=headers,
            params=params,
            json=json_payload,
            data=data,
            auth=current_auth,
            cookies=cookies or None,
            timeout=30,
        )
        _print_response(response)
    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")
def main():
    """Command-line entry point for the interactive OpenAPI client.

    Parses the ``spec_file`` argument, loads the specification (with
    ApiSchemaGeneratorV5 for local files when it is available, otherwise
    with the minimal parser), asks for the base URL and credentials, lets
    the user pick endpoints and calls each of them in turn.
    """
    cli = argparse.ArgumentParser(description="Dynamic API client based on OpenAPI specification.")
    cli.add_argument("spec_file", help="Path or URL to the OpenAPI (JSON or YAML) specification file.")
    spec_location = cli.parse_args().spec_file

    # The full generator is only used for local files, and only when the
    # name ApiSchemaGeneratorV5 is not merely an alias of the minimal parser.
    use_full_generator = (
        os.path.exists(spec_location)
        and 'MinimalApiParser' not in str(ApiSchemaGeneratorV5)
    )

    if not use_full_generator:
        # Minimal parser or URL.
        loader = MinimalApiParser(spec_location)
        if not loader.extract_data():
            return
        servers = loader.servers
        schemes = loader.security_schemes
        global_security = loader.global_security
        endpoint_map = loader.paths
    else:
        # The source comments give the generator's constructor as
        # (api_spec_url, api_name=None, selected_endpoints=None); only the
        # URL is passed here.
        loader = ApiSchemaGeneratorV5(api_spec_url=spec_location)
        loader.extract_api_info()
        # Adapt the generator's attributes to the shapes used below. Global
        # security is read from the raw spec because the generator does not
        # expose it as a simple attribute.
        servers = loader.api_spec.get('servers', []) if loader.api_spec else []
        schemes = loader.auth_info or {}
        global_security = loader.api_spec.get('security', []) if loader.api_spec else []
        endpoint_map = loader.endpoints or {}

    base_api_url = get_base_url(servers, spec_location)

    # Global requirements win; without any, fall back to the first defined
    # scheme, wrapped to look like a security requirements list.
    requirements = global_security
    if schemes and not requirements:
        fallback_scheme = next(iter(schemes.keys()))
        requirements = [{fallback_scheme: []}]
        print(f"No global security requirements in spec. Using first defined scheme: {fallback_scheme}")

    auth = get_auth_details(schemes, requirements)

    chosen = select_endpoints(endpoint_map)
    if not chosen:
        print("No endpoints selected. Exiting.")
        return

    # Every call reuses the globally determined `auth`; it is not
    # re-evaluated per endpoint.
    for endpoint in chosen:
        make_api_call(base_api_url, endpoint, auth, schemes)
# Start the interactive client only when this file is executed as a script,
# so importing the module has no prompt/network side effects.
if __name__ == "__main__":
    main()
# (Trailing "|" residue from the web file viewer's table layout; not part of the code.)