import requests import datetime import json import os import uuid import traceback from utils import zip_session_folder, handle_path_parameters def validate_okta_token(api_token): """Validate Okta API token""" try: return { "access_token": api_token, "success": True, "message": None } except Exception as e: return { "success": False, "message": str(e) } def save_response_data(data, endpoint, base_save_folder): """Save API response data to file""" safe_endpoint_name = endpoint.strip("/").replace("/", "_") or "root" timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_folder = os.path.join(base_save_folder, f"{safe_endpoint_name} ({timestamp})") os.makedirs(save_folder, exist_ok=True) filename = os.path.join(save_folder, "data.jsonl") with open(filename, "w", encoding="utf-8") as f: if isinstance(data, list): for item in data: f.write(json.dumps(item) + "\n") elif isinstance(data, dict): f.write(json.dumps(data) + "\n") else: f.write(str(data)) def create_session_zip(session_id): """Create ZIP file of session data""" session_folder = os.path.join("sessions", session_id) zip_file = zip_session_folder(session_folder) zip_filename = f"session_{session_id}.zip" with open(zip_filename, "wb") as f: f.write(zip_file.read()) return zip_filename def handle_okta_call(api_base_url, api_token, session_id, param_values, endpoints): """ Make API calls to Okta endpoints. Args: api_base_url: The base URL for Okta API api_token: The Okta API token for authentication session_id: Current session ID param_values: Dictionary of parameter values for the endpoints endpoints: List of endpoints to call Returns: Tuple of (responses, download_file, session_id, message) """ # Strip trailing slashes for consistency api_base_url = api_base_url.rstrip('/') # Set up headers with authentication headers = { "Authorization": f"SSWS {api_token}", "Accept": "application/json", "Content-Type": "application/json" } responses = {} # Process each endpoint for endpoint in endpoints: try: # Apply path parameters if present formatted_endpoint = endpoint for param_name, param_value in param_values.items(): if "{" + param_name + "}" in endpoint: formatted_endpoint = formatted_endpoint.replace("{" + param_name + "}", param_value) # Build full URL url = f"{api_base_url}{formatted_endpoint}" # Extract query parameters for this endpoint query_params = {} for param_name, param_value in param_values.items(): if param_name not in endpoint and param_value: # Not a path param and has value query_params[param_name] = param_value # Special handling for credential verification endpoint if endpoint == "/api/v1/users/me": try: r = requests.get(url, headers=headers, timeout=30) r.raise_for_status() # This will raise an exception for 4xx/5xx status codes responses[endpoint] = r.json() except requests.exceptions.HTTPError as e: # Handle authentication errors specifically if e.response.status_code in (401, 403): responses[endpoint] = {"error": f"Authentication failed: {str(e)}"} else: responses[endpoint] = {"error": f"HTTP Error: {str(e)}"} except Exception as e: responses[endpoint] = {"error": str(e)} continue # Make the API request r = requests.get(url, headers=headers, params=query_params, timeout=30) # Check for successful response if r.status_code == 200: try: responses[endpoint] = r.json() except ValueError: responses[endpoint] = {"error": "Invalid JSON response"} else: responses[endpoint] = {"error": f"Error {r.status_code}: {r.text}"} except requests.exceptions.RequestException as e: responses[endpoint] = {"error": f"Request failed: {str(e)}"} except Exception as e: responses[endpoint] = {"error": f"Error: {str(e)}"} # Create a ZIP file with the results if responses and not all(isinstance(resp, dict) and "error" in resp for _, resp in responses.items()): try: import tempfile import zipfile import json import os from datetime import datetime # Create a temporary directory temp_dir = tempfile.mkdtemp() zip_path = os.path.join(temp_dir, "okta_data.zip") # Create ZIP file with zipfile.ZipFile(zip_path, 'w') as zipf: for endpoint, data in responses.items(): # Clean endpoint name for filename safe_name = endpoint.replace('/', '_').replace('{', '').replace('}', '') if safe_name.startswith('_'): safe_name = safe_name[1:] filename = f"{safe_name}.json" json_data = json.dumps(data, indent=2) zipf.writestr(filename, json_data) # Add metadata metadata = { "timestamp": datetime.now().isoformat(), "api_base_url": api_base_url, "endpoints": endpoints, "session_id": session_id or str(datetime.now().timestamp()) } zipf.writestr("metadata.json", json.dumps(metadata, indent=2)) return responses, zip_path, session_id, "✅ API calls completed successfully" except Exception as e: return responses, None, session_id, f"⚠️ API calls completed but export failed: {str(e)}" # Handle case where all responses are errors if all(isinstance(resp.get(endpoint), dict) and "error" in resp.get(endpoint, {}) for resp in [responses]): return responses, None, session_id, "❌ All API calls failed" return responses, None, session_id, "✅ API calls completed" # Include save_response_data and create_session_zip functions (same as IdentityNow)