Spaces:
Sleeping
Sleeping
File size: 5,407 Bytes
12bee28 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | import requests
import datetime
import json
import os
import uuid
import traceback
from utils import zip_session_folder, handle_path_parameters
def fetch_identitynow_token(api_url, grant_type, client_id, client_secret):
"""Fetch OAuth token for IdentityNow"""
token_endpoint = api_url.rstrip("/") + "/oauth/token"
payload = {
'grant_type': grant_type,
'client_id': client_id,
'client_secret': client_secret
}
headers = {
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
response = requests.post(token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
expires_in = token_data.get("expires_in", 0)
expiry_timestamp = datetime.datetime.now(datetime.timezone.utc).timestamp() + expires_in
return {
"access_token": access_token,
"expiry_timestamp": expiry_timestamp,
"success": True,
"message": None
}
except Exception as e:
return {
"access_token": None,
"expiry_timestamp": None,
"success": False,
"message": str(e)
}
def handle_identitynow_call(api_base_url, oauth_grant, oauth_client, oauth_secret, session_id, param_values, *checkbox_values):
"""Handle IdentityNow API calls with parameter support"""
if not session_id:
session_id = str(uuid.uuid4())
# Get OAuth token
token_result = fetch_identitynow_token(
api_base_url,
oauth_grant,
oauth_client,
oauth_secret
)
if not token_result["success"]:
return (
{"error": f"Failed to get OAuth token: {token_result['message']}"},
None,
session_id,
"❌ OAuth token fetch failed"
)
# Format expiry time
expiry_dt = datetime.datetime.fromtimestamp(token_result["expiry_timestamp"], tz=datetime.timezone.utc)
expiry_str = expiry_dt.strftime("%Y-%m-%d %H:%M:%S UTC")
token_msg = f"✅ OAuth token received! Valid until: {expiry_str}"
# Process selected endpoints
responses = {}
base_save_folder = os.path.join("sessions", session_id, "IdentityNow")
os.makedirs(base_save_folder, exist_ok=True)
headers = {
'Accept': 'application/json',
'Authorization': f'Bearer {token_result["access_token"]}'
}
# Parse and call selected endpoints
for selections in checkbox_values:
if isinstance(selections, list):
for selection in selections:
try:
if " | " in selection:
endpoint, method_part = selection.split(" | ")
method = method_part.split(" - ")[0].lower()
else:
endpoint = selection
method = "get"
# Handle parameter replacement if needed
if any(char in endpoint for char in ['{', '}']):
full_url, error = handle_path_parameters(endpoint, f"{api_base_url}/v3", param_values)
if error:
responses[endpoint] = f"Error: {error}"
continue
else:
full_url = f"{api_base_url.rstrip('/')}/v3{endpoint}"
print(f"Calling endpoint: {full_url}")
r = requests.get(full_url, headers=headers)
r.raise_for_status()
data = r.json() if r.headers.get('content-type', '').startswith('application/json') else r.text
responses[endpoint] = data
# Save response data
save_response_data(data, endpoint, base_save_folder)
except Exception as e:
responses[endpoint] = f"Error: {traceback.format_exc()}"
# Create and return session zip
zip_filename = create_session_zip(session_id)
return responses, zip_filename, session_id, "✅ API calls complete!"
def save_response_data(data, endpoint, base_save_folder):
"""Save API response data to file"""
safe_endpoint_name = endpoint.strip("/").replace("/", "_") or "root"
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_folder = os.path.join(base_save_folder, f"{safe_endpoint_name} ({timestamp})")
os.makedirs(save_folder, exist_ok=True)
filename = os.path.join(save_folder, "data.jsonl")
with open(filename, "w", encoding="utf-8") as f:
if isinstance(data, list):
for item in data:
f.write(json.dumps(item) + "\n")
elif isinstance(data, dict):
f.write(json.dumps(data) + "\n")
else:
f.write(str(data))
def create_session_zip(session_id):
"""Create ZIP file of session data"""
session_folder = os.path.join("sessions", session_id)
zip_file = zip_session_folder(session_folder)
zip_filename = f"session_{session_id}.zip"
with open(zip_filename, "wb") as f:
f.write(zip_file.read())
return zip_filename |