Spaces:
Sleeping
Sleeping
| import re | |
| import string | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from nltk.corpus import stopwords | |
| stop_words = set(stopwords.words('russian')) | |
| from collections import Counter | |
| from gensim.models import Word2Vec | |
| import pandas as pd | |
| import torch.nn.functional as F | |
# Model / preprocessing hyperparameters
HIDDEN_SIZE = 32  # LSTM hidden state size
SEQ_LEN = 32      # fixed review length after padding / truncation
# Load the reviews dataset (JSON Lines: one review object per line).
# NOTE(review): hard-coded absolute local path — will only run on this machine.
df = pd.read_json('/Users/olgaseina/ds-phase-2/10-nlp/data/tg_channels/healthcare_facilities_reviews.jsonl', lines=True)
def data_preprocessing(text: str) -> str:
    """Normalize a raw review for tokenization.

    Lowercases, strips HTML tags and punctuation, then drops Russian
    stopwords and pure-digit tokens.

    Args:
        text: raw review text.
    Returns:
        Space-joined string of the surviving tokens.
    """
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)  # drop HTML tags
    # One C-level pass instead of a per-character Python loop.
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Single filtering pass replaces the original split/join/split/join round-trips.
    words = [w for w in text.split() if w not in stop_words and not w.isdigit()]
    return ' '.join(words)
# Normalize every review, then flatten into one token corpus.
contents = df['content'].tolist()
preprocessed = [data_preprocessing(content) for content in contents]
corpus = [word for text in preprocessed for word in text.split()]
# (word, count) pairs sorted by descending frequency.
sorted_words = Counter(corpus).most_common()
def get_words_by_freq(sorted_words: list[tuple[str, int]], n: int = 10) -> list:
    """Keep only the (word, count) pairs whose count is strictly above *n*."""
    return [pair for pair in sorted_words if pair[1] > n]
# Keep only words seen more than 100 times in the corpus.
sorted_words = get_words_by_freq(sorted_words, 100)
# Notebook-style peek at the 10 rarest kept words (no effect as a script statement).
sorted_words[-10:]
# Word -> integer id; ids start at 1 so that 0 can serve as the padding id.
vocab_to_int = {w:i+1 for i, (w,c) in enumerate(sorted_words)}
# Integer-encode each review, silently dropping out-of-vocabulary words.
reviews_int = []
for text in preprocessed:
    r = [vocab_to_int[word] for word in text.split() if vocab_to_int.get(word)]
    reviews_int.append(r)
# Word2Vec training input: each review as a list of words, restricted to the
# frequency-filtered vocabulary so w2v and vocab_to_int stay in sync.
w2v_input = []
for review in preprocessed:
    cur_review = []
    for word in review.split():
        if vocab_to_int.get(word):  # truthy iff the word survived the frequency cut
            cur_review.append(word)
    w2v_input.append(cur_review)
# +1 reserves id 0 for the padding token.
VOCAB_SIZE = len(vocab_to_int) + 1
EMBEDDING_DIM = 64
wv = Word2Vec(
    min_count=1,               # minimum corpus frequency for a word to be kept
    vector_size=EMBEDDING_DIM  # dimensionality of each word vector
)
wv.build_vocab(w2v_input)
wv.train(
    corpus_iterable=w2v_input,
    total_examples=wv.corpus_count,
    epochs=10
)
# Build the pretrained embedding matrix: row i holds the w2v vector of the
# word with vocab id i; row 0 (padding id) and any missing words stay zero.
embedding_matrix = np.zeros((VOCAB_SIZE, EMBEDDING_DIM))
for word, i in vocab_to_int.items():
    try:
        embedding_matrix[i] = wv.wv[word]
    except KeyError as e:
        # Word absent from the trained w2v vocabulary: report and leave zeros.
        # (Original had a dead `pass` before this print.)
        print(f'{e}: word: {word}')
# Frozen pretrained embedding layer — from_pretrained defaults to freeze=True,
# so this layer will not be updated during training.
embedding_layer = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
# NOTE(review): this is an exact redefinition of data_preprocessing from
# earlier in the file (it shadows the first copy) — consider removing one.
def data_preprocessing(text: str) -> str:
    """Normalize a raw review: lowercase, strip HTML tags and punctuation,
    drop Russian stopwords and pure-digit tokens; return space-joined tokens."""
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)  # drop HTML tags
    # One C-level pass instead of a per-character Python loop.
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Single filtering pass replaces the split/join/split/join round-trips.
    words = [w for w in text.split() if w not in stop_words and not w.isdigit()]
    return ' '.join(words)
def padding(review_int: list, seq_len: int) -> np.ndarray:
    """Left-pad or truncate each integer-encoded review to *seq_len*.

    Shorter reviews are padded on the left with zeros (the padding id);
    longer ones keep only their first seq_len tokens.

    Args:
        review_int: list of reviews, each a list of int token ids.
        seq_len: target sequence length.
    Returns:
        int array of shape (len(review_int), seq_len).
    """
    features = np.zeros((len(review_int), seq_len), dtype=int)
    for i, review in enumerate(review_int):
        tail = review[:seq_len]
        # Guard empty reviews: features[i, -0:] would select the whole row.
        if tail:
            features[i, seq_len - len(tail):] = tail
    return features
def preprocess_single_string(
    input_string: str,
    seq_len: int,
    vocab_to_int: dict,
    verbose: bool = False
) -> torch.Tensor:
    """Convert one raw string into a padded tensor of vocab ids.

    Args:
        input_string: raw review text.
        seq_len: target sequence length for padding/truncation.
        vocab_to_int: word -> integer id mapping.
        verbose: if True, print each out-of-vocabulary word.
    Returns:
        1-D int tensor of length seq_len.
    """
    preprocessed_string = data_preprocessing(input_string)
    result_list = []
    for word in preprocessed_string.split():
        try:
            result_list.append(vocab_to_int[word])
        except KeyError as e:
            # Out-of-vocabulary word: skip it (original had a stray `pass` here).
            if verbose:
                print(f'{e}: not in dictionary!')
    result_padded = padding([result_list], seq_len)[0]
    return torch.tensor(result_padded)
class BahdanauAttention(nn.Module):
    """Additive (Bahdanau) attention: score(key, query) = V·tanh(W·query + U·key)."""

    def __init__(
        self,
        hidden_size: int = HIDDEN_SIZE
    ) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.W = nn.Linear(hidden_size, hidden_size)  # query projection
        self.U = nn.Linear(hidden_size, hidden_size)  # key projection
        self.V = nn.Linear(hidden_size, 1)            # scalar score per position
        self.tanh = nn.Tanh()

    def forward(
        self,
        keys: torch.Tensor,   # BATCH_SIZE x SEQ_LEN x HIDDEN_SIZE
        query: torch.Tensor   # BATCH_SIZE x HIDDEN_SIZE
    ):
        """Return (context BATCH x HIDDEN, attention weights BATCH x SEQ_LEN)."""
        query = query.unsqueeze(1)                    # BATCH_SIZE x 1 x HIDDEN_SIZE
        r_query = self.W(query)                       # BATCH_SIZE x 1 x HIDDEN_SIZE
        r_keys = self.U(keys)                         # BATCH_SIZE x SEQ_LEN x HIDDEN_SIZE
        # Use the self.tanh module defined above (original built it but then
        # called torch.tanh — same function, fixed for consistency).
        scores = self.V(self.tanh(r_query + r_keys))  # BATCH_SIZE x SEQ_LEN x 1
        scores = scores.squeeze(-1)                   # BATCH_SIZE x SEQ_LEN
        att_weights = F.softmax(scores, dim=1)        # BATCH_SIZE x SEQ_LEN
        # Weighted sum of keys -> context vector.
        context = torch.bmm(att_weights.unsqueeze(1), keys).squeeze(1)  # BATCH_SIZE x HIDDEN_SIZE
        return context, att_weights
class LSTMBahdanauAttention(nn.Module):
    """Classifier: frozen w2v embeddings -> LSTM -> Bahdanau attention -> MLP head."""

    def __init__(self) -> None:
        super().__init__()
        self.embedding = embedding_layer  # pretrained, frozen embeddings
        self.lstm = nn.LSTM(EMBEDDING_DIM, HIDDEN_SIZE, batch_first=True)
        self.attn = BahdanauAttention(HIDDEN_SIZE)
        self.clf = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, 128),
            nn.Dropout(),
            nn.Tanh(),
            nn.Linear(128, 1),
        )

    def forward(self, x):
        """Return (logit BATCH x 1, attention weights BATCH x SEQ_LEN)."""
        embedded = self.embedding(x)
        lstm_out, (final_hidden, _) = self.lstm(embedded)
        # Last hidden state queries over all LSTM outputs.
        context, att_weights = self.attn(lstm_out, final_hidden.squeeze(0))
        return self.clf(context), att_weights