github-actions[bot]
Deploy from GitHub Actions (commit: 8b247ffacd77c0672965b8378f1d52a7dcd187ae)
9366995
| """ | |
| Toxicity Evaluator | |
| Detects toxic, severe toxic, obscene, threat, insult, and identity hate in utterances. | |
| Uses Detoxify library with pre-trained models. | |
| """ | |
| from typing import List, Dict, Any | |
| import logging | |
| import ssl | |
| from evaluators.base import Evaluator | |
| from evaluators.registry import register_evaluator | |
| from custom_types import Utterance, EvaluationResult | |
| from utils.evaluation_helpers import create_numerical_score, create_utterance_result | |
| logger = logging.getLogger(__name__) | |
| from detoxify import Detoxify | |
| class ToxicityEvaluator(Evaluator): | |
| """ | |
| Evaluator for toxicity detection using Detoxify. | |
| Detoxify provides scores for: | |
| - toxicity: overall toxicity | |
| - severe_toxicity: severe toxic content | |
| - obscene: obscene language | |
| - threat: threatening language | |
| - insult: insulting language | |
| - identity_attack: identity-based hate speech | |
| - sexual_explicit: sexually explicit content (unbiased model only) | |
| """ | |
| METRIC_NAME = "toxicity" | |
| # Available models | |
| MODELS = { | |
| "original": "original", # Standard model | |
| "unbiased": "unbiased", # Less biased model (recommended) | |
| "multilingual": "multilingual" # Supports multiple languages | |
| } | |
| def __init__( | |
| self, | |
| api_key: str = None, | |
| model_type: str = "unbiased", | |
| device: str = "cpu", | |
| threshold: float = 0.5 | |
| ): | |
| """ | |
| Initialize Toxicity Evaluator. | |
| Args: | |
| api_key: Not used for Detoxify (local model), kept for interface consistency | |
| model_type: Which Detoxify model to use ("original", "unbiased", "multilingual") | |
| device: Device to run model on ("cpu" or "cuda") | |
| threshold: Threshold for flagging content as toxic (0-1) | |
| """ | |
| super().__init__() | |
| self.model_type = model_type | |
| self.device = device | |
| self.threshold = threshold | |
| # Load model | |
| logger.info(f"Loading Detoxify model: {model_type} on {device}...") | |
| # Fix SSL certificate verification issue on macOS | |
| # Temporarily disable SSL verification for model download | |
| original_https_context = ssl._create_default_https_context | |
| ssl._create_default_https_context = ssl._create_unverified_context | |
| try: | |
| self.model = Detoxify(model_type, device=device) | |
| except Exception as e: | |
| logger.error(f"Failed to load Detoxify model: {e}") | |
| raise | |
| finally: | |
| # Restore original SSL context | |
| ssl._create_default_https_context = original_https_context | |
| logger.info(f"Initialized {self.METRIC_NAME} evaluator with {model_type} model") | |
| def execute(self, conversation: List[Utterance], **kwargs) -> EvaluationResult: | |
| """ | |
| Evaluate toxicity for each utterance in the conversation. | |
| Args: | |
| conversation: List of utterances with 'speaker' and 'text' | |
| **kwargs: Optional parameters: | |
| - threshold: Override default threshold for this evaluation | |
| - batch_size: Process in batches (default: process all at once) | |
| Returns: | |
| EvaluationResult with per-utterance toxicity scores | |
| """ | |
| threshold = kwargs.get('threshold', self.threshold) | |
| batch_size = kwargs.get('batch_size', None) | |
| scores_per_utterance = [] | |
| # Extract all texts for batch prediction | |
| texts = [utt["text"] for utt in conversation] | |
| if batch_size: | |
| # Process in batches | |
| all_predictions = [] | |
| for i in range(0, len(texts), batch_size): | |
| batch_texts = texts[i:i + batch_size] | |
| batch_results = self.model.predict(batch_texts) | |
| all_predictions.append(batch_results) | |
| # Merge batch results | |
| predictions = self._merge_batch_predictions(all_predictions) | |
| else: | |
| # Process all at once | |
| predictions = self.model.predict(texts) | |
| # Convert predictions to per-utterance scores | |
| for i, utt in enumerate(conversation): | |
| utterance_scores = self._extract_scores(predictions, i, threshold) | |
| # Directly append the scores dict (not nested under "toxicity") | |
| # This matches the pattern used by other evaluators | |
| scores_per_utterance.append(utterance_scores) | |
| return create_utterance_result(conversation, scores_per_utterance) | |
| def _extract_scores( | |
| self, | |
| predictions: Dict[str, Any], | |
| index: int, | |
| threshold: float | |
| ) -> Dict[str, Any]: | |
| """ | |
| Extract toxicity scores for a single utterance. | |
| Args: | |
| predictions: Full predictions dict from Detoxify | |
| index: Index of the utterance | |
| threshold: Threshold for flagging | |
| Returns: | |
| Dictionary with individual toxicity scores | |
| """ | |
| # Available metrics (depends on model) | |
| available_metrics = list(predictions.keys()) | |
| scores = {} | |
| max_score = 0.0 | |
| max_category = None | |
| for metric in available_metrics: | |
| value = float(predictions[metric][index]) | |
| scores[metric] = create_numerical_score( | |
| value=value, | |
| max_value=1.0, | |
| label="High" if value >= threshold else "Low" | |
| ) | |
| # Track highest score | |
| if value > max_score: | |
| max_score = value | |
| max_category = metric | |
| # Add overall assessment | |
| scores["is_toxic"] = { | |
| "type": "categorical", | |
| "label": "Toxic" if max_score >= threshold else "Safe", | |
| "confidence": max_score | |
| } | |
| if max_category and max_score >= threshold: | |
| scores["primary_category"] = { | |
| "type": "categorical", | |
| "label": max_category.replace('_', ' ').title(), | |
| "confidence": max_score | |
| } | |
| return scores | |
| def _merge_batch_predictions(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Merge multiple batch prediction results into a single dictionary. | |
| Args: | |
| batch_results: List of prediction dictionaries | |
| Returns: | |
| Merged predictions dictionary | |
| """ | |
| if not batch_results: | |
| return {} | |
| # Get all metric keys from first batch | |
| metrics = list(batch_results[0].keys()) | |
| # Merge each metric's values | |
| merged = {} | |
| for metric in metrics: | |
| merged[metric] = [] | |
| for batch in batch_results: | |
| if isinstance(batch[metric], list): | |
| merged[metric].extend(batch[metric]) | |
| else: | |
| merged[metric].append(batch[metric]) | |
| return merged | |
| def get_summary_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Calculate summary statistics for toxicity across all utterances. | |
| Args: | |
| results: List of per-utterance results from execute() | |
| Returns: | |
| Dictionary with summary statistics | |
| """ | |
| total_utterances = len(results) | |
| toxic_count = 0 | |
| category_counts = {} | |
| avg_scores = {} | |
| for row in results: | |
| toxicity_scores = row.get("toxicity_scores", {}) | |
| # Count toxic utterances | |
| is_toxic = toxicity_scores.get("is_toxic", {}) | |
| if is_toxic.get("label") == "Toxic": | |
| toxic_count += 1 | |
| # Count by category | |
| primary_cat = toxicity_scores.get("primary_category", {}) | |
| if primary_cat: | |
| cat_label = primary_cat.get("label", "Unknown") | |
| category_counts[cat_label] = category_counts.get(cat_label, 0) + 1 | |
| # Accumulate scores for averaging | |
| for key, score in toxicity_scores.items(): | |
| if key not in ["is_toxic", "primary_category"] and score.get("type") == "numerical": | |
| if key not in avg_scores: | |
| avg_scores[key] = [] | |
| avg_scores[key].append(score["value"]) | |
| # Calculate averages | |
| for key in avg_scores: | |
| avg_scores[key] = sum(avg_scores[key]) / len(avg_scores[key]) | |
| return { | |
| "total_utterances": total_utterances, | |
| "toxic_utterances": toxic_count, | |
| "toxic_percentage": (toxic_count / total_utterances * 100) if total_utterances > 0 else 0, | |
| "category_breakdown": category_counts, | |
| "average_scores": avg_scores | |
| } | |