#!/bin/bash
# =============================================================================
# deploy_to_worker.sh — Deploy code to a Vast.ai worker with verification
# =============================================================================
# Usage: ./deploy_to_worker.sh <ssh_host> <ssh_port> [--restart-worker]
#
# This script:
#   1. Computes checksums of all local source files
#   2. SCPs the src/ and scripts/ directories to the worker
#   3. Clears all __pycache__ directories on the worker
#   4. Verifies remote checksums match local checksums
#   5. Runs a dry-run evaluation test on the worker
#   6. Optionally restarts the worker agent
#
# Exit codes:
#   0 = success (all checks passed)
#   1 = checksum mismatch (deployment failed)
#   2 = dry-run test failed
#   3 = SSH connection failed
# =============================================================================
set -euo pipefail

# ANSI colour escapes used by the logging helpers (rendered via `echo -e`).
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

# Positional arguments: host and port are mandatory, the restart flag is not.
SSH_HOST="${1:?Usage: $0 <ssh_host> <ssh_port> [--restart-worker]}"
SSH_PORT="${2:?Usage: $0 <ssh_host> <ssh_port> [--restart-worker]}"
RESTART_WORKER="${3:-}"

LOCAL_DIR="/home/ubuntu/ascad-training-pipeline"
REMOTE_DIR="/root/ascad-training-pipeline"
# Kept as a plain string and expanded unquoted on purpose so that it
# word-splits into separate ssh/scp options.
SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=15"

# log MESSAGE... — print an informational line to stdout.
log() { echo -e "${GREEN}[DEPLOY]${NC} $*"; }

# warn MESSAGE... — print a warning line to stderr.
warn() { echo -e "${YELLOW}[WARN]${NC} $*" >&2; }

# fail MESSAGE [EXIT_CODE] — print MESSAGE to stderr and exit.
#   $1 - message (backslash escapes such as \n are interpreted by echo -e)
#   $2 - exit status, defaults to 1
# Only $1 is printed so the numeric exit code is not appended to the message.
fail() {
    echo -e "${RED}[FAIL]${NC} ${1:-}" >&2
    exit "${2:-1}"
}

# ---- Step 0: Test SSH connectivity ----
log "Testing SSH connectivity to ${SSH_HOST}:${SSH_PORT}..."
# stderr is discarded here; the remote "OK" is echoed to stdout on success.
# shellcheck disable=SC2086  # SSH_OPTS must word-split
if ! ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "echo OK" 2>/dev/null; then
    fail "Cannot connect to ${SSH_HOST}:${SSH_PORT}" 3
fi
log "SSH connection OK"

# ---- Step 1: Compute local checksums ----
log "Computing local checksums..."
# md5sum every *.py / *.sh regular file under src/ and scripts/, with paths
# relative to LOCAL_DIR so they can be compared against the worker's paths.
#   -type f      : skip directories whose names happen to match the patterns
#   -print0/-0   : NUL-delimited so names with whitespace are not word-split
#   xargs -r     : do not run md5sum at all (hashing stdin) when nothing matched
LOCAL_CHECKSUMS=$(cd "${LOCAL_DIR}" \
    && find src/ scripts/ -type f \( -name "*.py" -o -name "*.sh" \) -print0 \
        | sort -z \
        | xargs -0 -r md5sum)
if [[ -z "${LOCAL_CHECKSUMS}" ]]; then
    fail "No *.py or *.sh files found under ${LOCAL_DIR}/src or ${LOCAL_DIR}/scripts" 1
fi
# A copy is kept on disk for manual inspection after the run.
echo "${LOCAL_CHECKSUMS}" > /tmp/deploy_local_checksums.txt
LOCAL_COUNT=$(echo "${LOCAL_CHECKSUMS}" | wc -l)
log "Found ${LOCAL_COUNT} files to deploy"

# ---- Step 2: SCP source files ----
log "Deploying src/ and scripts/ to worker..."
# shellcheck disable=SC2086  # SSH_OPTS must word-split into separate options
scp ${SSH_OPTS} -P "${SSH_PORT}" -r "${LOCAL_DIR}/src" "root@${SSH_HOST}:${REMOTE_DIR}/"
# shellcheck disable=SC2086
scp ${SSH_OPTS} -P "${SSH_PORT}" -r "${LOCAL_DIR}/scripts" "root@${SSH_HOST}:${REMOTE_DIR}/"

# Also deploy train_mtl.py and worker_agent.py if they exist at top level.
# NOTE: these two files are copied but are not part of the checksum
# verification, which only covers src/ and scripts/.
for f in train_mtl.py worker_agent.py; do
    if [[ -f "${LOCAL_DIR}/${f}" ]]; then
        # shellcheck disable=SC2086
        scp ${SSH_OPTS} -P "${SSH_PORT}" "${LOCAL_DIR}/${f}" "root@${SSH_HOST}:${REMOTE_DIR}/"
    fi
done
log "Files transferred"

# ---- Step 3: Clear __pycache__ on worker ----
log "Clearing __pycache__ on worker..."
# Best-effort: find's stderr is discarded and the trailing echo makes the
# remote command succeed regardless of find's exit status.
# shellcheck disable=SC2086
ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" \
    "find ${REMOTE_DIR} -name '__pycache__' -type d -exec rm -rf {} + 2>/dev/null; echo 'Cleared'"
log "__pycache__ cleared"

# ---- Step 4: Verify checksums ----
log "Verifying remote checksums..."
# Hash the same file set on the worker, using paths relative to REMOTE_DIR.
# -type f / -print0 / xargs -0 -r keep directories, whitespace in names and an
# empty match set from breaking md5sum.
# shellcheck disable=SC2086  # SSH_OPTS must word-split into separate options
REMOTE_CHECKSUMS=$(ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" \
    "cd ${REMOTE_DIR} && find src/ scripts/ -type f \( -name '*.py' -o -name '*.sh' \) -print0 | sort -z | xargs -0 -r md5sum")
# A copy is kept on disk for manual inspection after the run.
echo "${REMOTE_CHECKSUMS}" > /tmp/deploy_remote_checksums.txt

# Compare only files that exist locally (worker may have extra files from HF pull)
MISMATCH=0
while IFS= read -r line; do
    # md5sum lines are "<32 hex chars><2 separator chars><path>"; slicing by
    # position keeps paths that contain spaces intact.
    local_md5="${line:0:32}"
    local_file="${line:34}"
    # Look the path up by exact string equality. A regex/suffix match would let
    # "src/x.py" also hit "scripts/src/x.py" and would treat "." as a wildcard.
    # The path is passed through the environment so awk does not reinterpret
    # backslashes in it.
    remote_md5=$(want="${local_file}" awk \
        'substr($0, 35) == ENVIRON["want"] { print $1; exit }' \
        <<< "${REMOTE_CHECKSUMS}")
    if [[ -z "${remote_md5}" ]]; then
        warn "File missing on worker: ${local_file}"
        MISMATCH=1
    elif [[ "${local_md5}" != "${remote_md5}" ]]; then
        warn "Checksum mismatch: ${local_file} (local=${local_md5}, remote=${remote_md5})"
        MISMATCH=1
    fi
done <<< "${LOCAL_CHECKSUMS}"

if [[ "${MISMATCH}" -eq 1 ]]; then
    fail "Checksum verification failed! See warnings above." 1
fi
log "All ${LOCAL_COUNT} local files verified — checksums match on worker"

# ---- Step 5: Run dry-run evaluation test on worker ----
log "Running dry-run evaluation test on worker..."
# Create and deploy the test script.
# The quoted heredoc delimiter keeps the Python source literal (no shell
# expansion of $ or backticks inside it).
cat > /tmp/test_eval_dryrun.py << 'PYEOF'
#!/usr/bin/env python3
"""Quick dry-run test for evaluation code on the worker."""
import sys
import os
import numpy as np

sys.path.insert(0, "/root/ascad-training-pipeline")
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

from src.constants import AES_SBOX, NUM_CLASSES

# Test 1: AES_SBOX indexing
sbox_out = AES_SBOX[np.uint8(42) ^ np.uint8(0)]
assert isinstance(sbox_out, (int, np.integer)), f"sbox_out type wrong: {type(sbox_out)}"
print(f"[PASS] AES_SBOX indexing: type={type(sbox_out).__name__}, val={sbox_out}")

# Test 2: Simulate multi-output predictions dict
raw_predictions = {f"byte_{i}": np.random.rand(10, 256).astype(np.float32) for i in range(16)}
output_index = 0
key = f"byte_{output_index}"
predictions = raw_predictions[key]
assert predictions.shape == (10, 256), f"Shape wrong: {predictions.shape}"
assert isinstance(predictions[0][sbox_out], (float, np.floating)), f"Indexing wrong: {type(predictions[0][sbox_out])}"
print(f"[PASS] Dict extraction: predictions shape={predictions.shape}")

# Test 3: Full compute_key_rank with simulated data
from src.evaluation import compute_key_rank

metadata_dtype = np.dtype([('plaintext', np.uint8, (16,)), ('key', np.uint8, (16,))])
metadata = np.zeros(10, dtype=metadata_dtype)
for i in range(10):
    metadata[i]['plaintext'] = np.random.randint(0, 256, 16, dtype=np.uint8)
    metadata[i]['key'] = np.array([0x4D] * 16, dtype=np.uint8)

ranks_array, final_rank = compute_key_rank(
    predictions=predictions,
    metadata=metadata,
    real_key=0x4D,
    target_byte=0,
    num_traces=10,
    rank_step=5
)
assert isinstance(final_rank, int), f"final_rank type wrong: {type(final_rank)}"
print(f"[PASS] compute_key_rank: final_rank={final_rank}")

# Test 4: Build tiny Keras model and test full evaluate_model flow
import tensorflow as tf

inp = tf.keras.Input(shape=(100, 1))
x = tf.keras.layers.Flatten()(inp)
x = tf.keras.layers.Dense(32, activation='relu')(x)
outputs = {}
for byte_idx in range(16):
    outputs[f"byte_{byte_idx}"] = tf.keras.layers.Dense(256, activation='softmax', name=f"byte_{byte_idx}")(x)
model = tf.keras.Model(inputs=inp, outputs=outputs, name="test_hps")

from src.evaluation import evaluate_model

attack_traces = np.random.rand(20, 100).astype(np.float32)
attack_metadata = np.zeros(20, dtype=metadata_dtype)
for i in range(20):
    attack_metadata[i]['plaintext'] = np.random.randint(0, 256, 16, dtype=np.uint8)
    attack_metadata[i]['key'] = np.array([0x4D] * 16, dtype=np.uint8)

for byte_idx in [0, 7, 15]:
    result = evaluate_model(
        model=model,
        attack_traces=attack_traces,
        attack_metadata=attack_metadata,
        target_byte=byte_idx,
        real_key=0x4D,
        model_type="mtan",
        num_traces=20,
        output_index=byte_idx
    )
    assert 'final_rank' in result, f"Missing final_rank for byte {byte_idx}"
    print(f"[PASS] evaluate_model byte_{byte_idx}: final_rank={result['final_rank']}")

print("\n=== ALL TESTS PASSED ===")
PYEOF

# shellcheck disable=SC2086  # SSH_OPTS must word-split into separate options
scp ${SSH_OPTS} -P "${SSH_PORT}" /tmp/test_eval_dryrun.py "root@${SSH_HOST}:/tmp/test_eval_dryrun.py"

# Run the test.
# The exit status is captured with `||` because, under `set -e`, a bare
# assignment from a failing command substitution would abort the script here:
# the output would never be shown and exit code 2 would never be used.
TEST_STATUS=0
# shellcheck disable=SC2086
TEST_OUTPUT=$(ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" \
    "cd ${REMOTE_DIR} && python3 /tmp/test_eval_dryrun.py 2>&1") || TEST_STATUS=$?
echo "${TEST_OUTPUT}"

# Success requires both a zero exit status and the final marker line that the
# test script prints after its last assertion.
if [[ "${TEST_STATUS}" -eq 0 ]] && grep -q "ALL TESTS PASSED" <<< "${TEST_OUTPUT}"; then
    log "Dry-run test PASSED on worker"
else
    fail "Dry-run test FAILED on worker:\n${TEST_OUTPUT}" 2
fi

# ---- Step 6: Optionally restart worker ----
if [[ "${RESTART_WORKER}" = "--restart-worker" ]]; then
    log "Restarting worker agent..."

    # Kill existing worker and training processes.
    # Two details matter in the pattern below:
    #  * Each alternative starts with a one-character bracket expression
    #    ("[w]orker..."). `pkill -f` matches full command lines, and the remote
    #    shell running this snippet has the snippet text in its own command
    #    line; the bracket form keeps the pattern from matching itself.
    #  * The module-based agent (python3 -m orchestrator.worker.agent), which
    #    is what this script starts below, is included so that a worker from a
    #    previous restart is stopped. Its enclosing screen session should match
    #    too, since the agent command is part of that session's command line.
    KILL_PATTERN='[w]orker_agent\.py|[o]rchestrator\.worker\.agent|[t]rain_mtl\.py'
    # shellcheck disable=SC2086
    ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "
        pkill -f '${KILL_PATTERN}' 2>/dev/null || true
        sleep 2
        # Escalate to SIGKILL only if something survived SIGTERM
        if pgrep -f '${KILL_PATTERN}' > /dev/null 2>&1; then
            pkill -9 -f '${KILL_PATTERN}' 2>/dev/null || true
            sleep 1
        fi
        echo 'Processes killed'
    "

    # Start worker in screen session (module-based, with log forwarding).
    # Escaped \${...} / \$(...) are expanded on the worker when the screen
    # session starts; unescaped ${REMOTE_DIR} is expanded locally.
    # shellcheck disable=SC2086
    ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "
        cd ${REMOTE_DIR}
        screen -dmS worker bash -c '
            python3 -m orchestrator.worker.agent \
                --server-url \"\${TQ_SERVER_URL}\" \
                --worker-id \"\${WORKER_ID:-worker-\$(hostname)}\" \
                --data-dir /root/ascad_data \
                --pipeline-dir ${REMOTE_DIR} \
                --auth-user \"\${TQ_AUTH_USER:-admin}\" \
                --auth-pass \"\${TQ_AUTH_PASS}\" \
                --forward-logs /root/worker.log \
                2>&1 | tee -a /root/worker.log
        '
        sleep 2
        if screen -list | grep -q worker; then
            echo 'Worker started in screen session'
        else
            echo 'ERROR: Worker failed to start'
            exit 1
        fi
    "

    log "Worker agent restarted"
fi

echo ""
log "========================================="
log "  DEPLOYMENT COMPLETE — ALL CHECKS PASSED"
log "========================================="