"""
Toxicity Evaluator
Detects toxic, severe toxic, obscene, threat, insult, and identity hate in utterances.
Uses Detoxify library with pre-trained models.
"""
from typing import List, Dict, Any
import logging
import ssl
from evaluators.base import Evaluator
from evaluators.registry import register_evaluator
from custom_types import Utterance, EvaluationResult
from utils.evaluation_helpers import create_numerical_score, create_utterance_result
logger = logging.getLogger(__name__)
from detoxify import Detoxify


@register_evaluator(
    "toxicity",
    label="Toxicity Detection",
    description="Detects toxic, severe toxic, obscene, threat, insult, and identity hate content",
    category="Safety"
)
class ToxicityEvaluator(Evaluator):
    """
    Evaluator for toxicity detection using Detoxify.

    Detoxify provides scores for:
    - toxicity: overall toxicity
    - severe_toxicity: severe toxic content
    - obscene: obscene language
    - threat: threatening language
    - insult: insulting language
    - identity_attack: identity-based hate speech
    - sexual_explicit: sexually explicit content (unbiased model only)
    """

    METRIC_NAME = "toxicity"

    # Available models
    MODELS = {
        "original": "original",          # Standard model
        "unbiased": "unbiased",          # Less biased model (recommended)
        "multilingual": "multilingual",  # Supports multiple languages
    }

    def __init__(
        self,
        api_key: Optional[str] = None,
        model_type: str = "unbiased",
        device: str = "cpu",
        threshold: float = 0.5
    ):
        """
        Initialize the Toxicity Evaluator.

        Args:
            api_key: Not used by Detoxify (local model); kept for interface consistency
            model_type: Which Detoxify model to use ("original", "unbiased", "multilingual")
            device: Device to run the model on ("cpu" or "cuda")
            threshold: Threshold for flagging content as toxic (0-1)
        """
        super().__init__()
        self.model_type = model_type
        self.device = device
        self.threshold = threshold

        # Load model
        logger.info(f"Loading Detoxify model: {model_type} on {device}...")

        # Work around SSL certificate verification failures on macOS:
        # temporarily disable SSL verification for the model download.
        original_https_context = ssl._create_default_https_context
        ssl._create_default_https_context = ssl._create_unverified_context
        try:
            self.model = Detoxify(model_type, device=device)
        except Exception as e:
            logger.error(f"Failed to load Detoxify model: {e}")
            raise
        finally:
            # Restore the original SSL context
            ssl._create_default_https_context = original_https_context

        logger.info(f"Initialized {self.METRIC_NAME} evaluator with {model_type} model")

    def execute(self, conversation: List[Utterance], **kwargs) -> EvaluationResult:
        """
        Evaluate toxicity for each utterance in the conversation.

        Args:
            conversation: List of utterances with 'speaker' and 'text'
            **kwargs: Optional parameters:
                - threshold: Override the default threshold for this evaluation
                - batch_size: Process in batches (default: process all at once)

        Returns:
            EvaluationResult with per-utterance toxicity scores
        """
        threshold = kwargs.get('threshold', self.threshold)
        batch_size = kwargs.get('batch_size', None)
        scores_per_utterance = []

        # Extract all texts for batch prediction
        texts = [utt["text"] for utt in conversation]

        if batch_size:
            # Process in batches
            all_predictions = []
            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i + batch_size]
                batch_results = self.model.predict(batch_texts)
                all_predictions.append(batch_results)
            # Merge batch results
            predictions = self._merge_batch_predictions(all_predictions)
        else:
            # Process all at once
            predictions = self.model.predict(texts)
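
        # Either way, `predictions` maps each metric name (e.g. "toxicity",
        # "insult") to one float per input text, so predictions[metric][i]
        # is the score for conversation[i].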
        # Convert predictions to per-utterance scores
        for i, utt in enumerate(conversation):
            utterance_scores = self._extract_scores(predictions, i, threshold)
            # Directly append the scores dict (not nested under "toxicity").
            # This matches the pattern used by other evaluators.
            scores_per_utterance.append(utterance_scores)

        return create_utterance_result(conversation, scores_per_utterance)

    def _extract_scores(
        self,
        predictions: Dict[str, Any],
        index: int,
        threshold: float
    ) -> Dict[str, Any]:
        """
        Extract toxicity scores for a single utterance.

        Args:
            predictions: Full predictions dict from Detoxify
            index: Index of the utterance
            threshold: Threshold for flagging

        Returns:
            Dictionary with individual toxicity scores
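
            Shape sketch (illustrative; the per-metric entries come from
            create_numerical_score, so their exact fields depend on that
            helper):
                {
                    "toxicity": <numerical score>,
                    ...,
                    "is_toxic": {"type": "categorical", "label": "Safe", "confidence": 0.02}
                }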
"""
# Available metrics (depends on model)
available_metrics = list(predictions.keys())
scores = {}
max_score = 0.0
max_category = None
for metric in available_metrics:
value = float(predictions[metric][index])
scores[metric] = create_numerical_score(
value=value,
max_value=1.0,
label="High" if value >= threshold else "Low"
)
# Track highest score
if value > max_score:
max_score = value
max_category = metric
# Add overall assessment
scores["is_toxic"] = {
"type": "categorical",
"label": "Toxic" if max_score >= threshold else "Safe",
"confidence": max_score
}
if max_category and max_score >= threshold:
scores["primary_category"] = {
"type": "categorical",
"label": max_category.replace('_', ' ').title(),
"confidence": max_score
}
return scores

    def _merge_batch_predictions(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Merge multiple batch prediction results into a single dictionary.

        Args:
            batch_results: List of prediction dictionaries

        Returns:
            Merged predictions dictionary
        """
        if not batch_results:
            return {}

        # Get all metric keys from the first batch
        metrics = list(batch_results[0].keys())

        # Merge each metric's values
        merged = {}
        for metric in metrics:
            merged[metric] = []
            for batch in batch_results:
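                # Detoxify returns per-metric lists for list input and bare
                # floats for single-string input, so handle both defensively.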
                if isinstance(batch[metric], list):
                    merged[metric].extend(batch[metric])
                else:
                    merged[metric].append(batch[metric])

        return merged

    def get_summary_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate summary statistics for toxicity across all utterances.

        Args:
            results: List of per-utterance results from execute()

        Returns:
            Dictionary with summary statistics
        """
        total_utterances = len(results)
        toxic_count = 0
        category_counts = {}
        avg_scores = {}

        for row in results:
            toxicity_scores = row.get("toxicity_scores", {})

            # Count toxic utterances
            is_toxic = toxicity_scores.get("is_toxic", {})
            if is_toxic.get("label") == "Toxic":
                toxic_count += 1

            # Count by category
            primary_cat = toxicity_scores.get("primary_category", {})
            if primary_cat:
                cat_label = primary_cat.get("label", "Unknown")
                category_counts[cat_label] = category_counts.get(cat_label, 0) + 1

            # Accumulate scores for averaging
            for key, score in toxicity_scores.items():
                if key not in ["is_toxic", "primary_category"] and score.get("type") == "numerical":
                    if key not in avg_scores:
                        avg_scores[key] = []
                    avg_scores[key].append(score["value"])

        # Calculate averages
        for key in avg_scores:
            avg_scores[key] = sum(avg_scores[key]) / len(avg_scores[key])

        return {
            "total_utterances": total_utterances,
            "toxic_utterances": toxic_count,
            "toxic_percentage": (toxic_count / total_utterances * 100) if total_utterances > 0 else 0,
            "category_breakdown": category_counts,
            "average_scores": avg_scores
        }
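

if __name__ == "__main__":
    # Minimal usage sketch, assuming an Utterance is a dict with "speaker"
    # and "text" keys (as consumed by execute() above). The sample texts are
    # illustrative only; the exact shape of the printed EvaluationResult
    # depends on create_utterance_result.
    logging.basicConfig(level=logging.INFO)
    evaluator = ToxicityEvaluator(model_type="unbiased", device="cpu", threshold=0.5)
    sample_conversation = [
        {"speaker": "agent", "text": "Hello, how can I help you today?"},
        {"speaker": "user", "text": "You are completely useless."},
    ]
    result = evaluator.execute(sample_conversation, threshold=0.7)
    print(result)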