import streamlit as st
import requests
import json
import os
# Page configuration.
# st.set_page_config is documented as having to be the first Streamlit command
# on a page, so it is issued before the CSS injection below; the original
# order (markdown first) can raise a StreamlitAPIException.
st.set_page_config(layout="wide")

# Custom CSS injection.
# NOTE(review): the injected string is effectively empty in this copy of the
# file - confirm whether a <style> block is supposed to be here.
st.markdown("""
""", unsafe_allow_html=True)
# Seed the session-state entries the rest of the script reads unconditionally:
#   custom_words - list of user-supplied words for the "Customized Theme" check
#   show_output  - whether the output column should be rendered
for _state_key, _initial_value in (("custom_words", []), ("show_output", False)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# --- API description (rendered across the full page width) ---
# Kept in a named constant so the rendering call stays a one-liner.
API_DESCRIPTION_MARKDOWN = """
API DESCRIPTION
What does this API do?
The Moderation API evaluates prompts to determine whether they are safe for use with a large language model (LLM). It returns a status (passed or failed) with detailed scores.
Moderation Checks
- Prompt Injection: Detects attempts to hijack or manipulate the LLM behavior.
- Jailbreak Attempts: Identifies prompts trying to bypass guardrails.
- Toxicity & Profanity: Flags harmful, offensive, or explicit content.
- Restricted Topics: Detects categories like cheating, conspiracy, terrorism, etc.
- Text Quality: Measures readability and clarity (informational only).
- Customized Theme: Block prompts using custom keywords (e.g., "atomic weapon").
- PII Detection: Identifies AADHAR, PAN, SSN, Passport, Email, Phone, IP, Credit Card, Medical License, etc.
Full Customization
- Enable/disable checks
- Set thresholds
- Select PII or restricted topics
- Define custom block terms
Resources
"""
st.markdown(API_DESCRIPTION_MARKDOWN, unsafe_allow_html=True)
# Main layout: two equal-width columns, inputs on the left, output on the right.
input_col, output_col = st.columns([0.5, 0.5])

with input_col:
    # Heading of the input column. The literal previously spanned two physical
    # lines inside single quotes (a syntax error); the trailing newline is now
    # written as an escape so the rendered text is unchanged.
    st.markdown("INPUT\n", unsafe_allow_html=True)
# Catalogue of moderation checks shown in the UI.
# Each entry's "type" selects which configuration widgets are rendered for the
# check; the remaining keys are the defaults/limits those widgets are given.

# Range shared by every 0..1 threshold slider.
_UNIT_INTERVAL_SLIDER = {"min": 0.0, "max": 1.0, "step": 0.01}

check_configs = {
    "Prompt Injection": {"type": "slider", "default": 0.7, **_UNIT_INTERVAL_SLIDER},
    "Jailbreak": {"type": "slider", "default": 0.7, **_UNIT_INTERVAL_SLIDER},
    "Toxicity": {"type": "slider", "default": 0.6, **_UNIT_INTERVAL_SLIDER},
    # Count-based threshold rather than a 0..1 score.
    "Profanity": {"type": "number_input", "default": 1, "min": 1, "hint": "1"},
    # Threshold slider plus a topic picker.
    "Restricted Topics": {
        "type": "multiselect",
        "default": 0.7,
        **_UNIT_INTERVAL_SLIDER,
        "options": [
            "terrorism",
            "explosives",
            "nudity",
            "cruelty",
            "cheating",
            "fraud",
            "crime",
            "hacking",
            "immoral",
            "unethical",
            "illegal",
            "robbery",
            "forgery",
            "misinformation",
        ],
    },
    # No configurable controls for this check.
    "Text Quality": {"type": "no_threshold"},
    # Threshold slider plus the free-form custom word list.
    "Customized Theme": {"type": "slider_and_input", "default": 0.6, **_UNIT_INTERVAL_SLIDER},
    # Entity picker only, no threshold.
    "PII Detection": {
        "type": "multiselect_pii",
        "options": ["AADHAR_NUMBER", "PAN_Number", "IN_PAN", "US_PASSPORT", "US_SSN"],
    },
}
# Seed session state for every check's widgets so the widgets (which are
# created with key= only) start from the configured defaults.
# Existing entries are never overwritten.
for check_name, config in check_configs.items():
    base_key = check_name.replace(" ", "_")
    control_type = config["type"]

    # Every check has an enable checkbox, ticked by default.
    initial_state = {f"checkbox_{base_key}": True}

    if control_type == "number_input":
        initial_state[f"number_{base_key}"] = config["default"]
    elif control_type == "multiselect":
        # Pre-select the first two options.
        initial_state[f"multiselect_{base_key}"] = config["options"][:2]
    elif control_type == "multiselect_pii":
        # Pre-select every entity.
        initial_state[f"multiselect_{base_key}"] = config["options"]
    elif control_type == "slider_and_input":
        initial_state["custom_words"] = []

    # Three control types carry a threshold slider.
    if control_type in ("slider", "multiselect", "slider_and_input"):
        slider_key = f"slider_{base_key}"
        initial_state[slider_key] = config["default"]

    for state_key, initial_value in initial_state.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value
with input_col:
    # Prompt box plus compact refresh / send buttons on a single row.
    col1, col2, col3 = st.columns([7, 0.7, 0.7])

    with col1:
        # The widget is bound to session state through key=, so its current
        # text already lives in st.session_state["prompt_input"]; passing the
        # same value again through value= was redundant and has been dropped.
        user_input = st.text_input(
            label="Prompt",
            placeholder="Type your prompt here...",
            key="prompt_input",
            label_visibility="collapsed",
        )
        # NOTE(review): this markdown call carries only a newline in this copy
        # of the file - confirm whether styling markup belongs here.
        st.markdown("\n", unsafe_allow_html=True)

    with col2:
        refresh_clicked = st.button("↻", key="refresh_button_text", help="Reload the page")

    with col3:
        send_clicked = st.button("→", key="send_request_button", help="Send the request")

    # The literal below previously spanned two physical lines inside double
    # quotes (a syntax error); it is now a single escaped newline.
    st.markdown("\n", unsafe_allow_html=True)
# Handle Refresh click: forget every stored widget value, hide the output
# panel and rerun so the script starts again from its defaults.
if refresh_clicked:
    widget_prefixes = ("checkbox", "slider", "multiselect", "number")

    # One key per (check, widget kind); kinds a check does not use are simply
    # absent and pop(..., None) ignores them.
    keys_to_forget = [
        f"{prefix}_{label.replace(' ', '_')}"
        for label in check_configs
        for prefix in widget_prefixes
    ]
    # Custom-word list, its input box, and the prompt text.
    keys_to_forget += ["custom_words", "new_custom_word", "prompt_input"]

    for stale_key in keys_to_forget:
        st.session_state.pop(stale_key, None)

    st.session_state["show_output"] = False
    st.rerun()
# Handle Send click: reveal the output panel only for a non-blank prompt.
if send_clicked:
    user_input = st.session_state.get("prompt_input", "") or ""
    if user_input.strip():
        st.session_state["show_output"] = True
    else:
        # Previously this branch called st.stop(), which ended the run before
        # the moderation-check widgets further down were rendered: the
        # configuration UI vanished for that run, and widgets that are not
        # rendered have their state discarded by Streamlit. Hiding the output
        # panel instead prevents the API call while the rest of the page
        # still renders normally.
        st.session_state["show_output"] = False
        with output_col:
            st.warning("Please enter a prompt before sending the request.")
def _collect_check_settings(label, spec):
    """Return the UI-side settings dict for one check, read from session state.

    A disabled check yields {"enabled": False}. An enabled one yields
    {"enabled": True} plus whichever of "threshold", "topics",
    "entities_to_block" and "custom_words" its control type provides, falling
    back to the configured defaults when a widget value is missing.
    """
    slug = label.replace(" ", "_")
    if not st.session_state.get(f"checkbox_{slug}", False):
        return {"enabled": False}

    settings = {"enabled": True}
    kind = spec["type"]

    # Threshold: slider-backed for three kinds, number-input-backed for one.
    if kind in ("slider", "multiselect", "slider_and_input"):
        settings["threshold"] = st.session_state.get(f"slider_{slug}", spec["default"])
    elif kind == "number_input":
        settings["threshold"] = st.session_state.get(f"number_{slug}", spec["default"])

    # Kind-specific extras.
    if kind == "multiselect":
        settings["topics"] = st.session_state.get(f"multiselect_{slug}", spec["options"][:2])
    elif kind == "multiselect_pii":
        settings["entities_to_block"] = st.session_state.get(f"multiselect_{slug}", spec["options"])
    elif kind == "slider_and_input":
        settings["custom_words"] = st.session_state.get("custom_words", [])

    return settings


# Snapshot of every check's settings, keyed by the UI label.
selected_checks_payload_ui = {
    label: _collect_check_settings(label, spec)
    for label, spec in check_configs.items()
}
with input_col:
    # Heading above the list of moderation checks. The literal previously
    # spanned two physical lines inside single quotes (a syntax error); the
    # trailing newline is now an escape so the rendered text is unchanged.
    st.markdown("Select Moderation Checks to apply:\n", unsafe_allow_html=True)
# UI check label -> name placed in the request's "ModerationChecks" list.
# NOTE(review): spellings such as "Piidetct" are presumably dictated by the
# remote API - confirm before "correcting" them.
_API_CHECK_NAME_PAIRS = (
    ("Prompt Injection", "PromptInjection"),
    ("Jailbreak", "JailBreak"),
    ("Toxicity", "Toxicity"),
    ("Profanity", "Profanity"),
    ("Restricted Topics", "RestrictTopic"),
    ("Text Quality", "TextQuality"),
    ("Customized Theme", "CustomizedTheme"),
    ("PII Detection", "Piidetct"),
)
api_check_name_map = dict(_API_CHECK_NAME_PAIRS)
# Session-state slot used to carry a (level, text) status message from a
# button callback into the script run that follows it. Callbacks run before
# the script body, so writing into a placeholder created during the previous
# run is presumably overwritten as soon as the page is rebuilt; storing the
# message and rendering it during the run makes it reliably visible.
CUSTOM_WORD_FEEDBACK_KEY = "custom_word_feedback"


def add_word_callback(expander_key=None):
    """Add the text in the "new_custom_word" input to the custom word list.

    Blank input and duplicates are rejected. The outcome is stored under
    CUSTOM_WORD_FEEDBACK_KEY for display on the next run, and the input box is
    cleared after a successful add.

    Args:
        expander_key: optional session-state key set to True so the custom
            words expander is rendered open on the next run.
    """
    if expander_key is not None:
        st.session_state[expander_key] = True

    new_word_to_add = st.session_state.get("new_custom_word", "").strip()
    if not new_word_to_add:
        feedback = ("warning", "Word is empty.")
    elif new_word_to_add in st.session_state.custom_words:
        feedback = ("warning", "Word already exists.")
    else:
        st.session_state.custom_words.append(new_word_to_add)
        st.session_state.new_custom_word = ""
        feedback = ("success", f"'{new_word_to_add}' added!")
    st.session_state[CUSTOM_WORD_FEEDBACK_KEY] = feedback


def delete_word_callback(index, expander_key=None):
    """Remove the custom word at position *index*.

    An out-of-range index is ignored instead of raising.

    Args:
        index: position of the word in st.session_state.custom_words.
        expander_key: optional session-state key set to True so the custom
            words expander is rendered open on the next run.
    """
    if expander_key is not None:
        st.session_state[expander_key] = True

    words = st.session_state.custom_words
    if 0 <= index < len(words):
        words.pop(index)
        st.session_state[CUSTOM_WORD_FEEDBACK_KEY] = ("success", "Word removed successfully!")


def _render_custom_words_editor(base_key):
    """Render the add/remove editor for the custom word list."""
    st.markdown("---")
    st.subheader("Custom Words for Theme")

    # Single, consistent spot for add/remove status messages.
    message_placeholder = st.empty()
    feedback = st.session_state.pop(CUSTOM_WORD_FEEDBACK_KEY, None)
    if feedback:
        level, text = feedback
        if level == "success":
            message_placeholder.success(text)
        else:
            message_placeholder.warning(text)

    expander_key = f"custom_words_expander_{base_key}"

    st.text_input("Add a new custom word:", key="new_custom_word")
    st.button(
        "Add Word",
        key="add_custom_word_btn",
        on_click=add_word_callback,
        kwargs={"expander_key": expander_key},
    )

    if not st.session_state.custom_words:
        st.info("No custom words added yet.")
        return

    st.write("Current Custom Words:")
    if expander_key not in st.session_state:
        st.session_state[expander_key] = True

    # NOTE(review): this expander is created inside the "Configure ..."
    # expander. Streamlit releases that forbid nesting expanders will raise
    # here - confirm against the Streamlit version in use.
    with st.expander("Show/Hide Custom Words", expanded=st.session_state.get(expander_key, False)):
        for i, word in enumerate(st.session_state.custom_words):
            word_col, btn_col = st.columns([0.8, 0.2])
            with word_col:
                st.write(f"- {word}")
            with btn_col:
                st.button(
                    "del",
                    key=f"remove_word_{i}",
                    on_click=delete_word_callback,
                    args=(i, expander_key),
                )


# Display one enable-checkbox per check and, for enabled checks that have
# settings, an expander holding the configuration widgets. All widgets are
# bound to session state through key=, so their return values are not needed.
with input_col:
    for check_name, config in check_configs.items():
        base_key = check_name.replace(" ", "_")
        control_type = config["type"]

        col1, col2 = st.columns([0.3, 0.7])
        with col1:
            enabled = st.checkbox(check_name, key=f"checkbox_{base_key}")

        if not enabled or control_type == "no_threshold":
            continue

        with col2:
            with st.expander(f"Configure {check_name}"):
                # Threshold slider shared by three control types. No value= is
                # passed: the starting value comes from session state.
                # (A leftover check of a stale `slider_key` variable after the
                # slider was created has been removed - it inspected the wrong
                # key and could never fire.)
                if control_type in ("slider", "multiselect", "slider_and_input"):
                    st.slider(
                        f"Set Threshold for {check_name}",
                        min_value=config["min"],
                        max_value=config["max"],
                        step=config["step"],
                        key=f"slider_{base_key}",
                    )

                if control_type == "number_input":
                    st.number_input(
                        "Set Profanity Count Threshold (whole numbers only)",
                        min_value=config["min"],
                        key=f"number_{base_key}",
                        help=f"Enter a number, e.g., {config['hint']}",
                    )
                elif control_type == "multiselect":
                    st.multiselect(
                        "Select Restricted Topics:",
                        options=config["options"],
                        key=f"multiselect_{base_key}",
                    )
                elif control_type == "multiselect_pii":
                    st.multiselect(
                        "Select PII Entities to Block:",
                        options=config["options"],
                        key=f"multiselect_{base_key}",
                    )
                elif control_type == "slider_and_input":
                    _render_custom_words_editor(base_key)
# Sub-thresholds of the toxicity check; the UI exposes a single slider, so
# every one of them receives the same value.
_TOXICITY_THRESHOLD_KEYS = (
    "ToxicityThreshold",
    "SevereToxicityThreshold",
    "ObsceneThreshold",
    "ThreatThreshold",
    "InsultThreshold",
    "IdentityAttackThreshold",
    "SexualExplicitThreshold",
)


def _build_moderation_payload(prompt, selected_checks):
    """Build the JSON body for the moderation request.

    Args:
        prompt: the prompt text to moderate.
        selected_checks: mapping of UI check label to its settings dict
            (always has "enabled"; enabled checks may also carry "threshold",
            "topics", "entities_to_block" or "custom_words").

    Returns:
        A dict with fixed account fields, the prompt, the list of API check
        names under "ModerationChecks" and their settings under
        "ModerationCheckThresholds".
    """
    payload = {
        "AccountName": "None",
        "userid": "None",
        "PortfolioName": "None",
        "lotNumber": 1,
        "translate": "no",
        "EmojiModeration": "yes",
        "Prompt": prompt,
        "ModerationChecks": [],
        "ModerationCheckThresholds": {},
    }
    checks = payload["ModerationChecks"]
    thresholds = payload["ModerationCheckThresholds"]

    for check_name, data in selected_checks.items():
        if check_name == "Customized Theme":
            # Included whether or not the check is ticked, falling back to the
            # default threshold and an empty word list.
            # NOTE(review): the original carried a "need to make changes here"
            # remark - confirm whether a disabled theme check should be
            # omitted from the request.
            checks.append(api_check_name_map["Customized Theme"])
            thresholds["CustomTheme"] = {
                "Themename": "string",
                "Themethresold": data.get("threshold", check_configs["Customized Theme"]["default"]),
                "ThemeTexts": data.get("custom_words", []),
            }
            continue

        # Every other check contributes only when enabled.
        if not data["enabled"]:
            continue
        if check_name in api_check_name_map:
            checks.append(api_check_name_map[check_name])

        threshold = data.get("threshold")
        if check_name == "Prompt Injection":
            thresholds["PromptinjectionThreshold"] = threshold
        elif check_name == "Jailbreak":
            thresholds["JailbreakThreshold"] = threshold
        elif check_name == "Toxicity":
            thresholds["ToxicityThresholds"] = {key: threshold for key in _TOXICITY_THRESHOLD_KEYS}
        elif check_name == "Profanity":
            thresholds["ProfanityCountThreshold"] = threshold
        elif check_name == "Restricted Topics":
            thresholds["RestrictedtopicDetails"] = {
                "RestrictedtopicThreshold": threshold,
                "Restrictedtopics": data.get("topics", []),
            }
        elif check_name == "PII Detection":
            thresholds["PiientitiesConfiguredToBlock"] = data.get("entities_to_block", [])

    # De-duplicate while keeping insertion order. list(set(...)) also removed
    # duplicates but shuffled the list differently from run to run.
    payload["ModerationChecks"] = list(dict.fromkeys(checks))
    return payload


if st.session_state.show_output:
    # Construct the final API payload; it is sent by the output section.
    final_api_payload = _build_moderation_payload(user_input, selected_checks_payload_ui)
MODERATION_API_URL = "https://infosysenterprise-responsible-ai-moderationlayer.hf.space/rai/v1/moderations"
# Upper bound for the HTTP call; without one a stalled endpoint would block
# the script run indefinitely.
MODERATION_API_TIMEOUT_SECONDS = 60

# UI label -> key of that check's entry inside the response's
# "moderationResults" object.
_UI_TO_API_RESULT_KEY = {
    "Prompt Injection": "promptInjectionCheck",
    "Jailbreak": "jailbreakCheck",
    "Toxicity": "toxicityCheck",
    "Profanity": "profanityCheck",
    "Restricted Topics": "restrictedtopic",
    "Text Quality": "textQuality",
    "Customized Theme": "customThemeCheck",
    "PII Detection": "privacyCheck",
    "Refusal": "refusalCheck",
}

# Left-border styles keyed by lower-cased check status.
_STATUS_STYLES = {
    "passed": "border-left: 5px solid green; padding-left: 10px;",
    "failed": "border-left: 5px solid red; padding-left: 10px;",
    "info": "border-left: 5px solid orange; padding-left: 10px;",
}


def _join_or_none(items):
    """Return *items* joined with ", ", or the text "None" when empty."""
    return ", ".join(str(item) for item in items) if items else "None"


def _render_check_details(check_api_name, details):
    """Write the score/threshold lines for one check's result dict."""
    if check_api_name == "promptInjectionCheck":
        st.write(f"**Confidence Score:** `{details.get('injectionConfidenceScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('injectionThreshold', 'N/A')}`")
    elif check_api_name == "jailbreakCheck":
        st.write(f"**Similarity Score:** `{details.get('jailbreakSimilarityScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('jailbreakThreshold', 'N/A')}`")
    elif check_api_name == "toxicityCheck":
        st.write(f"**Threshold:** `{details.get('toxicitythreshold', 'N/A')}`")
        if details.get("toxicityScore"):
            st.write("**Toxicity Scores:**")
            for score_obj in details["toxicityScore"]:
                for name, score in score_obj.items():
                    st.write(f"- **{name.title()}**: `{score}`")
    elif check_api_name == "profanityCheck":
        st.write(f"**Profanity Threshold:** `{details.get('profaneWordsthreshold', 'N/A')}`")
        profane_words = details.get("profaneWordsIdentified", [])
        st.write(f"**Profane Words Identified:** {_join_or_none(profane_words)}")
    elif check_api_name == "restrictedtopic":
        st.write(f"**Topic Threshold:** `{details.get('topicThreshold', 'N/A')}`")
        scores = details.get("topicScores", [])
        if scores:
            st.write("**Detected Topics Scores:**")
            for score_dict in scores:
                for topic, score in score_dict.items():
                    st.write(f"- **{topic}:** `{score}`")
        else:
            st.write("**Detected Topics:** None")
    elif check_api_name == "textQuality":
        st.write(f"**Readability Score:** `{details.get('readabilityScore', 'N/A')}`")
        st.write(f"**Text Grade:** `{details.get('textGrade', 'N/A')}`")
    elif check_api_name == "customThemeCheck":
        st.write(f"**Similarity Score:** `{details.get('customSimilarityScore', 'N/A')}`")
        st.write(f"**Theme Threshold:** `{details.get('themeThreshold', 'N/A')}`")
    elif check_api_name == "privacyCheck":
        entities = details.get("entitiesRecognised", [])
        blocked = details.get("entitiesConfiguredToBlock", [])
        st.write(f"**Entities Recognized:** {_join_or_none(entities)}")
        st.write(f"**Entities Configured to Block:** {_join_or_none(blocked)}")
    elif check_api_name == "refusalCheck":
        st.write(f"**Similarity Score:** `{details.get('refusalSimilarityScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('RefusalThreshold', 'N/A')}`")


def _render_moderation_response(response_data):
    """Render the overall verdict and the per-check details of a response."""
    st.write("Moderation Results")

    moderation_results = response_data.get("moderationResults", {})
    summary = moderation_results.get("summary", {})
    # str() guards against a null or otherwise non-string status.
    overall_status = str(summary.get("status", "N/A"))
    overall_reason = summary.get("reason", [])

    if overall_status.lower() == "passed":
        st.success(f"Overall Status: {overall_status.upper()}")
    else:
        st.error(f"Overall Status: {overall_status.upper()}")
        if overall_reason:
            # A bare string would otherwise be joined character by character.
            if isinstance(overall_reason, str):
                overall_reason = [overall_reason]
            failed_checks_str = ", ".join(str(reason) for reason in overall_reason)
            st.warning(f"Failed Checks: {failed_checks_str}")

    st.write("Individual Check Details:")
    if not moderation_results:
        st.info("No detailed moderation results available.")
        return

    api_response_name_map = {api_key: label for label, api_key in _UI_TO_API_RESULT_KEY.items()}

    # Drop 'summary', 'text' and any other non-dict entries.
    individual_results = {
        key: value
        for key, value in moderation_results.items()
        if key not in {"summary", "text"} and isinstance(value, dict)
    }

    # Keep checks that were enabled in the UI and are present in the response.
    # "Customized Theme" is also kept while disabled if custom words exist
    # (or a threshold was recorded), since it is always part of the request.
    enabled_api_checks = []
    for ui_check, api_key in _UI_TO_API_RESULT_KEY.items():
        ui_settings = selected_checks_payload_ui.get(ui_check, {})
        if ui_settings.get("enabled"):
            wanted = True
        elif ui_check == "Customized Theme":
            wanted = bool(st.session_state.get("custom_words")) or ui_settings.get("threshold") is not None
        else:
            wanted = False
        if wanted and api_key in individual_results:
            enabled_api_checks.append(api_key)

    # Sort by display name.
    sorted_api_check_names = sorted(
        enabled_api_checks,
        key=lambda api_key: api_response_name_map.get(api_key, api_key),
    )
    if not sorted_api_check_names:
        st.info("No individual checks to display.")
        return

    for check_api_name in sorted_api_check_names:
        details = individual_results.get(check_api_name, {})
        display_name = api_response_name_map.get(check_api_name, check_api_name)
        status = str(details.get("result", "N/A"))
        expander_style = _STATUS_STYLES.get(status.lower(), "")

        with st.expander(f"**{display_name}** - Status: **{status.upper()}**"):
            # NOTE(review): expander_style is computed but the markup that
            # consumed it appears to be missing from this copy of the file,
            # so the two markdown calls around the details render nothing
            # visible - confirm the intended HTML.
            st.markdown("", unsafe_allow_html=True)
            _render_check_details(check_api_name, details)
            st.markdown("\n", unsafe_allow_html=True)


with output_col:
    if st.session_state.show_output:
        # The literal previously spanned two physical lines inside single
        # quotes (a syntax error); the newline is now an escape.
        st.markdown("OUTPUT\n", unsafe_allow_html=True)
        results_placeholder = st.empty()  # Placeholder for output results
        with results_placeholder.container():
            try:
                resp = requests.post(
                    MODERATION_API_URL,
                    json=final_api_payload,
                    timeout=MODERATION_API_TIMEOUT_SECONDS,
                )
                # Surface HTTP errors instead of trying to render their body.
                resp.raise_for_status()
                response_data = resp.json()
                if not isinstance(response_data, dict):
                    st.error("API response was not a valid JSON object (dictionary).")
                    st.stop()
                _render_moderation_response(response_data)
            # The JSON handler must come first: in current requests releases
            # the error raised by resp.json() is also a RequestException, so
            # with the old order this branch could never run.
            except json.JSONDecodeError:
                st.error("Error decoding API response as JSON. Check if the API returned valid JSON.")
            except requests.exceptions.RequestException as e:
                st.error(f"API Request Error: {e}")
            except Exception as e:
                # Top-level boundary: report rather than crash the page.
                st.error(f"An unexpected error occurred: {e}.")