| """ |
| tests/test_perception.py |
| ======================== |
| Test suite for Phase 1 — perception & physics layer. |
| |
| Coverage: |
| 1. DQC Gatekeeper — all three quality tiers, edge cases |
| 2. Feature extractors — log preprocess, FFT peak ratio, diagonal strength |
| 3. TinyCNN — forward pass, output shape, feature extraction |
| 4. EnsembleCNN — classification interface, disagreement metric |
| 5. MahalanobisOOD — fit, score, flag, persistence |
| 6. InspectionAgent — full pipeline, 2D-only guard, NL report format |
| 7. CIMDataset — shape contract, label balance, resolution variety |
| |
| These tests are designed to run without GPU (CPU-only) and without |
| generating the full 50k training set. They use synthetic small batches. |
| |
| Run: |
| pytest tests/test_perception.py -v |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import tempfile |
| import uuid |
| from pathlib import Path |
|
|
| import numpy as np |
| import pytest |
| import torch |
|
|
| from qdot.core.types import ( |
| ChargeLabel, |
| DQCQuality, |
| Measurement, |
| MeasurementModality, |
| VoltagePoint, |
| ) |
|
|
|
|
| |
| |
| |
|
|
def _make_measurement(
    shape=(32, 32),
    modality=MeasurementModality.COARSE_2D,
    array=None,
) -> Measurement:
    """Build a ``Measurement`` fixture for the tests in this module.

    Args:
        shape: Shape of the random array generated when ``array`` is None.
            When ``array`` is supplied, the array's own shape is used for the
            metadata instead, so ``resolution``/``steps`` cannot disagree
            with the data.
        modality: Modality stored on the measurement.
        array: Optional pre-built data. When None, a uniform random
            ``float32`` array in [0, 1) is generated from a fixed seed.

    Returns:
        A ``Measurement`` with ``device_id="test_device"`` centred on
        ``VoltagePoint(0.0, 0.0)``.
    """
    if array is None:
        # Fixed seed keeps the generated fixture identical across calls.
        array = np.random.default_rng(0).uniform(0.0, 1.0, shape).astype(np.float32)
    else:
        # Trust the data over the ``shape`` argument for the metadata below.
        shape = tuple(np.shape(array))

    is_2d = len(shape) == 2
    is_line_scan = modality == MeasurementModality.LINE_SCAN
    return Measurement(
        array=array,
        modality=modality,
        resolution=shape[0] if is_2d else None,
        steps=shape[0] if is_line_scan else None,
        device_id="test_device",
        voltage_centre=VoltagePoint(0.0, 0.0),
    )
|
|
|
|
def _make_double_dot_patch(res=32) -> np.ndarray:
    """Return a synthetic double-dot-like patch of diagonal stripes.

    Each pixel is ``0.5 + 0.4 * sin(2*pi * (i + j) / (res / 3))``, so the
    value depends only on ``i + j`` and the stripes run along anti-diagonals
    (an approximation of transition lines).

    Args:
        res: Side length of the square patch.

    Returns:
        A ``(res, res)`` ``float32`` array clipped to [0, 1].
    """
    idx = np.arange(res)
    # Broadcasting a column against a row gives the (i + j) grid in one step,
    # replacing the res*res Python-level loop.
    diag_sum = idx[:, None] + idx[None, :]
    phase = diag_sum * 2 * np.pi / (res / 3)
    patch = (0.5 + 0.4 * np.sin(phase)).astype(np.float32)
    return np.clip(patch, 0.0, 1.0)
|
|
|
|
def _make_featureless_patch(res=32, level=0.5) -> np.ndarray:
    """Return a constant ``(res, res)`` ``float32`` array filled with ``level``.

    Intended as a no-signal input (the tests expect DQC not to grade it HIGH).
    """
    patch = np.empty((res, res), dtype=np.float32)
    patch.fill(level)
    return patch
|
|
|
|
def _make_noisy_patch(res=32, seed=42) -> np.ndarray:
    """Return a ``(res, res)`` ``float32`` array of uniform noise in [0, 1].

    Intended as a high-noise input that should be graded LOW by DQC.

    Args:
        res: Side length of the square patch.
        seed: Seed for the random generator. The default reproduces the
            previously hard-coded seed, so existing callers see the same data.

    Returns:
        Deterministic (for a given ``seed``) uniform random noise.
    """
    rng = np.random.default_rng(seed)
    return rng.uniform(0.0, 1.0, (res, res)).astype(np.float32)
|
|
|
|
| |
| |
| |
|
|
class TestDQCGatekeeper:
    """Checks ``DQCGatekeeper.assess`` / ``assess_array`` on synthetic arrays."""

    def setup_method(self):
        # Imported here rather than at module level, like every other
        # qdot.perception import in this file.
        from qdot.perception.dqc import DQCGatekeeper

        self.gk = DQCGatekeeper()

    def test_high_quality_real_signal(self):
        """The clean striped patch must be graded HIGH or MODERATE."""
        measurement = _make_measurement(array=_make_double_dot_patch(32))
        verdict = self.gk.assess(measurement)
        acceptable = (DQCQuality.HIGH, DQCQuality.MODERATE)
        assert verdict.quality in acceptable, (
            f"Expected HIGH or MODERATE, got {verdict.quality}. SNR={verdict.snr_db:.1f}"
        )

    def test_flat_array_is_low_or_moderate(self):
        """A constant array carries no signal, so it must not be graded HIGH."""
        flat = _make_featureless_patch(32, level=0.5)
        verdict = self.gk.assess(_make_measurement(array=flat))
        acceptable = (DQCQuality.LOW, DQCQuality.MODERATE)
        assert verdict.quality in acceptable, (
            f"Expected LOW or MODERATE for featureless patch, got {verdict.quality}"
        )

    def test_nan_is_low(self):
        """A single NaN must force LOW quality and mark the data implausible."""
        corrupted = np.ones((32, 32), dtype=np.float32)
        corrupted[5, 5] = np.nan
        verdict = self.gk.assess(_make_measurement(array=corrupted))
        assert verdict.quality == DQCQuality.LOW
        assert not verdict.physically_plausible

    def test_dynamic_range_returned(self):
        """The reported dynamic range lies within [0, 1]."""
        measurement = _make_measurement(array=_make_double_dot_patch(32))
        verdict = self.gk.assess(measurement)
        assert 0.0 <= verdict.dynamic_range <= 1.0

    def test_snr_db_positive_for_structured_array(self):
        """A structured (striped) patch yields a positive SNR in dB."""
        measurement = _make_measurement(array=_make_double_dot_patch(32))
        verdict = self.gk.assess(measurement)
        assert verdict.snr_db > 0.0

    def test_1d_line_scan_works(self):
        """DQC accepts a 1D line scan and returns one of the three tiers."""
        trace = np.sin(np.linspace(0, 4 * np.pi, 128)).astype(np.float32)
        # Min-max normalise the trace into [0, 1].
        trace = (trace - trace.min()) / (trace.max() - trace.min())
        measurement = _make_measurement(
            array=trace,
            shape=(128,),
            modality=MeasurementModality.LINE_SCAN,
        )
        verdict = self.gk.assess(measurement)
        all_tiers = (DQCQuality.HIGH, DQCQuality.MODERATE, DQCQuality.LOW)
        assert verdict.quality in all_tiers

    def test_assess_array_matches_assess_measurement(self):
        """``assess_array`` and ``assess`` agree on quality and SNR."""
        patch = _make_double_dot_patch(32)
        measurement = _make_measurement(array=patch)
        via_measurement = self.gk.assess(measurement)
        via_array = self.gk.assess_array(measurement.id, patch)
        assert via_measurement.quality == via_array.quality
        assert abs(via_measurement.snr_db - via_array.snr_db) < 1e-6
|
|
|
|
| |
| |
| |
|
|
class TestFeatureExtractors:
    """Checks for the functions exported by ``qdot.perception.features``."""

    def test_log_preprocess_output_range(self):
        """Output keeps shape, is float32, and stays within [0, 1] (±1e-6)."""
        from qdot.perception.features import log_preprocess

        raw = np.random.default_rng(0).uniform(0, 1, (32, 32)).astype(np.float32)
        processed = log_preprocess(raw)
        assert processed.shape == raw.shape
        assert processed.dtype == np.float32
        assert processed.min() >= -1e-6
        assert processed.max() <= 1.0 + 1e-6

    def test_log_preprocess_handles_zeros(self):
        """An all-zero input must not produce NaN or inf."""
        from qdot.perception.features import log_preprocess

        processed = log_preprocess(np.zeros((16, 16), dtype=np.float32))
        assert np.all(np.isfinite(processed))

    def test_fft_peak_ratio_single_dot_higher(self):
        """Peak ratio is positive for both a row-periodic and a striped patch.

        Note: despite the test name, only positivity is asserted; the two
        ratios are not compared against each other.
        """
        from qdot.perception.features import fft_peak_ratio

        # Row-periodic pattern (period of 4 rows), constant along each row.
        single_dot = np.zeros((32, 32), dtype=np.float32)
        for row in range(32):
            single_dot[row] = 0.5 + 0.4 * np.sin(row * 2 * np.pi / 4)

        double_dot = _make_double_dot_patch(32)

        ratio_single = fft_peak_ratio(single_dot)
        ratio_double = fft_peak_ratio(double_dot)

        assert ratio_single > 0.0
        assert ratio_double > 0.0

    def test_diagonal_strength_on_diagonal_stripes(self):
        """Diagonal strength of the striped patch lies within [0, 1]."""
        from qdot.perception.features import diagonal_strength

        strength = diagonal_strength(_make_double_dot_patch(32))
        assert 0.0 <= strength <= 1.0

    def test_diagonal_strength_higher_for_diagonal_vs_horizontal(self):
        """Diagonal stripes must score above purely horizontal stripes."""
        from qdot.perception.features import diagonal_strength

        # Every fourth row set to 1.0, all other rows left at 0.0.
        horizontal = np.zeros((32, 32), dtype=np.float32)
        horizontal[::4, :] = 1.0

        diagonal = _make_double_dot_patch(32)

        ds_h = diagonal_strength(horizontal)
        ds_d = diagonal_strength(diagonal)
        assert ds_d > ds_h, f"Expected ds_diag ({ds_d:.3f}) > ds_horiz ({ds_h:.3f})"

    def test_physics_features_returns_all_keys(self):
        """``physics_features`` exposes the four expected feature keys."""
        from qdot.perception.features import physics_features

        feats = physics_features(_make_double_dot_patch(32))
        expected_keys = (
            "fft_peak_ratio",
            "diagonal_strength",
            "mean_conductance",
            "conductance_std",
        )
        for key in expected_keys:
            assert key in feats
|
|
|
|
| |
| |
| |
|
|
class TestTinyCNN:
    """Shape and probability checks for a freshly constructed ``TinyCNN``."""

    def test_forward_pass_shape(self):
        """A batch of four 64x64 single-channel inputs maps to shape (4, 3)."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        batch = torch.randn(4, 1, 64, 64)
        logits = net(batch)
        assert logits.shape == (4, 3)

    def test_predict_proba_sums_to_one(self):
        """Each row returned by ``predict_proba`` sums to 1 within 1e-5."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()
        batch = torch.randn(8, 1, 64, 64)
        row_totals = net.predict_proba(batch).sum(dim=-1)
        assert torch.allclose(row_totals, torch.ones(8), atol=1e-5)

    def test_extract_features_shape(self):
        """``extract_features`` returns a 32-dimensional vector per sample."""
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()
        batch = torch.randn(4, 1, 64, 64)
        embedding = net.extract_features(batch)
        assert embedding.shape == (4, 32)

    def test_forward_32x32_input_fails_gracefully(self):
        """A 32x32 input is expected to work and still yield (batch, 3).

        Note: despite the test name, no failure is expected here.
        """
        from qdot.perception.classifier import TinyCNN

        net = TinyCNN()
        net.eval()

        small_batch = torch.randn(2, 1, 32, 32)
        logits = net(small_batch)
        assert logits.shape == (2, 3)
|
|
|
|
| |
| |
| |
|
|
class TestEnsembleCNN:
    """Interface checks for ``EnsembleCNN`` built on the CPU."""

    def setup_method(self):
        from qdot.perception.classifier import EnsembleCNN

        self.ensemble = EnsembleCNN(device="cpu")

    def test_classify_returns_correct_types(self):
        """``classify`` yields a class index in {0, 1, 2} and two values in [0, 1]."""
        patch = _make_double_dot_patch(32)
        label_idx, confidence, disagreement = self.ensemble.classify(patch)
        assert label_idx in (0, 1, 2)
        assert 0.0 <= confidence <= 1.0
        assert 0.0 <= disagreement <= 1.0

    def test_disagreement_between_zero_and_one(self):
        """Disagreement stays within [0, 1] for a 64x64 patch."""
        patch = _make_double_dot_patch(64)
        disagreement = self.ensemble.classify(patch)[2]
        assert 0.0 <= disagreement <= 1.0

    def test_predict_proba_shape_and_sums(self):
        """``predict_proba`` returns three probabilities summing to 1."""
        probs = self.ensemble.predict_proba(_make_double_dot_patch(32))
        assert probs.shape == (3,)
        assert abs(probs.sum() - 1.0) < 1e-4

    def test_extract_features_shape(self):
        """``extract_features`` returns a flat 32-element vector."""
        feats = self.ensemble.extract_features(_make_double_dot_patch(32))
        assert feats.shape == (32,)

    def test_save_and_load(self):
        """A save/load round trip reproduces the label and confidence."""
        from qdot.perception.classifier import EnsembleCNN

        patch = _make_double_dot_patch(32)
        label_before, conf_before, _ = self.ensemble.classify(patch)

        # The temporary directory is removed once the reloaded model has
        # been queried.
        with tempfile.TemporaryDirectory() as tmpdir:
            self.ensemble.save(tmpdir)
            restored = EnsembleCNN.load(tmpdir, device="cpu")
            label_after, conf_after, _ = restored.classify(patch)

        assert label_before == label_after
        assert abs(conf_before - conf_after) < 1e-5
|
|
|
|
| |
| |
| |
|
|
class TestMahalanobisOOD:
    """Fit / score / persistence checks for ``MahalanobisOOD``."""

    def _make_features(self, n: int, seed: int = 0) -> np.ndarray:
        """Return ``n`` standard-normal 32-dimensional ``float32`` vectors."""
        rng = np.random.default_rng(seed)
        return rng.normal(0.0, 1.0, (n, 32)).astype(np.float32)

    def test_fit_and_score_in_distribution(self):
        """Scoring a sample drawn like the training data returns typed fields."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0)
        ood.fit(self._make_features(200, seed=0))

        # Same distribution as the training set, different seed.
        in_dist_feat = self._make_features(1, seed=1)[0]
        result = ood.score(uuid.uuid4(), in_dist_feat)
        assert isinstance(result.score, float)
        assert isinstance(result.flag, bool)
        assert result.threshold > 0

    def test_out_of_distribution_flagged(self):
        """A sample centred 50 sigma away from the training mean is flagged."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0)
        rng = np.random.default_rng(0)

        # Training data: standard normal.
        train_feats = rng.normal(0.0, 1.0, (300, 32)).astype(np.float32)
        ood.fit(train_feats)

        # Query: same spread, mean shifted to 50.
        ood_feat = rng.normal(50.0, 1.0, (32,)).astype(np.float32)
        result = ood.score(uuid.uuid4(), ood_feat)
        assert result.flag, f"Expected OOD flag=True for far-out sample (score={result.score:.2f})"

    def test_batch_score_shape(self):
        """``score_batch`` returns one score and one boolean flag per row."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8)
        ood.fit(self._make_features(200))
        test_feats = self._make_features(50, seed=99)
        scores, flags = ood.score_batch(test_feats)
        assert scores.shape == (50,)
        assert flags.shape == (50,)
        assert flags.dtype == bool

    def test_save_and_load(self):
        """A save/load round trip reproduces the score of a fixed sample."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD(n_components=8)
        ood.fit(self._make_features(200))
        test_feat = self._make_features(1, seed=42)[0]
        score_before = ood.score(uuid.uuid4(), test_feat).score

        # A TemporaryDirectory is cleaned up automatically; the previous
        # NamedTemporaryFile(delete=False) left a stray .pkl behind on
        # every run.
        with tempfile.TemporaryDirectory() as tmpdir:
            path = str(Path(tmpdir) / "ood.pkl")
            ood.save(path)
            loaded = MahalanobisOOD.load(path)
            score_after = loaded.score(uuid.uuid4(), test_feat).score

        assert abs(score_before - score_after) < 1e-6

    def test_unfitted_raises(self):
        """Scoring before ``fit`` raises ``RuntimeError``."""
        from qdot.perception.ood import MahalanobisOOD

        ood = MahalanobisOOD()
        with pytest.raises(RuntimeError, match="not been fitted"):
            ood.score(uuid.uuid4(), np.zeros(32))
|
|
|
|
| |
| |
| |
|
|
class TestInspectionAgent:
    """Pipeline checks for ``InspectionAgent`` backed by a CPU ``EnsembleCNN``."""

    def setup_method(self):
        from qdot.perception.classifier import EnsembleCNN
        from qdot.perception.inspector import InspectionAgent

        self.ensemble = EnsembleCNN(device="cpu")
        self.agent = InspectionAgent(ensemble=self.ensemble)

    def test_inspect_2d_returns_classification_and_ood(self):
        """``inspect`` returns a classification and an OOD result tied to the measurement."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, ood = self.agent.inspect(m)
        assert classification.measurement_id == m.id
        assert classification.label in ChargeLabel
        assert 0.0 <= classification.confidence <= 1.0
        assert 0.0 <= classification.ensemble_disagreement <= 1.0
        assert ood.measurement_id == m.id
        assert isinstance(ood.flag, bool)

    def test_inspect_rejects_line_scan(self):
        """A 1D line scan is rejected with ``ValueError``."""
        arr = np.sin(np.linspace(0, np.pi, 128)).astype(np.float32)
        m = _make_measurement(
            array=arr,
            shape=(128,),
            modality=MeasurementModality.LINE_SCAN,
        )
        with pytest.raises(ValueError, match="non-2D"):
            self.agent.inspect(m)

    def test_inspect_rejects_low_dqc(self):
        """A caller-supplied LOW DQC result makes ``inspect`` raise ``RuntimeError``."""
        from qdot.core.types import DQCResult

        m = _make_measurement(array=_make_double_dot_patch(32))
        bad_dqc = DQCResult(
            measurement_id=m.id,
            quality=DQCQuality.LOW,
            snr_db=5.0,
            dynamic_range=0.1,
            flatness_score=0.001,
            physically_plausible=False,
            notes="Test: forced LOW",
        )
        with pytest.raises(RuntimeError, match="LOW-quality"):
            self.agent.inspect(m, dqc_result=bad_dqc)

    def test_nl_summary_is_valid_json(self):
        """``nl_summary`` parses as JSON and contains the five report sections."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, _ = self.agent.inspect(m)
        parsed = json.loads(classification.nl_summary)
        for section in (
            "classification",
            "uncertainty",
            "physics_reasoning",
            "ood",
            "recommended_executive_action",
        ):
            assert section in parsed

    def test_features_dict_populated(self):
        """The classification carries both physics features."""
        m = _make_measurement(array=_make_double_dot_patch(32))
        classification, _ = self.agent.inspect(m)
        assert "fft_peak_ratio" in classification.features
        assert "diagonal_strength" in classification.features

    def test_physics_override_possible(self):
        """With unusual physics thresholds, ``inspect`` still returns a valid label."""
        from qdot.core.types import DQCResult
        from qdot.perception.classifier import EnsembleCNN
        from qdot.perception.inspector import InspectionAgent

        # device="cpu" matches every other EnsembleCNN in this module and the
        # module's CPU-only contract.
        agent = InspectionAgent(
            ensemble=EnsembleCNN(device="cpu"),
            peak_ratio_threshold=0.1,
            diagonal_strength_min=0.99,
        )
        m = _make_measurement(array=_make_double_dot_patch(32))

        # Supply a MODERATE DQC result explicitly rather than relying on the
        # agent's own assessment of the patch.
        dqc_override = DQCResult(
            measurement_id=m.id,
            quality=DQCQuality.MODERATE,
            snr_db=15.0,
            dynamic_range=0.4,
            flatness_score=0.1,
            physically_plausible=True,
            notes="Test override",
        )
        classification, _ = agent.inspect(m, dqc_result=dqc_override)

        assert classification.label in ChargeLabel

    def test_inspect_array_quick_interface(self):
        """``inspect_array`` returns (label, confidence, disagreement) for a raw array."""
        arr = _make_double_dot_patch(32)
        label, conf, _disagreement = self.agent.inspect_array(arr)
        assert label in ChargeLabel
        assert 0.0 <= conf <= 1.0
|
|
|
|
| |
| |
| |
|
|
class TestCIMDataset:
    """Contract checks for ``CIMDataset`` using small generated sets."""

    def test_small_dataset_generation(self):
        """Ten samples per class give 30 float32 arrays of 1x64x64 with int64 labels."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=10, seed=0, augment=False)
        arrays, labels = CIMDataset(config).generate()
        assert arrays.shape == (30, 1, 64, 64)
        assert labels.shape == (30,)
        assert arrays.dtype == np.float32
        assert labels.dtype == np.int64

    def test_label_balance(self):
        """Each of the three classes appears exactly ``n_per_class`` times."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=15, seed=0, augment=False)
        _, labels = CIMDataset(config).generate()
        counts = np.bincount(labels)
        assert len(counts) == 3
        assert all(c == 15 for c in counts), f"Unexpected counts: {counts}"

    def test_arrays_in_valid_range(self):
        """Augmented arrays stay within [-0.01, 1.01]."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=5, seed=42, augment=True)
        arrays, _ = CIMDataset(config).generate()
        assert arrays.min() >= -0.01
        assert arrays.max() <= 1.01

    def test_resolution_variety(self):
        """With a mixed resolution distribution, more than one shape is produced."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(
            n_per_class=50,
            resolutions={16: 0.4, 32: 0.3, 64: 0.3},
            seed=7,
            augment=False,
        )
        dataset = CIMDataset(config)

        samples = dataset.generate_measurements()
        # The first element of each sample is the object whose shape is compared.
        shapes = {sample[0].shape for sample in samples}

        assert len(shapes) > 1, f"Only one resolution seen: {shapes}"

    def test_split_is_stratified(self):
        """Both splits contain all three classes and together cover every sample."""
        from qdot.perception.dataset import CIMDataset, DatasetConfig

        config = DatasetConfig(n_per_class=30, seed=0, augment=False)
        arrays, labels = CIMDataset(config).generate()
        train_x, val_x, train_y, val_y = CIMDataset.split(arrays, labels, val_frac=0.2)

        assert len(np.unique(train_y)) == 3
        assert len(np.unique(val_y)) == 3
        assert len(train_x) + len(val_x) == len(arrays)
|
|