| |
| import math |
| import numpy as np |
| from typing import Any |
| from typing import Dict |
| from typing import List |
| from loguru import logger |
| from collections import Counter |
| from config.enums import Domain |
| from config.schemas import MetricResult |
| from metrics.base_metric import BaseMetric |
| from models.model_manager import get_model_manager |
| from config.constants import entropy_metric_params |
| from config.threshold_config import get_threshold_for_domain |
|
|
|
|
class EntropyMetric(BaseMetric):
    """
    Entropy analysis for text randomness and predictability

    Mathematical Foundation:
    ------------------------
    - Shannon Entropy: H = -Σ p(x) * log2(p(x))
    - Higher entropy = more random/diverse = typically human
    - Lower entropy = more predictable/uniform = typically synthetic

    Measures:
    ---------
    - Character-level entropy and diversity
    - Word-level entropy and burstiness
    - Token-level diversity and unpredictability in sequences
    - Entropy distribution across text chunks
    - Synthetic-specific pattern detection
    """
    def __init__(self):
        super().__init__(name = "entropy",
                         description = "Token-level diversity and unpredictability in text sequences",
                         )
        # Tokenizer is acquired lazily in initialize(); token-level features
        # degrade gracefully to 0.0 while this stays None.
        self.tokenizer = None
        # Shared tunable thresholds/weights used by all entropy computations.
        self.params = entropy_metric_params
| |
|
|
| def initialize(self) -> bool: |
| """ |
| Initialize the entropy metric |
| """ |
| try: |
| logger.info("Initializing entropy metric...") |
| |
| |
| model_manager = get_model_manager() |
| gpt_model = model_manager.load_model("perplexity_reference_lm") |
| |
| if isinstance(gpt_model, tuple): |
| self.tokenizer = gpt_model[1] |
|
|
| else: |
| logger.warning("Could not get tokenizer, using character-level entropy only") |
| |
| self.is_initialized = True |
| logger.success("Entropy metric initialized successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to initialize entropy metric: {repr(e)}") |
| return False |
| |
|
|
| def compute(self, text: str, **kwargs) -> MetricResult: |
| """ |
| Compute entropy measures for text |
| """ |
| try: |
| if (not text or (len(text.strip()) < self.params.MIN_TEXT_LENGTH_FOR_ANALYSIS)): |
| return MetricResult(metric_name = self.name, |
| synthetic_probability = self.params.NEUTRAL_PROBABILITY, |
| authentic_probability = self.params.NEUTRAL_PROBABILITY, |
| hybrid_probability = self.params.MIN_PROBABILITY, |
| confidence = self.params.MIN_CONFIDENCE, |
| error = "Text too short for entropy analysis", |
| ) |
| |
| |
| domain = kwargs.get('domain', Domain.GENERAL) |
| domain_thresholds = get_threshold_for_domain(domain) |
| entropy_thresholds = domain_thresholds.entropy |
| |
| |
| features = self._calculate_entropy_features(text = text) |
| |
| |
| raw_entropy_score, confidence = self._analyze_entropy_patterns(features = features) |
| |
| |
| synthetic_prob, authentic_prob, hybrid_prob = self._apply_domain_thresholds(raw_score = raw_entropy_score, |
| thresholds = entropy_thresholds, |
| features = features, |
| ) |
| |
| |
| confidence *= entropy_thresholds.confidence_multiplier |
| confidence = max(self.params.MIN_CONFIDENCE, min(self.params.MAX_CONFIDENCE, confidence)) |
| |
| return MetricResult(metric_name = self.name, |
| synthetic_probability = synthetic_prob, |
| authentic_probability = authentic_prob, |
| hybrid_probability = hybrid_prob, |
| confidence = confidence, |
| details = {**features, |
| 'domain_used' : domain.value, |
| 'synthetic_threshold': entropy_thresholds.synthetic_threshold, |
| 'authentic_threshold': entropy_thresholds.authentic_threshold, |
| 'raw_score' : raw_entropy_score, |
| }, |
| ) |
| |
| except Exception as e: |
| logger.error(f"Error in entropy computation: {repr(e)}") |
| return self._default_result(error = str(e)) |
| |
|
|
| def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple: |
| """ |
| Apply domain-specific thresholds to convert raw score to probabilities |
| """ |
| synthetic_threshold = thresholds.synthetic_threshold |
| authentic_threshold = thresholds.authentic_threshold |
| |
| |
| if (raw_score >= synthetic_threshold): |
| |
| distance_from_threshold = raw_score - synthetic_threshold |
| synthetic_prob = self.params.STRONG_SYNTHETIC_BASE_PROB + (distance_from_threshold * self.params.WEAK_PROBABILITY_ADJUSTMENT) |
| authentic_prob = self.params.UNCERTAIN_AUTHENTIC_RANGE_START - (distance_from_threshold * self.params.WEAK_PROBABILITY_ADJUSTMENT) |
| |
| elif (raw_score <= authentic_threshold): |
| |
| distance_from_threshold = authentic_threshold - raw_score |
| synthetic_prob = self.params.UNCERTAIN_SYNTHETIC_RANGE_START - (distance_from_threshold * self.params.WEAK_PROBABILITY_ADJUSTMENT) |
| authentic_prob = self.params.STRONG_AUTHENTIC_BASE_PROB + (distance_from_threshold * self.params.WEAK_PROBABILITY_ADJUSTMENT) |
| |
| else: |
| |
| range_width = synthetic_threshold - authentic_threshold |
| if (range_width > self.params.ZERO_TOLERANCE): |
| position_in_range = (raw_score - authentic_threshold) / range_width |
| synthetic_prob = self.params.UNCERTAIN_SYNTHETIC_RANGE_START + (position_in_range * self.params.UNCERTAIN_RANGE_WIDTH) |
| authentic_prob = self.params.UNCERTAIN_AUTHENTIC_RANGE_START - (position_in_range * self.params.UNCERTAIN_RANGE_WIDTH) |
| |
| else: |
| synthetic_prob = self.params.NEUTRAL_PROBABILITY |
| authentic_prob = self.params.NEUTRAL_PROBABILITY |
| |
| |
| synthetic_prob = max(self.params.MIN_PROBABILITY, min(self.params.MAX_PROBABILITY, synthetic_prob)) |
| authentic_prob = max(self.params.MIN_PROBABILITY, min(self.params.MAX_PROBABILITY, authentic_prob)) |
| |
| |
| hybrid_prob = self._calculate_hybrid_probability(features) |
| |
| |
| total = synthetic_prob + authentic_prob + hybrid_prob |
|
|
| if (total > self.params.ZERO_TOLERANCE): |
| synthetic_prob /= total |
| authentic_prob /= total |
| hybrid_prob /= total |
| |
| return synthetic_prob, authentic_prob, hybrid_prob |
| |
|
|
| def _calculate_entropy_features(self, text: str) -> Dict[str, Any]: |
| """ |
| Calculate comprehensive entropy measures including document-required features |
| """ |
| |
| char_entropy = self._calculate_character_entropy(text = text) |
| word_entropy = self._calculate_word_entropy(text = text) |
| token_entropy = self._calculate_token_entropy(text = text) if self.tokenizer else 0.0 |
| |
| |
| token_diversity = self._calculate_token_diversity(text = text) |
| |
| |
| sequence_unpredictability = self._calculate_sequence_unpredictability(text = text) |
| |
| |
| chunk_entropies = self._calculate_chunk_entropy(text = text) |
| entropy_variance = np.var(chunk_entropies) if chunk_entropies else 0.0 |
| avg_chunk_entropy = np.mean(chunk_entropies) if chunk_entropies else 0.0 |
| |
| |
| synthetic_pattern_score = self._detect_synthetic_entropy_patterns(text = text) |
| |
| |
| predictability = 1.0 - min(1.0, char_entropy / self.params.MAX_CHAR_ENTROPY) |
| |
| |
| num_tokens = len(self.tokenizer.encode(text, add_special_tokens = False)) if self.tokenizer else len(text.split()) |
| |
| return {"char_entropy" : round(char_entropy, 4), |
| "word_entropy" : round(word_entropy, 4), |
| "token_entropy" : round(token_entropy, 4), |
| "token_diversity" : round(token_diversity, 4), |
| "sequence_unpredictability" : round(sequence_unpredictability, 4), |
| "entropy_variance" : round(entropy_variance, 4), |
| "avg_chunk_entropy" : round(avg_chunk_entropy, 4), |
| "predictability_score" : round(predictability, 4), |
| "synthetic_pattern_score" : round(synthetic_pattern_score, 4), |
| "num_chunks_analyzed" : len(chunk_entropies), |
| "num_tokens_analyzed" : num_tokens, |
| } |
| |
|
|
| def _calculate_character_entropy(self, text: str) -> float: |
| """ |
| Calculate character-level Shannon entropy |
| |
| Formula: H = -Σ p(x) * log2(p(x)) |
| |
| Typical English text: 3.0-4.5 bits |
| """ |
| |
| clean_text = ''.join(c for c in text.lower() if c.isalnum() or c.isspace()) |
| |
| if not clean_text: |
| return 0.0 |
| |
| |
| char_counts = Counter(clean_text) |
| total_chars = len(clean_text) |
| |
| |
| entropy = 0.0 |
|
|
| for count in char_counts.values(): |
| probability = count / total_chars |
|
|
| if (probability > self.params.ZERO_TOLERANCE): |
| entropy -= probability * math.log2(probability) |
| |
| return entropy |
| |
|
|
| def _calculate_word_entropy(self, text: str) -> float: |
| """ |
| Calculate word-level Shannon entropy |
| """ |
| words = text.lower().split() |
| if (len(words) < self.params.MIN_WORDS_FOR_ANALYSIS): |
| return 0.0 |
| |
| word_counts = Counter(words) |
| total_words = len(words) |
| |
| entropy = 0.0 |
|
|
| for count in word_counts.values(): |
| probability = count / total_words |
|
|
| if (probability > self.params.ZERO_TOLERANCE): |
| entropy -= probability * math.log2(probability) |
| |
| return entropy |
| |
|
|
| def _calculate_token_entropy(self, text: str) -> float: |
| """ |
| Calculate token-level Shannon entropy using GPT-2 tokenizer |
| """ |
| try: |
| if not self.tokenizer: |
| return 0.0 |
| |
| |
| if (len(text.strip()) < self.params.MIN_SENTENCE_LENGTH): |
| return 0.0 |
|
|
| |
| tokens = self.tokenizer.encode(text, |
| add_special_tokens = False, |
| truncation = True, |
| ) |
| |
| if (len(tokens) < self.params.MIN_TOKENS_FOR_ANALYSIS): |
| return 0.0 |
| |
| token_counts = Counter(tokens) |
| total_tokens = len(tokens) |
| |
| entropy = 0.0 |
|
|
| for count in token_counts.values(): |
| probability = count / total_tokens |
| if (probability > self.params.ZERO_TOLERANCE): |
| entropy -= probability * math.log2(probability) |
| |
| return entropy |
| |
| except Exception as e: |
| logger.warning(f"Token entropy calculation failed: {repr(e)}") |
| return 0.0 |
| |
|
|
| def _calculate_token_diversity(self, text: str) -> float: |
| """ |
| Calculate token-level diversity (type-token ratio) |
| |
| Interpretation: |
| -------------- |
| Higher diversity = more authentic-like |
| Lower diversity = more synthetic-like (vocabulary reuse) |
| """ |
| if not self.tokenizer: |
| return 0.0 |
| |
| try: |
| tokens = self.tokenizer.encode(text, add_special_tokens = False) |
| if (len(tokens) < self.params.MIN_TOKENS_FOR_ANALYSIS): |
| return 0.0 |
| |
| unique_tokens = len(set(tokens)) |
| total_tokens = len(tokens) |
| |
| |
| diversity = unique_tokens / total_tokens |
|
|
| return diversity |
| |
| except Exception as e: |
| logger.warning(f"Token diversity calculation failed: {repr(e)}") |
| return 0.0 |
| |
|
|
| def _calculate_sequence_unpredictability(self, text: str) -> float: |
| """ |
| Calculate unpredictability in text sequences using bigram entropy |
| |
| Measures how unpredictable the token sequences are: Higher unpredictability = more human-like |
| """ |
| if not self.tokenizer: |
| return 0.0 |
| |
| try: |
| tokens = self.tokenizer.encode(text, add_special_tokens = False) |
| if (len(tokens) < self.params.MIN_TOKENS_FOR_SEQUENCE): |
| return 0.0 |
| |
| |
| bigrams = [(tokens[i], tokens[i+1]) for i in range(len(tokens)-1)] |
| bigram_counts = Counter(bigrams) |
| total_bigrams = len(bigrams) |
| |
| |
| sequence_entropy = 0.0 |
|
|
| for count in bigram_counts.values(): |
| probability = count / total_bigrams |
| if (probability > self.params.ZERO_TOLERANCE): |
| sequence_entropy -= probability * math.log2(probability) |
| |
| |
| normalized_entropy = min(1.0, sequence_entropy / self.params.MAX_BIGRAM_ENTROPY) |
| |
| return normalized_entropy |
| |
| except Exception as e: |
| logger.warning(f"Sequence unpredictability calculation failed: {repr(e)}") |
| return 0.0 |
| |
|
|
| def _calculate_chunk_entropy(self, text: str) -> List[float]: |
| """ |
| Calculate entropy distribution across text chunks |
| """ |
| chunks = list() |
| words = text.split() |
| chunk_size = self.params.CHUNK_SIZE_WORDS |
| overlap = int(chunk_size * self.params.CHUNK_OVERLAP_RATIO) |
| step = max(1, chunk_size - overlap) |
| |
| |
| for i in range(0, len(words), step): |
| chunk = ' '.join(words[i:i + chunk_size]) |
| |
| |
| if (len(chunk) > self.params.MIN_CHUNK_LENGTH): |
| entropy = self._calculate_character_entropy(text = chunk) |
| |
| if (entropy > self.params.ZERO_TOLERANCE): |
| chunks.append(entropy) |
| |
| return chunks |
| |
|
|
| def _detect_synthetic_entropy_patterns(self, text: str) -> float: |
| """ |
| Detect synthetic-specific entropy patterns |
| |
| Synthetic text often shows specific entropy signatures: |
| - Overly consistent character distribution |
| - Low token diversity |
| - Predictable sequences |
| - Low entropy variance across chunks |
| """ |
| patterns_detected = 0 |
| total_patterns = 4 |
| |
| |
| char_entropy = self._calculate_character_entropy(text = text) |
| |
| |
| if (char_entropy < self.params.CHAR_ENTROPY_LOW_THRESHOLD): |
| patterns_detected += 1 |
| |
| |
| token_diversity = self._calculate_token_diversity(text) |
|
|
| |
| if (token_diversity < self.params.TOKEN_DIVERSITY_MEDIUM_THRESHOLD): |
| patterns_detected += 1 |
| |
| |
| sequence_unpredictability = self._calculate_sequence_unpredictability(text = text) |
| |
| |
| if (sequence_unpredictability < self.params.SEQUENCE_UNPREDICTABILITY_MEDIUM_THRESHOLD): |
| patterns_detected += 1 |
| |
| |
| chunk_entropies = self._calculate_chunk_entropy(text = text) |
| entropy_variance = np.var(chunk_entropies) if chunk_entropies else 0.0 |
| |
| |
| if (entropy_variance < self.params.ENTROPY_VARIANCE_LOW_THRESHOLD): |
| patterns_detected += 1 |
| |
| return patterns_detected / total_patterns |
| |
|
|
| def _analyze_entropy_patterns(self, features: Dict[str, Any]) -> tuple: |
| """ |
| Analyze entropy patterns to determine RAW entropy score (0-1 scale) with confidence |
| |
| Returns: |
| -------- |
| (raw_score, confidence) where: |
| - raw_score: Higher = more synthetic-like |
| - confidence: Based on sample size and agreement |
| """ |
| |
| valid_features = [score for score in [features.get('char_entropy', 0), |
| features.get('token_entropy', 0), |
| features.get('token_diversity', 0), |
| features.get('sequence_unpredictability', 0), |
| features.get('synthetic_pattern_score', 0) |
| ] if score > self.params.ZERO_TOLERANCE |
| ] |
| |
| if (len(valid_features) < self.params.MIN_REQUIRED_FEATURES): |
| |
| return self.params.NEUTRAL_PROBABILITY, self.params.LOW_FEATURE_CONFIDENCE |
|
|
| synthetic_indicators = list() |
| |
| |
| if (features['char_entropy'] < self.params.CHAR_ENTROPY_VERY_LOW_THRESHOLD): |
| |
| synthetic_indicators.append(self.params.VERY_STRONG_SYNTHETIC_WEIGHT) |
|
|
| elif (features['char_entropy'] < self.params.CHAR_ENTROPY_LOW_THRESHOLD): |
| |
| synthetic_indicators.append(self.params.MODERATE_SYNTHETIC_WEIGHT) |
|
|
| else: |
| |
| synthetic_indicators.append(self.params.MINIMAL_SYNTHETIC_WEIGHT) |
|
|
| |
| if (features['token_entropy'] < self.params.TOKEN_ENTROPY_LOW_THRESHOLD): |
| synthetic_indicators.append(self.params.MODERATE_SYNTHETIC_WEIGHT) |
| |
| else: |
| synthetic_indicators.append(self.params.MINIMAL_SYNTHETIC_WEIGHT) |
|
|
| |
| |
| if (features['entropy_variance'] < self.params.ENTROPY_VARIANCE_VERY_LOW_THRESHOLD): |
| |
| synthetic_indicators.append(self.params.STRONG_SYNTHETIC_WEIGHT) |
|
|
| elif (features['entropy_variance'] < self.params.ENTROPY_VARIANCE_MEDIUM_THRESHOLD): |
| |
| synthetic_indicators.append(self.params.WEAK_SYNTHETIC_WEIGHT) |
|
|
| else: |
| |
| synthetic_indicators.append(self.params.VERY_LOW_SYNTHETIC_WEIGHT) |
| |
| |
| if (features['token_diversity'] < self.params.TOKEN_DIVERSITY_LOW_THRESHOLD): |
| synthetic_indicators.append(self.params.MEDIUM_SYNTHETIC_WEIGHT) |
|
|
| elif (features['token_diversity'] < self.params.TOKEN_DIVERSITY_MEDIUM_THRESHOLD): |
| synthetic_indicators.append(self.params.VERY_WEAK_SYNTHETIC_WEIGHT) |
|
|
| else: |
| synthetic_indicators.append(self.params.MINIMAL_SYNTHETIC_WEIGHT) |
| |
| |
| if (features['sequence_unpredictability'] < self.params.SEQUENCE_UNPREDICTABILITY_LOW_THRESHOLD): |
| synthetic_indicators.append(self.params.VERY_STRONG_SYNTHETIC_WEIGHT) |
|
|
| elif (features['sequence_unpredictability'] < self.params.SEQUENCE_UNPREDICTABILITY_MEDIUM_THRESHOLD): |
| synthetic_indicators.append(self.params.WEAK_SYNTHETIC_WEIGHT) |
|
|
| else: |
| synthetic_indicators.append(self.params.MINIMAL_SYNTHETIC_WEIGHT) |
| |
| |
| if (features['synthetic_pattern_score'] > self.params.SYNTHETIC_PATTERN_SCORE_HIGH_THRESHOLD): |
| synthetic_indicators.append(self.params.STRONG_SYNTHETIC_WEIGHT) |
| |
| elif (features['synthetic_pattern_score'] > self.params.SYNTHETIC_PATTERN_SCORE_MEDIUM_THRESHOLD): |
| synthetic_indicators.append(self.params.MEDIUM_SYNTHETIC_WEIGHT) |
| |
| else: |
| synthetic_indicators.append(self.params.LOW_SYNTHETIC_WEIGHT) |
| |
| |
| raw_score = np.mean(synthetic_indicators) if synthetic_indicators else self.params.NEUTRAL_PROBABILITY |
| |
| |
| agreement_confidence = 1.0 - min(1.0, np.std(synthetic_indicators) / self.params.CONFIDENCE_STD_NORMALIZER) |
| |
| |
| num_chunks = features.get('num_chunks_analyzed', 0) |
| num_tokens = features.get('num_tokens_analyzed', 0) |
| chunk_confidence = min(1.0, num_chunks / self.params.MIN_CHUNKS_FOR_CONFIDENCE) |
| token_confidence = min(1.0, num_tokens / self.params.MIN_TOKENS_FOR_CONFIDENCE) |
| sample_confidence = (chunk_confidence + token_confidence) / 2.0 |
| |
| |
| confidence = (self.params.CONFIDENCE_BASE + self.params.CONFIDENCE_STD_FACTOR * agreement_confidence + self.params.CONFIDENCE_SAMPLE_FACTOR * sample_confidence) |
| |
| confidence = max(self.params.MIN_CONFIDENCE, min(self.params.MAX_CONFIDENCE, confidence)) |
| |
| return raw_score, confidence |
| |
|
|
| def _calculate_hybrid_probability(self, features: Dict[str, Any]) -> float: |
| """ |
| Calculate probability of hybrid synthetic/authentic content with better indicators |
| """ |
| hybrid_indicators = list() |
| |
| |
| entropy_variance = features.get('entropy_variance', 0) |
| |
| if (entropy_variance > self.params.ENTROPY_VARIANCE_HIGH_THRESHOLD): |
| |
| hybrid_indicators.append(self.params.STRONG_HYBRID_WEIGHT) |
|
|
| elif (entropy_variance > self.params.ENTROPY_VARIANCE_MIXED_THRESHOLD): |
| hybrid_indicators.append(self.params.MODERATE_HYBRID_WEIGHT) |
|
|
| else: |
| hybrid_indicators.append(self.params.MINIMAL_HYBRID_WEIGHT) |
| |
| |
| char_entropy = features.get('char_entropy', 0) |
| word_entropy = features.get('word_entropy', 0) |
| |
| if ((char_entropy > self.params.ZERO_TOLERANCE) and (word_entropy > self.params.ZERO_TOLERANCE)): |
| entropy_discrepancy = abs(char_entropy - word_entropy) |
|
|
| |
| if (entropy_discrepancy > self.params.ENTROPY_DISCREPANCY_THRESHOLD): |
| hybrid_indicators.append(self.params.MODERATE_HYBRID_WEIGHT) |
| |
| |
| synthetic_pattern_score = features.get('synthetic_pattern_score', 0) |
| if (self.params.SYNTHETIC_PATTERN_MIXED_MIN <= synthetic_pattern_score <= self.params.SYNTHETIC_PATTERN_MIXED_MAX): |
| hybrid_indicators.append(self.params.WEAK_HYBRID_WEIGHT) |
| |
| hybrid_probability = min(self.params.MAX_HYBRID_PROBABILITY, np.mean(hybrid_indicators)) if hybrid_indicators else 0.0 |
| |
| return hybrid_probability |
|
|
|
|
    def cleanup(self):
        """
        Clean up resources.

        Drops the tokenizer reference so it can be garbage-collected, then
        delegates any remaining teardown to the base class.
        """
        self.tokenizer = None
        super().cleanup()
|
|
|
|
|
|
| |
| __all__ = ["EntropyMetric"] |