| |
| from __future__ import print_function, division |
|
|
| import glob |
| import json |
| import uuid |
| from copy import deepcopy |
| from collections import defaultdict, OrderedDict |
| import numpy as np |
|
|
| from torchmoji.filter_utils import is_special_token |
| from torchmoji.word_generator import WordGenerator |
| from torchmoji.global_variables import SPECIAL_TOKENS, VOCAB_PATH |
|
|
class VocabBuilder():
    """ Create vocabulary with words extracted from sentences as fed from a
        word generator.

    # Attributes:
        word_counts: Mapping from word to the number of times it was counted.
            Every special token is pre-seeded with a count of 0.
        word_length_limit: Maximum length (inclusive) of a word that gets
            counted; also the width of the word field written by
            save_vocab().
        word_gen: Iterable of pairs whose first element is a tokenized
            sentence (see count_all_words()).
    """
    def __init__(self, word_gen):
        """ Sets up empty counts and registers the special tokens.

        # Arguments:
            word_gen: Word generator supplying the tokenized sentences.
        """
        # Any word seen for the first time starts from a count of 0.
        self.word_counts = defaultdict(int)
        self.word_length_limit = 30

        # Special tokens are always part of the vocabulary, even if they
        # never occur in the processed sentences.
        for token in SPECIAL_TOKENS:
            assert len(token) < self.word_length_limit
            self.word_counts[token] = 0
        self.word_gen = word_gen

    def count_words_in_sentence(self, words):
        """ Generates word counts for all tokens in the given sentence.

        Empty tokens and tokens longer than word_length_limit are ignored.

        # Arguments:
            words: Tokenized sentence whose words should be counted.
        """
        for word in words:
            if 0 < len(word) <= self.word_length_limit:
                # word_counts is a defaultdict, so unseen words need no
                # special handling.
                self.word_counts[word] += 1

    def save_vocab(self, path=None):
        """ Saves the vocabulary into a file.

        The counts are stored as a structured numpy array (fields 'word' and
        'count') under the key 'data', sorted by descending count.

        # Arguments:
            path: Where the vocabulary should be saved. If not specified, a
                randomly generated filename is used instead. numpy appends
                '.npz' to a filename that does not already end with it.
        """
        dtype = [('word', '|S{}'.format(self.word_length_limit)),
                 ('count', 'int')]
        # items() is a view on Python 3; numpy needs a real sequence of
        # (word, count) records.
        np_dict = np.array(list(self.word_counts.items()), dtype=dtype)

        # Sorting the reversed view in place (ascending) leaves the
        # underlying array in descending order of count.
        np_dict[::-1].sort(order='count')
        data = np_dict

        if path is None:
            path = str(uuid.uuid4())

        np.savez_compressed(path, data=data)
        print("Saved dict to {}".format(path))

    def get_next_word(self):
        """ Returns the next item from the word generator.

        # Returns:
            The first item produced by a fresh iterator over the word
            generator. count_all_words() unpacks each item as a pair whose
            first element is the tokenized sentence, so a pair is presumably
            what is returned here as well.
        """
        # next(iter(...)) works on both Python 2 and 3, unlike the
        # Python 2-only iterator method .next().
        return next(iter(self.word_gen))

    def count_all_words(self):
        """ Generates word counts for all words in all sentences of the word
            generator.
        """
        for words, _ in self.word_gen:
            self.count_words_in_sentence(words)
|
|
class MasterVocab():
    """ Combines vocabularies.

    # Attributes:
        master_vocab: Mapping from word to the sum of its size-normalized
            counts over all vocabularies that were combined.
    """
    def __init__(self):
        # Filled by populate_master_vocab().
        self.master_vocab = {}

    def populate_master_vocab(self, vocab_path, min_words=1, force_appearance=None):
        """ Populates the master vocabulary using all vocabularies found in the
            given path. Vocabularies should be named *.npz. Expects the
            vocabularies to be numpy arrays with counts. Normalizes the counts
            and combines them.

        # Arguments:
            vocab_path: Path prefix of the vocabularies to be combined. The
                pattern searched is vocab_path + '*.npz', so a directory
                must be given with its trailing separator.
            min_words: Minimum amount of occurrences a word must have in
                order to be included in the master vocabulary.
            force_appearance: Optional (part of a) vocabulary filename found
                under vocab_path. When given, only words that also occur in
                that vocabulary (after min_words filtering) are added to the
                master vocabulary. The first path containing this string is
                used.

        # Raises:
            ValueError: If no vocabulary file matches the search pattern.
            IndexError: If force_appearance matches none of the found paths.
        """
        pattern = vocab_path + '*.npz'
        paths = glob.glob(pattern)
        if not paths:
            # Fail with a clear message instead of the opaque
            # "max() arg is an empty sequence" raised further down.
            raise ValueError(
                'No vocabularies found matching {}'.format(pattern))

        sizes = {path: 0 for path in paths}
        dicts = {path: {} for path in paths}

        # Load every vocabulary and record its total word count.
        for path in paths:
            np_data = np.load(path)['data']

            for entry in np_data:
                word, count = entry
                if count < min_words:
                    continue
                # Fixed-width '|S' fields come back as bytes on Python 3;
                # decode them so the string checks below and the JSON dump
                # in save_vocab() operate on text. On Python 2 the value is
                # already a str and is left untouched.
                if not isinstance(word, str):
                    word = word.decode('utf-8', 'replace')
                if is_special_token(word):
                    continue
                dicts[path][word] = count

            sizes[path] = sum(dicts[path].values())
            print('Overall word count for {} -> {}'.format(path, sizes[path]))
            print('Overall word number for {} -> {}'.format(path, len(dicts[path])))

        vocab_of_max_size = max(sizes, key=sizes.get)
        max_size = sizes[vocab_of_max_size]
        print('Min: {}, {}, {}'.format(sizes, vocab_of_max_size, max_size))

        # Optionally restrict the result to the words of one vocabulary.
        if force_appearance is not None:
            force_appearance_path = [p for p in paths if force_appearance in p][0]
            force_appearance_vocab = deepcopy(dicts[force_appearance_path])
            print(force_appearance_path)
        else:
            force_appearance_path, force_appearance_vocab = None, None

        # Scale every vocabulary up to the size of the largest one and sum
        # the scaled counts per word.
        for path in paths:
            if sizes[path] == 0:
                # Nothing survived the filtering: there is no word to add
                # and the normalization factor would divide by zero.
                continue
            normalization_factor = max_size / sizes[path]
            print('Norm factor for path {} -> {}'.format(path, normalization_factor))

            for word, count in dicts[path].items():
                # Defensive: special tokens were already dropped on load.
                if is_special_token(word):
                    print("SPECIAL - ", word)
                    continue
                normalized_count = count * normalization_factor

                # Skip words missing from the forced vocabulary.
                if (force_appearance_vocab is not None
                        and word not in force_appearance_vocab):
                    continue

                self.master_vocab[word] = (
                    self.master_vocab.get(word, 0) + normalized_count)

        print('Size of master_dict {}'.format(len(self.master_vocab)))
        print("Hashes for master dict: {}".format(
            len([w for w in self.master_vocab if w.startswith('#')])))

    def save_vocab(self, path_count, path_vocab, word_limit=100000):
        """ Saves the master vocabulary into a file.

        # Arguments:
            path_count: Destination of the compressed numpy archive holding
                the (word, count) records under the key 'counts'.
            path_vocab: Destination of the JSON file mapping each word to
                its index.
            word_limit: Maximum number of entries written to either file,
                special tokens included.
        """
        # Special tokens come first; they carry a placeholder count of -1.
        words = OrderedDict()
        for token in SPECIAL_TOKENS:
            words[token] = -1

        # Remaining words follow in order of descending count.
        desc_order = OrderedDict(sorted(self.master_vocab.items(),
                                        key=lambda kv: kv[1], reverse=True))
        words.update(desc_order)

        # items() is a view on Python 3, so build an explicit list of
        # records; text is encoded because the word field is a bytes field.
        records = [(w if isinstance(w, bytes) else w.encode('utf-8'), c)
                   for w, c in words.items()]
        np_vocab = np.array(records,
                            dtype=([('word', '|S30'), ('count', 'float')]))

        # Save the counts of the most frequent words.
        counts = np_vocab[:word_limit]
        np.savez_compressed(path_count, counts=counts)

        # Save the word -> index mapping; keys() cannot be sliced on
        # Python 3, hence the explicit list.
        final_words = OrderedDict(
            (w, i) for i, w in enumerate(list(words)[:word_limit]))
        with open(path_vocab, 'w') as f:
            f.write(json.dumps(final_words, indent=4, separators=(',', ': ')))
|
|
|
|
def all_words_in_sentences(sentences):
    """ Extracts all unique words from a given list of sentences.

    # Arguments:
        sentences: List or word generator of sentences to be processed.
            Each sentence is an iterable of words; words must be hashable.

    # Returns:
        List of all unique words contained in the given sentences, in order
        of first appearance.
    """
    if isinstance(sentences, WordGenerator):
        # A word generator yields pairs; only the sentence part is needed.
        sentences = [s for s, _ in sentences]

    vocab = []
    # Set used for constant-time membership tests; the list alone would make
    # every lookup linear in the vocabulary size.
    seen = set()
    for sentence in sentences:
        for word in sentence:
            if word not in seen:
                seen.add(word)
                vocab.append(word)

    return vocab
|
|
|
|
def extend_vocab_in_file(vocab, max_tokens=10000, vocab_path=VOCAB_PATH):
    """ Extends the JSON-formatted vocabulary stored on disk with words from
        vocab that it does not contain yet. Adds up to max_tokens words and
        overwrites the file in vocab_path.

    If the vocabulary file cannot be opened, a message is printed and
    nothing is written.

    # Arguments:
        vocab: Vocabulary to be added. MUST have word_counts populated, i.e.
            must have run count_all_words() previously.
        max_tokens: Maximum number of words to be added.
        vocab_path: Path to the vocabulary json which is to be extended.
    """
    try:
        with open(vocab_path, 'r') as source:
            stored_vocab = json.loads(source.read())
    except IOError:
        print('Vocabulary file not found, expected at ' + vocab_path)
        return None

    # Mutates stored_vocab in place.
    extend_vocab(stored_vocab, vocab, max_tokens=max_tokens)

    # Write the extended vocabulary back to the same file.
    with open(vocab_path, 'w') as target:
        json.dump(stored_vocab, target, sort_keys=True, indent=4,
                  separators=(',', ': '))
|
|
|
|
def extend_vocab(current_vocab, new_vocab, max_tokens=10000):
    """ Extends current vocabulary with words from vocab that are not
        present in the current vocabulary. Adds up to max_tokens words.

    Words are considered in order of descending count. Each added word gets
    the index len(current_vocab) (measured before anything is added) plus
    the number of words added so far; this presumably relies on the existing
    indices being 0..len(current_vocab)-1.

    # Arguments:
        current_vocab: Current dictionary of tokens. Modified in place.
        new_vocab: Vocabulary to be added. MUST have word_counts populated, i.e.
            must have run count_all_words() previously.
        max_tokens: Maximum number of words to be added. A negative value
            falls back to 10000.

    # Returns:
        How many new tokens have been added.
    """
    if max_tokens < 0:
        max_tokens = 10000

    # Most frequent words first. The sort is stable, so words with equal
    # counts keep the order they have in word_counts.
    ranked = sorted(new_vocab.word_counts.items(),
                    key=lambda kv: kv[1], reverse=True)

    base_index = len(current_vocab)
    added = 0
    for word, _ in ranked:
        if added >= max_tokens:
            break
        # Test membership on the dict itself: on Python 2, .keys() would
        # build a new list for every single lookup.
        if word not in current_vocab:
            current_vocab[word] = base_index + added
            added += 1

    return added
|
|