NLP_Homework_1 / src /tokenizers_cmp.py
Kolesnikov Dmitry
fix: Добавлено скачивание через nltk
753b589
# src/tokenizers_cmp.py
"""
Модуль для сравнения различных методов токенизации и нормализации текста.
Реализует классические и современные методы токенизации, стемминга и лемматизации.
"""
import re
import time
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from collections import Counter
import pandas as pd
import numpy as np
# Импорты для различных методов токенизации
try:
from razdel import tokenize as rz_tokenize
RAZDEL_AVAILABLE = True
except ImportError:
RAZDEL_AVAILABLE = False
try:
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, SnowballStemmer
NLTK_AVAILABLE = True
except ImportError:
NLTK_AVAILABLE = False
try:
import spacy
SPACY_AVAILABLE = True
except ImportError:
SPACY_AVAILABLE = False
try:
import pymorphy2
# Проверяем совместимость с текущей версией Python
import inspect
if hasattr(inspect, 'getargspec'):
PYMORPHY_AVAILABLE = True
else:
PYMORPHY_AVAILABLE = False
print("⚠️ pymorphy2 несовместим с Python 3.13+. Используйте Python 3.11 или ниже для полной функциональности.")
except ImportError:
PYMORPHY_AVAILABLE = False
try:
from transformers import AutoTokenizer
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
@dataclass
class TokenizationMetrics:
"""Метрики для оценки качества токенизации."""
method_name: str
total_tokens: int
unique_tokens: int
vocabulary_size: int
avg_token_length: float
processing_time: float
oov_rate: float = 0.0
fragmentation_rate: float = 0.0
compression_ratio: float = 1.0
class TokenizationComparator:
"""Класс для сравнения различных методов токенизации."""
def __init__(self):
"""Инициализация компаратора."""
self.methods = {}
self.results = {}
self._ensure_nltk_resources()
self._initialize_methods()
def _ensure_nltk_resources(self):
"""Обеспечивает наличие необходимых ресурсов NLTK."""
if not NLTK_AVAILABLE:
return
import nltk
try:
# Пробуем использовать punkt_tab для русского языка
try:
nltk.data.find('tokenizers/punkt_tab/russian')
except LookupError:
try:
nltk.download('punkt_tab', quiet=True)
except Exception:
pass
except Exception:
pass
# Также загружаем обычный punkt как fallback
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
try:
nltk.download('punkt', quiet=True)
except Exception:
pass
def _initialize_methods(self):
"""Инициализирует доступные методы токенизации."""
# Наивная токенизация
self.methods['naive'] = self._tokenize_naive
# Регулярные выражения
self.methods['regex'] = self._tokenize_regex
# Razdel (специально для русского языка)
if RAZDEL_AVAILABLE:
self.methods['razdel'] = self._tokenize_razdel
# NLTK
if NLTK_AVAILABLE:
self.methods['nltk'] = self._tokenize_nltk
self.methods['porter_stemmer'] = self._tokenize_with_stemming
self.methods['snowball_stemmer'] = self._tokenize_with_snowball
# SpaCy
if SPACY_AVAILABLE:
try:
self.nlp = spacy.load('ru_core_news_sm')
self.methods['spacy'] = self._tokenize_spacy
self.methods['spacy_lemmatize'] = self._tokenize_with_lemmatization
except OSError:
print("SpaCy русская модель не найдена. Установите: python -m spacy download ru_core_news_sm")
# PyMorphy2
if PYMORPHY_AVAILABLE:
self.morph = pymorphy2.MorphAnalyzer()
self.methods['pymorphy'] = self._tokenize_with_pymorphy
def _tokenize_naive(self, text: str) -> List[str]:
"""Наивная токенизация по пробелам."""
return text.split()
def _tokenize_regex(self, text: str) -> List[str]:
"""Токенизация с помощью регулярных выражений."""
# Улучшенная токенизация: слова + основные знаки препинания
tokens = re.findall(r"\b\w+\b|[.,!?;:]", text, flags=re.U)
# Фильтруем слишком короткие токены (кроме знаков препинания)
filtered_tokens = []
for token in tokens:
if len(token) > 1 or token in '.,!?;:':
filtered_tokens.append(token)
return filtered_tokens
def _tokenize_razdel(self, text: str) -> List[str]:
"""Токенизация с помощью razdel."""
return [t.text for t in rz_tokenize(text)]
def _tokenize_nltk(self, text: str) -> List[str]:
"""Токенизация с помощью NLTK."""
import nltk
try:
return word_tokenize(text, language='russian')
except LookupError as e:
# Автоматическая загрузка необходимых данных NLTK
try:
# Пробуем загрузить punkt_tab для русского языка
nltk.download('punkt_tab', quiet=True)
return word_tokenize(text, language='russian')
except Exception:
try:
# Если не получилось, пробуем загрузить обычный punkt
nltk.download('punkt', quiet=True)
# Используем английский язык как fallback
return word_tokenize(text, language='english')
except Exception:
# Если и это не сработало, используем простую токенизацию
return text.split()
def _tokenize_spacy(self, text: str) -> List[str]:
"""Токенизация с помощью SpaCy."""
doc = self.nlp(text)
return [token.text for token in doc if not token.is_space]
def _tokenize_with_stemming(self, text: str) -> List[str]:
"""Токенизация с применением стемминга Porter."""
import nltk
try:
tokens = word_tokenize(text, language='russian')
except LookupError:
try:
nltk.download('punkt_tab', quiet=True)
tokens = word_tokenize(text, language='russian')
except Exception:
try:
nltk.download('punkt', quiet=True)
tokens = word_tokenize(text, language='english')
except Exception:
tokens = text.split()
stemmer = PorterStemmer()
return [stemmer.stem(token) for token in tokens if token.isalpha()]
def _tokenize_with_snowball(self, text: str) -> List[str]:
"""Токенизация с применением стемминга Snowball."""
import nltk
try:
tokens = word_tokenize(text, language='russian')
except LookupError:
try:
nltk.download('punkt_tab', quiet=True)
tokens = word_tokenize(text, language='russian')
except Exception:
try:
nltk.download('punkt', quiet=True)
tokens = word_tokenize(text, language='english')
except Exception:
tokens = text.split()
stemmer = SnowballStemmer('russian')
return [stemmer.stem(token) for token in tokens if token.isalpha()]
def _tokenize_with_lemmatization(self, text: str) -> List[str]:
"""Токенизация с применением лемматизации SpaCy."""
doc = self.nlp(text)
return [token.lemma_ for token in doc if not token.is_space and token.is_alpha]
def _tokenize_with_pymorphy(self, text: str) -> List[str]:
"""Токенизация с применением лемматизации PyMorphy2."""
import nltk
try:
tokens = word_tokenize(text, language='russian')
except LookupError:
try:
nltk.download('punkt_tab', quiet=True)
tokens = word_tokenize(text, language='russian')
except Exception:
try:
nltk.download('punkt', quiet=True)
tokens = word_tokenize(text, language='english')
except Exception:
tokens = text.split()
lemmas = []
for token in tokens:
if token.isalpha():
parsed = self.morph.parse(token)[0]
lemmas.append(parsed.normal_form)
return lemmas
def tokenize_text(self, text: str, method: str) -> Tuple[List[str], float]:
"""
Токенизирует текст указанным методом.
Args:
text: Исходный текст
method: Название метода токенизации
Returns:
Кортеж (список токенов, время обработки)
"""
if method not in self.methods:
raise ValueError(f"Метод '{method}' не поддерживается")
start_time = time.time()
tokens = self.methods[method](text)
processing_time = time.time() - start_time
return tokens, processing_time
def calculate_metrics(self, tokens: List[str], original_text: str, method: str, processing_time: float) -> TokenizationMetrics:
"""
Вычисляет метрики для токенизации.
Args:
tokens: Список токенов
original_text: Исходный текст
method: Название метода
processing_time: Время обработки
Returns:
Объект с метриками
"""
total_tokens = len(tokens)
unique_tokens = len(set(tokens))
vocabulary_size = unique_tokens
# Средняя длина токена
if total_tokens > 0:
avg_token_length = sum(len(token) for token in tokens) / total_tokens
else:
avg_token_length = 0
# Коэффициент сжатия (отношение исходных слов к токенам)
original_words = len(original_text.split())
compression_ratio = original_words / total_tokens if total_tokens > 0 else 1.0
# Процент фрагментации (слова, разбитые на несколько токенов)
fragmentation_rate = 0.0 # Будет вычислено отдельно для подсловых методов
return TokenizationMetrics(
method_name=method,
total_tokens=total_tokens,
unique_tokens=unique_tokens,
vocabulary_size=vocabulary_size,
avg_token_length=avg_token_length,
processing_time=processing_time,
compression_ratio=compression_ratio,
fragmentation_rate=fragmentation_rate
)
def compare_methods(self, texts: List[str], methods: Optional[List[str]] = None) -> pd.DataFrame:
"""
Сравнивает различные методы токенизации на наборе текстов.
Args:
texts: Список текстов для анализа
methods: Список методов для сравнения (если None, используются все доступные)
Returns:
DataFrame с результатами сравнения
"""
if methods is None:
methods = list(self.methods.keys())
results = []
for method in methods:
print(f"Тестируем метод: {method}")
total_tokens = 0
total_unique_tokens = set()
total_processing_time = 0
total_original_words = 0
for text in texts:
try:
tokens, processing_time = self.tokenize_text(text, method)
total_tokens += len(tokens)
total_unique_tokens.update(tokens)
total_processing_time += processing_time
total_original_words += len(text.split())
except Exception as e:
print(f"Ошибка при обработке текста методом {method}: {e}")
continue
# Вычисляем агрегированные метрики
vocabulary_size = len(total_unique_tokens)
avg_token_length = sum(len(token) for token in total_unique_tokens) / vocabulary_size if vocabulary_size > 0 else 0
compression_ratio = total_original_words / total_tokens if total_tokens > 0 else 1.0
metrics = TokenizationMetrics(
method_name=method,
total_tokens=total_tokens,
unique_tokens=vocabulary_size,
vocabulary_size=vocabulary_size,
avg_token_length=avg_token_length,
processing_time=total_processing_time,
compression_ratio=compression_ratio
)
results.append(metrics)
# Преобразуем в DataFrame
df = pd.DataFrame([{
'Метод': r.method_name,
'Всего токенов': r.total_tokens,
'Уникальных токенов': r.unique_tokens,
'Размер словаря': r.vocabulary_size,
'Средняя длина токена': round(r.avg_token_length, 2),
'Время обработки (сек)': round(r.processing_time, 3),
'Коэффициент сжатия': round(r.compression_ratio, 3)
} for r in results])
return df.sort_values('Время обработки (сек)')
def analyze_token_distribution(self, text: str, method: str) -> Dict[str, Any]:
"""
Анализирует распределение токенов для указанного метода.
Args:
text: Исходный текст
method: Метод токенизации
Returns:
Словарь с анализом распределения
"""
tokens, _ = self.tokenize_text(text, method)
# Подсчет частот
token_counts = Counter(tokens)
# Статистика по длинам токенов
token_lengths = [len(token) for token in tokens]
return {
'method': method,
'total_tokens': len(tokens),
'unique_tokens': len(token_counts),
'most_common_tokens': token_counts.most_common(10),
'token_length_stats': {
'min': min(token_lengths) if token_lengths else 0,
'max': max(token_lengths) if token_lengths else 0,
'mean': np.mean(token_lengths) if token_lengths else 0,
'median': np.median(token_lengths) if token_lengths else 0
},
'vocabulary_diversity': len(token_counts) / len(tokens) if tokens else 0
}
def save_results(self, results_df: pd.DataFrame, output_path: str):
"""Сохраняет результаты в CSV файл."""
results_df.to_csv(output_path, index=False, encoding='utf-8')
print(f"Результаты сохранены в {output_path}")
def load_corpus_from_jsonl(file_path: str, text_field: str = 'text', max_articles: Optional[int] = None) -> List[str]:
"""
Загружает корпус из JSONL файла.
Args:
file_path: Путь к JSONL файлу
text_field: Поле с текстом статьи
max_articles: Максимальное количество статей для загрузки
Returns:
Список текстов
"""
import json
texts = []
with open(file_path, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
if max_articles and i >= max_articles:
break
try:
article = json.loads(line.strip())
if text_field in article and article[text_field].strip():
texts.append(article[text_field])
except json.JSONDecodeError:
continue
return texts
if __name__ == "__main__":
# Пример использования
comparator = TokenizationComparator()
# Тестовые тексты
test_texts = [
"Это тестовый текст для проверки различных методов токенизации.",
"В России работает множество новостных агентств: РИА Новости, ТАСС, Интерфакс.",
"Компания ООО 'Тест' сообщила о результатах за 2023 год. Контакты: info@test.ru"
]
print("Доступные методы токенизации:")
for method in comparator.methods.keys():
print(f"- {method}")
# Сравниваем методы
results = comparator.compare_methods(test_texts)
print("\nРезультаты сравнения:")
print(results)
# Анализируем распределение токенов для одного метода
if 'razdel' in comparator.methods:
analysis = comparator.analyze_token_distribution(test_texts[0], 'razdel')
print(f"\nАнализ распределения токенов (razdel):")
print(f"Всего токенов: {analysis['total_tokens']}")
print(f"Уникальных токенов: {analysis['unique_tokens']}")
print(f"Наиболее частые токены: {analysis['most_common_tokens'][:5]}")