Spaces:
Build error
Build error
| from typing import List, Dict, Any, Optional, Union, Callable, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from collections import defaultdict | |
| import torch | |
| from sklearn.base import BaseEstimator | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.decomposition import PCA | |
| from sklearn.manifold import TSNE | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
# Optional explainability back-ends. Each import is attempted once at module
# load; the matching *_AVAILABLE flag records whether the import succeeded.
try:
    import shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True

try:
    import lime
    import lime.lime_text
except ImportError:
    LIME_AVAILABLE = False
else:
    LIME_AVAILABLE = True

try:
    import captum
    import captum.attr
except ImportError:
    CAPTUM_AVAILABLE = False
else:
    CAPTUM_AVAILABLE = True

try:
    import umap
except ImportError:
    UMAP_AVAILABLE = False
else:
    UMAP_AVAILABLE = True
def get_linear_feature_importance(
    model: "BaseEstimator",
    feature_names: Optional[List[str]] = None,
    class_index: int = -1
) -> pd.DataFrame:
    """Rank the coefficients of a fitted linear model by absolute magnitude.

    Args:
        model: Object exposing a ``coef_`` array (1-D, or 2-D with one row
            per class).
        feature_names: Names matching the coefficient columns. When omitted,
            ``feature_0``, ``feature_1``, ... are generated.
        class_index: Row of a 2-D ``coef_`` to report. The default ``-1`` is
            a sentinel meaning "average the coefficients over all rows", not
            "last class".

    Returns:
        DataFrame with columns ``feature`` and ``weight``, sorted by
        ``abs(weight)`` in descending order with a fresh integer index.

    Raises:
        ValueError: If ``model`` has no ``coef_`` attribute.
    """
    if not hasattr(model, "coef_"):
        raise ValueError("Model does not have coef_ attribute")

    coefficients = model.coef_
    if coefficients.ndim == 1:
        weights = coefficients
    elif class_index == -1:
        # Sentinel: collapse the per-class rows into one averaged vector.
        weights = np.mean(coefficients, axis=0)
    else:
        weights = coefficients[class_index]

    names = feature_names
    if names is None:
        names = [f"feature_{idx}" for idx in range(len(weights))]

    ranking = pd.DataFrame({"feature": names, "weight": weights})
    ranking = ranking.sort_values("weight", key=abs, ascending=False)
    return ranking.reset_index(drop=True)
def analyze_tfidf_class_keywords(
    tfidf_matrix: np.ndarray,
    y: np.ndarray,
    feature_names: List[str],
    top_k: int = 20
) -> Dict[Any, pd.DataFrame]:
    """Find the highest mean-TF-IDF terms for each class.

    Args:
        tfidf_matrix: Document-term matrix with one row per document. Dense
            arrays, ``np.matrix`` and SciPy sparse matrices are accepted.
        y: Class label of each row of ``tfidf_matrix``.
        feature_names: Term for each column of ``tfidf_matrix``.
        top_k: Maximum number of terms to report per class.

    Returns:
        Mapping from each distinct label in ``y`` to a DataFrame with columns
        ``word`` and ``tfidf_score``, ordered by descending mean score.
    """
    # A plain list compared with ``==`` yields a single bool, not a row mask.
    y = np.asarray(y)
    results = {}
    for cls in np.unique(y):
        class_rows = tfidf_matrix[y == cls]
        # Sparse matrices and np.matrix return a 1 x n_features np.matrix from
        # mean(axis=0); asarray + ravel flattens every input flavour to a
        # plain 1-D vector so the index arithmetic below works.
        avg_tfidf = np.asarray(class_rows.mean(axis=0)).ravel()
        top_indices = np.argsort(avg_tfidf)[::-1][:top_k]
        results[cls] = pd.DataFrame({
            "word": [feature_names[i] for i in top_indices],
            "tfidf_score": avg_tfidf[top_indices],
        })
    return results
def explain_with_shap(
    model: BaseEstimator,
    X_train: np.ndarray,
    X_test: np.ndarray,
    feature_names: Optional[List[str]] = None,
    plot_type: str = "bar",
    max_display: int = 20,
    max_samples: int = 100
):
    """Draw a SHAP summary plot for a fitted model.

    Args:
        model: Fitted estimator. If its type name contains "tree" a
            ``shap.TreeExplainer`` is used; otherwise a
            ``shap.KernelExplainer`` is built on ``model.predict_proba``.
        X_train: Training features; the first ``max_samples`` rows serve as
            the KernelExplainer background set.
        X_test: Features to explain; only the first ``max_samples`` rows are
            explained and plotted.
        feature_names: Column names for the plot. Defaults to ``feat_0``,
            ``feat_1``, ...
        plot_type: Forwarded to ``shap.summary_plot``.
        max_display: Forwarded to ``shap.summary_plot``.
        max_samples: Row cap applied to both the background set and the
            explained sample.

    Raises:
        ImportError: If the optional ``shap`` package is not installed.
    """
    # Fail with an actionable message instead of a NameError on ``shap``.
    if not SHAP_AVAILABLE:
        raise ImportError("Install SHAP: pip install shap")

    if "tree" in str(type(model)).lower():
        explainer = shap.TreeExplainer(model)
    else:
        explainer = shap.KernelExplainer(model.predict_proba, X_train[:max_samples])

    sample = X_test[:max_samples]
    shap_values = explainer.shap_values(sample)

    if feature_names is None:
        feature_names = [f"feat_{i}" for i in range(X_test.shape[1])]

    plt.figure(figsize=(10, 6))
    # The same call handles both the per-class list and the single-array form
    # of ``shap_values``, so no branching on its type is needed.
    shap.summary_plot(
        shap_values,
        sample,
        feature_names=feature_names,
        plot_type=plot_type,
        max_display=max_display,
        show=False,
    )
    plt.tight_layout()
    plt.show()
def explain_text_with_lime(
    model: Any,
    text: str,
    tokenizer: Callable,
    class_names: List[str],
    num_features: int = 10,
    num_samples: int = 5000
):
    """Explain one text prediction with LIME and render it in a notebook.

    Args:
        model: Classifier exposing ``predict_proba`` and a fitted
            ``vectorizer`` attribute whose ``transform`` accepts raw strings.
        text: The document to explain.
        tokenizer: Accepted for backward compatibility; it is not used,
            because the perturbed texts go straight to ``model.vectorizer``.
        class_names: Display names passed to ``LimeTextExplainer``.
        num_features: Number of features in the explanation.
        num_samples: Number of perturbed samples LIME generates.

    Raises:
        ImportError: If the optional ``lime`` package is not installed.
        NotImplementedError: If ``model`` has no ``vectorizer`` attribute.
    """
    if not LIME_AVAILABLE:
        raise ImportError("Install LIME: pip install lime")
    # Checked up front so an unsupported pipeline fails before any sampling.
    if not hasattr(model, "vectorizer"):
        raise NotImplementedError("Custom predict_fn needed for your pipeline")

    def predict_fn(texts):
        X = model.vectorizer.transform(texts)
        # Only densify when the vectorizer actually returned a sparse matrix.
        if hasattr(X, "toarray"):
            X = X.toarray()
        return model.predict_proba(X)

    explainer = lime.lime_text.LimeTextExplainer(class_names=class_names)
    exp = explainer.explain_instance(
        text, predict_fn, num_features=num_features, num_samples=num_samples
    )
    exp.show_in_notebook()
def visualize_attention_weights(
    tokens: List[str],
    attention_weights: np.ndarray,
    layer: int = 0,
    head: int = 0,
    figsize: Tuple[int, int] = (10, 2)
):
    """Show one attention head as a token-by-token heatmap.

    Args:
        tokens: Token strings used as tick labels on both axes.
        attention_weights: 4-D array indexed as (layers, heads, seq, seq).
        layer: Layer index to display.
        head: Head index to display.
        figsize: Matplotlib figure size.

    Raises:
        ValueError: If ``attention_weights`` is not 4-dimensional.
    """
    if attention_weights.ndim != 4:
        raise ValueError("attention_weights must be 4D: (layers, heads, seq, seq)")

    # Crop to the token count so the matrix matches the tick labels.
    n_tokens = len(tokens)
    head_map = attention_weights[layer, head, :n_tokens, :n_tokens]

    plt.figure(figsize=figsize)
    heatmap_options = dict(
        xticklabels=tokens,
        yticklabels=tokens,
        cmap="viridis",
        cbar=True,
    )
    sns.heatmap(head_map, **heatmap_options)
    plt.title(f"Attention Layer {layer}, Head {head}")
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()
def get_transformer_attention(
    model: 'torch.nn.Module',
    tokenizer: 'transformers.PreTrainedTokenizer',
    text: str,
    device: str = "cpu"
) -> Tuple[List[str], np.ndarray]:
    """Run one text through a model and collect its attention maps.

    Only ``torch`` and the supplied model/tokenizer are needed, so no
    optional-dependency check is performed here.

    Args:
        model: Module that accepts ``output_attentions=True`` and returns an
            object with an ``attentions`` sequence (one tensor per layer).
        tokenizer: Callable returning a mapping with ``input_ids`` and
            providing ``convert_ids_to_tokens``.
        text: Input text; truncated to 512 tokens by the tokenizer call.
        device: Torch device the model and inputs are moved to.

    Returns:
        ``(tokens, attn)`` where ``attn`` is the per-layer attention stacked
        along a new leading axis with the size-1 batch axis removed.

    Raises:
        ValueError: If the model returns no attention tensors.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    input_ids = inputs["input_ids"].to(device)
    model = model.to(device)
    model.eval()

    with torch.no_grad():
        outputs = model(input_ids, output_attentions=True)

    attentions = outputs.attentions
    if attentions is None:
        # torch.stack(None) would raise an opaque TypeError.
        raise ValueError("Model did not return attentions for output_attentions=True")

    # Stack layers on a new axis 0, then drop the batch axis (a single text).
    attn = torch.stack(tuple(attentions), dim=0).squeeze(1).cpu().numpy()
    # Hand the tokenizer plain Python ints rather than a NumPy array.
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())
    return tokens, attn
def analyze_errors(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    texts: List[str],
    labels: Optional[List[Any]] = None
) -> pd.DataFrame:
    """List every example whose predicted label differs from the true label.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
        texts: Source text of each example, aligned with ``y_true``.
        labels: Currently unused; kept so existing callers keep working.

    Returns:
        DataFrame with columns ``index``, ``text``, ``true_label`` and
        ``pred_label``, one row per misclassified example. The columns are
        present even when there are no errors. If the inputs differ in
        length, only positions present in all three are compared.
    """
    columns = ["index", "text", "true_label", "pred_label"]
    errors = [
        {"index": i, "text": text, "true_label": true, "pred_label": pred}
        for i, (true, pred, text) in enumerate(zip(y_true, y_pred, texts))
        if true != pred
    ]
    # Explicit columns keep the schema stable for an empty result, so
    # downstream column access does not raise KeyError.
    return pd.DataFrame(errors, columns=columns)
def compare_model_errors(
    models: Dict[str, BaseEstimator],
    X_test: np.ndarray,
    y_test: np.ndarray,
    texts: List[str]
) -> Dict[str, pd.DataFrame]:
    """Collect each model's misclassified examples on a shared test set.

    Args:
        models: Mapping from a display name to an object with ``predict``.
        X_test: Features passed to every model's ``predict``.
        y_test: True labels for ``X_test``.
        texts: Source text for each test example.

    Returns:
        Mapping from each model name to the DataFrame produced by
        ``analyze_errors`` for that model's predictions.
    """
    return {
        model_name: analyze_errors(y_test, estimator.predict(X_test), texts)
        for model_name, estimator in models.items()
    }
def plot_embeddings(
    embeddings: np.ndarray,
    labels: np.ndarray,
    method: str = "umap",
    n_components: int = 2,
    figsize: Tuple[int, int] = (12, 8),
    title: str = "Embedding Projection"
):
    """Project embeddings with t-SNE or UMAP and scatter the first two axes.

    Args:
        embeddings: One row per item to project.
        labels: Per-item values used to colour the points (passed to
            matplotlib as ``c``).
        method: ``"tsne"`` or ``"umap"``.
        n_components: Dimensionality of the projection. Only the first two
            components are drawn, so it must be at least 2.
        figsize: Matplotlib figure size.
        title: Plot title.

    Raises:
        ValueError: If ``n_components`` is below 2 or ``method`` is unknown.
        ImportError: If ``method="umap"`` and umap-learn is not installed.
    """
    # Validate before the (potentially slow) fit; the scatter below indexes
    # columns 0 and 1 of the projection.
    if n_components < 2:
        raise ValueError("n_components must be at least 2 to draw a 2-D scatter plot")

    if method == "tsne":
        reducer = TSNE(n_components=n_components, random_state=42, n_jobs=-1)
    elif method == "umap":
        if not UMAP_AVAILABLE:
            raise ImportError("Install UMAP: pip install umap-learn")
        # No n_jobs here: with a fixed random_state UMAP runs single-threaded
        # anyway (it warns and overrides n_jobs), and older umap-learn
        # releases may not accept the argument at all.
        reducer = umap.UMAP(n_components=n_components, random_state=42)
    else:
        raise ValueError("method must be 'tsne' or 'umap'")

    proj = reducer.fit_transform(embeddings)

    plt.figure(figsize=figsize)
    scatter = plt.scatter(proj[:, 0], proj[:, 1], c=labels, cmap="tab10", alpha=0.7)
    plt.colorbar(scatter)
    plt.title(title)
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.tight_layout()
    plt.show()
def get_token_importance_captum(
    model: 'torch.nn.Module',
    tokenizer: 'transformers.PreTrainedTokenizer',
    text: str,
    device: str = "cpu"
) -> Tuple[List[str], np.ndarray]:
    """Attribute the predicted class to input tokens with Captum.

    Layer Integrated Gradients is applied to the model's embedding layer and
    the attributions are summed over the embedding dimension, giving one
    score per token.

    Args:
        model: Sequence classifier whose output exposes ``logits`` and whose
            backbone (``model.bert`` or, failing that, ``model.base_model``)
            has an ``embeddings`` submodule.
        tokenizer: Tokenizer returning ``input_ids`` and ``attention_mask``.
        text: Input text; truncated to 512 tokens by the tokenizer call.
        device: Torch device the model and inputs are moved to.

    Returns:
        ``(tokens, attributions)`` with one attribution value per token.

    Raises:
        ImportError: If the optional ``captum`` package is not installed.
        AttributeError: If no embedding layer can be located on the model.
    """
    if not CAPTUM_AVAILABLE:
        raise ImportError("Install Captum: pip install captum")
    from captum.attr import LayerIntegratedGradients

    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True
    )
    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)
    model = model.to(device)
    model.eval()

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    pred_class = torch.argmax(outputs.logits, dim=1).item()

    def forward_func(ids, mask):
        return model(input_ids=ids, attention_mask=mask).logits

    # Baseline: padding everywhere, with the sequence delimiters kept when the
    # tokenizer defines them. Falling back to id 0 preserves the previous
    # behaviour for tokenizers without a pad token.
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
    baseline_ids = torch.full_like(input_ids, pad_id)
    if tokenizer.cls_token_id is not None:
        baseline_ids[:, 0] = tokenizer.cls_token_id
    if tokenizer.sep_token_id is not None:
        baseline_ids[:, -1] = tokenizer.sep_token_id

    # Prefer the BERT attribute used historically, then the generic backbone
    # accessor, so non-BERT encoders with an ``embeddings`` module also work.
    backbone = getattr(model, "bert", None)
    if backbone is None:
        backbone = getattr(model, "base_model", None)
    embedding_layer = getattr(backbone, "embeddings", None)
    if embedding_layer is None:
        raise AttributeError(
            "Could not locate an embeddings layer on model.bert or model.base_model"
        )

    lig = LayerIntegratedGradients(forward_func, embedding_layer)
    # The mask goes through additional_forward_args so Captum expands it
    # together with the interpolated inputs instead of relying on a batch-1
    # tensor captured by closure.
    attributions = lig.attribute(
        inputs=input_ids,
        baselines=baseline_ids,
        additional_forward_args=(attention_mask,),
        target=pred_class,
    )
    # Collapse the embedding dimension and drop the batch axis.
    attributions = attributions.sum(dim=-1).squeeze(0).detach().cpu().numpy()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())
    return tokens, attributions
def plot_token_importance(tokens: List[str], importance: np.ndarray, top_k: int = 20):
    """Draw a horizontal bar chart of the most influential tokens.

    The ``[CLS]``, ``[SEP]`` and ``[PAD]`` markers are dropped, the remaining
    tokens are ranked by absolute attribution, and the ``top_k`` strongest are
    plotted (green for positive scores, red for negative).

    Args:
        tokens: Token strings.
        importance: Attribution score for each token, aligned with ``tokens``.
        top_k: Maximum number of tokens to plot. Values below 1 plot nothing.

    Returns:
        None. Nothing is drawn when no tokens remain after filtering or when
        ``top_k`` is below 1.
    """
    special_tokens = {"[CLS]", "[SEP]", "[PAD]"}
    valid = [
        (tok, score)
        for tok, score in zip(tokens, importance)
        if tok not in special_tokens
    ]
    # top_k < 1 must not reach the slice below: ``[-0:]`` would select every
    # token instead of none.
    if not valid or top_k < 1:
        return

    tokens_clean, imp_clean = zip(*valid)
    indices = np.argsort(np.abs(imp_clean))[::-1][:top_k]
    tokens_top = [tokens_clean[i] for i in indices]
    imp_top = [imp_clean[i] for i in indices]

    plt.figure(figsize=(10, 6))
    colors = ["red" if x < 0 else "green" for x in imp_top]
    plt.barh(range(len(imp_top)), imp_top, color=colors)
    plt.yticks(range(len(imp_top)), tokens_top)
    # Strongest token at the top of the chart.
    plt.gca().invert_yaxis()
    plt.xlabel("Attribution Score")
    plt.title("Token Importance (Green: positive, Red: negative)")
    plt.tight_layout()
    plt.show()