from transformers import PreTrainedModel
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from .config_gzipembed import *
from tqdm.auto import tqdm
import torch
import gzip
import multiprocessing
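
# NOTE (added for clarity): word_tokenize requires the NLTK "punkt" tokenizer
# data to be available (nltk.download("punkt")), and config.stop_words is
# presumably built from nltk.corpus.stopwords, which would explain the
# otherwise-unused stopwords import above.
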
class GZIPEmbeddingModel(PreTrainedModel):
    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Learned projection from the raw NCD vector (one entry per
            # corpus document) down to the configured output dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            self.reduction_head = None
        # Ensures the model owns at least one parameter so PreTrainedModel's
        # device/dtype handling works even without a reduction head.
        self.dummy_parameter = torch.nn.Parameter(torch.ones(1))
|
|
    def forward(self, prompt, num_procs=16, return_tensor=True):
        # The worker function and the current prompt are promoted to module
        # globals so that multiprocessing can pickle the worker by name.
        # This relies on the "fork" start method (the Linux default); under
        # "spawn" the child processes would not inherit these globals.
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i = data_row[0]
            row = self.ncd(data_row[1], p)
            return i, row

        if isinstance(prompt, str):
            prompt = [prompt]
        x = []
        for p in prompt:
            # One NCD value per corpus document, computed in parallel.
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                data = enumerate(self.config.corpus)
                results = pool.map(calculate_ncd_row, data)
                for i, row in results:
                    ncd[i] = row
            x.append(ncd)
        if self.reduction_head is not None:
            x = torch.tensor(x)
            # nn.Linear exposes no .dtype/.device attributes; use its weight's.
            x = x.to(self.reduction_head.weight.dtype).to(self.reduction_head.weight.device)
            return self.reduction_head(x)
        return x if not return_tensor else torch.tensor(x)

    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode.
            batch_size (`int`): Number of worker processes used to compute
                the NCD rows (forwarded to `forward` as `num_procs`).

        Returns:
            `List[np.ndarray]`: List of embeddings for the given sentences.
        """
        import numpy as np

        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        return [np.array(i) for i in x]
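
    # Note: the `encode` signature above (sentences, batch_size, **kwargs,
    # returning a list of numpy arrays) appears designed to match the model
    # interface used by sentence-embedding benchmarks such as MTEB; here
    # `batch_size` is repurposed as the number of worker processes rather
    # than a true batch size.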
|
|
    def normalize(self, x):
        # Lowercase, keep only ASCII letters and spaces, then drop stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)
|
|
    def ncd(self, x, y):
        # Normalized Compression Distance, where C(s) is the length of
        # gzip-compressed s:
        #   NCD(x, y) = (C(xy) - min(C(x), C(y))) / max(C(x), C(y))
        # Both call sites in this file pass the corpus/reference document as
        # `x` and the query document as `y`, so the pre-normalized-corpus
        # check belongs on `x` (the original guarded `y` instead).
        _x = self.normalize(x) if (not self.config.normalized_corpus) and self.config.normalize else x
        _y = self.normalize(y) if self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)
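
    # Illustration of the distance above (assumed, illustrative values, not
    # measured): identical texts share all structure, so the concatenation
    # compresses almost as well as either text alone and NCD approaches 0;
    # unrelated texts share little structure and NCD approaches 1.
    #
    #     model.ncd("the cat sat on the mat", "the cat sat on the mat")  # ~0
    #     model.ncd("the cat sat on the mat", "qzxvjw plkrm bhtyn")      # ~1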
|
|
    def gzip_embed(
        self,
        corpus,
        document,
        verbose=False,
    ):
        # Sequential fallback: one NCD value per reference document.
        embedding = []
        for reference_document in (corpus if not verbose else tqdm(corpus)):
            embedding.append(self.ncd(reference_document, document))
        return embedding
|
|
    def dimensionality(self):
        # The raw embedding size equals the number of reference documents.
        return len(self.config.corpus)
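

# A minimal usage sketch (an addition, not from the original file). It
# assumes GZIPEmbeddingConfig accepts the attributes referenced above
# (corpus, reduction, normalize, normalized_corpus, stop_words) as keyword
# arguments; the actual signature lives in config_gzipembed.py. Because this
# module uses a relative import, run it as part of its package, e.g.
# `python -m <package>.modeling_gzipembed`, and note that `forward` assumes
# the "fork" multiprocessing start method (the Linux default).
if __name__ == "__main__":
    config = GZIPEmbeddingConfig(
        corpus=["the quick brown fox", "lorem ipsum dolor sit amet"],
        reduction=False,
        normalize=False,
        normalized_corpus=False,
        stop_words=[],
    )
    model = GZIPEmbeddingModel(config)
    # Each embedding holds one NCD value per corpus document.
    [embedding] = model.encode(["a quick brown dog"], batch_size=2)
    print(model.dimensionality(), embedding)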
|
|