import os
import re
import string
import json
from collections import Counter

import gensim.downloader
import nltk
import numpy as np
import scipy.sparse
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import CountVectorizer
from tqdm import tqdm

from backend.datasets.data import file_utils
from backend.datasets.utils._utils import get_stopwords_set
from backend.datasets.utils.logger import Logger

logger = Logger("WARNING")

# Make sure the WordNet data needed by the lemmatizer is available.
try:
    nltk.data.find('corpora/wordnet')
except LookupError:
    nltk.download('wordnet', quiet=True)
try:
    nltk.data.find('corpora/omw-1.4')
except LookupError:
    nltk.download('omw-1.4', quiet=True)

# compile some regexes
punct_chars = list(set(string.punctuation) - set("'"))
punct_chars.sort()
punctuation = ''.join(punct_chars)
replace = re.compile('[%s]' % re.escape(punctuation))
alpha = re.compile('^[a-zA-Z_]+$')
alpha_or_num = re.compile('^[a-zA-Z_]+|[0-9_]+$')
alphanum = re.compile('^[a-zA-Z0-9_]+$')


class Tokenizer:
    def __init__(self, stopwords="English", keep_num=False, keep_alphanum=False,
                 strip_html=False, no_lower=False, min_length=3, lemmatize=True):
        self.keep_num = keep_num
        self.keep_alphanum = keep_alphanum
        self.strip_html = strip_html
        self.lower = not no_lower
        self.min_length = min_length
        self.stopword_set = get_stopwords_set(stopwords)
        self.lemmatize = lemmatize
        if lemmatize:
            self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text, strip_html=False, lower=True, keep_emails=False, keep_at_mentions=False):
        # remove html tags
        if strip_html:
            text = re.sub(r'<[^>]+>', '', text)
        else:
            # replace angle brackets
            text = re.sub(r'<', '(', text)
            text = re.sub(r'>', ')', text)
        # lower case
        if lower:
            text = text.lower()
        # eliminate email addresses
        if not keep_emails:
            text = re.sub(r'\S+@\S+', ' ', text)
        # eliminate @mentions
        if not keep_at_mentions:
            text = re.sub(r'\s@\S+', ' ', text)
        # replace underscores with spaces
        text = re.sub(r'_', ' ', text)
        # break off single quotes at the ends of words
        text = re.sub(r'\s\'', ' ', text)
        text = re.sub(r'\'\s', ' ', text)
        # remove periods
        text = re.sub(r'\.', '', text)
        # replace all other punctuation (except single quotes) with spaces
        text = replace.sub(' ', text)
        # remove single quotes
        text = re.sub(r'\'', '', text)
        # replace all whitespace with a single space
        text = re.sub(r'\s', ' ', text)
        # strip off spaces on either end
        text = text.strip()
        return text

    def tokenize(self, text):
        text = self.clean_text(text, self.strip_html, self.lower)
        tokens = text.split()

        # mask stopwords with a placeholder; placeholders are dropped at the end
        tokens = ['_' if t in self.stopword_set else t for t in tokens]

        # remove tokens that contain numbers
        if not self.keep_alphanum and not self.keep_num:
            tokens = [t if alpha.match(t) else '_' for t in tokens]
        # or just remove tokens that contain a combination of letters and numbers
        elif not self.keep_alphanum:
            tokens = [t if alpha_or_num.match(t) else '_' for t in tokens]

        # drop short tokens
        if self.min_length > 0:
            tokens = [t if len(t) >= self.min_length else '_' for t in tokens]

        if getattr(self, "lemmatize", False):
            tokens = [self.lemmatizer.lemmatize(t) if t != '_' else t for t in tokens]

        unigrams = [t for t in tokens if t != '_']
        return unigrams
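
# Illustrative usage sketch for Tokenizer (kept as a comment so importing this
# module stays side-effect free). The exact output depends on the stopword set
# returned by get_stopwords_set, so the result shown is indicative only:
#
#   tokenizer = Tokenizer()
#   tokenizer.tokenize("The dogs are running near 3 old houses.")
#   # -> something like ['dog', 'running', 'old', 'house']
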
def make_word_embeddings(vocab):
    glove_vectors = gensim.downloader.load('glove-wiki-gigaword-200')
    word_embeddings = np.zeros((len(vocab), glove_vectors.vectors.shape[1]))

    num_found = 0
    # gensim >= 4 exposes index_to_key; older releases use index2word
    try:
        key_word_list = glove_vectors.index_to_key
    except AttributeError:
        key_word_list = glove_vectors.index2word
    key_word_set = set(key_word_list)  # set membership is O(1) per lookup

    for i, word in enumerate(tqdm(vocab, desc="loading word embeddings")):
        if word in key_word_set:
            word_embeddings[i] = glove_vectors[word]
            num_found += 1

    logger.info(f'number of found embeddings: {num_found}/{len(vocab)}')
    return scipy.sparse.csr_matrix(word_embeddings)


class Preprocess:
    def __init__(self, tokenizer=None, test_sample_size=None, test_p=0.2, stopwords="English",
                 min_doc_count=0, max_doc_freq=1.0, keep_num=False, keep_alphanum=False,
                 strip_html=False, no_lower=False, min_length=3, min_term=0, vocab_size=None,
                 seed=42, verbose=True, lemmatize=True):
        """
        Args:
            test_sample_size: Size of the test set.
            test_p: Proportion of the test set; used to sample the train set
                based on the size of the test set.
            stopwords: List of stopwords to exclude.
            min_doc_count: Exclude words that occur in fewer than this number of documents.
            max_doc_freq: Exclude words that occur in more than this proportion of documents.
            keep_num: Keep tokens made of only numbers.
            keep_alphanum: Keep tokens made of a mixture of letters and numbers.
            strip_html: Strip HTML tags.
            no_lower: Do not lowercase text.
            min_length: Minimum token length.
            min_term: Minimum number of terms a document must contain to be kept.
            vocab_size: Size of the vocabulary (by most common in the union of train
                and test sets, following the above exclusions).
            seed: Random integer seed (only relevant for choosing the test set).
            verbose: If True, log at DEBUG level; otherwise at WARNING level.
            lemmatize: Whether to apply lemmatization to the tokens.
        """
        self.test_sample_size = test_sample_size
        self.min_doc_count = min_doc_count
        self.max_doc_freq = max_doc_freq
        self.min_term = min_term
        self.test_p = test_p
        self.vocab_size = vocab_size
        self.seed = seed

        if tokenizer is not None:
            self.tokenizer = tokenizer
        else:
            self.tokenizer = Tokenizer(
                stopwords, keep_num, keep_alphanum, strip_html, no_lower,
                min_length, lemmatize=lemmatize
            ).tokenize

        if verbose:
            logger.set_level("DEBUG")
        else:
            logger.set_level("WARNING")

    def parse(self, texts, vocab):
        if not isinstance(texts, list):
            texts = [texts]

        vocab_set = set(vocab)
        parsed_texts = list()
        for text in tqdm(texts, desc="parsing texts"):
            tokens = self.tokenizer(text)
            tokens = [t for t in tokens if t in vocab_set]
            parsed_texts.append(" ".join(tokens))

        vectorizer = CountVectorizer(vocabulary=vocab, tokenizer=lambda x: x.split())
        sparse_bow = vectorizer.fit_transform(parsed_texts)
        return parsed_texts, sparse_bow
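
    # Illustrative sketch of parse(): it restricts tokenized texts to a fixed
    # vocabulary and returns the filtered strings plus a sparse document-term
    # matrix whose columns follow the order of `vocab` (the texts and vocab
    # below are made-up inputs):
    #
    #   p = Preprocess(verbose=False)
    #   texts, bow = p.parse(["apple banana apple", "banana cherry"],
    #                        vocab=["apple", "banana", "cherry"])
    #   # bow.toarray() -> [[2, 1, 0], [0, 1, 1]]
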
    def preprocess_jsonlist(self, dataset_dir, label_name=None, use_partition=True):
        if use_partition:
            train_items = file_utils.read_jsonlist(os.path.join(dataset_dir, 'train.jsonlist'))
            test_items = file_utils.read_jsonlist(os.path.join(dataset_dir, 'test.jsonlist'))
        else:
            raw_path = os.path.join(dataset_dir, 'docs.jsonl')
            with open(raw_path, 'r', encoding='utf-8') as f:
                train_items = [json.loads(line.strip()) for line in f if line.strip()]
            test_items = []

        logger.info(f"Found {len(train_items)} training documents and {len(test_items)} testing documents")

        # Initialize containers
        raw_train_texts, train_labels, raw_train_times = [], [], []
        raw_test_texts, test_labels, raw_test_times = [], [], []

        # Process train items
        for item in train_items:
            raw_train_texts.append(item['text'])
            raw_train_times.append(str(item['timestamp']))
            if label_name and label_name in item:
                train_labels.append(item[label_name])

        # Process test items
        for item in test_items:
            raw_test_texts.append(item['text'])
            raw_test_times.append(str(item['timestamp']))
            if label_name and label_name in item:
                test_labels.append(item[label_name])

        # Create and apply the time2id mapping
        all_times = sorted(set(raw_train_times + raw_test_times))
        time2id = {t: i for i, t in enumerate(all_times)}
        train_times = np.array([time2id[t] for t in raw_train_times], dtype=np.int32)
        test_times = np.array([time2id[t] for t in raw_test_times], dtype=np.int32) if raw_test_times else None

        # Preprocess and get the indices of the kept samples
        rst = self.preprocess(raw_train_texts, train_labels, raw_test_texts, test_labels)
        train_idx = rst.get("train_idx")
        test_idx = rst.get("test_idx")

        # Add the filtered timestamps to the result for saving later
        rst["train_times"] = train_times[train_idx]
        if test_times is not None and test_idx is not None:
            rst["test_times"] = test_times[test_idx]

        # Add time2id to the result dict
        rst["time2id"] = time2id
        return rst

    def convert_labels(self, train_labels, test_labels):
        if train_labels:
            # test_labels may be None or empty when no test set is supplied
            label_list = list(set(train_labels).union(set(test_labels or [])))
            label_list.sort()
            n_labels = len(label_list)
            label2id = dict(zip(label_list, range(n_labels)))
            logger.info(f"label2id: {label2id}")
            train_labels = [label2id[label] for label in train_labels]
            if test_labels:
                test_labels = [label2id[label] for label in test_labels]
        return train_labels, test_labels
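
    # Expected record layout for preprocess_jsonlist (a sketch inferred from
    # the fields read above; the values are made up): each line of
    # train.jsonlist / test.jsonlist (or docs.jsonl when use_partition=False)
    # is a JSON object with 'text' and 'timestamp' fields, plus an optional
    # label field named by `label_name`, e.g.:
    #
    #   {"text": "the match went to extra time", "timestamp": "2020-01", "label": "sports"}
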
    def preprocess(self, raw_train_texts, train_labels=None, raw_test_texts=None,
                   test_labels=None, pretrained_WE=True):
        np.random.seed(self.seed)

        train_texts = list()
        test_texts = list()
        word_counts = Counter()
        doc_counts_counter = Counter()

        train_labels, test_labels = self.convert_labels(train_labels, test_labels)

        for text in tqdm(raw_train_texts, desc="loading train texts"):
            tokens = self.tokenizer(text)
            word_counts.update(tokens)
            doc_counts_counter.update(set(tokens))
            parsed_text = ' '.join(tokens)
            train_texts.append(parsed_text)

        if raw_test_texts:
            for text in tqdm(raw_test_texts, desc="loading test texts"):
                tokens = self.tokenizer(text)
                word_counts.update(tokens)
                doc_counts_counter.update(set(tokens))
                parsed_text = ' '.join(tokens)
                test_texts.append(parsed_text)

        words, doc_counts = zip(*doc_counts_counter.most_common())
        doc_freqs = np.array(doc_counts) / float(len(train_texts) + len(test_texts))
        vocab = [word for i, word in enumerate(words)
                 if doc_counts[i] >= self.min_doc_count and doc_freqs[i] <= self.max_doc_freq]

        # filter the vocabulary
        if self.vocab_size is not None:
            vocab = vocab[:self.vocab_size]
        vocab.sort()

        # keep only documents with at least min_term terms
        train_idx = [i for i, text in enumerate(train_texts) if len(text.split()) >= self.min_term]
        train_idx = np.asarray(train_idx)

        if raw_test_texts is not None:
            test_idx = [i for i, text in enumerate(test_texts) if len(text.split()) >= self.min_term]
            test_idx = np.asarray(test_idx)
        else:
            test_idx = None

        # randomly sample
        if self.test_sample_size and raw_test_texts is not None:
            logger.info("sample train and test sets...")
            train_num = len(train_idx)
            test_num = len(test_idx)
            test_sample_size = min(test_num, self.test_sample_size)
            train_sample_size = int((test_sample_size / self.test_p) * (1 - self.test_p))
            if train_sample_size > train_num:
                test_sample_size = int((train_num / (1 - self.test_p)) * self.test_p)
                train_sample_size = train_num

            train_idx = train_idx[np.sort(np.random.choice(train_num, train_sample_size, replace=False))]
            test_idx = test_idx[np.sort(np.random.choice(test_num, test_sample_size, replace=False))]

            logger.info(f"sampled train size: {len(train_idx)}")
            logger.info(f"sampled test size: {len(test_idx)}")

        train_texts, train_bow = self.parse([train_texts[i] for i in train_idx], vocab)

        rst = {
            'vocab': vocab,
            'train_bow': train_bow,
            'train_texts': train_texts,
            'train_idx': train_idx,  # indices of kept train samples
        }
        if train_labels:
            rst['train_labels'] = np.asarray(train_labels)[train_idx]

        logger.info(f"Real vocab size: {len(vocab)}")
        logger.info(f"Real training size: {len(train_texts)} \t "
                    f"avg length: {rst['train_bow'].sum() / len(train_texts):.3f}")

        if raw_test_texts:
            rst['test_texts'], rst['test_bow'] = self.parse(np.asarray(test_texts)[test_idx].tolist(), vocab)
            rst['test_idx'] = test_idx  # indices of kept test samples
            if test_labels:
                rst['test_labels'] = np.asarray(test_labels)[test_idx]
            logger.info(f"Real testing size: {len(rst['test_texts'])} \t "
                        f"avg length: {rst['test_bow'].sum() / len(rst['test_texts']):.3f}")

        if pretrained_WE:
            rst['word_embeddings'] = make_word_embeddings(vocab)

        return rst

    def save(self, output_dir, vocab, train_texts, train_bow, word_embeddings=None,
             train_labels=None, test_texts=None, test_bow=None, test_labels=None,
             train_times=None, test_times=None, time2id=None):
        file_utils.make_dir(output_dir)
        file_utils.save_text(vocab, f"{output_dir}/vocab.txt")
        file_utils.save_text(train_texts, f"{output_dir}/train_texts.txt")
        scipy.sparse.save_npz(f"{output_dir}/train_bow.npz", scipy.sparse.csr_matrix(train_bow))

        if word_embeddings is not None:
            scipy.sparse.save_npz(f"{output_dir}/word_embeddings.npz", word_embeddings)
        # explicit None/length checks: labels may arrive as numpy arrays, whose
        # truth value is ambiguous
        if train_labels is not None and len(train_labels) > 0:
            np.savetxt(f"{output_dir}/train_labels.txt", train_labels, fmt='%i')
        if train_times is not None:
            np.savetxt(f"{output_dir}/train_times.txt", train_times, fmt='%i')
        if test_bow is not None:
            scipy.sparse.save_npz(f"{output_dir}/test_bow.npz", scipy.sparse.csr_matrix(test_bow))
        if test_texts is not None:
            file_utils.save_text(test_texts, f"{output_dir}/test_texts.txt")
        if test_labels is not None and len(test_labels) > 0:
            np.savetxt(f"{output_dir}/test_labels.txt", test_labels, fmt='%i')
        if test_times is not None:
            np.savetxt(f"{output_dir}/test_times.txt", test_times, fmt='%i')

        # Save the time2id mapping if provided
        if time2id is not None:
            with open(f"{output_dir}/time2id.txt", "w", encoding="utf-8") as f:
                json.dump(time2id, f, indent=2)
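

# Minimal end-to-end sketch (an assumption, not part of the pipeline proper):
# the toy texts and the ./output directory are made up, and pretrained_WE=False
# skips the GloVe download to keep the example light.
if __name__ == "__main__":
    preprocess = Preprocess(vocab_size=1000, verbose=False)
    result = preprocess.preprocess(
        raw_train_texts=["the cats are sleeping", "dogs bark at night"],
        pretrained_WE=False,
    )
    preprocess.save(
        "./output",
        result["vocab"],
        result["train_texts"],
        result["train_bow"],
    )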