File size: 7,507 Bytes
cfcbbc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/bin/bash
#
# test_stats_parallel.sh - Run ATLAS analysis tasks in parallel using GNU parallel
#
# This script runs 5 independent Snakemake workflows in parallel:
# 1. summarize_root: Convert ROOT files to summary format (2 sequential steps: summarize_root -> insert_root_summary)
# 2. create_numpy: Create NumPy arrays from ROOT data
# 3. preprocess: Preprocess data for ML
# 4. scores: Generate signal/background scores
# 5. categorization: Perform event categorization
#
# Usage:
#   ./test_stats_parallel.sh
#
# Requirements:
#   - GNU parallel must be installed
#   - config.yml must exist with model configuration
#   - All workflow/*.smk files must be present
#   - Python environment with required packages
#
# The script will:
#   - Copy and modify workflow files
#   - Run all 5 tasks concurrently (no job limits)
#   - Log each task's output separately
#   - Measure individual execution time for each task
#   - Collect statistics and check results
#   - Clean up temporary files
#
# Output:
#   - Individual task logs in $OUT_DIR/*.log
#   - Individual task times in $OUT_DIR/*.time
#   - Combined statistics in stats.csv
#   - Results validation via check_soln.py
#

# Extract top-level settings from config.yml; tr strips optional single quotes.
NAME=$(grep '^name:' config.yml | awk '{print $2}' | tr -d "'")
MODEL=$(grep '^model:' config.yml | awk '{print $2}' | tr -d "'")  # NOTE(review): MODEL is unused below — confirm it is still needed
OUT_DIR=$(grep '^out_dir:' config.yml | awk '{print $2}' | tr -d "'")

# Abort early if out_dir is missing/empty: the rm/cp calls below would
# otherwise operate on unintended paths.
: "${OUT_DIR:?out_dir missing or empty in config.yml}"

# Work on a temp copy of the prompts so the originals keep the {BASE_DIR}
# placeholder.
cp -r prompts prompts_temp
sed -i "s#{BASE_DIR}#$OUT_DIR#g" prompts_temp/*.txt

# Copy all 5 smk files to temp versions and substitute the output directory.
cp workflow/summarize_root.smk summarize_root_temp.smk
cp workflow/create_numpy.smk create_numpy_temp.smk
cp workflow/preprocess.smk preprocess_temp.smk
cp workflow/scores.smk scores_temp.smk
cp workflow/categorization.smk categorization_temp.smk
sed -i "s#{BASE_DIR}#$OUT_DIR#g" *_temp.smk

mkdir -p "$OUT_DIR/generated_code"
cp utils.py "$OUT_DIR/generated_code/utils.py"

# Remove stats arrays from any previous run so get_arr.py reads fresh values.
rm -f "$OUT_DIR/logs/success.npy" \
      "$OUT_DIR/logs/calls.npy" \
      "$OUT_DIR/logs/input_tokens.npy" \
      "$OUT_DIR/logs/output_tokens.npy"

echo "Starting all tasks in parallel using GNU parallel..."
echo "Running 5 independent tasks concurrently (no job limits)"
echo "Tasks: summarize_root (2 steps), create_numpy, preprocess, scores, categorization"
echo ""

# Wall-clock start for the whole parallel phase (bash counts seconds in SECONDS).
START_TIME=$SECONDS

# Run one analysis task as one or more sequential snakemake targets.
#
# Arguments:
#   $1   - task name (used for the per-task .log and .time file names)
#   $2   - snakemake workflow file
#   $3.. - one or more targets, run in order (summarize_root passes 2)
# Globals:
#   OUT_DIR (read) - directory receiving the .log and .time files
# Outputs:
#   progress messages on stdout, error messages on stderr,
#   snakemake output appended to $OUT_DIR/<task>.log,
#   elapsed seconds written to $OUT_DIR/<task>.time
# Returns:
#   0 on success, 1 if any target fails
run_task() {
    local task_name=$1
    local smk_file=$2
    shift 2
    local log_file="$OUT_DIR/${task_name}.log"
    local time_file="$OUT_DIR/${task_name}.time"
    local task_start=$SECONDS
    local target task_time stage=0

    echo "Starting $task_name..."
    : > "$log_file"   # truncate any log left over from a previous run

    for target in "$@"; do
        # Tolerate empty placeholders so the old 4-argument call style works.
        [ -n "$target" ] || continue
        if [ "$stage" -gt 0 ]; then
            echo "Running $task_name second stage: $target..."
        fi
        # All stages append to the same log file.
        if ! snakemake -s "$smk_file" -j 1 --forcerun "$target" --rerun-incomplete \
                --configfile config.yml --latency-wait 120 --verbose >> "$log_file" 2>&1; then
            task_time=$((SECONDS - task_start))
            echo "$task_time" > "$time_file"
            echo "ERROR: $task_name failed on $target after $task_time seconds" >&2
            return 1
        fi
        stage=$((stage + 1))
    done

    task_time=$((SECONDS - task_start))
    echo "$task_time" > "$time_file"
    echo "$task_name completed successfully in $task_time seconds"
    return 0
}
# Export the task runner and its environment so GNU parallel's subshells see them.
export -f run_task
export OUT_DIR
export CONFIG_FILE=config.yml

# Make the generated code importable by the workflows' Python steps.
export PYTHONPATH="$OUT_DIR:$PYTHONPATH"

# Run all 5 tasks in parallel using GNU parallel.
# No job limit - let GNU parallel manage concurrency based on system resources.
# --halt soon,fail=1: stop launching new jobs once any task fails.
# --line-buffer: interleave task output line by line rather than per job.
# Testing the command directly in 'if' avoids the fragile $? pattern (SC2181).
if parallel --no-notice --halt soon,fail=1 --line-buffer ::: \
    "run_task summarize_root summarize_root_temp.smk summarize_root insert_root_summary" \
    "run_task create_numpy create_numpy_temp.smk create_numpy" \
    "run_task preprocess preprocess_temp.smk preprocess" \
    "run_task scores scores_temp.smk scores" \
    "run_task categorization categorization_temp.smk categorization"
then
    echo "All tasks completed successfully"
else
    echo "ERROR: One or more tasks failed!" >&2
    # Show the tail of every log that mentions an error, to aid debugging.
    for log in "$OUT_DIR"/*.log; do
        if [ -f "$log" ] && grep -q "ERROR\|failed" "$log"; then
            echo "=== Failed task log: $(basename "$log") ==="
            tail -20 "$log"
        fi
    done
    exit 1
fi

# Wall-clock time for the whole parallel phase.
TOTAL_TIME=$((SECONDS - START_TIME))
echo "Total time: $TOTAL_TIME seconds"

echo "Checking results"
python check_soln.py --out_dir "$OUT_DIR"

echo "Writing stats"

# Read individual task times
TIME1=$(cat "$OUT_DIR/summarize_root.time" 2>/dev/null || echo "0")
TIME2=$(cat "$OUT_DIR/create_numpy.time" 2>/dev/null || echo "0")
TIME3=$(cat "$OUT_DIR/preprocess.time" 2>/dev/null || echo "0")
TIME4=$(cat "$OUT_DIR/scores.time" 2>/dev/null || echo "0")
TIME5=$(cat "$OUT_DIR/categorization.time" 2>/dev/null || echo "0")

echo "Task times: summarize_root=${TIME1}s, create_numpy=${TIME2}s, preprocess=${TIME3}s, scores=${TIME4}s, categorization=${TIME5}s"

# Get arrays for all 5 tasks (assuming the structure is now 5 elements)
read -r -a success_arr < <(python get_arr.py --name success --out_dir $OUT_DIR)
SUCCESS1=${success_arr[0]:-0}
SUCCESS2=${success_arr[1]:-0}
SUCCESS3=${success_arr[2]:-0}
SUCCESS4=${success_arr[3]:-0}
SUCCESS5=${success_arr[4]:-0}

read -r -a call_arr < <(python get_arr.py --name calls --out_dir $OUT_DIR)
CALLS1=${call_arr[0]:-0}
CALLS2=${call_arr[1]:-0}
CALLS3=${call_arr[2]:-0}
CALLS4=${call_arr[3]:-0}
CALLS5=${call_arr[4]:-0}

read -r -a input_token_arr < <(python get_arr.py --name input_tokens --out_dir $OUT_DIR)
INPUT_TOKENS1=${input_token_arr[0]:-0}
INPUT_TOKENS2=${input_token_arr[1]:-0}
INPUT_TOKENS3=${input_token_arr[2]:-0}
INPUT_TOKENS4=${input_token_arr[3]:-0}
INPUT_TOKENS5=${input_token_arr[4]:-0}

read -r -a output_token_arr < <(python get_arr.py --name output_tokens --out_dir $OUT_DIR)
OUTPUT_TOKENS1=${output_token_arr[0]:-0}
OUTPUT_TOKENS2=${output_token_arr[1]:-0}
OUTPUT_TOKENS3=${output_token_arr[2]:-0}
OUTPUT_TOKENS4=${output_token_arr[3]:-0}
OUTPUT_TOKENS5=${output_token_arr[4]:-0}

# Update stats with all 5 tasks using individual task times
python update_stats.py --name $NAME \
    --success1 $SUCCESS1 --time1 $TIME1 --calls1 $CALLS1 --input_tokens1 $INPUT_TOKENS1 \
    --success2 $SUCCESS2 --time2 $TIME2 --calls2 $CALLS2 --input_tokens2 $INPUT_TOKENS2 \
    --success3 $SUCCESS3 --time3 $TIME3 --calls3 $CALLS3 --input_tokens3 $INPUT_TOKENS3 \
    --success4 $SUCCESS4 --time4 $TIME4 --calls4 $CALLS4 --input_tokens4 $INPUT_TOKENS4 \
    --success5 $SUCCESS5 --time5 $TIME5 --calls5 $CALLS5 --input_tokens5 $INPUT_TOKENS5 \
    --output_tokens1 $OUTPUT_TOKENS1 --output_tokens2 $OUTPUT_TOKENS2 --output_tokens3 $OUTPUT_TOKENS3 \
    --output_tokens4 $OUTPUT_TOKENS4 --output_tokens5 $OUTPUT_TOKENS5

# Clean up temp files. -f/-- keep the cleanup idempotent (no error if a file
# is already gone) and safe against names that look like options.
rm -rf -- prompts_temp
rm -f -- summarize_root_temp.smk \
         create_numpy_temp.smk \
         preprocess_temp.smk \
         scores_temp.smk \
         categorization_temp.smk
rm -f -- "$OUT_DIR"/*.time

echo "Finished!"