ascad-training-pipeline / scripts /deploy_to_worker.sh
lemousehunter
feat: LMIC-TSBN model + persistence fixes across restarts
cbb6546
#!/bin/bash
# =============================================================================
# deploy_to_worker.sh — Deploy code to a Vast.ai worker with verification
# =============================================================================
# Usage: ./deploy_to_worker.sh <ssh_host> <ssh_port> [--restart-worker]
#
# This script:
# 1. Computes checksums of all local source files
# 2. SCPs the src/ and scripts/ directories (plus train_mtl.py and worker_agent.py, when present) to the worker
# 3. Clears all __pycache__ directories on the worker
# 4. Verifies remote checksums match local checksums
# 5. Runs a dry-run evaluation test on the worker
# 6. Optionally restarts the worker agent
#
# Exit codes:
# 0 = success (all checks passed)
# 1 = checksum mismatch (deployment failed)
# 2 = dry-run test failed
# 3 = SSH connection failed
# =============================================================================
# Strict mode: abort on a failing command, on use of an unset variable, and
# on a failure in any stage of a pipeline.
set -euo pipefail

# ANSI colour sequences; the logging helpers expand them with `echo -e`.
NC='\033[0m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'

# Command-line arguments: host and port are mandatory, the third is optional.
USAGE="Usage: $0 <ssh_host> <ssh_port> [--restart-worker]"
SSH_HOST="${1:?${USAGE}}"
SSH_PORT="${2:?${USAGE}}"
RESTART_WORKER="${3:-}"

# Repository root on the worker and on the deploying machine.
REMOTE_DIR="/root/ascad-training-pipeline"
LOCAL_DIR="/home/ubuntu/ascad-training-pipeline"

# Options shared by every ssh/scp call. Kept as a single string and expanded
# unquoted at the call sites so that it splits into separate arguments.
SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=15"
# log <message...>
# Print an informational line prefixed with a green [DEPLOY] tag on stdout.
log() {
  echo -e "${GREEN}[DEPLOY]${NC} $*"
}

# warn <message...>
# Print a warning line prefixed with a yellow [WARN] tag on stderr.
warn() {
  echo -e "${YELLOW}[WARN]${NC} $*" >&2
}

# fail <message> [exit_code]
# Print <message> with a red [FAIL] tag on stderr and terminate the script.
# The exit code defaults to 1. Only $1 is printed: the second argument is the
# exit status and must not leak into the message text.
fail() {
  echo -e "${RED}[FAIL]${NC} ${1:-}" >&2
  exit "${2:-1}"
}
# ---- Step 0: Test SSH connectivity ----
# Run a trivial remote command before doing any real work. ssh's own stderr
# is discarded; a failed probe is reported through fail with exit code 3.
log "Testing SSH connectivity to ${SSH_HOST}:${SSH_PORT}..."
ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "echo OK" 2>/dev/null \
  || fail "Cannot connect to ${SSH_HOST}:${SSH_PORT}" 3
log "SSH connection OK"
# ---- Step 1: Compute local checksums ----

# compute_local_checksums <root_dir>
# Print one `md5sum` line ("<md5>  <relative path>") for every regular *.py
# or *.sh file below <root_dir>/src and <root_dir>/scripts, sorted by path.
# Paths are handed over NUL-delimited so names containing whitespace survive,
# `-type f` keeps directories that merely look like source files away from
# md5sum, and `xargs -r` avoids running md5sum at all when nothing matched.
# Relies on the GNU extensions `sort -z` and `xargs -0 -r`.
# Returns non-zero if the directory cannot be entered or the pipeline fails.
compute_local_checksums() {
  local root=$1
  (
    cd -- "${root}" || exit 1
    find src/ scripts/ -type f \( -name '*.py' -o -name '*.sh' \) -print0 \
      | sort -z \
      | xargs -0 -r md5sum
  )
}

log "Computing local checksums..."
LOCAL_CHECKSUMS=$(compute_local_checksums "${LOCAL_DIR}")
if [ -z "${LOCAL_CHECKSUMS}" ]; then
  # An empty list would otherwise be counted as one file and "verified".
  fail "No *.py or *.sh files found under ${LOCAL_DIR}/src or ${LOCAL_DIR}/scripts" 1
fi
# Keep a copy on disk for inspection after the run.
echo "${LOCAL_CHECKSUMS}" > /tmp/deploy_local_checksums.txt
LOCAL_COUNT=$(echo "${LOCAL_CHECKSUMS}" | wc -l)
log "Found ${LOCAL_COUNT} files to deploy"
# ---- Step 2: SCP source files ----
log "Deploying src/ and scripts/ to worker..."
REMOTE_TARGET="root@${SSH_HOST}:${REMOTE_DIR}/"

# Recursively copy both source trees into the remote repository root.
for tree in src scripts; do
  scp ${SSH_OPTS} -P "${SSH_PORT}" -r "${LOCAL_DIR}/${tree}" "${REMOTE_TARGET}"
done

# Also deploy train_mtl.py and worker_agent.py if they exist at top level
for entry in train_mtl.py worker_agent.py; do
  [ -f "${LOCAL_DIR}/${entry}" ] || continue
  scp ${SSH_OPTS} -P "${SSH_PORT}" "${LOCAL_DIR}/${entry}" "${REMOTE_TARGET}"
done
log "Files transferred"
# ---- Step 3: Clear __pycache__ on worker ----
log "Clearing __pycache__ on worker..."
# The remote command removes every __pycache__ directory below the remote
# repository, discards find's stderr, and always ends with `echo`, so the
# ssh call succeeds regardless of find's exit status.
CLEAR_PYCACHE_CMD="find ${REMOTE_DIR} -name '__pycache__' -type d -exec rm -rf {} + 2>/dev/null; echo 'Cleared'"
ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "${CLEAR_PYCACHE_CMD}"
log "__pycache__ cleared"
# ---- Step 4: Verify checksums ----
log "Verifying remote checksums..."
# Collect "<md5>  <relative path>" lines for every regular *.py / *.sh file
# under src/ and scripts/ on the worker. `-type f` skips directories whose
# name matches the patterns, NUL-delimited hand-over keeps paths containing
# whitespace intact, and `xargs -r` does not run md5sum on an empty list.
# Requires GNU `sort -z` and `xargs -0 -r` on the worker.
REMOTE_CHECKSUMS=$(ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" \
  "cd '${REMOTE_DIR}' && find src/ scripts/ -type f \\( -name '*.py' -o -name '*.sh' \\) -print0 | sort -z | xargs -0 -r md5sum")
# Keep a copy on disk for inspection after the run.
echo "${REMOTE_CHECKSUMS}" > /tmp/deploy_remote_checksums.txt
# find_checksum_problems <local_md5sum_output> <remote_md5sum_output>
# Compare two blocks of `md5sum` output ("<md5>" + two separator characters +
# "<path>" per line) and print one line for every local file that is missing
# from, or differs in, the remote block. Prints nothing when all local files
# match. Paths are compared as exact strings: the previous regex/suffix grep
# let `src/a.py` match `vendor/src/a.py` or `src/aXpy`. Files present only in
# the remote block are ignored. Needs bash 4+ (associative arrays).
find_checksum_problems() {
  local local_sums=$1 remote_sums=$2
  local line md5 path
  local -A remote_md5_by_path=()

  # Index the remote checksums by path for constant-time exact lookups.
  while IFS= read -r line; do
    [[ -n "${line}" ]] || continue
    md5=${line%% *}    # everything before the first space
    path=${line#* ?}   # drop the hash, the space, and the mode character
    [[ -n "${path}" ]] || continue
    remote_md5_by_path["${path}"]=${md5}
  done <<< "${remote_sums}"

  while IFS= read -r line; do
    [[ -n "${line}" ]] || continue
    md5=${line%% *}
    path=${line#* ?}
    [[ -n "${path}" ]] || continue
    if [[ -z "${remote_md5_by_path[${path}]+set}" ]]; then
      printf 'File missing on worker: %s\n' "${path}"
    elif [[ "${remote_md5_by_path[${path}]}" != "${md5}" ]]; then
      printf 'Checksum mismatch: %s (local=%s, remote=%s)\n' \
        "${path}" "${md5}" "${remote_md5_by_path[${path}]}"
    fi
  done <<< "${local_sums}"
}

# Compare only files that exist locally (worker may have extra files from HF pull)
CHECKSUM_PROBLEMS=$(find_checksum_problems "${LOCAL_CHECKSUMS}" "${REMOTE_CHECKSUMS}")
MISMATCH=0
if [ -n "${CHECKSUM_PROBLEMS}" ]; then
  MISMATCH=1
  while IFS= read -r problem; do
    warn "${problem}"
  done <<< "${CHECKSUM_PROBLEMS}"
fi
if [ "${MISMATCH}" -eq 1 ]; then
  fail "Checksum verification failed! See warnings above." 1
fi
log "All ${LOCAL_COUNT} local files verified — checksums match on worker"
# ---- Step 5: Run dry-run evaluation test on worker ----
log "Running dry-run evaluation test on worker..."
# Create and deploy the test script. The local copy goes into an unpredictable
# temp file and is removed as soon as it has been uploaded.
DRYRUN_SCRIPT=$(mktemp)
cat > "${DRYRUN_SCRIPT}" << 'PYEOF'
#!/usr/bin/env python3
"""Quick dry-run test for evaluation code on the worker."""
import sys
import os
import numpy as np
sys.path.insert(0, "/root/ascad-training-pipeline")
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
from src.constants import AES_SBOX, NUM_CLASSES
# Test 1: AES_SBOX indexing
sbox_out = AES_SBOX[np.uint8(42) ^ np.uint8(0)]
assert isinstance(sbox_out, (int, np.integer)), f"sbox_out type wrong: {type(sbox_out)}"
print(f"[PASS] AES_SBOX indexing: type={type(sbox_out).__name__}, val={sbox_out}")
# Test 2: Simulate multi-output predictions dict
raw_predictions = {f"byte_{i}": np.random.rand(10, 256).astype(np.float32) for i in range(16)}
output_index = 0
key = f"byte_{output_index}"
predictions = raw_predictions[key]
assert predictions.shape == (10, 256), f"Shape wrong: {predictions.shape}"
assert isinstance(predictions[0][sbox_out], (float, np.floating)), f"Indexing wrong: {type(predictions[0][sbox_out])}"
print(f"[PASS] Dict extraction: predictions shape={predictions.shape}")
# Test 3: Full compute_key_rank with simulated data
from src.evaluation import compute_key_rank
metadata_dtype = np.dtype([('plaintext', np.uint8, (16,)), ('key', np.uint8, (16,))])
metadata = np.zeros(10, dtype=metadata_dtype)
for i in range(10):
    metadata[i]['plaintext'] = np.random.randint(0, 256, 16, dtype=np.uint8)
    metadata[i]['key'] = np.array([0x4D] * 16, dtype=np.uint8)
ranks_array, final_rank = compute_key_rank(
    predictions=predictions, metadata=metadata,
    real_key=0x4D, target_byte=0, num_traces=10, rank_step=5
)
assert isinstance(final_rank, int), f"final_rank type wrong: {type(final_rank)}"
print(f"[PASS] compute_key_rank: final_rank={final_rank}")
# Test 4: Build tiny Keras model and test full evaluate_model flow
import tensorflow as tf
inp = tf.keras.Input(shape=(100, 1))
x = tf.keras.layers.Flatten()(inp)
x = tf.keras.layers.Dense(32, activation='relu')(x)
outputs = {}
for byte_idx in range(16):
    outputs[f"byte_{byte_idx}"] = tf.keras.layers.Dense(256, activation='softmax', name=f"byte_{byte_idx}")(x)
model = tf.keras.Model(inputs=inp, outputs=outputs, name="test_hps")
from src.evaluation import evaluate_model
attack_traces = np.random.rand(20, 100).astype(np.float32)
attack_metadata = np.zeros(20, dtype=metadata_dtype)
for i in range(20):
    attack_metadata[i]['plaintext'] = np.random.randint(0, 256, 16, dtype=np.uint8)
    attack_metadata[i]['key'] = np.array([0x4D] * 16, dtype=np.uint8)
for byte_idx in [0, 7, 15]:
    result = evaluate_model(
        model=model, attack_traces=attack_traces, attack_metadata=attack_metadata,
        target_byte=byte_idx, real_key=0x4D, model_type="mtan",
        num_traces=20, output_index=byte_idx
    )
    assert 'final_rank' in result, f"Missing final_rank for byte {byte_idx}"
    print(f"[PASS] evaluate_model byte_{byte_idx}: final_rank={result['final_rank']}")
print("\n=== ALL TESTS PASSED ===")
PYEOF
if ! scp ${SSH_OPTS} -P "${SSH_PORT}" "${DRYRUN_SCRIPT}" "root@${SSH_HOST}:/tmp/test_eval_dryrun.py"; then
  rm -f -- "${DRYRUN_SCRIPT}"
  fail "Could not copy the dry-run test script to ${SSH_HOST}:${SSH_PORT}" 3
fi
rm -f -- "${DRYRUN_SCRIPT}"

# Run the test. The exit status is captured explicitly: under `set -e` a bare
# failing assignment would abort the script right here, before the captured
# output is printed and before the documented exit code 2 can be returned.
TEST_STATUS=0
TEST_OUTPUT=$(ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" \
  "cd ${REMOTE_DIR} && python3 /tmp/test_eval_dryrun.py 2>&1") || TEST_STATUS=$?
printf '%s\n' "${TEST_OUTPUT}"

# A here-string feeds grep instead of `echo |`: with pipefail, grep -q exiting
# early can make the writing side of a pipe fail and flip the result.
if [ "${TEST_STATUS}" -eq 0 ] && grep -q "ALL TESTS PASSED" <<< "${TEST_OUTPUT}"; then
  log "Dry-run test PASSED on worker"
else
  fail "Dry-run test FAILED on worker:\n${TEST_OUTPUT}" 2
fi
# ---- Step 6: Optionally restart worker ----
if [ "${RESTART_WORKER}" = "--restart-worker" ]; then
  log "Restarting worker agent..."
  # Kill existing worker and training processes.
  #
  # Every pattern starts with a one-character bracket expression and escapes
  # its dots. `pkill -f` / `pgrep -f` match against full command lines, and
  # the remote shell that sshd starts for this command carries the whole
  # script text in its own command line; a plain pattern would therefore
  # match (and kill) that shell. The bracketed spelling matches the real
  # process names but not its own literal text.
  #
  # The module-based agent launched further down is included in the patterns
  # so that a restart really replaces the worker started by a previous run.
  ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "
    pkill -f '[w]orker_agent\.py' 2>/dev/null || true
    pkill -f '[o]rchestrator\.worker\.agent' 2>/dev/null || true
    pkill -f '[t]rain_mtl\.py' 2>/dev/null || true
    sleep 2
    # Verify processes are dead; escalate to SIGKILL for survivors
    if pgrep -f '[w]orker_agent\.py|[o]rchestrator\.worker\.agent|[t]rain_mtl\.py' > /dev/null 2>&1; then
      pkill -9 -f '[w]orker_agent\.py|[o]rchestrator\.worker\.agent|[t]rain_mtl\.py' 2>/dev/null || true
      sleep 1
    fi
    echo 'Processes killed'
  "
  # Start worker in screen session (module-based, with log forwarding).
  # Escaped dollars and quotes reach the worker unexpanded, so the variables
  # are resolved there by the shell running inside the screen session.
  ssh ${SSH_OPTS} -p "${SSH_PORT}" "root@${SSH_HOST}" "
    cd ${REMOTE_DIR}
    screen -dmS worker bash -c '
      python3 -m orchestrator.worker.agent \
        --server-url \"\${TQ_SERVER_URL}\" \
        --worker-id \"\${WORKER_ID:-worker-\$(hostname)}\" \
        --data-dir /root/ascad_data \
        --pipeline-dir ${REMOTE_DIR} \
        --auth-user \"\${TQ_AUTH_USER:-admin}\" \
        --auth-pass \"\${TQ_AUTH_PASS}\" \
        --forward-logs /root/worker.log \
        2>&1 | tee -a /root/worker.log
    '
    sleep 2
    if screen -list | grep -q worker; then
      echo 'Worker started in screen session'
    else
      echo 'ERROR: Worker failed to start'
      exit 1
    fi
  "
  log "Worker agent restarted"
fi
echo ""
log "========================================="
log " DEPLOYMENT COMPLETE — ALL CHECKS PASSED"
log "========================================="