NLP_Homework_1 / src /classical_vectorizers.py
Kolesnikov Dmitry
feat: Вторая лабораторка
83b4881
"""
Классические методы векторизации текста: One-Hot, Bag-of-Words, TF-IDF с поддержкой n-грамм.
Предоставляет единый интерфейс fit/transform, вычисление метрик разреженности и размерности,
а также удобные функции для сравнения конфигураций и экспорта результатов.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any
import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
@dataclass
class VectorizationConfig:
method: str # onehot | bow | tfidf
ngram_range: Tuple[int, int] = (1, 1)
lowercase: bool = True
min_df: int | float = 1
max_df: int | float = 1.0
max_features: Optional[int] = None
analyzer: str = "word" # word | char | char_wb
smooth_idf: bool = True # для TF-IDF
sublinear_tf: bool = False # для TF-IDF
@dataclass
class VectorizationReport:
method_name: str
ngram_range: str
num_docs: int
num_features: int
nnz: int
density: float
build_time_sec: float
transform_time_sec: float
memory_estimate_mb: float
class ClassicalVectorizers:
"""Универсальный интерфейс для классических векторизаторов текста."""
def __init__(self, config: VectorizationConfig):
self.config = config
self.vectorizer = self._create_vectorizer(config)
def _create_vectorizer(self, config: VectorizationConfig):
method = config.method.lower()
if method == "bow":
return CountVectorizer(
ngram_range=config.ngram_range,
lowercase=config.lowercase,
min_df=config.min_df,
max_df=config.max_df,
max_features=config.max_features,
analyzer=config.analyzer,
)
if method == "tfidf":
return TfidfVectorizer(
ngram_range=config.ngram_range,
lowercase=config.lowercase,
min_df=config.min_df,
max_df=config.max_df,
max_features=config.max_features,
analyzer=config.analyzer,
smooth_idf=config.smooth_idf,
sublinear_tf=config.sublinear_tf,
)
if method == "onehot":
# Реализуем через словари токенов -> 1 и DictVectorizer
return DictVectorizer(sparse=True)
raise ValueError(f"Неизвестный метод векторизации: {config.method}")
@staticmethod
def _texts_to_onehot_dicts(texts: List[str], ngram_range: Tuple[int, int]) -> List[Dict[str, int]]:
"""Преобразует тексты в словари признаков для one-hot (включая n-граммы)."""
def extract_ngrams(tokens: List[str], n: int) -> List[str]:
return ["_".join(tokens[i : i + n]) for i in range(len(tokens) - n + 1)]
dicts: List[Dict[str, int]] = []
n_min, n_max = ngram_range
for text in texts:
tokens = text.split()
features: Dict[str, int] = {}
for n in range(n_min, n_max + 1):
if n == 1:
grams = tokens
else:
grams = extract_ngrams(tokens, n)
for g in grams:
features[g] = 1
dicts.append(features)
return dicts
@staticmethod
def _sparsity_metrics(X: sparse.spmatrix) -> Tuple[int, int, float, float]:
nnz = int(X.nnz)
num_docs, num_features = X.shape
total = num_docs * num_features
density = (nnz / total) if total > 0 else 0.0
mem_bytes = (nnz * (8 + 8 + 8)) # грубая оценка COO/CSR: data+indices+indptr
mem_mb = mem_bytes / (1024**2)
return num_features, nnz, density, mem_mb
def fit_transform(self, texts: List[str]) -> Tuple[sparse.spmatrix, VectorizationReport]:
start = time.time()
if isinstance(self.vectorizer, DictVectorizer):
dicts = self._texts_to_onehot_dicts(texts, self.config.ngram_range)
X = self.vectorizer.fit_transform(dicts)
else:
X = self.vectorizer.fit_transform(texts)
build_time = time.time() - start
# Дополнительное преобразование для оценки времени transform
t0 = time.time()
if isinstance(self.vectorizer, DictVectorizer):
_ = self.vectorizer.transform(dicts)
else:
_ = self.vectorizer.transform(texts)
transform_time = time.time() - t0
num_features, nnz, density, mem_mb = self._sparsity_metrics(X)
report = VectorizationReport(
method_name=self.config.method,
ngram_range=f"{self.config.ngram_range[0]}-{self.config.ngram_range[1]}",
num_docs=len(texts),
num_features=num_features,
nnz=nnz,
density=round(density, 6),
build_time_sec=round(build_time, 4),
transform_time_sec=round(transform_time, 4),
memory_estimate_mb=round(mem_mb, 2),
)
return X, report
def transform(self, texts: List[str]) -> sparse.spmatrix:
if isinstance(self.vectorizer, DictVectorizer):
dicts = self._texts_to_onehot_dicts(texts, self.config.ngram_range)
return self.vectorizer.transform(dicts)
return self.vectorizer.transform(texts)
def get_feature_names(self) -> List[str]:
if hasattr(self.vectorizer, "get_feature_names_out"):
return list(self.vectorizer.get_feature_names_out())
if hasattr(self.vectorizer, "feature_names_"):
return list(self.vectorizer.feature_names_)
return []
def compare_vectorizers(
texts: List[str],
configs: List[VectorizationConfig],
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""
Сравнивает несколько конфигураций векторизации и возвращает таблицу метрик.
Дополнительно возвращает словарь с матрицами признаков по ключу <method|ngram>.
"""
results: List[VectorizationReport] = []
matrices: Dict[str, Any] = {}
for cfg in configs:
vec = ClassicalVectorizers(cfg)
X, rep = vec.fit_transform(texts)
key = f"{cfg.method}:{cfg.ngram_range}"
matrices[key] = {"X": X, "vectorizer": vec}
results.append(rep)
df = pd.DataFrame([
{
"Метод": r.method_name,
"N-граммы": r.ngram_range,
"Документов": r.num_docs,
"Признаков": r.num_features,
"Ненулевых": r.nnz,
"Плотность": r.density,
"Время fit (с)": r.build_time_sec,
"Время transform (с)": r.transform_time_sec,
"Память (MB) ~": r.memory_estimate_mb,
}
for r in results
])
return df.sort_values(["Метод", "N-граммы"]).reset_index(drop=True), matrices
def save_metrics(df: pd.DataFrame, output_csv: str) -> None:
df.to_csv(output_csv, index=False, encoding="utf-8")
if __name__ == "__main__":
sample = [
"Россия и Франция подписали новое соглашение по энергетике.",
"Путин встретился с президентом Турции и обсудил поставки газа.",
"В Москве пройдут переговоры министров иностранных дел.",
]
configs = [
VectorizationConfig(method="onehot", ngram_range=(1, 1)),
VectorizationConfig(method="bow", ngram_range=(1, 2)),
VectorizationConfig(method="tfidf", ngram_range=(1, 3), sublinear_tf=True),
]
df, _ = compare_vectorizers(sample, configs)
print(df)