| """ |
| mlperf_tiny.py |
| ============== |
| MLPerf Tiny benchmark model stubs and evaluation harness. |
| |
| Literature: |
| - Banbury et al. MLPerf Tiny Benchmark (arXiv 2021) |
| """ |
|
|
| import math |
| from typing import Dict, Tuple, Optional |
| import numpy as np |
|
|
| try: |
| import torch |
| import torch.nn as nn |
| HAS_TORCH = True |
| except ImportError: |
| HAS_TORCH = False |
|
|
|
|
| |
| |
| |
|
|
class DS_CNN(nn.Module):
    """Small depthwise-separable CNN used as the Keyword Spotting (KWS) stub.

    Layout: a 3x3 stem convolution, one depthwise 3x3 convolution
    (``groups`` equal to the channel count) and one pointwise 1x1
    convolution, each followed by batch norm and ReLU, then a global average
    pool and a linear classifier.

    Args:
        num_classes: Output width of the final linear layer.
        input_length: Accepted for interface compatibility; it is not used
            when the layers are built.
    """

    def __init__(self, num_classes: int = 12, input_length: int = 490):
        super().__init__()
        width = 64  # channel count shared by every convolutional stage

        # Stem: single-channel input -> `width` feature maps.
        self.conv1 = nn.Conv2d(1, width, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(width)
        self.relu1 = nn.ReLU()

        # Depthwise stage: one 3x3 filter per channel.
        self.dw = nn.Conv2d(width, width, kernel_size=3, padding=1, groups=width)
        self.bn_dw = nn.BatchNorm2d(width)
        self.relu_dw = nn.ReLU()

        # Pointwise stage: 1x1 convolution mixing the channels.
        self.pw = nn.Conv2d(width, width, kernel_size=1)
        self.bn_pw = nn.BatchNorm2d(width)
        self.relu_pw = nn.ReLU()

        # Classifier head.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(width, num_classes)

    def forward(self, x):
        """Return the class logits for a batch ``x`` of single-channel maps."""
        stem = self.relu1(self.bn1(self.conv1(x)))
        depthwise = self.relu_dw(self.bn_dw(self.dw(stem)))
        pointwise = self.relu_pw(self.bn_pw(self.pw(depthwise)))
        pooled = self.avgpool(pointwise)
        # Collapse the 1x1 spatial dimensions before the linear layer.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)
|
|
|
|
class MobileNetV1_Tiny(nn.Module):
    """Slim MobileNetV1-style network used as the Visual Wake Words (VWW) stub.

    The feature extractor is a strided 3x3 stem followed by five
    depthwise-separable stages and a global average pool; a linear layer
    produces the logits.

    Args:
        num_classes: Output width of the final linear layer.
        width_mult: Multiplier applied to every nominal channel count; the
            product is truncated with ``int``.
    """

    def __init__(self, num_classes: int = 2, width_mult: float = 0.25):
        super().__init__()

        def scaled(channels):
            # Nominal channel count scaled by the width multiplier.
            return int(channels * width_mult)

        def stem(out_ch, stride):
            # Plain 3x3 conv -> BN -> ReLU6 on the 3-channel input.
            return nn.Sequential(
                nn.Conv2d(3, out_ch, 3, stride, 1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU6(inplace=True),
            )

        def separable(in_ch, out_ch, stride):
            # Depthwise 3x3 (one filter per channel) then pointwise 1x1.
            return nn.Sequential(
                nn.Conv2d(in_ch, in_ch, 3, stride, 1, groups=in_ch, bias=False),
                nn.BatchNorm2d(in_ch),
                nn.ReLU6(inplace=True),
                nn.Conv2d(in_ch, out_ch, 1, 1, 0, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU6(inplace=True),
            )

        # (nominal in-channels, nominal out-channels, stride) per stage.
        plan = (
            (32, 64, 1),
            (64, 128, 2),
            (128, 128, 1),
            (128, 256, 2),
            (256, 256, 1),
        )

        stages = [stem(scaled(32), 2)]
        for nominal_in, nominal_out, stride in plan:
            stages.append(separable(scaled(nominal_in), scaled(nominal_out), stride))
        stages.append(nn.AdaptiveAvgPool2d(1))

        self.model = nn.Sequential(*stages)
        self.fc = nn.Linear(scaled(256), num_classes)

    def forward(self, x):
        """Return the class logits for a batch ``x`` of 3-channel images."""
        features = self.model(x)
        # Collapse the 1x1 spatial dimensions before the linear layer.
        flat = features.view(features.size(0), -1)
        return self.fc(flat)
|
|
|
|
class FC_Autoencoder(nn.Module):
    """Fully-connected autoencoder used as the Anomaly Detection (AD) stub.

    The encoder maps ``input_dim -> 128 -> 64 -> bottleneck`` and the decoder
    mirrors it back to ``input_dim``. ReLU follows every linear layer except
    the last one of each half.

    Args:
        input_dim: Size of the input (and reconstructed) feature vector.
        bottleneck: Size of the latent code between encoder and decoder.
    """

    def __init__(self, input_dim: int = 640, bottleneck: int = 8):
        super().__init__()
        encoder_dims = (input_dim, 128, 64, bottleneck)
        decoder_dims = tuple(reversed(encoder_dims))
        # Encoder is built first so parameter creation order is unchanged.
        self.encoder = self._dense_stack(encoder_dims)
        self.decoder = self._dense_stack(decoder_dims)

    @staticmethod
    def _dense_stack(dims):
        """Chain Linear layers over ``dims`` with ReLU between (not after) them."""
        layers = []
        final_index = len(dims) - 2
        for index, (fan_in, fan_out) in enumerate(zip(dims, dims[1:])):
            layers.append(nn.Linear(fan_in, fan_out))
            if index != final_index:
                layers.append(nn.ReLU())
        return nn.Sequential(*layers)

    def forward(self, x):
        """Encode ``x`` to the bottleneck and return its reconstruction."""
        latent = self.encoder(x)
        reconstruction = self.decoder(latent)
        return reconstruction
|
|
|
|
class ResNetLike_Tiny(nn.Module):
    """Tiny convolutional network used as the Image Classification (IC) stub.

    Despite the name, the layers are plain conv-BN-ReLU stacks: no skip
    connections are created. A stem convolution is followed by two stages
    (the second one doubles the channels and downsamples with stride 2), a
    global average pool and a linear classifier.

    Args:
        num_classes: Output width of the final linear layer.
        base_channels: Channel count of the stem and of the first stage.
    """

    def __init__(self, num_classes: int = 10, base_channels: int = 16):
        super().__init__()
        wide_channels = base_channels * 2

        self.conv1 = nn.Conv2d(3, base_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(base_channels)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(base_channels, base_channels, 2)
        self.layer2 = self._make_layer(base_channels, wide_channels, 2, stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(wide_channels, num_classes)

    def _make_layer(self, in_ch, out_ch, blocks, stride=1):
        """Build a stage of conv-BN-ReLU units.

        The first unit maps ``in_ch`` to ``out_ch`` with the given ``stride``;
        the remaining ``blocks - 1`` units keep ``out_ch`` channels at
        stride 1.
        """

        def unit(source_ch, unit_stride):
            return [
                nn.Conv2d(source_ch, out_ch, 3, stride=unit_stride, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]

        modules = unit(in_ch, stride)
        for _ in range(blocks - 1):
            modules.extend(unit(out_ch, 1))
        return nn.Sequential(*modules)

    def forward(self, x):
        """Return the class logits for a batch ``x`` of 3-channel images."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        for stage in (self.layer1, self.layer2):
            out = stage(out)
        out = self.avgpool(out)
        # Collapse the 1x1 spatial dimensions before the linear layer.
        out = out.view(out.size(0), -1)
        return self.fc(out)
|
|
|
|
| |
| |
| |
|
|
class PIMAccuracyModel:
    """
    Heuristic accuracy-degradation model for inference on PIM hardware.

    A baseline accuracy is reduced by a noise penalty (linear in fault
    density and threshold-voltage deviation) and by a thermal penalty that
    only applies above 65 degrees.
    """

    def __init__(self, base_accuracy: float = 0.92):
        # Accuracy reported when no penalty applies; also the upper clip bound.
        self.base_accuracy = base_accuracy

    def predict(self, fault_density: float, v_th_deviation: float,
                temperature: float) -> float:
        """
        Return the degraded accuracy for the given physical conditions.

        fault_density: 0..1
        v_th_deviation: |V_th - V_nominal|
        temperature: °C

        The result is clipped to the range [0.5, base_accuracy].
        """
        fault_penalty = fault_density * 0.15
        drift_penalty = v_th_deviation * 0.1
        noise_penalty = fault_penalty + drift_penalty

        # Only the excess above 65 degrees contributes; cooler readings add 0.
        excess_heat = max(0, (temperature - 65) / 100)
        thermal_penalty = excess_heat * 0.05

        estimate = self.base_accuracy - noise_penalty - thermal_penalty
        clipped = np.clip(estimate, 0.5, self.base_accuracy)
        return float(clipped)
|
|
|
|
| |
| |
| |
|
|
class MLPerfTinyBenchmark:
    """Run a router on MLPerf Tiny model stubs and collect metrics."""

    # Per-task model class, example input shape, timestep count and the
    # target the router is expected to choose.
    MODEL_SPECS = {
        "kws": {"model": DS_CNN, "input_shape": (1, 1, 49, 10), "timesteps": 1, "target": "PIM"},
        "vww": {"model": MobileNetV1_Tiny, "input_shape": (1, 3, 96, 96), "timesteps": 1, "target": "GPU"},
        "ad": {"model": FC_Autoencoder, "input_shape": (1, 640), "timesteps": 1, "target": "PIM"},
        "ic": {"model": ResNetLike_Tiny, "input_shape": (1, 3, 32, 32), "timesteps": 1, "target": "GPU"},
    }

    # Fixed accuracy used for targets that bypass the PIM degradation model.
    _FIXED_ACCURACY = {"CPU": 0.95, "GPU": 0.96}

    # Reference threshold voltage the sensor reading is compared against.
    _V_TH_NOMINAL = 0.6

    def __init__(self, device: str = "cpu"):
        self.device = device
        # Filled by run(): task name -> metrics dict.
        self.results: Dict[str, Dict] = {}

    @staticmethod
    def _mean(values) -> float:
        """Return the mean of ``values``, or NaN (without a warning) if empty."""
        if not values:
            return float("nan")
        return float(np.mean(values))

    def run(self, router_fn, n_runs: int = 100) -> Dict[str, Dict]:
        """
        router_fn(model, input_shape, timesteps) -> target_name
        Returns dict of per-task metrics.

        For every entry in MODEL_SPECS the model is instantiated and profiled
        once, then ``router_fn`` is called ``n_runs`` times. Each call's
        target, estimated latency, estimated energy and accuracy are
        recorded. With ``n_runs <= 0`` the averages are NaN and the target
        list is empty.

        Raises:
            KeyError: if ``router_fn`` returns a target that is neither
                "PIM" nor a key of the fixed-accuracy table (CPU, GPU).
        """
        from collections import Counter
        from profiler import TaskComplexityProfiler
        from physics import PhysicsSensorModel

        profiler = TaskComplexityProfiler()
        pim_acc = PIMAccuracyModel(base_accuracy=0.92)

        for task_name, spec in self.MODEL_SPECS.items():
            model = spec["model"]()
            input_shape = spec["input_shape"]
            timesteps = spec["timesteps"]
            profile = profiler.profile(model, input_shape, timesteps)

            targets = []
            latencies = []
            energies = []
            accuracies = []
            for _ in range(n_runs):
                target = router_fn(model, input_shape, timesteps)
                targets.append(target)
                latencies.append(profiler.estimate_latency(profile, target))
                energies.append(profiler.estimate_energy(profile, target))

                if target == "PIM":
                    # A fresh sensor with a random temperature is sampled
                    # per run, so PIM accuracy varies between runs.
                    sensor = PhysicsSensorModel(T_ambient=25.0)
                    sensor.T_current = np.random.uniform(30, 75)
                    fd = sensor.get_fault_density()
                    vth = sensor.get_threshold_voltage(deterministic=True)
                    acc = pim_acc.predict(
                        fd, abs(vth - self._V_TH_NOMINAL), sensor.T_current
                    )
                else:
                    acc = self._FIXED_ACCURACY[target]
                accuracies.append(acc)

            self.results[task_name] = {
                "targets": targets,
                # Single pass instead of one list.count() per distinct target.
                "target_counts": dict(Counter(targets)),
                "avg_latency_ms": self._mean(latencies),
                "avg_energy_mj": self._mean(energies),
                "avg_accuracy": self._mean(accuracies),
                "expected": spec["target"],
            }
        return self.results

    def print_report(self):
        """Print one summary line and the target distribution per task.

        "Match" is the percentage of runs routed to the task's expected
        target; it is reported as 0% when no runs were recorded.
        """
        print("\n" + "=" * 65)
        print(" MLPERF TINY BENCHMARK REPORT")
        print("=" * 65)
        for task, m in self.results.items():
            correct = m["target_counts"].get(m["expected"], 0)
            total = len(m["targets"])
            # Guard against division by zero when run() recorded no runs.
            pct = correct / total * 100 if total else 0.0
            print(f" {task.upper():<8} | "
                  f"Accuracy: {m['avg_accuracy']:.3f} | "
                  f"Latency: {m['avg_latency_ms']:.2f}ms | "
                  f"Energy: {m['avg_energy_mj']:.4f}mJ | "
                  f"Match: {pct:.0f}%")
            print(f" Distribution: {m['target_counts']}")
        print("=" * 65)
|
|