import requests import json import yaml import argparse import os from urllib.parse import urljoin, urlparse # Attempt to import ApiSchemaGeneratorV5, or define a minimal version if not found try: from api_schema_generatorV5 import ApiSchemaGeneratorV5 except ImportError: print("Warning: api_schema_generatorV5.py not found. Using minimal local parser.") # Define a minimal parser if ApiSchemaGeneratorV5 is not available # This is a simplified version for basic functionality class MinimalApiParser: def __init__(self, api_spec_url: str): self.api_spec_url = api_spec_url self.api_spec = None self.servers = [] self.security_schemes = {} self.global_security = [] self.paths = {} def fetch_api_spec(self): try: if self.api_spec_url.startswith(('http://', 'https://')): response = requests.get(self.api_spec_url) response.raise_for_status() content = response.text else: with open(self.api_spec_url, 'r', encoding='utf-8') as f: content = f.read() try: self.api_spec = json.loads(content) except json.JSONDecodeError: self.api_spec = yaml.safe_load(content) return True except Exception as e: print(f"Error fetching/parsing API specification: {e}") return False def extract_data(self): if not self.api_spec: if not self.fetch_api_spec(): return False self.servers = self.api_spec.get('servers', []) self.security_schemes = self.api_spec.get('components', {}).get('securitySchemes', {}) self.global_security = self.api_spec.get('security', []) self.paths = self.api_spec.get('paths', {}) return True ApiSchemaGeneratorV5 = MinimalApiParser # Use the minimal parser def get_input(prompt_message, default_value=None): if default_value: return input(f"{prompt_message} [{default_value}]: ") or default_value return input(f"{prompt_message}: ") def get_base_url(servers, api_spec_path): suggested_url = None if servers: # Prefer HTTPS if available https_server = next((s['url'] for s in servers if s['url'].startswith('https://')), None) if https_server: suggested_url = https_server.rstrip('/') elif servers[0].get('url'): suggested_url = servers[0]['url'].rstrip('/') if suggested_url: print(f"A server URL was found in the API specification: {suggested_url}") return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)", default_value=suggested_url) else: if os.path.exists(api_spec_path): # Check if it's a local file to provide context for the warning print("Warning: No 'servers' block found in the API specification.") return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)") def get_auth_details(security_schemes, active_security_requirements): """ Determines the authentication method and prompts user for credentials. Uses the first active security requirement. """ auth_config = {} if not active_security_requirements: print("No active security requirements found for this endpoint/API. Proceeding without authentication.") return auth_config # Use the first security requirement listed first_req_name = list(active_security_requirements[0].keys())[0] if first_req_name not in security_schemes: print(f"Warning: Security scheme '{first_req_name}' not defined in components.securitySchemes.") return auth_config scheme = security_schemes[first_req_name] auth_type = scheme.get('type') print(f"\n--- Authentication Required: {first_req_name} ({auth_type}) ---") if auth_type == 'apiKey': auth_config['type'] = 'apiKey' auth_config['name'] = scheme.get('name') auth_config['in'] = scheme.get('in') auth_config['value'] = get_input(f"Enter API Key for '{auth_config['name']}' (in {auth_config['in']})") elif auth_type == 'http': http_scheme = scheme.get('scheme', '').lower() auth_config['type'] = 'http' auth_config['scheme'] = http_scheme if http_scheme == 'basic': username = get_input("Enter Basic Auth Username") password = get_input("Enter Basic Auth Password", "") # nosec B105 auth_config['username'] = username auth_config['password'] = password elif http_scheme == 'bearer': auth_config['token'] = get_input("Enter Bearer Token") else: print(f"Unsupported HTTP scheme: {http_scheme}") elif auth_type == 'oauth2': auth_config['type'] = 'oauth2' # Simplified: For clientCredentials flow flows = scheme.get('flows', {}) if 'clientCredentials' in flows: auth_config['flow'] = 'clientCredentials' cc_flow = flows['clientCredentials'] auth_config['token_url'] = get_input("Enter OAuth2 Token URL", cc_flow.get('tokenUrl')) auth_config['client_id'] = get_input("Enter OAuth2 Client ID") auth_config['client_secret'] = get_input("Enter OAuth2 Client Secret", "") # nosec B105 # Optionally, handle scopes # scopes_available = cc_flow.get('scopes', {}) # if scopes_available: # print("Available scopes:", scopes_available) # auth_config['scope'] = get_input("Enter scopes (space-separated)", "") # Fetch token token_data = { 'grant_type': 'client_credentials', 'client_id': auth_config['client_id'], 'client_secret': auth_config['client_secret'], } # if auth_config.get('scope'): # token_data['scope'] = auth_config['scope'] try: print(f"Attempting to fetch OAuth2 token from {auth_config['token_url']}...") token_res = requests.post(auth_config['token_url'], data=token_data, timeout=10) token_res.raise_for_status() auth_config['token'] = token_res.json().get('access_token') if auth_config['token']: print("OAuth2 token obtained successfully.") else: print("Failed to obtain OAuth2 token. Check credentials and token URL.") print("Response:", token_res.text) except requests.exceptions.RequestException as e: print(f"Error obtaining OAuth2 token: {e}") else: print(f"Unsupported OAuth2 flow. Only clientCredentials supported in this script.") else: print(f"Unsupported security scheme type: {auth_type}") return auth_config def select_endpoints(paths): print("\n--- Available Endpoints ---") endpoint_options = [] for path, methods in paths.items(): for method, details in methods.items(): # We are interested in callable methods like get, post, put, delete, patch if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head', 'trace']: continue # Skip parameters, $ref etc. at this level summary = details.get('summary', 'No summary') endpoint_options.append({ 'path': path, 'method': method.upper(), 'details': details }) print(f"{len(endpoint_options)}. {method.upper()} {path} - {summary}") if not endpoint_options: print("No callable endpoints found in the specification.") return [] selected_indices_str = get_input("Enter numbers of endpoints to call (comma-separated, e.g., 1,3): ") selected_endpoints = [] try: selected_indices = [int(i.strip()) - 1 for i in selected_indices_str.split(',')] for index in selected_indices: if 0 <= index < len(endpoint_options): selected_endpoints.append(endpoint_options[index]) else: print(f"Warning: Invalid endpoint number {index + 1} skipped.") except ValueError: print("Invalid input for endpoint selection.") return selected_endpoints def make_api_call(base_url, endpoint_info, auth_details, security_schemes): path = endpoint_info['path'] method = endpoint_info['method'] details = endpoint_info['details'] print(f"\n--- Calling: {method} {path} ---") # Determine active security for this endpoint endpoint_security = details.get('security') # Endpoint specific # If not defined at endpoint, it might fall back to global, handled by `auth_details` already if it was based on global. # For simplicity, if endpoint_security is defined, we re-evaluate auth. # This part could be more sophisticated to merge/override global. # For now, if endpoint has `security`, we assume `auth_details` should be re-evaluated or specific. # However, `get_auth_details` is called once globally. A more robust system would check per endpoint. # Current `auth_details` is based on the *first* global or first scheme if no global. headers = {'Accept': 'application/json'} params = {} data = None json_payload = None # Apply authentication if auth_details: if auth_details.get('type') == 'apiKey': if auth_details.get('in') == 'header': headers[auth_details['name']] = auth_details['value'] elif auth_details.get('in') == 'query': params[auth_details['name']] = auth_details['value'] elif auth_details.get('type') == 'http': if auth_details.get('scheme') == 'basic' and auth_details.get('username') is not None: # Basic auth is handled by requests' `auth` parameter pass elif auth_details.get('scheme') == 'bearer' and auth_details.get('token'): headers['Authorization'] = f"Bearer {auth_details['token']}" elif auth_details.get('type') == 'oauth2' and auth_details.get('token'): headers['Authorization'] = f"Bearer {auth_details['token']}" # Collect parameters path_params = {} if 'parameters' in details: for param_spec in details['parameters']: # Resolve $ref if it's a reference to a component parameter if '$ref' in param_spec: ref_path = param_spec['$ref'].split('/') if ref_path[0] == '#' and ref_path[1] == 'components' and ref_path[2] == 'parameters': param_name_ref = ref_path[3] # This requires having the full spec parsed, including components.parameters # The minimal parser doesn't do this deeply. ApiSchemaGeneratorV5 would. # For now, assume direct definition or skip complex $refs for parameters. print(f"Skipping parameter with $ref: {param_spec['$ref']} (full $ref resolution for params not in minimal script)") continue # Simplified handling else: # Unrecognized $ref print(f"Skipping parameter with unrecognized $ref: {param_spec['$ref']}") continue param_name = param_spec.get('name') param_in = param_spec.get('in') param_required = param_spec.get('required', False) param_schema = param_spec.get('schema', {}) param_type = param_schema.get('type', 'string') param_description = param_spec.get('description', '') prompt_msg = f"Enter value for {param_in} parameter '{param_name}' ({param_type})" if param_description: prompt_msg += f" ({param_description})" if param_required: prompt_msg += " (required)" user_value = get_input(prompt_msg, param_schema.get('default')) if user_value or (param_required and not user_value): # Process if value given, or if required and no value (let API validate) if not user_value and param_required: print(f"Warning: Required parameter '{param_name}' not provided.") if param_in == 'path': path_params[param_name] = user_value elif param_in == 'query': params[param_name] = user_value elif param_in == 'header': headers[param_name] = user_value # Other 'in' types (e.g., cookie) are less common for basic clients # Substitute path parameters request_path = path for p_name, p_val in path_params.items(): request_path = request_path.replace(f"{{{p_name}}}", str(p_val)) full_url = urljoin(base_url.rstrip('/') + '/', request_path.lstrip('/')) # Handle request body for POST, PUT, PATCH if method in ['POST', 'PUT', 'PATCH']: request_body_spec = details.get('requestBody') if request_body_spec: # Resolve $ref for requestBody if '$ref' in request_body_spec: ref_path = request_body_spec['$ref'].split('/') if ref_path[0] == '#' and ref_path[1] == 'components' and ref_path[2] == 'requestBodies': # This requires full spec parsing. Minimal script won't resolve this. print(f"Skipping requestBody with $ref: {request_body_spec['$ref']} (full $ref resolution not in minimal script)") request_body_spec = None # Cannot proceed with this $ref else: print(f"Skipping requestBody with unrecognized $ref: {request_body_spec['$ref']}") request_body_spec = None if request_body_spec and 'content' in request_body_spec: content_types = request_body_spec['content'] if 'application/json' in content_types: headers['Content-Type'] = 'application/json' # Potentially build JSON based on schema, for now, raw JSON input print("This endpoint expects a JSON request body.") print(f"Schema hint: {json.dumps(content_types['application/json'].get('schema', {}), indent=2)}") body_str = get_input("Enter JSON body as a single line string (or leave empty):") if body_str: try: json_payload = json.loads(body_str) except json.JSONDecodeError: print("Invalid JSON provided for request body. Sending as raw string if possible or failing.") data = body_str # Fallback for malformed JSON, might fail elif 'application/x-www-form-urlencoded' in content_types: headers['Content-Type'] = 'application/x-www-form-urlencoded' print("This endpoint expects form data. Enter key=value pairs, one per line. End with an empty line.") form_data = {} while True: line = get_input("key=value (or empty to finish): ") if not line: break if '=' in line: key, value = line.split('=', 1) form_data[key.strip()] = value.strip() else: print("Invalid format. Use key=value.") if form_data: data = form_data else: print(f"Unsupported request body content type: {list(content_types.keys())[0]}. Please handle manually.") # Make the request try: print(f"Requesting: {method} {full_url}") print(f"Headers: {headers}") if params: print(f"Query Params: {params}") if json_payload: print(f"JSON Payload: {json.dumps(json_payload)}") if data: print(f"Form Data: {data}") current_auth = None if auth_details.get('type') == 'http' and auth_details.get('scheme') == 'basic': current_auth = (auth_details['username'], auth_details['password']) response = requests.request( method, full_url, headers=headers, params=params, json=json_payload, data=data, auth=current_auth, timeout=30 ) print(f"\nResponse Status: {response.status_code}") content_type = response.headers.get('Content-Type', '') if 'application/json' in content_type: try: print("Response JSON:") print(json.dumps(response.json(), indent=2)) except json.JSONDecodeError: print("Response Content (not valid JSON):") print(response.text) else: print("Response Content:") print(response.text[:500] + "..." if len(response.text) > 500 else response.text) except requests.exceptions.RequestException as e: print(f"API call failed: {e}") def main(): parser = argparse.ArgumentParser(description="Dynamic API client based on OpenAPI specification.") parser.add_argument("spec_file", help="Path or URL to the OpenAPI (JSON or YAML) specification file.") args = parser.parse_args() # Use ApiSchemaGeneratorV5 if available and spec_file is a path, otherwise minimal parser if os.path.exists(args.spec_file) and 'MinimalApiParser' not in str(ApiSchemaGeneratorV5): # This assumes ApiSchemaGeneratorV5 takes api_spec_url and selected_endpoints # We are not using selected_endpoints here for the initial parsing. # The constructor of ApiSchemaGeneratorV5 is: # __init__(self, api_spec_url: str, api_name: str = None, selected_endpoints: List[str] = None) # We'll pass api_name=None and selected_endpoints=None for its internal use if any. api_parser = ApiSchemaGeneratorV5(api_spec_url=args.spec_file) api_parser.extract_api_info() # This populates self.api_spec, self.auth_info, self.endpoints etc. # Adapt ApiSchemaGeneratorV5's attributes to what this script expects spec_servers = api_parser.api_spec.get('servers', []) if api_parser.api_spec else [] spec_security_schemes = api_parser.auth_info if api_parser.auth_info else {} # ApiSchemaGeneratorV5 doesn't directly expose global_security in a simple attribute after extract_api_info # It's in api_parser.api_spec.get('security', []) spec_global_security = api_parser.api_spec.get('security', []) if api_parser.api_spec else [] spec_paths = api_parser.endpoints if api_parser.endpoints else {} else: # Minimal parser or URL api_parser = MinimalApiParser(args.spec_file) if not api_parser.extract_data(): return spec_servers = api_parser.servers spec_security_schemes = api_parser.security_schemes spec_global_security = api_parser.global_security spec_paths = api_parser.paths base_api_url = get_base_url(spec_servers, args.spec_file) # Determine active security requirements (global first) # A more complex app would check endpoint-specific security overrides active_sec_reqs = spec_global_security if not active_sec_reqs and spec_security_schemes: # If no global, but schemes exist, pick first scheme first_scheme_name = list(spec_security_schemes.keys())[0] active_sec_reqs = [{first_scheme_name: []}] # Mimic structure of security requirements list print(f"No global security requirements in spec. Using first defined scheme: {first_scheme_name}") auth = get_auth_details(spec_security_schemes, active_sec_reqs) selected = select_endpoints(spec_paths) if not selected: print("No endpoints selected. Exiting.") return for endpoint_data in selected: # Note: make_api_call currently doesn't re-evaluate auth per endpoint. # It uses the globally determined `auth`. make_api_call(base_api_url, endpoint_data, auth, spec_security_schemes) if __name__ == "__main__": main()