""" mlperf_tiny.py ============== MLPerf Tiny benchmark model stubs and evaluation harness. Literature: - Banbury et al. MLPerf Tiny Benchmark (arXiv 2021) """ import math from typing import Dict, Tuple, Optional import numpy as np try: import torch import torch.nn as nn HAS_TORCH = True except ImportError: HAS_TORCH = False # ================================================================ # MLPerf Tiny Model Stubs # ================================================================ class DS_CNN(nn.Module): """Depthwise-separable CNN for Keyword Spotting (KWS).""" def __init__(self, num_classes: int = 12, input_length: int = 490): super().__init__() self.conv1 = nn.Conv2d(1, 64, 3, padding=1) self.bn1 = nn.BatchNorm2d(64) self.relu1 = nn.ReLU() # Depthwise self.dw = nn.Conv2d(64, 64, 3, padding=1, groups=64) self.bn_dw = nn.BatchNorm2d(64) self.relu_dw = nn.ReLU() # Pointwise self.pw = nn.Conv2d(64, 64, 1) self.bn_pw = nn.BatchNorm2d(64) self.relu_pw = nn.ReLU() self.avgpool = nn.AdaptiveAvgPool2d(1) self.fc = nn.Linear(64, num_classes) def forward(self, x): x = self.relu1(self.bn1(self.conv1(x))) x = self.relu_dw(self.bn_dw(self.dw(x))) x = self.relu_pw(self.bn_pw(self.pw(x))) x = self.avgpool(x) x = x.view(x.size(0), -1) return self.fc(x) class MobileNetV1_Tiny(nn.Module): """Slim MobileNetV1 for Visual Wake Words (VWW).""" def __init__(self, num_classes: int = 2, width_mult: float = 0.25): super().__init__() def conv_bn(inp, oup, stride): return nn.Sequential( nn.Conv2d(inp, oup, 3, stride, 1, bias=False), nn.BatchNorm2d(oup), nn.ReLU6(inplace=True)) def conv_dw(inp, oup, stride): return nn.Sequential( nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False), nn.BatchNorm2d(inp), nn.ReLU6(inplace=True), nn.Conv2d(inp, oup, 1, 1, 0, bias=False), nn.BatchNorm2d(oup), nn.ReLU6(inplace=True)) self.model = nn.Sequential( conv_bn(3, int(32*width_mult), 2), conv_dw(int(32*width_mult), int(64*width_mult), 1), conv_dw(int(64*width_mult), int(128*width_mult), 2), 
conv_dw(int(128*width_mult), int(128*width_mult), 1), conv_dw(int(128*width_mult), int(256*width_mult), 2), conv_dw(int(256*width_mult), int(256*width_mult), 1), nn.AdaptiveAvgPool2d(1) ) self.fc = nn.Linear(int(256*width_mult), num_classes) def forward(self, x): x = self.model(x) x = x.view(x.size(0), -1) return self.fc(x) class FC_Autoencoder(nn.Module): """Fully-connected autoencoder for Anomaly Detection (AD).""" def __init__(self, input_dim: int = 640, bottleneck: int = 8): super().__init__() self.encoder = nn.Sequential( nn.Linear(input_dim, 128), nn.ReLU(), nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, bottleneck), ) self.decoder = nn.Sequential( nn.Linear(bottleneck, 64), nn.ReLU(), nn.Linear(64, 128), nn.ReLU(), nn.Linear(128, input_dim), ) def forward(self, x): z = self.encoder(x) return self.decoder(z) class ResNetLike_Tiny(nn.Module): """Tiny ResNet for Image Classification (IC).""" def __init__(self, num_classes: int = 10, base_channels: int = 16): super().__init__() self.conv1 = nn.Conv2d(3, base_channels, 3, padding=1) self.bn1 = nn.BatchNorm2d(base_channels) self.relu = nn.ReLU(inplace=True) self.layer1 = self._make_layer(base_channels, base_channels, 2) self.layer2 = self._make_layer(base_channels, base_channels*2, 2, stride=2) self.avgpool = nn.AdaptiveAvgPool2d(1) self.fc = nn.Linear(base_channels*2, num_classes) def _make_layer(self, in_ch, out_ch, blocks, stride=1): layers = [] layers.append(nn.Conv2d(in_ch, out_ch, 3, stride=stride, padding=1)) layers.append(nn.BatchNorm2d(out_ch)) layers.append(nn.ReLU(inplace=True)) for _ in range(1, blocks): layers.append(nn.Conv2d(out_ch, out_ch, 3, padding=1)) layers.append(nn.BatchNorm2d(out_ch)) layers.append(nn.ReLU(inplace=True)) return nn.Sequential(*layers) def forward(self, x): x = self.relu(self.bn1(self.conv1(x))) x = self.layer1(x) x = self.layer2(x) x = self.avgpool(x) x = x.view(x.size(0), -1) return self.fc(x) # ================================================================ # Accuracy 
# Degradation Model for PIM Non-Idealities
# ================================================================

class PIMAccuracyModel:
    """
    Models accuracy degradation on PIM due to ReRAM non-idealities.
    Based on: AWGN injection proportional to fault density and V_th drift.
    """

    def __init__(self, base_accuracy: float = 0.92):
        # Accuracy reported when no degradation applies; also the upper
        # bound of every prediction.
        self.base_accuracy = base_accuracy

    def predict(self, fault_density: float, v_th_deviation: float,
                temperature: float) -> float:
        """
        Degrade accuracy based on physical conditions.

        fault_density: 0..1
        v_th_deviation: |V_th - V_nominal|
        temperature: °C

        Returns the degraded accuracy, clipped to [0.5, base_accuracy].
        """
        # AWGN-like degradation: accuracy drops with fault density and drift
        noise_factor = fault_density * 0.15 + v_th_deviation * 0.1
        # The thermal penalty only applies above 65 °C.
        thermal_factor = max(0.0, (temperature - 65) / 100) * 0.05
        degraded = self.base_accuracy - noise_factor - thermal_factor
        return float(np.clip(degraded, 0.5, self.base_accuracy))


# ================================================================
# MLPerf Tiny Benchmark Harness
# ================================================================

class MLPerfTinyBenchmark:
    """Run a router on MLPerf Tiny model stubs and collect metrics."""

    # Per-task model class, example input shape, timestep count and the
    # target the router is expected to choose.
    MODEL_SPECS = {
        "kws": {"model": DS_CNN, "input_shape": (1, 1, 49, 10),
                "timesteps": 1, "target": "PIM"},
        "vww": {"model": MobileNetV1_Tiny, "input_shape": (1, 3, 96, 96),
                "timesteps": 1, "target": "GPU"},
        "ad": {"model": FC_Autoencoder, "input_shape": (1, 640),
               "timesteps": 1, "target": "PIM"},
        "ic": {"model": ResNetLike_Tiny, "input_shape": (1, 3, 32, 32),
               "timesteps": 1, "target": "GPU"},
    }

    # Accuracy before degradation when a task is routed to PIM.
    PIM_BASE_ACCURACY = 0.92
    # Nominal threshold voltage used to compute the V_th deviation.
    V_TH_NOMINAL = 0.6
    # Fixed accuracy assumed for the non-PIM targets.
    FIXED_ACCURACY = {"CPU": 0.95, "GPU": 0.96}

    def __init__(self, device: str = "cpu"):
        self.device = device
        self.results: Dict[str, Dict] = {}

    @staticmethod
    def _mean(values) -> float:
        """Return the mean of ``values``, or NaN (without a warning) if empty."""
        return float(np.mean(values)) if len(values) else float("nan")

    def run(self, router_fn, n_runs: int = 100) -> Dict[str, Dict]:
        """
        router_fn(model, input_shape, timesteps) -> target_name
        Returns dict of per-task metrics.

        Each task in MODEL_SPECS is routed ``n_runs`` times. For every
        routing decision the profiler's latency and energy estimates are
        recorded together with an accuracy: the PIM degradation model at a
        random temperature for "PIM", a fixed value for "CPU"/"GPU".

        Raises:
            KeyError: If ``router_fn`` returns a target other than "PIM",
                "CPU" or "GPU".
        """
        from collections import Counter

        from profiler import TaskComplexityProfiler
        from physics import PhysicsSensorModel

        profiler = TaskComplexityProfiler()
        pim_acc = PIMAccuracyModel(base_accuracy=self.PIM_BASE_ACCURACY)

        for task_name, spec in self.MODEL_SPECS.items():
            model = spec["model"]()
            input_shape = spec["input_shape"]
            timesteps = spec["timesteps"]
            profile = profiler.profile(model, input_shape, timesteps)

            targets = []
            latencies = []
            energies = []
            accuracies = []

            for _ in range(n_runs):
                target = router_fn(model, input_shape, timesteps)
                targets.append(target)
                latencies.append(profiler.estimate_latency(profile, target))
                energies.append(profiler.estimate_energy(profile, target))

                if target == "PIM":
                    # Simulate physics at random temperature
                    sensor = PhysicsSensorModel(T_ambient=25.0)
                    sensor.T_current = np.random.uniform(30, 75)
                    fd = sensor.get_fault_density()
                    vth = sensor.get_threshold_voltage(deterministic=True)
                    acc = pim_acc.predict(
                        fd, abs(vth - self.V_TH_NOMINAL), sensor.T_current
                    )
                else:
                    try:
                        acc = self.FIXED_ACCURACY[target]
                    except KeyError:
                        # Same exception type as a bare lookup, but naming
                        # the offending value.
                        raise KeyError(
                            f"router_fn returned unknown target {target!r}; "
                            f"expected 'PIM' or one of "
                            f"{sorted(self.FIXED_ACCURACY)}"
                        ) from None
                accuracies.append(acc)

            self.results[task_name] = {
                "targets": targets,
                # Counter is O(n) and keeps first-seen order, so the
                # reported distribution is deterministic.
                "target_counts": dict(Counter(targets)),
                "avg_latency_ms": self._mean(latencies),
                "avg_energy_mj": self._mean(energies),
                "avg_accuracy": self._mean(accuracies),
                "expected": spec["target"],
            }

        return self.results

    def print_report(self):
        """Print one summary line and the target distribution per task."""
        print("\n" + "=" * 65)
        print(" MLPERF TINY BENCHMARK REPORT")
        print("=" * 65)
        for task, m in self.results.items():
            correct = m["target_counts"].get(m["expected"], 0)
            total = len(m["targets"])
            # A run with n_runs == 0 leaves no decisions to score; report
            # 0% instead of dividing by zero.
            pct = correct / total * 100 if total else 0.0
            print(f" {task.upper():<8} | "
                  f"Accuracy: {m['avg_accuracy']:.3f} | "
                  f"Latency: {m['avg_latency_ms']:.2f}ms | "
                  f"Energy: {m['avg_energy_mj']:.4f}mJ | "
                  f"Match: {pct:.0f}%")
            print(f" Distribution: {m['target_counts']}")
        print("=" * 65)