NLP_Homework_1 / src /imbalance_handling.py
Kolesnikov Dmitry
feat: Попытка навайбкодить 3 и 4 лабораторные
68545bc
"""
Методы борьбы с дисбалансом классов в текстовых данных:
взвешивание классов, сэмплирование, аугментация текстов.
"""
from __future__ import annotations
from typing import List, Tuple, Dict, Any, Optional
from collections import Counter
import numpy as np
from sklearn.utils import resample
from sklearn.utils.class_weight import compute_class_weight
try:
from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
IMBLEARN_AVAILABLE = True
except ImportError:
IMBLEARN_AVAILABLE = False
print("⚠️ imbalanced-learn не установлен. SMOTE/ADASYN недоступны.")
try:
import nlpaug.augmenter.word as naw
NLPAUG_AVAILABLE = True
except ImportError:
NLPAUG_AVAILABLE = False
print("⚠️ nlpaug не установлен. Аугментация текстов недоступна.")
def compute_class_weights(y: np.ndarray, method: str = "balanced") -> Dict[int, float]:
"""
Вычисляет веса классов.
Args:
y: Массив меток
method: Метод вычисления весов ('balanced', 'balanced_subsample', или dict)
Returns:
Словарь {класс: вес}
"""
classes = np.unique(y)
weights = compute_class_weight(method, classes=classes, y=y)
return dict(zip(classes, weights))
def random_oversample(X: np.ndarray, y: np.ndarray,
strategy: Optional[Dict[int, int]] = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Случайная перевыборка миноритарных классов.
Args:
X: Признаки
y: Метки
strategy: Словарь {класс: целевое количество} или None для балансировки
Returns:
Перевыбранные X, y
"""
if strategy is None:
# Балансируем до максимального класса
class_counts = Counter(y)
max_count = max(class_counts.values())
strategy = {cls: max_count for cls in class_counts.keys()}
X_resampled = []
y_resampled = []
for cls in strategy.keys():
mask = y == cls
X_cls = X[mask]
y_cls = y[mask]
if len(X_cls) < strategy[cls]:
# Перевыборка
X_cls_resampled, y_cls_resampled = resample(
X_cls, y_cls,
n_samples=strategy[cls],
random_state=42
)
else:
X_cls_resampled, y_cls_resampled = X_cls, y_cls
X_resampled.append(X_cls_resampled)
y_resampled.append(y_cls_resampled)
X_resampled = np.vstack(X_resampled)
y_resampled = np.hstack(y_resampled)
# Перемешивание
indices = np.random.permutation(len(X_resampled))
return X_resampled[indices], y_resampled[indices]
def random_undersample(X: np.ndarray, y: np.ndarray,
strategy: Optional[Dict[int, int]] = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Случайная недо-выборка мажоритарных классов.
Args:
X: Признаки
y: Метки
strategy: Словарь {класс: целевое количество} или None для балансировки
Returns:
Недо-выбранные X, y
"""
if strategy is None:
# Балансируем до минимального класса
class_counts = Counter(y)
min_count = min(class_counts.values())
strategy = {cls: min_count for cls in class_counts.keys()}
X_resampled = []
y_resampled = []
for cls in strategy.keys():
mask = y == cls
X_cls = X[mask]
y_cls = y[mask]
if len(X_cls) > strategy[cls]:
# Недо-выборка
X_cls_resampled, y_cls_resampled = resample(
X_cls, y_cls,
n_samples=strategy[cls],
random_state=42
)
else:
X_cls_resampled, y_cls_resampled = X_cls, y_cls
X_resampled.append(X_cls_resampled)
y_resampled.append(y_cls_resampled)
X_resampled = np.vstack(X_resampled)
y_resampled = np.hstack(y_resampled)
# Перемешивание
indices = np.random.permutation(len(X_resampled))
return X_resampled[indices], y_resampled[indices]
def smote_oversample(X: np.ndarray, y: np.ndarray,
k_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]:
"""
SMOTE (Synthetic Minority Oversampling Technique) для векторизованных текстов.
Args:
X: Векторизованные признаки
y: Метки
k_neighbors: Количество соседей для SMOTE
Returns:
Перевыбранные X, y
"""
if not IMBLEARN_AVAILABLE:
raise ImportError("imbalanced-learn не установлен. Установите: pip install imbalanced-learn")
smote = SMOTE(k_neighbors=k_neighbors, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)
return X_resampled, y_resampled
def adasyn_oversample(X: np.ndarray, y: np.ndarray,
n_neighbors: int = 5) -> Tuple[np.ndarray, np.ndarray]:
"""
ADASYN (Adaptive Synthetic Sampling) для векторизованных текстов.
Args:
X: Векторизованные признаки
y: Метки
n_neighbors: Количество соседей для ADASYN
Returns:
Перевыбранные X, y
"""
if not IMBLEARN_AVAILABLE:
raise ImportError("imbalanced-learn не установлен. Установите: pip install imbalanced-learn")
adasyn = ADASYN(n_neighbors=n_neighbors, random_state=42)
X_resampled, y_resampled = adasyn.fit_resample(X, y)
return X_resampled, y_resampled
def synonym_replacement(text: str, num_replacements: int = 1) -> str:
"""
Замена слов синонимами (упрощенная версия).
Примечание: Для полноценной работы требуется словарь синонимов или WordNet.
"""
# Упрощенная версия - просто возвращаем исходный текст
# Для реальной работы нужен словарь синонимов или библиотека типа pymorphy2 + словари
return text
def random_deletion(text: str, p: float = 0.1) -> str:
"""
Случайное удаление слов из текста.
Args:
text: Исходный текст
p: Вероятность удаления каждого слова
Returns:
Текст с удаленными словами
"""
words = text.split()
if len(words) == 0:
return text
# Удаляем слова с вероятностью p
kept_words = [w for w in words if np.random.random() > p]
if len(kept_words) == 0:
# Если все слова удалены, возвращаем одно случайное слово
return np.random.choice(words)
return ' '.join(kept_words)
def random_insertion(text: str, num_insertions: int = 1) -> str:
"""
Случайная вставка слов в текст (упрощенная версия).
Args:
text: Исходный текст
num_insertions: Количество вставок
Returns:
Текст с вставленными словами
"""
words = text.split()
if len(words) == 0:
return text
for _ in range(num_insertions):
# Вставляем случайное слово в случайную позицию
random_word = np.random.choice(words)
random_pos = np.random.randint(0, len(words) + 1)
words.insert(random_pos, random_word)
return ' '.join(words)
def random_swap(text: str, num_swaps: int = 1) -> str:
"""
Случайная перестановка слов в тексте.
Args:
text: Исходный текст
num_swaps: Количество перестановок
Returns:
Текст с переставленными словами
"""
words = text.split()
if len(words) < 2:
return text
for _ in range(num_swaps):
idx1, idx2 = np.random.choice(len(words), size=2, replace=False)
words[idx1], words[idx2] = words[idx2], words[idx1]
return ' '.join(words)
def easy_data_augmentation(text: str,
alpha_sr: float = 0.1,
alpha_ri: float = 0.1,
alpha_rs: float = 0.1,
num_aug: int = 1) -> List[str]:
"""
Easy Data Augmentation (EDA) для текста.
Args:
text: Исходный текст
alpha_sr: Параметр для synonym replacement
alpha_ri: Параметр для random insertion
alpha_rs: Параметр для random swap
num_aug: Количество аугментированных вариантов
Returns:
Список аугментированных текстов
"""
num_words = len(text.split())
augmented_texts = []
for _ in range(num_aug):
augmented = text
# Synonym replacement
if np.random.random() < alpha_sr:
augmented = synonym_replacement(augmented)
# Random insertion
if np.random.random() < alpha_ri:
n_insert = max(1, int(alpha_ri * num_words))
augmented = random_insertion(augmented, n_insert)
# Random swap
if np.random.random() < alpha_rs:
n_swap = max(1, int(alpha_rs * num_words))
augmented = random_swap(augmented, n_swap)
# Random deletion
if np.random.random() < alpha_sr:
augmented = random_deletion(augmented, alpha_sr)
augmented_texts.append(augmented)
return augmented_texts
def augment_texts(texts: List[str], labels: List[int],
target_class: Optional[int] = None,
num_aug: int = 1,
method: str = "eda") -> Tuple[List[str], List[int]]:
"""
Аугментация текстов для балансировки классов.
Args:
texts: Список текстов
labels: Список меток
target_class: Класс для аугментации (None = все миноритарные)
num_aug: Количество аугментированных вариантов на текст
method: Метод аугментации ('eda', 'nlpaug')
Returns:
Расширенные списки текстов и меток
"""
augmented_texts = list(texts)
augmented_labels = list(labels)
if target_class is None:
# Определяем миноритарные классы
class_counts = Counter(labels)
min_count = min(class_counts.values())
target_classes = [cls for cls, count in class_counts.items() if count == min_count]
else:
target_classes = [target_class]
for cls in target_classes:
cls_texts = [text for text, label in zip(texts, labels) if label == cls]
for text in cls_texts:
if method == "eda":
aug_texts = easy_data_augmentation(text, num_aug=num_aug)
elif method == "nlpaug" and NLPAUG_AVAILABLE:
# Использование nlpaug (требует настройки)
aug_texts = [text] # Заглушка
else:
aug_texts = [text]
augmented_texts.extend(aug_texts)
augmented_labels.extend([cls] * len(aug_texts))
return augmented_texts, augmented_labels
if __name__ == "__main__":
# Тестирование
import numpy as np
# Создаем несбалансированные данные
X = np.random.randn(100, 50)
y = np.array([0] * 80 + [1] * 20)
print(f"Исходное распределение: {Counter(y)}")
# Перевыборка
X_resampled, y_resampled = random_oversample(X, y)
print(f"После перевыборки: {Counter(y_resampled)}")
# SMOTE (если доступен)
if IMBLEARN_AVAILABLE:
X_smote, y_smote = smote_oversample(X, y)
print(f"После SMOTE: {Counter(y_smote)}")
# Аугментация текстов
texts = ["Это тестовый текст", "Другой пример текста"] * 50
labels = [0] * 80 + [1] * 20
aug_texts, aug_labels = augment_texts(texts, labels, num_aug=2)
print(f"После аугментации: {len(aug_texts)} текстов, распределение: {Counter(aug_labels)}")