# Importing necessary libraries
import math
import re

# Data handling libraries
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# PyTorch libraries
import torch
from torch import nn
from torch.utils.data import Dataset

# NLP libraries (nltk.sent_tokenize below needs the "punkt" data: nltk.download("punkt"))
import nltk

# Multiprocessing libraries
from multiprocessing import Pool, cpu_count

class BM25:
    def __init__(self, corpus, tokenizer=None):
        self.corpus_size = 0
        self.avgdl = 0
        self.doc_freqs = []
        self.idf = {}
        self.doc_len = []
        self.tokenizer = tokenizer

        if tokenizer:
            corpus = self._tokenize_corpus(corpus)

        nd = self._initialize(corpus)
        self._calc_idf(nd)

    def _initialize(self, corpus):
        nd = {}  # word -> number of documents with word
        num_doc = 0
        for document in corpus:
            self.doc_len.append(len(document))
            num_doc += len(document)

            frequencies = {}
            for word in document:
                if word not in frequencies:
                    frequencies[word] = 0
                frequencies[word] += 1
            self.doc_freqs.append(frequencies)

            for word, freq in frequencies.items():
                try:
                    nd[word] += 1
                except KeyError:
                    nd[word] = 1

            self.corpus_size += 1

        self.avgdl = num_doc / self.corpus_size
        return nd

    def _tokenize_corpus(self, corpus):
        pool = Pool(cpu_count())
        tokenized_corpus = pool.map(self.tokenizer, corpus)
        return tokenized_corpus

    def _calc_idf(self, nd):
        raise NotImplementedError()

    def get_scores(self, query):
        raise NotImplementedError()

    def get_batch_scores(self, query, doc_ids):
        raise NotImplementedError()

    def get_top_n(self, query, documents, n=5):
        assert self.corpus_size == len(documents), "The documents given don't match the index corpus!"
        scores = self.get_scores(query)
        min_score = np.min(scores)
        max_score = np.max(scores)
        # Scale scores to 0-1 range
        if max_score != min_score:
            scaled_scores = (scores - min_score) / (max_score - min_score)
        else:
            scaled_scores = np.ones(self.corpus_size)
        top_n_indices = np.argsort(scaled_scores)[::-1][:n]
        top_n_scaled_scores = [scaled_scores[i] for i in top_n_indices]
        return [documents[i] for i in top_n_indices], top_n_scaled_scores

class BM25Okapi(BM25):
    def __init__(self, corpus, tokenizer=None, k1=1.5, b=0.75, epsilon=0.25):
        self.k1 = k1
        self.b = b
        self.epsilon = epsilon
        super().__init__(corpus, tokenizer)

    def _calc_idf(self, nd):
        """
        Calculates frequencies of terms in documents and in the corpus.
        This algorithm sets a floor on the idf values at eps * average_idf.
        """
        # collect idf sum to calculate an average idf for the epsilon value
        idf_sum = 0
        # collect words with negative idf to assign them a special epsilon value;
        # idf can be negative if a word is contained in more than half of the documents
        negative_idfs = []
        for word, freq in nd.items():
            idf = math.log(self.corpus_size - freq + 0.5) - math.log(freq + 0.5)
            self.idf[word] = idf
            idf_sum += idf
            if idf < 0:
                negative_idfs.append(word)
        self.average_idf = idf_sum / len(self.idf)

        eps = self.epsilon * self.average_idf
        for word in negative_idfs:
            self.idf[word] = eps

    def get_scores(self, query):
        """
        The ATIRE BM25 variant uses an idf function with a log(idf) score. To prevent
        negative idf scores, this algorithm also adds a floor of epsilon to the idf value.
        See [Trotman, A., X. Jia, M. Crane, Towards an Efficient and Effective Search Engine]
        for more info.
        :param query: list of query tokens
        :return: array of BM25 scores, one per document in the corpus
        """
        score = np.zeros(self.corpus_size)
        doc_len = np.array(self.doc_len)
        for q in query:
            q_freq = np.array([(doc.get(q) or 0) for doc in self.doc_freqs])
            score += (self.idf.get(q) or 0) * (q_freq * (self.k1 + 1) /
                                               (q_freq + self.k1 * (1 - self.b + self.b * doc_len / self.avgdl)))
        return score

    def get_batch_scores(self, query, doc_ids):
        """
        Calculate BM25 scores between the query and a subset of all documents.
        """
        assert all(di < len(self.doc_freqs) for di in doc_ids)
        score = np.zeros(len(doc_ids))
        doc_len = np.array(self.doc_len)[doc_ids]
        for q in query:
            q_freq = np.array([(self.doc_freqs[di].get(q) or 0) for di in doc_ids])
            score += (self.idf.get(q) or 0) * (q_freq * (self.k1 + 1) /
                                               (q_freq + self.k1 * (1 - self.b + self.b * doc_len / self.avgdl)))
        return score.tolist()

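# A minimal usage sketch for BM25Okapi (the corpus and query below are made up
# for illustration):
def _demo_bm25okapi():
    corpus = [
        "the cat sat on the mat",
        "dogs chase cats around the yard",
        "the mat was red",
    ]
    tokenized_corpus = [doc.split(" ") for doc in corpus]
    bm25 = BM25Okapi(tokenized_corpus)
    query = "cat on the mat".split(" ")
    print(bm25.get_scores(query))              # one raw BM25 score per document
    print(bm25.get_top_n(query, corpus, n=2))  # (top documents, min-max scaled scores)
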
def preprocess_text(text: str) -> str:
    # Strip common punctuation, collapse whitespace, and lower-case
    text = re.sub(r"['\",\.\?:\-!]", "", text)
    text = text.strip()
    text = " ".join(text.split())
    text = text.lower()
    return text

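# Illustrative behaviour of preprocess_text (the input string is made up):
def _demo_preprocess_text():
    assert preprocess_text("  Isn't this -- GREAT?!  ") == "isnt this great"
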
def evidence_top_n(context, query):
    # Split the context into sentences and rank them against the query with BM25
    sentences = split_text(context)
    tokenized_sentences = [str(doc).split(" ") for doc in sentences]
    bm25 = BM25Okapi(tokenized_sentences)
    tokenized_query = query.split(" ")
    top_docs, top_scores = bm25.get_top_n(tokenized_query, sentences, n=5)
    return top_docs, top_scores

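# Sketch of evidence_top_n on a toy passage (text and query are made up). Note
# that token matching is case-sensitive here, since neither the sentences nor
# the query pass through preprocess_text first:
def _demo_evidence_top_n():
    context = ("The Eiffel Tower is in Paris. It was completed in 1889.\n\n"
               "It is made of wrought iron. Millions of people visit it every year.")
    docs, scores = evidence_top_n(context, "completed in 1889")
    for doc, score in zip(docs, scores):
        print(f"{score:.2f}  {doc}")
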
def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0]  # first element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

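# `similarities` below relies on module-level `tokenizer_sbert`, `model_sbert`,
# and `device`, which this snippet never defines. A minimal sketch of the assumed
# setup follows; the sentence-transformers checkpoint name is a placeholder, not
# something the original code confirms:
from transformers import AutoTokenizer, AutoModel  # assumed dependency

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
_SBERT_NAME = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"  # placeholder
tokenizer_sbert = AutoTokenizer.from_pretrained(_SBERT_NAME)
model_sbert = AutoModel.from_pretrained(_SBERT_NAME).to(device).eval()
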
def similarities(context: list, text: str, weight: list):
    sentences = [text] + context
    encoded_input = tokenizer_sbert(sentences, padding=True, truncation=True, return_tensors='pt')
    encoded_input = {key: value.to(device) for key, value in encoded_input.items()}

    # Compute token embeddings
    with torch.no_grad():
        model_output = model_sbert(**encoded_input)

    # Perform pooling. In this case, mean pooling.
    sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])

    # Cosine similarity between the claim (first sentence) and each evidence sentence
    similarities = []
    claim_embeddings = sentence_embeddings[0].unsqueeze(0)
    cosine = nn.CosineSimilarity(dim=1, eps=1e-6)
    for i in range(1, len(sentence_embeddings)):
        evidence_embeddings = sentence_embeddings[i].unsqueeze(0)
        similarity = cosine(claim_embeddings, evidence_embeddings).item()
        # scaled_similarity = ((similarity + 1) / 2) * weight[i-1]
        similarities.append((sentences[i], similarity))

    # Min-max scale the raw cosine similarities, then apply per-sentence weights
    simi_values = [s[1] for s in similarities]
    scaler = MinMaxScaler()
    scaled_simi_values = scaler.fit_transform(np.array(simi_values).reshape(-1, 1)).flatten()
    similarities = [(sentences[i + 1], scaled_value * weight[i]) for i, scaled_value in enumerate(scaled_simi_values)]
    similarities.sort(key=lambda x: x[1], reverse=True)

    top_k = [item[0] for item in similarities[:1]]
    simi = [item[1] for item in similarities[:1]]
    top_5 = [item[0] for item in similarities[:5]]
    return top_k, simi, top_5

def clean_quotes(sentence):
    # Remove '!', ',', and '?' inside double-quoted spans
    return re.sub(r'"([^"]*)"', lambda m: m.group(0).replace('!', '').replace(',', '').replace('?', ''), sentence)

def remove_brackets(text):
    # Remove ellipses and periods inside parenthesised spans (the brackets themselves are kept)
    return re.sub(r'\([^)]*\)', lambda m: m.group(0).replace('...', '').replace('.', ''), text)

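# Illustrative behaviour of the two cleaners above (example strings are made up):
def _demo_cleaners():
    assert clean_quotes('He said "stop, now!" twice.') == 'He said "stop now" twice.'
    assert remove_brackets('A result (p < .05) held.') == 'A result (p < 05) held.'
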
def split_text(content):
    # Split the text by "\n\n"
    paragraphs = content.split('\n\n')
    # Split each paragraph into sentences
    sentences = []
    for paragraph in paragraphs:
        paragraph = paragraph.replace('...)', ')')
        paragraph = paragraph.replace('... ,', ',')
        paragraph = re.sub(r'\.\.\.(?=\")', '', paragraph)
        paragraph = paragraph.replace('\n', ' ')  # Remove internal line breaks
        paragraph = clean_quotes(paragraph)
        # Drop a stray mid-sentence period before a lowercase word and capitalise it
        paragraph = re.sub(r'\.(\s[a-z])', lambda match: match.group(1).upper(), paragraph)
        paragraph = paragraph.replace(' .', '.')  # Remove space before period
        paragraph = re.sub(r'\?(?=\s+[a-z])', ' ', paragraph)
        paragraph = re.sub(r'\.\.\.(?=\,)', '', paragraph)
        paragraph = re.sub(r'\.\.\.(?=\s+[a-z])', ' ', paragraph)
        paragraph = paragraph.replace('...', '. ')  # Replace "..." with ". "
        paragraph = paragraph.replace('..', '. ')  # Replace ".." with ". "
        paragraph = paragraph.replace('. ', ' . ')  # Add space before each period
        paragraph = paragraph.replace('  ', ' ')  # Collapse double spaces
        paragraph = paragraph.strip()  # Strip leading/trailing spaces
        # Tokenize the paragraph into sentences using NLTK
        paragraph_sentences = nltk.sent_tokenize(paragraph)
        sentences.extend(paragraph_sentences)
    return sentences

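# Quick sketch of split_text on noisy input (the text is made up); it normalises
# ellipses and quotes, then lets NLTK do the actual sentence splitting:
def _demo_split_text():
    raw = "First sentence... second thought.\n\nNew paragraph here. And more."
    print(split_text(raw))
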
class SentencePairDataset(Dataset):
    def __init__(self, sentence_pairs, labels, tokenizer, max_length):
        self.sentence_pairs = sentence_pairs
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.sentence_pairs)

    def __getitem__(self, idx):
        sentence1, sentence2 = self.sentence_pairs[idx]
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            sentence1,
            text_pair=sentence2,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding="max_length",
            return_attention_mask=True,
            return_tensors="pt",
            truncation=True,
        )
        return {
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "label": torch.tensor(label, dtype=torch.long),
        }

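# Minimal sketch of SentencePairDataset with a Hugging Face tokenizer; the
# checkpoint name and example pairs are assumptions for illustration:
def _demo_sentence_pair_dataset():
    from torch.utils.data import DataLoader
    from transformers import AutoTokenizer
    tok = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")
    pairs = [("The sky is blue.", "The sky has a blue colour."),
             ("Cats bark.", "Dogs bark.")]
    ds = SentencePairDataset(pairs, labels=[1, 0], tokenizer=tok, max_length=64)
    batch = next(iter(DataLoader(ds, batch_size=2)))
    print(batch["input_ids"].shape)  # torch.Size([2, 64])
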
class MBERTClassifier(nn.Module):
    def __init__(self, mbert, num_classes):
        super().__init__()
        self.mbert = mbert
        self.layer_norm = nn.LayerNorm(self.mbert.config.hidden_size)
        self.dropout = nn.Dropout(0.2)
        self.batch_norm = nn.BatchNorm1d(self.mbert.config.hidden_size)
        self.linear = nn.LazyLinear(num_classes)
        self.activation = nn.ELU()

    def forward(self, input_ids, attention_mask):
        # return_dict=False makes the backbone yield (sequence_output, pooled_output)
        _, pooled_output = self.mbert(input_ids=input_ids, attention_mask=attention_mask, return_dict=False)
        norm_output = self.layer_norm(pooled_output)
        batch_norm_output = self.batch_norm(norm_output)
        logits = self.linear(batch_norm_output)
        activated_output = self.activation(logits)
        dropout_output = self.dropout(activated_output)
        return dropout_output

    def predict_proba(self, input_ids, attention_mask):
        logits = self.forward(input_ids, attention_mask)
        probabilities = torch.softmax(logits, dim=-1)
        return probabilities
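
# Sketch wiring MBERTClassifier to a pretrained multilingual BERT backbone; the
# checkpoint name is an assumption, and eval() matters because BatchNorm1d
# cannot run in training mode on a batch of one:
def _demo_mbert_classifier():
    from transformers import AutoModel, AutoTokenizer
    name = "bert-base-multilingual-cased"
    backbone = AutoModel.from_pretrained(name)
    tok = AutoTokenizer.from_pretrained(name)
    model = MBERTClassifier(backbone, num_classes=3).eval()
    enc = tok("A claim.", text_pair="Some evidence.", return_tensors="pt")
    with torch.no_grad():
        probs = model.predict_proba(enc["input_ids"], enc["attention_mask"])
    print(probs)  # shape (1, 3); each row sums to 1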