DTECT / backend /datasets /preprocess.py
AdhyaSuman's picture
Initial commit with Git LFS for large files
11c72a2
import json
import os
import numpy as np
from collections import OrderedDict
import tempfile
import gensim.downloader
from tqdm import tqdm
from backend.datasets.utils.logger import Logger
import scipy.sparse
from gensim.models.phrases import Phrases, Phraser
from typing import List, Union
from octis.preprocessing.preprocessing import Preprocessing
logger = Logger("WARNING")
class Preprocessor:
def __init__(self,
docs_jsonl_path: str,
output_folder: str,
use_partition: bool = False,
use_bigrams: bool = False,
min_count_bigram: int = 5,
threshold_bigram: int = 10,
remove_punctuation: bool = True,
lemmatize: bool = True,
stopword_list: Union[str, List[str]] = None,
min_chars: int = 3,
min_words_docs: int = 10,
min_df: Union[int, float] = 0.0,
max_df: Union[int, float] = 1.0,
max_features: int = None,
language: str = 'english'):
self.docs_jsonl_path = docs_jsonl_path
self.output_folder = output_folder
self.use_partition = use_partition
self.use_bigrams = use_bigrams
self.min_count_bigram = min_count_bigram
self.threshold_bigram = threshold_bigram
os.makedirs(self.output_folder, exist_ok=True)
self.preprocessing_params = {
'remove_punctuation': remove_punctuation,
'lemmatize': lemmatize,
'stopword_list': stopword_list,
'min_chars': min_chars,
'min_words_docs': min_words_docs,
'min_df': min_df,
'max_df': max_df,
'max_features': max_features,
'language': language
}
self.preprocessor_octis = Preprocessing(**self.preprocessing_params)
def _load_data_to_temp_files(self):
"""Loads data from JSONL and writes to temporary files for OCTIS preprocessor."""
raw_texts = []
raw_timestamps = []
raw_labels = []
has_labels = False
with open(self.docs_jsonl_path, 'r', encoding='utf-8') as f:
for line in f:
data = json.loads(line.strip())
# Remove newlines from text
clean_text = data.get('text', '').replace('\n', ' ').replace('\r', ' ')
clean_text = " ".join(clean_text.split())
raw_texts.append(clean_text)
raw_timestamps.append(data.get('timestamp', ''))
label = data.get('label', '')
if label:
has_labels = True
raw_labels.append(label)
# Create temporary files
temp_dir = tempfile.mkdtemp()
temp_docs_path = os.path.join(temp_dir, "temp_docs.txt")
temp_labels_path = None
with open(temp_docs_path, 'w', encoding='utf-8') as f_docs:
for text in raw_texts:
f_docs.write(f"{text}\n")
if has_labels:
temp_labels_path = os.path.join(temp_dir, "temp_labels.txt")
with open(temp_labels_path, 'w', encoding='utf-8') as f_labels:
for label in raw_labels:
f_labels.write(f"{label}\n")
print(f"Loaded {len(raw_texts)} raw documents and created temporary files in {temp_dir}.")
return raw_texts, raw_timestamps, raw_labels, temp_docs_path, temp_labels_path, temp_dir
def _make_word_embeddings(self, vocab):
"""
Generates word embeddings for the given vocabulary using GloVe.
For n-grams (e.g., "wordA_wordB", "wordX_wordY_wordZ" for n>=2),
the resultant embedding is the sum of the embeddings of its constituent
single words (wordA + wordB + ...).
"""
print("Loading GloVe word embeddings...")
glove_vectors = gensim.downloader.load('glove-wiki-gigaword-200')
# Initialize word_embeddings matrix with zeros.
# This ensures that words not found (single or n-gram constituents)
# will have a zero vector embedding.
word_embeddings = np.zeros((len(vocab), glove_vectors.vectors.shape[1]), dtype=np.float32)
num_found = 0
try:
# Using a set for key_word_list for O(1) average time complexity lookup
key_word_list = set(glove_vectors.index_to_key)
except AttributeError: # For older gensim versions
key_word_list = set(glove_vectors.index2word)
print("Generating word embeddings for vocabulary (including n-grams)...")
for i, word in enumerate(tqdm(vocab, desc="Processing vocabulary words")):
if '_' in word: # Check if it's a potential n-gram (n >= 2)
parts = word.split('_')
# Check if *all* constituent words are present in GloVe
all_parts_in_glove = True
for part in parts:
if part not in key_word_list:
all_parts_in_glove = False
break # One part not found, stop checking
if all_parts_in_glove:
# If all parts are found, sum their embeddings
resultant_vector = np.zeros(glove_vectors.vectors.shape[1], dtype=np.float32)
for part in parts:
resultant_vector += glove_vectors[part]
word_embeddings[i] = resultant_vector
num_found += 1
# Else: one or more constituent words not found, embedding remains zero
else: # It's a single word (n=1)
if word in key_word_list:
word_embeddings[i] = glove_vectors[word]
num_found += 1
# Else: single word not found, embedding remains zero
logger.info(f'Number of found embeddings (including n-grams): {num_found}/{len(vocab)}')
return word_embeddings # Return as dense NumPy array
def _save_doc_length_stats(self, filepath: str, output_path: str):
doc_lengths = []
try:
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
doc = line.strip()
if doc:
doc_lengths.append(len(doc))
except Exception as e:
print(f"Error processing '{filepath}': {e}")
return
if not doc_lengths:
print(f"No documents found in '{filepath}'.")
return
stats = {
"avg_len": float(np.mean(doc_lengths)),
"std_len": float(np.std(doc_lengths)),
"max_len": int(np.max(doc_lengths)),
"min_len": int(np.min(doc_lengths)),
"num_docs": int(len(doc_lengths))
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=4)
print(f"Saved document length stats to: {output_path}")
def preprocess(self):
print("Loading data and creating temporary files for OCTIS...")
_, raw_timestamps, _, temp_docs_path, temp_labels_path, temp_dir = \
self._load_data_to_temp_files()
print("Starting OCTIS pre-processing using file paths and specified parameters...")
octis_dataset = self.preprocessor_octis.preprocess_dataset(
documents_path=temp_docs_path,
labels_path=temp_labels_path
)
# Clean up temporary files immediately
os.remove(temp_docs_path)
if temp_labels_path:
os.remove(temp_labels_path)
os.rmdir(temp_dir)
print(f"Temporary files in {temp_dir} cleaned up.")
# --- Proxy: Save __original_indexes and then manually load it ---
temp_indexes_dir = tempfile.mkdtemp()
temp_indexes_file = os.path.join(temp_indexes_dir, "temp_original_indexes.txt")
print(f"Saving __original_indexes to {temp_indexes_file}...")
octis_dataset._save_document_indexes(temp_indexes_file)
# Manually load the indexes from the file
original_indexes_after_octis = []
with open(temp_indexes_file, 'r') as f_indexes:
for line in f_indexes:
original_indexes_after_octis.append(int(line.strip())) # Read as int
# Clean up the temporary indexes file and its directory
os.remove(temp_indexes_file)
os.rmdir(temp_indexes_dir)
print("Temporary indexes file cleaned up.")
# --- End Proxy ---
# Get processed data from OCTIS Dataset object
processed_corpus_octis_list = octis_dataset.get_corpus() # List of list of tokens
processed_labels_octis = octis_dataset.get_labels() # List of labels
print("Max index in original_indexes_after_octis:", max(original_indexes_after_octis))
print("Length of raw_timestamps:", len(raw_timestamps))
# Filter timestamps based on documents that survived OCTIS preprocessing
filtered_timestamps = [raw_timestamps[i] for i in original_indexes_after_octis]
print(f"OCTIS preprocessing complete. {len(processed_corpus_octis_list)} documents remaining.")
if self.use_bigrams:
print("Generating bigrams with Gensim...")
phrases = Phrases(processed_corpus_octis_list, min_count=self.min_count_bigram, threshold=self.threshold_bigram)
bigram_phraser = Phraser(phrases)
bigrammed_corpus_list = [bigram_phraser[doc] for doc in processed_corpus_octis_list]
print("Bigram generation complete.")
else:
print("Skipping bigram generation as 'use_bigrams' is False.")
bigrammed_corpus_list = processed_corpus_octis_list # Use the original processed list
# Convert back to list of strings for easier handling if needed later, but keep as list of lists for BOW
bigrammed_texts_for_file = [" ".join(doc) for doc in bigrammed_corpus_list]
print("Bigram generation complete.")
# Build Vocabulary from OCTIS output (after bigrams)
# We need a flat list of all tokens to build the vocabulary
all_tokens = [token for doc in bigrammed_corpus_list for token in doc]
vocab = sorted(list(set(all_tokens))) # Sorted unique words form the vocabulary
word_to_id = {word: i for i, word in enumerate(vocab)}
# Create BOW matrix manually
print("Creating Bag-of-Words representations...")
rows, cols, data = [], [], []
for i, doc_tokens in enumerate(bigrammed_corpus_list):
doc_word_counts = {}
for token in doc_tokens:
if token in word_to_id: # Ensure token is in our final vocab
doc_word_counts[word_to_id[token]] = doc_word_counts.get(word_to_id[token], 0) + 1
for col_id, count in doc_word_counts.items():
rows.append(i)
cols.append(col_id)
data.append(count)
# Shape is (num_documents, vocab_size)
bow_matrix = scipy.sparse.csc_matrix((data, (rows, cols)), shape=(len(bigrammed_corpus_list), len(vocab)))
print("Bag-of-Words complete.")
# Handle partitioning if required
if self.use_partition:
num_docs = len(bigrammed_corpus_list)
train_size = int(0.8 * num_docs)
train_texts = bigrammed_texts_for_file[:train_size]
train_bow_matrix = bow_matrix[:train_size]
train_timestamps = filtered_timestamps[:train_size]
train_labels = processed_labels_octis[:train_size] if processed_labels_octis else []
test_texts = bigrammed_texts_for_file[train_size:]
test_bow_matrix = bow_matrix[train_size:]
test_timestamps = filtered_timestamps[train_size:]
test_labels = processed_labels_octis[train_size:] if processed_labels_octis else []
else:
train_texts = bigrammed_texts_for_file
train_bow_matrix = bow_matrix
train_timestamps = filtered_timestamps
train_labels = processed_labels_octis
test_texts = []
test_timestamps = []
test_labels = []
# Generate word embeddings using the provided function
word_embeddings = self._make_word_embeddings(vocab)
# Process timestamps to 0, 1, 2...T and create time2id.txt
print("Processing timestamps...")
unique_timestamps = sorted(list(set(train_timestamps + test_timestamps)))
time_to_id = {timestamp: i for i, timestamp in enumerate(unique_timestamps)}
train_times_ids = [time_to_id[ts] for ts in train_timestamps]
test_times_ids = [time_to_id[ts] for ts in test_timestamps] if self.use_partition else []
print("Timestamps processed.")
# Save files
print(f"Saving preprocessed files to {self.output_folder}...")
# 1. vocab.txt
with open(os.path.join(self.output_folder, "vocab.txt"), "w", encoding="utf-8") as f:
for word in vocab:
f.write(f"{word}\n")
# 2. train_texts.txt
train_text_path = os.path.join(self.output_folder, "train_texts.txt")
with open(train_text_path, "w", encoding="utf-8") as f:
for doc in train_texts:
f.write(f"{doc}\n")
# Save document length stats
doc_stats_path = os.path.join(self.output_folder, "length_stats.json")
self._save_doc_length_stats(train_text_path, doc_stats_path)
# 3. train_bow.npz
scipy.sparse.save_npz(os.path.join(self.output_folder, "train_bow.npz"), train_bow_matrix)
# 4. word_embeddings.npz
sparse_word_embeddings = scipy.sparse.csr_matrix(word_embeddings)
scipy.sparse.save_npz(os.path.join(self.output_folder, "word_embeddings.npz"), sparse_word_embeddings)
# 5. train_labels.txt (if labels exist)
if train_labels:
with open(os.path.join(self.output_folder, "train_labels.txt"), "w", encoding="utf-8") as f:
for label in train_labels:
f.write(f"{label}\n")
# 6. train_times.txt
with open(os.path.join(self.output_folder, "train_times.txt"), "w", encoding="utf-8") as f:
for time_id in train_times_ids:
f.write(f"{time_id}\n")
# Files for test set (if use_partition=True)
if self.use_partition:
# 7. test_bow.npz
scipy.sparse.save_npz(os.path.join(self.output_folder, "test_bow.npz"), test_bow_matrix)
# 8. test_texts.txt
with open(os.path.join(self.output_folder, "test_texts.txt"), "w", encoding="utf-8") as f:
for doc in test_texts:
f.write(f"{doc}\n")
# 9. test_labels.txt (if labels exist)
if test_labels:
with open(os.path.join(self.output_folder, "test_labels.txt"), "w", encoding="utf-8") as f:
for label in test_labels:
f.write(f"{label}\n")
# 10. test_times.txt
with open(os.path.join(self.output_folder, "test_times.txt"), "w", encoding="utf-8") as f:
for time_id in test_times_ids:
f.write(f"{time_id}\n")
# 11. time2id.txt
sorted_time_to_id = OrderedDict(sorted(time_to_id.items(), key=lambda item: item[1]))
with open(os.path.join(self.output_folder, "time2id.txt"), "w", encoding="utf-8") as f:
json.dump(sorted_time_to_id, f, indent=4)
print("All files saved successfully.")