Juggernaut1397 commited on
Commit
b01ff3d
·
verified ·
1 Parent(s): efb09de

Update okta.py

Browse files
Files changed (1) hide show
  1. okta.py +108 -47
okta.py CHANGED
@@ -46,64 +46,125 @@ def create_session_zip(session_id):
46
  f.write(zip_file.read())
47
  return zip_filename
48
 
49
- def handle_okta_call(api_base_url, api_token, session_id, param_values, *checkbox_values):
50
- """Handle Okta API calls with parameter support"""
51
- if not session_id:
52
- session_id = str(uuid.uuid4())
53
 
54
- if not api_base_url.startswith('https://'):
55
- api_base_url = f"https://{api_base_url}"
 
 
 
 
 
 
 
 
 
 
56
 
 
57
  headers = {
 
58
  "Accept": "application/json",
59
- "Authorization": f"SSWS {api_token}"
60
  }
61
 
62
  responses = {}
63
- base_save_folder = os.path.join("sessions", session_id, "Okta")
64
- os.makedirs(base_save_folder, exist_ok=True)
65
 
66
- # Process selected endpoints
67
- for selections in checkbox_values:
68
- if isinstance(selections, list):
69
- for selection in selections:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
  try:
71
- if " | " in selection:
72
- endpoint, method_part = selection.split(" | ")
73
- method = method_part.split(" - ")[0].lower()
 
 
 
 
74
  else:
75
- endpoint = selection
76
- method = "get"
77
-
78
- # Ensure endpoint starts with /api/v1
79
- if not endpoint.startswith('/api/v1'):
80
- endpoint = f"/api/v1{endpoint}"
81
-
82
- # Handle parameter replacement if needed
83
- if any(char in endpoint for char in ['{', '}']):
84
- full_url, error = handle_path_parameters(endpoint, api_base_url, param_values)
85
- if error:
86
- responses[endpoint] = f"Error: {error}"
87
- continue
88
- else:
89
- full_url = f"{api_base_url.rstrip('/')}{endpoint}"
90
-
91
- print(f"Calling Okta endpoint: {full_url}")
92
-
93
- r = requests.get(full_url, headers=headers)
94
- r.raise_for_status()
95
-
96
- data = r.json() if r.headers.get('content-type', '').startswith('application/json') else r.text
97
- responses[endpoint] = data
98
-
99
- # Save response data
100
- save_response_data(data, endpoint, base_save_folder)
101
-
102
  except Exception as e:
103
- responses[endpoint] = f"Error: {traceback.format_exc()}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
 
105
- # Create and return session zip
106
- zip_filename = create_session_zip(session_id)
107
- return responses, zip_filename, session_id, "✅ Okta API calls complete!"
108
 
109
  # Include save_response_data and create_session_zip functions (same as IdentityNow)
 
46
  f.write(zip_file.read())
47
  return zip_filename
48
 
49
+ def handle_okta_call(api_base_url, api_token, session_id, param_values, endpoints):
50
+ """
51
+ Make API calls to Okta endpoints.
 
52
 
53
+ Args:
54
+ api_base_url: The base URL for Okta API
55
+ api_token: The Okta API token for authentication
56
+ session_id: Current session ID
57
+ param_values: Dictionary of parameter values for the endpoints
58
+ endpoints: List of endpoints to call
59
+
60
+ Returns:
61
+ Tuple of (responses, download_file, session_id, message)
62
+ """
63
+ # Strip trailing slashes for consistency
64
+ api_base_url = api_base_url.rstrip('/')
65
 
66
+ # Set up headers with authentication
67
  headers = {
68
+ "Authorization": f"SSWS {api_token}",
69
  "Accept": "application/json",
70
+ "Content-Type": "application/json"
71
  }
72
 
73
  responses = {}
 
 
74
 
75
+ # Process each endpoint
76
+ for endpoint in endpoints:
77
+ try:
78
+ # Apply path parameters if present
79
+ formatted_endpoint = endpoint
80
+ for param_name, param_value in param_values.items():
81
+ if "{" + param_name + "}" in endpoint:
82
+ formatted_endpoint = formatted_endpoint.replace("{" + param_name + "}", param_value)
83
+
84
+ # Build full URL
85
+ url = f"{api_base_url}{formatted_endpoint}"
86
+
87
+ # Extract query parameters for this endpoint
88
+ query_params = {}
89
+ for param_name, param_value in param_values.items():
90
+ if param_name not in endpoint and param_value: # Not a path param and has value
91
+ query_params[param_name] = param_value
92
+
93
+ # Special handling for credential verification endpoint
94
+ if endpoint == "/api/v1/users/me":
95
  try:
96
+ r = requests.get(url, headers=headers, timeout=30)
97
+ r.raise_for_status() # This will raise an exception for 4xx/5xx status codes
98
+ responses[endpoint] = r.json()
99
+ except requests.exceptions.HTTPError as e:
100
+ # Handle authentication errors specifically
101
+ if e.response.status_code in (401, 403):
102
+ responses[endpoint] = {"error": f"Authentication failed: {str(e)}"}
103
  else:
104
+ responses[endpoint] = {"error": f"HTTP Error: {str(e)}"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  except Exception as e:
106
+ responses[endpoint] = {"error": str(e)}
107
+ continue
108
+
109
+ # Make the API request
110
+ r = requests.get(url, headers=headers, params=query_params, timeout=30)
111
+
112
+ # Check for successful response
113
+ if r.status_code == 200:
114
+ try:
115
+ responses[endpoint] = r.json()
116
+ except ValueError:
117
+ responses[endpoint] = {"error": "Invalid JSON response"}
118
+ else:
119
+ responses[endpoint] = {"error": f"Error {r.status_code}: {r.text}"}
120
+
121
+ except requests.exceptions.RequestException as e:
122
+ responses[endpoint] = {"error": f"Request failed: {str(e)}"}
123
+ except Exception as e:
124
+ responses[endpoint] = {"error": f"Error: {str(e)}"}
125
+
126
+ # Create a ZIP file with the results
127
+ if responses and not all(isinstance(resp, dict) and "error" in resp for _, resp in responses.items()):
128
+ try:
129
+ import tempfile
130
+ import zipfile
131
+ import json
132
+ import os
133
+ from datetime import datetime
134
+
135
+ # Create a temporary directory
136
+ temp_dir = tempfile.mkdtemp()
137
+ zip_path = os.path.join(temp_dir, "okta_data.zip")
138
+
139
+ # Create ZIP file
140
+ with zipfile.ZipFile(zip_path, 'w') as zipf:
141
+ for endpoint, data in responses.items():
142
+ # Clean endpoint name for filename
143
+ safe_name = endpoint.replace('/', '_').replace('{', '').replace('}', '')
144
+ if safe_name.startswith('_'):
145
+ safe_name = safe_name[1:]
146
+
147
+ filename = f"{safe_name}.json"
148
+ json_data = json.dumps(data, indent=2)
149
+ zipf.writestr(filename, json_data)
150
+
151
+ # Add metadata
152
+ metadata = {
153
+ "timestamp": datetime.now().isoformat(),
154
+ "api_base_url": api_base_url,
155
+ "endpoints": endpoints,
156
+ "session_id": session_id or str(datetime.now().timestamp())
157
+ }
158
+ zipf.writestr("metadata.json", json.dumps(metadata, indent=2))
159
+
160
+ return responses, zip_path, session_id, "✅ API calls completed successfully"
161
+ except Exception as e:
162
+ return responses, None, session_id, f"⚠️ API calls completed but export failed: {str(e)}"
163
+
164
+ # Handle case where all responses are errors
165
+ if all(isinstance(resp.get(endpoint), dict) and "error" in resp.get(endpoint, {}) for resp in [responses]):
166
+ return responses, None, session_id, "❌ All API calls failed"
167
 
168
+ return responses, None, session_id, "✅ API calls completed"
 
 
169
 
170
  # Include save_response_data and create_session_zip functions (same as IdentityNow)