# simquantum-tuning-lab/tests/test_perception.py
# SimQuantum — AMD Developer Hackathon (uploaded by 100enigma, revision da98415, 20.6 kB)
"""
tests/test_perception.py
========================
Test suite for Phase 1 — perception & physics layer.

Coverage:
    1. DQC Gatekeeper — all three quality tiers, edge cases
    2. Feature extractors — log preprocess, FFT peak ratio, diagonal strength
    3. TinyCNN — forward pass, output shape, feature extraction
    4. EnsembleCNN — classification interface, disagreement metric
    5. MahalanobisOOD — fit, score, flag, persistence
    6. InspectionAgent — full pipeline, 2D-only guard, NL report format
    7. CIMDataset — shape contract, label balance, resolution variety

These tests are designed to run without a GPU (CPU-only) and without
generating the full 50k training set. They use small synthetic batches.

Run:
    pytest tests/test_perception.py -v
"""
from __future__ import annotations
import json
import tempfile
import uuid
from pathlib import Path
import numpy as np
import pytest
import torch
from qdot.core.types import (
ChargeLabel,
DQCQuality,
Measurement,
MeasurementModality,
VoltagePoint,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_measurement(
    shape=(32, 32),
    modality=MeasurementModality.COARSE_2D,
    array=None,
) -> Measurement:
    """Build a ``Measurement`` fixture for the tests in this module.

    Args:
        shape: Shape of the random array generated when ``array`` is None.
            When ``array`` is supplied, the array's own shape is used for
            the metadata instead, so the two can never disagree.
        modality: Modality recorded on the returned measurement.
        array: Optional pre-built data. If omitted, a uniform [0, 1) float32
            array of ``shape`` is drawn from a generator seeded with 0.

    Returns:
        A ``Measurement`` with ``device_id="test_device"`` and a voltage
        centre of ``VoltagePoint(0.0, 0.0)``. ``resolution`` is the first
        dimension for 2-D data (else None); ``steps`` is the first dimension
        for ``LINE_SCAN`` modality (else None).
    """
    if array is None:
        # Fixed seed keeps the default fixture deterministic across tests.
        array = np.random.default_rng(0).uniform(0.0, 1.0, shape).astype(np.float32)
    else:
        # Describe the data actually supplied rather than the default
        # ``shape`` argument, which may not match the caller's array.
        shape = tuple(np.shape(array))
    return Measurement(
        array=array,
        modality=modality,
        resolution=shape[0] if len(shape) == 2 else None,
        steps=shape[0] if modality == MeasurementModality.LINE_SCAN else None,
        device_id="test_device",
        voltage_centre=VoltagePoint(0.0, 0.0),
    )
def _make_double_dot_patch(res=32) -> np.ndarray:
    """Synthetic double-dot-like patch: diagonal stripes (approximates transition lines).

    Each pixel ``(i, j)`` holds ``0.5 + 0.4 * sin((i + j) * 2π / (res / 3))``,
    so the value depends only on ``i + j`` and the pattern is constant along
    anti-diagonals.

    Args:
        res: Side length of the square patch.

    Returns:
        A ``(res, res)`` float32 array clipped to the range [0, 1].
    """
    idx = np.arange(res)
    # Broadcasting a column against a row yields the i + j grid in one step,
    # replacing the per-element Python double loop.
    diag = idx[:, None] + idx[None, :]
    arr = (0.5 + 0.4 * np.sin(diag * 2 * np.pi / (res / 3))).astype(np.float32)
    return np.clip(arr, 0.0, 1.0)
def _make_featureless_patch(res=32, level=0.5) -> np.ndarray:
    """Return a constant ``(res, res)`` float32 array with every pixel at ``level``.

    Intended as a signal-free input: the tests expect DQC to rate it
    MODERATE/LOW (and, per the original author's note, a MISC classification).
    """
    flat = np.empty((res, res), dtype=np.float32)
    flat.fill(level)
    return flat
def _make_noisy_patch(res=32) -> np.ndarray:
    """Return a ``(res, res)`` float32 array of uniform [0, 1) noise.

    The generator is seeded with 42, so repeated calls return the same
    values. Per the original author's note, this should trigger DQC LOW.
    """
    generator = np.random.default_rng(42)
    samples = generator.uniform(low=0.0, high=1.0, size=(res, res))
    return samples.astype(np.float32)
# ---------------------------------------------------------------------------
# 1. DQC Gatekeeper
# ---------------------------------------------------------------------------
class TestDQCGatekeeper:
    """Checks on ``DQCGatekeeper``: quality tiers, reported metrics, 1-D input."""

    def setup_method(self):
        """Give every test its own gatekeeper instance."""
        from qdot.perception.dqc import DQCGatekeeper

        self.gk = DQCGatekeeper()

    def _assess_patch(self, patch):
        """Wrap ``patch`` in a default Measurement fixture and assess it."""
        return self.gk.assess(_make_measurement(array=patch))

    def test_high_quality_real_signal(self):
        """A clean synthetic stripe patch must be rated HIGH or MODERATE."""
        result = self._assess_patch(_make_double_dot_patch(32))
        acceptable = (DQCQuality.HIGH, DQCQuality.MODERATE)
        assert result.quality in acceptable, (
            f"Expected HIGH or MODERATE, got {result.quality}. SNR={result.snr_db:.1f}"
        )

    def test_flat_array_is_low_or_moderate(self):
        """A completely flat array carries no signal, so it must not be HIGH."""
        result = self._assess_patch(_make_featureless_patch(32, level=0.5))
        acceptable = (DQCQuality.LOW, DQCQuality.MODERATE)
        assert result.quality in acceptable, (
            f"Expected LOW or MODERATE for featureless patch, got {result.quality}"
        )

    def test_nan_is_low(self):
        """A single NaN must force LOW quality and mark the data implausible."""
        corrupted = np.ones((32, 32), dtype=np.float32)
        corrupted[5, 5] = np.nan
        verdict = self._assess_patch(corrupted)
        assert verdict.quality == DQCQuality.LOW
        assert not verdict.physically_plausible

    def test_dynamic_range_returned(self):
        """The reported dynamic range lies within [0, 1]."""
        verdict = self._assess_patch(_make_double_dot_patch(32))
        assert 0.0 <= verdict.dynamic_range <= 1.0

    def test_snr_db_positive_for_structured_array(self):
        """A structured stripe patch yields a strictly positive SNR in dB."""
        verdict = self._assess_patch(_make_double_dot_patch(32))
        assert verdict.snr_db > 0.0

    def test_1d_line_scan_works(self):
        """DQC should handle 1D arrays (line scans) without error."""
        wave = np.sin(np.linspace(0, 4 * np.pi, 128)).astype(np.float32)
        # Min-max normalise the sine wave before wrapping it as a line scan.
        lowest, highest = wave.min(), wave.max()
        scan = (wave - lowest) / (highest - lowest)
        measurement = _make_measurement(
            array=scan,
            shape=(128,),
            modality=MeasurementModality.LINE_SCAN,
        )
        verdict = self.gk.assess(measurement)
        every_tier = (DQCQuality.HIGH, DQCQuality.MODERATE, DQCQuality.LOW)
        assert verdict.quality in every_tier

    def test_assess_array_matches_assess_measurement(self):
        """``assess_array`` and ``assess`` must agree on quality and SNR."""
        patch = _make_double_dot_patch(32)
        measurement = _make_measurement(array=patch)
        via_measurement = self.gk.assess(measurement)
        via_array = self.gk.assess_array(measurement.id, patch)
        assert via_measurement.quality == via_array.quality
        assert abs(via_measurement.snr_db - via_array.snr_db) < 1e-6
# ---------------------------------------------------------------------------
# 2. Feature extractors
# ---------------------------------------------------------------------------
class TestFeatureExtractors:
    """Sanity checks for the helpers in ``qdot.perception.features``."""

    def test_log_preprocess_output_range(self):
        """Output keeps shape and float32 dtype and stays within [0, 1] (±1e-6)."""
        from qdot.perception.features import log_preprocess

        raw = np.random.default_rng(0).uniform(0, 1, (32, 32)).astype(np.float32)
        processed = log_preprocess(raw)
        assert processed.shape == raw.shape
        assert processed.dtype == np.float32
        assert processed.min() >= -1e-6
        assert processed.max() <= 1.0 + 1e-6

    def test_log_preprocess_handles_zeros(self):
        """An all-zero input must not produce NaN or inf."""
        from qdot.perception.features import log_preprocess

        processed = log_preprocess(np.zeros((16, 16), dtype=np.float32))
        assert np.all(np.isfinite(processed))

    def test_fft_peak_ratio_single_dot_higher(self):
        """Peak ratio is strictly positive for both stripe patterns.

        Despite the test name, no ordering between the two ratios is
        asserted; the original author notes that thresholds vary.
        """
        from qdot.perception.features import fft_peak_ratio

        # Single-dot: horizontal stripes (periodic in one direction).
        # One level per row, repeated across all 32 columns.
        row_levels = np.array(
            [0.5 + 0.4 * np.sin(row * 2 * np.pi / 4) for row in range(32)],
            dtype=np.float32,
        )
        single_dot = np.tile(row_levels[:, None], (1, 32))
        # Double-dot: diagonal stripes.
        double_dot = _make_double_dot_patch(32)
        pr_sd = fft_peak_ratio(single_dot)
        pr_dd = fft_peak_ratio(double_dot)
        # Not a hard comparison, just a sanity check.
        assert pr_sd > 0.0
        assert pr_dd > 0.0

    def test_diagonal_strength_on_diagonal_stripes(self):
        """Diagonal strength of a stripe patch lies within [0, 1]."""
        from qdot.perception.features import diagonal_strength

        strength = diagonal_strength(_make_double_dot_patch(32))
        assert 0.0 <= strength <= 1.0

    def test_diagonal_strength_higher_for_diagonal_vs_horizontal(self):
        """Diagonal stripes must score higher than horizontal stripes."""
        from qdot.perception.features import diagonal_strength

        # Horizontal stripes — every fourth row set to 1, the rest 0.
        horizontal = np.zeros((32, 32), dtype=np.float32)
        horizontal[::4, :] = 1.0
        # Diagonal stripes
        diagonal = _make_double_dot_patch(32)
        ds_h = diagonal_strength(horizontal)
        ds_d = diagonal_strength(diagonal)
        assert ds_d > ds_h, f"Expected ds_diag ({ds_d:.3f}) > ds_horiz ({ds_h:.3f})"

    def test_physics_features_returns_all_keys(self):
        """``physics_features`` must expose the four expected feature names."""
        from qdot.perception.features import physics_features

        feats = physics_features(_make_double_dot_patch(32))
        required = (
            "fft_peak_ratio",
            "diagonal_strength",
            "mean_conductance",
            "conductance_std",
        )
        for key in required:
            assert key in feats
# ---------------------------------------------------------------------------
# 3. TinyCNN
# ---------------------------------------------------------------------------
class TestTinyCNN:
    """Shape and probability checks for a freshly constructed ``TinyCNN``."""

    def test_forward_pass_shape(self):
        """A batch of four 64×64 single-channel inputs yields a (4, 3) output."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        logits = net(torch.randn(4, 1, 64, 64))
        assert logits.shape == (4, 3)

    def test_predict_proba_sums_to_one(self):
        """Each row returned by ``predict_proba`` sums to 1 within 1e-5."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()
        batch = torch.randn(8, 1, 64, 64)
        row_totals = net.predict_proba(batch).sum(dim=-1)
        assert torch.allclose(row_totals, torch.ones(8), atol=1e-5)

    def test_extract_features_shape(self):
        """``extract_features`` returns one 32-wide vector per input sample."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()
        embedding = net.extract_features(torch.randn(4, 1, 64, 64))
        assert embedding.shape == (4, 32)

    def test_forward_32x32_input_fails_gracefully(self):
        """A 32×32 input still produces a (2, 3) output.

        Despite the test name, this asserts success rather than a failure.
        The original author's note: TinyCNN expects 64×64, but after four
        stride-2 convs a 32×32 input becomes 2×2 and global average pooling
        reduces it to one scalar per channel.
        """
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()
        small_batch = torch.randn(2, 1, 32, 32)
        logits = net(small_batch)
        assert logits.shape == (2, 3)
# ---------------------------------------------------------------------------
# 4. EnsembleCNN
# ---------------------------------------------------------------------------
class TestEnsembleCNN:
    """Interface checks for ``EnsembleCNN`` running on the CPU."""

    def setup_method(self):
        """Build a new CPU ensemble before each test."""
        from qdot.perception.classifier import EnsembleCNN

        self.ensemble = EnsembleCNN(device="cpu")

    def test_classify_returns_correct_types(self):
        """``classify`` yields a class index in {0, 1, 2} and two values in [0, 1]."""
        outcome = self.ensemble.classify(_make_double_dot_patch(32))
        label_idx, confidence, disagreement = outcome
        assert label_idx in (0, 1, 2)
        assert 0.0 <= confidence <= 1.0
        assert 0.0 <= disagreement <= 1.0

    def test_disagreement_between_zero_and_one(self):
        """The disagreement metric stays within [0, 1] for a 64×64 patch."""
        disagreement = self.ensemble.classify(_make_double_dot_patch(64))[2]
        assert 0.0 <= disagreement <= 1.0

    def test_predict_proba_shape_and_sums(self):
        """``predict_proba`` returns three probabilities summing to 1 (±1e-4)."""
        distribution = self.ensemble.predict_proba(_make_double_dot_patch(32))
        assert distribution.shape == (3,)
        assert abs(distribution.sum() - 1.0) < 1e-4

    def test_extract_features_shape(self):
        """``extract_features`` returns a single 32-element vector."""
        embedding = self.ensemble.extract_features(_make_double_dot_patch(32))
        assert embedding.shape == (32,)

    def test_save_and_load(self):
        """Label and confidence must survive a save/load round trip."""
        from qdot.perception.classifier import EnsembleCNN

        patch = _make_double_dot_patch(32)
        label_before, conf_before, _ = self.ensemble.classify(patch)
        with tempfile.TemporaryDirectory() as checkpoint_dir:
            self.ensemble.save(checkpoint_dir)
            restored = EnsembleCNN.load(checkpoint_dir, device="cpu")
            label_after, conf_after, _ = restored.classify(patch)
        assert label_before == label_after
        assert abs(conf_before - conf_after) < 1e-5
# ---------------------------------------------------------------------------
# 5. MahalanobisOOD
# ---------------------------------------------------------------------------
class TestMahalanobisOOD:
    """Fit / score / flag / persistence checks for ``MahalanobisOOD``."""

    def _make_features(self, n: int, seed: int = 0) -> np.ndarray:
        """Draw ``n`` standard-normal feature vectors of width 32 as float32."""
        rng = np.random.default_rng(seed)
        return rng.normal(0.0, 1.0, (n, 32)).astype(np.float32)

    def test_fit_and_score_in_distribution(self):
        """Scoring after ``fit`` returns a float score, bool flag and positive threshold."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0)
        ood.fit(self._make_features(200, seed=0))
        # Sample drawn from the same distribution as the training data; only
        # the result types are asserted, not whether it is flagged.
        in_dist_feat = self._make_features(1, seed=1)[0]
        result = ood.score(uuid.uuid4(), in_dist_feat)
        assert isinstance(result.score, float)
        assert isinstance(result.flag, bool)
        assert result.threshold > 0

    def test_out_of_distribution_flagged(self):
        """A sample centred 50 units from the training mean must be flagged."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0)
        rng = np.random.default_rng(0)
        # Training distribution: N(0, 1)
        train_feats = rng.normal(0.0, 1.0, (300, 32)).astype(np.float32)
        ood.fit(train_feats)
        # OOD sample: very far from the training distribution
        ood_feat = rng.normal(50.0, 1.0, (32,)).astype(np.float32)
        result = ood.score(uuid.uuid4(), ood_feat)
        assert result.flag, f"Expected OOD flag=True for far-out sample (score={result.score:.2f})"

    def test_batch_score_shape(self):
        """``score_batch`` returns one score and one bool flag per input row."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8)
        ood.fit(self._make_features(200))
        scores, flags = ood.score_batch(self._make_features(50, seed=99))
        assert scores.shape == (50,)
        assert flags.shape == (50,)
        assert flags.dtype == bool

    def test_save_and_load(self):
        """A detector reloaded from disk must reproduce the original score."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8)
        ood.fit(self._make_features(200))
        test_feat = self._make_features(1, seed=42)[0]
        score_before = ood.score(uuid.uuid4(), test_feat).score
        # A TemporaryDirectory removes the saved file on exit; the previous
        # NamedTemporaryFile(delete=False) left one file behind per run.
        with tempfile.TemporaryDirectory() as tmpdir:
            path = str(Path(tmpdir) / "ood.pkl")
            ood.save(path)
            loaded = MahalanobisOOD.load(path)
            score_after = loaded.score(uuid.uuid4(), test_feat).score
        assert abs(score_before - score_after) < 1e-6

    def test_unfitted_raises(self):
        """Scoring before ``fit`` must raise RuntimeError mentioning "not been fitted"."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD()
        with pytest.raises(RuntimeError, match="not been fitted"):
            ood.score(uuid.uuid4(), np.zeros(32))
# ---------------------------------------------------------------------------
# 6. InspectionAgent — full pipeline
# ---------------------------------------------------------------------------
class TestInspectionAgent:
    """End-to-end checks for ``InspectionAgent`` backed by a CPU ensemble."""

    def setup_method(self):
        """Create a fresh CPU ensemble and an agent wrapping it for each test."""
        from qdot.perception.classifier import EnsembleCNN
        from qdot.perception.inspector import InspectionAgent

        self.ensemble = EnsembleCNN(device="cpu")
        self.agent = InspectionAgent(ensemble=self.ensemble)

    def test_inspect_2d_returns_classification_and_ood(self):
        """``inspect`` returns a classification and an OOD result tied to the measurement."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, ood = self.agent.inspect(m)
        assert classification.measurement_id == m.id
        assert classification.label in ChargeLabel
        assert 0.0 <= classification.confidence <= 1.0
        assert 0.0 <= classification.ensemble_disagreement <= 1.0
        assert ood.measurement_id == m.id
        assert isinstance(ood.flag, bool)

    def test_inspect_rejects_line_scan(self):
        """A 1-D line scan must be rejected with ValueError mentioning "non-2D"."""
        arr = np.sin(np.linspace(0, np.pi, 128)).astype(np.float32)
        m = _make_measurement(array=arr, shape=(128,), modality=MeasurementModality.LINE_SCAN)
        with pytest.raises(ValueError, match="non-2D"):
            self.agent.inspect(m)

    def test_inspect_rejects_low_dqc(self):
        """A supplied LOW-quality DQC result must abort inspection with RuntimeError."""
        from qdot.core.types import DQCResult

        m = _make_measurement(array=_make_double_dot_patch(32))
        bad_dqc = DQCResult(
            measurement_id=m.id,
            quality=DQCQuality.LOW,
            snr_db=5.0,
            dynamic_range=0.1,
            flatness_score=0.001,
            physically_plausible=False,
            notes="Test: forced LOW",
        )
        with pytest.raises(RuntimeError, match="LOW-quality"):
            self.agent.inspect(m, dqc_result=bad_dqc)

    def test_nl_summary_is_valid_json(self):
        """``nl_summary`` parses as JSON and contains the five report sections."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, _ = self.agent.inspect(m)
        parsed = json.loads(classification.nl_summary)
        assert "classification" in parsed
        assert "uncertainty" in parsed
        assert "physics_reasoning" in parsed
        assert "ood" in parsed
        assert "recommended_executive_action" in parsed

    def test_features_dict_populated(self):
        """The classification carries both physics features in ``features``."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, _ = self.agent.inspect(m)
        assert "fft_peak_ratio" in classification.features
        assert "diagonal_strength" in classification.features

    def test_physics_override_possible(self):
        """Inspection still returns a valid label under extreme override thresholds.

        Only a smoke test: the specific label is deliberately not asserted.
        """
        from qdot.core.types import DQCResult
        from qdot.perception.classifier import EnsembleCNN
        from qdot.perception.inspector import InspectionAgent

        # Pin the ensemble to the CPU, matching setup_method and the module's
        # CPU-only contract, instead of relying on the default device.
        agent = InspectionAgent(
            ensemble=EnsembleCNN(device="cpu"),
            peak_ratio_threshold=0.1,  # extremely low, intended to always trigger the SD override
            diagonal_strength_min=0.99,  # extremely high, intended to make the diagonal always "missing"
        )
        # Use a valid structured patch rather than a featureless one.
        m = _make_measurement(array=_make_double_dot_patch(32))
        # Bypass the DQC check by supplying a pre-approved MODERATE result.
        dqc_override = DQCResult(
            measurement_id=m.id,
            quality=DQCQuality.MODERATE,
            snr_db=15.0,
            dynamic_range=0.4,
            flatness_score=0.1,
            physically_plausible=True,
            notes="Test override",
        )
        classification, _ = agent.inspect(m, dqc_result=dqc_override)
        assert classification.label in ChargeLabel

    def test_inspect_array_quick_interface(self):
        """``inspect_array`` accepts a bare array and returns (label, confidence, disagreement)."""
        label, conf, _ = self.agent.inspect_array(_make_double_dot_patch(32))
        assert label in ChargeLabel
        assert 0.0 <= conf <= 1.0
# ---------------------------------------------------------------------------
# 7. CIMDataset (smoke tests — no full generation)
# ---------------------------------------------------------------------------
class TestCIMDataset:
    """Smoke tests for ``CIMDataset`` on small configurations (no full generation)."""

    def test_small_dataset_generation(self):
        """Ten samples per class yield (30, 1, 64, 64) float32 data and 30 int64 labels."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=10, seed=0, augment=False)
        arrays, labels = CIMDataset(config).generate()
        assert arrays.shape == (30, 1, 64, 64)
        assert labels.shape == (30,)
        assert arrays.dtype == np.float32
        assert labels.dtype == np.int64

    def test_label_balance(self):
        """Each of the three classes appears exactly ``n_per_class`` times."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=15, seed=0, augment=False)
        _, labels = CIMDataset(config).generate()
        counts = np.bincount(labels)
        assert len(counts) == 3
        assert bool(np.all(counts == 15)), f"Unexpected counts: {counts}"

    def test_arrays_in_valid_range(self):
        """Augmented samples stay within [0, 1], allowing a 0.01 margin for noise."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=5, seed=42, augment=True)
        arrays, _ = CIMDataset(config).generate()
        assert arrays.min() >= -0.01  # tiny tolerance for augment noise
        assert arrays.max() <= 1.01

    def test_resolution_variety(self):
        """A mixed-resolution config must produce more than one native array shape."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        # With a resolutions dict and 50 samples per class, more than one
        # of the configured resolutions is expected to show up.
        config = DatasetConfig(
            n_per_class=50,
            resolutions={16: 0.4, 32: 0.3, 64: 0.3},
            seed=7,
            augment=False,
        )
        dataset = CIMDataset(config)
        # Per the original author's note, generate_measurements() returns
        # native-resolution arrays; the first element of each sample is used.
        samples = dataset.generate_measurements()
        shapes = {sample[0].shape for sample in samples}
        assert len(shapes) > 1, f"Only one resolution seen: {shapes}"

    def test_split_is_stratified(self):
        """Both splits contain all three classes and together cover every sample."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=30, seed=0, augment=False)
        arrays, labels = CIMDataset(config).generate()
        parts = CIMDataset.split(arrays, labels, val_frac=0.2)
        X_tr, X_val, y_tr, y_val = parts
        assert len(np.unique(y_tr)) == 3
        assert len(np.unique(y_val)) == 3
        assert len(X_tr) + len(X_val) == len(arrays)