|
|
from datasets import load_dataset
|
|
|
from tqdm import tqdm
|
|
|
import os
|
|
|
from utils import load_config, setup_logging
|
|
|
import psutil
|
|
|
|
|
|
|
|
|
def download_and_process_data(config):
    """Download, quality-filter, and save a streaming dataset to disk.

    Streams the configured HuggingFace dataset, keeps non-empty English
    examples with a high language-identification score, and writes them to
    ``data/processed/processed_data_<n>.txt`` in ~10k-example chunks until
    the configured target size (in GB) has been written.

    Args:
        config: Nested mapping whose ``"dataset"`` section provides
            ``name``, ``subset``, ``streaming``, ``text_column``,
            ``target_size_gb``, and ``max_length``.

    Raises:
        ValueError: If ``streaming`` is not enabled (required for large
            datasets such as fineweb).
        RuntimeError: If the dataset download fails.
    """
    setup_logging()

    dataset_cfg = config["dataset"]
    dataset_name = dataset_cfg["name"]
    streaming = dataset_cfg["streaming"]
    text_column = dataset_cfg["text_column"]
    target_size_gb = dataset_cfg["target_size_gb"]
    # NOTE(review): max_length is read but never applied — no truncation
    # happens below. Confirm whether examples should be clipped to it.
    max_length = dataset_cfg["max_length"]
    subset = dataset_cfg["subset"]

    # Validate configuration BEFORE downloading, so a config error is not
    # misreported as a download failure (previously this check lived inside
    # the try block and its ValueError was re-wrapped by the except clause).
    if not streaming:
        raise ValueError("Streaming must be True for large datasets like fineweb")

    try:
        dataset = load_dataset(dataset_name, subset, streaming=streaming)
    except Exception as e:
        # Chain the original exception so the real cause stays in the traceback.
        raise RuntimeError(
            f"Failed to download dataset: {e}. Check dataset name and internet connection, and HF login."
        ) from e

    dataset = dataset["train"]

    def quality_filter(example):
        """Keep non-empty English examples with language_score >= 0.8."""
        # Use the configured text column (the original hard-coded 'text',
        # which disagreed with the writer below for any other column name).
        text = example[text_column]
        return (
            text is not None and
            len(text.strip()) > 0 and
            example['language'] == 'en' and
            example['language_score'] >= 0.8
        )

    dataset = dataset.filter(quality_filter)

    output_dir = os.path.join("data", "processed")
    os.makedirs(output_dir, exist_ok=True)

    def process_and_save_chunk(chunk, chunk_num, total_bytes):
        """Write one chunk to its numbered file; return the updated byte total."""
        output_file = os.path.join(output_dir, f"processed_data_{chunk_num}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            for example in tqdm(chunk, desc=f"Processing chunk {chunk_num}"):
                text = example[text_column].strip()
                if text:
                    # One metadata header line per example so provenance
                    # (id/url/date) survives the plain-text dump.
                    metadata = f"# ID: {example['id']} | URL: {example['url']} | Date: {example['date']}\n"
                    f.write(metadata)
                    f.write(text + "\n\n")
                    # Count UTF-8 bytes actually written (text + header).
                    total_bytes += len(text.encode("utf-8")) + len(metadata.encode("utf-8"))
        return total_bytes

    chunk_num = 0
    chunk = []
    total_bytes_processed = 0
    target_bytes = target_size_gb * (1024**3)

    for example in tqdm(dataset, desc="Processing and saving data"):
        chunk.append(example)
        if len(chunk) >= 10000:
            total_bytes_processed = process_and_save_chunk(chunk, chunk_num, total_bytes_processed)
            chunk = []
            chunk_num += 1
            print(f"Processed: {total_bytes_processed / (1024**3):.2f} GB")

            if total_bytes_processed >= target_bytes:
                print("Target data size reached.")
                break

    # Flush the trailing partial chunk. Bug fix: capture the returned total —
    # the original discarded it, so the final report omitted the last chunk.
    if chunk:
        total_bytes_processed = process_and_save_chunk(chunk, chunk_num, total_bytes_processed)

    print(f"Data download and processing complete. Total processed size: {total_bytes_processed / (1024**3):.2f} GB")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load the config and run the full pipeline.
    download_and_process_data(load_config())