"""Streamlit playground for the Responsible AI Moderation API.

The page is split into two columns.  The left column collects a prompt and
the configuration of every moderation check; the right column shows the
result returned by the moderation endpoint for the last submitted prompt.

Streamlit re-executes this script from top to bottom on every interaction,
so everything that must survive a rerun (the submitted prompt, the submitted
check configuration, the cached API response) is kept in ``st.session_state``.
"""
import streamlit as st
import requests
import json
import os

MODERATION_API_URL = (
    "https://infosysenterprise-responsible-ai-moderationlayer.hf.space"
    "/rai/v1/moderations"
)
# Without a timeout ``requests.post`` can block the script run indefinitely.
REQUEST_TIMEOUT_SECONDS = 60

# All checks offered in the UI and the kind of control each one needs.
check_configs = {
    "Prompt Injection": {"type": "slider", "default": 0.7, "min": 0.0, "max": 1.0, "step": 0.01},
    "Jailbreak": {"type": "slider", "default": 0.7, "min": 0.0, "max": 1.0, "step": 0.01},
    "Toxicity": {"type": "slider", "default": 0.6, "min": 0.0, "max": 1.0, "step": 0.01},
    "Profanity": {"type": "number_input", "default": 1, "min": 1, "hint": "1"},
    "Restricted Topics": {
        "type": "multiselect",
        "default": 0.7,
        "min": 0.0,
        "max": 1.0,
        "step": 0.01,
        "options": [
            "terrorism", "explosives", "nudity", "cruelty", "cheating",
            "fraud", "crime", "hacking", "immoral", "unethical", "illegal",
            "robbery", "forgery", "misinformation",
        ],
    },
    "Text Quality": {"type": "no_threshold"},
    "Customized Theme": {"type": "slider_and_input", "default": 0.6, "min": 0.0, "max": 1.0, "step": 0.01},
    "PII Detection": {
        "type": "multiselect_pii",
        "options": ["AADHAR_NUMBER", "PAN_Number", "IN_PAN", "US_PASSPORT", "US_SSN"],
    },
}

# UI check name -> name sent in the request's "ModerationChecks" list.
api_check_name_map = {
    "Prompt Injection": "PromptInjection",
    "Jailbreak": "JailBreak",
    "Toxicity": "Toxicity",
    "Profanity": "Profanity",
    "Restricted Topics": "RestrictTopic",
    "Text Quality": "TextQuality",
    "Customized Theme": "CustomizedTheme",
    "PII Detection": "Piidetct",
}

# UI check name -> key of that check inside the response's "moderationResults".
ui_to_api_check_map = {
    "Prompt Injection": "promptInjectionCheck",
    "Jailbreak": "jailbreakCheck",
    "Toxicity": "toxicityCheck",
    "Profanity": "profanityCheck",
    "Restricted Topics": "restrictedtopic",
    "Text Quality": "textQuality",
    "Customized Theme": "customThemeCheck",
    "PII Detection": "privacyCheck",
    "Refusal": "refusalCheck",
}

# Session-state keys that hold the last submission and its cached response.
SUBMISSION_STATE_KEYS = ("submitted_prompt", "submitted_checks", "moderation_response")

API_DESCRIPTION = """
API DESCRIPTION
What does this API do?

The Moderation API evaluates prompts to determine whether they are safe for use with a large language model (LLM). It returns a status (passed or failed) with detailed scores.

Moderation Checks

  • Prompt Injection: Detects attempts to hijack or manipulate the LLM behavior.
  • Jailbreak Attempts: Identifies prompts trying to bypass guardrails.
  • Toxicity & Profanity: Flags harmful, offensive, or explicit content.
  • Restricted Topics: Detects categories like cheating, conspiracy, terrorism, etc.
  • Text Quality: Measures readability and clarity (informational only).
  • Customized Theme: Block prompts using custom keywords (e.g., "atomic weapon").
  • PII Detection: Identifies AADHAR, PAN, SSN, Passport, Email, Phone, IP, Credit Card, Medical License, etc.

Full Customization

  • Enable/disable checks
  • Set thresholds
  • Select PII or restricted topics
  • Define custom block terms

Resources

"""


def _base_key(check_name):
    """Return the session-state key suffix used for *check_name*."""
    return check_name.replace(" ", "_")


def _ensure_state(key, default):
    """Store *default* under *key* in session state unless it is already set."""
    if key not in st.session_state:
        st.session_state[key] = default


def _init_widget_state(configs):
    """Seed session state for every control so widgets need no ``value=``.

    Every checkbox defaults to ticked; sliders and the number input start at
    the configured default; the topic multiselect starts with its first two
    options and the PII multiselect with all of them.
    """
    for check_name, config in configs.items():
        base_key = _base_key(check_name)
        kind = config["type"]
        _ensure_state(f"checkbox_{base_key}", True)
        if kind in ("slider", "multiselect", "slider_and_input"):
            _ensure_state(f"slider_{base_key}", config["default"])
        elif kind == "number_input":
            _ensure_state(f"number_{base_key}", config["default"])
        if kind == "multiselect":
            _ensure_state(f"multiselect_{base_key}", config["options"][:2])
        elif kind == "multiselect_pii":
            _ensure_state(f"multiselect_{base_key}", list(config["options"]))
    _ensure_state("custom_words", [])


def _collect_check_selections(configs):
    """Snapshot the current state of every check control.

    Returns a dict keyed by UI check name.  A disabled check maps to
    ``{"enabled": False}``; an enabled one additionally carries whichever of
    ``threshold``, ``topics``, ``entities_to_block`` and ``custom_words``
    its control type provides.
    """
    state = st.session_state
    selections = {}
    for check_name, config in configs.items():
        base_key = _base_key(check_name)
        if not state.get(f"checkbox_{base_key}", False):
            selections[check_name] = {"enabled": False}
            continue

        item = {"enabled": True}
        kind = config["type"]
        if kind in ("slider", "multiselect", "slider_and_input"):
            item["threshold"] = state.get(f"slider_{base_key}", config["default"])
        elif kind == "number_input":
            item["threshold"] = state.get(f"number_{base_key}", config["default"])

        if kind == "multiselect":
            item["topics"] = state.get(f"multiselect_{base_key}", config["options"][:2])
        elif kind == "multiselect_pii":
            item["entities_to_block"] = state.get(f"multiselect_{base_key}", config["options"])
        elif kind == "slider_and_input":
            # Copy so later edits to the word list do not alter the snapshot.
            item["custom_words"] = list(state.get("custom_words", []))
        selections[check_name] = item
    return selections


def _build_api_payload(prompt, selections):
    """Translate the UI *selections* for *prompt* into the request body."""
    checks = []
    thresholds = {}

    for check_name, data in selections.items():
        if check_name == "Customized Theme":
            # NOTE(review): the theme check is sent even when its checkbox is
            # unticked (falling back to the default threshold and no words).
            # The result display relies on this; confirm it is intended.
            checks.append(api_check_name_map["Customized Theme"])
            thresholds["CustomTheme"] = {
                "Themename": "string",
                "Themethresold": data.get("threshold", check_configs["Customized Theme"]["default"]),
                "ThemeTexts": data.get("custom_words", []),
            }
            continue
        if not data["enabled"]:
            continue

        if check_name in api_check_name_map:
            checks.append(api_check_name_map[check_name])

        if check_name == "Prompt Injection":
            thresholds["PromptinjectionThreshold"] = data.get("threshold")
        elif check_name == "Jailbreak":
            thresholds["JailbreakThreshold"] = data.get("threshold")
        elif check_name == "Toxicity":
            # A single slider drives every toxicity sub-threshold.
            toxicity_threshold = data.get("threshold")
            thresholds["ToxicityThresholds"] = {
                "ToxicityThreshold": toxicity_threshold,
                "SevereToxicityThreshold": toxicity_threshold,
                "ObsceneThreshold": toxicity_threshold,
                "ThreatThreshold": toxicity_threshold,
                "InsultThreshold": toxicity_threshold,
                "IdentityAttackThreshold": toxicity_threshold,
                "SexualExplicitThreshold": toxicity_threshold,
            }
        elif check_name == "Profanity":
            thresholds["ProfanityCountThreshold"] = data.get("threshold")
        elif check_name == "Restricted Topics":
            thresholds["RestrictedtopicDetails"] = {
                "RestrictedtopicThreshold": data.get("threshold"),
                "Restrictedtopics": data.get("topics", []),
            }
        elif check_name == "PII Detection":
            thresholds["PiientitiesConfiguredToBlock"] = data.get("entities_to_block", [])

    return {
        "AccountName": "None",
        "userid": "None",
        "PortfolioName": "None",
        "lotNumber": 1,
        "translate": "no",
        "EmojiModeration": "yes",
        "Prompt": prompt,
        # De-duplicate while keeping a stable order.
        "ModerationChecks": list(dict.fromkeys(checks)),
        "ModerationCheckThresholds": thresholds,
    }


def _request_moderation(payload):
    """POST *payload* to the moderation endpoint.

    Returns the decoded response dict, or ``None`` after reporting the
    failure in the UI (network error, non-JSON body, or a JSON value that is
    not an object).
    """
    try:
        resp = requests.post(MODERATION_API_URL, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
    except requests.exceptions.RequestException as exc:
        st.error(f"API Request Error: {exc}")
        return None

    # Decoding is handled separately from the request: recent ``requests``
    # versions raise a JSON error that is also a RequestException, which would
    # otherwise be reported as a request failure.
    try:
        response_data = resp.json()
    except ValueError:
        st.error("Error decoding API response as JSON. Check if the API returned valid JSON.")
        return None

    if not isinstance(response_data, dict):
        st.error("API response was not a valid JSON object (dictionary).")
        return None
    return response_data


def _add_custom_word(toggle_key):
    """Button callback: append the typed word to the custom word list.

    The outcome message is stored in session state because anything written
    to the page from inside a callback is replaced by the rerun that follows.
    """
    new_word = st.session_state.get("new_custom_word", "").strip()
    words = st.session_state.custom_words
    if new_word and new_word not in words:
        words.append(new_word)
        st.session_state.new_custom_word = ""
        notice = ("success", f"'{new_word}' added!")
    elif new_word:
        notice = ("warning", "Word already exists.")
    else:
        notice = ("warning", "Word is empty.")
    st.session_state[toggle_key] = True  # keep the word list visible
    st.session_state["custom_word_notice"] = notice


def _delete_custom_word(index, toggle_key):
    """Button callback: remove the custom word at *index*, if it still exists."""
    words = st.session_state.custom_words
    if 0 <= index < len(words):
        words.pop(index)
        st.session_state["custom_word_notice"] = ("success", "Word removed successfully!")
    st.session_state[toggle_key] = True  # keep the word list visible


def _render_custom_words(base_key):
    """Render the add/remove editor for the custom theme words."""
    st.markdown("---")
    st.subheader("Custom Words for Theme")

    # Show (once) the message left behind by the last add/delete callback.
    notice = st.session_state.pop("custom_word_notice", None)
    if notice:
        level, text = notice
        if level == "success":
            st.success(text)
        else:
            st.warning(text)

    toggle_key = f"custom_words_expander_{base_key}"
    st.text_input("Add a new custom word:", key="new_custom_word")
    st.button("Add Word", key="add_custom_word_btn", on_click=_add_custom_word, args=(toggle_key,))

    if not st.session_state.custom_words:
        st.info("No custom words added yet.")
        return

    st.write("Current Custom Words:")
    _ensure_state(toggle_key, True)
    # A checkbox toggle is used instead of an expander: this editor already
    # lives inside the "Configure ..." expander, and Streamlit releases that
    # forbid nested expanders raise an exception on a second level.
    if st.checkbox("Show/Hide Custom Words", key=toggle_key):
        for i, word in enumerate(st.session_state.custom_words):
            word_col, btn_col = st.columns([0.8, 0.2])
            with word_col:
                st.write(f"- {word}")
            with btn_col:
                st.button(
                    "del",
                    key=f"remove_word_{i}",
                    on_click=_delete_custom_word,
                    args=(i, toggle_key),
                )


def _render_threshold_slider(check_name, config, base_key):
    """Render the threshold slider of *check_name*; its value comes from session state."""
    st.slider(
        f"Set Threshold for {check_name}",
        min_value=config["min"],
        max_value=config["max"],
        step=config["step"],
        key=f"slider_{base_key}",
    )


def _render_check_controls(check_name, config):
    """Render the enable checkbox of one check and, when ticked, its settings."""
    base_key = _base_key(check_name)
    kind = config["type"]
    label_col, settings_col = st.columns([0.3, 0.7])
    with label_col:
        enabled = st.checkbox(check_name, key=f"checkbox_{base_key}")
    if not enabled or kind == "no_threshold":
        return

    with settings_col:
        with st.expander(f"Configure {check_name}"):
            if kind == "slider":
                _render_threshold_slider(check_name, config, base_key)
            elif kind == "number_input":
                st.number_input(
                    "Set Profanity Count Threshold (whole numbers only)",
                    min_value=config["min"],
                    key=f"number_{base_key}",
                    help=f"Enter a number, e.g., {config['hint']}",
                )
            elif kind == "multiselect":
                _render_threshold_slider(check_name, config, base_key)
                st.multiselect(
                    "Select Restricted Topics:",
                    options=config["options"],
                    key=f"multiselect_{base_key}",
                )
            elif kind == "multiselect_pii":
                st.multiselect(
                    "Select PII Entities to Block:",
                    options=config["options"],
                    key=f"multiselect_{base_key}",
                )
            elif kind == "slider_and_input":
                _render_threshold_slider(check_name, config, base_key)
                _render_custom_words(base_key)


def _render_check_details(check_api_name, details):
    """Write the check-specific fields of one result entry."""
    if check_api_name == "promptInjectionCheck":
        st.write(f"**Confidence Score:** `{details.get('injectionConfidenceScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('injectionThreshold', 'N/A')}`")
    elif check_api_name == "jailbreakCheck":
        st.write(f"**Similarity Score:** `{details.get('jailbreakSimilarityScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('jailbreakThreshold', 'N/A')}`")
    elif check_api_name == "toxicityCheck":
        st.write(f"**Threshold:** `{details.get('toxicitythreshold', 'N/A')}`")
        if details.get("toxicityScore"):
            st.write("**Toxicity Scores:**")
            for score_obj in details["toxicityScore"]:
                for name, score in score_obj.items():
                    st.write(f"- **{name.title()}**: `{score}`")
    elif check_api_name == "profanityCheck":
        st.write(f"**Profanity Threshold:** `{details.get('profaneWordsthreshold', 'N/A')}`")
        profane_words = details.get("profaneWordsIdentified", [])
        st.write(f"**Profane Words Identified:** {', '.join(profane_words) if profane_words else 'None'}")
    elif check_api_name == "restrictedtopic":
        st.write(f"**Topic Threshold:** `{details.get('topicThreshold', 'N/A')}`")
        scores = details.get("topicScores", [])
        if scores:
            st.write("**Detected Topics Scores:**")
            for score_dict in scores:
                for topic, score in score_dict.items():
                    st.write(f"- **{topic}:** `{score}`")
        else:
            st.write("**Detected Topics:** None")
    elif check_api_name == "textQuality":
        st.write(f"**Readability Score:** `{details.get('readabilityScore', 'N/A')}`")
        st.write(f"**Text Grade:** `{details.get('textGrade', 'N/A')}`")
    elif check_api_name == "customThemeCheck":
        st.write(f"**Similarity Score:** `{details.get('customSimilarityScore', 'N/A')}`")
        st.write(f"**Theme Threshold:** `{details.get('themeThreshold', 'N/A')}`")
    elif check_api_name == "privacyCheck":
        entities = details.get("entitiesRecognised", [])
        blocked = details.get("entitiesConfiguredToBlock", [])
        st.write(f"**Entities Recognized:** {', '.join(entities) if entities else 'None'}")
        st.write(f"**Entities Configured to Block:** {', '.join(blocked) if blocked else 'None'}")
    elif check_api_name == "refusalCheck":
        st.write(f"**Similarity Score:** `{details.get('refusalSimilarityScore', 'N/A')}`")
        st.write(f"**Threshold:** `{details.get('RefusalThreshold', 'N/A')}`")


def _render_results(response_data, selections):
    """Render the overall verdict and one expander per check that was run.

    Only checks that were enabled in *selections* and that appear in the
    response are listed, sorted by their UI display name.
    """
    st.write("Moderation Results")
    moderation_results = response_data.get("moderationResults", {})
    summary = moderation_results.get("summary", {})
    # ``str`` guards against a null/non-string status in the response.
    overall_status = str(summary.get("status", "N/A"))
    overall_reason = summary.get("reason", [])

    if overall_status.lower() == "passed":
        st.success(f"Overall Status: {overall_status.upper()}")
    else:
        st.error(f"Overall Status: {overall_status.upper()}")
        if overall_reason:
            if isinstance(overall_reason, str):
                overall_reason = [overall_reason]
            failed_checks_str = ", ".join(str(reason) for reason in overall_reason)
            st.warning(f"Failed Checks: {failed_checks_str}")

    st.write("Individual Check Details:")
    if not moderation_results:
        st.info("No detailed moderation results available.")
        return

    api_response_name_map = {v: k for k, v in ui_to_api_check_map.items()}
    # Drop 'summary', 'text' and any other entry that is not a check result.
    individual_results = {
        k: v
        for k, v in moderation_results.items()
        if k not in {"summary", "text"} and isinstance(v, dict)
    }

    enabled_api_checks = []
    for ui_check, api_key in ui_to_api_check_map.items():
        selection = selections.get(ui_check, {})
        if selection.get("enabled"):
            wanted = True
        elif ui_check == "Customized Theme":
            # The theme check is always sent, so it is also listed when custom
            # words exist even though its checkbox is unticked.
            wanted = bool(st.session_state.get("custom_words")) or selection.get("threshold") is not None
        else:
            wanted = False
        if wanted and api_key in individual_results:
            enabled_api_checks.append(api_key)

    sorted_api_check_names = sorted(enabled_api_checks, key=lambda name: api_response_name_map.get(name, name))
    if not sorted_api_check_names:
        st.info("No individual checks to display.")
        return

    status_styles = {
        "passed": "border-left: 5px solid green; padding-left: 10px;",
        "failed": "border-left: 5px solid red; padding-left: 10px;",
        "info": "border-left: 5px solid orange; padding-left: 10px;",
    }
    for check_api_name in sorted_api_check_names:
        details = individual_results.get(check_api_name, {})
        display_name = api_response_name_map.get(check_api_name, check_api_name)
        status = str(details.get("result", "N/A"))
        # NOTE(review): the wrapper markup that consumed this style is not
        # present in the markdown strings below; restore it there if needed.
        expander_style = status_styles.get(status.lower(), "")
        with st.expander(f"**{display_name}** - Status: **{status.upper()}**"):
            st.markdown("\n", unsafe_allow_html=True)
            _render_check_details(check_api_name, details)
            st.markdown("\n", unsafe_allow_html=True)


def _render_output():
    """Show the result for the last submission, calling the API only once per send."""
    selections = st.session_state.get("submitted_checks", {})
    response_data = st.session_state.get("moderation_response")
    if response_data is None:
        payload = _build_api_payload(st.session_state.get("submitted_prompt", ""), selections)
        response_data = _request_moderation(payload)
        if response_data is None:
            return
        # Cache so that unrelated widget interactions do not re-post the prompt.
        st.session_state["moderation_response"] = response_data

    try:
        _render_results(response_data, selections)
    except Exception as exc:  # UI boundary: report instead of crashing the page
        st.error(f"An unexpected error occurred: {exc}.")


# --- Page setup -------------------------------------------------------------
# set_page_config is issued before any other Streamlit command.
st.set_page_config(layout="wide")

# Custom CSS injection
st.markdown(""" """, unsafe_allow_html=True)

# Session state for dynamic elements and for output visibility
_ensure_state("custom_words", [])
_ensure_state("show_output", False)

# --- API description (full width) ---
st.markdown(API_DESCRIPTION, unsafe_allow_html=True)

# --- Main layout: inputs on the left, results on the right ---
input_col, output_col = st.columns([0.5, 0.5])

with input_col:
    st.markdown("\n\nINPUT\n\n", unsafe_allow_html=True)

    _init_widget_state(check_configs)

    # Prompt box with refresh and send buttons on one row
    col1, col2, col3 = st.columns([7, 0.7, 0.7])
    with col1:
        user_input = st.text_input(
            label="Prompt",
            placeholder="Type your prompt here...",
            value=st.session_state.get("prompt_input", ""),
            key="prompt_input",
            label_visibility="collapsed",
        )
        st.markdown("\n", unsafe_allow_html=True)
    with col2:
        refresh_clicked = st.button("↻", key="refresh_button_text", help="Reload the page")
    with col3:
        send_clicked = st.button("→", key="send_request_button", help="Send the request")
    st.markdown("\n", unsafe_allow_html=True)

    if refresh_clicked:
        # Forget every control value, the prompt and the last submission.
        for check_name in check_configs:
            base_key = _base_key(check_name)
            for prefix in ("checkbox", "slider", "multiselect", "number"):
                st.session_state.pop(f"{prefix}_{base_key}", None)
        for state_key in ("custom_words", "new_custom_word", "custom_word_notice", "prompt_input"):
            st.session_state.pop(state_key, None)
        for state_key in SUBMISSION_STATE_KEYS:
            st.session_state.pop(state_key, None)
        st.session_state["show_output"] = False
        st.rerun()

    if send_clicked:
        user_input = st.session_state.get("prompt_input", "")
        if user_input.strip():
            # Persist the submission: later reruns (any widget interaction)
            # must be able to redraw the output without this branch running.
            st.session_state["submitted_prompt"] = user_input
            st.session_state["submitted_checks"] = _collect_check_selections(check_configs)
            st.session_state.pop("moderation_response", None)
            st.session_state["show_output"] = True
        else:
            # Warn but keep rendering, so the check controls below stay on
            # the page and keep their state.
            st.session_state["show_output"] = False
            with output_col:
                st.warning("Please enter a prompt before sending the request.")

    st.markdown("\n\nSelect Moderation Checks to apply:\n\n", unsafe_allow_html=True)

    for check_name, config in check_configs.items():
        _render_check_controls(check_name, config)

if st.session_state.show_output:
    with output_col:
        st.markdown("\n\nOUTPUT\n\n", unsafe_allow_html=True)
        results_placeholder = st.empty()  # Placeholder for output results
        with results_placeholder.container():
            _render_output()