File size: 8,348 Bytes
83b4881
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
Классические методы векторизации текста: One-Hot, Bag-of-Words, TF-IDF с поддержкой n-грамм.
Предоставляет единый интерфейс fit/transform, вычисление метрик разреженности и размерности,
а также удобные функции для сравнения конфигураций и экспорта результатов.
"""

from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any

import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer


@dataclass
class VectorizationConfig:
    method: str  # onehot | bow | tfidf
    ngram_range: Tuple[int, int] = (1, 1)
    lowercase: bool = True
    min_df: int | float = 1
    max_df: int | float = 1.0
    max_features: Optional[int] = None
    analyzer: str = "word"  # word | char | char_wb
    smooth_idf: bool = True  # для TF-IDF
    sublinear_tf: bool = False  # для TF-IDF


@dataclass
class VectorizationReport:
    method_name: str
    ngram_range: str
    num_docs: int
    num_features: int
    nnz: int
    density: float
    build_time_sec: float
    transform_time_sec: float
    memory_estimate_mb: float


class ClassicalVectorizers:
    """Универсальный интерфейс для классических векторизаторов текста."""

    def __init__(self, config: VectorizationConfig):
        self.config = config
        self.vectorizer = self._create_vectorizer(config)

    def _create_vectorizer(self, config: VectorizationConfig):
        method = config.method.lower()

        if method == "bow":
            return CountVectorizer(
                ngram_range=config.ngram_range,
                lowercase=config.lowercase,
                min_df=config.min_df,
                max_df=config.max_df,
                max_features=config.max_features,
                analyzer=config.analyzer,
            )

        if method == "tfidf":
            return TfidfVectorizer(
                ngram_range=config.ngram_range,
                lowercase=config.lowercase,
                min_df=config.min_df,
                max_df=config.max_df,
                max_features=config.max_features,
                analyzer=config.analyzer,
                smooth_idf=config.smooth_idf,
                sublinear_tf=config.sublinear_tf,
            )

        if method == "onehot":
            # Реализуем через словари токенов -> 1 и DictVectorizer
            return DictVectorizer(sparse=True)

        raise ValueError(f"Неизвестный метод векторизации: {config.method}")

    @staticmethod
    def _texts_to_onehot_dicts(texts: List[str], ngram_range: Tuple[int, int]) -> List[Dict[str, int]]:
        """Преобразует тексты в словари признаков для one-hot (включая n-граммы)."""
        def extract_ngrams(tokens: List[str], n: int) -> List[str]:
            return ["_".join(tokens[i : i + n]) for i in range(len(tokens) - n + 1)]

        dicts: List[Dict[str, int]] = []
        n_min, n_max = ngram_range
        for text in texts:
            tokens = text.split()
            features: Dict[str, int] = {}
            for n in range(n_min, n_max + 1):
                if n == 1:
                    grams = tokens
                else:
                    grams = extract_ngrams(tokens, n)
                for g in grams:
                    features[g] = 1
            dicts.append(features)
        return dicts

    @staticmethod
    def _sparsity_metrics(X: sparse.spmatrix) -> Tuple[int, int, float, float]:
        nnz = int(X.nnz)
        num_docs, num_features = X.shape
        total = num_docs * num_features
        density = (nnz / total) if total > 0 else 0.0
        mem_bytes = (nnz * (8 + 8 + 8))  # грубая оценка COO/CSR: data+indices+indptr
        mem_mb = mem_bytes / (1024**2)
        return num_features, nnz, density, mem_mb

    def fit_transform(self, texts: List[str]) -> Tuple[sparse.spmatrix, VectorizationReport]:
        start = time.time()

        if isinstance(self.vectorizer, DictVectorizer):
            dicts = self._texts_to_onehot_dicts(texts, self.config.ngram_range)
            X = self.vectorizer.fit_transform(dicts)
        else:
            X = self.vectorizer.fit_transform(texts)

        build_time = time.time() - start
        # Дополнительное преобразование для оценки времени transform
        t0 = time.time()
        if isinstance(self.vectorizer, DictVectorizer):
            _ = self.vectorizer.transform(dicts)
        else:
            _ = self.vectorizer.transform(texts)
        transform_time = time.time() - t0

        num_features, nnz, density, mem_mb = self._sparsity_metrics(X)

        report = VectorizationReport(
            method_name=self.config.method,
            ngram_range=f"{self.config.ngram_range[0]}-{self.config.ngram_range[1]}",
            num_docs=len(texts),
            num_features=num_features,
            nnz=nnz,
            density=round(density, 6),
            build_time_sec=round(build_time, 4),
            transform_time_sec=round(transform_time, 4),
            memory_estimate_mb=round(mem_mb, 2),
        )
        return X, report

    def transform(self, texts: List[str]) -> sparse.spmatrix:
        if isinstance(self.vectorizer, DictVectorizer):
            dicts = self._texts_to_onehot_dicts(texts, self.config.ngram_range)
            return self.vectorizer.transform(dicts)
        return self.vectorizer.transform(texts)

    def get_feature_names(self) -> List[str]:
        if hasattr(self.vectorizer, "get_feature_names_out"):
            return list(self.vectorizer.get_feature_names_out())
        if hasattr(self.vectorizer, "feature_names_"):
            return list(self.vectorizer.feature_names_)
        return []


def compare_vectorizers(
    texts: List[str],
    configs: List[VectorizationConfig],
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Сравнивает несколько конфигураций векторизации и возвращает таблицу метрик.
    Дополнительно возвращает словарь с матрицами признаков по ключу <method|ngram>.
    """
    results: List[VectorizationReport] = []
    matrices: Dict[str, Any] = {}

    for cfg in configs:
        vec = ClassicalVectorizers(cfg)
        X, rep = vec.fit_transform(texts)
        key = f"{cfg.method}:{cfg.ngram_range}"
        matrices[key] = {"X": X, "vectorizer": vec}
        results.append(rep)

    df = pd.DataFrame([
        {
            "Метод": r.method_name,
            "N-граммы": r.ngram_range,
            "Документов": r.num_docs,
            "Признаков": r.num_features,
            "Ненулевых": r.nnz,
            "Плотность": r.density,
            "Время fit (с)": r.build_time_sec,
            "Время transform (с)": r.transform_time_sec,
            "Память (MB) ~": r.memory_estimate_mb,
        }
        for r in results
    ])
    return df.sort_values(["Метод", "N-граммы"]).reset_index(drop=True), matrices


def save_metrics(df: pd.DataFrame, output_csv: str) -> None:
    df.to_csv(output_csv, index=False, encoding="utf-8")


if __name__ == "__main__":
    sample = [
        "Россия и Франция подписали новое соглашение по энергетике.",
        "Путин встретился с президентом Турции и обсудил поставки газа.",
        "В Москве пройдут переговоры министров иностранных дел.",
    ]
    configs = [
        VectorizationConfig(method="onehot", ngram_range=(1, 1)),
        VectorizationConfig(method="bow", ngram_range=(1, 2)),
        VectorizationConfig(method="tfidf", ngram_range=(1, 3), sublinear_tf=True),
    ]
    df, _ = compare_vectorizers(sample, configs)
    print(df)