# Source: tempo-snn-v2/src/benchmarks/mlperf_tiny.py
# Uploaded by KD099 -- commit a157e36 (verified): "Upload folder using huggingface_hub"
"""
mlperf_tiny.py
==============
MLPerf Tiny benchmark model stubs and evaluation harness.
Literature:
- Banbury et al. MLPerf Tiny Benchmark (arXiv 2021)
"""
import math
from typing import Dict, Tuple, Optional
import numpy as np
try:
import torch
import torch.nn as nn
HAS_TORCH = True
except ImportError:
HAS_TORCH = False
# ================================================================
# MLPerf Tiny Model Stubs
# ================================================================
class DS_CNN(nn.Module):
    """Depthwise-separable CNN stub for Keyword Spotting (KWS).

    The network is a 3x3 convolution stem followed by one depthwise 3x3
    convolution and one pointwise 1x1 convolution (each with batch norm and
    ReLU), global average pooling, and a linear classifier.

    Args:
        num_classes: Number of output logits.
        input_length: Accepted for interface compatibility; the layers built
            here do not read it (the adaptive pooling reduces any spatial
            size to 1x1).
    """

    def __init__(self, num_classes: int = 12, input_length: int = 490):
        super().__init__()
        width = 64

        # Stem: single-channel input -> `width` feature maps.
        self.conv1 = nn.Conv2d(1, width, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(width)
        self.relu1 = nn.ReLU()

        # Depthwise stage: one 3x3 filter per channel (groups == channels).
        self.dw = nn.Conv2d(width, width, kernel_size=3, padding=1, groups=width)
        self.bn_dw = nn.BatchNorm2d(width)
        self.relu_dw = nn.ReLU()

        # Pointwise stage: 1x1 convolution mixing the channels.
        self.pw = nn.Conv2d(width, width, kernel_size=1)
        self.bn_pw = nn.BatchNorm2d(width)
        self.relu_pw = nn.ReLU()

        # Classifier head.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(width, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a 4-D input."""
        stages = (
            (self.conv1, self.bn1, self.relu1),
            (self.dw, self.bn_dw, self.relu_dw),
            (self.pw, self.bn_pw, self.relu_pw),
        )
        for conv, norm, act in stages:
            x = act(norm(conv(x)))
        pooled = self.avgpool(x)
        # Collapse (batch, C, 1, 1) to (batch, C) for the linear layer.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)
class MobileNetV1_Tiny(nn.Module):
    """Slim MobileNetV1 stub for Visual Wake Words (VWW).

    A strided 3x3 convolution stem is followed by five depthwise-separable
    stages, global average pooling, and a linear classifier.

    Args:
        num_classes: Number of output logits.
        width_mult: Multiplier applied to every base channel count; each
            result is truncated with ``int``.
    """

    def __init__(self, num_classes: int = 2, width_mult: float = 0.25):
        super().__init__()

        def scaled(base_channels):
            # Truncating conversion, e.g. 32 * 0.25 -> 8.
            return int(base_channels * width_mult)

        def stem(c_in, c_out, stride):
            # Full 3x3 convolution + BN + ReLU6.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, 3, stride, 1, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU6(inplace=True),
            )

        def separable(c_in, c_out, stride):
            # Depthwise 3x3 (groups == c_in) followed by pointwise 1x1.
            return nn.Sequential(
                nn.Conv2d(c_in, c_in, 3, stride, 1, groups=c_in, bias=False),
                nn.BatchNorm2d(c_in),
                nn.ReLU6(inplace=True),
                nn.Conv2d(c_in, c_out, 1, 1, 0, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU6(inplace=True),
            )

        # (base output channels, stride) for each separable stage, in order.
        plan = ((64, 1), (128, 2), (128, 1), (256, 2), (256, 1))

        channels = scaled(32)
        stages = [stem(3, channels, 2)]
        for base_out, stride in plan:
            next_channels = scaled(base_out)
            stages.append(separable(channels, next_channels, stride))
            channels = next_channels
        stages.append(nn.AdaptiveAvgPool2d(1))

        self.model = nn.Sequential(*stages)
        self.fc = nn.Linear(channels, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        features = self.model(x)
        # Collapse (batch, C, 1, 1) to (batch, C) for the linear layer.
        flat = features.view(features.size(0), -1)
        return self.fc(flat)
class FC_Autoencoder(nn.Module):
    """Fully-connected autoencoder stub for Anomaly Detection (AD).

    The encoder maps ``input_dim -> 128 -> 64 -> bottleneck`` and the decoder
    mirrors it back to ``input_dim``. ReLU follows every linear layer except
    the last one of each half.

    Args:
        input_dim: Size of the flat input (and reconstructed output) vector.
        bottleneck: Size of the latent code.
    """

    def __init__(self, input_dim: int = 640, bottleneck: int = 8):
        super().__init__()
        self.encoder = self._mlp((input_dim, 128, 64, bottleneck))
        self.decoder = self._mlp((bottleneck, 64, 128, input_dim))

    @staticmethod
    def _mlp(widths):
        """Build Linear layers between consecutive widths, with ReLU between them."""
        layers = []
        final_index = len(widths) - 2
        for index, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:])):
            layers.append(nn.Linear(fan_in, fan_out))
            # No activation after the final projection.
            if index != final_index:
                layers.append(nn.ReLU())
        return nn.Sequential(*layers)

    def forward(self, x):
        """Encode ``x`` to the bottleneck and return its reconstruction."""
        latent = self.encoder(x)
        reconstruction = self.decoder(latent)
        return reconstruction
class ResNetLike_Tiny(nn.Module):
    """Tiny ResNet-style stub for Image Classification (IC).

    A 3x3 convolution stem feeds two stacks of conv/BN/ReLU units; the second
    stack doubles the channel count and downsamples with stride 2. The stacks
    are plain sequential chains: this stub contains no skip connections.

    Args:
        num_classes: Number of output logits.
        base_channels: Channel count of the stem and the first stack.
    """

    def __init__(self, num_classes: int = 10, base_channels: int = 16):
        super().__init__()
        wide_channels = base_channels * 2

        self.conv1 = nn.Conv2d(3, base_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(base_channels)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(base_channels, base_channels, 2)
        self.layer2 = self._make_layer(base_channels, wide_channels, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(wide_channels, num_classes)

    def _make_layer(self, in_ch, out_ch, blocks, stride=1):
        """Return a Sequential of conv/BN/ReLU units.

        The first unit maps ``in_ch -> out_ch`` with the given ``stride`` and
        is always present; ``blocks - 1`` further stride-1 units follow.
        """

        def unit(channels_in, unit_stride):
            return [
                nn.Conv2d(channels_in, out_ch, 3, stride=unit_stride, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]

        modules = unit(in_ch, stride)
        for _ in range(blocks - 1):
            modules.extend(unit(out_ch, 1))
        return nn.Sequential(*modules)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        stem = self.relu(self.bn1(self.conv1(x)))
        features = self.layer2(self.layer1(stem))
        pooled = self.avgpool(features)
        # Collapse (batch, C, 1, 1) to (batch, C) for the linear layer.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)
# ================================================================
# Accuracy Degradation Model for PIM Non-Idealities
# ================================================================
class PIMAccuracyModel:
    """
    Estimate accuracy loss on PIM caused by ReRAM non-idealities.

    The estimate subtracts linear penalties for fault density, threshold
    voltage drift and temperature above 65 degrees from a base accuracy,
    then bounds the result.
    """

    def __init__(self, base_accuracy: float = 0.92):
        # Accuracy assumed under ideal conditions; also the upper bound of predict().
        self.base_accuracy = base_accuracy

    def predict(self, fault_density: float, v_th_deviation: float,
                temperature: float) -> float:
        """
        Return the degraded accuracy for the given physical conditions.

        fault_density: 0..1
        v_th_deviation: |V_th - V_nominal|
        temperature: °C

        The result is clipped to the range [0.5, base_accuracy].
        """
        # Noise-like penalty: linear in fault density and in V_th drift.
        fault_term = fault_density * 0.15
        drift_term = v_th_deviation * 0.1
        noise_penalty = fault_term + drift_term

        # Thermal penalty applies only above 65 degrees.
        overheat = max(0, (temperature - 65) / 100)
        thermal_penalty = overheat * 0.05

        estimate = self.base_accuracy - noise_penalty - thermal_penalty
        bounded = np.clip(estimate, 0.5, self.base_accuracy)
        return float(bounded)
# ================================================================
# MLPerf Tiny Benchmark Harness
# ================================================================
class MLPerfTinyBenchmark:
    """Run a router on MLPerf Tiny model stubs and collect metrics.

    For every task in ``MODEL_SPECS`` the harness instantiates the stub
    model, profiles it once, then asks the router ``n_runs`` times which
    target to use and records the estimated latency, energy and accuracy of
    each decision.

    Attributes:
        device: Device label supplied by the caller. It is only stored;
            nothing in this class moves models or tensors to it.
        results: Per-task metrics, keyed by task name. Entries are
            overwritten each time ``run`` processes that task.
    """

    MODEL_SPECS = {
        "kws": {"model": DS_CNN, "input_shape": (1, 1, 49, 10), "timesteps": 1, "target": "PIM"},
        "vww": {"model": MobileNetV1_Tiny, "input_shape": (1, 3, 96, 96), "timesteps": 1, "target": "GPU"},
        "ad": {"model": FC_Autoencoder, "input_shape": (1, 640), "timesteps": 1, "target": "PIM"},
        "ic": {"model": ResNetLike_Tiny, "input_shape": (1, 3, 32, 32), "timesteps": 1, "target": "GPU"},
    }

    # Fixed accuracy assigned to targets that bypass the PIM degradation model.
    _FIXED_ACCURACY = {"CPU": 0.95, "GPU": 0.96}

    # Reference voltage subtracted from the sensor's threshold voltage before
    # it is passed to PIMAccuracyModel.predict as the V_th deviation.
    # NOTE(review): presumably the nominal V_th in volts -- confirm against
    # PhysicsSensorModel.
    _V_TH_NOMINAL = 0.6

    def __init__(self, device: str = "cpu"):
        self.device = device
        self.results: Dict[str, Dict] = {}

    def run(self, router_fn, n_runs: int = 100) -> Dict[str, Dict]:
        """
        Benchmark ``router_fn`` on every task in ``MODEL_SPECS``.

        router_fn(model, input_shape, timesteps) -> target_name

        Args:
            router_fn: Callable returning one of "PIM", "CPU" or "GPU".
            n_runs: Number of routing decisions sampled per task (>= 1).

        Returns:
            ``self.results``: dict of per-task metrics with keys "targets",
            "target_counts", "avg_latency_ms", "avg_energy_mj",
            "avg_accuracy" and "expected". Latency and energy values are
            whatever the profiler's estimators return.

        Raises:
            ValueError: If ``n_runs`` is smaller than 1 (averages over zero
                samples would be NaN).
            KeyError: If ``router_fn`` returns a target other than "PIM",
                "CPU" or "GPU" and the profiler estimators accepted it.
        """
        if n_runs < 1:
            raise ValueError(f"n_runs must be >= 1, got {n_runs}")

        from collections import Counter

        from profiler import TaskComplexityProfiler
        from physics import PhysicsSensorModel

        profiler = TaskComplexityProfiler()
        pim_acc = PIMAccuracyModel(base_accuracy=0.92)

        for task_name, spec in self.MODEL_SPECS.items():
            model = spec["model"]()
            input_shape = spec["input_shape"]
            timesteps = spec["timesteps"]
            # The profile depends only on the model and input, so compute it once.
            profile = profiler.profile(model, input_shape, timesteps)

            targets = []
            latencies = []
            energies = []
            accuracies = []
            for _ in range(n_runs):
                target = router_fn(model, input_shape, timesteps)
                targets.append(target)
                latencies.append(profiler.estimate_latency(profile, target))
                energies.append(profiler.estimate_energy(profile, target))

                if target == "PIM":
                    # Simulate physics at a random temperature in [30, 75).
                    sensor = PhysicsSensorModel(T_ambient=25.0)
                    sensor.T_current = np.random.uniform(30, 75)
                    fd = sensor.get_fault_density()
                    vth = sensor.get_threshold_voltage(deterministic=True)
                    acc = pim_acc.predict(
                        fd, abs(vth - self._V_TH_NOMINAL), sensor.T_current
                    )
                else:
                    try:
                        acc = self._FIXED_ACCURACY[target]
                    except KeyError:
                        raise KeyError(
                            f"router_fn returned unknown target {target!r}; "
                            "expected 'PIM', 'CPU' or 'GPU'"
                        ) from None
                accuracies.append(acc)

            self.results[task_name] = {
                "targets": targets,
                # Counter keeps first-seen order, so reports are reproducible.
                "target_counts": dict(Counter(targets)),
                "avg_latency_ms": float(np.mean(latencies)),
                "avg_energy_mj": float(np.mean(energies)),
                "avg_accuracy": float(np.mean(accuracies)),
                "expected": spec["target"],
            }
        return self.results

    def print_report(self):
        """Print one summary line and the target distribution for each task.

        "Match" is the percentage of routing decisions equal to the task's
        expected target; it is reported as 0 when no decisions were recorded.
        """
        print("\n" + "=" * 65)
        print(" MLPERF TINY BENCHMARK REPORT")
        print("=" * 65)
        for task, m in self.results.items():
            correct = m["target_counts"].get(m["expected"], 0)
            total = len(m["targets"])
            # Guard against an empty decision list to avoid ZeroDivisionError.
            pct = correct / total * 100 if total else 0.0
            print(f" {task.upper():<8} | "
                  f"Accuracy: {m['avg_accuracy']:.3f} | "
                  f"Latency: {m['avg_latency_ms']:.2f}ms | "
                  f"Energy: {m['avg_energy_mj']:.4f}mJ | "
                  f"Match: {pct:.0f}%")
            print(f" Distribution: {m['target_counts']}")
        print("=" * 65)