modules_play / remote_eval.py
richard.lin
feat: finished with regnet_x_800mf PTQ scripts.
91075d7
Raw
History Blame Contribute Delete
21.2 kB
#!/usr/bin/env python3
"""
Remote TensorRT evaluation script for HGNetV2-B2 quantized ONNX models.
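Steps:
1. Upload ONNX models + calibration data to remote
2. Build TensorRT engines with various precision flags
3. Preprocess a subset of the ImageNet validation set locally and upload it
4. Run inference on that subset via ONNX Runtime (on remote) and compute accuracy metrics
5. Benchmark the built engines with trtexec
6. Collect and compare the results; the remote evaluation output is logged to
   /data/rilin/hgnetv2/evaluate.log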
"""
import paramiko
import os
import time
import sys
import json
import scp
import io
import numpy as np
import torch
import timm
from timm.data import resolve_model_data_config, create_transform
from PIL import Image
import pyarrow.ipc as ipc
from sklearn.metrics import average_precision_score, precision_recall_fscore_support
# ===================== CONFIG =====================
# Address and login of the remote machine that main() connects to over SSH.
# NOTE(review): the password is stored in plain text — consider reading it from
# the environment or switching to key-based authentication.
REMOTE_HOST = "192.168.8.108"
REMOTE_USER = "nvidia"
REMOTE_PASS = "nvidia"
# Remote working directory; main() creates `models/` and `results/` under it.
REMOTE_BASE = "/data/rilin/hgnetv2"
# Remote executable invoked with --onnx/--saveEngine/--loadEngine flags to build
# and benchmark engines.
# NOTE(review): presumably a trtexec-compatible binary — confirm.
TRTEXEC = "/data/rilin/tlr/aicompiler"
# Display name -> local ONNX path of each model to upload. Paths are relative to
# the current working directory; entries whose file is missing are skipped.
LOCAL_MODELS = {
    "FP32 (baseline)": "hgnetv2_b2_fp32.onnx",
    "FP16": "fp16/hgnetv2_b2_fp16.onnx",
    "INT8 entropy": "int8/hgnetv2_b2_int8_entropy.onnx",
    "INT8 max": "int8/hgnetv2_b2_int8_max.onnx",
    "FP8 entropy": "fp8/hgnetv2_b2_fp8_entropy.onnx",
    "FP8 max": "fp8/hgnetv2_b2_fp8_max.onnx",
    "INT4 awq_clip": "int4/hgnetv2_b2_int4_awq_clip.onnx",
    "INT4 awq_lite (asym)": "int4/hgnetv2_b2_int4_awq_lite_asym.onnx",
    "INT4 awq_lite (sym)": "int4/hgnetv2_b2_int4_awq_lite.onnx",
    "INT4 awq_full": "int4/hgnetv2_b2_int4_awq_full.onnx",
    "INT4 rtn_dq": "int4/hgnetv2_b2_int4_rtn_dq.onnx"
}
# ===================== END CONFIG =====================
def ssh_exec(ssh, cmd, timeout=600, log_output=True):
    """Run *cmd* on the remote host and return its stripped output.

    Args:
        ssh: Connected client exposing ``exec_command(cmd, timeout=...)``
            (a ``paramiko.SSHClient`` in this script).
        cmd: Shell command line to execute remotely.
        timeout: Timeout in seconds, forwarded to ``exec_command``.
        log_output: When true, echo the first 500 characters of any non-empty
            stdout / stderr.

    Returns:
        A ``(stdout, stderr)`` tuple of whitespace-stripped strings.
    """
    _stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
    # Remote tool output is not guaranteed to be valid UTF-8; substitute
    # undecodable bytes instead of aborting the whole run on one stray byte.
    out = stdout.read().decode(errors="replace").strip()
    err = stderr.read().decode(errors="replace").strip()
    if log_output:
        if out:
            print(f" [OUT] {out[:500]}")
        if err:
            print(f" [ERR] {err[:500]}")
    return out, err
def _banner(title):
    """Print a phase heading framed by two rules of 70 '=' characters."""
    print("\n" + "=" * 70)
    print(title)
    print("=" * 70)


def _remote_file_exists(ssh, path):
    """Return True when *path* is a regular file on the remote host."""
    out, _ = ssh_exec(ssh, f"test -f {path} && echo EXISTS || echo MISSING", log_output=False)
    return "EXISTS" in out


def _upload_models(ssh):
    """Phase 1: copy the calibration array and the local ONNX models to the remote.

    Models whose local file does not exist are skipped. For every uploaded
    model the first existing external-data sidecar is uploaded as well.

    Returns:
        List of ``(display_name, remote_path)`` tuples for the uploaded models.
    """
    uploaded = []
    # The context manager closes the SCP channel even if a transfer raises.
    with scp.SCPClient(ssh.get_transport()) as scp_client:
        calib_local = "hgnetv2_b2_calibration.npy"
        calib_remote = f"{REMOTE_BASE}/models/hgnetv2_b2_calibration.npy"
        print(f" Uploading {calib_local}...")
        scp_client.put(calib_local, calib_remote)

        for name, local_path in LOCAL_MODELS.items():
            if not os.path.exists(local_path):
                print(f" SKIP {name}: {local_path} not found")
                continue
            remote_path = f"{REMOTE_BASE}/models/{os.path.basename(local_path)}"
            print(f" Uploading {name}: {local_path} -> {remote_path}")
            scp_client.put(local_path, remote_path)
            uploaded.append((name, remote_path))

            # Probe the known sidecar naming patterns exactly once per model;
            # only the first match is uploaded.
            candidates = (
                local_path + ".data",
                local_path.replace(".onnx", ".onnx.data"),
                local_path + "_data",
            )
            for candidate in candidates:
                if os.path.exists(candidate):
                    data_remote = f"{REMOTE_BASE}/models/{os.path.basename(candidate)}"
                    print(f" + external data: {candidate}")
                    scp_client.put(candidate, data_remote)
                    break

    print(f" Uploaded {len(uploaded)} models.")
    out, _ = ssh_exec(ssh, f"ls -lh {REMOTE_BASE}/models/", log_output=False)
    print(f" Remote files:\n{out}")
    return uploaded


def _engine_configs():
    """Return the engine build configurations, in build order.

    Each entry is a dict with the keys ``name``, ``onnx`` (remote model path),
    ``engine`` (remote output path) and ``flags`` (precision flags for the
    builder). QDQ models are already quantized; the flags only enable the
    matching precision in the builder.
    """
    # NOTE(review): LOCAL_MODELS also uploads an "int4_awq_lite_asym" model that
    # has no entry here, while the "*_asym" INT8 variants listed here are not in
    # LOCAL_MODELS — confirm whether that is intentional.
    groups = [
        (["fp32"], ""),
        (["fp16"], "--fp16"),
        (["int8_entropy", "int8_entropy_asym", "int8_max", "int8_max_asym"], "--int8 --fp16"),
        (["fp8_entropy", "fp8_max"], "--fp8 --fp16"),
        (["int4_rtn_dq", "int4_awq_clip", "int4_awq_lite", "int4_awq_full"], "--int4 --fp16"),
    ]
    configs = []
    for names, flags in groups:
        for name in names:
            configs.append({
                "name": name,
                "onnx": f"{REMOTE_BASE}/models/hgnetv2_b2_{name}.onnx",
                "engine": f"{REMOTE_BASE}/results/hgnetv2_b2_{name}.plan",
                "flags": flags,
            })
    return configs


def _build_engines(ssh, engine_configs):
    """Phase 2: build one engine per config on the remote.

    Returns:
        Dict mapping config name to a result dict whose ``status`` is
        ``"skipped"``, ``"success"`` or ``"failed"``.
    """
    build_results = {}
    for cfg in engine_configs:
        name = cfg["name"]
        onnx = cfg["onnx"]
        engine = cfg["engine"]
        flags = cfg["flags"]
        print(f"\n--- Building: {name} (flags: {flags or 'none'}) ---")

        if not _remote_file_exists(ssh, onnx):
            print(f" SKIP: ONNX file not found on remote: {onnx}")
            build_results[name] = {"status": "skipped", "reason": "onnx missing"}
            continue

        cmd = f"{TRTEXEC} --onnx={onnx} --saveEngine={engine} {flags} --tacticSources=+CUBLAS,+CUBLASLT 2>&1"
        print(f" CMD: {cmd}")
        t0 = time.time()
        out, _ = ssh_exec(ssh, cmd, timeout=600)
        elapsed = time.time() - t0

        # Success is judged by the presence of the engine file afterwards.
        if _remote_file_exists(ssh, engine):
            size_out, _ = ssh_exec(ssh, f"ls -lh {engine}", log_output=False)
            print(f" SUCCESS: {size_out} ({elapsed:.1f}s)")
            build_results[name] = {"status": "success", "engine": engine, "time": elapsed}
        else:
            print(f" FAILED ({elapsed:.1f}s)")
            build_results[name] = {"status": "failed", "time": elapsed, "output": out[-500:] if out else ""}
    return build_results


def _load_eval_subset(arrow_dir, transform, n_eval):
    """Read up to *n_eval* preprocessed images and labels from arrow shards.

    Args:
        arrow_dir: Directory holding the ``imagenet-1k_validation-validation-*.arrow`` shards.
        transform: Callable applied to each RGB PIL image; its result must
            provide ``.numpy()``.
        n_eval: Maximum number of samples to load.

    Returns:
        ``(images, labels)`` lists of equal length.
    """
    images = []
    labels = []
    shard_files = sorted(
        f for f in os.listdir(arrow_dir)
        if f.startswith("imagenet-1k_validation-validation-") and f.endswith(".arrow")
    )
    for fname in shard_files:
        if len(images) >= n_eval:
            break
        path = os.path.join(arrow_dir, fname)
        try:
            with open(path, "rb") as f:
                table = ipc.RecordBatchStreamReader(f).read_all()
        except Exception as exc:
            # Best effort: an unreadable shard is skipped, but no longer silently.
            print(f" WARNING: skipping unreadable shard {fname}: {exc}")
            continue

        for image_cell, label_cell in zip(table.column("image"), table.column("label")):
            if len(images) >= n_eval:
                break
            img_bytes = image_cell.as_py()
            if isinstance(img_bytes, dict):
                img_bytes = img_bytes.get("bytes", img_bytes.get("path", b""))
            if not isinstance(img_bytes, bytes):
                continue
            img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
            images.append(transform(img).numpy())
            labels.append(label_cell.as_py())
            if len(images) % 500 == 0:
                print(f" [{len(images)}/{n_eval}] images loaded")
    return images, labels


def _prepare_eval_data(ssh, n_eval=2000):
    """Phase 3: ensure remote dependencies, then build and upload the eval subset.

    A subset (default 2000 images) is used because evaluating the full
    validation set over SSH would take very long.

    Raises:
        RuntimeError: If no evaluation image could be loaded locally.
    """
    pip_check, _ = ssh_exec(
        ssh, "pip3 list 2>/dev/null | grep -iE 'onnx|numpy|pillow|sklearn'", log_output=False
    )
    # NOTE(review): packages are installed only when none of the patterns
    # matches, so a remote with e.g. only numpy present skips the install.
    if not pip_check.strip():
        print(" Installing Python packages on remote...")
        ssh_exec(ssh, "pip3 install onnxruntime numpy pillow scikit-learn 2>&1 | tail -5", timeout=300)

    print("\n Preparing evaluation data locally and uploading...")
    # The model is created only to derive its preprocessing transform.
    model = timm.create_model("hgnetv2_b2.ssld_stage2_ft_in1k", pretrained=True)
    data_config = resolve_model_data_config(model)
    transform = create_transform(**data_config, is_training=False)
    del model

    arrow_dir = os.path.expanduser(
        "~/.cache/huggingface/datasets/Tsomaros___imagenet-1k_validation/"
        "default/0.0.0/55405c49dece42420e68ddd5f80174f19b29ebaf/"
    )
    print(f" Loading {n_eval} evaluation images from arrow shards...")
    all_images, all_labels = _load_eval_subset(arrow_dir, transform, n_eval)
    if not all_images:
        # np.stack([]) would fail with an unhelpful message; say what is wrong.
        raise RuntimeError(f"No evaluation images could be loaded from {arrow_dir}")

    images_np = np.stack(all_images)  # (N, C, H, W)
    labels_np = np.array(all_labels)  # (N,)
    print(f" Evaluation data: images={images_np.shape}, labels={labels_np.shape}")

    eval_images_path = "/tmp/hgnetv2_eval_images.npy"
    eval_labels_path = "/tmp/hgnetv2_eval_labels.npy"
    np.save(eval_images_path, images_np)
    np.save(eval_labels_path, labels_np)
    with scp.SCPClient(ssh.get_transport()) as scp_client:
        scp_client.put(eval_images_path, f"{REMOTE_BASE}/models/eval_images.npy")
        scp_client.put(eval_labels_path, f"{REMOTE_BASE}/models/eval_labels.npy")
    print(" Uploaded evaluation data.")


def _remote_eval_script():
    """Return the source of the evaluation script that is executed on the remote."""
    return r'''
import sys
import os
import time
import numpy as np

# Try ONNX Runtime with TensorRT EP first, fall back to CPU
try:
    import onnxruntime as ort
    print(f"ONNX Runtime version: {ort.__version__}")
    print(f"Available providers: {ort.get_available_providers()}")
except ImportError:
    print("ERROR: onnxruntime not installed")
    sys.exit(1)
from sklearn.metrics import average_precision_score, precision_recall_fscore_support
import torch

BASE = "/data/rilin/hgnetv2"

# Load eval data
print("Loading evaluation data...")
images = np.load(f"{BASE}/models/eval_images.npy")
labels = np.load(f"{BASE}/models/eval_labels.npy")
print(f" Images: {images.shape}, Labels: {labels.shape}")
N = len(labels)
num_classes = 1000

# Compute metrics
def compute_metrics(logits, labels, num_classes):
    probs = torch.softmax(torch.from_numpy(logits), dim=1).numpy()
    preds = probs.argmax(axis=1)
    N = len(labels)
    top1 = (preds == labels).sum() / N
    topk_vals = np.argsort(probs, axis=1)[:, ::-1]
    top5 = sum(labels[i] in topk_vals[i, :5] for i in range(N)) / N
    one_hot = np.zeros((N, num_classes), dtype=np.int32)
    one_hot[np.arange(N), labels] = 1
    aps = []
    for c in range(num_classes):
        if one_hot[:, c].sum() == 0: continue
        try: ap = average_precision_score(one_hot[:, c], probs[:, c])
        except Exception: ap = 0.0
        aps.append(ap)
    mAP = np.mean(aps) if aps else 0.0
    prec_mac, rec_mac, f1_mac, _ = precision_recall_fscore_support(labels, preds, average="macro", zero_division=0)
    prec_wt, rec_wt, f1_wt, _ = precision_recall_fscore_support(labels, preds, average="weighted", zero_division=0)
    return {"top1": float(top1), "top5": float(top5), "mAP": float(mAP),
            "f1_macro": float(f1_mac), "f1_weighted": float(f1_wt)}

# Models to evaluate
models = {
    "fp32": f"{BASE}/models/hgnetv2_b2_fp32.onnx",
    "fp16": f"{BASE}/models/hgnetv2_b2_fp16.onnx",
    "int8_entropy": f"{BASE}/models/hgnetv2_b2_int8_entropy.onnx",
    "int8_entropy_asym": f"{BASE}/models/hgnetv2_b2_int8_entropy_asym.onnx",
    "int8_max": f"{BASE}/models/hgnetv2_b2_int8_max.onnx",
    "int8_max_asym": f"{BASE}/models/hgnetv2_b2_int8_max_asym.onnx",
    "fp8_entropy": f"{BASE}/models/hgnetv2_b2_fp8_entropy.onnx",
    "fp8_max": f"{BASE}/models/hgnetv2_b2_fp8_max.onnx",
    "int4_rtn_dq": f"{BASE}/models/hgnetv2_b2_int4_rtn_dq.onnx",
    "int4_awq_clip": f"{BASE}/models/hgnetv2_b2_int4_awq_clip.onnx",
    "int4_awq_lite": f"{BASE}/models/hgnetv2_b2_int4_awq_lite.onnx",
    "int4_awq_full": f"{BASE}/models/hgnetv2_b2_int4_awq_full.onnx",
}

# Evaluate each model
results = {}
for name, onnx_path in models.items():
    if not os.path.exists(onnx_path):
        print(f"\nSKIP {name}: file not found")
        results[name] = {"error": "file not found"}
        continue
    print(f"\n{'='*50}")
    print(f"Evaluating: {name}")
    print(f" ONNX: {onnx_path}")
    # Try TRT EP first, then CPU
    sess = None
    for providers in [
        [("TensorrtExecutionProvider", {"trt_max_workspace_size": 2147483648}), "CPUExecutionProvider"],
        ["CPUExecutionProvider"],
    ]:
        try:
            opts = ort.SessionOptions()
            opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
            sess = ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)
            print(f" Session created with providers: {sess.get_providers()}")
            break
        except Exception as e:
            print(f" Provider {providers} failed: {e}")
            sess = None
            continue
    if sess is None:
        print(f" FAILED: could not create session")
        results[name] = {"error": "session creation failed"}
        continue
    input_name = sess.get_inputs()[0].name
    all_logits = []
    batch_size = 1 # batch=1 for safety with static shapes
    t0 = time.time()
    for i in range(N):
        single = images[i:i+1]
        out = sess.run(None, {input_name: single})
        all_logits.append(out[0])
        if (i+1) % 500 == 0:
            elapsed = time.time() - t0
            speed = (i+1) / elapsed
            print(f" [{i+1}/{N}] {speed:.1f} img/s")
    all_logits = np.concatenate(all_logits, axis=0)
    elapsed = time.time() - t0
    metrics = compute_metrics(all_logits, labels, num_classes)
    metrics["elapsed"] = elapsed
    metrics["speed"] = N / elapsed
    results[name] = metrics
    print(f" Top-1: {metrics['top1']*100:.3f}%")
    print(f" Top-5: {metrics['top5']*100:.3f}%")
    print(f" mAP: {metrics['mAP']:.4f}")
    print(f" Time: {elapsed:.1f}s ({metrics['speed']:.1f} img/s)")

# Print comparison table
print(f"\n\n{'='*80}")
print("Evaluation Comparison Table (TensorRT Platform)")
print(f"{'='*80}")
print(f" {'Model':<25s} {'Top-1%':>8s} {'Top-5%':>8s} {'mAP':>8s} {'F1_mac':>8s} {'Speed':>10s}")
print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*10}")
for name, m in results.items():
    if "error" in m:
        print(f" {name:<25s} FAILED: {m['error']}")
    else:
        print(f" {name:<25s} {m['top1']*100:>8.3f} {m['top5']*100:>8.3f} {m['mAP']:>8.4f} {m['f1_macro']:>8.4f} {m['speed']:>9.1f}/s")
print(f"\n Reference (timm): Top-1: 82.346% | Top-5: 96.394%")
print(f"{'='*80}")

# Save results as JSON
import json
with open(f"{BASE}/results/eval_results.json", "w") as f:
    json.dump(results, f, indent=2)
print(f"\nResults saved to {BASE}/results/eval_results.json")
'''


def _run_remote_eval(ssh):
    """Phase 4: upload the evaluation script, run it remotely and stream its output."""
    script_remote = f"{REMOTE_BASE}/eval_trt.py"
    with scp.SCPClient(ssh.get_transport()) as scp_client:
        # put() only accepts filesystem paths; in-memory file objects must be
        # sent with putfo().
        scp_client.putfo(io.BytesIO(_remote_eval_script().encode()), script_remote)

    print("\n Running remote evaluation script...")
    print(" (This will take a while - evaluating 12 models x 2000 images)")
    # NOTE(review): because the command is piped through `tee`, the reported
    # exit status is presumably tee's rather than python's — confirm.
    cmd = f"cd {REMOTE_BASE} && python3 {script_remote} 2>&1 | tee {REMOTE_BASE}/evaluate.log"
    t0 = time.time()
    _stdin, stdout, _stderr = ssh.exec_command(cmd, timeout=3600)
    # stderr is merged into stdout by `2>&1`, so only stdout is streamed.
    while True:
        line = stdout.readline()
        if not line:
            break
        line = line.strip()
        if line:
            print(f" {line}")
    exit_code = stdout.channel.recv_exit_status()
    elapsed = time.time() - t0
    print(f"\n Remote evaluation completed in {elapsed:.1f}s (exit code: {exit_code})")


def _benchmark_engines(ssh, engine_configs):
    """Phase 5: run the inference benchmark for every engine present on the remote."""
    for cfg in engine_configs:
        name = cfg["name"]
        engine = cfg["engine"]
        if not _remote_file_exists(ssh, engine):
            print(f" SKIP {name}: engine not built")
            continue
        print(f"\n--- Benchmarking: {name} ---")
        cmd = f"{TRTEXEC} --loadEngine={engine} --iterations=100 --warmUp=10 2>&1 | tail -30"
        # ssh_exec echoes the (truncated) benchmark output itself.
        ssh_exec(ssh, cmd, timeout=120)


def _print_summary(ssh, build_results):
    """Fetch the remote results and log, then print the accuracy and build tables.

    Fetching is best effort: a failure is reported and the summary continues.
    """
    try:
        local_results_path = "/tmp/hgnetv2_remote_eval_results.json"
        with scp.SCPClient(ssh.get_transport()) as scp_client:
            scp_client.get(f"{REMOTE_BASE}/results/eval_results.json", local_results_path)
        with open(local_results_path) as f:
            remote_results = json.load(f)
        print("\nRemote Evaluation Results (ORT + TensorRT EP):")
        print(f" {'Model':<25s} {'Top-1%':>8s} {'Top-5%':>8s} {'mAP':>8s}")
        print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8}")
        for name, m in remote_results.items():
            if "error" in m:
                print(f" {name:<25s} FAILED: {m['error']}")
            else:
                print(f" {name:<25s} {m['top1']*100:>8.3f} {m['top5']*100:>8.3f} {m['mAP']:>8.4f}")
    except Exception as e:
        print(f" Could not fetch results: {e}")

    try:
        with scp.SCPClient(ssh.get_transport()) as scp_client:
            scp_client.get(f"{REMOTE_BASE}/evaluate.log", "/tmp/hgnetv2_evaluate.log")
        print(f"\nRemote log saved to /tmp/hgnetv2_evaluate.log")
    except Exception as e:
        print(f" Could not fetch log: {e}")

    print("\nTensorRT Engine Build Results:")
    print(f" {'Model':<25s} {'Status':>10s} {'Time':>8s}")
    print(f" {'-'*25} {'-'*10} {'-'*8}")
    for name, r in build_results.items():
        status = r["status"]
        t = f"{r.get('time', 0):.1f}s" if "time" in r else "-"
        print(f" {name:<25s} {status:>10s} {t:>8s}")


def main():
    """Run the whole remote evaluation pipeline.

    Connects to the remote host, uploads the models, builds the engines,
    uploads an evaluation subset, runs the remote accuracy evaluation,
    benchmarks the built engines and prints a summary. The SSH connection is
    closed even when a phase raises.
    """
    print(f"Connecting to {REMOTE_HOST}...")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(REMOTE_HOST, username=REMOTE_USER, password=REMOTE_PASS, timeout=30)
        print("Connected!")
        ssh_exec(ssh, f"mkdir -p {REMOTE_BASE}/models {REMOTE_BASE}/results", log_output=False)

        _banner("PHASE 1: Uploading models to remote")
        _upload_models(ssh)

        _banner("PHASE 2: Building TensorRT engines")
        engine_configs = _engine_configs()
        build_results = _build_engines(ssh, engine_configs)

        _banner("PHASE 3: Installing dependencies and evaluating on ImageNet")
        _prepare_eval_data(ssh)

        _banner("PHASE 4: Running evaluation on remote platform")
        _run_remote_eval(ssh)

        _banner("PHASE 5: TensorRT engine build + trtexec benchmarking")
        _benchmark_engines(ssh, engine_configs)

        _banner("FINAL SUMMARY")
        _print_summary(ssh, build_results)
    finally:
        ssh.close()
    print("\nDone!")
# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()