# Ion-LLM-Base / src/data_processing.py
# diabolic6045's picture
# Upload 9 files
# eb05668 verified
from datasets import load_dataset
from tqdm import tqdm
import os
from utils import load_config, setup_logging
import psutil # For monitoring memory usage
def download_and_process_data(config, chunk_size=10000):
    """Stream a dataset, filter it, and save it as chunked text files.

    The "train" split of the configured dataset is streamed, filtered down to
    non-empty English examples, and written to
    ``data/processed/processed_data_<n>.txt`` (relative to the current working
    directory). Each record is preceded by a ``# ID | URL | Date`` metadata
    line. Processing stops once the accumulated size reaches the configured
    target.

    Args:
        config: Mapping with a ``"dataset"`` section providing ``name``,
            ``subset``, ``streaming``, ``text_column`` and ``target_size_gb``.
        chunk_size: Number of examples buffered in memory and written per
            output file.

    Raises:
        ValueError: If ``streaming`` is falsy. Raised before any download is
            attempted.
        Exception: If loading the dataset fails; the underlying error is
            chained as ``__cause__``.
    """
    setup_logging()

    dataset_cfg = config["dataset"]
    dataset_name = dataset_cfg["name"]
    streaming = dataset_cfg["streaming"]
    text_column = dataset_cfg["text_column"]
    target_size_gb = dataset_cfg["target_size_gb"]
    subset = dataset_cfg["subset"]

    # Validate up front: a non-streaming load would fetch the full dataset
    # before we ever got the chance to reject the configuration.
    if not streaming:
        raise ValueError("Streaming must be True for large datasets like fineweb")

    try:
        dataset = load_dataset(dataset_name, subset, streaming=streaming)
    except Exception as e:
        raise Exception(
            f"Failed to download dataset: {e}. Check dataset name and internet connection, and HF login."
        ) from e

    dataset = dataset["train"]  # Only the train split is processed.

    def quality_filter(example):
        """Keep non-empty examples confidently detected as English."""
        # Use the configured column so the filter and the writer agree on
        # which field holds the text.
        text = example[text_column]
        return (
            text is not None
            and len(text.strip()) > 0
            and example["language"] == "en"
            and example["language_score"] >= 0.8
        )

    dataset = dataset.filter(quality_filter)

    output_dir = os.path.join("data", "processed")
    os.makedirs(output_dir, exist_ok=True)

    def process_and_save_chunk(chunk, chunk_num):
        """Write one chunk to its own file and return the bytes accounted for.

        The count covers the UTF-8 size of each text plus its metadata line;
        the blank-line separators between records are not included.
        """
        output_file = os.path.join(output_dir, f"processed_data_{chunk_num}.txt")
        chunk_bytes = 0
        with open(output_file, "w", encoding="utf-8") as f:
            for example in tqdm(chunk, desc=f"Processing chunk {chunk_num}"):
                text = example[text_column].strip()
                if not text:
                    continue
                # Metadata is stored as a comment line ahead of each text.
                metadata = f"# ID: {example['id']} | URL: {example['url']} | Date: {example['date']}\n"
                f.write(metadata)
                f.write(text + "\n\n")  # Extra newline separates records.
                chunk_bytes += len(text.encode("utf-8")) + len(metadata.encode("utf-8"))
        return chunk_bytes

    chunk_num = 0
    chunk = []
    total_bytes_processed = 0
    target_bytes = target_size_gb * (1024**3)  # Convert GB to bytes.

    for example in tqdm(dataset, desc="Processing and saving data"):
        chunk.append(example)
        if len(chunk) >= chunk_size:
            total_bytes_processed += process_and_save_chunk(chunk, chunk_num)
            chunk = []
            chunk_num += 1
            print(f"Processed: {total_bytes_processed / (1024**3):.2f} GB")
            if total_bytes_processed >= target_bytes:
                print("Target data size reached.")
                break

    if chunk:
        # Flush the remaining partial chunk and include it in the total so
        # the final report reflects everything that was written.
        total_bytes_processed += process_and_save_chunk(chunk, chunk_num)

    print(f"Data download and processing complete. Total processed size: {total_bytes_processed / (1024**3):.2f} GB")
if __name__ == "__main__":
    # Script entry point: load the project configuration and run the pipeline.
    download_and_process_data(load_config())