# text_classificators/src/text_preprocessing.py
import re
import string
from typing import List, Optional, Union
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk import download as nltk_download
from nltk.stem import WordNetLemmatizer
import spacy
from gensim.models import KeyedVectors
from transformers import AutoTokenizer, AutoModel
import torch
import emoji
# Best-effort download of the NLTK resources used below; failures are
# reported but not fatal (e.g. in offline environments).
try:
    nltk_download('punkt', quiet=True)
    nltk_download('stopwords', quiet=True)
    nltk_download('wordnet', quiet=True)
    nltk_download('omw-1.4', quiet=True)  # wordnet data used by the lemmatizer
except Exception as e:
    print(f"Warning: NLTK data download failed: {e}")
# Lazily initialised singletons so heavy models are only loaded on first use.
_SPACY_MODEL = None
_NLTK_LEMMATIZER = None
_BERT_TOKENIZER = None
_BERT_MODEL = None
def _load_spacy_model(lang: str = "en_core_web_sm"):
    """Load and cache a spaCy pipeline. Note: the cache is a single global,
    so the first model loaded is reused for all subsequent calls."""
    global _SPACY_MODEL
    if _SPACY_MODEL is None:
        try:
            _SPACY_MODEL = spacy.load(lang)
        except OSError:
            raise ValueError(
                f"spaCy model '{lang}' not found. Please install it via: python -m spacy download {lang}"
            )
    return _SPACY_MODEL
def _load_nltk_lemmatizer():
global _NLTK_LEMMATIZER
if _NLTK_LEMMATIZER is None:
_NLTK_LEMMATIZER = WordNetLemmatizer()
return _NLTK_LEMMATIZER
def _load_bert_model(model_name: str = "bert-base-uncased"):
    """Load and cache a Hugging Face tokenizer/model pair. As with the spaCy
    cache, only the first requested model is kept."""
    global _BERT_TOKENIZER, _BERT_MODEL
    if _BERT_TOKENIZER is None or _BERT_MODEL is None:
        _BERT_TOKENIZER = AutoTokenizer.from_pretrained(model_name)
        _BERT_MODEL = AutoModel.from_pretrained(model_name)
    return _BERT_TOKENIZER, _BERT_MODEL
def clean_text(text: str) -> str:
    """Basic cleanup: strip HTML tags, URLs, and non-ASCII characters,
    then collapse whitespace."""
    text = re.sub(r"<[^>]+>", "", text)                           # HTML tags
    text = re.sub(r"https?://\S+|www\.\S+", "", text)             # URLs
    text = "".join(ch for ch in text if ch in string.printable)   # drop non-ASCII
    text = re.sub(r"\s+", " ", text).strip()                      # collapse whitespace
    return text
def replace_emojis(text: str) -> str:
    """Replace emojis with their textual names, e.g. '👍' -> ' thumbs_up '."""
    return emoji.demojize(text, delimiters=(" ", " "))
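# Hedged usage sketch for the basic cleaners above; the exact demojized names
# come from the `emoji` package and may vary slightly across versions.
def _demo_clean_text() -> None:
    raw = "<p>Check https://example.com 👍!!</p>"
    print(clean_text(raw))                   # e.g. 'Check !!'
    print(replace_emojis("great 👍"))         # e.g. 'great  thumbs_up '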
def preprocess_text(
text: str,
lang: str = "en",
remove_stopwords: bool = True,
use_spacy: bool = True,
lemmatize: bool = True,
emoji_to_text: bool = True,
lowercase: bool = True,
spacy_model: Optional[str] = None,
    replace_entities: bool = False  # new option: off by default, i.e. numbers/URLs are kept as-is
) -> List[str]:
    if emoji_to_text:
        text = replace_emojis(text)
    text = re.sub(r"<[^>]+>", "", text)  # strip HTML tags
    if replace_entities:
        # Replace URLs/emails/numbers BEFORE the generic punctuation strip;
        # otherwise patterns like '://' can no longer match.
        text = re.sub(r"https?://\S+|www\.\S+", " URL ", text)
        text = re.sub(r"\S+@\S+", " EMAIL ", text)
        text = re.sub(r"\b\d+\b", " NUM ", text)
    text = re.sub(r"[^\w\s]", " ", text)  # replace non-word, non-space characters with a space
    text = re.sub(r"\s+", " ", text).strip()
if lowercase:
text = text.lower()
if use_spacy:
spacy_lang = spacy_model or ("en_core_web_sm" if lang == "en" else f"{lang}_core_news_sm")
nlp = _load_spacy_model(spacy_lang)
doc = nlp(text)
if lemmatize:
tokens = [token.lemma_ for token in doc if not token.is_space and not token.is_punct]
else:
tokens = [token.text for token in doc if not token.is_space and not token.is_punct]
if remove_stopwords:
tokens = [token for token in tokens if not nlp.vocab[token].is_stop]
else:
tokens = word_tokenize(text)
if lemmatize:
lemmatizer = _load_nltk_lemmatizer()
tokens = [lemmatizer.lemmatize(token) for token in tokens]
if remove_stopwords:
            # NLTK stopword lists are keyed by full language names ('english'),
            # not ISO codes ('en'); map the common codes first.
            nltk_lang = {"en": "english", "de": "german", "fr": "french", "es": "spanish", "ru": "russian"}.get(lang, lang)
            stop_words = set(stopwords.words(nltk_lang)) if nltk_lang in stopwords.fileids() else set()
tokens = [token for token in tokens if token not in stop_words]
tokens = [token for token in tokens if token not in string.punctuation and len(token) > 0]
return tokens
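# Hedged usage sketch for preprocess_text. use_spacy=False keeps the demo
# dependency-light (NLTK only); with spaCy the output lemmas may differ.
def _demo_preprocess() -> None:
    sample = "The cats are running to https://example.com with 42 friends!"
    tokens = preprocess_text(sample, use_spacy=False, replace_entities=True)
    print(tokens)  # roughly ['cat', 'running', 'url', 'num', 'friend']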
class TextVectorizer:
    """Thin wrapper around scikit-learn's bag-of-words and TF-IDF vectorizers.

    Note: the methods return dense arrays (via .toarray()), which is fine for
    small corpora but memory-hungry for large ones.
    """

    def __init__(self):
        self.bow_vectorizer = None
        self.tfidf_vectorizer = None

    def bow(self, texts: List[str], **kwargs) -> np.ndarray:
        self.bow_vectorizer = CountVectorizer(**kwargs)
        return self.bow_vectorizer.fit_transform(texts).toarray()

    def tfidf(self, texts: List[str], max_features: int = 5000, **kwargs) -> np.ndarray:
        kwargs['max_features'] = max_features
        # lowercase=False because texts are expected to be preprocessed already
        self.tfidf_vectorizer = TfidfVectorizer(lowercase=False, **kwargs)
        return self.tfidf_vectorizer.fit_transform(texts).toarray()

    def ngrams(self, texts: List[str], ngram_range: tuple = (1, 2), **kwargs) -> np.ndarray:
        kwargs.setdefault("ngram_range", ngram_range)
        return self.tfidf(texts, **kwargs)
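# Hedged usage sketch for TextVectorizer on a toy corpus; vocabulary sizes
# depend on scikit-learn's default tokenization.
def _demo_vectorizer() -> None:
    corpus = ["the cat sat on the mat", "the dog chased the cat"]
    vec = TextVectorizer()
    print(vec.bow(corpus).shape)                         # (2, n_unigrams)
    print(vec.tfidf(corpus, max_features=100).shape)     # (2, <=100)
    print(vec.ngrams(corpus, ngram_range=(1, 2)).shape)  # unigrams + bigrams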
class EmbeddingVectorizer:
    """Document embeddings from pretrained static word vectors
    (word2vec / fastText / GloVe)."""

    def __init__(self):
        self.word2vec_model = None
        self.fasttext_model = None
        self.glove_vectors = None

    def load_word2vec(self, path: str, binary: bool = True):
        self.word2vec_model = KeyedVectors.load_word2vec_format(path, binary=binary)

    def load_fasttext(self, path: str):
        # Expects KeyedVectors previously saved with .save(); for raw .bin
        # files from Facebook, use gensim.models.fasttext.load_facebook_vectors.
        self.fasttext_model = KeyedVectors.load(path)

    def load_glove(self, glove_file: str, vocab_size: int = 400000, dim: int = 300):
        self.glove_vectors = {}
        with open(glove_file, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if i >= vocab_size:
                    break
                values = line.rstrip().split(" ")
                if len(values) != dim + 1:
                    continue  # skip malformed lines (also validates the declared dim)
                word = values[0]
                vector = np.array(values[1:], dtype="float32")
                self.glove_vectors[word] = vector
def _get_word_vector(self, word: str, method: str = "word2vec") -> Optional[np.ndarray]:
if method == "word2vec" and self.word2vec_model and word in self.word2vec_model:
return self.word2vec_model[word]
elif method == "fasttext" and self.fasttext_model and word in self.fasttext_model:
return self.fasttext_model[word]
elif method == "glove" and self.glove_vectors and word in self.glove_vectors:
return self.glove_vectors[word]
return None
def _aggregate_vectors(
self, vectors: List[np.ndarray], strategy: str = "mean"
) -> np.ndarray:
        if not vectors:
            # No token had a known vector; fall back to zeros. Assumes
            # 300-dimensional embeddings (the common pretrained size).
            return np.zeros(300)
if strategy == "mean":
return np.mean(vectors, axis=0)
elif strategy == "max":
return np.max(vectors, axis=0)
else:
raise ValueError("Strategy must be 'mean' or 'max'")
def get_embeddings(
self,
tokenized_texts: List[List[str]],
method: str = "word2vec",
aggregation: str = "mean",
) -> np.ndarray:
embeddings = []
for tokens in tokenized_texts:
vectors = [
self._get_word_vector(token, method=method) for token in tokens
]
vectors = [v for v in vectors if v is not None]
doc_vec = self._aggregate_vectors(vectors, strategy=aggregation)
embeddings.append(doc_vec)
return np.array(embeddings)
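# Hedged usage sketch for EmbeddingVectorizer; the default path below is a
# placeholder for a locally downloaded binary word2vec file (not shipped here).
def _demo_embeddings(word2vec_path: str = "GoogleNews-vectors-negative300.bin") -> None:
    emb = EmbeddingVectorizer()
    emb.load_word2vec(word2vec_path)
    docs = [["cat", "sat"], ["dog", "barked"]]
    X = emb.get_embeddings(docs, method="word2vec", aggregation="mean")
    print(X.shape)  # (2, 300) for 300-dimensional vectors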
def get_contextual_embeddings(
    texts: List[str],
    model_name: str = "bert-base-uncased",
    aggregation: str = "mean",
    device: str = "cpu",
) -> np.ndarray:
    """Encode each text with a transformer and pool its token embeddings.

    Texts are processed one at a time (no batching), which is simple but slow
    for large corpora.
    """
    tokenizer, model = _load_bert_model(model_name)
    model.to(device)
    model.eval()
embeddings = []
with torch.no_grad():
for text in texts:
inputs = tokenizer(
text,
return_tensors="pt",
truncation=True,
padding=True,
max_length=512,
)
inputs = {k: v.to(device) for k, v in inputs.items()}
outputs = model(**inputs)
token_embeddings = outputs.last_hidden_state[0].cpu().numpy()
# Exclude [CLS] and [SEP] if needed (simple heuristic: skip first and last)
if len(token_embeddings) > 2:
token_embeddings = token_embeddings[1:-1]
if aggregation == "mean":
doc_emb = np.mean(token_embeddings, axis=0)
elif aggregation == "max":
doc_emb = np.max(token_embeddings, axis=0)
else:
raise ValueError("aggregation must be 'mean' or 'max'")
embeddings.append(doc_emb)
return np.array(embeddings)
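# Hedged usage sketch for get_contextual_embeddings; the first call downloads
# bert-base-uncased from the Hugging Face hub.
def _demo_contextual() -> None:
    X = get_contextual_embeddings(["a short sentence", "another one"])
    print(X.shape)  # (2, 768) for bert-base-uncased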
def extract_meta_features(texts: Union[List[str], pd.Series]) -> pd.DataFrame:
    """Surface-level statistics per text: length, word stats, character-class
    ratios, and (optionally) a Flesch reading-ease score."""
    if isinstance(texts, pd.Series):
        texts = texts.tolist()
    features = []
for text in texts:
original_len = len(text)
words = text.split()
word_lengths = [len(w) for w in words] if words else [0]
avg_word_len = np.mean(word_lengths)
num_unique_words = len(set(words)) if words else 0
num_punct = sum(1 for c in text if c in string.punctuation)
num_upper = sum(1 for c in text if c.isupper())
num_digits = sum(1 for c in text if c.isdigit())
        try:
            # Optional readability score; requires the `textstat` package.
            import textstat
            flesch = textstat.flesch_reading_ease(text)
        except Exception:
            flesch = np.nan
features.append({
"text_length": original_len,
"avg_word_length": avg_word_len,
"num_unique_words": num_unique_words,
"punctuation_ratio": num_punct / original_len if original_len > 0 else 0,
"uppercase_ratio": num_upper / original_len if original_len > 0 else 0,
"digit_ratio": num_digits / original_len if original_len > 0 else 0,
"flesch_reading_ease": flesch,
})
return pd.DataFrame(features)
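# Hedged usage sketch for extract_meta_features; flesch_reading_ease is NaN
# unless the optional `textstat` package is installed.
def _demo_meta_features() -> None:
    df = extract_meta_features(["Hello WORLD 123!", "short"])
    print(df[["text_length", "uppercase_ratio", "digit_ratio"]])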