#!/usr/bin/env python3
"""
Remote TensorRT evaluation script for HGNetV2-B2 quantized ONNX models.

Steps:
  1. Upload ONNX models + calibration data to remote
  2. Build TensorRT engines with various precision flags
  3. Run inference on ImageNet validation set via ONNX Runtime (on remote)
  4. Collect and compare accuracy metrics
  5. Log everything to /data/rilin/hgnetv2/evaluate.log
"""

import paramiko
import contextlib
import os
import shlex
import time
import sys
import json
import scp
import io
import numpy as np
import torch
import timm
from timm.data import resolve_model_data_config, create_transform
from PIL import Image
import pyarrow.ipc as ipc
from sklearn.metrics import average_precision_score, precision_recall_fscore_support

# ===================== CONFIG =====================
REMOTE_HOST = "192.168.8.108"
REMOTE_USER = "nvidia"
REMOTE_PASS = "nvidia"
REMOTE_BASE = "/data/rilin/hgnetv2"
TRTEXEC = "/data/rilin/tlr/aicompiler"

LOCAL_MODELS = {
    "FP32 (baseline)": "hgnetv2_b2_fp32.onnx",
    "FP16": "fp16/hgnetv2_b2_fp16.onnx",
    "INT8 entropy": "int8/hgnetv2_b2_int8_entropy.onnx",
    "INT8 max": "int8/hgnetv2_b2_int8_max.onnx",
    "FP8 entropy": "fp8/hgnetv2_b2_fp8_entropy.onnx",
    "FP8 max": "fp8/hgnetv2_b2_fp8_max.onnx",
    "INT4 awq_clip": "int4/hgnetv2_b2_int4_awq_clip.onnx",
    "INT4 awq_lite (asym)": "int4/hgnetv2_b2_int4_awq_lite_asym.onnx",
    "INT4 awq_lite (sym)": "int4/hgnetv2_b2_int4_awq_lite.onnx",
    "INT4 awq_full": "int4/hgnetv2_b2_int4_awq_full.onnx",
    "INT4 rtn_dq": "int4/hgnetv2_b2_int4_rtn_dq.onnx"
}
# ===================== END CONFIG =====================

# Number of validation images used for the remote evaluation subset.
N_EVAL = 2000

# Evaluation program that is uploaded to, and executed on, the remote host.
EVAL_SCRIPT = r'''
import sys
import os
import time
import numpy as np

# Try ONNX Runtime with TensorRT EP first, fall back to CPU
try:
    import onnxruntime as ort
    print(f"ONNX Runtime version: {ort.__version__}")
    print(f"Available providers: {ort.get_available_providers()}")
except ImportError:
    print("ERROR: onnxruntime not installed")
    sys.exit(1)

from sklearn.metrics import average_precision_score, precision_recall_fscore_support

# torch is only used for softmax; fall back to NumPy when it is not installed
try:
    import torch
except ImportError:
    torch = None

BASE = "/data/rilin/hgnetv2"

# Load eval data
print("Loading evaluation data...")
images = np.load(f"{BASE}/models/eval_images.npy")
labels = np.load(f"{BASE}/models/eval_labels.npy")
print(f" Images: {images.shape}, Labels: {labels.shape}")
N = len(labels)
num_classes = 1000


def softmax(logits):
    # Row-wise softmax over axis 1
    if torch is not None:
        return torch.softmax(torch.from_numpy(logits), dim=1).numpy()
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=1, keepdims=True)


# Compute metrics
def compute_metrics(logits, labels, num_classes):
    # Work in float32 regardless of the dtype the model emitted
    logits = np.ascontiguousarray(logits, dtype=np.float32)
    probs = softmax(logits)
    preds = probs.argmax(axis=1)
    N = len(labels)
    top1 = (preds == labels).sum() / N
    topk_vals = np.argsort(probs, axis=1)[:, ::-1]
    top5 = sum(labels[i] in topk_vals[i, :5] for i in range(N)) / N
    one_hot = np.zeros((N, num_classes), dtype=np.int32)
    one_hot[np.arange(N), labels] = 1
    aps = []
    for c in range(num_classes):
        if one_hot[:, c].sum() == 0:
            continue
        try:
            ap = average_precision_score(one_hot[:, c], probs[:, c])
        except Exception:
            ap = 0.0
        aps.append(ap)
    mAP = np.mean(aps) if aps else 0.0
    prec_mac, rec_mac, f1_mac, _ = precision_recall_fscore_support(labels, preds, average="macro", zero_division=0)
    prec_wt, rec_wt, f1_wt, _ = precision_recall_fscore_support(labels, preds, average="weighted", zero_division=0)
    return {"top1": float(top1), "top5": float(top5), "mAP": float(mAP), "f1_macro": float(f1_mac), "f1_weighted": float(f1_wt)}


# Models to evaluate
models = {
    "fp32": f"{BASE}/models/hgnetv2_b2_fp32.onnx",
    "fp16": f"{BASE}/models/hgnetv2_b2_fp16.onnx",
    "int8_entropy": f"{BASE}/models/hgnetv2_b2_int8_entropy.onnx",
    "int8_entropy_asym": f"{BASE}/models/hgnetv2_b2_int8_entropy_asym.onnx",
    "int8_max": f"{BASE}/models/hgnetv2_b2_int8_max.onnx",
    "int8_max_asym": f"{BASE}/models/hgnetv2_b2_int8_max_asym.onnx",
    "fp8_entropy": f"{BASE}/models/hgnetv2_b2_fp8_entropy.onnx",
    "fp8_max": f"{BASE}/models/hgnetv2_b2_fp8_max.onnx",
    "int4_rtn_dq": f"{BASE}/models/hgnetv2_b2_int4_rtn_dq.onnx",
    "int4_awq_clip": f"{BASE}/models/hgnetv2_b2_int4_awq_clip.onnx",
    "int4_awq_lite": f"{BASE}/models/hgnetv2_b2_int4_awq_lite.onnx",
    "int4_awq_full": f"{BASE}/models/hgnetv2_b2_int4_awq_full.onnx",
}

# Evaluate each model
results = {}
for name, onnx_path in models.items():
    if not os.path.exists(onnx_path):
        print(f"\nSKIP {name}: file not found")
        results[name] = {"error": "file not found"}
        continue

    print(f"\n{'='*50}")
    print(f"Evaluating: {name}")
    print(f" ONNX: {onnx_path}")

    # Try TRT EP first, then CPU
    sess = None
    for providers in [
        [("TensorrtExecutionProvider", {"trt_max_workspace_size": 2147483648}), "CPUExecutionProvider"],
        ["CPUExecutionProvider"],
    ]:
        try:
            opts = ort.SessionOptions()
            opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
            sess = ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)
            print(f" Session created with providers: {sess.get_providers()}")
            break
        except Exception as e:
            print(f" Provider {providers} failed: {e}")
            sess = None
            continue

    if sess is None:
        print(f" FAILED: could not create session")
        results[name] = {"error": "session creation failed"}
        continue

    input_name = sess.get_inputs()[0].name
    all_logits = []
    batch_size = 1  # batch=1 for safety with static shapes
    t0 = time.time()
    for i in range(N):
        single = images[i:i+1]
        out = sess.run(None, {input_name: single})
        all_logits.append(out[0])
        if (i+1) % 500 == 0:
            elapsed = time.time() - t0
            speed = (i+1) / elapsed
            print(f" [{i+1}/{N}] {speed:.1f} img/s")

    all_logits = np.concatenate(all_logits, axis=0)
    elapsed = time.time() - t0
    metrics = compute_metrics(all_logits, labels, num_classes)
    metrics["elapsed"] = elapsed
    metrics["speed"] = N / elapsed
    results[name] = metrics
    print(f" Top-1: {metrics['top1']*100:.3f}%")
    print(f" Top-5: {metrics['top5']*100:.3f}%")
    print(f" mAP: {metrics['mAP']:.4f}")
    print(f" Time: {elapsed:.1f}s ({metrics['speed']:.1f} img/s)")

# Print comparison table
print(f"\n\n{'='*80}")
print("Evaluation Comparison Table (TensorRT Platform)")
print(f"{'='*80}")
print(f" {'Model':<25s} {'Top-1%':>8s} {'Top-5%':>8s} {'mAP':>8s} {'F1_mac':>8s} {'Speed':>10s}")
print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*10}")
for name, m in results.items():
    if "error" in m:
        print(f" {name:<25s} FAILED: {m['error']}")
    else:
        print(f" {name:<25s} {m['top1']*100:>8.3f} {m['top5']*100:>8.3f} {m['mAP']:>8.4f} {m['f1_macro']:>8.4f} {m['speed']:>9.1f}/s")
print(f"\n Reference (timm): Top-1: 82.346% | Top-5: 96.394%")
print(f"{'='*80}")

# Save results as JSON
import json
with open(f"{BASE}/results/eval_results.json", "w") as f:
    json.dump(results, f, indent=2)
print(f"\nResults saved to {BASE}/results/eval_results.json")
'''


def ssh_exec(ssh, cmd, timeout=600, log_output=True):
    """Execute command on remote and return stdout, stderr.

    Args:
        ssh: Connected ``paramiko.SSHClient``.
        cmd: Shell command line to run on the remote host.
        timeout: Timeout in seconds handed to ``exec_command``.
        log_output: When true, print the first 500 characters of each
            non-empty stream.

    Returns:
        ``(stdout, stderr)`` as stripped strings.
    """
    stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
    # Tool output is not guaranteed to be valid UTF-8; never fail on decoding.
    out = stdout.read().decode(errors="replace").strip()
    err = stderr.read().decode(errors="replace").strip()
    if log_output and out:
        print(f" [OUT] {out[:500]}")
    if log_output and err:
        print(f" [ERR] {err[:500]}")
    return out, err


def _banner(title):
    """Print a phase header framed by two 70-character rules."""
    print("\n" + "=" * 70)
    print(title)
    print("=" * 70)


def _remote_file_exists(ssh, path):
    """Return True when ``test -f`` succeeds for *path* on the remote host."""
    out, _ = ssh_exec(ssh, f"test -f {path} && echo EXISTS || echo MISSING", log_output=False)
    return "EXISTS" in out


def _external_data_candidates(local_path):
    """Return the distinct external-data file names probed for an ONNX file."""
    candidates = [
        local_path + ".data",
        local_path.replace(".onnx", ".onnx.data"),
        local_path + "_data",
    ]
    # dict.fromkeys drops duplicates while keeping the probing order.
    return list(dict.fromkeys(candidates))


def _upload_models(ssh):
    """Phase 1: upload calibration data, ONNX models and external data files.

    Returns:
        List of ``(name, remote_path)`` tuples for the models that were uploaded.
    """
    _banner("PHASE 1: Uploading models to remote")
    uploaded = []
    with contextlib.closing(scp.SCPClient(ssh.get_transport())) as scp_client:
        # Upload calibration data
        calib_local = "hgnetv2_b2_calibration.npy"
        calib_remote = f"{REMOTE_BASE}/models/hgnetv2_b2_calibration.npy"
        if os.path.exists(calib_local):
            print(f" Uploading {calib_local}...")
            scp_client.put(calib_local, calib_remote)
        else:
            print(f" SKIP calibration data: {calib_local} not found")

        # Upload ONNX models + external data
        for name, local_path in LOCAL_MODELS.items():
            if not os.path.exists(local_path):
                print(f" SKIP {name}: {local_path} not found")
                continue
            remote_path = f"{REMOTE_BASE}/models/{os.path.basename(local_path)}"
            print(f" Uploading {name}: {local_path} -> {remote_path}")
            scp_client.put(local_path, remote_path)
            uploaded.append((name, remote_path))

            # Also upload the external data file; only the first match is sent.
            for candidate in _external_data_candidates(local_path):
                if os.path.exists(candidate):
                    data_remote = f"{REMOTE_BASE}/models/{os.path.basename(candidate)}"
                    print(f" + external data: {candidate}")
                    scp_client.put(candidate, data_remote)
                    break

    print(f" Uploaded {len(uploaded)} models.")

    # Verify uploads
    out, _ = ssh_exec(ssh, f"ls -lh {REMOTE_BASE}/models/", log_output=False)
    print(f" Remote files:\n{out}")
    return uploaded


def _engine_config(name, flags):
    """Build one engine-config dict for the model variant *name*."""
    return {
        "name": name,
        "onnx": f"{REMOTE_BASE}/models/hgnetv2_b2_{name}.onnx",
        "engine": f"{REMOTE_BASE}/results/hgnetv2_b2_{name}.plan",
        "flags": flags,
    }


def _engine_configs():
    """Return the ordered list of engine configs to build and benchmark."""
    # FP32 baseline: just build engine
    # FP16: --fp16
    # INT8 QDQ models: --int8 (they already have QDQ nodes)
    # FP8 QDQ models: --fp8
    # INT4 QDQ models: --int4
    variants = [
        ("", ["fp32"]),
        ("--fp16", ["fp16"]),
        ("--int8 --fp16", ["int8_entropy", "int8_entropy_asym", "int8_max", "int8_max_asym"]),
        ("--fp8 --fp16", ["fp8_entropy", "fp8_max"]),
        ("--int4 --fp16", ["int4_rtn_dq", "int4_awq_clip", "int4_awq_lite", "int4_awq_full"]),
    ]
    return [_engine_config(name, flags) for flags, names in variants for name in names]


def _build_engines(ssh, engine_configs):
    """Phase 2: build one TensorRT engine per config.

    Returns:
        Dict mapping config name to a result dict whose ``"status"`` is
        ``"success"``, ``"failed"`` or ``"skipped"``.
    """
    _banner("PHASE 2: Building TensorRT engines")
    build_results = {}
    for cfg in engine_configs:
        name = cfg["name"]
        onnx = cfg["onnx"]
        engine = cfg["engine"]
        flags = cfg["flags"]
        print(f"\n--- Building: {name} (flags: {flags or 'none'}) ---")

        # Check if ONNX file exists on remote
        if not _remote_file_exists(ssh, onnx):
            print(f" SKIP: ONNX file not found on remote: {onnx}")
            build_results[name] = {"status": "skipped", "reason": "onnx missing"}
            continue

        # Remove any engine left by an earlier run so the existence check
        # below reflects this build only.
        ssh_exec(ssh, f"rm -f {engine}", log_output=False)

        cmd = f"{TRTEXEC} --onnx={onnx} --saveEngine={engine} {flags} --tacticSources=+CUBLAS,+CUBLASLT 2>&1"
        print(f" CMD: {cmd}")
        t0 = time.time()
        out, err = ssh_exec(ssh, cmd, timeout=600)
        elapsed = time.time() - t0

        # Check if engine was built
        if _remote_file_exists(ssh, engine):
            size_out, _ = ssh_exec(ssh, f"ls -lh {engine}", log_output=False)
            print(f" SUCCESS: {size_out} ({elapsed:.1f}s)")
            build_results[name] = {"status": "success", "engine": engine, "time": elapsed}
        else:
            print(f" FAILED ({elapsed:.1f}s)")
            build_results[name] = {"status": "failed", "time": elapsed, "output": out[-500:] if out else ""}
    return build_results


def _ensure_remote_packages(ssh):
    """Install the remote Python packages unless all of them already import."""
    check_cmd = (
        'python3 -c "import onnxruntime, numpy, PIL, sklearn" >/dev/null 2>&1 '
        "&& echo DEPS_OK || echo DEPS_MISSING"
    )
    dep_check, _ = ssh_exec(ssh, check_cmd, log_output=False)
    if "DEPS_OK" not in dep_check:
        print(" Installing Python packages on remote...")
        ssh_exec(ssh, "pip3 install onnxruntime numpy pillow scikit-learn 2>&1 | tail -5", timeout=300)


def _load_eval_data(n_eval):
    """Load up to *n_eval* preprocessed validation images from arrow shards.

    Returns:
        ``(images, labels)`` NumPy arrays: the stacked transformed images and
        their labels.

    Raises:
        RuntimeError: If no image could be loaded.
    """
    # The model is created only to resolve its preprocessing transform.
    model = timm.create_model("hgnetv2_b2.ssld_stage2_ft_in1k", pretrained=True)
    data_config = resolve_model_data_config(model)
    transform = create_transform(**data_config, is_training=False)
    del model

    # Load from arrow shards
    arrow_dir = os.path.expanduser(
        "~/.cache/huggingface/datasets/Tsomaros___imagenet-1k_validation/"
        "default/0.0.0/55405c49dece42420e68ddd5f80174f19b29ebaf/"
    )

    print(f" Loading {n_eval} evaluation images from arrow shards...")
    all_images = []
    all_labels = []
    shard_files = sorted(
        f for f in os.listdir(arrow_dir)
        if f.startswith("imagenet-1k_validation-validation-") and f.endswith(".arrow")
    )

    count = 0
    for fname in shard_files:
        if count >= n_eval:
            break
        path = os.path.join(arrow_dir, fname)
        try:
            with open(path, "rb") as f:
                reader = ipc.RecordBatchStreamReader(f)
                table = reader.read_all()
        except Exception as e:
            # Best effort: a bad shard is reported and skipped.
            print(f" WARNING: skipping unreadable shard {fname}: {e}")
            continue

        image_col = table.column("image")
        label_col = table.column("label")
        for i in range(len(table)):
            if count >= n_eval:
                break
            img_bytes = image_col[i].as_py()
            if isinstance(img_bytes, dict):
                img_bytes = img_bytes.get("bytes", img_bytes.get("path", b""))
            if not isinstance(img_bytes, bytes):
                continue
            img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
            label = label_col[i].as_py()
            tensor = transform(img)
            all_images.append(tensor.numpy())
            all_labels.append(label)
            count += 1
            if count % 500 == 0:
                print(f" [{count}/{n_eval}] images loaded")

    if not all_images:
        raise RuntimeError(f"No evaluation images could be loaded from {arrow_dir}")

    images_np = np.stack(all_images)  # (N, C, H, W)
    labels_np = np.array(all_labels)  # (N,)
    print(f" Evaluation data: images={images_np.shape}, labels={labels_np.shape}")
    return images_np, labels_np


def _prepare_remote_eval(ssh):
    """Phase 3: ensure remote dependencies and upload the evaluation subset."""
    _banner("PHASE 3: Installing dependencies and evaluating on ImageNet")
    _ensure_remote_packages(ssh)

    # Since the remote may not have ImageNet, we upload a pre-built evaluation batch
    print("\n Preparing evaluation data locally and uploading...")
    images_np, labels_np = _load_eval_data(N_EVAL)

    # Save and upload
    eval_images_path = "/tmp/hgnetv2_eval_images.npy"
    eval_labels_path = "/tmp/hgnetv2_eval_labels.npy"
    np.save(eval_images_path, images_np)
    np.save(eval_labels_path, labels_np)

    with contextlib.closing(scp.SCPClient(ssh.get_transport())) as scp_client:
        scp_client.put(eval_images_path, f"{REMOTE_BASE}/models/eval_images.npy")
        scp_client.put(eval_labels_path, f"{REMOTE_BASE}/models/eval_labels.npy")
    print(" Uploaded evaluation data.")


def _run_remote_eval(ssh):
    """Phase 4: upload the evaluation script, run it and stream its output.

    Returns:
        Exit status of the remote command.
    """
    _banner("PHASE 4: Running evaluation on remote platform")

    # Upload eval script; putfo is the SCPClient call for file-like objects.
    script_remote = f"{REMOTE_BASE}/eval_trt.py"
    with contextlib.closing(scp.SCPClient(ssh.get_transport())) as scp_client:
        scp_client.putfo(io.BytesIO(EVAL_SCRIPT.encode()), script_remote)

    # Run evaluation on remote
    print("\n Running remote evaluation script...")
    print(" (This will take a while - evaluating 12 models x 2000 images)")

    # pipefail makes the pipeline report python's status rather than tee's;
    # -u keeps python's output unbuffered so it can be streamed.
    pipeline = (
        f"set -o pipefail; cd {REMOTE_BASE} && "
        f"python3 -u {script_remote} 2>&1 | tee {REMOTE_BASE}/evaluate.log"
    )
    cmd = f"bash -c {shlex.quote(pipeline)}"
    t0 = time.time()
    stdin, stdout, stderr = ssh.exec_command(cmd, timeout=3600)

    # Stream output
    while True:
        line = stdout.readline()
        if not line:
            break
        line = line.strip()
        if line:
            print(f" {line}")

    exit_code = stdout.channel.recv_exit_status()
    elapsed = time.time() - t0
    print(f"\n Remote evaluation completed in {elapsed:.1f}s (exit code: {exit_code})")
    return exit_code


def _benchmark_engines(ssh, engine_configs):
    """Phase 5: run a trtexec benchmark for every engine present on the remote."""
    _banner("PHASE 5: TensorRT engine build + trtexec benchmarking")
    for cfg in engine_configs:
        name = cfg["name"]
        engine = cfg["engine"]
        if not _remote_file_exists(ssh, engine):
            print(f" SKIP {name}: engine not built")
            continue

        print(f"\n--- Benchmarking: {name} ---")
        cmd = f"{TRTEXEC} --loadEngine={engine} --iterations=100 --warmUp=10 2>&1 | tail -30"
        ssh_exec(ssh, cmd, timeout=120)


def _print_summary(ssh, build_results):
    """Fetch the remote results and log, then print the summary tables."""
    _banner("FINAL SUMMARY")

    # Fetch results JSON from remote (best effort: failures are reported only)
    try:
        local_results_path = "/tmp/hgnetv2_remote_eval_results.json"
        with contextlib.closing(scp.SCPClient(ssh.get_transport())) as scp_client:
            scp_client.get(f"{REMOTE_BASE}/results/eval_results.json", local_results_path)

        with open(local_results_path) as f:
            remote_results = json.load(f)

        print("\nRemote Evaluation Results (ORT + TensorRT EP):")
        print(f" {'Model':<25s} {'Top-1%':>8s} {'Top-5%':>8s} {'mAP':>8s}")
        print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8}")
        for name, m in remote_results.items():
            if "error" in m:
                print(f" {name:<25s} FAILED: {m['error']}")
            else:
                print(f" {name:<25s} {m['top1']*100:>8.3f} {m['top5']*100:>8.3f} {m['mAP']:>8.4f}")
    except Exception as e:
        print(f" Could not fetch results: {e}")

    # Fetch the remote log
    try:
        with contextlib.closing(scp.SCPClient(ssh.get_transport())) as scp_client:
            scp_client.get(f"{REMOTE_BASE}/evaluate.log", "/tmp/hgnetv2_evaluate.log")
        print(f"\nRemote log saved to /tmp/hgnetv2_evaluate.log")
    except Exception as e:
        print(f" Could not fetch log: {e}")

    # Print build results
    print("\nTensorRT Engine Build Results:")
    print(f" {'Model':<25s} {'Status':>10s} {'Time':>8s}")
    print(f" {'-'*25} {'-'*10} {'-'*8}")
    for name, r in build_results.items():
        status = r["status"]
        t = f"{r.get('time', 0):.1f}s" if "time" in r else "-"
        print(f" {name:<25s} {status:>10s} {t:>8s}")


def main():
    """Run the upload, build, evaluate, benchmark and summary phases in order."""
    # Connect
    print(f"Connecting to {REMOTE_HOST}...")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(REMOTE_HOST, username=REMOTE_USER, password=REMOTE_PASS, timeout=30)
        print("Connected!")

        # Create remote dirs
        ssh_exec(ssh, f"mkdir -p {REMOTE_BASE}/models {REMOTE_BASE}/results", log_output=False)

        _upload_models(ssh)
        engine_configs = _engine_configs()
        build_results = _build_engines(ssh, engine_configs)
        _prepare_remote_eval(ssh)
        _run_remote_eval(ssh)
        _benchmark_engines(ssh, engine_configs)
        _print_summary(ssh, build_results)
    finally:
        # Close the connection even when a phase raises.
        ssh.close()
    print("\nDone!")


if __name__ == "__main__":
    main()