#!/bin/bash
#
# test_stats_parallel.sh - Run ATLAS analysis tasks in parallel using GNU parallel
#
# This script runs 5 independent Snakemake workflows in parallel:
#   1. summarize_root: Convert ROOT files to summary format
#      (2 sequential steps: summarize_root -> insert_root_summary)
#   2. create_numpy: Create NumPy arrays from ROOT data
#   3. preprocess: Preprocess data for ML
#   4. scores: Generate signal/background scores
#   5. categorization: Perform event categorization
#
# Usage:
#   ./test_stats_parallel.sh
#
# Requirements:
#   - GNU parallel must be installed
#   - config.yml must exist with model configuration
#   - All workflow/*.smk files must be present
#   - Python environment with required packages
#
# The script will:
#   - Copy and modify workflow files
#   - Run all 5 tasks concurrently (no job limits)
#   - Log each task's output separately
#   - Measure individual execution time for each task
#   - Collect statistics and check results
#   - Clean up temporary files
#
# Output:
#   - Individual task logs in $OUT_DIR/*.log
#   - Individual task times in $OUT_DIR/*.time
#   - Combined statistics in stats.csv
#   - Results validation via check_soln.py
#
NAME=$(grep '^name:' config.yml | awk '{print $2}' | tr -d "'")
MODEL=$(grep '^model:' config.yml | awk '{print $2}' | tr -d "'")
OUT_DIR=$(grep '^out_dir:' config.yml | awk '{print $2}' | tr -d "'")
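# Note: the grep/awk extraction above assumes simple top-level 'key: value'
# entries in config.yml. A more robust sketch, assuming PyYAML is available
# (hypothetical, not part of this workflow):
#   NAME=$(python -c "import yaml; print(yaml.safe_load(open('config.yml'))['name'])")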
cp -r prompts prompts_temp
sed -i "s#{BASE_DIR}#$OUT_DIR#g" prompts_temp/*.txt
# Copy all 5 smk files to temp versions
cp workflow/summarize_root.smk summarize_root_temp.smk
cp workflow/create_numpy.smk create_numpy_temp.smk
cp workflow/preprocess.smk preprocess_temp.smk
cp workflow/scores.smk scores_temp.smk
cp workflow/categorization.smk categorization_temp.smk
sed -i "s#{BASE_DIR}#$OUT_DIR#g" *_temp.smk
# Stage utils.py alongside the generated code
mkdir -p "$OUT_DIR/generated_code"
cp utils.py "$OUT_DIR/generated_code/utils.py"
# Remove stale stat arrays from any previous run
rm -f "$OUT_DIR/logs/success.npy"
rm -f "$OUT_DIR/logs/calls.npy"
rm -f "$OUT_DIR/logs/input_tokens.npy"
rm -f "$OUT_DIR/logs/output_tokens.npy"
echo "Starting all tasks in parallel using GNU parallel..."
echo "Running 5 independent tasks concurrently (no job limits)"
echo "Tasks: summarize_root (2 steps), create_numpy, preprocess, scores, categorization"
echo ""
# Start timing for all tasks
START_TIME=$SECONDS
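# ($SECONDS is a bash builtin that counts seconds since the shell started;
# subtracting two readings gives elapsed wall-clock time.)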
# Define the tasks as a function for GNU parallel
run_task() {
    local task_name=$1
    local smk_file=$2
    local target1=$3
    local target2=$4  # Optional second target
    local log_file="$OUT_DIR/${task_name}.log"
    local time_file="$OUT_DIR/${task_name}.time"
    echo "Starting $task_name..."
    local task_start=$SECONDS
    # Run first target
    if ! snakemake -s "$smk_file" -j 1 --forcerun "$target1" --rerun-incomplete --configfile "$CONFIG_FILE" --latency-wait 120 --verbose > "$log_file" 2>&1; then
        local task_time=$((SECONDS - task_start))
        echo "$task_time" > "$time_file"
        echo "ERROR: $task_name failed on $target1 after $task_time seconds"
        return 1
    fi
    # Run second target if provided (for the summarize_root workflow)
    if [ -n "$target2" ]; then
        echo "Running $task_name second stage: $target2..."
        if ! snakemake -s "$smk_file" -j 1 --forcerun "$target2" --rerun-incomplete --configfile "$CONFIG_FILE" --latency-wait 120 --verbose >> "$log_file" 2>&1; then
            local task_time=$((SECONDS - task_start))
            echo "$task_time" > "$time_file"
            echo "ERROR: $task_name failed on $target2 after $task_time seconds"
            return 1
        fi
    fi
    local task_time=$((SECONDS - task_start))
    echo "$task_time" > "$time_file"
    echo "$task_name completed successfully in $task_time seconds"
    return 0
}
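# Example invocation (matching the first entry of the parallel call below):
#   run_task summarize_root summarize_root_temp.smk summarize_root insert_root_summary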
# Export the function and the variables it reads so GNU parallel's child
# shells can see them
export -f run_task
export OUT_DIR
export CONFIG_FILE=config.yml
export PYTHONPATH="$OUT_DIR:$PYTHONPATH"
# Run all 5 tasks in parallel using GNU parallel
# No job limit - let GNU parallel manage concurrency based on system resources
# This allows maximum parallelism for independent tasks
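# --halt soon,fail=1: after the first failing job, start no new jobs, let
# already-running jobs finish, and exit non-zero so the check below fires.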
parallel --no-notice --halt soon,fail=1 --line-buffer ::: \
"run_task summarize_root summarize_root_temp.smk summarize_root insert_root_summary" \
"run_task create_numpy create_numpy_temp.smk create_numpy" \
"run_task preprocess preprocess_temp.smk preprocess" \
"run_task scores scores_temp.smk scores" \
"run_task categorization categorization_temp.smk categorization"
# Check exit status
if [ $? -eq 0 ]; then
    echo "All tasks completed successfully"
else
    echo "ERROR: One or more tasks failed!"
    # Show logs of failed tasks
    for log in "$OUT_DIR"/*.log; do
        if [ -f "$log" ] && grep -q "ERROR\|failed" "$log"; then
            echo "=== Failed task log: $(basename "$log") ==="
            tail -20 "$log"
        fi
    done
    exit 1
fi
# Calculate total time
TOTAL_TIME=$((SECONDS - START_TIME))
echo "Total time: $TOTAL_TIME seconds"
echo "Checking results"
python check_soln.py --out_dir "$OUT_DIR"
echo "Writing stats"
# Read individual task times
TIME1=$(cat "$OUT_DIR/summarize_root.time" 2>/dev/null || echo "0")
TIME2=$(cat "$OUT_DIR/create_numpy.time" 2>/dev/null || echo "0")
TIME3=$(cat "$OUT_DIR/preprocess.time" 2>/dev/null || echo "0")
TIME4=$(cat "$OUT_DIR/scores.time" 2>/dev/null || echo "0")
TIME5=$(cat "$OUT_DIR/categorization.time" 2>/dev/null || echo "0")
echo "Task times: summarize_root=${TIME1}s, create_numpy=${TIME2}s, preprocess=${TIME3}s, scores=${TIME4}s, categorization=${TIME5}s"
# Read per-task arrays (each get_arr.py call is expected to print 5 whitespace-separated values, one per task)
read -r -a success_arr < <(python get_arr.py --name success --out_dir "$OUT_DIR")
SUCCESS1=${success_arr[0]:-0}
SUCCESS2=${success_arr[1]:-0}
SUCCESS3=${success_arr[2]:-0}
SUCCESS4=${success_arr[3]:-0}
SUCCESS5=${success_arr[4]:-0}
read -r -a call_arr < <(python get_arr.py --name calls --out_dir "$OUT_DIR")
CALLS1=${call_arr[0]:-0}
CALLS2=${call_arr[1]:-0}
CALLS3=${call_arr[2]:-0}
CALLS4=${call_arr[3]:-0}
CALLS5=${call_arr[4]:-0}
read -r -a input_token_arr < <(python get_arr.py --name input_tokens --out_dir "$OUT_DIR")
INPUT_TOKENS1=${input_token_arr[0]:-0}
INPUT_TOKENS2=${input_token_arr[1]:-0}
INPUT_TOKENS3=${input_token_arr[2]:-0}
INPUT_TOKENS4=${input_token_arr[3]:-0}
INPUT_TOKENS5=${input_token_arr[4]:-0}
read -r -a output_token_arr < <(python get_arr.py --name output_tokens --out_dir "$OUT_DIR")
OUTPUT_TOKENS1=${output_token_arr[0]:-0}
OUTPUT_TOKENS2=${output_token_arr[1]:-0}
OUTPUT_TOKENS3=${output_token_arr[2]:-0}
OUTPUT_TOKENS4=${output_token_arr[3]:-0}
OUTPUT_TOKENS5=${output_token_arr[4]:-0}
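# Optional sanity check (sketch, left commented out so behavior is unchanged):
#   TOTAL_IN=$((INPUT_TOKENS1 + INPUT_TOKENS2 + INPUT_TOKENS3 + INPUT_TOKENS4 + INPUT_TOKENS5))
#   TOTAL_OUT=$((OUTPUT_TOKENS1 + OUTPUT_TOKENS2 + OUTPUT_TOKENS3 + OUTPUT_TOKENS4 + OUTPUT_TOKENS5))
#   echo "Token totals: input=$TOTAL_IN output=$TOTAL_OUT"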
# Update stats with all 5 tasks using individual task times
python update_stats.py --name "$NAME" \
    --success1 "$SUCCESS1" --time1 "$TIME1" --calls1 "$CALLS1" --input_tokens1 "$INPUT_TOKENS1" \
    --success2 "$SUCCESS2" --time2 "$TIME2" --calls2 "$CALLS2" --input_tokens2 "$INPUT_TOKENS2" \
    --success3 "$SUCCESS3" --time3 "$TIME3" --calls3 "$CALLS3" --input_tokens3 "$INPUT_TOKENS3" \
    --success4 "$SUCCESS4" --time4 "$TIME4" --calls4 "$CALLS4" --input_tokens4 "$INPUT_TOKENS4" \
    --success5 "$SUCCESS5" --time5 "$TIME5" --calls5 "$CALLS5" --input_tokens5 "$INPUT_TOKENS5" \
    --output_tokens1 "$OUTPUT_TOKENS1" --output_tokens2 "$OUTPUT_TOKENS2" --output_tokens3 "$OUTPUT_TOKENS3" \
    --output_tokens4 "$OUTPUT_TOKENS4" --output_tokens5 "$OUTPUT_TOKENS5"
# Clean up temp files
rm -rf prompts_temp
rm -f summarize_root_temp.smk create_numpy_temp.smk preprocess_temp.smk \
    scores_temp.smk categorization_temp.smk
rm -f "$OUT_DIR"/*.time
echo "Finished!"