import re
import string
from typing import List, Optional, Union, Dict, Any, Callable

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk import download as nltk_download
from nltk.stem import WordNetLemmatizer
import spacy
from gensim.models import KeyedVectors
from transformers import AutoTokenizer, AutoModel
import torch
import emoji

print('PREPROCESSING IMPORTED')

try:
    nltk_download('punkt', quiet=True)
    nltk_download('stopwords', quiet=True)
    nltk_download('wordnet', quiet=True)
except Exception as e:
    print(f"Warning: NLTK data download failed: {e}")

# Lazily initialised caches for the heavier models.
_SPACY_MODELS: Dict[str, Any] = {}
_NLTK_LEMMATIZER = None
_BERT_TOKENIZER = None
_BERT_MODEL = None


def _load_spacy_model(lang: str = "en_core_web_sm"):
    # Cache one pipeline per model name so that switching languages does not
    # silently return a previously loaded model for another language.
    if lang not in _SPACY_MODELS:
        try:
            _SPACY_MODELS[lang] = spacy.load(lang)
        except OSError:
            raise ValueError(
                f"spaCy model '{lang}' not found. "
                f"Please install it via: python -m spacy download {lang}"
            )
    return _SPACY_MODELS[lang]


def _load_nltk_lemmatizer():
    global _NLTK_LEMMATIZER
    if _NLTK_LEMMATIZER is None:
        _NLTK_LEMMATIZER = WordNetLemmatizer()
    return _NLTK_LEMMATIZER


def _load_bert_model(model_name: str = "bert-base-uncased"):
    # Note: the first model_name loaded is cached and reused for subsequent calls.
    global _BERT_TOKENIZER, _BERT_MODEL
    if _BERT_TOKENIZER is None or _BERT_MODEL is None:
        _BERT_TOKENIZER = AutoTokenizer.from_pretrained(model_name)
        _BERT_MODEL = AutoModel.from_pretrained(model_name)
    return _BERT_TOKENIZER, _BERT_MODEL


def clean_text(text: str) -> str:
    """Strip HTML tags, URLs, non-printable characters, and extra whitespace."""
    text = re.sub(r"<[^>]+>", "", text)
    text = re.sub(r"https?://\S+|www\.\S+", "", text)
    text = "".join(ch for ch in text if ch in string.printable)
    text = re.sub(r"\s+", " ", text).strip()
    return text


def replace_emojis(text: str) -> str:
    """Replace emojis with their textual names, e.g. 😊 -> smiling_face_with_smiling_eyes."""
    return emoji.demojize(text, delimiters=(" ", " "))


def preprocess_text(
    text: str,
    lang: str = "en",
    remove_stopwords: bool = True,
    use_spacy: bool = True,
    lemmatize: bool = True,
    emoji_to_text: bool = True,
    lowercase: bool = True,
    spacy_model: Optional[str] = None,
    replace_entities: bool = False,  # new option: by default numbers/URLs/emails are NOT removed
) -> List[str]:
    if emoji_to_text:
        text = replace_emojis(text)

    text = re.sub(r"<[^>]+>", "", text)

    if replace_entities:
        # Remove URLs, e-mail addresses, and standalone numbers before punctuation
        # stripping, while the patterns are still intact.
        text = re.sub(r"https?://\S+|www\.\S+", "", text)
        text = re.sub(r"\S+@\S+", "", text)
        text = re.sub(r"\b\d+\b", "", text)

    text = re.sub(r"[^\w\s]", " ", text)  # replace non-word, non-whitespace characters with a space
    text = re.sub(r"\s+", " ", text).strip()

    if lowercase:
        text = text.lower()

    if use_spacy:
        spacy_lang = spacy_model or ("en_core_web_sm" if lang == "en" else f"{lang}_core_news_sm")
        nlp = _load_spacy_model(spacy_lang)
        doc = nlp(text)
        if lemmatize:
            tokens = [token.lemma_ for token in doc if not token.is_space and not token.is_punct]
        else:
            tokens = [token.text for token in doc if not token.is_space and not token.is_punct]
        if remove_stopwords:
            tokens = [token for token in tokens if not nlp.vocab[token].is_stop]
    else:
        tokens = word_tokenize(text)
        if lemmatize:
            lemmatizer = _load_nltk_lemmatizer()
            tokens = [lemmatizer.lemmatize(token) for token in tokens]
        if remove_stopwords:
            # NLTK stopword lists are keyed by full language names ("english"),
            # not ISO codes ("en"), so map common codes first.
            lang_names = {"en": "english", "ru": "russian", "de": "german", "fr": "french", "es": "spanish"}
            lang_name = lang_names.get(lang, lang)
            stop_words = set(stopwords.words(lang_name)) if lang_name in stopwords.fileids() else set()
            tokens = [token for token in tokens if token not in stop_words]

    tokens = [token for token in tokens if token not in string.punctuation and len(token) > 0]
    return tokens
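# Illustrative usage of preprocess_text (a sketch; the exact tokens depend on the
# spaCy/NLTK resources installed and on lemmatizer behaviour):
#
#     tokens = preprocess_text(
#         "Check out https://example.com, it was GREAT 😊!!",
#         use_spacy=False,        # fall back to NLTK tokenization + WordNet lemmatizer
#         replace_entities=True,  # strip the URL before punctuation removal
#     )
#     # tokens is a list of lowercased, lemmatized words with stopwords removed,
#     # e.g. something like ['check', 'great', 'smiling_face_with_smiling_eyes']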
class TextVectorizer:
    """Bag-of-words / TF-IDF vectorization helpers built on scikit-learn."""

    def __init__(self):
        self.bow_vectorizer = None
        self.tfidf_vectorizer = None

    def bow(self, texts: List[str], **kwargs) -> np.ndarray:
        self.bow_vectorizer = CountVectorizer(**kwargs)
        return self.bow_vectorizer.fit_transform(texts).toarray()

    def tfidf(self, texts: List[str], max_features: int = 5000, **kwargs) -> np.ndarray:
        kwargs['max_features'] = max_features
        # lowercase=False: texts are assumed to be already preprocessed.
        self.tfidf_vectorizer = TfidfVectorizer(lowercase=False, **kwargs)
        return self.tfidf_vectorizer.fit_transform(texts).toarray()

    def ngrams(self, texts: List[str], ngram_range: tuple = (1, 2), **kwargs) -> np.ndarray:
        kwargs.setdefault("ngram_range", ngram_range)
        return self.tfidf(texts, **kwargs)


class EmbeddingVectorizer:
    """Document embeddings from pre-trained word vectors (word2vec / fastText / GloVe)."""

    def __init__(self):
        self.word2vec_model = None
        self.fasttext_model = None
        self.glove_vectors = None

    def load_word2vec(self, path: str):
        self.word2vec_model = KeyedVectors.load_word2vec_format(path, binary=True)

    def load_fasttext(self, path: str):
        # Expects vectors previously saved with gensim's KeyedVectors.save().
        self.fasttext_model = KeyedVectors.load(path)

    def load_glove(self, glove_file: str, vocab_size: int = 400000, dim: int = 300):
        self.glove_vectors = {}
        with open(glove_file, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if i >= vocab_size:
                    break
                values = line.split()
                word = values[0]
                vector = np.array(values[1:], dtype="float32")
                self.glove_vectors[word] = vector

    def _get_word_vector(self, word: str, method: str = "word2vec") -> Optional[np.ndarray]:
        if method == "word2vec" and self.word2vec_model is not None and word in self.word2vec_model:
            return self.word2vec_model[word]
        elif method == "fasttext" and self.fasttext_model is not None and word in self.fasttext_model:
            return self.fasttext_model[word]
        elif method == "glove" and self.glove_vectors is not None and word in self.glove_vectors:
            return self.glove_vectors[word]
        return None

    def _aggregate_vectors(
        self, vectors: List[np.ndarray], strategy: str = "mean"
    ) -> np.ndarray:
        if not vectors:
            return np.zeros(300)  # default dimensionality for fully out-of-vocabulary documents
        if strategy == "mean":
            return np.mean(vectors, axis=0)
        elif strategy == "max":
            return np.max(vectors, axis=0)
        else:
            raise ValueError("Strategy must be 'mean' or 'max'")

    def get_embeddings(
        self,
        tokenized_texts: List[List[str]],
        method: str = "word2vec",
        aggregation: str = "mean",
    ) -> np.ndarray:
        embeddings = []
        for tokens in tokenized_texts:
            vectors = [
                self._get_word_vector(token, method=method) for token in tokens
            ]
            vectors = [v for v in vectors if v is not None]
            doc_vec = self._aggregate_vectors(vectors, strategy=aggregation)
            embeddings.append(doc_vec)
        return np.array(embeddings)


def get_contextual_embeddings(
    texts: List[str],
    model_name: str = "bert-base-uncased",
    aggregation: str = "mean",
    device: str = "cpu",
) -> np.ndarray:
    tokenizer, model = _load_bert_model(model_name)
    model.to(device)
    model.eval()
    embeddings = []
    with torch.no_grad():
        for text in texts:
            inputs = tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            outputs = model(**inputs)
            token_embeddings = outputs.last_hidden_state[0].cpu().numpy()
            # Exclude [CLS] and [SEP] (simple heuristic: skip first and last token)
            if len(token_embeddings) > 2:
                token_embeddings = token_embeddings[1:-1]
            if aggregation == "mean":
                doc_emb = np.mean(token_embeddings, axis=0)
            elif aggregation == "max":
                doc_emb = np.max(token_embeddings, axis=0)
            else:
                raise ValueError("aggregation must be 'mean' or 'max'")
            embeddings.append(doc_emb)
    return np.array(embeddings)
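# Illustrative usage of the vectorizers (a sketch; the file path below is
# hypothetical and must point at a real pre-trained vector file):
#
#     docs = ["first document text", "second document text"]
#     vec = TextVectorizer()
#     X_tfidf = vec.tfidf(docs, max_features=1000)               # shape (n_docs, n_features)
#
#     emb = EmbeddingVectorizer()
#     emb.load_word2vec("GoogleNews-vectors-negative300.bin")    # hypothetical path
#     X_w2v = emb.get_embeddings([d.split() for d in docs], method="word2vec")
#
#     X_bert = get_contextual_embeddings(docs, device="cpu")     # (n_docs, 768) for bert-base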
def extract_meta_features(texts: Union[List[str], pd.Series]) -> pd.DataFrame:
    """Surface-level statistics per text: length, character ratios, vocabulary size, etc."""
    if isinstance(texts, pd.Series):
        texts = texts.tolist()
    features = []
    for text in texts:
        original_len = len(text)
        words = text.split()
        word_lengths = [len(w) for w in words] if words else [0]
        avg_word_len = np.mean(word_lengths)
        num_unique_words = len(set(words)) if words else 0
        num_punct = sum(1 for c in text if c in string.punctuation)
        num_upper = sum(1 for c in text if c.isupper())
        num_digits = sum(1 for c in text if c.isdigit())
        # Placeholder: plug in a readability library (e.g. textstat) here if available.
        flesch = np.nan
        features.append({
            "text_length": original_len,
            "avg_word_length": avg_word_len,
            "num_unique_words": num_unique_words,
            "punctuation_ratio": num_punct / original_len if original_len > 0 else 0,
            "uppercase_ratio": num_upper / original_len if original_len > 0 else 0,
            "digit_ratio": num_digits / original_len if original_len > 0 else 0,
            "flesch_reading_ease": flesch,
        })
    return pd.DataFrame(features)
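# Minimal end-to-end sketch tying the pieces together. This demo is an assumption
# about intended usage, not part of the original pipeline; it only exercises the
# components that need no external model files (no spaCy model, no word vectors).
if __name__ == "__main__":
    sample_texts = [
        "<p>Visit https://example.com for MORE info 😊</p>",
        "Numbers like 42 and emails like user@example.com can be stripped.",
    ]

    # Basic cleaning and tokenization (NLTK path avoids requiring a spaCy model).
    print("Cleaned:", [clean_text(t) for t in sample_texts])
    tokenized = [preprocess_text(t, use_spacy=False, replace_entities=True) for t in sample_texts]
    print("Tokens:", tokenized)

    # Sparse vectorization of the re-joined tokens.
    joined = [" ".join(tokens) for tokens in tokenized]
    vectorizer = TextVectorizer()
    print("TF-IDF shape:", vectorizer.tfidf(joined, max_features=100).shape)

    # Hand-crafted meta features.
    print(extract_meta_features(sample_texts))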