"""
BiRefNet Lite Optimization for Intel Xeon W-2145 (Skylake-SP)
v2 — fixes: manual ONNX export, ONNX→OpenVINO conversion, INT8 quantization

Target CPU: 8C/16T, AVX-512 (NO VNNI/BF16/AMX), 11MB L3
"""
|
|
# Standard-library imports only at this point: the threading environment
# configured below has to be in place BEFORE torch/numpy are imported, because
# OpenMP runtimes typically read these variables once, when they initialise.
import gc
import json
import os
import sys
import time
import warnings
from pathlib import Path

# --- Benchmark configuration -------------------------------------------------
MODEL_ID = "ZhengPeng7/BiRefNet_lite"
RESOLUTIONS = [(1024, 1024), (512, 512)]  # the two spatial dims of the input tensor
WARMUP_RUNS = 3        # untimed calls before measuring
BENCHMARK_RUNS = 10    # timed calls per configuration
NUM_THREADS = 8        # matches the 8 physical cores named in the module docstring
OUTPUT_DIR = Path("/app/optimized_models")
RESULTS_FILE = Path("/app/benchmark_results.json")

# --- OpenMP / Intel KMP threading environment ---------------------------------
os.environ["OMP_NUM_THREADS"] = str(NUM_THREADS)
os.environ["KMP_AFFINITY"] = "granularity=fine,compact,1,0"
os.environ["KMP_BLOCKTIME"] = "1"
os.environ["OMP_WAIT_POLICY"] = "ACTIVE"

# Heavy third-party imports come after the environment is set (hence E402).
import numpy as np  # noqa: E402
import torch  # noqa: E402
from PIL import Image  # noqa: E402
from torchvision import transforms  # noqa: E402

# Silence library warnings for the rest of the run. This is installed after the
# imports, as before, so import-time warnings are still shown.
warnings.filterwarnings("ignore")

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
def create_dummy_input(resolution, batch_size=1):
    """Return a standard-normal random tensor used as a stand-in model input.

    Args:
        resolution: Indexable whose first two items become the last two
            dimensions of the tensor.
        batch_size: Size of the leading dimension.

    Returns:
        A tensor of shape ``(batch_size, 3, resolution[0], resolution[1])``.
    """
    shape = (batch_size, 3, resolution[0], resolution[1])
    return torch.randn(*shape)
|
|
def benchmark_fn(fn, warmup=WARMUP_RUNS, runs=BENCHMARK_RUNS, label=""):
    """Time repeated calls of ``fn`` and return summary statistics.

    Args:
        fn: Zero-argument callable to measure.
        warmup: Number of untimed calls made before measuring.
        runs: Number of timed calls; must be at least 1.
        label: Name stored in the result and shown in the printed line.

    Returns:
        A dict with ``label``, ``mean_ms``, ``std_ms``, ``min_ms``, ``max_ms``,
        ``median_ms``, ``fps`` (all rounded to 2 decimals, plain Python floats
        so the dict is JSON-serialisable) and ``runs``.

    Raises:
        ValueError: If ``runs`` is less than 1 (there would be nothing to
            aggregate).
    """
    if runs < 1:
        raise ValueError(f"benchmark_fn requires runs >= 1, got {runs}")

    for _ in range(warmup):
        fn()

    times = []
    for _ in range(runs):
        # Collect garbage outside the timed region so a GC pause is less
        # likely to land inside a measurement.
        gc.collect()
        t0 = time.perf_counter()
        fn()
        times.append((time.perf_counter() - t0) * 1000)

    mean_ms = float(np.mean(times))
    result = {
        "label": label,
        "mean_ms": round(mean_ms, 2),
        "std_ms": round(float(np.std(times)), 2),
        "min_ms": round(float(np.min(times)), 2),
        "max_ms": round(float(np.max(times)), 2),
        "median_ms": round(float(np.median(times)), 2),
        # A zero mean would otherwise yield inf, which is not valid JSON.
        "fps": round(1000.0 / mean_ms, 2) if mean_ms > 0 else 0.0,
        "runs": runs,
    }
    print(f" [{label}] mean={result['mean_ms']:.1f}ms ± {result['std_ms']:.1f}ms | "
          f"min={result['min_ms']:.1f}ms | fps={result['fps']:.2f}")
    return result
|
|
# Accumulates one entry per (backend, resolution) benchmark; dumped to JSON at
# the end of the script.
all_results = {
    "model": MODEL_ID,
    "target_cpu": "Intel Xeon W-2145 (Skylake-SP)",
    "benchmarks": [],
}

print("=" * 70)
print("STEP 1: Loading BiRefNet Lite (PyTorch)")
print("=" * 70)

from transformers import AutoModelForImageSegmentation

# trust_remote_code=True executes model code shipped with the checkpoint.
# Switch to inference mode and make sure every parameter is FP32.
model_pt = (
    AutoModelForImageSegmentation
    .from_pretrained(MODEL_ID, trust_remote_code=True)
    .eval()
    .float()
)

# Single pass over the parameters for both the count and the byte size.
param_count = 0
param_bytes = 0
for p in model_pt.parameters():
    param_count += p.numel()
    param_bytes += p.numel() * p.element_size()
model_size_mb = param_bytes / 1024**2
print(f" Parameters: {param_count:,}")
print(f" Model size (FP32): {model_size_mb:.1f} MB")

torch.set_num_threads(NUM_THREADS)
|
|
| |
| |
| |
# STEP 2: time the unmodified PyTorch model. The summary table uses the
# "pytorch_fp32" rows as the reference for its Speedup column.
print(f"\n{'=' * 70}")
print("STEP 2: PyTorch FP32 Baseline")
print("=" * 70)

for res in RESOLUTIONS:
    res_tag = f"{res[0]}x{res[1]}"
    dummy = create_dummy_input(res)

    # The default argument binds this iteration's tensor to the closure.
    @torch.no_grad()
    def pt_infer(d=dummy):
        return torch.sigmoid(model_pt(d)[-1])

    result = benchmark_fn(pt_infer, label=f"PyTorch-FP32-{res_tag}")
    result.update(resolution=res_tag, backend="pytorch_fp32")
    all_results["benchmarks"].append(result)
|
|
| |
| |
| |
print(f"\n{'=' * 70}")
print("STEP 3: ONNX Export (static shape per resolution)")
print("=" * 70)

# Every exported .onnx file (FP32 and, later, INT8) is written here.
onnx_dir = OUTPUT_DIR.joinpath("onnx")
onnx_dir.mkdir(parents=True, exist_ok=True)
|
|
| |
class BiRefNetWrapper(torch.nn.Module):
    """Adapter that exposes only the last element of the wrapped model's output.

    The wrapped model is called as-is and its result is indexed with ``[-1]``,
    so export/conversion tools see a single output.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, x):
        """Run the wrapped model on ``x`` and return its final output item."""
        outputs = self.model(x)
        return outputs[-1]
|
|
wrapper = BiRefNetWrapper(model_pt).eval()

# Keyword arguments shared by every export call.
_onnx_export_kwargs = {
    "input_names": ["input_image"],
    "output_names": ["output"],
    "opset_version": 17,
    "do_constant_folding": True,
}

# Maps a resolution tuple to the path of its exported ONNX file; resolutions
# whose export failed are simply absent.
onnx_models = {}
for res in RESOLUTIONS:
    res_tag = f"{res[0]}x{res[1]}"
    onnx_path = onnx_dir / f"birefnet_lite_{res_tag}.onnx"
    print(f" Exporting ONNX for {res_tag}...")
    try:
        dummy = create_dummy_input(res)
        torch.onnx.export(wrapper, dummy, str(onnx_path), **_onnx_export_kwargs)
        size_mb = onnx_path.stat().st_size / 1024**2
        print(f" Exported: {onnx_path.name} ({size_mb:.1f} MB)")
        onnx_models[res] = str(onnx_path)
    except Exception as e:
        # Best effort: later steps skip resolutions without an ONNX file.
        print(f" ONNX export failed for {res_tag}: {e}")
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("STEP 3b: OpenVINO Direct Conversion from PyTorch (ov.convert_model)")
print("=" * 70)

import openvino as ov
core = ov.Core()

ov_dir = OUTPUT_DIR / "openvino_fp32"
ov_dir.mkdir(parents=True, exist_ok=True)

# Maps a resolution tuple to the path (str) of its FP32 IR .xml file;
# resolutions whose conversion failed are absent.
ov_models = {}

for res in RESOLUTIONS:
    res_tag = f"{res[0]}x{res[1]}"
    ir_path = str(ov_dir / f"birefnet_lite_{res_tag}.xml")
    print(f" Converting PyTorch → OpenVINO IR for {res_tag}...")
    try:
        if res in onnx_models:
            # Despite the banner, an already-exported ONNX file is preferred.
            print(" Using ONNX path...")
            ov_model = core.read_model(onnx_models[res])
        else:
            # Fallback: trace the PyTorch wrapper directly. The example input
            # is only needed on this path.
            print(" Using direct PyTorch → OV conversion...")
            ov_model = ov.convert_model(wrapper, example_input=create_dummy_input(res))

        # save_model compresses weights to FP16 by default; disable that so
        # this directory really holds the FP32 baseline that the FP32, FP16
        # and INT8 steps are compared against.
        ov.save_model(ov_model, ir_path, compress_to_fp16=False)
        bin_size = Path(ir_path).with_suffix(".bin").stat().st_size / 1024**2
        print(f" IR saved: {bin_size:.1f} MB")
        ov_models[res] = ir_path
    except Exception as e:
        print(f" Conversion failed for {res_tag}: {e}")
        import traceback; traceback.print_exc()
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("STEP 4: ONNX Runtime FP32 (Graph Optimized)")
print("=" * 70)

import onnxruntime as ort

for res in RESOLUTIONS:
    res_tag = f"{res[0]}x{res[1]}"
    if res not in onnx_models:
        print(f" Skipping {res_tag} — no ONNX model")
        continue

    try:
        # Single-stream, latency-oriented session: all graph optimisations on,
        # NUM_THREADS intra-op threads, operators executed sequentially.
        sess_opts = ort.SessionOptions()
        sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_opts.intra_op_num_threads = NUM_THREADS
        sess_opts.inter_op_num_threads = 1
        sess_opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL

        session = ort.InferenceSession(onnx_models[res], sess_opts, providers=["CPUExecutionProvider"])
        input_name = session.get_inputs()[0].name
        dummy_np = create_dummy_input(res).numpy()

        # Default arguments bind this iteration's session and input.
        def ort_infer(s=session, inp=input_name, d=dummy_np):
            return s.run(None, {inp: d})

        result = benchmark_fn(ort_infer, label=f"ONNX-RT-FP32-{res_tag}")
        result["resolution"] = res_tag
        result["backend"] = "onnxruntime_fp32"
        all_results["benchmarks"].append(result)
        del session
    except Exception as e:
        print(f" ONNX-RT FP32 {res_tag} failed: {e}")
        import traceback; traceback.print_exc()
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("STEP 5: ONNX Runtime INT8 Dynamic Quantization")
print("=" * 70)

from onnxruntime.quantization import quantize_dynamic, QuantType

# Maps a resolution tuple to the path of its dynamically quantized ONNX file.
ort_int8_models = {}
for res in RESOLUTIONS:
    if res not in onnx_models:
        continue

    res_tag = f"{res[0]}x{res[1]}"
    int8_path = onnx_dir / f"birefnet_lite_{res_tag}_int8.onnx"
    try:
        print(f" Quantizing {res_tag} to INT8 (dynamic)...")
        quantize_dynamic(
            model_input=onnx_models[res],
            model_output=str(int8_path),
            weight_type=QuantType.QInt8,
            per_channel=True,
            # The target CPU has no VNNI. ONNX Runtime documents that 7-bit
            # weights (reduce_range) avoid U8S8 saturation on such machines,
            # especially in per-channel mode.
            reduce_range=True,
            extra_options={"DefaultTensorType": 1},  # 1 == onnx TensorProto.FLOAT
        )
        size_mb = int8_path.stat().st_size / 1024**2
        print(f" INT8 model: {int8_path.name} ({size_mb:.1f} MB)")
        ort_int8_models[res] = str(int8_path)
    except Exception as e:
        print(f" INT8 quantization failed for {res_tag}: {e}")
        import traceback; traceback.print_exc()

# Benchmark whichever INT8 models were produced, using the same session
# settings as the FP32 ONNX Runtime benchmark.
# NOTE(review): if session creation fails with a NOT_IMPLEMENTED ConvInteger
# error, the CPU kernel may not accept signed weights; try QuantType.QUInt8
# above. Confirm against the installed onnxruntime version.
for res in RESOLUTIONS:
    if res not in ort_int8_models:
        continue
    res_tag = f"{res[0]}x{res[1]}"
    try:
        sess_opts = ort.SessionOptions()
        sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_opts.intra_op_num_threads = NUM_THREADS
        sess_opts.inter_op_num_threads = 1
        sess_opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL

        session = ort.InferenceSession(ort_int8_models[res], sess_opts, providers=["CPUExecutionProvider"])
        input_name = session.get_inputs()[0].name
        dummy_np = create_dummy_input(res).numpy()

        def ort_int8_infer(s=session, inp=input_name, d=dummy_np):
            return s.run(None, {inp: d})

        result = benchmark_fn(ort_int8_infer, label=f"ONNX-RT-INT8-{res_tag}")
        result["resolution"] = res_tag
        result["backend"] = "onnxruntime_int8_dynamic"
        all_results["benchmarks"].append(result)
        del session
    except Exception as e:
        print(f" ONNX-RT INT8 {res_tag} failed: {e}")
        # Show the traceback like the other steps do, so failures are diagnosable.
        import traceback; traceback.print_exc()
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("STEP 6: OpenVINO FP32 Benchmark")
print("=" * 70)

for res in RESOLUTIONS:
    res_tag = f"{res[0]}x{res[1]}"
    if res not in ov_models:
        print(f" Skipping {res_tag} — no OpenVINO model")
        continue
    try:
        ov_model = core.read_model(ov_models[res])
        # Single-request latency configuration: one stream, NUM_THREADS threads.
        ov_config = {
            "PERFORMANCE_HINT": "LATENCY",
            "NUM_STREAMS": "1",
            "INFERENCE_NUM_THREADS": str(NUM_THREADS),
        }
        compiled = core.compile_model(ov_model, "CPU", ov_config)
        infer_req = compiled.create_infer_request()
        dummy_np = create_dummy_input(res).numpy()

        # Input is addressed by index 0; default arguments bind this
        # iteration's request and tensor.
        def ov_fp32_infer(req=infer_req, d=dummy_np):
            return req.infer({0: d})

        result = benchmark_fn(ov_fp32_infer, label=f"OpenVINO-FP32-{res_tag}")
        result["resolution"] = res_tag
        result["backend"] = "openvino_fp32"
        all_results["benchmarks"].append(result)

    except Exception as e:
        print(f" OpenVINO FP32 {res_tag} failed: {e}")
        import traceback; traceback.print_exc()
|
|
| |
| |
| |
print(f"\n{'=' * 70}")
print("STEP 7: OpenVINO FP16 Weight Compression")
print("=" * 70)

ov_fp16_dir = OUTPUT_DIR.joinpath("openvino_fp16")
ov_fp16_dir.mkdir(parents=True, exist_ok=True)

for res in RESOLUTIONS:
    if res not in ov_models:
        continue
    res_tag = f"{res[0]}x{res[1]}"
    try:
        # Re-save the FP32 IR with weights stored as FP16.
        fp16_path = str(ov_fp16_dir / f"birefnet_lite_{res_tag}_fp16.xml")
        ov_model = core.read_model(ov_models[res])
        ov.save_model(ov_model, fp16_path, compress_to_fp16=True)

        bin_size = Path(fp16_path).with_suffix(".bin").stat().st_size / 1024**2
        print(f" {res_tag} FP16: {bin_size:.1f} MB")

        # Same single-stream latency configuration as the FP32 benchmark.
        ov_config = {
            "PERFORMANCE_HINT": "LATENCY",
            "NUM_STREAMS": "1",
            "INFERENCE_NUM_THREADS": str(NUM_THREADS),
        }
        fp16_model = core.read_model(fp16_path)
        compiled = core.compile_model(fp16_model, "CPU", ov_config)
        infer_req = compiled.create_infer_request()
        dummy_np = create_dummy_input(res).numpy()

        def ov_fp16_infer(req=infer_req, d=dummy_np):
            return req.infer({0: d})

        result = benchmark_fn(ov_fp16_infer, label=f"OpenVINO-FP16-{res_tag}")
        result.update(resolution=res_tag, backend="openvino_fp16")
        all_results["benchmarks"].append(result)
    except Exception as e:
        print(f" OpenVINO FP16 {res_tag} failed: {e}")
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("STEP 8: OpenVINO INT8 NNCF Post-Training Quantization")
print("=" * 70)

import nncf

ov_int8_dir = OUTPUT_DIR / "openvino_int8"
ov_int8_dir.mkdir(parents=True, exist_ok=True)

# Number of synthetic calibration samples, also passed as NNCF's subset_size.
_CALIBRATION_SAMPLES = 50


def cal_transform(data_item):
    """Map one calibration tensor to the model's input, addressed by index 0."""
    return {0: data_item}


def _make_calibration_data(res, count):
    """Build ``count`` preprocessed random-noise images sized ``res``.

    NOTE: the samples are uniform random pixels, not real images, so the
    activation ranges NNCF derives from them are only good enough for timing.
    Calibrate with representative images before deploying the INT8 model.
    """
    transform = transforms.Compose([
        transforms.Resize((res[0], res[1])),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    samples = []
    for _ in range(count):
        pixels = np.random.randint(0, 256, (res[0], res[1], 3), dtype=np.uint8)
        samples.append(transform(Image.fromarray(pixels)).unsqueeze(0).numpy())
    return samples


def _quantize_and_benchmark_int8(res):
    """Quantize the FP32 IR for ``res`` with NNCF, save it, and benchmark it.

    Returns the benchmark result dict, already tagged with ``resolution`` and
    ``backend``. Any failure propagates to the caller.
    """
    res_tag = f"{res[0]}x{res[1]}"
    fp32_model = core.read_model(ov_models[res])

    print(f"\n Generating calibration data ({_CALIBRATION_SAMPLES} synthetic images at {res_tag})...")
    # The samples are local to this function, so they are released before the
    # next resolution is processed.
    calibration = nncf.Dataset(_make_calibration_data(res, _CALIBRATION_SAMPLES), cal_transform)

    print(f" Running NNCF INT8 quantization for {res_tag}...")
    t0 = time.perf_counter()
    quantized_model = nncf.quantize(
        fp32_model,
        calibration,
        preset=nncf.QuantizationPreset.MIXED,
        subset_size=_CALIBRATION_SAMPLES,
        fast_bias_correction=True,
    )
    print(f" Quantization completed in {time.perf_counter() - t0:.1f}s")

    int8_path = str(ov_int8_dir / f"birefnet_lite_{res_tag}_int8.xml")
    ov.save_model(quantized_model, int8_path)
    bin_size = Path(int8_path).with_suffix(".bin").stat().st_size / 1024**2
    print(f" INT8 model: {bin_size:.1f} MB")

    # Defined here rather than inherited from an earlier step, so each
    # resolution is independent of what ran (or failed) before it.
    ov_config = {
        "PERFORMANCE_HINT": "LATENCY",
        "NUM_STREAMS": "1",
        "INFERENCE_NUM_THREADS": str(NUM_THREADS),
    }
    compiled = core.compile_model(core.read_model(int8_path), "CPU", ov_config)
    infer_req = compiled.create_infer_request()
    dummy_np = create_dummy_input(res).numpy()

    def ov_int8_infer(req=infer_req, d=dummy_np):
        return req.infer({0: d})

    result = benchmark_fn(ov_int8_infer, label=f"OpenVINO-INT8-{res_tag}")
    result["resolution"] = res_tag
    result["backend"] = "openvino_int8_nncf"
    return result


# One pass per resolution; a failure in one does not affect the others.
for res in RESOLUTIONS:
    if res not in ov_models:
        continue
    try:
        all_results["benchmarks"].append(_quantize_and_benchmark_int8(res))
    except Exception as e:
        print(f" OpenVINO INT8 NNCF {res[0]}x{res[1]} failed: {e}")
        import traceback; traceback.print_exc()
|
|
| |
| |
| |
print(f"\n{'=' * 70}")
print("STEP 9: OpenVINO INT8 Weight-Only Quantization")
print("=" * 70)

ov_int8wo_dir = OUTPUT_DIR.joinpath("openvino_int8wo")
ov_int8wo_dir.mkdir(parents=True, exist_ok=True)

for res in RESOLUTIONS:
    if res not in ov_models:
        continue
    res_tag = f"{res[0]}x{res[1]}"
    try:
        # Compress only the weights (symmetric INT8); no calibration data is
        # passed to NNCF in this step.
        ov_model = core.read_model(ov_models[res])
        compressed = nncf.compress_weights(ov_model, mode=nncf.CompressWeightsMode.INT8_SYM)

        wo_path = str(ov_int8wo_dir / f"birefnet_lite_{res_tag}_int8wo.xml")
        ov.save_model(compressed, wo_path)
        bin_size = Path(wo_path).with_suffix(".bin").stat().st_size / 1024**2
        print(f" {res_tag} INT8-WO: {bin_size:.1f} MB")

        # Same single-stream latency configuration as the other OpenVINO runs.
        wo_model = core.read_model(wo_path)
        compiled = core.compile_model(
            wo_model,
            "CPU",
            {
                "PERFORMANCE_HINT": "LATENCY",
                "NUM_STREAMS": "1",
                "INFERENCE_NUM_THREADS": str(NUM_THREADS),
            },
        )
        infer_req = compiled.create_infer_request()
        dummy_np = create_dummy_input(res).numpy()

        def ov_int8wo_infer(req=infer_req, d=dummy_np):
            return req.infer({0: d})

        result = benchmark_fn(ov_int8wo_infer, label=f"OpenVINO-INT8wo-{res_tag}")
        result.update(resolution=res_tag, backend="openvino_int8_weight_only")
        all_results["benchmarks"].append(result)
    except Exception as e:
        print(f" OpenVINO INT8-WO {res_tag} failed: {e}")
|
|
| |
| |
| |
print("\n" + "=" * 70)
print("FINAL RESULTS SUMMARY")
print("=" * 70)

# Persist the raw numbers before printing the table.
with open(RESULTS_FILE, "w", encoding="utf-8") as f:
    json.dump(all_results, f, indent=2)

print(f"\n{'Backend':<40} {'Resolution':<12} {'Mean (ms)':<12} {'Min (ms)':<12} {'FPS':<10} {'Speedup':<10}")
print("-" * 96)

# PyTorch FP32 mean latency per resolution: the reference for "Speedup".
baselines = {}
for b in all_results["benchmarks"]:
    if b["backend"] == "pytorch_fp32":
        baselines[b["resolution"]] = b["mean_ms"]

# Fastest positive mean latency per resolution, computed once instead of once
# per printed row.
best_by_resolution = {}
for b in all_results["benchmarks"]:
    if b["mean_ms"] > 0:
        key = b["resolution"]
        best_by_resolution[key] = min(b["mean_ms"], best_by_resolution.get(key, b["mean_ms"]))

# Rows are grouped by resolution, fastest first within each group.
for b in sorted(all_results["benchmarks"], key=lambda x: (x["resolution"], x["mean_ms"])):
    # Without a PyTorch baseline for this resolution the row is its own
    # reference, i.e. speedup 1.0.
    baseline = baselines.get(b["resolution"], b["mean_ms"])
    speedup = baseline / b["mean_ms"] if b["mean_ms"] > 0 else 0
    best_for_res = best_by_resolution.get(b["resolution"], b["mean_ms"])
    marker = " ✅ BEST" if b["mean_ms"] == best_for_res else ""
    print(f"{b['backend']:<40} {b['resolution']:<12} {b['mean_ms']:<12.1f} {b['min_ms']:<12.1f} {b['fps']:<10.2f} {speedup:<10.2f}{marker}")
|
|
print("\n" + "=" * 70)
print("OPTIMIZATION GUIDE FOR INTEL XEON W-2145")
print("=" * 70)
# Static guidance text. It is not derived from the numbers measured above, so
# check the recommendations against the summary table.
print("""
CPU: Intel Xeon W-2145 (Skylake-SP)
- 8 cores / 16 threads, 11 MB L3
- AVX-512F/CD/BW/DQ/VL — NO VNNI, NO BF16, NO AMX
- FP32 compute only; INT8 reduces memory bandwidth, not compute

Recommended deployment:
1. OpenVINO INT8 (NNCF) — best latency/throughput ratio
2. Static input shape — eliminates dynamic dispatch overhead
3. OMP_NUM_THREADS=8 (physical cores, avoid HT contention)
4. KMP_AFFINITY=granularity=fine,compact,1,0
5. NUM_STREAMS=1 for single-request latency optimization
6. 512x512 resolution when quality allows (~4x faster than 1024x1024)

Upgrade path for additional gains:
- Cascade Lake (W-3200): VNNI → 2x more INT8 throughput
- Sapphire Rapids (W-2400): AMX → 4-8x INT8/BF16 throughput
""")

print(f"\nAll optimized models in: {OUTPUT_DIR}")
print(f"Benchmark results in: {RESULTS_FILE}")
print("Done!")
|
|