Demo / identityNow.py
Juggernaut1397's picture
Add all project files
12bee28
import requests
import datetime
import json
import os
import uuid
import traceback
from utils import zip_session_folder, handle_path_parameters
def fetch_identitynow_token(api_url, grant_type, client_id, client_secret):
"""Fetch OAuth token for IdentityNow"""
token_endpoint = api_url.rstrip("/") + "/oauth/token"
payload = {
'grant_type': grant_type,
'client_id': client_id,
'client_secret': client_secret
}
headers = {
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
response = requests.post(token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
expires_in = token_data.get("expires_in", 0)
expiry_timestamp = datetime.datetime.now(datetime.timezone.utc).timestamp() + expires_in
return {
"access_token": access_token,
"expiry_timestamp": expiry_timestamp,
"success": True,
"message": None
}
except Exception as e:
return {
"access_token": None,
"expiry_timestamp": None,
"success": False,
"message": str(e)
}
def handle_identitynow_call(api_base_url, oauth_grant, oauth_client, oauth_secret, session_id, param_values, *checkbox_values):
"""Handle IdentityNow API calls with parameter support"""
if not session_id:
session_id = str(uuid.uuid4())
# Get OAuth token
token_result = fetch_identitynow_token(
api_base_url,
oauth_grant,
oauth_client,
oauth_secret
)
if not token_result["success"]:
return (
{"error": f"Failed to get OAuth token: {token_result['message']}"},
None,
session_id,
"❌ OAuth token fetch failed"
)
# Format expiry time
expiry_dt = datetime.datetime.fromtimestamp(token_result["expiry_timestamp"], tz=datetime.timezone.utc)
expiry_str = expiry_dt.strftime("%Y-%m-%d %H:%M:%S UTC")
token_msg = f"✅ OAuth token received! Valid until: {expiry_str}"
# Process selected endpoints
responses = {}
base_save_folder = os.path.join("sessions", session_id, "IdentityNow")
os.makedirs(base_save_folder, exist_ok=True)
headers = {
'Accept': 'application/json',
'Authorization': f'Bearer {token_result["access_token"]}'
}
# Parse and call selected endpoints
for selections in checkbox_values:
if isinstance(selections, list):
for selection in selections:
try:
if " | " in selection:
endpoint, method_part = selection.split(" | ")
method = method_part.split(" - ")[0].lower()
else:
endpoint = selection
method = "get"
# Handle parameter replacement if needed
if any(char in endpoint for char in ['{', '}']):
full_url, error = handle_path_parameters(endpoint, f"{api_base_url}/v3", param_values)
if error:
responses[endpoint] = f"Error: {error}"
continue
else:
full_url = f"{api_base_url.rstrip('/')}/v3{endpoint}"
print(f"Calling endpoint: {full_url}")
r = requests.get(full_url, headers=headers)
r.raise_for_status()
data = r.json() if r.headers.get('content-type', '').startswith('application/json') else r.text
responses[endpoint] = data
# Save response data
save_response_data(data, endpoint, base_save_folder)
except Exception as e:
responses[endpoint] = f"Error: {traceback.format_exc()}"
# Create and return session zip
zip_filename = create_session_zip(session_id)
return responses, zip_filename, session_id, "✅ API calls complete!"
def save_response_data(data, endpoint, base_save_folder):
"""Save API response data to file"""
safe_endpoint_name = endpoint.strip("/").replace("/", "_") or "root"
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_folder = os.path.join(base_save_folder, f"{safe_endpoint_name} ({timestamp})")
os.makedirs(save_folder, exist_ok=True)
filename = os.path.join(save_folder, "data.jsonl")
with open(filename, "w", encoding="utf-8") as f:
if isinstance(data, list):
for item in data:
f.write(json.dumps(item) + "\n")
elif isinstance(data, dict):
f.write(json.dumps(data) + "\n")
else:
f.write(str(data))
def create_session_zip(session_id):
"""Create ZIP file of session data"""
session_folder = os.path.join("sessions", session_id)
zip_file = zip_session_folder(session_folder)
zip_filename = f"session_{session_id}.zip"
with open(zip_filename, "wb") as f:
f.write(zip_file.read())
return zip_filename