| """ |
| Extracts a numerical style vector from any text sample. |
| The style vector encodes the author's unique writing fingerprint |
| and is used both to condition the generation model and to evaluate |
| style preservation after correction. |
| |
| Style vector dimensions (total: 512 after projection): |
| Raw features (~40) → MLP projection → 512-dim dense vector |
| |
| Raw features: |
| - sentence_length_mean, sentence_length_std, sentence_length_skew [3] |
| - word_length_mean, word_length_std [2] |
| - type_token_ratio (TTR) [1] |
| - passive_voice_ratio [1] |
| - active_voice_ratio [1] |
| - subordinate_clause_ratio [1] |
| - avg_dependency_tree_depth [1] |
| - hedging_frequency (per 100 words) [1] |
| - discourse_marker_counts [however, therefore, moreover, ...] [20] |
| - formality_score (0-1) [1] |
| - lexical_density [1] |
| - nominalization_ratio [1] |
| - question_sentence_ratio [1] |
| - exclamation_ratio [1] |
| - first_person_ratio [1] |
| - third_person_ratio [1] |
| - academic_word_coverage [1] |
| - avg_syllables_per_word [1] |
| - flesch_reading_ease [1] |
| """ |
|
|
import spacy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Optional
from scipy import stats
from loguru import logger
|
|
|
|
# Matched against lowercased surface tokens, so common inflected forms are
# listed explicitly alongside base forms.
HEDGING_WORDS = {
    "perhaps", "possibly", "probably", "might", "may", "could",
    "seem", "seems", "appear", "appears", "suggest", "suggests",
    "indicate", "indicates", "tend", "tends", "often", "generally",
    "approximately", "roughly", "somewhat", "relatively", "fairly",
}
|
|
DISCOURSE_MARKERS = [
    "however", "therefore", "moreover", "furthermore", "consequently",
    "nevertheless", "nonetheless", "additionally", "alternatively",
    "subsequently", "previously", "similarly", "conversely", "thus",
    "hence", "accordingly", "meanwhile", "indeed", "notably", "specifically",
]
|
|
# Crude suffix heuristic for nominalisations; it over-matches some common
# words (e.g. "total" for -al, "figure" for -ure).
NOMINALISATION_SUFFIXES = (
    "tion", "sion", "ment", "ness", "ity", "ance", "ence",
    "hood", "ship", "ism", "al", "ure",
)
|
|
FEATURE_DIM = 41  # 21 scalar features + 20 discourse-marker frequencies
|
|
|
|
class StyleProjectionMLP(nn.Module):
    """Projects raw feature vector to 512-dim style embedding."""

    def __init__(self, input_dim: int = 41, hidden_dim: int = 256, output_dim: int = 512):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, output_dim),
            nn.LayerNorm(output_dim),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)
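
# Quick shape check (illustrative):
#     mlp = StyleProjectionMLP()
#     mlp(torch.randn(1, FEATURE_DIM)).shape  # torch.Size([1, 512])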
|
|
|
|
class StyleFingerprinter:
    """Extracts style fingerprint vectors from text samples."""

    def __init__(self, spacy_model: str = "en_core_web_trf", awl_path: str = "data/awl/coxhead_awl.txt"):
        # Prefer the configured pipeline; fall back to the small model if
        # it is not installed.
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            logger.warning(f"spaCy model '{spacy_model}' not found, falling back to 'en_core_web_sm'")
            self.nlp = spacy.load("en_core_web_sm")

        # Academic Word List, used for the academic_word_coverage feature.
        self.awl = self._load_awl(awl_path)

        # NOTE: the projection is randomly initialised here, so embeddings
        # are only comparable within one process unless trained weights are
        # loaded (see the sketch just below).
        self.projection = StyleProjectionMLP(
            input_dim=FEATURE_DIM, hidden_dim=256, output_dim=512
        )
        self.projection.eval()
        logger.info(f"StyleFingerprinter initialised (AWL size: {len(self.awl)})")
|
|
    def _load_awl(self, path: str) -> set:
        """Load Academic Word List from file."""
        awl = set()
        try:
            with open(path) as f:
                for line in f:
                    word = line.strip().lower()
                    if word:
                        awl.add(word)
        except FileNotFoundError:
            logger.warning(f"AWL file not found at {path}, using empty set")
        return awl
|
|
    def _passive_voice_ratio(self, doc) -> float:
        """Compute the ratio of passive clauses to verbs."""
        verb_count = sum(1 for token in doc if token.pos_ == "VERB")
        # Count passive subjects only; counting auxpass as well would tally
        # a single passive clause twice (e.g. "was thrown").
        passive_count = sum(1 for token in doc if token.dep_ == "nsubjpass")
        if verb_count == 0:
            return 0.0
        # Clamp so active_voice_ratio (1 - passive) can never go negative.
        return min(passive_count / verb_count, 1.0)
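
    # Illustrative: "The ball was thrown." parses to one nsubjpass ("ball")
    # and one VERB ("thrown"), giving a ratio of 1.0 (model-dependent).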
|
|
    def _avg_dep_tree_depth(self, doc) -> float:
        """Compute average dependency tree depth across all tokens."""
        def _depth(token):
            d = 0
            current = token
            while current.head != current:
                d += 1
                current = current.head
                if d > 50:
                    # Defensive cap against pathological or cyclic parses.
                    break
            return d

        depths = [_depth(token) for token in doc if not token.is_punct]
        if not depths:
            return 0.0
        return sum(depths) / len(depths)
|
|
    def _lexical_density(self, doc) -> float:
        """Compute ratio of content words to total words."""
        content_pos = {"NOUN", "VERB", "ADJ", "ADV"}
        total = 0
        content = 0
        for token in doc:
            if not token.is_punct and not token.is_space:
                total += 1
                if token.pos_ in content_pos:
                    content += 1
        if total == 0:
            return 0.0
        return content / total
|
|
    @staticmethod
    def _count_syllables(word: str) -> int:
        """Count syllables in a word using a vowel-group heuristic.
        Avoids NLTK cmudict, which has a known AssertionError bug."""
        word = word.lower().strip()
        if not word:
            return 1
        vowels = "aeiouy"
        count = 0
        prev_vowel = False
        for char in word:
            is_vowel = char in vowels
            if is_vowel and not prev_vowel:
                count += 1
            prev_vowel = is_vowel

        # Drop a silent final "e" ("rate"), but keep syllabic "-le" ("table").
        if word.endswith("e") and not word.endswith("le") and count > 1:
            count -= 1

        return max(count, 1)
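
    # Illustrative heuristic outputs: "paper" -> 2, "rate" -> 1 (silent e),
    # "table" -> 2 (syllabic "-le" kept), "idea" -> 2 (true count is 3).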
|
|
    def _avg_syllables_per_word(self, words: list) -> float:
        """Average syllables per word."""
        if not words:
            return 0.0
        total = sum(self._count_syllables(w) for w in words)
        return total / len(words)
|
|
    @staticmethod
    def _flesch_reading_ease(words: list, sent_lengths: list) -> float:
        """Compute Flesch Reading Ease score without textstat.

        Formula: 206.835 - 1.015 * ASL - 84.6 * ASW, where ASL is the
        average sentence length in words and ASW the average syllables
        per word.
        """
        if not words or not sent_lengths:
            return 0.0
        asl = sum(sent_lengths) / len(sent_lengths)
        # Reuse the shared syllable heuristic rather than duplicating it.
        total_syllables = sum(StyleFingerprinter._count_syllables(w) for w in words)
        asw = total_syllables / len(words)
        return 206.835 - 1.015 * asl - 84.6 * asw
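
    # Worked example: ASL = 20 and ASW = 1.5 give
    # 206.835 - 1.015*20 - 84.6*1.5 = 206.835 - 20.3 - 126.9 = 59.635.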
|
|
    def extract_raw_features(self, text: str) -> Dict[str, float]:
        """Extract the 41 raw style features from text."""
        if not text or not text.strip():
            # Placeholder keys; downstream code consumes only the values.
            return {f"f_{i}": 0.0 for i in range(FEATURE_DIM)}

        doc = self.nlp(text)
        words = [t.text.lower() for t in doc if not t.is_punct and not t.is_space]
        word_count = max(len(words), 1)

        sentences = list(doc.sents)
        sent_lengths = [len([t for t in s if not t.is_punct and not t.is_space]) for s in sentences]
        if not sent_lengths:
            sent_lengths = [0]

        features = {}

        # Sentence-length statistics [3]
        features["sentence_length_mean"] = float(np.mean(sent_lengths))
        features["sentence_length_std"] = float(np.std(sent_lengths)) if len(sent_lengths) > 1 else 0.0
        # Skew is undefined (NaN) for constant-length samples, so guard it.
        if len(sent_lengths) > 2 and np.std(sent_lengths) > 0:
            features["sentence_length_skew"] = float(stats.skew(sent_lengths))
        else:
            features["sentence_length_skew"] = 0.0

        # Word-length statistics [2]
        word_lengths = [len(w) for w in words]
        features["word_length_mean"] = float(np.mean(word_lengths)) if word_lengths else 0.0
        features["word_length_std"] = float(np.std(word_lengths)) if len(word_lengths) > 1 else 0.0

        # Lexical diversity [1]
        unique_words = set(words)
        features["type_token_ratio"] = len(unique_words) / word_count

        # Voice [2]: active is defined as the complement of passive.
        features["passive_voice_ratio"] = self._passive_voice_ratio(doc)
        features["active_voice_ratio"] = 1.0 - features["passive_voice_ratio"]

        # Syntactic complexity [2]
        sub_clauses = sum(1 for t in doc if t.dep_ in ("advcl", "relcl", "ccomp", "xcomp", "acl"))
        features["subordinate_clause_ratio"] = sub_clauses / max(len(sent_lengths), 1)
        features["avg_dependency_tree_depth"] = self._avg_dep_tree_depth(doc)

        # Hedging frequency per 100 words [1]
        hedging_count = sum(1 for w in words if w in HEDGING_WORDS)
        features["hedging_frequency"] = (hedging_count / word_count) * 100

        # Discourse-marker frequencies per 100 words [20]
        for marker in DISCOURSE_MARKERS:
            marker_count = words.count(marker)
            features[f"discourse_{marker}"] = (marker_count / word_count) * 100

        # Formality [1]: imported lazily so module import stays lightweight.
        if not hasattr(self, "_formality_clf"):
            from .formality_classifier import FormalityClassifier
            self._formality_clf = FormalityClassifier()
        features["formality_score"] = self._formality_clf.score(text)

        # Lexical density [1]
        features["lexical_density"] = self._lexical_density(doc)

        # Nominalisation [1] (suffix heuristic; see NOMINALISATION_SUFFIXES)
        nom_count = sum(1 for w in words if any(w.endswith(s) for s in NOMINALISATION_SUFFIXES))
        features["nominalization_ratio"] = nom_count / word_count

        # Sentence-type ratios [2]
        question_sents = sum(1 for s in sentences if s.text.strip().endswith("?"))
        features["question_sentence_ratio"] = question_sents / max(len(sentences), 1)
        excl_sents = sum(1 for s in sentences if s.text.strip().endswith("!"))
        features["exclamation_ratio"] = excl_sents / max(len(sentences), 1)

        # Pronoun-person usage [2]
        first_person = {"i", "me", "my", "mine", "myself", "we", "our", "ours"}
        fp_count = sum(1 for w in words if w in first_person)
        features["first_person_ratio"] = fp_count / word_count
        third_person = {"he", "she", "it", "they", "him", "her", "his", "its", "their", "them"}
        tp_count = sum(1 for w in words if w in third_person)
        features["third_person_ratio"] = tp_count / word_count

        # Academic vocabulary coverage [1]
        academic_count = sum(1 for w in words if w in self.awl)
        features["academic_word_coverage"] = academic_count / word_count

        # Readability [2]
        features["avg_syllables_per_word"] = self._avg_syllables_per_word(words)
        # Flesch is nominally 0-100; rescale to [0, 1] and clamp outliers.
        flesch = self._flesch_reading_ease(words, sent_lengths)
        features["flesch_reading_ease"] = max(0.0, min(1.0, flesch / 100.0))

        return features
|
|
    def extract_vector(self, text: str) -> torch.Tensor:
        """Returns a 512-dim, L2-normalised style embedding tensor."""
        features = self.extract_raw_features(text)

        # Dict insertion order (guaranteed since Python 3.7) fixes the
        # feature ordering; pad or truncate defensively to FEATURE_DIM.
        values = list(features.values())
        if len(values) < FEATURE_DIM:
            values.extend([0.0] * (FEATURE_DIM - len(values)))
        else:
            values = values[:FEATURE_DIM]

        feature_tensor = torch.tensor(values, dtype=torch.float32).unsqueeze(0)

        with torch.no_grad():
            embedding = self.projection(feature_tensor)

        # Unit-normalise so cosine similarity reduces to a dot product.
        embedding = F.normalize(embedding, p=2, dim=-1)

        return embedding.squeeze(0)
|
|
    def blend_vectors(
        self,
        user_vec: torch.Tensor,
        master_vec: Optional[torch.Tensor],
        alpha: float = 0.6,
    ) -> torch.Tensor:
        """
        Blend the user's style vector with a master-copy style vector.

        alpha is the weight given to the user's own style (0.6 means the
        user's style dominates):
            target = alpha * user_vec + (1 - alpha) * master_vec
        The result is re-normalised to unit length.
        """
        if master_vec is None:
            return F.normalize(user_vec, p=2, dim=-1)

        blended = alpha * user_vec + (1 - alpha) * master_vec
        return F.normalize(blended, p=2, dim=-1)
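

# Minimal smoke test (illustrative sketch). Assumes the spaCy model and AWL
# file are available and that the package's formality classifier module is
# importable; run as a module (python -m <package>.style_fingerprinter) so
# the relative import in extract_raw_features resolves.
if __name__ == "__main__":
    fp = StyleFingerprinter()
    sample = (
        "However, the results suggest that the proposed method is "
        "relatively robust. We therefore believe it may generalise well."
    )
    vec = fp.extract_vector(sample)
    print(vec.shape)          # torch.Size([512])
    print(float(vec.norm()))  # ~1.0 after L2 normalisation

    blended = fp.blend_vectors(vec, master_vec=None, alpha=0.6)
    print(torch.allclose(vec, blended))  # True when master_vec is None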
|
|