# DEPENDENCIES
import re
import math
import torch
import numpy as np
from typing import Any
from typing import Dict
from typing import List
from loguru import logger
from config.threshold_config import Domain
from metrics.base_metric import BaseMetric
from metrics.base_metric import MetricResult
from models.model_manager import get_model_manager
from config.threshold_config import get_threshold_for_domain


class PerplexityMetric(BaseMetric):
    """
    Text predictability analysis using GPT-2 for perplexity calculation

    Measures (aligned with documentation):
    - Overall text perplexity (lower = more predictable = more AI-like)
    - Perplexity distribution across text chunks
    - Sentence-level perplexity patterns
    - Cross-entropy analysis
    """
    def __init__(self):
        super().__init__(name = "perplexity",
                         description = "GPT-2 based perplexity calculation for text predictability analysis",
                        )
        self.model = None
        self.tokenizer = None

    def initialize(self) -> bool:
        """
        Initialize the perplexity metric
        """
        try:
            logger.info("Initializing perplexity metric...")
            # Load the GPT-2 model and tokenizer
            model_manager = get_model_manager()
            model_result = model_manager.load_model(model_name = "perplexity_gpt2")
            if isinstance(model_result, tuple):
                self.model, self.tokenizer = model_result
            else:
                logger.error("Failed to load GPT-2 model for perplexity calculation")
                return False
            self.is_initialized = True
            logger.success("Perplexity metric initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize perplexity metric: {repr(e)}")
            return False

    def compute(self, text: str, **kwargs) -> MetricResult:
        """
        Compute perplexity measures with full domain-threshold integration
        """
        try:
            if not text or len(text.strip()) < 50:
                return MetricResult(metric_name = self.name,
                                    ai_probability = 0.5,
                                    human_probability = 0.5,
                                    mixed_probability = 0.0,
                                    confidence = 0.1,
                                    error = "Text too short for perplexity analysis",
                                   )
            # Get domain-specific thresholds
            domain = kwargs.get('domain', Domain.GENERAL)
            domain_thresholds = get_threshold_for_domain(domain)
            perplexity_thresholds = domain_thresholds.perplexity
            # Calculate comprehensive perplexity features
            features = self._calculate_perplexity_features(text)
            # Calculate the raw perplexity score (0-1 scale)
            raw_perplexity_score, confidence = self._analyze_perplexity_patterns(features)
            # Apply domain-specific thresholds to convert the raw score to probabilities
            ai_prob, human_prob, mixed_prob = self._apply_domain_thresholds(raw_perplexity_score, perplexity_thresholds, features)
            # Apply the confidence multiplier from the domain thresholds
            confidence *= perplexity_thresholds.confidence_multiplier
            confidence = max(0.0, min(1.0, confidence))
            return MetricResult(metric_name = self.name,
                                ai_probability = ai_prob,
                                human_probability = human_prob,
                                mixed_probability = mixed_prob,
                                confidence = confidence,
                                details = {**features,
                                           'domain_used' : domain.value,
                                           'ai_threshold' : perplexity_thresholds.ai_threshold,
                                           'human_threshold' : perplexity_thresholds.human_threshold,
                                           'raw_score' : raw_perplexity_score,
                                          },
                               )
        except Exception as e:
            logger.error(f"Error in perplexity computation: {repr(e)}")
            return MetricResult(metric_name = self.name,
                                ai_probability = 0.5,
                                human_probability = 0.5,
                                mixed_probability = 0.0,
                                confidence = 0.0,
                                error = str(e),
                               )

    def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple:
        """
        Apply domain-specific thresholds to convert the raw score to probabilities
        """
        ai_threshold = thresholds.ai_threshold        # e.g., 0.60 for GENERAL, 0.55 for ACADEMIC
        human_threshold = thresholds.human_threshold  # e.g., 0.40 for GENERAL, 0.35 for ACADEMIC
        # Calculate probabilities based on threshold distances
        if (raw_score >= ai_threshold):
            # Above the AI threshold - strongly AI
            distance_from_threshold = raw_score - ai_threshold
            ai_prob = 0.7 + (distance_from_threshold * 0.3)     # 0.7 to 1.0
            human_prob = 0.3 - (distance_from_threshold * 0.3)  # 0.3 to 0.0
        elif (raw_score <= human_threshold):
            # Below the human threshold - strongly human
            distance_from_threshold = human_threshold - raw_score
            ai_prob = 0.3 - (distance_from_threshold * 0.3)     # 0.3 to 0.0
            human_prob = 0.7 + (distance_from_threshold * 0.3)  # 0.7 to 1.0
        else:
            # Between thresholds - uncertain zone
            range_width = ai_threshold - human_threshold
            if (range_width > 0):
                position_in_range = (raw_score - human_threshold) / range_width
                ai_prob = 0.3 + (position_in_range * 0.4)     # 0.3 to 0.7
                human_prob = 0.7 - (position_in_range * 0.4)  # 0.7 to 0.3
            else:
                ai_prob = 0.5
                human_prob = 0.5
        # Ensure probabilities are valid
        ai_prob = max(0.0, min(1.0, ai_prob))
        human_prob = max(0.0, min(1.0, human_prob))
        # Calculate mixed probability based on perplexity variance
        mixed_prob = self._calculate_mixed_probability(features)
        # Normalize so the three probabilities sum to 1.0
        total = ai_prob + human_prob + mixed_prob
        if (total > 0):
            ai_prob /= total
            human_prob /= total
            mixed_prob /= total
        return ai_prob, human_prob, mixed_prob
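
    # Worked example of the mapping above (illustrative arithmetic only, using the
    # GENERAL-domain thresholds quoted in the comments: ai_threshold = 0.60,
    # human_threshold = 0.40):
    #   raw_score = 0.75 -> 0.15 above ai_threshold:
    #       ai_prob = 0.7 + 0.15 * 0.3 = 0.745, human_prob = 0.3 - 0.15 * 0.3 = 0.255
    #   raw_score = 0.50 -> halfway through the uncertain zone:
    #       ai_prob = 0.3 + 0.5 * 0.4 = 0.5, human_prob = 0.7 - 0.5 * 0.4 = 0.5
    # Both cases are then renormalized together with the mixed probability.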

    def _calculate_perplexity_features(self, text: str) -> Dict[str, Any]:
        """
        Calculate comprehensive perplexity measures
        """
        if not self.model or not self.tokenizer:
            return self._get_default_features()
        # Calculate overall perplexity
        overall_perplexity = self._calculate_perplexity(text)
        # Split into sentences for sentence-level analysis
        sentences = self._split_sentences(text)
        # Calculate sentence-level perplexities
        sentence_perplexities = list()
        valid_sentences = 0
        for sentence in sentences:
            # Minimum sentence length
            if (len(sentence.strip()) > 20):
                sent_perplexity = self._calculate_perplexity(sentence)
                if (sent_perplexity > 0):
                    sentence_perplexities.append(sent_perplexity)
                    valid_sentences += 1
        # Calculate statistical features
        if sentence_perplexities:
            avg_sentence_perplexity = np.mean(sentence_perplexities)
            std_sentence_perplexity = np.std(sentence_perplexities)
            min_sentence_perplexity = np.min(sentence_perplexities)
            max_sentence_perplexity = np.max(sentence_perplexities)
        else:
            avg_sentence_perplexity = overall_perplexity
            std_sentence_perplexity = 0.0
            min_sentence_perplexity = overall_perplexity
            max_sentence_perplexity = overall_perplexity
        # Chunk-based analysis for whole-text understanding
        chunk_perplexities = self._calculate_chunk_perplexity(text, chunk_size = 200)
        perplexity_variance = np.var(chunk_perplexities) if chunk_perplexities else 0.0
        avg_chunk_perplexity = np.mean(chunk_perplexities) if chunk_perplexities else overall_perplexity
        # Normalize perplexity to a 0-1 scale for easier interpretation
        normalized_perplexity = self._normalize_perplexity(overall_perplexity)
        # Cross-entropy analysis
        cross_entropy_score = self._calculate_cross_entropy(text)
        return {"overall_perplexity"      : round(overall_perplexity, 2),
                "normalized_perplexity"   : round(normalized_perplexity, 4),
                "avg_sentence_perplexity" : round(avg_sentence_perplexity, 2),
                "std_sentence_perplexity" : round(std_sentence_perplexity, 2),
                "min_sentence_perplexity" : round(min_sentence_perplexity, 2),
                "max_sentence_perplexity" : round(max_sentence_perplexity, 2),
                "perplexity_variance"     : round(perplexity_variance, 4),
                "avg_chunk_perplexity"    : round(avg_chunk_perplexity, 2),
                "cross_entropy_score"     : round(cross_entropy_score, 4),
                "num_sentences_analyzed"  : valid_sentences,
                "num_chunks_analyzed"     : len(chunk_perplexities),
               }

    def _calculate_perplexity(self, text: str) -> float:
        """
        Calculate perplexity for the given text using GPT-2.
        Lower perplexity = more predictable = more AI-like
        """
        try:
            # Check text length before tokenization
            if (len(text.strip()) < 10):
                return 0.0
            # Tokenize the text
            encodings = self.tokenizer(text,
                                       return_tensors = 'pt',
                                       truncation = True,
                                       max_length = 1024,
                                      )
            input_ids = encodings.input_ids
            # Minimum tokens
            if ((input_ids.numel() == 0) or (input_ids.size(1) < 5)):
                return 0.0
            # Calculate loss (cross-entropy)
            with torch.no_grad():
                outputs = self.model(input_ids, labels = input_ids)
                loss = outputs.loss
            # Convert loss to perplexity
            perplexity = math.exp(loss.item())
            return perplexity
        except Exception as e:
            logger.warning(f"Perplexity calculation failed: {repr(e)}")
            return 0.0
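
    # Note on the loss-to-perplexity conversion above: with labels supplied, the
    # GPT-2 loss is the mean token-level cross-entropy in nats, and
    # perplexity = exp(loss). For example (illustrative values), loss = 3.0 gives
    # exp(3.0) ~= 20.1, while loss = 4.6 gives exp(4.6) ~= 99.5.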

    def _split_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences
        """
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip() and len(s.strip()) > 10]

    def _calculate_chunk_perplexity(self, text: str, chunk_size: int = 200) -> List[float]:
        """
        Calculate perplexity across text chunks for whole-text analysis
        """
        chunks = list()
        words = text.split()
        # Ensure we have enough words for meaningful chunks
        if (len(words) < chunk_size // 2):
            return [self._calculate_perplexity(text)] if text.strip() else []
        # Create overlapping chunks (50% overlap) for better coverage
        for i in range(0, len(words), chunk_size // 2):
            chunk = ' '.join(words[i:i + chunk_size])
            # Minimum chunk size
            if (len(chunk) > 50):
                perplexity = self._calculate_perplexity(chunk)
                # Reasonable range check
                if ((perplexity > 0) and (perplexity < 1000)):
                    chunks.append(perplexity)
        return chunks if chunks else [0.0]

    def _normalize_perplexity(self, perplexity: float) -> float:
        """
        Normalize perplexity to the 0-1 range using a sigmoid transformation.
        Lower perplexity = higher normalized score = more AI-like
        """
        # Logistic (sigmoid) normalization centered at a perplexity of 30 with a
        # scale of 10; typical ranges: AI ~ 10-40, human ~ 20-100
        normalized = 1.0 / (1.0 + np.exp((perplexity - 30) / 10))
        return normalized
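
    # Sample values of the normalization above (illustrative arithmetic only):
    #   perplexity = 10 -> ~0.88 (very predictable, AI-like)
    #   perplexity = 30 ->  0.50 (midpoint of the curve)
    #   perplexity = 60 -> ~0.05 (less predictable, human-like)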

    def _calculate_cross_entropy(self, text: str) -> float:
        """
        Calculate cross-entropy as an alternative measure
        """
        try:
            encodings = self.tokenizer(text, return_tensors = 'pt', truncation = True, max_length = 1024)
            input_ids = encodings.input_ids
            if (input_ids.numel() == 0):
                return 0.0
            with torch.no_grad():
                outputs = self.model(input_ids, labels = input_ids)
                loss = outputs.loss
            # Normalize cross-entropy to a 0-1 scale, assuming a practical maximum of ~5 nats
            cross_entropy = loss.item()
            normalized_ce = min(1.0, cross_entropy / 5.0)
            return normalized_ce
        except Exception as e:
            logger.warning(f"Cross-entropy calculation failed: {repr(e)}")
            return 0.0

    def _analyze_perplexity_patterns(self, features: Dict[str, Any]) -> tuple:
        """
        Analyze perplexity patterns to produce a raw perplexity score on a 0-1 scale.
        Higher score = more AI-like
        """
        # Check feature validity first
        required_features = ['normalized_perplexity', 'perplexity_variance', 'std_sentence_perplexity', 'cross_entropy_score']
        valid_features = [features.get(feat, 0) for feat in required_features if features.get(feat, 0) > 0]
        if (len(valid_features) < 3):
            # Low confidence if there are insufficient valid features
            return 0.5, 0.3
        # Collect AI indicators
        ai_indicators = list()
        # Low overall perplexity suggests AI
        if (features['normalized_perplexity'] > 0.7):
            # Very AI-like
            ai_indicators.append(0.8)
        elif (features['normalized_perplexity'] > 0.5):
            # AI-like
            ai_indicators.append(0.6)
        else:
            # Human-like
            ai_indicators.append(0.2)
        # Low perplexity variance suggests AI (consistent predictability)
        if (features['perplexity_variance'] < 50):
            ai_indicators.append(0.7)
        elif (features['perplexity_variance'] < 200):
            ai_indicators.append(0.4)
        else:
            ai_indicators.append(0.2)
        # Low sentence-perplexity std suggests AI (consistent across sentences)
        if (features['std_sentence_perplexity'] < 20):
            ai_indicators.append(0.8)
        elif (features['std_sentence_perplexity'] < 50):
            ai_indicators.append(0.5)
        else:
            ai_indicators.append(0.2)
        # Low cross-entropy suggests AI (more predictable)
        if (features['cross_entropy_score'] < 0.3):
            ai_indicators.append(0.7)
        elif (features['cross_entropy_score'] < 0.6):
            ai_indicators.append(0.4)
        else:
            ai_indicators.append(0.2)
        # Consistent chunk perplexity suggests AI
        chunk_variance = features['perplexity_variance']
        if (chunk_variance < 25):
            ai_indicators.append(0.9)
        elif (chunk_variance < 100):
            ai_indicators.append(0.6)
        else:
            ai_indicators.append(0.3)
        # Calculate the raw score and confidence
        raw_score = np.mean(ai_indicators) if ai_indicators else 0.5
        confidence = 1.0 - (np.std(ai_indicators) / 0.5) if ai_indicators else 0.5
        confidence = max(0.1, min(0.9, confidence))
        return raw_score, confidence
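
    # Worked example of the scoring above (illustrative numbers only): indicators
    # of [0.8, 0.7, 0.8, 0.7, 0.9] give raw_score = mean = 0.78 and std ~= 0.075,
    # so confidence = 1.0 - 0.075 / 0.5 ~= 0.85 before the domain confidence
    # multiplier is applied in compute().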

    def _calculate_mixed_probability(self, features: Dict[str, Any]) -> float:
        """
        Calculate the probability of mixed AI/human content
        """
        mixed_indicators = list()
        # Moderate perplexity values might indicate mixing
        if (0.4 <= features['normalized_perplexity'] <= 0.6):
            mixed_indicators.append(0.3)
        else:
            mixed_indicators.append(0.0)
        # High perplexity variance suggests mixed content
        if (features['perplexity_variance'] > 200):
            mixed_indicators.append(0.4)
        elif (features['perplexity_variance'] > 100):
            mixed_indicators.append(0.2)
        else:
            mixed_indicators.append(0.0)
        # Inconsistent sentence perplexities
        if (20 <= features['std_sentence_perplexity'] <= 60):
            mixed_indicators.append(0.3)
        else:
            mixed_indicators.append(0.0)
        return min(0.3, np.mean(mixed_indicators)) if mixed_indicators else 0.0

    def _get_default_features(self) -> Dict[str, Any]:
        """
        Return default features when analysis is not possible
        """
        return {"overall_perplexity"      : 50.0,
                "normalized_perplexity"   : 0.5,
                "avg_sentence_perplexity" : 50.0,
                "std_sentence_perplexity" : 25.0,
                "min_sentence_perplexity" : 30.0,
                "max_sentence_perplexity" : 70.0,
                "perplexity_variance"     : 100.0,
                "avg_chunk_perplexity"    : 50.0,
                "cross_entropy_score"     : 0.5,
                "num_sentences_analyzed"  : 0,
                "num_chunks_analyzed"     : 0,
               }

    def cleanup(self):
        """
        Clean up resources
        """
        self.model = None
        self.tokenizer = None
        super().cleanup()


# Export
__all__ = ["PerplexityMetric"]
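
# Usage sketch (illustrative only; assumes the project's model manager serves a
# GPT-2 model under the "perplexity_gpt2" key, that Domain.GENERAL exists in
# config.threshold_config, and that MetricResult exposes the fields passed to its
# constructor as attributes; `long_text` is a placeholder for input of at least
# 50 characters):
#
#     metric = PerplexityMetric()
#     if metric.initialize():
#         result = metric.compute(long_text, domain = Domain.GENERAL)
#         print(result.ai_probability, result.human_probability, result.confidence)
#     metric.cleanup()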