# DEPENDENCIES
import re
import numpy as np
from typing import Any
from typing import Dict
from typing import List
from loguru import logger
from collections import Counter
from config.enums import Domain
from config.schemas import MetricResult
from metrics.base_metric import BaseMetric
from models.model_manager import get_model_manager
from config.constants import semantic_analysis_params
from sklearn.metrics.pairwise import cosine_similarity
from config.threshold_config import get_threshold_for_domain


class SemanticAnalysisMetric(BaseMetric):
    """
    Semantic coherence and consistency analysis

    Measures (Aligned with Documentation):
    - Semantic similarity between sentences
    - Topic consistency across text
    - Coherence and logical flow
    - Repetition patterns and redundancy
    - Contextual consistency
    """

    def __init__(self):
        super().__init__(name = "semantic_analysis",
                         description = "Semantic coherence, repetition patterns, and contextual consistency analysis",
                         )
        # Sentence-transformer model; populated by initialize()
        self.sentence_model = None

    def initialize(self) -> bool:
        """
        Initialize the semantic analysis metric

        Loads the sentence-embedding model via the shared model manager.
        Returns True on success, False on any failure (logged, not raised).
        """
        try:
            logger.info("Initializing semantic analysis metric...")
            # Load sentence transformer for semantic embeddings
            model_manager = get_model_manager()
            self.sentence_model = model_manager.load_model("semantic_primary")
            self.is_initialized = True
            logger.success("Semantic analysis metric initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize semantic analysis metric: {repr(e)}")
            return False

    def compute(self, text: str, **kwargs) -> MetricResult:
        """
        Compute semantic analysis measures with FULL DOMAIN THRESHOLD INTEGRATION

        Args:
            text: Input text to analyze.
            **kwargs: Optional 'domain' (Domain enum) selecting the threshold set;
                      defaults to Domain.GENERAL.

        Returns:
            MetricResult with synthetic/authentic/hybrid probabilities, confidence,
            and a details dict of raw semantic features.
        """
        try:
            params = semantic_analysis_params
            if (not text or (len(text.strip()) < params.MIN_TEXT_LENGTH_FOR_ANALYSIS)):
                return self._default_result(error = "Text too short for semantic analysis")

            # Get domain-specific thresholds
            domain = kwargs.get('domain', Domain.GENERAL)
            domain_thresholds = get_threshold_for_domain(domain)
            semantic_thresholds = domain_thresholds.semantic

            # Calculate comprehensive semantic features
            features = self._calculate_semantic_features(text)

            # Calculate raw semantic score (0-1 scale)
            raw_semantic_score, confidence = self._analyze_semantic_patterns(features)

            # Apply domain-specific thresholds to convert raw score to probabilities
            synthetic_prob, authentic_prob, hybrid_prob = self._apply_domain_thresholds(raw_score = raw_semantic_score,
                                                                                        thresholds = semantic_thresholds,
                                                                                        features = features,
                                                                                        )

            # Apply confidence multiplier from domain thresholds, then clamp
            confidence *= semantic_thresholds.confidence_multiplier
            confidence = max(params.MIN_CONFIDENCE, min(params.MAX_CONFIDENCE, confidence))

            return MetricResult(metric_name = self.name,
                                synthetic_probability = synthetic_prob,
                                authentic_probability = authentic_prob,
                                hybrid_probability = hybrid_prob,
                                confidence = confidence,
                                details = {**features,
                                           'domain_used' : domain.value,
                                           'synthetic_threshold' : semantic_thresholds.synthetic_threshold,
                                           'authentic_threshold' : semantic_thresholds.authentic_threshold,
                                           'raw_score' : raw_semantic_score,
                                           },
                                )
        except Exception as e:
            logger.error(f"Error in semantic analysis computation: {repr(e)}")
            return self._default_result(error = str(e))

    def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple:
        """
        Apply domain-specific thresholds to convert raw score to probabilities

        Returns a (synthetic_prob, authentic_prob, hybrid_prob) tuple normalized
        to sum to 1.0 (when the sum is above ZERO_TOLERANCE).
        """
        params = semantic_analysis_params
        synthetic_threshold = thresholds.synthetic_threshold
        authentic_threshold = thresholds.authentic_threshold

        # Calculate probabilities based on threshold distances
        if (raw_score >= synthetic_threshold):
            # Above synthetic threshold - strongly synthetic
            distance_from_threshold = raw_score - synthetic_threshold
            synthetic_prob = params.STRONG_SYNTHETIC_BASE_PROB + (distance_from_threshold * params.WEAK_PROBABILITY_ADJUSTMENT)
            authentic_prob = (params.MAX_PROBABILITY - params.STRONG_SYNTHETIC_BASE_PROB) - (distance_from_threshold * params.WEAK_PROBABILITY_ADJUSTMENT)
        elif (raw_score <= authentic_threshold):
            # Below authentic threshold - strongly authentic
            distance_from_threshold = authentic_threshold - raw_score
            synthetic_prob = (params.MAX_PROBABILITY - params.STRONG_AUTHENTIC_BASE_PROB) - (distance_from_threshold * params.WEAK_PROBABILITY_ADJUSTMENT)
            authentic_prob = params.STRONG_AUTHENTIC_BASE_PROB + (distance_from_threshold * params.WEAK_PROBABILITY_ADJUSTMENT)
        else:
            # Between thresholds - uncertain zone; interpolate linearly by position
            range_width = synthetic_threshold - authentic_threshold
            if (range_width > params.ZERO_TOLERANCE):
                position_in_range = (raw_score - authentic_threshold) / range_width
                synthetic_prob = params.UNCERTAIN_SYNTHETIC_RANGE_START + (position_in_range * params.UNCERTAIN_RANGE_WIDTH)
                authentic_prob = params.UNCERTAIN_AUTHENTIC_RANGE_START - (position_in_range * params.UNCERTAIN_RANGE_WIDTH)
            else:
                # Degenerate (zero-width) band - fall back to neutral
                synthetic_prob = params.NEUTRAL_PROBABILITY
                authentic_prob = params.NEUTRAL_PROBABILITY

        # Ensure probabilities are valid
        synthetic_prob = max(params.MIN_PROBABILITY, min(params.MAX_PROBABILITY, synthetic_prob))
        authentic_prob = max(params.MIN_PROBABILITY, min(params.MAX_PROBABILITY, authentic_prob))

        # Calculate hybrid probability based on semantic variance
        hybrid_prob = self._calculate_hybrid_probability(features = features)

        # Normalize to sum to 1.0
        total = synthetic_prob + authentic_prob + hybrid_prob
        if (total > params.ZERO_TOLERANCE):
            synthetic_prob /= total
            authentic_prob /= total
            hybrid_prob /= total

        return synthetic_prob, authentic_prob, hybrid_prob

    def _calculate_semantic_features(self, text: str) -> Dict[str, Any]:
        """
        Calculate comprehensive semantic analysis features

        Returns a dict of rounded feature scores, or the default feature dict
        when the text has too few sentences or embedding fails.
        """
        params = semantic_analysis_params

        # Split text into sentences
        sentences = self._split_sentences(text)
        if (len(sentences) < params.MIN_SENTENCES_FOR_ANALYSIS):
            return self._get_default_features()

        # Calculate semantic embeddings for all sentences
        sentence_embeddings, valid_sentences = self._get_sentence_embeddings(sentences = sentences)
        if sentence_embeddings is None:
            return self._get_default_features()

        # Calculate semantic similarity matrix
        similarity_matrix = cosine_similarity(sentence_embeddings)

        # Calculate various semantic metrics
        coherence_score = self._calculate_coherence(similarity_matrix = similarity_matrix)
        consistency_score = self._calculate_consistency(similarity_matrix = similarity_matrix)
        repetition_score = self._detect_repetition_patterns(sentences = valid_sentences,
                                                            similarity_matrix = similarity_matrix,
                                                            )
        topic_drift_score = self._calculate_topic_drift(similarity_matrix = similarity_matrix)
        contextual_consistency = self._calculate_contextual_consistency(sentences = sentences)

        # Chunk-based analysis for whole-text understanding
        chunk_coherence = self._calculate_chunk_coherence(text = text,
                                                          chunk_size = params.CHUNK_SIZE_WORDS,
                                                          )

        return {"coherence_score" : round(coherence_score, 4),
                "consistency_score" : round(consistency_score, 4),
                "repetition_score" : round(repetition_score, 4),
                "topic_drift_score" : round(topic_drift_score, 4),
                "contextual_consistency" : round(contextual_consistency, 4),
                "avg_chunk_coherence" : round(np.mean(chunk_coherence) if chunk_coherence else params.DEFAULT_COHERENCE, 4),
                "coherence_variance" : round(np.var(chunk_coherence) if chunk_coherence else params.DEFAULT_COHERENCE_VARIANCE, 4),
                "num_sentences" : len(valid_sentences),
                "num_chunks_analyzed" : len(chunk_coherence),
                }

    def _split_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences

        Drops whitespace-only fragments and sentences at or below
        MIN_SENTENCE_LENGTH characters.
        """
        sentences = re.split(semantic_analysis_params.SENTENCE_SPLIT_PATTERN, text)
        return [s.strip() for s in sentences
                if s.strip() and len(s.strip()) > semantic_analysis_params.MIN_SENTENCE_LENGTH]

    def _get_sentence_embeddings(self, sentences: List[str]) -> tuple:
        """
        Get semantic embeddings for sentences

        Returns (embeddings, valid_sentences) on success, or (None, None) when
        the model is unavailable, no sentence is long enough, encoding yields
        nothing, or encoding raises.
        """
        try:
            if not self.sentence_model:
                # BUG FIX: previously returned a bare None here, which broke
                # the two-value unpacking at every call site; return the same
                # (None, None) shape as the other failure paths.
                return None, None

            # Filter out very short sentences that might cause issues
            valid_sentences = [s for s in sentences
                               if len(s.strip()) > semantic_analysis_params.MIN_VALID_SENTENCE_LENGTH]
            if not valid_sentences:
                return None, None

            # Encode sentences to get embeddings
            embeddings = self.sentence_model.encode(valid_sentences)

            # Check if embeddings are valid
            if ((embeddings is None) or (len(embeddings) == 0)):
                return None, None

            return embeddings, valid_sentences
        except Exception as e:
            logger.warning(f"Sentence embedding failed: {repr(e)}")
            return None, None

    def _calculate_coherence(self, similarity_matrix: np.ndarray) -> float:
        """
        Calculate overall text coherence : Higher coherence = more logically connected sentences

        Mean cosine similarity between each pair of adjacent sentences.
        """
        params = semantic_analysis_params
        if (similarity_matrix.size == 0):
            return params.MIN_PROBABILITY

        # Average similarity between adjacent sentences (the superdiagonal)
        adjacent_similarities = [similarity_matrix[i, i + 1]
                                 for i in range(len(similarity_matrix) - 1)]
        if (not adjacent_similarities):
            return params.MIN_PROBABILITY
        return np.mean(adjacent_similarities)

    def _calculate_consistency(self, similarity_matrix: np.ndarray) -> float:
        """
        Calculate topic consistency throughout the text : Lower variance in similarities = more consistent
        """
        params = semantic_analysis_params
        if (similarity_matrix.size == 0):
            return params.MIN_PROBABILITY

        # Variance over the strict upper triangle (all unique sentence pairs)
        all_similarities = similarity_matrix[np.triu_indices_from(similarity_matrix, k=1)]
        if (len(all_similarities) == 0):
            return params.MIN_PROBABILITY

        variance = np.var(all_similarities)
        # Convert to consistency score (higher = more consistent)
        consistency = params.MAX_PROBABILITY - min(params.MAX_PROBABILITY, variance * params.SIMILARITY_VARIANCE_FACTOR)
        return max(params.MIN_PROBABILITY, consistency)

    def _detect_repetition_patterns(self, sentences: List[str], similarity_matrix: np.ndarray) -> float:
        """
        Detect repetition patterns in semantic content : AI text sometimes shows more semantic repetition

        Fraction of non-adjacent sentence pairs whose similarity exceeds
        REPETITION_SIMILARITY_THRESHOLD, scaled by REPETITION_SCORE_SCALING.
        """
        params = semantic_analysis_params
        if (len(sentences) < params.MIN_SENTENCES_FOR_REPETITION):
            return params.MIN_PROBABILITY

        # Look for high similarity between non-adjacent sentences
        repetition_count = 0
        total_comparisons = 0
        for i in range(len(sentences)):
            for j in range(i + 2, len(sentences)):  # Skip adjacent sentences
                # High semantic similarity
                if (similarity_matrix[i, j] > params.REPETITION_SIMILARITY_THRESHOLD):
                    repetition_count += 1
                total_comparisons += 1

        if (total_comparisons == 0):
            return params.MIN_PROBABILITY

        repetition_score = repetition_count / total_comparisons
        # Scale to make differences more noticeable
        return min(params.MAX_PROBABILITY, repetition_score * params.REPETITION_SCORE_SCALING)

    def _calculate_topic_drift(self, similarity_matrix: np.ndarray) -> float:
        """
        Calculate topic drift throughout the text : Higher drift = less focused content

        Compares average similarity between the first and last sections of the
        text; low start/end similarity implies high drift.
        """
        params = semantic_analysis_params
        if (len(similarity_matrix) < 3):
            return params.MIN_PROBABILITY

        # Calculate similarity between beginning and end sections
        start_size = min(params.START_SECTION_SIZE, len(similarity_matrix) // params.SECTION_SIZE_RATIO)
        end_size = min(params.END_SECTION_SIZE, len(similarity_matrix) // params.SECTION_SIZE_RATIO)
        start_indices = list(range(start_size))
        end_indices = list(range(len(similarity_matrix) - end_size, len(similarity_matrix)))

        cross_similarities = [similarity_matrix[i, j]
                              for i in start_indices
                              for j in end_indices]
        if not cross_similarities:
            return params.MIN_PROBABILITY

        avg_cross_similarity = np.mean(cross_similarities)
        # Lower similarity between start and end = higher topic drift
        topic_drift = params.MAX_PROBABILITY - avg_cross_similarity
        return max(params.MIN_PROBABILITY, topic_drift)

    def _calculate_contextual_consistency(self, sentences: List[str]) -> float:
        """
        Calculate contextual consistency using keyword and entity analysis

        Measures how consistently the top keywords recur across sentences.
        """
        params = semantic_analysis_params
        if (len(sentences) < params.MIN_SENTENCES_FOR_ANALYSIS):
            return params.MIN_PROBABILITY

        # Simple keyword consistency analysis : Extract meaningful words (nouns, adjectives)
        all_words = list()
        for sentence in sentences:
            words = re.findall(params.WORD_EXTRACTION_PATTERN, sentence.lower())
            all_words.extend(words)

        if (len(all_words) < params.MIN_WORDS_FOR_KEYWORD_ANALYSIS):
            return params.MIN_PROBABILITY

        # Calculate how consistently keywords are used across sentences
        word_freq = Counter(all_words)
        top_keywords = [word for word, count in word_freq.most_common(params.TOP_KEYWORDS_COUNT)
                        if count > params.MIN_KEYWORD_FREQUENCY]
        if not top_keywords:
            return params.MIN_PROBABILITY

        # Check if top keywords appear consistently across sentences
        keyword_presence = list()
        for keyword in top_keywords:
            sentences_with_keyword = sum(1 for sentence in sentences if keyword in sentence.lower())
            presence_ratio = sentences_with_keyword / len(sentences)
            keyword_presence.append(presence_ratio)

        consistency = np.mean(keyword_presence)
        return consistency

    def _calculate_chunk_coherence(self, text: str, chunk_size: int = 200) -> List[float]:
        """
        Calculate coherence across text chunks for whole-text analysis

        Splits the text into overlapping word-windows and scores each chunk's
        sentence-level coherence; returns [DEFAULT_COHERENCE] when no chunk
        qualifies.
        """
        params = semantic_analysis_params
        chunks = list()
        words = text.split()

        # Create overlapping chunks.  BUG FIX: guard against a zero step
        # (int(chunk_size * ratio) can truncate to 0), which would make
        # range() raise ValueError.
        overlap = max(1, int(chunk_size * params.CHUNK_OVERLAP_RATIO))
        for i in range(0, len(words), overlap):
            chunk = ' '.join(words[i:i + chunk_size])
            # Minimum chunk size
            if (len(chunk) > params.MIN_CHUNK_LENGTH):
                chunk_sentences = self._split_sentences(chunk)
                if (len(chunk_sentences) >= params.MIN_SENTENCES_PER_CHUNK):
                    sentence_embeddings, valid_sentences = self._get_sentence_embeddings(sentences = chunk_sentences)
                    if ((sentence_embeddings is not None) and (len(sentence_embeddings) >= params.MIN_SENTENCES_PER_CHUNK)):
                        similarity_matrix = cosine_similarity(sentence_embeddings)
                        coherence = self._calculate_coherence(similarity_matrix)
                        chunks.append(coherence)

        return chunks if chunks else [params.DEFAULT_COHERENCE]

    def _analyze_semantic_patterns(self, features: Dict[str, Any]) -> tuple:
        """
        Analyze semantic patterns to determine RAW semantic score (0-1 scale)

        Returns (raw_score, confidence); confidence drops when the indicator
        weights disagree (high standard deviation).
        """
        params = semantic_analysis_params

        # Check feature validity first
        required_features = ['coherence_score', 'consistency_score', 'repetition_score',
                             'topic_drift_score', 'coherence_variance']
        valid_features = [features.get(feat, params.MIN_PROBABILITY)
                          for feat in required_features
                          if features.get(feat, params.MIN_PROBABILITY) > params.ZERO_TOLERANCE]
        if (len(valid_features) < params.MIN_REQUIRED_FEATURES):
            # Low confidence if insufficient features
            return params.NEUTRAL_PROBABILITY, params.LOW_FEATURE_CONFIDENCE

        # Initialize synthetic indicator list
        synthetic_indicators = list()

        # AI text often has very high coherence (too perfect)
        if (features['coherence_score'] > params.COHERENCE_HIGH_THRESHOLD):
            # Suspiciously high coherence
            synthetic_indicators.append(params.STRONG_SYNTHETIC_WEIGHT)
        elif (features['coherence_score'] > params.COHERENCE_MEDIUM_THRESHOLD):
            # Moderate coherence
            synthetic_indicators.append(params.MEDIUM_SYNTHETIC_WEIGHT)
        else:
            # Low coherence - more human-like
            synthetic_indicators.append(params.LOW_SYNTHETIC_WEIGHT)

        # Very high consistency suggests AI (unnaturally consistent)
        if (features['consistency_score'] > params.CONSISTENCY_HIGH_THRESHOLD):
            synthetic_indicators.append(params.STRONG_SYNTHETIC_WEIGHT)
        elif (features['consistency_score'] > params.CONSISTENCY_MEDIUM_THRESHOLD):
            synthetic_indicators.append(params.MODERATE_SYNTHETIC_WEIGHT)
        else:
            synthetic_indicators.append(params.VERY_LOW_SYNTHETIC_WEIGHT)

        # High repetition suggests AI
        if (features['repetition_score'] > params.REPETITION_HIGH_THRESHOLD):
            synthetic_indicators.append(params.MODERATE_SYNTHETIC_WEIGHT)
        elif (features['repetition_score'] > params.REPETITION_MEDIUM_THRESHOLD):
            synthetic_indicators.append(params.VERY_WEAK_SYNTHETIC_WEIGHT)
        else:
            synthetic_indicators.append(params.LOW_SYNTHETIC_WEIGHT)

        # Very low topic drift suggests AI (stays too focused)
        if (features['topic_drift_score'] < params.TOPIC_DRIFT_LOW_THRESHOLD):
            synthetic_indicators.append(params.MODERATE_SYNTHETIC_WEIGHT)
        elif (features['topic_drift_score'] < params.TOPIC_DRIFT_MEDIUM_THRESHOLD):
            synthetic_indicators.append(params.WEAK_SYNTHETIC_WEIGHT)
        else:
            synthetic_indicators.append(params.VERY_LOW_SYNTHETIC_WEIGHT)

        # Low coherence variance across chunks suggests AI
        if (features['coherence_variance'] < params.COHERENCE_VARIANCE_LOW_THRESHOLD):
            synthetic_indicators.append(params.MODERATE_SYNTHETIC_WEIGHT)
        elif (features['coherence_variance'] < params.COHERENCE_VARIANCE_MEDIUM_THRESHOLD):
            synthetic_indicators.append(params.VERY_WEAK_SYNTHETIC_WEIGHT)
        else:
            synthetic_indicators.append(params.LOW_SYNTHETIC_WEIGHT)

        # Calculate raw score and confidence
        if synthetic_indicators:
            raw_score = np.mean(synthetic_indicators)
            confidence = params.MAX_PROBABILITY - (np.std(synthetic_indicators) / params.CONFIDENCE_STD_NORMALIZER)
            confidence = max(params.MIN_CONFIDENCE, min(params.MAX_CONFIDENCE, confidence))
        else:
            raw_score = params.NEUTRAL_PROBABILITY
            confidence = params.NEUTRAL_CONFIDENCE

        return raw_score, confidence

    def _calculate_hybrid_probability(self, features: Dict[str, Any]) -> float:
        """
        Calculate probability of hybrid synthetic/authentic content

        Averages weak/moderate indicators of mixing, capped at
        MAX_HYBRID_PROBABILITY.
        """
        mixed_indicators = list()
        params = semantic_analysis_params

        # Moderate coherence values might indicate mixing
        if (params.COHERENCE_MIXED_MIN <= features['coherence_score'] <= params.COHERENCE_MIXED_MAX):
            mixed_indicators.append(params.WEAK_HYBRID_WEIGHT)
        else:
            mixed_indicators.append(params.MIN_PROBABILITY)

        # High coherence variance suggests mixed content
        if (features['coherence_variance'] > params.COHERENCE_VARIANCE_HIGH_THRESHOLD):
            mixed_indicators.append(params.MODERATE_HYBRID_WEIGHT)
        elif (features['coherence_variance'] > params.COHERENCE_VARIANCE_MEDIUM_THRESHOLD):
            mixed_indicators.append(params.WEAK_HYBRID_WEIGHT)
        else:
            mixed_indicators.append(params.MIN_PROBABILITY)

        # Inconsistent repetition patterns
        if (params.REPETITION_MIXED_MIN <= features['repetition_score'] <= params.REPETITION_MIXED_MAX):
            mixed_indicators.append(params.WEAK_HYBRID_WEIGHT)
        else:
            mixed_indicators.append(params.MIN_PROBABILITY)

        if mixed_indicators:
            hybrid_prob = np.mean(mixed_indicators)
            return min(params.MAX_HYBRID_PROBABILITY, hybrid_prob)
        return params.MIN_PROBABILITY

    def _get_default_features(self) -> Dict[str, Any]:
        """
        Return default features when analysis is not possible
        """
        params = semantic_analysis_params
        return {"coherence_score" : params.DEFAULT_COHERENCE,
                "consistency_score" : params.DEFAULT_CONSISTENCY,
                "repetition_score" : params.DEFAULT_REPETITION,
                "topic_drift_score" : params.DEFAULT_TOPIC_DRIFT,
                "contextual_consistency" : params.DEFAULT_CONTEXTUAL_CONSISTENCY,
                "avg_chunk_coherence" : params.DEFAULT_CHUNK_COHERENCE,
                "coherence_variance" : params.DEFAULT_COHERENCE_VARIANCE,
                "num_sentences" : 0,
                "num_chunks_analyzed" : 0,
                }

    def cleanup(self):
        """
        Clean up resources
        """
        # Drop the model reference so it can be garbage-collected
        self.sentence_model = None
        super().cleanup()


# Export
__all__ = ["SemanticAnalysisMetric"]