"""Gradio demo for detecting bias in financial sentiment analysis models.

For a selected dataset sentence the app compares the model's prediction on the
original sentence against two single-word ("atomic") mutants and one combined
("intersectional") mutant, and optionally reports distance metrics and SHAP
rank information for the bias tokens.
"""

import numbers
import os
import traceback

import gradio as gr
import pandas as pd
import numpy as np
import torch
# from transformers import AutoTokenizer, AutoModelForSequenceClassification
# import json
from scipy.spatial.distance import jensenshannon, cosine
# import shap

from backend.model_manager import ModelManager
from backend.data_manager import DataManager
from backend.helpers import jensen_shannon_distance

model_manager = ModelManager()
data_manager = DataManager()

# Instruction text wrapped around every sentence before it is sent to the model.
_PROMPT_PREFIX = (
    "Analyze the sentiment of this statement extracted from a financial news article. "
    "Provide your answer as either negative, positive, or neutral.. Text: "
)
_PROMPT_SUFFIX = ".. Answer: "

# Terms that are always merged into the bias dictionary so it is never empty.
_FALLBACK_BIAS_TERMS = (
    'people', 'person', 'man', 'woman', 'male', 'female', 'young', 'old',
    'white', 'black', 'asian', 'hispanic', 'russian', 'american', 'european',
)

# (key in the ``mutations``/``predictions`` dicts, display title), in display order.
_VARIANT_TITLES = (
    ("original", "Original Sentence"),
    ("atomic_1", "Atomic Mutation 1"),
    ("atomic_2", "Atomic Mutation 2"),
    ("intersectional", "Intersectional Mutation"),
)

# (key used by calculate_shapley_values, display title), in display order.
_SHAP_TITLES = (
    ("original", "Original Sentence Bias Tokens"),
    ("atomic1", "Atomic Mutation 1 Bias Tokens"),
    ("atomic2", "Atomic Mutation 2 Bias Tokens"),
    ("intersectional", "Intersectional Mutation Bias Tokens"),
)

_SHAP_RANKS_KEY = 'Bias Token Ranks'


def load_datasets():
    """Placeholder dataset initialiser; performs no work and returns True."""
    return True


def load_model(model_name):
    """Load a model through the shared ``model_manager``.

    Args:
        model_name: Key identifying the model to load.

    Returns:
        ``(wrapped_model, tokenizer)`` on success, ``(None, None)`` if loading
        raised; the error is printed rather than propagated.
    """
    try:
        wrapped_model, tokenizer = model_manager.load_model(model_name)
        return wrapped_model, tokenizer
    except Exception as e:
        print(f"Error loading model {model_name}: {e}")
        return None, None


def get_sentiment_prediction(text, model, tokenizer):
    """Get a sentiment prediction for ``text`` from ``model``.

    Args:
        text: Sentence to classify.
        model: Object exposing ``generate(prompt=...)``, or ``None``.
        tokenizer: Unused here; kept so callers can pass the pair returned by
            ``load_model`` unchanged.

    Returns:
        Whatever ``model.generate`` returns (callers index it with ``"label"``
        and ``"probabilities"``). When ``model`` is ``None`` a dummy result
        labelled ``"NM"`` is returned; when generation raises, a result
        labelled ``"NA"`` with all-zero probabilities is returned.
    """
    if model is None:
        # Fallback to dummy predictions for demo
        return {
            "label": "NM",
            "probabilities": {"Negative": 0.01, "Neutral": 0.01, "Positive": 0.01},
        }

    try:
        full_prompt = f"{_PROMPT_PREFIX}{text}{_PROMPT_SUFFIX}"
        return model.generate(prompt=full_prompt)
    except Exception as e:
        print(f"Error in prediction: {e}")
        return {
            "label": "NA",
            "probabilities": {"Negative": 0.0, "Neutral": 0.0, "Positive": 0.0},
        }


def calculate_distances(orig_probs, mut_probs):
    """Calculate Jensen-Shannon distance and cosine similarity of two distributions.

    Args:
        orig_probs: Mapping of label -> probability for the original sentence.
        mut_probs: Mapping of label -> probability for the mutated sentence.

    Returns:
        ``(js_distance, cosine_similarity)``. If anything raises, the error is
        printed and ``(0.0, 1.0)`` is returned.
    """
    try:
        js_distance = jensen_shannon_distance(orig_probs, mut_probs)

        # Align both vectors on the same label order. Relying on the raw
        # ``values()`` order silently compares mismatched components when the
        # two dicts were built in a different order or with different labels.
        labels = list(orig_probs)
        labels.extend(label for label in mut_probs if label not in orig_probs)
        orig_array = np.array([orig_probs.get(label, 0.0) for label in labels])
        mut_array = np.array([mut_probs.get(label, 0.0) for label in labels])

        cos_sim = 1 - cosine(orig_array, mut_array)
        return js_distance, cos_sim
    except Exception as e:
        print(f"Error calculating distances: {e}")
        return 0.0, 1.0


def load_bias_dictionary():
    """Load bias terms from the per-category CSV files under ``data/bias``.

    For each of the categories gender/age/race the file
    ``data/bias/<category>/<category>_terms.csv`` is read if it exists. The
    ``term`` column is used when present, otherwise the first column. Terms
    are lower-cased and stripped; missing and empty cells are skipped. A file
    that cannot be read is reported and skipped without affecting the others.

    Returns:
        Set of lower-case bias terms, always including the built-in fallback
        terms.
    """
    bias_terms = set()
    bias_dir = "data/bias"

    for category in ("gender", "age", "race"):
        file_path = os.path.join(bias_dir, category, f"{category}_terms.csv")
        if not os.path.exists(file_path):
            continue
        try:
            df = pd.read_csv(file_path)
            if 'term' in df.columns:
                column = df['term']
            elif len(df.columns) > 0:
                # Use first column if 'term' column doesn't exist
                column = df.iloc[:, 0]
            else:
                continue
            # dropna + astype(str): keep NaN out of the set and tolerate
            # columns pandas parsed as non-string.
            cleaned = column.dropna().astype(str).str.strip().str.lower()
            bias_terms.update(term for term in cleaned if term)
        except Exception as e:
            print(f"[v0] Error loading bias dictionary: {e}")

    # Add some common bias terms as fallback
    bias_terms.update(_FALLBACK_BIAS_TERMS)
    return bias_terms


def find_bias_tokens_in_sentence(sentence, bias_dictionary):
    """Find bias tokens present in a sentence.

    The sentence is lower-cased and split on whitespace; surrounding
    punctuation is stripped from each word before the dictionary lookup.

    Args:
        sentence: Text to scan.
        bias_dictionary: Container of lower-case bias terms.

    Returns:
        Dict mapping each matched term to ``{'position': word_index,
        'original_word': lower-cased word including punctuation}``. When a
        term occurs several times, the last occurrence wins.
    """
    bias_tokens_found = {}
    for position, word in enumerate(sentence.lower().split()):
        # Clean word of punctuation
        clean_word = word.strip('.,!?;:"()[]{}')
        if clean_word in bias_dictionary:
            bias_tokens_found[clean_word] = {
                'position': position,
                'original_word': word,
            }
    return bias_tokens_found


def calculate_shapley_values(original_text, atomic1_text, atomic2_text,
                             intersectional_text, model_name):
    """Run the bias analyzer on the four sentence variants.

    Args:
        original_text: Unmodified sentence.
        atomic1_text: Sentence with the first word replaced.
        atomic2_text: Sentence with the second word replaced.
        intersectional_text: Sentence with both words replaced.
        model_name: Key passed to ``model_manager.get_bias_analyzer``.

    Returns:
        ``{"sentence_results": {...}}`` keyed by ``'original'``, ``'atomic1'``,
        ``'atomic2'`` and ``'intersectional'``. A variant whose analysis raised
        gets ``{'Bias Token Ranks': {}}``. If the analyzer itself cannot be
        obtained, ``{"error": message}`` is returned instead.
    """
    try:
        print(f"[v0] Starting SHAP calculation for model: {model_name}")
        analyzer = model_manager.get_bias_analyzer(model_name)
        print("[v0] BiasAnalyzer created successfully")

        sentences = {
            'original': original_text,
            'atomic1': atomic1_text,
            'atomic2': atomic2_text,
            'intersectional': intersectional_text,
        }

        sentence_results = {}
        for sentence_type, sentence_text in sentences.items():
            try:
                print(f"[v0] Analyzing {sentence_type}: {sentence_text}")
                sentence_results[sentence_type] = analyzer.analyze_sentence(
                    sentence_text,
                    sampling_ratio=0.1,
                    max_combinations=50,
                )
                print(f"[v0] {sentence_type} analysis completed")
            except Exception as e:
                # One failing variant must not discard the others.
                print(f"[v0] Error analyzing {sentence_type}: {e}")
                sentence_results[sentence_type] = {_SHAP_RANKS_KEY: {}}

        print("[v0] SHAP analysis completed successfully")
        return {"sentence_results": sentence_results}
    except Exception as e:
        print(f"[v0] Error calculating SHAP: {e}")
        print(f"[v0] Full traceback: {traceback.format_exc()}")
        return {"error": str(e)}


def _mutation_word(sentence_data, key, default):
    """Return ``sentence_data[key]`` as a string, or ``default`` if absent/None."""
    value = sentence_data.get(key, default)
    return default if value is None else str(value)


def _bias_token_ranks(sentence_results, sentence_type):
    """Return the bias-token rank mapping for one variant (empty dict if missing)."""
    return (sentence_results.get(sentence_type) or {}).get(_SHAP_RANKS_KEY) or {}


def _format_bias_tokens(sentence_results, sentence_type, title):
    """Render the per-token SHAP table for one sentence variant as markdown."""
    section = f"### 🔸 {title}\n\n"
    ranks = _bias_token_ranks(sentence_results, sentence_type)
    if not ranks:
        return section + "*No bias tokens detected*\n\n"

    for token, token_data in ranks.items():
        shap_val = token_data.get('shapley_value', 0.0)
        rank = token_data.get('rank', 'N/A')
        percentile = token_data.get('percentile', 'N/A')
        token_type = token_data.get('type', 'single_word')

        magnitude = abs(shap_val)
        if magnitude > 0.1:
            importance_level = "🔴 HIGH"
        elif magnitude > 0.05:
            importance_level = "🟡 MED"
        else:
            importance_level = "🟢 LOW"

        section += (
            f"**{token}** | `{shap_val:.3f}` | {importance_level} | "
            f"*rank: {rank} ({percentile}%) | type: {token_type}*\n\n"
        )
    return section


def _format_rank_transition(orig_rank, new_rank):
    """Render ``orig → new`` with a direction arrow and signed delta.

    Falls back to the bare ``orig → new`` text when the ranks cannot be
    subtracted (e.g. a missing rank rendered as ``'N/A'``).
    """
    try:
        rank_diff = new_rank - orig_rank
    except TypeError:
        return f"{orig_rank} → {new_rank}"

    if rank_diff < 0:
        change_indicator = "📈"
    elif rank_diff > 0:
        change_indicator = "📉"
    else:
        change_indicator = "➡️"

    # The 'd' presentation type rejects floats, so only use it for integers.
    if isinstance(rank_diff, numbers.Integral):
        delta = f"{rank_diff:+d}"
    else:
        delta = f"{rank_diff:+g}"
    return f"{orig_rank} → {new_rank} {change_indicator} (Δ{delta})"


def _match_replacement_token(ranks, replacement_lower):
    """Find the bias token corresponding to a replacement word.

    An exact case-insensitive match over all tokens is tried first; only if
    none exists is a substring match (in either direction) attempted.

    Returns:
        ``(token, token_data, is_exact)`` or ``None`` when nothing matches.
    """
    for token, token_data in ranks.items():
        if str(token).lower() == replacement_lower:
            return token, token_data, True

    if not replacement_lower:
        # An empty needle is a substring of everything; never partial-match it.
        return None

    for token, token_data in ranks.items():
        token_lower = str(token).lower()
        if token_lower and (replacement_lower in token_lower
                            or token_lower in replacement_lower):
            return token, token_data, False
    return None


def _describe_rank_change(original_ranks, mutated_ranks, word, replacement):
    """Describe how a replaced word's bias rank compares to the original word's.

    Returns:
        A markdown bullet (no trailing newline), or ``None`` when the
        replacement is not among the mutated sentence's bias tokens.
    """
    match = _match_replacement_token(mutated_ranks, replacement.lower())
    if match is None:
        return None

    token, token_data, is_exact = match
    label = f"**{replacement}**" if is_exact else f"**{token}** (from {replacement})"
    new_rank = token_data.get('rank', 'N/A')

    word_lower = word.lower()
    for orig_token, orig_data in original_ranks.items():
        if str(orig_token).lower() == word_lower:
            orig_rank = orig_data.get('rank', 'N/A')
            return f"- {label}: {_format_rank_transition(orig_rank, new_rank)}"
    return f"- {label}: New bias token (rank: {new_rank})"


def _render_shap_success(shap_data, sentence_data):
    """Render the SHAP section (token tables plus rank changes) as markdown."""
    sentence_results = shap_data.get("sentence_results", {})

    section = "---\n\n## 🎯 SHAP Values Analysis - Bias Tokens Only\n\n"
    for sentence_type, title in _SHAP_TITLES:
        section += _format_bias_tokens(sentence_results, sentence_type, title)

    section += "### 🔸 Bias Token Rank Changes by Mutation Words\n\n"

    # Get mutation word information from sentence data
    word1 = _mutation_word(sentence_data, "word_1", "Word 1")
    replacement1 = _mutation_word(sentence_data, "replacement_1", "Replacement 1")
    word2 = _mutation_word(sentence_data, "word_2", "Word 2")
    replacement2 = _mutation_word(sentence_data, "replacement_2", "Replacement 2")

    original_ranks = _bias_token_ranks(sentence_results, 'original')
    intersectional_ranks = _bias_token_ranks(sentence_results, 'intersectional')

    atomic_cases = (
        (1, word1, replacement1, _bias_token_ranks(sentence_results, 'atomic1')),
        (2, word2, replacement2, _bias_token_ranks(sentence_results, 'atomic2')),
    )

    mutation_changes_found = False
    for number, word, replacement, mutated_ranks in atomic_cases:
        section += f"**Word {number} ({word} → {replacement}):**\n\n"
        line = _describe_rank_change(original_ranks, mutated_ranks, word, replacement)
        if line is None:
            section += f"- **{replacement}**: Not detected as bias token\n\n"
        else:
            section += line + "\n\n"
            mutation_changes_found = True

    section += (
        f"**Intersectional Mutation ({word1}→{replacement1} + "
        f"{word2}→{replacement2}):**\n\n"
    )
    intersectional_lines = [
        line
        for line in (
            _describe_rank_change(original_ranks, intersectional_ranks, word1, replacement1),
            _describe_rank_change(original_ranks, intersectional_ranks, word2, replacement2),
        )
        if line is not None
    ]
    if intersectional_lines:
        section += "".join(line + "\n" for line in intersectional_lines)
    else:
        section += "*No bias tokens detected for intersectional mutation words*\n"

    if not mutation_changes_found and not intersectional_lines:
        section += "*No bias tokens detected for mutation words*\n"

    return section


def _render_shap_failure(message):
    """Render the markdown notice shown when the SHAP analysis failed."""
    return (
        "---\n\n## 🎯 SHAP Values Analysis\n\n"
        f"*SHAP calculation failed: {message}*\n\n"
        "*This feature requires significant computational resources.*\n\n"
    )


def _render_distance_section(predictions):
    """Render JS distance / cosine similarity of each mutant vs. the original."""
    base_probs = predictions["original"]["probabilities"]
    section = "---\n\n## 📏 Distance Metrics Analysis\n\n"
    for key, title in _VARIANT_TITLES[1:]:
        js, cos = calculate_distances(base_probs, predictions[key]["probabilities"])
        section += (
            f"### 🔸 {title}\n\n"
            f"**Jensen-Shannon Distance:** `{js:.6f}` | "
            f"**Cosine Similarity:** `{cos:.6f}`\n\n"
        )
    return section


def run_bias_detection(dataset_name, sentence_display, model_name, show_distances, show_shapley):
    """Run the bias analysis for one sentence and return a markdown report.

    Bias is flagged for a mutant when its predicted label differs from the
    label predicted for the original sentence.

    Args:
        dataset_name: Dataset key understood by ``data_manager``.
        sentence_display: The sentence text selected in the UI; it is looked
            up by value in the dataset's sentence list.
        model_name: Model key understood by ``model_manager``.
        show_distances: Append the distance-metrics section when truthy.
        show_shapley: Append the SHAP section when truthy.

    Returns:
        Markdown string. If the sentence cannot be resolved, a one-line
        ``"Error: ..."`` string is returned instead of a report.
    """
    try:
        sentences = data_manager.get_dataset_sentences(dataset_name)
        sentence_index = sentences.index(sentence_display)
        sentence_data = data_manager.get_sentence_data(dataset_name, sentence_index)

        # Get the actual sentence variations from the data
        mutations = {
            "original": sentence_data["original"],
            "atomic_1": sentence_data["mutant_1"],
            "atomic_2": sentence_data["mutant_2"],
            "intersectional": sentence_data["intersectional"],
        }
    except Exception as e:
        print(f"[v0] Error parsing sentence selection: {e}")
        return f"Error: Could not parse sentence selection - {str(e)}"

    # Load model
    model, tokenizer = load_model(model_name)

    # Get predictions for all variations (dict order == original call order)
    predictions = {
        key: get_sentiment_prediction(text, model, tokenizer)
        for key, text in mutations.items()
    }

    orig_label = predictions["original"]["label"]
    bias_checks = [
        ("Atomic Bias 1", predictions["atomic_1"]["label"]),
        ("Atomic Bias 2", predictions["atomic_2"]["label"]),
        ("Intersectional Bias", predictions["intersectional"]["label"]),
    ]
    bias_detected = any(label != orig_label for _, label in bias_checks)

    results = (
        "# 🔬 Bias Detection Analysis\n\n"
        f"**Model:** {model_name} | **Dataset:** {dataset_name}\n\n"
        "---\n\n"
        "## 📊 Sentence Variations\n\n"
    )
    for key, title in _VARIANT_TITLES:
        prediction = predictions[key]
        label_text = prediction["label"].upper()
        probs_text = format_probabilities(prediction["probabilities"])
        results += (
            f"### 🔸 {title}\n\n"
            f"> {mutations[key]}\n\n"
            f"**Prediction:** `{label_text}` | **Probabilities:** {probs_text}\n\n"
        )

    headline = "⚠️ BIAS DETECTED" if bias_detected else "✅ NO BIAS DETECTED"
    results += f"---\n\n## 🎯 Bias Detection Results\n\n### {headline}\n\n"
    for name, mutated_label in bias_checks:
        status = "🚨 DETECTED" if mutated_label != orig_label else "✅ NOT DETECTED"
        results += (
            f"**🔍 {name}:** {status}\n"
            f"*Original: {orig_label} → Mutated: {mutated_label}*\n\n"
        )

    if show_distances:
        results += _render_distance_section(predictions)

    if show_shapley:
        # Build the section separately so a failure half-way through never
        # leaves a partially rendered section in front of the failure notice.
        try:
            shap_data = calculate_shapley_values(
                mutations["original"],
                mutations["atomic_1"],
                mutations["atomic_2"],
                mutations["intersectional"],
                model_name,
            )
            if "error" in shap_data:
                shap_section = _render_shap_failure(shap_data["error"])
            else:
                shap_section = _render_shap_success(shap_data, sentence_data)
        except Exception as e:
            shap_section = _render_shap_failure(str(e))
        results += shap_section

    return results


def format_probabilities(probs_dict):
    """Format a label -> probability mapping as ``"A: 0.123456 | B: ..."``."""
    return " | ".join(f"{k}: {v:.6f}" for k, v in probs_dict.items())


def update_sentences(dataset_name):
    """Build the sentence dropdown for the selected dataset.

    Returns:
        A ``gr.Dropdown`` listing the dataset's sentences with the first one
        selected, or an empty dropdown if the lookup raised.
    """
    try:
        sentences = data_manager.get_dataset_sentences(dataset_name)
        return gr.Dropdown(choices=sentences, value=sentences[0] if sentences else None)
    except Exception as e:
        print(f"[v0] Error updating sentences: {e}")
        return gr.Dropdown(choices=[], value=None)


# Initialize datasets
load_datasets()

# Prefer FinBERT as the default model, but never preselect a value that is
# not among the configured choices.
_model_choices = list(model_manager.model_configs.keys())
if "FinBERT" in _model_choices:
    _default_model = "FinBERT"
else:
    _default_model = _model_choices[0] if _model_choices else None

# Create Gradio interface
with gr.Blocks(title="Bias Detection Framework", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔬 Financial Bias Detection Framework")
    gr.Markdown("Demo interface for detecting bias in financial sentiment analysis models")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## ⚙️ Configuration")

            dataset_dropdown = gr.Dropdown(
                choices=["FPB", "FinSen"],
                label="📊 Select Dataset",
                value="FPB",
            )

            sentence_dropdown = gr.Dropdown(
                choices=[],
                label="📝 Select Sentence",
                interactive=True,
            )

            model_dropdown = gr.Dropdown(
                choices=_model_choices,
                label="🤖 Select Model",
                value=_default_model,
            )

            show_distances = gr.Checkbox(
                label="📏 Show Original to Mutated Distances",
                value=False,
            )

            show_shapley = gr.Checkbox(
                label="🎯 Show SHAP Values",
                value=False,
            )

            analyze_btn = gr.Button("🚀 Run Bias Analysis", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("## 📋 Results")
            results_output = gr.Markdown("")

    # Event handlers
    dataset_dropdown.change(
        fn=update_sentences,
        inputs=[dataset_dropdown],
        outputs=[sentence_dropdown],
    )

    analyze_btn.click(
        fn=run_bias_detection,
        inputs=[dataset_dropdown, sentence_dropdown, model_dropdown, show_distances, show_shapley],
        outputs=[results_output],
    )

    # Initialize sentence dropdown
    demo.load(
        fn=update_sentences,
        inputs=[dataset_dropdown],
        outputs=[sentence_dropdown],
    )

if __name__ == "__main__":
    demo.launch()