import logging
import math
import numpy as np
from textblob import TextBlob
import textstat
from transformers import GPT2LMHeadModel, GPT2TokenizerFast
import torch
import re


logger = logging.getLogger(__name__)

|
class NLPService:
    """Singleton service exposing text-quality / AI-detection metrics.

    Heavy model weights (DistilGPT2 for perplexity) are lazy-loaded on
    first use so that importing this module stays cheap.
    """

    _instance = None
    # Shared, lazily loaded perplexity model/tokenizer (one copy per process).
    _perplex_model = None
    _perplex_tokenizer = None

    # Hard cap on input size fed to the perplexity pipeline (characters before
    # tokenization); keeps worst-case latency and memory bounded.
    MAX_PERPLEXITY_CHARS = 50000

    def __new__(cls):
        # Classic singleton: every caller shares one instance (and hence one
        # loaded model).
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def _load_model(self):
        """Lazy-load DistilGPT2 on first use to avoid huge startup time.

        Raises:
            Exception: re-raises whatever ``from_pretrained`` failed with.
        """
        if self._perplex_model is None:
            logger.info("Loading NLP models (DistilGPT2)...")
            try:
                model_id = 'distilgpt2'
                self._perplex_model = GPT2LMHeadModel.from_pretrained(model_id)
                self._perplex_tokenizer = GPT2TokenizerFast.from_pretrained(model_id)
                logger.info("NLP models loaded successfully.")
            except Exception as e:
                logger.error("Failed to load NLP models: %s", e)
                # Bare raise preserves the original traceback (`raise e` resets it).
                raise

    def calculate_perplexity(self, text: str) -> float:
        """
        Calculate perplexity of the text using a small GPT-2 model.
        Lower perplexity = more likely to be generated by AI.

        Uses the standard strided sliding-window evaluation, so texts longer
        than the model context (``n_positions``) are scored without hard
        truncation. Returns 0.0 for empty/near-empty input.
        """
        if not text or len(text.strip()) < 10:
            return 0.0

        # Bound the work before tokenization as well.
        if len(text) > self.MAX_PERPLEXITY_CHARS:
            text = text[:self.MAX_PERPLEXITY_CHARS]

        self._load_model()
        # NOTE: max_length here is a *token* cap; reusing the char cap is just
        # a generous upper bound — the stride loop below handles the model's
        # real context limit.
        encodings = self._perplex_tokenizer(
            text,
            return_tensors='pt',
            truncation=True,
            max_length=self.MAX_PERPLEXITY_CHARS
        )

        max_length = self._perplex_model.config.n_positions
        stride = 512
        seq_len = encodings.input_ids.size(1)

        nlls = []
        prev_end_loc = 0

        for begin_loc in range(0, seq_len, stride):
            # Window never exceeds the model context by construction, so no
            # extra truncation of the slice is needed.
            end_loc = min(begin_loc + max_length, seq_len)
            # Only tokens not already scored by the previous window count
            # toward the loss.
            trg_len = end_loc - prev_end_loc

            input_ids = encodings.input_ids[:, begin_loc:end_loc]

            target_ids = input_ids.clone()
            # Mask the overlapping prefix (-100 = ignored by the loss) so each
            # token is scored exactly once.
            target_ids[:, :-trg_len] = -100

            with torch.no_grad():
                outputs = self._perplex_model(input_ids, labels=target_ids)
                neg_log_likelihood = outputs.loss

            nlls.append(neg_log_likelihood)
            prev_end_loc = end_loc
            if end_loc == seq_len:
                break

        if not nlls:
            return 0.0

        ppl = torch.exp(torch.stack(nlls).mean())
        return round(float(ppl), 2)

    def analyze_sentiment(self, text: str) -> dict:
        """Return TextBlob polarity (-1 to 1) and subjectivity (0 to 1)."""
        blob = TextBlob(text)
        return {
            "polarity": round(blob.sentiment.polarity, 2),
            "subjectivity": round(blob.sentiment.subjectivity, 2)
        }

    def calculate_lexical_diversity(self, text: str) -> float:
        """Type-Token Ratio (TTR). Higher = richer vocabulary.

        Tokenizes on word characters, case-insensitively. Returns 0.0 for
        empty or word-free input.
        """
        if not text:
            return 0.0

        words = re.findall(r'\w+', text.lower())
        if not words:
            return 0.0

        return round(len(set(words)) / len(words), 3)

    def calculate_burstiness(self, text: str) -> float:
        """Coefficient of variation of sentence length (std / mean).

        Variation in sentence length is a cheap proxy for AI detection.
        Returns 0.0 when fewer than two sentences are available, or on a
        TextBlob/NLTK failure.
        """
        blob = TextBlob(text)

        try:
            sentences = blob.sentences
        except Exception as e:
            # TextBlob's splitter needs NLTK corpora that may be missing;
            # degrade gracefully instead of crashing the whole metric set.
            logger.error("TextBlob/NLTK error: %s", e)
            return 0.0

        # Need at least two sentences for a meaningful spread
        # (len(...) < 2 also covers the empty case).
        if len(sentences) < 2:
            return 0.0

        lengths = [len(s.words) for s in sentences]
        mean = np.mean(lengths)
        if mean == 0:
            return 0.0

        return round(float(np.std(lengths) / mean), 3)

    def compute_all_metrics(self, text: str) -> dict:
        """Compute every supported metric for `text` in a single dict."""
        return {
            "perplexity": self.calculate_perplexity(text),
            "sentiment": self.analyze_sentiment(text),
            "lexical_diversity": self.calculate_lexical_diversity(text),
            "burstiness": self.calculate_burstiness(text),
            # Guard empty/whitespace input for consistency with the other
            # metrics, which all return 0.0 for it.
            "readability": textstat.flesch_reading_ease(text) if text.strip() else 0.0
        }