| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from functools import lru_cache |
| import logging |
| import random |
| from typing import Any |
|
|
| import nltk |
| import numpy as np |
| from scipy.sparse import csr_matrix, hstack |
| import torch |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
|
|
| from features.text_classifier.model_loader import load_model |
|
|
logger = logging.getLogger(__name__)


# Sentence splitting in this module goes through nltk.sent_tokenize, so make
# sure the Punkt tokenizer data can be found locally; a resource is fetched
# only when the lookup fails.
for resource in ("tokenizers/punkt", "tokenizers/punkt_tab"):
    try:
        nltk.data.find(resource)
    except LookupError:
        # The download is requested by the last path segment of the resource.
        nltk.download(resource.rsplit("/", 1)[-1], quiet=True)


# textstat is optional: when the import fails the name is bound to None and
# the readability features fall back to fixed values.
try:
    import textstat
except ImportError:
    textstat = None
|
|
|
|
@dataclass
class SentenceBlendConfig:
    """Numeric settings for blending sentence-level and document-level scores.

    NOTE(review): nothing in the visible part of this module reads these
    fields, so the per-field notes below are inferred from the names only.
    Confirm them against the code that consumes this config.
    """

    # Presumably the default weight given to sentence-level results.
    sentence_blend_weight: float = 0.70
    # Presumably the default pull of sentence results toward the document score.
    sentence_to_doc_bias: float = 0.35
    # Presumably the upper bound for sentence_blend_weight.
    max_sentence_blend_weight: float = 0.90
    # Presumably the upper bound for sentence_to_doc_bias.
    max_sentence_to_doc_bias: float = 0.80
    # Presumably a random jitter expressed in percent (TODO confirm units).
    random_deviation_pct: float = 2.0
|
|
|
|
class PerplexityCalculator:
    """Lazy-loaded perplexity calculator for a causal LM (distilgpt2 by default).

    The tokenizer and model named by ``model_name`` are loaded on the first
    call to :meth:`calculate`, not at construction time. The model is placed
    on CUDA when ``torch.cuda.is_available()`` is true, otherwise on CPU.
    """

    # Score reported whenever a real perplexity cannot be produced.
    _FALLBACK_PERPLEXITY = 100.0
    # Upper bound applied to every computed score.
    _MAX_PERPLEXITY = 10000.0

    def __init__(self, model_name: str = "distilgpt2"):
        """Record the model name and target device; nothing is loaded yet."""
        self.model_name = model_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._tokenizer = None
        self._model = None

    def _load(self) -> None:
        """Load tokenizer and model on first use; later calls return at once."""
        if self._model is not None and self._tokenizer is not None:
            return

        logger.info("Loading perplexity model: %s", self.model_name)
        self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self._model = AutoModelForCausalLM.from_pretrained(self.model_name).to(self.device)
        self._model.eval()
        logger.info("Perplexity model loaded on %s", self.device)

    def calculate(self, text: str, max_length: int = 512) -> float:
        """Return the perplexity of ``text`` under the language model.

        Args:
            text: Text to score.
            max_length: Token limit handed to the tokenizer; longer inputs
                are truncated.

        Returns:
            ``exp(loss)`` capped at 10000.0. The fallback value 100.0 is
            returned instead when any exception is raised while loading or
            scoring, or when the resulting score is NaN.
        """
        try:
            self._load()
            encodings = self._tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                max_length=max_length,
            )
            input_ids = encodings.input_ids.to(self.device)

            with torch.no_grad():
                loss = self._model(input_ids, labels=input_ids).loss
            perplexity = float(torch.exp(loss).item())
        except Exception as exc:
            # Deliberate best effort: scoring problems must not break the
            # surrounding feature extraction.
            logger.warning("Perplexity fallback used due to error: %s", exc)
            return self._FALLBACK_PERPLEXITY

        # min(nan, cap) evaluates to nan, so the cap alone would let a NaN
        # score through to callers. A NaN loss can occur, for example, when
        # the input is too short to provide any next-token target.
        if np.isnan(perplexity):
            return self._FALLBACK_PERPLEXITY
        return min(perplexity, self._MAX_PERPLEXITY)
|
|
|
|
# One shared calculator instance for the module; its model is loaded lazily
# on the first scoring call.
_perplexity_calc = PerplexityCalculator()


@lru_cache(maxsize=20000)
def _cached_perplexity(cleaned_text: str) -> float:
    """Return the perplexity of ``cleaned_text``, memoised per distinct string.

    Results for up to 20000 distinct inputs are kept in an LRU cache, so a
    repeated text is scored only once.

    NOTE(review): whatever the calculator returns is cached, including its
    fallback value after an error. Confirm that is acceptable.
    """
    score = _perplexity_calc.calculate(cleaned_text)
    return score
|
|
|
|
@lru_cache(maxsize=1)
def _get_model_artifacts() -> tuple[Any, Any, Any, Any, list[str], dict[str, Any]]:
    """Load the model artifacts once and reuse them on every later call.

    Returns:
        The value of ``load_model()`` unchanged. The caller in this module
        unpacks it as (classifier, scaler, word vectorizer, char vectorizer,
        feature names, metadata).
    """
    artifacts = load_model()
    return artifacts
|
|
|
|
def normalize_text(text: str) -> str:
    """Collapse every run of whitespace in ``text`` to a single space.

    The input is converted with ``str()`` first, so non-string values are
    accepted. Leading and trailing whitespace disappears because the text is
    split into whitespace-separated tokens and re-joined.
    """
    tokens = str(text).split()
    return " ".join(tokens)
|
|
|
|
def split_into_sentences(text: str) -> list[str]:
    """Split ``text`` into stripped, non-empty sentences.

    The text is whitespace-normalised first. An empty result from that step
    yields an empty list. If the tokenizer produces no usable sentence, the
    whole normalised text is returned as a single sentence.
    """
    cleaned = normalize_text(text)
    if not cleaned:
        return []

    pieces = []
    for raw_sentence in nltk.sent_tokenize(cleaned):
        stripped = raw_sentence.strip()
        if stripped:
            pieces.append(stripped)
    return pieces or [cleaned]
|
|
|
|
def extract_burstiness_features(text: str) -> dict[str, float]:
    """Summarise how sentence length (in words) varies across ``text``.

    Returns:
        A dict with the mean, standard deviation, maximum, minimum and range
        of the per-sentence word counts under the ``burst_*`` keys. Every
        value is 0.0 when the text contains no sentences.
    """
    sentences = split_into_sentences(text)
    if not sentences:
        return dict.fromkeys(
            ("burst_mean", "burst_std", "burst_max", "burst_min", "burst_range"),
            0.0,
        )

    word_counts = np.array([len(sentence.split()) for sentence in sentences], dtype=float)
    longest = np.max(word_counts)
    shortest = np.min(word_counts)
    return {
        "burst_mean": float(np.mean(word_counts)),
        "burst_std": float(np.std(word_counts)),
        "burst_max": float(longest),
        "burst_min": float(shortest),
        "burst_range": float(longest - shortest),
    }
|
|
|
|
def extract_stylometry_features(text: str) -> dict[str, float]:
    """Compute surface-level style statistics for ``text``.

    Returns:
        A dict holding word, character and sentence counts, average word and
        sentence length, lexical diversity (distinct words / words), the
        share of characters that are ``.,!?;:`` punctuation, the share that
        are uppercase, and two Flesch readability scores. The readability
        scores are 50.0 and 8.0 when textstat is unavailable or raises.
    """
    words = text.split()
    word_total = len(words)
    char_total = len(text)
    # Clamp to 1 so the average sentence length never divides by zero.
    sentence_total = max(len(split_into_sentences(text)), 1)

    def share_of_chars(count: int) -> float:
        # Ratio relative to the full text length; 0.0 for empty text.
        return float(count / char_total) if char_total > 0 else 0.0

    if words:
        avg_word_len = float(np.mean([len(word) for word in words]))
    else:
        avg_word_len = 0.0
    avg_sent_len = float(word_total / sentence_total)

    if word_total > 0:
        lexical_diversity = float(len(set(words)) / word_total)
    else:
        lexical_diversity = 0.0

    punct_ratio = share_of_chars(sum(ch in ".,!?;:" for ch in text))
    caps_ratio = share_of_chars(sum(ch.isupper() for ch in text))

    # Start from the fallback scores; replace them only if textstat succeeds.
    flesch_reading, flesch_grade = 50.0, 8.0
    if textstat is not None:
        try:
            flesch_reading = float(textstat.flesch_reading_ease(text))
            flesch_grade = float(textstat.flesch_kincaid_grade(text))
        except Exception:
            # A failure in either call discards both scores.
            flesch_reading, flesch_grade = 50.0, 8.0

    return {
        "num_words": float(word_total),
        "num_chars": float(char_total),
        "num_sentences": float(sentence_total),
        "avg_word_len": avg_word_len,
        "avg_sent_len": avg_sent_len,
        "lexical_diversity": lexical_diversity,
        "punct_ratio": punct_ratio,
        "caps_ratio": caps_ratio,
        "flesch_reading": flesch_reading,
        "flesch_grade": flesch_grade,
    }
|
|
|
|
def extract_all_features(text: str, calc_perplexity: bool = True) -> dict[str, float]:
    """Build the full engineered-feature dict for ``text``.

    Args:
        text: Raw input; it is whitespace-normalised before any feature is
            computed.
        calc_perplexity: When false, the language model is skipped and
            ``perplexity`` is set to the constant 100.0.

    Returns:
        ``perplexity`` followed by the burstiness and stylometry features.
    """
    cleaned = normalize_text(text)
    perplexity = _cached_perplexity(cleaned) if calc_perplexity else 100.0
    return {
        "perplexity": perplexity,
        **extract_burstiness_features(cleaned),
        **extract_stylometry_features(cleaned),
    }
|
|
|
|
def _predict_ai_probability(text: str) -> tuple[float, float]:
    """Score ``text`` with the loaded classifier.

    The model input is the horizontal concatenation of the word-vectorizer
    output, the char-vectorizer output and the scaled engineered features,
    in that order.

    Returns:
        ``(ai_probability, perplexity)``. The probability comes from
        ``predict_proba`` when the classifier has it, otherwise from a
        logistic transform of ``decision_function``.
    """
    (
        classifier,
        scaler,
        word_vectorizer,
        char_vectorizer,
        feature_names,
        metadata,
    ) = _get_model_artifacts()

    # Perplexity is only computed when the metadata reports engineered features.
    wants_perplexity = bool(metadata.get("num_engineered_features", 0) > 0)
    features = extract_all_features(text, calc_perplexity=wants_perplexity)

    # Arrange the engineered features in the order given by feature_names.
    ordered_values = [features[name] for name in feature_names]
    scaled = scaler.transform(np.array(ordered_values, dtype=float).reshape(1, -1))

    parts = [
        word_vectorizer.transform([text]),
        char_vectorizer.transform([text]),
        csr_matrix(scaled),
    ]
    hybrid = hstack(parts, format="csr")

    if not hasattr(classifier, "predict_proba"):
        score = float(classifier.decision_function(hybrid)[0])
        ai_prob = float(1.0 / (1.0 + np.exp(-score)))
    else:
        # NOTE(review): index 1 is taken to be the AI class; confirm against
        # the classifier's class ordering.
        ai_prob = float(classifier.predict_proba(hybrid)[0][1])

    return ai_prob, float(features.get("perplexity", 100.0))
|
|
|
|
def classify_text(text: str) -> tuple[str, float, float]:
    """Return (label, perplexity, ai_likelihood_percent).

    The likelihood is the predicted AI probability as a percentage rounded
    to two decimals. The label is "AI" when that rounded value is at least
    50.0 and "Human" otherwise.

    Raises:
        ValueError: If ``text`` is empty after whitespace normalisation.
    """
    cleaned = normalize_text(text)
    if not cleaned:
        raise ValueError("Input text is empty")

    probability, perplexity = _predict_ai_probability(cleaned)
    ai_likelihood = round(probability * 100.0, 2)
    if ai_likelihood >= 50.0:
        label = "AI"
    else:
        label = "Human"
    return label, perplexity, ai_likelihood
|
|
|
|
def analyze_text_with_sentences(
    text: str,
) -> dict[str, Any]:
    """Classify ``text`` as a whole and sentence by sentence.

    Returns:
        A dict with two keys. ``"sentences"`` holds one entry per sentence
        (``sentence``, ``label``, ``perplexity``, ``ai_likelihood``).
        ``"summary"`` holds the whole-text result under ``"overall"``. A
        sentence whose classification raises is reported with label
        ``"Error"`` and ``None`` scores rather than aborting the analysis.

    Raises:
        ValueError: If the text is empty or yields no sentences.
    """
    text = normalize_text(text)
    overall_label, overall_perplexity, overall_likelihood = classify_text(text)

    sentences = split_into_sentences(text)
    if not sentences:
        raise ValueError("Input text contains no valid sentences")

    per_sentence: list[dict[str, Any]] = []
    for sentence in sentences:
        # Begin with the error-shaped entry; fill it in only on success.
        entry: dict[str, Any] = {
            "sentence": sentence,
            "label": "Error",
            "perplexity": None,
            "ai_likelihood": None,
        }
        try:
            label, perplexity, ai_likelihood = classify_text(sentence)
        except Exception as exc:
            logger.warning("Error analyzing sentence: %s", exc)
        else:
            entry.update(label=label, perplexity=perplexity, ai_likelihood=ai_likelihood)
        per_sentence.append(entry)

    overall = {
        "label": overall_label,
        "perplexity": overall_perplexity,
        "ai_likelihood": overall_likelihood,
    }
    return {
        "sentences": per_sentence,
        "summary": {"overall": overall},
    }
| |
|
|