import json
import os
import tempfile
from collections import OrderedDict
from typing import List, Union

import numpy as np
import scipy.sparse
import gensim.downloader
from gensim.models.phrases import Phrases, Phraser
from tqdm import tqdm
from octis.preprocessing.preprocessing import Preprocessing

from backend.datasets.utils.logger import Logger

logger = Logger("WARNING")


class Preprocessor:
    def __init__(self,
                 docs_jsonl_path: str,
                 output_folder: str,
                 use_partition: bool = False,
                 use_bigrams: bool = False,
                 min_count_bigram: int = 5,
                 threshold_bigram: int = 10,
                 remove_punctuation: bool = True,
                 lemmatize: bool = True,
                 stopword_list: Union[str, List[str]] = None,
                 min_chars: int = 3,
                 min_words_docs: int = 10,
                 min_df: Union[int, float] = 0.0,
                 max_df: Union[int, float] = 1.0,
                 max_features: int = None,
                 language: str = 'english'):
        self.docs_jsonl_path = docs_jsonl_path
        self.output_folder = output_folder
        self.use_partition = use_partition
        self.use_bigrams = use_bigrams
        self.min_count_bigram = min_count_bigram
        self.threshold_bigram = threshold_bigram

        os.makedirs(self.output_folder, exist_ok=True)

        self.preprocessing_params = {
            'remove_punctuation': remove_punctuation,
            'lemmatize': lemmatize,
            'stopword_list': stopword_list,
            'min_chars': min_chars,
            'min_words_docs': min_words_docs,
            'min_df': min_df,
            'max_df': max_df,
            'max_features': max_features,
            'language': language
        }
        self.preprocessor_octis = Preprocessing(**self.preprocessing_params)

    def _load_data_to_temp_files(self):
        """Loads data from JSONL and writes to temporary files for the OCTIS preprocessor."""
        raw_texts = []
        raw_timestamps = []
        raw_labels = []
        has_labels = False

        with open(self.docs_jsonl_path, 'r', encoding='utf-8') as f:
            for line in f:
                data = json.loads(line.strip())
                # Remove newlines from text
                clean_text = data.get('text', '').replace('\n', ' ').replace('\r', ' ')
                clean_text = " ".join(clean_text.split())
                raw_texts.append(clean_text)
                raw_timestamps.append(data.get('timestamp', ''))
                label = data.get('label', '')
                if label:
                    has_labels = True
                raw_labels.append(label)

        # Create temporary files
        temp_dir = tempfile.mkdtemp()
        temp_docs_path = os.path.join(temp_dir, "temp_docs.txt")
        temp_labels_path = None

        with open(temp_docs_path, 'w', encoding='utf-8') as f_docs:
            for text in raw_texts:
                f_docs.write(f"{text}\n")

        if has_labels:
            temp_labels_path = os.path.join(temp_dir, "temp_labels.txt")
            with open(temp_labels_path, 'w', encoding='utf-8') as f_labels:
                for label in raw_labels:
                    f_labels.write(f"{label}\n")

        print(f"Loaded {len(raw_texts)} raw documents and created temporary files in {temp_dir}.")
        return raw_texts, raw_timestamps, raw_labels, temp_docs_path, temp_labels_path, temp_dir

    def _make_word_embeddings(self, vocab):
        """
        Generates word embeddings for the given vocabulary using GloVe.
        For n-grams (e.g., "wordA_wordB", "wordX_wordY_wordZ" for n >= 2), the
        resultant embedding is the sum of the embeddings of its constituent
        single words (wordA + wordB + ...).
        """
        print("Loading GloVe word embeddings...")
        glove_vectors = gensim.downloader.load('glove-wiki-gigaword-200')

        # Initialize word_embeddings matrix with zeros.
        # This ensures that words not found (single or n-gram constituents)
        # will have a zero vector embedding.
        word_embeddings = np.zeros((len(vocab), glove_vectors.vectors.shape[1]), dtype=np.float32)
        num_found = 0

        try:
            # Using a set for key_word_list for O(1) average time complexity lookup
            key_word_list = set(glove_vectors.index_to_key)
        except AttributeError:
            # For older gensim versions
            key_word_list = set(glove_vectors.index2word)

        print("Generating word embeddings for vocabulary (including n-grams)...")
        for i, word in enumerate(tqdm(vocab, desc="Processing vocabulary words")):
            if '_' in word:  # Check if it's a potential n-gram (n >= 2)
                parts = word.split('_')

                # Check if *all* constituent words are present in GloVe
                all_parts_in_glove = True
                for part in parts:
                    if part not in key_word_list:
                        all_parts_in_glove = False
                        break  # One part not found, stop checking

                if all_parts_in_glove:
                    # If all parts are found, sum their embeddings
                    resultant_vector = np.zeros(glove_vectors.vectors.shape[1], dtype=np.float32)
                    for part in parts:
                        resultant_vector += glove_vectors[part]
                    word_embeddings[i] = resultant_vector
                    num_found += 1
                # Else: one or more constituent words not found, embedding remains zero
            else:  # It's a single word (n=1)
                if word in key_word_list:
                    word_embeddings[i] = glove_vectors[word]
                    num_found += 1
                # Else: single word not found, embedding remains zero

        logger.info(f'Number of found embeddings (including n-grams): {num_found}/{len(vocab)}')
        return word_embeddings  # Return as dense NumPy array

    def _save_doc_length_stats(self, filepath: str, output_path: str):
        """Computes per-document length statistics (in characters) for a texts file and saves them as JSON."""
        doc_lengths = []
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                for line in f:
                    doc = line.strip()
                    if doc:
                        doc_lengths.append(len(doc))  # length in characters
        except Exception as e:
            print(f"Error processing '{filepath}': {e}")
            return

        if not doc_lengths:
            print(f"No documents found in '{filepath}'.")
            return

        stats = {
            "avg_len": float(np.mean(doc_lengths)),
            "std_len": float(np.std(doc_lengths)),
            "max_len": int(np.max(doc_lengths)),
            "min_len": int(np.min(doc_lengths)),
            "num_docs": int(len(doc_lengths))
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=4)
        print(f"Saved document length stats to: {output_path}")

    def preprocess(self):
        print("Loading data and creating temporary files for OCTIS...")
        _, raw_timestamps, _, temp_docs_path, temp_labels_path, temp_dir = \
            self._load_data_to_temp_files()

        print("Starting OCTIS pre-processing using file paths and specified parameters...")
        octis_dataset = self.preprocessor_octis.preprocess_dataset(
            documents_path=temp_docs_path,
            labels_path=temp_labels_path
        )

        # Clean up temporary files immediately
        os.remove(temp_docs_path)
        if temp_labels_path:
            os.remove(temp_labels_path)
        os.rmdir(temp_dir)
        print(f"Temporary files in {temp_dir} cleaned up.")

        # --- Proxy: Save __original_indexes and then manually load it ---
        temp_indexes_dir = tempfile.mkdtemp()
        temp_indexes_file = os.path.join(temp_indexes_dir, "temp_original_indexes.txt")
        print(f"Saving __original_indexes to {temp_indexes_file}...")
        octis_dataset._save_document_indexes(temp_indexes_file)

        # Manually load the indexes from the file
        original_indexes_after_octis = []
        with open(temp_indexes_file, 'r') as f_indexes:
            for line in f_indexes:
                original_indexes_after_octis.append(int(line.strip()))  # Read as int

        # Clean up the temporary indexes file and its directory
        os.remove(temp_indexes_file)
        os.rmdir(temp_indexes_dir)
        print("Temporary indexes file cleaned up.")
        # --- End Proxy ---

        # Get processed data from OCTIS Dataset object
        processed_corpus_octis_list = octis_dataset.get_corpus()  # List of lists of tokens
        processed_labels_octis = octis_dataset.get_labels()       # List of labels
        print("Max index in original_indexes_after_octis:", max(original_indexes_after_octis))
        print("Length of raw_timestamps:", len(raw_timestamps))

        # Filter timestamps based on documents that survived OCTIS preprocessing
        filtered_timestamps = [raw_timestamps[i] for i in original_indexes_after_octis]
        print(f"OCTIS preprocessing complete. {len(processed_corpus_octis_list)} documents remaining.")

        if self.use_bigrams:
            print("Generating bigrams with Gensim...")
            phrases = Phrases(processed_corpus_octis_list,
                              min_count=self.min_count_bigram,
                              threshold=self.threshold_bigram)
            bigram_phraser = Phraser(phrases)
            bigrammed_corpus_list = [bigram_phraser[doc] for doc in processed_corpus_octis_list]
            print("Bigram generation complete.")
        else:
            print("Skipping bigram generation as 'use_bigrams' is False.")
            bigrammed_corpus_list = processed_corpus_octis_list  # Use the original processed list

        # Convert back to a list of strings for easier handling later,
        # but keep the list of lists for the BOW matrix
        bigrammed_texts_for_file = [" ".join(doc) for doc in bigrammed_corpus_list]

        # Build vocabulary from the OCTIS output (after bigrams).
        # We need a flat list of all tokens to build the vocabulary.
        all_tokens = [token for doc in bigrammed_corpus_list for token in doc]
        vocab = sorted(set(all_tokens))  # Sorted unique words form the vocabulary
        word_to_id = {word: i for i, word in enumerate(vocab)}

        # Create BOW matrix manually
        print("Creating Bag-of-Words representations...")
        rows, cols, data = [], [], []
        for i, doc_tokens in enumerate(bigrammed_corpus_list):
            doc_word_counts = {}
            for token in doc_tokens:
                if token in word_to_id:  # Ensure token is in our final vocab
                    doc_word_counts[word_to_id[token]] = doc_word_counts.get(word_to_id[token], 0) + 1
            for col_id, count in doc_word_counts.items():
                rows.append(i)
                cols.append(col_id)
                data.append(count)

        # Shape is (num_documents, vocab_size)
        bow_matrix = scipy.sparse.csc_matrix((data, (rows, cols)),
                                             shape=(len(bigrammed_corpus_list), len(vocab)))
        print("Bag-of-Words complete.")

        # Handle partitioning if required
        if self.use_partition:
            num_docs = len(bigrammed_corpus_list)
            train_size = int(0.8 * num_docs)

            train_texts = bigrammed_texts_for_file[:train_size]
            train_bow_matrix = bow_matrix[:train_size]
            train_timestamps = filtered_timestamps[:train_size]
            train_labels = processed_labels_octis[:train_size] if processed_labels_octis else []

            test_texts = bigrammed_texts_for_file[train_size:]
            test_bow_matrix = bow_matrix[train_size:]
            test_timestamps = filtered_timestamps[train_size:]
            test_labels = processed_labels_octis[train_size:] if processed_labels_octis else []
        else:
            train_texts = bigrammed_texts_for_file
            train_bow_matrix = bow_matrix
            train_timestamps = filtered_timestamps
            train_labels = processed_labels_octis
            test_texts = []
            test_timestamps = []
            test_labels = []

        # Generate word embeddings for the vocabulary
        word_embeddings = self._make_word_embeddings(vocab)

        # Map timestamps to ids 0, 1, 2, ..., T and create time2id.txt
        print("Processing timestamps...")
        unique_timestamps = sorted(set(train_timestamps + test_timestamps))
        time_to_id = {timestamp: i for i, timestamp in enumerate(unique_timestamps)}
        train_times_ids = [time_to_id[ts] for ts in train_timestamps]
        test_times_ids = [time_to_id[ts] for ts in test_timestamps] if self.use_partition else []
        print("Timestamps processed.")

        # Save files
        print(f"Saving preprocessed files to {self.output_folder}...")

        # 1. vocab.txt
        with open(os.path.join(self.output_folder, "vocab.txt"), "w", encoding="utf-8") as f:
            for word in vocab:
                f.write(f"{word}\n")

        # 2. train_texts.txt
        train_text_path = os.path.join(self.output_folder, "train_texts.txt")
        with open(train_text_path, "w", encoding="utf-8") as f:
            for doc in train_texts:
                f.write(f"{doc}\n")

        # Save document length stats
        doc_stats_path = os.path.join(self.output_folder, "length_stats.json")
        self._save_doc_length_stats(train_text_path, doc_stats_path)

        # 3. train_bow.npz
        scipy.sparse.save_npz(os.path.join(self.output_folder, "train_bow.npz"), train_bow_matrix)

        # 4. word_embeddings.npz
        sparse_word_embeddings = scipy.sparse.csr_matrix(word_embeddings)
        scipy.sparse.save_npz(os.path.join(self.output_folder, "word_embeddings.npz"), sparse_word_embeddings)

        # 5. train_labels.txt (if labels exist)
        if train_labels:
            with open(os.path.join(self.output_folder, "train_labels.txt"), "w", encoding="utf-8") as f:
                for label in train_labels:
                    f.write(f"{label}\n")

        # 6. train_times.txt
        with open(os.path.join(self.output_folder, "train_times.txt"), "w", encoding="utf-8") as f:
            for time_id in train_times_ids:
                f.write(f"{time_id}\n")

        # Files for the test set (only if use_partition=True)
        if self.use_partition:
            # 7. test_bow.npz
            scipy.sparse.save_npz(os.path.join(self.output_folder, "test_bow.npz"), test_bow_matrix)

            # 8. test_texts.txt
            with open(os.path.join(self.output_folder, "test_texts.txt"), "w", encoding="utf-8") as f:
                for doc in test_texts:
                    f.write(f"{doc}\n")

            # 9. test_labels.txt (if labels exist)
            if test_labels:
                with open(os.path.join(self.output_folder, "test_labels.txt"), "w", encoding="utf-8") as f:
                    for label in test_labels:
                        f.write(f"{label}\n")

            # 10. test_times.txt
            with open(os.path.join(self.output_folder, "test_times.txt"), "w", encoding="utf-8") as f:
                for time_id in test_times_ids:
                    f.write(f"{time_id}\n")

        # 11. time2id.txt
        sorted_time_to_id = OrderedDict(sorted(time_to_id.items(), key=lambda item: item[1]))
        with open(os.path.join(self.output_folder, "time2id.txt"), "w", encoding="utf-8") as f:
            json.dump(sorted_time_to_id, f, indent=4)

        print("All files saved successfully.")
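

# Example usage: a minimal sketch of how this class is meant to be driven.
# The paths below are placeholders (not part of this module); each line of the
# input JSONL is assumed to carry a "text" field and, optionally, "timestamp"
# and "label" fields, matching what _load_data_to_temp_files reads above.
if __name__ == "__main__":
    preprocessor = Preprocessor(
        docs_jsonl_path="data/docs.jsonl",   # hypothetical input file
        output_folder="data/preprocessed",   # hypothetical output folder
        use_partition=True,                  # also write the test_* files (80/20 split)
        use_bigrams=True,                    # merge frequent word pairs into "word_word" tokens
    )
    preprocessor.preprocess()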