File size: 9,210 Bytes
9366995
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""
Toxicity Evaluator

Detects toxic, severe toxic, obscene, threat, insult, and identity hate in utterances.
Uses Detoxify library with pre-trained models.
"""
from typing import List, Dict, Any
import logging
import ssl

from evaluators.base import Evaluator
from evaluators.registry import register_evaluator
from custom_types import Utterance, EvaluationResult
from utils.evaluation_helpers import create_numerical_score, create_utterance_result

logger = logging.getLogger(__name__)
from detoxify import Detoxify


@register_evaluator(
    "toxicity",
    label="Toxicity Detection",
    description="Detects toxic, severe toxic, obscene, threat, insult, and identity hate content",
    category="Safety"
)
class ToxicityEvaluator(Evaluator):
    """
    Evaluator for toxicity detection using Detoxify.
    
    Detoxify provides scores for:
    - toxicity: overall toxicity
    - severe_toxicity: severe toxic content
    - obscene: obscene language
    - threat: threatening language
    - insult: insulting language
    - identity_attack: identity-based hate speech
    - sexual_explicit: sexually explicit content (unbiased model only)
    """
    
    METRIC_NAME = "toxicity"
    
    # Available models
    MODELS = {
        "original": "original",       # Standard model
        "unbiased": "unbiased",       # Less biased model (recommended)
        "multilingual": "multilingual" # Supports multiple languages
    }
    
    def __init__(
        self, 
        api_key: str = None,
        model_type: str = "unbiased",
        device: str = "cpu",
        threshold: float = 0.5
    ):
        """
        Initialize Toxicity Evaluator.
        
        Args:
            api_key: Not used for Detoxify (local model), kept for interface consistency
            model_type: Which Detoxify model to use ("original", "unbiased", "multilingual")
            device: Device to run model on ("cpu" or "cuda")
            threshold: Threshold for flagging content as toxic (0-1)
        """
        super().__init__()
        
        self.model_type = model_type
        self.device = device
        self.threshold = threshold
        
        # Load model
        logger.info(f"Loading Detoxify model: {model_type} on {device}...")
        
        # Fix SSL certificate verification issue on macOS
        # Temporarily disable SSL verification for model download
        original_https_context = ssl._create_default_https_context
        ssl._create_default_https_context = ssl._create_unverified_context
        
        try:
            self.model = Detoxify(model_type, device=device)
        except Exception as e:
            logger.error(f"Failed to load Detoxify model: {e}")
            raise
        finally:
            # Restore original SSL context
            ssl._create_default_https_context = original_https_context
        
        logger.info(f"Initialized {self.METRIC_NAME} evaluator with {model_type} model")
    
    def execute(self, conversation: List[Utterance], **kwargs) -> EvaluationResult:
        """
        Evaluate toxicity for each utterance in the conversation.
        
        Args:
            conversation: List of utterances with 'speaker' and 'text'
            **kwargs: Optional parameters:
                - threshold: Override default threshold for this evaluation
                - batch_size: Process in batches (default: process all at once)
            
        Returns:
            EvaluationResult with per-utterance toxicity scores
        """
        threshold = kwargs.get('threshold', self.threshold)
        batch_size = kwargs.get('batch_size', None)
        
        scores_per_utterance = []
        
        # Extract all texts for batch prediction
        texts = [utt["text"] for utt in conversation]
        
        if batch_size:
            # Process in batches
            all_predictions = []
            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i + batch_size]
                batch_results = self.model.predict(batch_texts)
                all_predictions.append(batch_results)
            
            # Merge batch results
            predictions = self._merge_batch_predictions(all_predictions)
        else:
            # Process all at once
            predictions = self.model.predict(texts)
        
        # Convert predictions to per-utterance scores
        for i, utt in enumerate(conversation):
            utterance_scores = self._extract_scores(predictions, i, threshold)
            # Directly append the scores dict (not nested under "toxicity")
            # This matches the pattern used by other evaluators
            scores_per_utterance.append(utterance_scores)
        
        return create_utterance_result(conversation, scores_per_utterance)
    
    def _extract_scores(
        self, 
        predictions: Dict[str, Any], 
        index: int,
        threshold: float
    ) -> Dict[str, Any]:
        """
        Extract toxicity scores for a single utterance.
        
        Args:
            predictions: Full predictions dict from Detoxify
            index: Index of the utterance
            threshold: Threshold for flagging
            
        Returns:
            Dictionary with individual toxicity scores
        """
        # Available metrics (depends on model)
        available_metrics = list(predictions.keys())
        
        scores = {}
        max_score = 0.0
        max_category = None
        
        for metric in available_metrics:
            value = float(predictions[metric][index])
            scores[metric] = create_numerical_score(
                value=value,
                max_value=1.0,
                label="High" if value >= threshold else "Low"
            )
            
            # Track highest score
            if value > max_score:
                max_score = value
                max_category = metric
        
        # Add overall assessment
        scores["is_toxic"] = {
            "type": "categorical",
            "label": "Toxic" if max_score >= threshold else "Safe",
            "confidence": max_score
        }
        
        if max_category and max_score >= threshold:
            scores["primary_category"] = {
                "type": "categorical",
                "label": max_category.replace('_', ' ').title(),
                "confidence": max_score
            }
        
        return scores
    
    def _merge_batch_predictions(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Merge multiple batch prediction results into a single dictionary.
        
        Args:
            batch_results: List of prediction dictionaries
            
        Returns:
            Merged predictions dictionary
        """
        if not batch_results:
            return {}
        
        # Get all metric keys from first batch
        metrics = list(batch_results[0].keys())
        
        # Merge each metric's values
        merged = {}
        for metric in metrics:
            merged[metric] = []
            for batch in batch_results:
                if isinstance(batch[metric], list):
                    merged[metric].extend(batch[metric])
                else:
                    merged[metric].append(batch[metric])
        
        return merged
    
    def get_summary_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate summary statistics for toxicity across all utterances.
        
        Args:
            results: List of per-utterance results from execute()
            
        Returns:
            Dictionary with summary statistics
        """
        total_utterances = len(results)
        toxic_count = 0
        category_counts = {}
        avg_scores = {}
        
        for row in results:
            toxicity_scores = row.get("toxicity_scores", {})
            
            # Count toxic utterances
            is_toxic = toxicity_scores.get("is_toxic", {})
            if is_toxic.get("label") == "Toxic":
                toxic_count += 1
            
            # Count by category
            primary_cat = toxicity_scores.get("primary_category", {})
            if primary_cat:
                cat_label = primary_cat.get("label", "Unknown")
                category_counts[cat_label] = category_counts.get(cat_label, 0) + 1
            
            # Accumulate scores for averaging
            for key, score in toxicity_scores.items():
                if key not in ["is_toxic", "primary_category"] and score.get("type") == "numerical":
                    if key not in avg_scores:
                        avg_scores[key] = []
                    avg_scores[key].append(score["value"])
        
        # Calculate averages
        for key in avg_scores:
            avg_scores[key] = sum(avg_scores[key]) / len(avg_scores[key])
        
        return {
            "total_utterances": total_utterances,
            "toxic_utterances": toxic_count,
            "toxic_percentage": (toxic_count / total_utterances * 100) if total_utterances > 0 else 0,
            "category_breakdown": category_counts,
            "average_scores": avg_scores
        }