File size: 5,325 Bytes
72c0672
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/bin/bash
NODE_ID=$ARNOLD_ID
NUM_NODES=$ARNOLD_WORKER_NUM
NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU

# Total number of JSONL files to process
TOTAL_JSONL_FILES=16

# 1. Calculate which range of files this node is responsible for.
FILES_PER_NODE=$(( (TOTAL_JSONL_FILES + NUM_NODES - 1) / NUM_NODES ))
START_JSONL_IDX=$(( NODE_ID * FILES_PER_NODE + 1 ))
END_JSONL_IDX=$(( (NODE_ID + 1) * FILES_PER_NODE ))

if [ $END_JSONL_IDX -gt $TOTAL_JSONL_FILES ]; then
    END_JSONL_IDX=$TOTAL_JSONL_FILES
fi

MODE=${1:-"split"}

ENTROPY_QUANTILE=0.90
OUTPUTWINDOW=${2:-16}
ITERATIVE_COMPRESS=${3:-"true"}
splits_dir=${4:-"ocpython_subsampled_50G_entropy90_splits_line"}
FORCE_PADDING=${5:-"true"}
SPLIT_CHUNK_SIZE=${6:-"lines"}

if [[ $SPLIT_CHUNK_SIZE == "lines" ]]; then
    SPLIT_ARGS="--chunk_size 128 --apply_line_split --max_entropy_batch_size 2048"
else
    SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE --max_entropy_batch_size 512"
fi

if [[ $ITERATIVE_COMPRESS == "false" ]]; then
    ADDITIONAL_ARG=""
elif [[ $ITERATIVE_COMPRESS == "true" ]]; then
    ADDITIONAL_ARG="--iterative_compress"
else
    echo "Error: Unknown arg '$ITERATIVE_COMPRESS'."
    echo "Available values: false, true"
    exit 1
fi

if [[ $FORCE_PADDING == "false" ]]; then
    ADDITIONAL_ARG=${ADDITIONAL_ARG}" "
elif [[ $FORCE_PADDING == "true" ]]; then
    ADDITIONAL_ARG=${ADDITIONAL_ARG}" --force_padding_to_threshold"
else
    echo "Error: Unknown arg '$FORCE_PADDING'."
    echo "Available values: false, true"
    exit 1
fi

compress_dir=${splits_dir}"_ow${OUTPUTWINDOW}_iterative-${ITERATIVE_COMPRESS}_forcepadding-${FORCE_PADDING}_ac"

# Directory and model paths
input_dir="opencoder"
entropy_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
compression_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000

if [ "$MODE" == "split" ]; then
    JOBS_PER_GPU=1
elif [ "$MODE" == "compress" ]; then
    JOBS_PER_GPU=2
else
    echo "Error: Unknown mode '$MODE'."
    echo "Available modes: split, compress"
    exit 1
fi

TOTAL_JOBS_PER_FILE=$(( JOBS_PER_GPU * ((NUM_GPUS_PER_NODE + FILES_PER_NODE - 1) / FILES_PER_NODE) ))

echo "=================================================="
echo "Starting processing on Node ${NODE_ID} of ${NUM_NODES}"
echo "Node GPU Count: ${NUM_GPUS_PER_NODE}"
echo "Jobs per JSONL file: ${TOTAL_JOBS_PER_FILE}"
echo "This node will process files: ${START_JSONL_IDX} to ${END_JSONL_IDX}"
echo "=================================================="

# Create a directory for log files if it doesn't exist
mkdir -p logs

GLOBAL_JOB_COUNTER=0
for JSONL_IDX in $(seq $START_JSONL_IDX $END_JSONL_IDX); do
    echo "--> Processing JSONL file: ${input_dir}/chunk.${JSONL_IDX}.jsonl"
    for job_index in $(seq 0 $((TOTAL_JOBS_PER_FILE - 1))); do
        GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
        echo "    Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."
        if [ "$MODE" == "split" ]; then
            CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \
                --input_file /mnt/hdfs/user/linzheng/data/${input_dir}/chunk.${JSONL_IDX}.jsonl \
                --output_dir /mnt/hdfs/user/linzheng/data/${splits_dir} \
                --entropy_model_path $entropy_model_path \
                --compression_model_path $compression_model_path \
                --data_batch_size 256 \
                --num_workers 1 \
                --process_id ${job_index} \
                --num_processes ${TOTAL_JOBS_PER_FILE} \
                --base_global_quantile ${ENTROPY_QUANTILE} \
                --base_monotonic_quantile ${ENTROPY_QUANTILE} \
                $SPLIT_ARGS > "logs/split_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
        elif [ "$MODE" == "compress" ]; then
            CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \
                --input_file /mnt/hdfs/user/linzheng/data/${splits_dir}/chunk.${JSONL_IDX}.jsonl \
                --output_dir /mnt/hdfs/user/linzheng/data/${compress_dir} \
                --entropy_model_path $entropy_model_path \
                --compression_model_path $compression_model_path \
                --firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \
                --data_batch_size 256 --max_compression_batch_size 2560 \
                --output_window_size ${OUTPUTWINDOW} ${ADDITIONAL_ARG} \
                --num_workers 3 --process_id $job_index --num_processes $TOTAL_JOBS_PER_FILE > "logs/compress_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 &
        else
            echo "Error: Unknown mode '$MODE'."
            echo "Available modes: split, compress"
            exit 1
        fi

        # Increment the global counter for the next job
        GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 ))

    done
done

wait
cat logs/compress_node${NODE_ID}_jsonl${START_JSONL_IDX}_process0.log
echo ""
echo "All jobs on Node ${NODE_ID} have successfully finished."
echo "=================================================="