# File size: 20,703 Bytes
# cd9f831
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
import requests
import json
import yaml
import argparse
import os
from urllib.parse import urljoin, urlparse

# Attempt to import ApiSchemaGeneratorV5, or define a minimal version if not found
try:
    from api_schema_generatorV5 import ApiSchemaGeneratorV5
except ImportError:
    print("Warning: api_schema_generatorV5.py not found. Using minimal local parser.")
    # Define a minimal parser if ApiSchemaGeneratorV5 is not available
    # This is a simplified version for basic functionality
    class MinimalApiParser:
        def __init__(self, api_spec_url: str):
            self.api_spec_url = api_spec_url
            self.api_spec = None
            self.servers = []
            self.security_schemes = {}
            self.global_security = []
            self.paths = {}

        def fetch_api_spec(self):
            try:
                if self.api_spec_url.startswith(('http://', 'https://')):
                    response = requests.get(self.api_spec_url)
                    response.raise_for_status()
                    content = response.text
                else:
                    with open(self.api_spec_url, 'r', encoding='utf-8') as f:
                        content = f.read()
                
                try:
                    self.api_spec = json.loads(content)
                except json.JSONDecodeError:
                    self.api_spec = yaml.safe_load(content)
                return True
            except Exception as e:
                print(f"Error fetching/parsing API specification: {e}")
                return False

        def extract_data(self):
            if not self.api_spec:
                if not self.fetch_api_spec():
                    return False
            
            self.servers = self.api_spec.get('servers', [])
            self.security_schemes = self.api_spec.get('components', {}).get('securitySchemes', {})
            self.global_security = self.api_spec.get('security', [])
            self.paths = self.api_spec.get('paths', {})
            return True

    ApiSchemaGeneratorV5 = MinimalApiParser # Use the minimal parser

def get_input(prompt_message, default_value=None):
    if default_value:
        return input(f"{prompt_message} [{default_value}]: ") or default_value
    return input(f"{prompt_message}: ")

def get_base_url(servers, api_spec_path):
    suggested_url = None
    if servers:
        # Prefer HTTPS if available
        https_server = next((s['url'] for s in servers if s['url'].startswith('https://')), None)
        if https_server:
            suggested_url = https_server.rstrip('/')
        elif servers[0].get('url'):
            suggested_url = servers[0]['url'].rstrip('/')
    
    if suggested_url:
        print(f"A server URL was found in the API specification: {suggested_url}")
        return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)", default_value=suggested_url)
    else:
        if os.path.exists(api_spec_path): # Check if it's a local file to provide context for the warning
            print("Warning: No 'servers' block found in the API specification.")
        return get_input("Please enter the base API URL (e.g., https://api.example.com/v1)")

def get_auth_details(security_schemes, active_security_requirements):
    """
    Determines the authentication method and prompts user for credentials.
    Uses the first active security requirement.
    """
    auth_config = {}
    if not active_security_requirements:
        print("No active security requirements found for this endpoint/API. Proceeding without authentication.")
        return auth_config

    # Use the first security requirement listed
    first_req_name = list(active_security_requirements[0].keys())[0]
    
    if first_req_name not in security_schemes:
        print(f"Warning: Security scheme '{first_req_name}' not defined in components.securitySchemes.")
        return auth_config

    scheme = security_schemes[first_req_name]
    auth_type = scheme.get('type')
    print(f"\n--- Authentication Required: {first_req_name} ({auth_type}) ---")

    if auth_type == 'apiKey':
        auth_config['type'] = 'apiKey'
        auth_config['name'] = scheme.get('name')
        auth_config['in'] = scheme.get('in')
        auth_config['value'] = get_input(f"Enter API Key for '{auth_config['name']}' (in {auth_config['in']})")
    elif auth_type == 'http':
        http_scheme = scheme.get('scheme', '').lower()
        auth_config['type'] = 'http'
        auth_config['scheme'] = http_scheme
        if http_scheme == 'basic':
            username = get_input("Enter Basic Auth Username")
            password = get_input("Enter Basic Auth Password", "") # nosec B105
            auth_config['username'] = username
            auth_config['password'] = password
        elif http_scheme == 'bearer':
            auth_config['token'] = get_input("Enter Bearer Token")
        else:
            print(f"Unsupported HTTP scheme: {http_scheme}")
    elif auth_type == 'oauth2':
        auth_config['type'] = 'oauth2'
        # Simplified: For clientCredentials flow
        flows = scheme.get('flows', {})
        if 'clientCredentials' in flows:
            auth_config['flow'] = 'clientCredentials'
            cc_flow = flows['clientCredentials']
            auth_config['token_url'] = get_input("Enter OAuth2 Token URL", cc_flow.get('tokenUrl'))
            auth_config['client_id'] = get_input("Enter OAuth2 Client ID")
            auth_config['client_secret'] = get_input("Enter OAuth2 Client Secret", "") # nosec B105
            # Optionally, handle scopes
            # scopes_available = cc_flow.get('scopes', {})
            # if scopes_available:
            #     print("Available scopes:", scopes_available)
            #     auth_config['scope'] = get_input("Enter scopes (space-separated)", "")
            
            # Fetch token
            token_data = {
                'grant_type': 'client_credentials',
                'client_id': auth_config['client_id'],
                'client_secret': auth_config['client_secret'],
            }
            # if auth_config.get('scope'):
            #     token_data['scope'] = auth_config['scope']
            
            try:
                print(f"Attempting to fetch OAuth2 token from {auth_config['token_url']}...")
                token_res = requests.post(auth_config['token_url'], data=token_data, timeout=10)
                token_res.raise_for_status()
                auth_config['token'] = token_res.json().get('access_token')
                if auth_config['token']:
                    print("OAuth2 token obtained successfully.")
                else:
                    print("Failed to obtain OAuth2 token. Check credentials and token URL.")
                    print("Response:", token_res.text)
            except requests.exceptions.RequestException as e:
                print(f"Error obtaining OAuth2 token: {e}")
        else:
            print(f"Unsupported OAuth2 flow. Only clientCredentials supported in this script.")
    else:
        print(f"Unsupported security scheme type: {auth_type}")
    
    return auth_config

def select_endpoints(paths):
    print("\n--- Available Endpoints ---")
    endpoint_options = []
    for path, methods in paths.items():
        for method, details in methods.items():
            # We are interested in callable methods like get, post, put, delete, patch
            if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head', 'trace']:
                continue # Skip parameters, $ref etc. at this level
            
            summary = details.get('summary', 'No summary')
            endpoint_options.append({
                'path': path,
                'method': method.upper(),
                'details': details
            })
            print(f"{len(endpoint_options)}. {method.upper()} {path} - {summary}")

    if not endpoint_options:
        print("No callable endpoints found in the specification.")
        return []

    selected_indices_str = get_input("Enter numbers of endpoints to call (comma-separated, e.g., 1,3): ")
    selected_endpoints = []
    try:
        selected_indices = [int(i.strip()) - 1 for i in selected_indices_str.split(',')]
        for index in selected_indices:
            if 0 <= index < len(endpoint_options):
                selected_endpoints.append(endpoint_options[index])
            else:
                print(f"Warning: Invalid endpoint number {index + 1} skipped.")
    except ValueError:
        print("Invalid input for endpoint selection.")
    return selected_endpoints

def make_api_call(base_url, endpoint_info, auth_details, security_schemes):
    path = endpoint_info['path']
    method = endpoint_info['method']
    details = endpoint_info['details']

    print(f"\n--- Calling: {method} {path} ---")

    # Determine active security for this endpoint
    endpoint_security = details.get('security') # Endpoint specific
    # If not defined at endpoint, it might fall back to global, handled by `auth_details` already if it was based on global.
    # For simplicity, if endpoint_security is defined, we re-evaluate auth.
    # This part could be more sophisticated to merge/override global.
    # For now, if endpoint has `security`, we assume `auth_details` should be re-evaluated or specific.
    # However, `get_auth_details` is called once globally. A more robust system would check per endpoint.
    # Current `auth_details` is based on the *first* global or first scheme if no global.

    headers = {'Accept': 'application/json'}
    params = {}
    data = None
    json_payload = None

    # Apply authentication
    if auth_details:
        if auth_details.get('type') == 'apiKey':
            if auth_details.get('in') == 'header':
                headers[auth_details['name']] = auth_details['value']
            elif auth_details.get('in') == 'query':
                params[auth_details['name']] = auth_details['value']
        elif auth_details.get('type') == 'http':
            if auth_details.get('scheme') == 'basic' and auth_details.get('username') is not None:
                # Basic auth is handled by requests' `auth` parameter
                pass
            elif auth_details.get('scheme') == 'bearer' and auth_details.get('token'):
                headers['Authorization'] = f"Bearer {auth_details['token']}"
        elif auth_details.get('type') == 'oauth2' and auth_details.get('token'):
             headers['Authorization'] = f"Bearer {auth_details['token']}"


    # Collect parameters
    path_params = {}
    if 'parameters' in details:
        for param_spec in details['parameters']:
            # Resolve $ref if it's a reference to a component parameter
            if '$ref' in param_spec:
                ref_path = param_spec['$ref'].split('/')
                if ref_path[0] == '#' and ref_path[1] == 'components' and ref_path[2] == 'parameters':
                    param_name_ref = ref_path[3]
                    # This requires having the full spec parsed, including components.parameters
                    # The minimal parser doesn't do this deeply. ApiSchemaGeneratorV5 would.
                    # For now, assume direct definition or skip complex $refs for parameters.
                    print(f"Skipping parameter with $ref: {param_spec['$ref']} (full $ref resolution for params not in minimal script)")
                    continue # Simplified handling
                else: # Unrecognized $ref
                    print(f"Skipping parameter with unrecognized $ref: {param_spec['$ref']}")
                    continue

            param_name = param_spec.get('name')
            param_in = param_spec.get('in')
            param_required = param_spec.get('required', False)
            param_schema = param_spec.get('schema', {})
            param_type = param_schema.get('type', 'string')
            param_description = param_spec.get('description', '')
            
            prompt_msg = f"Enter value for {param_in} parameter '{param_name}' ({param_type})"
            if param_description:
                prompt_msg += f" ({param_description})"
            if param_required:
                prompt_msg += " (required)"
            
            user_value = get_input(prompt_msg, param_schema.get('default'))

            if user_value or (param_required and not user_value): # Process if value given, or if required and no value (let API validate)
                if not user_value and param_required:
                    print(f"Warning: Required parameter '{param_name}' not provided.")
                
                if param_in == 'path':
                    path_params[param_name] = user_value
                elif param_in == 'query':
                    params[param_name] = user_value
                elif param_in == 'header':
                    headers[param_name] = user_value
                # Other 'in' types (e.g., cookie) are less common for basic clients

    # Substitute path parameters
    request_path = path
    for p_name, p_val in path_params.items():
        request_path = request_path.replace(f"{{{p_name}}}", str(p_val))
    
    full_url = urljoin(base_url.rstrip('/') + '/', request_path.lstrip('/'))


    # Handle request body for POST, PUT, PATCH
    if method in ['POST', 'PUT', 'PATCH']:
        request_body_spec = details.get('requestBody')
        if request_body_spec:
            # Resolve $ref for requestBody
            if '$ref' in request_body_spec:
                ref_path = request_body_spec['$ref'].split('/')
                if ref_path[0] == '#' and ref_path[1] == 'components' and ref_path[2] == 'requestBodies':
                    # This requires full spec parsing. Minimal script won't resolve this.
                    print(f"Skipping requestBody with $ref: {request_body_spec['$ref']} (full $ref resolution not in minimal script)")
                    request_body_spec = None # Cannot proceed with this $ref
                else:
                    print(f"Skipping requestBody with unrecognized $ref: {request_body_spec['$ref']}")
                    request_body_spec = None


            if request_body_spec and 'content' in request_body_spec:
                content_types = request_body_spec['content']
                if 'application/json' in content_types:
                    headers['Content-Type'] = 'application/json'
                    # Potentially build JSON based on schema, for now, raw JSON input
                    print("This endpoint expects a JSON request body.")
                    print(f"Schema hint: {json.dumps(content_types['application/json'].get('schema', {}), indent=2)}")
                    body_str = get_input("Enter JSON body as a single line string (or leave empty):")
                    if body_str:
                        try:
                            json_payload = json.loads(body_str)
                        except json.JSONDecodeError:
                            print("Invalid JSON provided for request body. Sending as raw string if possible or failing.")
                            data = body_str # Fallback for malformed JSON, might fail
                elif 'application/x-www-form-urlencoded' in content_types:
                    headers['Content-Type'] = 'application/x-www-form-urlencoded'
                    print("This endpoint expects form data. Enter key=value pairs, one per line. End with an empty line.")
                    form_data = {}
                    while True:
                        line = get_input("key=value (or empty to finish): ")
                        if not line:
                            break
                        if '=' in line:
                            key, value = line.split('=', 1)
                            form_data[key.strip()] = value.strip()
                        else:
                            print("Invalid format. Use key=value.")
                    if form_data:
                        data = form_data
                else:
                    print(f"Unsupported request body content type: {list(content_types.keys())[0]}. Please handle manually.")

    # Make the request
    try:
        print(f"Requesting: {method} {full_url}")
        print(f"Headers: {headers}")
        if params: print(f"Query Params: {params}")
        if json_payload: print(f"JSON Payload: {json.dumps(json_payload)}")
        if data: print(f"Form Data: {data}")

        current_auth = None
        if auth_details.get('type') == 'http' and auth_details.get('scheme') == 'basic':
            current_auth = (auth_details['username'], auth_details['password'])

        response = requests.request(
            method,
            full_url,
            headers=headers,
            params=params,
            json=json_payload,
            data=data,
            auth=current_auth,
            timeout=30 
        )
        print(f"\nResponse Status: {response.status_code}")
        
        content_type = response.headers.get('Content-Type', '')
        if 'application/json' in content_type:
            try:
                print("Response JSON:")
                print(json.dumps(response.json(), indent=2))
            except json.JSONDecodeError:
                print("Response Content (not valid JSON):")
                print(response.text)
        else:
            print("Response Content:")
            print(response.text[:500] + "..." if len(response.text) > 500 else response.text)

    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")

def main():
    parser = argparse.ArgumentParser(description="Dynamic API client based on OpenAPI specification.")
    parser.add_argument("spec_file", help="Path or URL to the OpenAPI (JSON or YAML) specification file.")
    args = parser.parse_args()

    # Use ApiSchemaGeneratorV5 if available and spec_file is a path, otherwise minimal parser
    if os.path.exists(args.spec_file) and 'MinimalApiParser' not in str(ApiSchemaGeneratorV5):
        # This assumes ApiSchemaGeneratorV5 takes api_spec_url and selected_endpoints
        # We are not using selected_endpoints here for the initial parsing.
        # The constructor of ApiSchemaGeneratorV5 is:
        # __init__(self, api_spec_url: str, api_name: str = None, selected_endpoints: List[str] = None)
        # We'll pass api_name=None and selected_endpoints=None for its internal use if any.
        api_parser = ApiSchemaGeneratorV5(api_spec_url=args.spec_file)
        api_parser.extract_api_info() # This populates self.api_spec, self.auth_info, self.endpoints etc.
        
        # Adapt ApiSchemaGeneratorV5's attributes to what this script expects
        spec_servers = api_parser.api_spec.get('servers', []) if api_parser.api_spec else []
        spec_security_schemes = api_parser.auth_info if api_parser.auth_info else {}
        # ApiSchemaGeneratorV5 doesn't directly expose global_security in a simple attribute after extract_api_info
        # It's in api_parser.api_spec.get('security', [])
        spec_global_security = api_parser.api_spec.get('security', []) if api_parser.api_spec else []
        spec_paths = api_parser.endpoints if api_parser.endpoints else {}
    else: # Minimal parser or URL
        api_parser = MinimalApiParser(args.spec_file)
        if not api_parser.extract_data():
            return
        spec_servers = api_parser.servers
        spec_security_schemes = api_parser.security_schemes
        spec_global_security = api_parser.global_security
        spec_paths = api_parser.paths

    base_api_url = get_base_url(spec_servers, args.spec_file)

    # Determine active security requirements (global first)
    # A more complex app would check endpoint-specific security overrides
    active_sec_reqs = spec_global_security
    if not active_sec_reqs and spec_security_schemes: # If no global, but schemes exist, pick first scheme
        first_scheme_name = list(spec_security_schemes.keys())[0]
        active_sec_reqs = [{first_scheme_name: []}] # Mimic structure of security requirements list
        print(f"No global security requirements in spec. Using first defined scheme: {first_scheme_name}")
    
    auth = get_auth_details(spec_security_schemes, active_sec_reqs)
    
    selected = select_endpoints(spec_paths)
    if not selected:
        print("No endpoints selected. Exiting.")
        return

    for endpoint_data in selected:
        # Note: make_api_call currently doesn't re-evaluate auth per endpoint.
        # It uses the globally determined `auth`.
        make_api_call(base_api_url, endpoint_data, auth, spec_security_schemes)

# Run the interactive client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()