Spaces:
Sleeping
Sleeping
| """ | |
| Методы борьбы с дисбалансом классов в текстовых данных: | |
| взвешивание классов, сэмплирование, аугментация текстов. | |
| """ | |
| from __future__ import annotations | |
| from typing import List, Tuple, Dict, Any, Optional | |
| from collections import Counter | |
| import numpy as np | |
| from sklearn.utils import resample | |
| from sklearn.utils.class_weight import compute_class_weight | |
| try: | |
| from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler | |
| from imblearn.under_sampling import RandomUnderSampler | |
| IMBLEARN_AVAILABLE = True | |
| except ImportError: | |
| IMBLEARN_AVAILABLE = False | |
| print("⚠️ imbalanced-learn не установлен. SMOTE/ADASYN недоступны.") | |
| try: | |
| import nlpaug.augmenter.word as naw | |
| NLPAUG_AVAILABLE = True | |
| except ImportError: | |
| NLPAUG_AVAILABLE = False | |
| print("⚠️ nlpaug не установлен. Аугментация текстов недоступна.") | |
| def compute_class_weights(y: np.ndarray, method: str = "balanced") -> Dict[int, float]: | |
| """ | |
| Вычисляет веса классов. | |
| Args: | |
| y: Массив меток | |
| method: Метод вычисления весов ('balanced', 'balanced_subsample', или dict) | |
| Returns: | |
| Словарь {класс: вес} | |
| """ | |
| classes = np.unique(y) | |
| weights = compute_class_weight(method, classes=classes, y=y) | |
| return dict(zip(classes, weights)) | |
| def random_oversample(X: np.ndarray, y: np.ndarray, | |
| strategy: Optional[Dict[int, int]] = None) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Случайная перевыборка миноритарных классов. | |
| Args: | |
| X: Признаки | |
| y: Метки | |
| strategy: Словарь {класс: целевое количество} или None для балансировки | |
| Returns: | |
| Перевыбранные X, y | |
| """ | |
| if strategy is None: | |
| # Балансируем до максимального класса | |
| class_counts = Counter(y) | |
| max_count = max(class_counts.values()) | |
| strategy = {cls: max_count for cls in class_counts.keys()} | |
| X_resampled = [] | |
| y_resampled = [] | |
| for cls in strategy.keys(): | |
| mask = y == cls | |
| X_cls = X[mask] | |
| y_cls = y[mask] | |
| if len(X_cls) < strategy[cls]: | |
| # Перевыборка | |
| X_cls_resampled, y_cls_resampled = resample( | |
| X_cls, y_cls, | |
| n_samples=strategy[cls], | |
| random_state=42 | |
| ) | |
| else: | |
| X_cls_resampled, y_cls_resampled = X_cls, y_cls | |
| X_resampled.append(X_cls_resampled) | |
| y_resampled.append(y_cls_resampled) | |
| X_resampled = np.vstack(X_resampled) | |
| y_resampled = np.hstack(y_resampled) | |
| # Перемешивание | |
| indices = np.random.permutation(len(X_resampled)) | |
| return X_resampled[indices], y_resampled[indices] | |
| def random_undersample(X: np.ndarray, y: np.ndarray, | |
| strategy: Optional[Dict[int, int]] = None) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Случайная недо-выборка мажоритарных классов. | |
| Args: | |
| X: Признаки | |
| y: Метки | |
| strategy: Словарь {класс: целевое количество} или None для балансировки | |
| Returns: | |
| Недо-выбранные X, y | |
| """ | |
| if strategy is None: | |
| # Балансируем до минимального класса | |
| class_counts = Counter(y) | |
| min_count = min(class_counts.values()) | |
| strategy = {cls: min_count for cls in class_counts.keys()} | |
| X_resampled = [] | |
| y_resampled = [] | |
| for cls in strategy.keys(): | |
| mask = y == cls | |
| X_cls = X[mask] | |
| y_cls = y[mask] | |
| if len(X_cls) > strategy[cls]: | |
| # Недо-выборка | |
| X_cls_resampled, y_cls_resampled = resample( | |
| X_cls, y_cls, | |
| n_samples=strategy[cls], | |
| random_state=42 | |
| ) | |
| else: | |
| X_cls_resampled, y_cls_resampled = X_cls, y_cls | |
| X_resampled.append(X_cls_resampled) | |
| y_resampled.append(y_cls_resampled) | |
| X_resampled = np.vstack(X_resampled) | |
| y_resampled = np.hstack(y_resampled) | |
| # Перемешивание | |
| indices = np.random.permutation(len(X_resampled)) | |
| return X_resampled[indices], y_resampled[indices] | |
| def smote_oversample(X: np.ndarray, y: np.ndarray, | |
| k_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| SMOTE (Synthetic Minority Oversampling Technique) для векторизованных текстов. | |
| Args: | |
| X: Векторизованные признаки | |
| y: Метки | |
| k_neighbors: Количество соседей для SMOTE | |
| Returns: | |
| Перевыбранные X, y | |
| """ | |
| if not IMBLEARN_AVAILABLE: | |
| raise ImportError("imbalanced-learn не установлен. Установите: pip install imbalanced-learn") | |
| smote = SMOTE(k_neighbors=k_neighbors, random_state=42) | |
| X_resampled, y_resampled = smote.fit_resample(X, y) | |
| return X_resampled, y_resampled | |
| def adasyn_oversample(X: np.ndarray, y: np.ndarray, | |
| n_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| ADASYN (Adaptive Synthetic Sampling) для векторизованных текстов. | |
| Args: | |
| X: Векторизованные признаки | |
| y: Метки | |
| n_neighbors: Количество соседей для ADASYN | |
| Returns: | |
| Перевыбранные X, y | |
| """ | |
| if not IMBLEARN_AVAILABLE: | |
| raise ImportError("imbalanced-learn не установлен. Установите: pip install imbalanced-learn") | |
| adasyn = ADASYN(n_neighbors=n_neighbors, random_state=42) | |
| X_resampled, y_resampled = adasyn.fit_resample(X, y) | |
| return X_resampled, y_resampled | |
| def synonym_replacement(text: str, num_replacements: int = 1) -> str: | |
| """ | |
| Замена слов синонимами (упрощенная версия). | |
| Примечание: Для полноценной работы требуется словарь синонимов или WordNet. | |
| """ | |
| # Упрощенная версия - просто возвращаем исходный текст | |
| # Для реальной работы нужен словарь синонимов или библиотека типа pymorphy2 + словари | |
| return text | |
| def random_deletion(text: str, p: float = 0.1) -> str: | |
| """ | |
| Случайное удаление слов из текста. | |
| Args: | |
| text: Исходный текст | |
| p: Вероятность удаления каждого слова | |
| Returns: | |
| Текст с удаленными словами | |
| """ | |
| words = text.split() | |
| if len(words) == 0: | |
| return text | |
| # Удаляем слова с вероятностью p | |
| kept_words = [w for w in words if np.random.random() > p] | |
| if len(kept_words) == 0: | |
| # Если все слова удалены, возвращаем одно случайное слово | |
| return np.random.choice(words) | |
| return ' '.join(kept_words) | |
| def random_insertion(text: str, num_insertions: int = 1) -> str: | |
| """ | |
| Случайная вставка слов в текст (упрощенная версия). | |
| Args: | |
| text: Исходный текст | |
| num_insertions: Количество вставок | |
| Returns: | |
| Текст с вставленными словами | |
| """ | |
| words = text.split() | |
| if len(words) == 0: | |
| return text | |
| for _ in range(num_insertions): | |
| # Вставляем случайное слово в случайную позицию | |
| random_word = np.random.choice(words) | |
| random_pos = np.random.randint(0, len(words) + 1) | |
| words.insert(random_pos, random_word) | |
| return ' '.join(words) | |
| def random_swap(text: str, num_swaps: int = 1) -> str: | |
| """ | |
| Случайная перестановка слов в тексте. | |
| Args: | |
| text: Исходный текст | |
| num_swaps: Количество перестановок | |
| Returns: | |
| Текст с переставленными словами | |
| """ | |
| words = text.split() | |
| if len(words) < 2: | |
| return text | |
| for _ in range(num_swaps): | |
| idx1, idx2 = np.random.choice(len(words), size=2, replace=False) | |
| words[idx1], words[idx2] = words[idx2], words[idx1] | |
| return ' '.join(words) | |
| def easy_data_augmentation(text: str, | |
| alpha_sr: float = 0.1, | |
| alpha_ri: float = 0.1, | |
| alpha_rs: float = 0.1, | |
| num_aug: int = 1) -> List[str]: | |
| """ | |
| Easy Data Augmentation (EDA) для текста. | |
| Args: | |
| text: Исходный текст | |
| alpha_sr: Параметр для synonym replacement | |
| alpha_ri: Параметр для random insertion | |
| alpha_rs: Параметр для random swap | |
| num_aug: Количество аугментированных вариантов | |
| Returns: | |
| Список аугментированных текстов | |
| """ | |
| num_words = len(text.split()) | |
| augmented_texts = [] | |
| for _ in range(num_aug): | |
| augmented = text | |
| # Synonym replacement | |
| if np.random.random() < alpha_sr: | |
| augmented = synonym_replacement(augmented) | |
| # Random insertion | |
| if np.random.random() < alpha_ri: | |
| n_insert = max(1, int(alpha_ri * num_words)) | |
| augmented = random_insertion(augmented, n_insert) | |
| # Random swap | |
| if np.random.random() < alpha_rs: | |
| n_swap = max(1, int(alpha_rs * num_words)) | |
| augmented = random_swap(augmented, n_swap) | |
| # Random deletion | |
| if np.random.random() < alpha_sr: | |
| augmented = random_deletion(augmented, alpha_sr) | |
| augmented_texts.append(augmented) | |
| return augmented_texts | |
| def augment_texts(texts: List[str], labels: List[int], | |
| target_class: Optional[int] = None, | |
| num_aug: int = 1, | |
| method: str = "eda") -> Tuple[List[str], List[int]]: | |
| """ | |
| Аугментация текстов для балансировки классов. | |
| Args: | |
| texts: Список текстов | |
| labels: Список меток | |
| target_class: Класс для аугментации (None = все миноритарные) | |
| num_aug: Количество аугментированных вариантов на текст | |
| method: Метод аугментации ('eda', 'nlpaug') | |
| Returns: | |
| Расширенные списки текстов и меток | |
| """ | |
| augmented_texts = list(texts) | |
| augmented_labels = list(labels) | |
| if target_class is None: | |
| # Определяем миноритарные классы | |
| class_counts = Counter(labels) | |
| min_count = min(class_counts.values()) | |
| target_classes = [cls for cls, count in class_counts.items() if count == min_count] | |
| else: | |
| target_classes = [target_class] | |
| for cls in target_classes: | |
| cls_texts = [text for text, label in zip(texts, labels) if label == cls] | |
| for text in cls_texts: | |
| if method == "eda": | |
| aug_texts = easy_data_augmentation(text, num_aug=num_aug) | |
| elif method == "nlpaug" and NLPAUG_AVAILABLE: | |
| # Использование nlpaug (требует настройки) | |
| aug_texts = [text] # Заглушка | |
| else: | |
| aug_texts = [text] | |
| augmented_texts.extend(aug_texts) | |
| augmented_labels.extend([cls] * len(aug_texts)) | |
| return augmented_texts, augmented_labels | |
| if __name__ == "__main__": | |
| # Тестирование | |
| import numpy as np | |
| # Создаем несбалансированные данные | |
| X = np.random.randn(100, 50) | |
| y = np.array([0] * 80 + [1] * 20) | |
| print(f"Исходное распределение: {Counter(y)}") | |
| # Перевыборка | |
| X_resampled, y_resampled = random_oversample(X, y) | |
| print(f"После перевыборки: {Counter(y_resampled)}") | |
| # SMOTE (если доступен) | |
| if IMBLEARN_AVAILABLE: | |
| X_smote, y_smote = smote_oversample(X, y) | |
| print(f"После SMOTE: {Counter(y_smote)}") | |
| # Аугментация текстов | |
| texts = ["Это тестовый текст", "Другой пример текста"] * 50 | |
| labels = [0] * 80 + [1] * 20 | |
| aug_texts, aug_labels = augment_texts(texts, labels, num_aug=2) | |
| print(f"После аугментации: {len(aug_texts)} текстов, распределение: {Counter(aug_labels)}") | |