File size: 3,546 Bytes
eb05668
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from datasets import load_dataset
from tqdm import tqdm
import os
from utils import load_config, setup_logging
import psutil  # For monitoring memory usage


def download_and_process_data(config):
    """Downloads, preprocesses, and saves the dataset."""
    setup_logging()

    dataset_name = config["dataset"]["name"]
    streaming = config["dataset"]["streaming"]
    text_column = config["dataset"]["text_column"]
    target_size_gb = config["dataset"]["target_size_gb"]
    max_length = config["dataset"]["max_length"]
    subset = config["dataset"]["subset"]



    # Download dataset (streaming is essential for large datasets)
    try:
        dataset = load_dataset(dataset_name, subset,streaming=streaming)
        if not streaming:
            raise ValueError("Streaming must be True for large datasets like fineweb")
    except Exception as e:
        raise Exception(f"Failed to download dataset: {e}. Check dataset name and internet connection, and HF login.")

    # Filter data - removing the subset filter since it's specific to CC-MAIN
    dataset = dataset["train"]  # Taking only train split

    # Add basic quality filters
    def quality_filter(example):
        return (
            example['text'] is not None and 
            len(example['text'].strip()) > 0 and
            example['language'] == 'en' and  # Filter for English content
            example['language_score'] >= 0.8  # High confidence in language detection
        )

    dataset = dataset.filter(quality_filter)

    # Create output directory if it doesn't exist
    output_dir = os.path.join("data", "processed")
    os.makedirs(output_dir, exist_ok=True)

    # Process and save in chunks, monitoring data size
    def process_and_save_chunk(chunk, chunk_num, total_bytes):
        output_file = os.path.join(output_dir, f"processed_data_{chunk_num}.txt")
        
        with open(output_file, "w", encoding="utf-8") as f:
            for example in tqdm(chunk, desc=f"Processing chunk {chunk_num}"):
                text = example[text_column].strip()
                if text:
                    # Add metadata as a comment before each text
                    metadata = f"# ID: {example['id']} | URL: {example['url']} | Date: {example['date']}\n"
                    f.write(metadata)
                    f.write(text + "\n\n")  # Add extra newline for separation
                    total_bytes += len(text.encode("utf-8")) + len(metadata.encode("utf-8"))
        return total_bytes

    chunk_num = 0
    chunk = []
    total_bytes_processed = 0
    target_bytes = target_size_gb * (1024**3)  # Convert GB to bytes

    for example in tqdm(dataset, desc="Processing and saving data"):
        chunk.append(example)
        if len(chunk) >= 10000:  # Adjust chunk size as needed
            total_bytes_processed = process_and_save_chunk(chunk, chunk_num, total_bytes_processed)
            chunk = []
            chunk_num += 1
            print(f"Processed: {total_bytes_processed / (1024**3):.2f} GB")

        if total_bytes_processed >= target_bytes:
            print("Target data size reached.")
            break  # Stop processing

    if chunk:
        process_and_save_chunk(chunk, chunk_num,total_bytes_processed) #for remaining data

    print(f"Data download and processing complete. Total processed size: {total_bytes_processed / (1024**3):.2f} GB")

if __name__ == "__main__":
    config = load_config()
    download_and_process_data(config)