|
|
|
|
|
import math |
|
|
import numpy as np |
|
|
from typing import Any |
|
|
from typing import Dict |
|
|
from typing import List |
|
|
from loguru import logger |
|
|
from collections import Counter |
|
|
from metrics.base_metric import BaseMetric |
|
|
from config.threshold_config import Domain |
|
|
from metrics.base_metric import MetricResult |
|
|
from models.model_manager import get_model_manager |
|
|
from config.threshold_config import get_threshold_for_domain |
|
|
|
|
|
|
|
|
class EntropyMetric(BaseMetric):
    """
    Entropy-based analysis of text randomness and predictability.

    Measures (aligned with documentation):
    - Character-level entropy and diversity
    - Word-level entropy and burstiness
    - Token-level diversity and unpredictability in sequences
    - Entropy distribution across text chunks
    - AI-specific pattern detection
    """

    def __init__(self):
        # The GPT-2 tokenizer is attached later by initialize(); until then
        # only the character/word level measures produce non-zero values.
        super().__init__(
            name="entropy",
            description="Token-level diversity and unpredictability in text sequences",
        )
        self.tokenizer = None
|
|
|
|
|
|
|
|
def initialize(self) -> bool: |
|
|
""" |
|
|
Initialize the entropy metric |
|
|
""" |
|
|
try: |
|
|
logger.info("Initializing entropy metric...") |
|
|
|
|
|
|
|
|
model_manager = get_model_manager() |
|
|
gpt_model = model_manager.load_model("perplexity_gpt2") |
|
|
|
|
|
if isinstance(gpt_model, tuple): |
|
|
self.tokenizer = gpt_model[1] |
|
|
|
|
|
else: |
|
|
logger.warning("Could not get tokenizer, using character-level entropy only") |
|
|
|
|
|
self.is_initialized = True |
|
|
logger.success("Entropy metric initialized successfully") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to initialize entropy metric: {repr(e)}") |
|
|
return False |
|
|
|
|
|
|
|
|
def compute(self, text: str, **kwargs) -> MetricResult: |
|
|
""" |
|
|
Compute enhanced entropy measures for text with FULL DOMAIN THRESHOLD INTEGRATION |
|
|
""" |
|
|
try: |
|
|
if (not text or (len(text.strip()) < 50)): |
|
|
return MetricResult(metric_name = self.name, |
|
|
ai_probability = 0.5, |
|
|
human_probability = 0.5, |
|
|
mixed_probability = 0.0, |
|
|
confidence = 0.1, |
|
|
error = "Text too short for entropy analysis", |
|
|
) |
|
|
|
|
|
|
|
|
domain = kwargs.get('domain', Domain.GENERAL) |
|
|
domain_thresholds = get_threshold_for_domain(domain) |
|
|
entropy_thresholds = domain_thresholds.entropy |
|
|
|
|
|
|
|
|
features = self._calculate_enhanced_entropy_features(text) |
|
|
|
|
|
|
|
|
raw_entropy_score, confidence = self._analyze_entropy_patterns(features) |
|
|
|
|
|
|
|
|
ai_prob, human_prob, mixed_prob = self._apply_domain_thresholds(raw_entropy_score, entropy_thresholds, features) |
|
|
|
|
|
|
|
|
confidence *= entropy_thresholds.confidence_multiplier |
|
|
confidence = max(0.0, min(1.0, confidence)) |
|
|
|
|
|
return MetricResult(metric_name = self.name, |
|
|
ai_probability = ai_prob, |
|
|
human_probability = human_prob, |
|
|
mixed_probability = mixed_prob, |
|
|
confidence = confidence, |
|
|
details = {**features, |
|
|
'domain_used' : domain.value, |
|
|
'ai_threshold' : entropy_thresholds.ai_threshold, |
|
|
'human_threshold' : entropy_thresholds.human_threshold, |
|
|
'raw_score' : raw_entropy_score, |
|
|
}, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in entropy computation: {repr(e)}") |
|
|
return MetricResult(metric_name = self.name, |
|
|
ai_probability = 0.5, |
|
|
human_probability = 0.5, |
|
|
mixed_probability = 0.0, |
|
|
confidence = 0.0, |
|
|
error = str(e), |
|
|
) |
|
|
|
|
|
|
|
|
def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple: |
|
|
""" |
|
|
Apply domain-specific thresholds to convert raw score to probabilities |
|
|
""" |
|
|
ai_threshold = thresholds.ai_threshold |
|
|
human_threshold = thresholds.human_threshold |
|
|
|
|
|
|
|
|
if (raw_score >= ai_threshold): |
|
|
|
|
|
distance_from_threshold = raw_score - ai_threshold |
|
|
ai_prob = 0.7 + (distance_from_threshold * 0.3) |
|
|
human_prob = 0.3 - (distance_from_threshold * 0.3) |
|
|
|
|
|
elif (raw_score <= human_threshold): |
|
|
|
|
|
distance_from_threshold = human_threshold - raw_score |
|
|
ai_prob = 0.3 - (distance_from_threshold * 0.3) |
|
|
human_prob = 0.7 + (distance_from_threshold * 0.3) |
|
|
|
|
|
else: |
|
|
|
|
|
range_width = ai_threshold - human_threshold |
|
|
if (range_width > 0): |
|
|
position_in_range = (raw_score - human_threshold) / range_width |
|
|
ai_prob = 0.3 + (position_in_range * 0.4) |
|
|
human_prob = 0.7 - (position_in_range * 0.4) |
|
|
|
|
|
else: |
|
|
ai_prob = 0.5 |
|
|
human_prob = 0.5 |
|
|
|
|
|
|
|
|
ai_prob = max(0.0, min(1.0, ai_prob)) |
|
|
human_prob = max(0.0, min(1.0, human_prob)) |
|
|
|
|
|
|
|
|
mixed_prob = self._calculate_mixed_probability(features) |
|
|
|
|
|
|
|
|
total = ai_prob + human_prob + mixed_prob |
|
|
|
|
|
if (total > 0): |
|
|
ai_prob /= total |
|
|
human_prob /= total |
|
|
mixed_prob /= total |
|
|
|
|
|
return ai_prob, human_prob, mixed_prob |
|
|
|
|
|
|
|
|
def _calculate_enhanced_entropy_features(self, text: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Calculate comprehensive entropy measures including document-required features |
|
|
""" |
|
|
|
|
|
char_entropy = self._calculate_character_entropy(text) |
|
|
word_entropy = self._calculate_word_entropy(text) |
|
|
token_entropy = self._calculate_token_entropy(text) if self.tokenizer else 0.0 |
|
|
|
|
|
|
|
|
token_diversity = self._calculate_token_diversity(text) |
|
|
|
|
|
|
|
|
sequence_unpredictability = self._calculate_sequence_unpredictability(text) |
|
|
|
|
|
|
|
|
chunk_entropies = self._calculate_chunk_entropy(text, chunk_size=100) |
|
|
entropy_variance = np.var(chunk_entropies) if chunk_entropies else 0.0 |
|
|
avg_chunk_entropy = np.mean(chunk_entropies) if chunk_entropies else 0.0 |
|
|
|
|
|
|
|
|
ai_pattern_score = self._detect_ai_entropy_patterns(text) |
|
|
|
|
|
|
|
|
predictability = 1.0 - min(1.0, char_entropy / 4.0) |
|
|
|
|
|
return {"char_entropy" : round(char_entropy, 4), |
|
|
"word_entropy" : round(word_entropy, 4), |
|
|
"token_entropy" : round(token_entropy, 4), |
|
|
"token_diversity" : round(token_diversity, 4), |
|
|
"sequence_unpredictability" : round(sequence_unpredictability, 4), |
|
|
"entropy_variance" : round(entropy_variance, 4), |
|
|
"avg_chunk_entropy" : round(avg_chunk_entropy, 4), |
|
|
"predictability_score" : round(predictability, 4), |
|
|
"ai_pattern_score" : round(ai_pattern_score, 4), |
|
|
"num_chunks_analyzed" : len(chunk_entropies), |
|
|
} |
|
|
|
|
|
|
|
|
def _calculate_character_entropy(self, text: str) -> float: |
|
|
""" |
|
|
Calculate character-level entropy |
|
|
""" |
|
|
|
|
|
clean_text = ''.join(c for c in text.lower() if c.isalnum() or c.isspace()) |
|
|
|
|
|
if not clean_text: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
char_counts = Counter(clean_text) |
|
|
total_chars = len(clean_text) |
|
|
|
|
|
|
|
|
entropy = 0.0 |
|
|
|
|
|
for count in char_counts.values(): |
|
|
probability = count / total_chars |
|
|
entropy -= probability * math.log2(probability) |
|
|
|
|
|
return entropy |
|
|
|
|
|
|
|
|
def _calculate_word_entropy(self, text: str) -> float: |
|
|
""" |
|
|
Calculate word-level entropy |
|
|
""" |
|
|
words = text.lower().split() |
|
|
if (len(words) < 5): |
|
|
return 0.0 |
|
|
|
|
|
word_counts = Counter(words) |
|
|
total_words = len(words) |
|
|
|
|
|
entropy = 0.0 |
|
|
|
|
|
for count in word_counts.values(): |
|
|
probability = count / total_words |
|
|
entropy -= probability * math.log2(probability) |
|
|
|
|
|
return entropy |
|
|
|
|
|
|
|
|
def _calculate_token_entropy(self, text: str) -> float: |
|
|
""" |
|
|
Calculate token-level entropy using GPT-2 tokenizer |
|
|
""" |
|
|
try: |
|
|
if not self.tokenizer: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
if (len(text.strip()) < 10): |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
tokens = self.tokenizer.encode(text, |
|
|
add_special_tokens = False, |
|
|
truncation = True, |
|
|
) |
|
|
|
|
|
if (len(tokens) < 10): |
|
|
return 0.0 |
|
|
|
|
|
token_counts = Counter(tokens) |
|
|
total_tokens = len(tokens) |
|
|
|
|
|
entropy = 0.0 |
|
|
|
|
|
for count in token_counts.values(): |
|
|
probability = count / total_tokens |
|
|
entropy -= probability * math.log2(probability) |
|
|
|
|
|
return entropy |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Token entropy calculation failed: {repr(e)}") |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
def _calculate_token_diversity(self, text: str) -> float: |
|
|
""" |
|
|
Calculate token-level diversity : Higher diversity = more human-like |
|
|
""" |
|
|
if not self.tokenizer: |
|
|
return 0.0 |
|
|
|
|
|
try: |
|
|
tokens = self.tokenizer.encode(text, add_special_tokens=False) |
|
|
if (len(tokens) < 10): |
|
|
return 0.0 |
|
|
|
|
|
unique_tokens = len(set(tokens)) |
|
|
total_tokens = len(tokens) |
|
|
|
|
|
|
|
|
diversity = unique_tokens / total_tokens |
|
|
|
|
|
return diversity |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Token diversity calculation failed: {repr(e)}") |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
def _calculate_sequence_unpredictability(self, text: str) -> float: |
|
|
""" |
|
|
Calculate unpredictability in text sequences, it measures how unpredictable the token sequences are |
|
|
""" |
|
|
if not self.tokenizer: |
|
|
return 0.0 |
|
|
|
|
|
try: |
|
|
tokens = self.tokenizer.encode(text, add_special_tokens=False) |
|
|
if (len(tokens) < 20): |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
bigrams = [(tokens[i], tokens[i+1]) for i in range(len(tokens)-1)] |
|
|
bigram_counts = Counter(bigrams) |
|
|
total_bigrams = len(bigrams) |
|
|
|
|
|
|
|
|
sequence_entropy = 0.0 |
|
|
|
|
|
for count in bigram_counts.values(): |
|
|
probability = count / total_bigrams |
|
|
sequence_entropy -= probability * math.log2(probability) |
|
|
|
|
|
|
|
|
normalized_entropy = min(1.0, sequence_entropy / 8.0) |
|
|
|
|
|
return normalized_entropy |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Sequence unpredictability calculation failed: {repr(e)}") |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
def _calculate_chunk_entropy(self, text: str, chunk_size: int = 100) -> List[float]: |
|
|
""" |
|
|
Calculate entropy distribution across text chunks |
|
|
""" |
|
|
chunks = list() |
|
|
words = text.split() |
|
|
|
|
|
|
|
|
for i in range(0, len(words), chunk_size // 2): |
|
|
chunk = ' '.join(words[i:i + chunk_size]) |
|
|
|
|
|
|
|
|
if (len(chunk) > 20): |
|
|
entropy = self._calculate_character_entropy(chunk) |
|
|
chunks.append(entropy) |
|
|
|
|
|
return chunks |
|
|
|
|
|
|
|
|
def _detect_ai_entropy_patterns(self, text: str) -> float: |
|
|
""" |
|
|
Detect AI-specific entropy patterns: AI text often shows specific entropy signatures |
|
|
""" |
|
|
patterns_detected = 0 |
|
|
total_patterns = 4 |
|
|
|
|
|
|
|
|
char_entropy = self._calculate_character_entropy(text) |
|
|
|
|
|
|
|
|
if (char_entropy < 3.8): |
|
|
patterns_detected += 1 |
|
|
|
|
|
|
|
|
token_diversity = self._calculate_token_diversity(text) |
|
|
|
|
|
|
|
|
if (token_diversity < 0.7): |
|
|
patterns_detected += 1 |
|
|
|
|
|
|
|
|
sequence_unpredictability = self._calculate_sequence_unpredictability(text) |
|
|
|
|
|
|
|
|
if (sequence_unpredictability < 0.4): |
|
|
patterns_detected += 1 |
|
|
|
|
|
|
|
|
chunk_entropies = self._calculate_chunk_entropy(text, chunk_size = 100) |
|
|
entropy_variance = np.var(chunk_entropies) if chunk_entropies else 0.0 |
|
|
|
|
|
|
|
|
if (entropy_variance < 0.2): |
|
|
patterns_detected += 1 |
|
|
|
|
|
return patterns_detected / total_patterns |
|
|
|
|
|
|
|
|
def _analyze_entropy_patterns(self, features: Dict[str, Any]) -> tuple: |
|
|
""" |
|
|
Analyze entropy patterns to determine RAW entropy score (0-1 scale) |
|
|
This raw score will later be converted using domain thresholds |
|
|
""" |
|
|
|
|
|
valid_features = [score for score in [features.get('char_entropy', 0), |
|
|
features.get('token_diversity', 0), |
|
|
features.get('sequence_unpredictability', 0), |
|
|
features.get('ai_pattern_score', 0) |
|
|
] if score > 0 |
|
|
] |
|
|
|
|
|
if (len(valid_features) < 2): |
|
|
|
|
|
return 0.5, 0.3 |
|
|
|
|
|
ai_indicators = list() |
|
|
|
|
|
|
|
|
if (features['char_entropy'] < 3.5): |
|
|
|
|
|
ai_indicators.append(0.8) |
|
|
|
|
|
elif (features['char_entropy'] < 4.0): |
|
|
|
|
|
ai_indicators.append(0.6) |
|
|
|
|
|
else: |
|
|
|
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['entropy_variance'] < 0.1): |
|
|
|
|
|
ai_indicators.append(0.9) |
|
|
|
|
|
elif (features['entropy_variance'] < 0.3): |
|
|
|
|
|
ai_indicators.append(0.5) |
|
|
|
|
|
else: |
|
|
|
|
|
ai_indicators.append(0.1) |
|
|
|
|
|
|
|
|
if (features['token_diversity'] < 0.6): |
|
|
ai_indicators.append(0.7) |
|
|
|
|
|
elif (features['token_diversity'] < 0.8): |
|
|
ai_indicators.append(0.4) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['sequence_unpredictability'] < 0.3): |
|
|
ai_indicators.append(0.8) |
|
|
|
|
|
elif (features['sequence_unpredictability'] < 0.5): |
|
|
ai_indicators.append(0.5) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['ai_pattern_score'] > 0.75): |
|
|
ai_indicators.append(0.9) |
|
|
|
|
|
elif (features['ai_pattern_score'] > 0.5): |
|
|
ai_indicators.append(0.7) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.3) |
|
|
|
|
|
|
|
|
raw_score = np.mean(ai_indicators) if ai_indicators else 0.5 |
|
|
confidence = 1.0 - (np.std(ai_indicators) / 0.5) if ai_indicators else 0.5 |
|
|
confidence = max(0.1, min(0.9, confidence)) |
|
|
|
|
|
return raw_score, confidence |
|
|
|
|
|
|
|
|
def _calculate_mixed_probability(self, features: Dict[str, Any]) -> float: |
|
|
""" |
|
|
Calculate probability of mixed AI/Human content with better indicators |
|
|
""" |
|
|
mixed_indicators = list() |
|
|
|
|
|
|
|
|
entropy_variance = features.get('entropy_variance', 0) |
|
|
|
|
|
if (entropy_variance > 0.5): |
|
|
|
|
|
mixed_indicators.append(0.6) |
|
|
|
|
|
elif (entropy_variance > 0.3): |
|
|
mixed_indicators.append(0.3) |
|
|
|
|
|
else: |
|
|
mixed_indicators.append(0.0) |
|
|
|
|
|
|
|
|
char_entropy = features.get('char_entropy', 0) |
|
|
word_entropy = features.get('word_entropy', 0) |
|
|
|
|
|
if ((char_entropy > 0) and (word_entropy > 0)): |
|
|
entropy_discrepancy = abs(char_entropy - word_entropy) |
|
|
|
|
|
|
|
|
if (entropy_discrepancy > 1.0): |
|
|
mixed_indicators.append(0.4) |
|
|
|
|
|
|
|
|
ai_pattern_score = features.get('ai_pattern_score', 0) |
|
|
if (0.4 <= ai_pattern_score <= 0.6): |
|
|
mixed_indicators.append(0.3) |
|
|
|
|
|
mixed_probability = min(0.4, np.mean(mixed_indicators)) if mixed_indicators else 0.0 |
|
|
|
|
|
return mixed_probability |
|
|
|
|
|
|
|
|
    def cleanup(self):
        """
        Clean up resources: drop the tokenizer reference, then run the
        base-class cleanup.
        """
        self.tokenizer = None
        super().cleanup()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Explicit public API of this module.
__all__ = ["EntropyMetric"]
|
|
|