Spaces:
Sleeping
Sleeping
| import requests | |
| import datetime | |
| import json | |
| import os | |
| import uuid | |
| import traceback | |
| from utils import zip_session_folder, handle_path_parameters | |
| def fetch_identitynow_token(api_url, grant_type, client_id, client_secret): | |
| """Fetch OAuth token for IdentityNow""" | |
| token_endpoint = api_url.rstrip("/") + "/oauth/token" | |
| payload = { | |
| 'grant_type': grant_type, | |
| 'client_id': client_id, | |
| 'client_secret': client_secret | |
| } | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Content-Type': 'application/x-www-form-urlencoded' | |
| } | |
| try: | |
| response = requests.post(token_endpoint, data=payload, headers=headers) | |
| response.raise_for_status() | |
| token_data = response.json() | |
| access_token = token_data.get("access_token") | |
| expires_in = token_data.get("expires_in", 0) | |
| expiry_timestamp = datetime.datetime.now(datetime.timezone.utc).timestamp() + expires_in | |
| return { | |
| "access_token": access_token, | |
| "expiry_timestamp": expiry_timestamp, | |
| "success": True, | |
| "message": None | |
| } | |
| except Exception as e: | |
| return { | |
| "access_token": None, | |
| "expiry_timestamp": None, | |
| "success": False, | |
| "message": str(e) | |
| } | |
| def handle_identitynow_call(api_base_url, oauth_grant, oauth_client, oauth_secret, session_id, param_values, *checkbox_values): | |
| """Handle IdentityNow API calls with parameter support""" | |
| if not session_id: | |
| session_id = str(uuid.uuid4()) | |
| # Get OAuth token | |
| token_result = fetch_identitynow_token( | |
| api_base_url, | |
| oauth_grant, | |
| oauth_client, | |
| oauth_secret | |
| ) | |
| if not token_result["success"]: | |
| return ( | |
| {"error": f"Failed to get OAuth token: {token_result['message']}"}, | |
| None, | |
| session_id, | |
| "❌ OAuth token fetch failed" | |
| ) | |
| # Format expiry time | |
| expiry_dt = datetime.datetime.fromtimestamp(token_result["expiry_timestamp"], tz=datetime.timezone.utc) | |
| expiry_str = expiry_dt.strftime("%Y-%m-%d %H:%M:%S UTC") | |
| token_msg = f"✅ OAuth token received! Valid until: {expiry_str}" | |
| # Process selected endpoints | |
| responses = {} | |
| base_save_folder = os.path.join("sessions", session_id, "IdentityNow") | |
| os.makedirs(base_save_folder, exist_ok=True) | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Authorization': f'Bearer {token_result["access_token"]}' | |
| } | |
| # Parse and call selected endpoints | |
| for selections in checkbox_values: | |
| if isinstance(selections, list): | |
| for selection in selections: | |
| try: | |
| if " | " in selection: | |
| endpoint, method_part = selection.split(" | ") | |
| method = method_part.split(" - ")[0].lower() | |
| else: | |
| endpoint = selection | |
| method = "get" | |
| # Handle parameter replacement if needed | |
| if any(char in endpoint for char in ['{', '}']): | |
| full_url, error = handle_path_parameters(endpoint, f"{api_base_url}/v3", param_values) | |
| if error: | |
| responses[endpoint] = f"Error: {error}" | |
| continue | |
| else: | |
| full_url = f"{api_base_url.rstrip('/')}/v3{endpoint}" | |
| print(f"Calling endpoint: {full_url}") | |
| r = requests.get(full_url, headers=headers) | |
| r.raise_for_status() | |
| data = r.json() if r.headers.get('content-type', '').startswith('application/json') else r.text | |
| responses[endpoint] = data | |
| # Save response data | |
| save_response_data(data, endpoint, base_save_folder) | |
| except Exception as e: | |
| responses[endpoint] = f"Error: {traceback.format_exc()}" | |
| # Create and return session zip | |
| zip_filename = create_session_zip(session_id) | |
| return responses, zip_filename, session_id, "✅ API calls complete!" | |
| def save_response_data(data, endpoint, base_save_folder): | |
| """Save API response data to file""" | |
| safe_endpoint_name = endpoint.strip("/").replace("/", "_") or "root" | |
| timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
| save_folder = os.path.join(base_save_folder, f"{safe_endpoint_name} ({timestamp})") | |
| os.makedirs(save_folder, exist_ok=True) | |
| filename = os.path.join(save_folder, "data.jsonl") | |
| with open(filename, "w", encoding="utf-8") as f: | |
| if isinstance(data, list): | |
| for item in data: | |
| f.write(json.dumps(item) + "\n") | |
| elif isinstance(data, dict): | |
| f.write(json.dumps(data) + "\n") | |
| else: | |
| f.write(str(data)) | |
| def create_session_zip(session_id): | |
| """Create ZIP file of session data""" | |
| session_folder = os.path.join("sessions", session_id) | |
| zip_file = zip_session_folder(session_folder) | |
| zip_filename = f"session_{session_id}.zip" | |
| with open(zip_filename, "wb") as f: | |
| f.write(zip_file.read()) | |
| return zip_filename |