import json
import os
import numpy as np
from collections import OrderedDict
import tempfile
import gensim.downloader
from tqdm import tqdm
from backend.datasets.utils.logger import Logger
import scipy.sparse
from gensim.models.phrases import Phrases, Phraser
from typing import List, Union
from octis.preprocessing.preprocessing import Preprocessing
# Module-level logger (project Logger class); "WARNING" is presumably the
# minimum level — if so, the logger.info call in _make_word_embeddings is
# suppressed by default. TODO confirm against backend.datasets.utils.logger.
logger = Logger("WARNING")
class Preprocessor:
    """Preprocess a JSONL corpus with OCTIS and write model-ready artifacts.

    Wraps octis Preprocessing, then (in preprocess()) optionally merges
    bigrams, builds a vocabulary / BOW matrix / GloVe embeddings, encodes
    timestamps, and saves everything under `output_folder`.
    """

    def __init__(self,
                 docs_jsonl_path: str,
                 output_folder: str,
                 use_partition: bool = False,
                 use_bigrams: bool = False,
                 min_count_bigram: int = 5,
                 threshold_bigram: int = 10,
                 remove_punctuation: bool = True,
                 lemmatize: bool = True,
                 stopword_list: Union[str, List[str], None] = None,
                 min_chars: int = 3,
                 min_words_docs: int = 10,
                 min_df: Union[int, float] = 0.0,
                 max_df: Union[int, float] = 1.0,
                 max_features: Union[int, None] = None,
                 language: str = 'english'):
        """Store pipeline options and build the OCTIS preprocessor.

        Args:
            docs_jsonl_path: JSONL file with one {'text', 'timestamp',
                'label'} object per line.
            output_folder: destination directory for all generated files
                (created if missing).
            use_partition: if True, split the corpus 80/20 into train/test.
            use_bigrams: if True, merge frequent bigrams with Gensim Phrases.
            min_count_bigram / threshold_bigram: Gensim Phrases parameters.
            remaining arguments: forwarded verbatim to octis Preprocessing.
        """
        self.docs_jsonl_path = docs_jsonl_path
        self.output_folder = output_folder
        self.use_partition = use_partition
        self.use_bigrams = use_bigrams
        self.min_count_bigram = min_count_bigram
        self.threshold_bigram = threshold_bigram
        os.makedirs(self.output_folder, exist_ok=True)
        # Everything below is passed straight through to OCTIS.
        self.preprocessing_params = {
            'remove_punctuation': remove_punctuation,
            'lemmatize': lemmatize,
            'stopword_list': stopword_list,
            'min_chars': min_chars,
            'min_words_docs': min_words_docs,
            'min_df': min_df,
            'max_df': max_df,
            'max_features': max_features,
            'language': language
        }
        self.preprocessor_octis = Preprocessing(**self.preprocessing_params)
def _load_data_to_temp_files(self):
"""Loads data from JSONL and writes to temporary files for OCTIS preprocessor."""
raw_texts = []
raw_timestamps = []
raw_labels = []
has_labels = False
with open(self.docs_jsonl_path, 'r', encoding='utf-8') as f:
for line in f:
data = json.loads(line.strip())
# Remove newlines from text
clean_text = data.get('text', '').replace('\n', ' ').replace('\r', ' ')
clean_text = " ".join(clean_text.split())
raw_texts.append(clean_text)
raw_timestamps.append(data.get('timestamp', ''))
label = data.get('label', '')
if label:
has_labels = True
raw_labels.append(label)
# Create temporary files
temp_dir = tempfile.mkdtemp()
temp_docs_path = os.path.join(temp_dir, "temp_docs.txt")
temp_labels_path = None
with open(temp_docs_path, 'w', encoding='utf-8') as f_docs:
for text in raw_texts:
f_docs.write(f"{text}\n")
if has_labels:
temp_labels_path = os.path.join(temp_dir, "temp_labels.txt")
with open(temp_labels_path, 'w', encoding='utf-8') as f_labels:
for label in raw_labels:
f_labels.write(f"{label}\n")
print(f"Loaded {len(raw_texts)} raw documents and created temporary files in {temp_dir}.")
return raw_texts, raw_timestamps, raw_labels, temp_docs_path, temp_labels_path, temp_dir
def _make_word_embeddings(self, vocab):
"""
Generates word embeddings for the given vocabulary using GloVe.
For n-grams (e.g., "wordA_wordB", "wordX_wordY_wordZ" for n>=2),
the resultant embedding is the sum of the embeddings of its constituent
single words (wordA + wordB + ...).
"""
print("Loading GloVe word embeddings...")
glove_vectors = gensim.downloader.load('glove-wiki-gigaword-200')
# Initialize word_embeddings matrix with zeros.
# This ensures that words not found (single or n-gram constituents)
# will have a zero vector embedding.
word_embeddings = np.zeros((len(vocab), glove_vectors.vectors.shape[1]), dtype=np.float32)
num_found = 0
try:
# Using a set for key_word_list for O(1) average time complexity lookup
key_word_list = set(glove_vectors.index_to_key)
except AttributeError: # For older gensim versions
key_word_list = set(glove_vectors.index2word)
print("Generating word embeddings for vocabulary (including n-grams)...")
for i, word in enumerate(tqdm(vocab, desc="Processing vocabulary words")):
if '_' in word: # Check if it's a potential n-gram (n >= 2)
parts = word.split('_')
# Check if *all* constituent words are present in GloVe
all_parts_in_glove = True
for part in parts:
if part not in key_word_list:
all_parts_in_glove = False
break # One part not found, stop checking
if all_parts_in_glove:
# If all parts are found, sum their embeddings
resultant_vector = np.zeros(glove_vectors.vectors.shape[1], dtype=np.float32)
for part in parts:
resultant_vector += glove_vectors[part]
word_embeddings[i] = resultant_vector
num_found += 1
# Else: one or more constituent words not found, embedding remains zero
else: # It's a single word (n=1)
if word in key_word_list:
word_embeddings[i] = glove_vectors[word]
num_found += 1
# Else: single word not found, embedding remains zero
logger.info(f'Number of found embeddings (including n-grams): {num_found}/{len(vocab)}')
return word_embeddings # Return as dense NumPy array
def _save_doc_length_stats(self, filepath: str, output_path: str):
doc_lengths = []
try:
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
doc = line.strip()
if doc:
doc_lengths.append(len(doc))
except Exception as e:
print(f"Error processing '{filepath}': {e}")
return
if not doc_lengths:
print(f"No documents found in '{filepath}'.")
return
stats = {
"avg_len": float(np.mean(doc_lengths)),
"std_len": float(np.std(doc_lengths)),
"max_len": int(np.max(doc_lengths)),
"min_len": int(np.min(doc_lengths)),
"num_docs": int(len(doc_lengths))
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=4)
print(f"Saved document length stats to: {output_path}")
    def preprocess(self) -> None:
        """Run the full pipeline and write all artifacts to self.output_folder.

        Steps: stage the JSONL corpus into temp files, run OCTIS
        preprocessing, recover the surviving documents' original indexes (to
        realign timestamps), optionally merge bigrams, build the vocabulary
        and a sparse BOW matrix, create GloVe-based word embeddings, encode
        timestamps as contiguous integer ids, and save vocab/texts/BOW/
        embeddings/labels/times/time2id plus length stats. With
        use_partition=True the corpus is additionally split 80/20 in order
        (no shuffling) into train/test files.
        """
        print("Loading data and creating temporary files for OCTIS...")
        _, raw_timestamps, _, temp_docs_path, temp_labels_path, temp_dir = \
            self._load_data_to_temp_files()
        print("Starting OCTIS pre-processing using file paths and specified parameters...")
        octis_dataset = self.preprocessor_octis.preprocess_dataset(
            documents_path=temp_docs_path,
            labels_path=temp_labels_path
        )
        # Clean up temporary files immediately
        os.remove(temp_docs_path)
        if temp_labels_path:
            os.remove(temp_labels_path)
        os.rmdir(temp_dir)
        print(f"Temporary files in {temp_dir} cleaned up.")
        # --- Proxy: Save __original_indexes and then manually load it ---
        # OCTIS keeps the surviving documents' original positions in a private
        # attribute; round-trip them through a temp file to read them back.
        temp_indexes_dir = tempfile.mkdtemp()
        temp_indexes_file = os.path.join(temp_indexes_dir, "temp_original_indexes.txt")
        print(f"Saving __original_indexes to {temp_indexes_file}...")
        octis_dataset._save_document_indexes(temp_indexes_file)
        # Manually load the indexes from the file
        original_indexes_after_octis = []
        with open(temp_indexes_file, 'r') as f_indexes:
            for line in f_indexes:
                original_indexes_after_octis.append(int(line.strip()))  # Read as int
        # Clean up the temporary indexes file and its directory
        os.remove(temp_indexes_file)
        os.rmdir(temp_indexes_dir)
        print("Temporary indexes file cleaned up.")
        # --- End Proxy ---
        # Get processed data from OCTIS Dataset object
        processed_corpus_octis_list = octis_dataset.get_corpus()  # List of list of tokens
        processed_labels_octis = octis_dataset.get_labels()  # List of labels
        print("Max index in original_indexes_after_octis:", max(original_indexes_after_octis))
        print("Length of raw_timestamps:", len(raw_timestamps))
        # Filter timestamps based on documents that survived OCTIS preprocessing
        filtered_timestamps = [raw_timestamps[i] for i in original_indexes_after_octis]
        print(f"OCTIS preprocessing complete. {len(processed_corpus_octis_list)} documents remaining.")
        if self.use_bigrams:
            print("Generating bigrams with Gensim...")
            phrases = Phrases(processed_corpus_octis_list, min_count=self.min_count_bigram, threshold=self.threshold_bigram)
            bigram_phraser = Phraser(phrases)
            bigrammed_corpus_list = [bigram_phraser[doc] for doc in processed_corpus_octis_list]
            print("Bigram generation complete.")
        else:
            print("Skipping bigram generation as 'use_bigrams' is False.")
            bigrammed_corpus_list = processed_corpus_octis_list  # Use the original processed list
        # Convert back to list of strings for easier handling if needed later, but keep as list of lists for BOW
        bigrammed_texts_for_file = [" ".join(doc) for doc in bigrammed_corpus_list]
        # NOTE(review): the message below also prints when bigrams were
        # skipped, and duplicates the one inside the if-branch above.
        print("Bigram generation complete.")
        # Build Vocabulary from OCTIS output (after bigrams)
        # We need a flat list of all tokens to build the vocabulary
        all_tokens = [token for doc in bigrammed_corpus_list for token in doc]
        vocab = sorted(list(set(all_tokens)))  # Sorted unique words form the vocabulary
        word_to_id = {word: i for i, word in enumerate(vocab)}
        # Create BOW matrix manually
        print("Creating Bag-of-Words representations...")
        rows, cols, data = [], [], []
        for i, doc_tokens in enumerate(bigrammed_corpus_list):
            doc_word_counts = {}
            for token in doc_tokens:
                if token in word_to_id:  # Ensure token is in our final vocab
                    doc_word_counts[word_to_id[token]] = doc_word_counts.get(word_to_id[token], 0) + 1
            for col_id, count in doc_word_counts.items():
                rows.append(i)
                cols.append(col_id)
                data.append(count)
        # Shape is (num_documents, vocab_size)
        # NOTE(review): CSC is column-oriented; the row slicing below
        # (bow_matrix[:train_size]) would be cheaper on a CSR matrix.
        bow_matrix = scipy.sparse.csc_matrix((data, (rows, cols)), shape=(len(bigrammed_corpus_list), len(vocab)))
        print("Bag-of-Words complete.")
        # Handle partitioning if required
        if self.use_partition:
            # Ordered 80/20 split; texts, BOW rows, timestamps and labels are
            # sliced identically so they stay aligned.
            num_docs = len(bigrammed_corpus_list)
            train_size = int(0.8 * num_docs)
            train_texts = bigrammed_texts_for_file[:train_size]
            train_bow_matrix = bow_matrix[:train_size]
            train_timestamps = filtered_timestamps[:train_size]
            train_labels = processed_labels_octis[:train_size] if processed_labels_octis else []
            test_texts = bigrammed_texts_for_file[train_size:]
            test_bow_matrix = bow_matrix[train_size:]
            test_timestamps = filtered_timestamps[train_size:]
            test_labels = processed_labels_octis[train_size:] if processed_labels_octis else []
        else:
            train_texts = bigrammed_texts_for_file
            train_bow_matrix = bow_matrix
            train_timestamps = filtered_timestamps
            train_labels = processed_labels_octis
            test_texts = []
            test_timestamps = []
            test_labels = []
        # Generate word embeddings using the provided function
        word_embeddings = self._make_word_embeddings(vocab)
        # Process timestamps to 0, 1, 2...T and create time2id.txt
        print("Processing timestamps...")
        unique_timestamps = sorted(list(set(train_timestamps + test_timestamps)))
        time_to_id = {timestamp: i for i, timestamp in enumerate(unique_timestamps)}
        train_times_ids = [time_to_id[ts] for ts in train_timestamps]
        test_times_ids = [time_to_id[ts] for ts in test_timestamps] if self.use_partition else []
        print("Timestamps processed.")
        # Save files
        print(f"Saving preprocessed files to {self.output_folder}...")
        # 1. vocab.txt
        with open(os.path.join(self.output_folder, "vocab.txt"), "w", encoding="utf-8") as f:
            for word in vocab:
                f.write(f"{word}\n")
        # 2. train_texts.txt
        train_text_path = os.path.join(self.output_folder, "train_texts.txt")
        with open(train_text_path, "w", encoding="utf-8") as f:
            for doc in train_texts:
                f.write(f"{doc}\n")
        # Save document length stats
        doc_stats_path = os.path.join(self.output_folder, "length_stats.json")
        self._save_doc_length_stats(train_text_path, doc_stats_path)
        # 3. train_bow.npz
        scipy.sparse.save_npz(os.path.join(self.output_folder, "train_bow.npz"), train_bow_matrix)
        # 4. word_embeddings.npz (dense matrix stored in sparse CSR container)
        sparse_word_embeddings = scipy.sparse.csr_matrix(word_embeddings)
        scipy.sparse.save_npz(os.path.join(self.output_folder, "word_embeddings.npz"), sparse_word_embeddings)
        # 5. train_labels.txt (if labels exist)
        if train_labels:
            with open(os.path.join(self.output_folder, "train_labels.txt"), "w", encoding="utf-8") as f:
                for label in train_labels:
                    f.write(f"{label}\n")
        # 6. train_times.txt
        with open(os.path.join(self.output_folder, "train_times.txt"), "w", encoding="utf-8") as f:
            for time_id in train_times_ids:
                f.write(f"{time_id}\n")
        # Files for test set (if use_partition=True)
        if self.use_partition:
            # 7. test_bow.npz
            scipy.sparse.save_npz(os.path.join(self.output_folder, "test_bow.npz"), test_bow_matrix)
            # 8. test_texts.txt
            with open(os.path.join(self.output_folder, "test_texts.txt"), "w", encoding="utf-8") as f:
                for doc in test_texts:
                    f.write(f"{doc}\n")
            # 9. test_labels.txt (if labels exist)
            if test_labels:
                with open(os.path.join(self.output_folder, "test_labels.txt"), "w", encoding="utf-8") as f:
                    for label in test_labels:
                        f.write(f"{label}\n")
            # 10. test_times.txt
            with open(os.path.join(self.output_folder, "test_times.txt"), "w", encoding="utf-8") as f:
                for time_id in test_times_ids:
                    f.write(f"{time_id}\n")
        # 11. time2id.txt (keys ordered by their integer id for readability)
        sorted_time_to_id = OrderedDict(sorted(time_to_id.items(), key=lambda item: item[1]))
        with open(os.path.join(self.output_folder, "time2id.txt"), "w", encoding="utf-8") as f:
            json.dump(sorted_time_to_id, f, indent=4)
        print("All files saved successfully.")