""" tests/test_perception.py ======================== Test suite for Phase 1 — perception & physics layer. Coverage: 1. DQC Gatekeeper — all three quality tiers, edge cases 2. Feature extractors — log preprocess, FFT peak ratio, diagonal strength 3. TinyCNN — forward pass, output shape, feature extraction 4. EnsembleCNN — classification interface, disagreement metric 5. MahalanobisOOD — fit, score, flag, persistence 6. InspectionAgent — full pipeline, 2D-only guard, NL report format 7. CIMDataset — shape contract, label balance, resolution variety These tests are designed to run without GPU (CPU-only) and without generating the full 50k training set. They use synthetic small batches. Run: pytest tests/test_perception.py -v """ from __future__ import annotations import json import tempfile import uuid from pathlib import Path import numpy as np import pytest import torch from qdot.core.types import ( ChargeLabel, DQCQuality, Measurement, MeasurementModality, VoltagePoint, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_measurement( shape=(32, 32), modality=MeasurementModality.COARSE_2D, array=None, ) -> Measurement: rng = np.random.default_rng(0) if array is None: array = rng.uniform(0.0, 1.0, shape).astype(np.float32) return Measurement( array=array, modality=modality, resolution=shape[0] if len(shape) == 2 else None, steps=shape[0] if modality == MeasurementModality.LINE_SCAN else None, device_id="test_device", voltage_centre=VoltagePoint(0.0, 0.0), ) def _make_double_dot_patch(res=32) -> np.ndarray: """Synthetic double-dot-like patch: diagonal stripes (approximates transition lines).""" arr = np.zeros((res, res), dtype=np.float32) for i in range(res): for j in range(res): arr[i, j] = 0.5 + 0.4 * np.sin((i + j) * 2 * np.pi / (res / 3)) return np.clip(arr, 0.0, 1.0) def _make_featureless_patch(res=32, level=0.5) -> np.ndarray: """Flat array — should trigger DQC MODERATE/LOW and MISC classification.""" return np.full((res, res), level, dtype=np.float32) def _make_noisy_patch(res=32) -> np.ndarray: """High-noise array — should trigger DQC LOW.""" rng = np.random.default_rng(42) return rng.uniform(0.0, 1.0, (res, res)).astype(np.float32) # --------------------------------------------------------------------------- # 1. DQC Gatekeeper # --------------------------------------------------------------------------- class TestDQCGatekeeper: def setup_method(self): from qdot.perception.dqc import DQCGatekeeper self.gk = DQCGatekeeper() def test_high_quality_real_signal(self): """Clean synthetic patch should score HIGH.""" arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) result = self.gk.assess(m) assert result.quality in (DQCQuality.HIGH, DQCQuality.MODERATE), ( f"Expected HIGH or MODERATE, got {result.quality}. SNR={result.snr_db:.1f}" ) def test_flat_array_is_low_or_moderate(self): """A completely flat array should not be HIGH — it has no signal.""" arr = _make_featureless_patch(32, level=0.5) m = _make_measurement(array=arr) result = self.gk.assess(m) assert result.quality in (DQCQuality.LOW, DQCQuality.MODERATE), ( f"Expected LOW or MODERATE for featureless patch, got {result.quality}" ) def test_nan_is_low(self): """NaN in array must trigger LOW quality.""" arr = np.ones((32, 32), dtype=np.float32) arr[5, 5] = np.nan m = _make_measurement(array=arr) result = self.gk.assess(m) assert result.quality == DQCQuality.LOW assert not result.physically_plausible def test_dynamic_range_returned(self): arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) result = self.gk.assess(m) assert 0.0 <= result.dynamic_range <= 1.0 def test_snr_db_positive_for_structured_array(self): arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) result = self.gk.assess(m) assert result.snr_db > 0.0 def test_1d_line_scan_works(self): """DQC should handle 1D arrays (line scans) without error.""" arr = np.sin(np.linspace(0, 4 * np.pi, 128)).astype(np.float32) arr = (arr - arr.min()) / (arr.max() - arr.min()) m = _make_measurement(array=arr, shape=(128,), modality=MeasurementModality.LINE_SCAN) result = self.gk.assess(m) assert result.quality in (DQCQuality.HIGH, DQCQuality.MODERATE, DQCQuality.LOW) def test_assess_array_matches_assess_measurement(self): """assess_array and assess should return identical results.""" arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) r1 = self.gk.assess(m) r2 = self.gk.assess_array(m.id, arr) assert r1.quality == r2.quality assert abs(r1.snr_db - r2.snr_db) < 1e-6 # --------------------------------------------------------------------------- # 2. Feature extractors # --------------------------------------------------------------------------- class TestFeatureExtractors: def test_log_preprocess_output_range(self): from qdot.perception.features import log_preprocess rng = np.random.default_rng(0) arr = rng.uniform(0, 1, (32, 32)).astype(np.float32) out = log_preprocess(arr) assert out.shape == arr.shape assert out.dtype == np.float32 assert out.min() >= 0.0 - 1e-6 assert out.max() <= 1.0 + 1e-6 def test_log_preprocess_handles_zeros(self): from qdot.perception.features import log_preprocess arr = np.zeros((16, 16), dtype=np.float32) out = log_preprocess(arr) assert np.all(np.isfinite(out)) def test_fft_peak_ratio_single_dot_higher(self): """A periodic array (Coulomb peaks) should have a higher peak ratio.""" from qdot.perception.features import fft_peak_ratio # Single-dot: horizontal stripes (periodic in one direction) single_dot = np.zeros((32, 32), dtype=np.float32) for i in range(32): single_dot[i, :] = 0.5 + 0.4 * np.sin(i * 2 * np.pi / 4) # Double-dot: diagonal stripes (two frequencies) double_dot = _make_double_dot_patch(32) pr_sd = fft_peak_ratio(single_dot) pr_dd = fft_peak_ratio(double_dot) # Not a hard assertion (thresholds vary), just sanity check assert pr_sd > 0.0 assert pr_dd > 0.0 def test_diagonal_strength_on_diagonal_stripes(self): from qdot.perception.features import diagonal_strength arr = _make_double_dot_patch(32) ds = diagonal_strength(arr) assert 0.0 <= ds <= 1.0 def test_diagonal_strength_higher_for_diagonal_vs_horizontal(self): from qdot.perception.features import diagonal_strength # Horizontal stripes — not diagonal horizontal = np.zeros((32, 32), dtype=np.float32) for i in range(32): horizontal[i, :] = float(i % 4 == 0) # Diagonal stripes diagonal = _make_double_dot_patch(32) ds_h = diagonal_strength(horizontal) ds_d = diagonal_strength(diagonal) assert ds_d > ds_h, f"Expected ds_diag ({ds_d:.3f}) > ds_horiz ({ds_h:.3f})" def test_physics_features_returns_all_keys(self): from qdot.perception.features import physics_features arr = _make_double_dot_patch(32) feats = physics_features(arr) assert "fft_peak_ratio" in feats assert "diagonal_strength" in feats assert "mean_conductance" in feats assert "conductance_std" in feats # --------------------------------------------------------------------------- # 3. TinyCNN # --------------------------------------------------------------------------- class TestTinyCNN: def test_forward_pass_shape(self): from qdot.perception.classifier import TinyCNN model = TinyCNN() x = torch.randn(4, 1, 64, 64) out = model(x) assert out.shape == (4, 3) def test_predict_proba_sums_to_one(self): from qdot.perception.classifier import TinyCNN model = TinyCNN() model.eval() x = torch.randn(8, 1, 64, 64) probs = model.predict_proba(x) sums = probs.sum(dim=-1) assert torch.allclose(sums, torch.ones(8), atol=1e-5) def test_extract_features_shape(self): from qdot.perception.classifier import TinyCNN model = TinyCNN() model.eval() x = torch.randn(4, 1, 64, 64) feats = model.extract_features(x) assert feats.shape == (4, 32) def test_forward_32x32_input_fails_gracefully(self): """TinyCNN expects 64×64 input — 32×32 should still work via conv layers.""" from qdot.perception.classifier import TinyCNN model = TinyCNN() model.eval() # 32×32 input: after 4 stride-2 convs → 2×2, GAP → scalar per channel x = torch.randn(2, 1, 32, 32) out = model(x) assert out.shape == (2, 3) # --------------------------------------------------------------------------- # 4. EnsembleCNN # --------------------------------------------------------------------------- class TestEnsembleCNN: def setup_method(self): from qdot.perception.classifier import EnsembleCNN self.ensemble = EnsembleCNN(device="cpu") def test_classify_returns_correct_types(self): arr = _make_double_dot_patch(32) label_idx, confidence, disagreement = self.ensemble.classify(arr) assert label_idx in (0, 1, 2) assert 0.0 <= confidence <= 1.0 assert 0.0 <= disagreement <= 1.0 def test_disagreement_between_zero_and_one(self): arr = _make_double_dot_patch(64) _, _, d = self.ensemble.classify(arr) assert 0.0 <= d <= 1.0 def test_predict_proba_shape_and_sums(self): arr = _make_double_dot_patch(32) probs = self.ensemble.predict_proba(arr) assert probs.shape == (3,) assert abs(probs.sum() - 1.0) < 1e-4 def test_extract_features_shape(self): arr = _make_double_dot_patch(32) feats = self.ensemble.extract_features(arr) assert feats.shape == (32,) def test_save_and_load(self): from qdot.perception.classifier import EnsembleCNN arr = _make_double_dot_patch(32) label_before, conf_before, _ = self.ensemble.classify(arr) with tempfile.TemporaryDirectory() as tmpdir: self.ensemble.save(tmpdir) loaded = EnsembleCNN.load(tmpdir, device="cpu") label_after, conf_after, _ = loaded.classify(arr) assert label_before == label_after assert abs(conf_before - conf_after) < 1e-5 # --------------------------------------------------------------------------- # 5. MahalanobisOOD # --------------------------------------------------------------------------- class TestMahalanobisOOD: def _make_features(self, n: int, seed: int = 0) -> np.ndarray: rng = np.random.default_rng(seed) return rng.normal(0.0, 1.0, (n, 32)).astype(np.float32) def test_fit_and_score_in_distribution(self): from qdot.perception.ood import MahalanobisOOD ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0) train_feats = self._make_features(200, seed=0) ood.fit(train_feats) # In-distribution sample — should score < threshold most of the time in_dist_feat = self._make_features(1, seed=1)[0] result = ood.score(uuid.uuid4(), in_dist_feat) assert isinstance(result.score, float) assert isinstance(result.flag, bool) assert result.threshold > 0 def test_out_of_distribution_flagged(self): from qdot.perception.ood import MahalanobisOOD ood = MahalanobisOOD(n_components=8, calibration_percentile=95.0) rng = np.random.default_rng(0) # Training distribution: N(0, 1) train_feats = rng.normal(0.0, 1.0, (300, 32)).astype(np.float32) ood.fit(train_feats) # OOD sample: very far from training distribution ood_feat = rng.normal(50.0, 1.0, (32,)).astype(np.float32) result = ood.score(uuid.uuid4(), ood_feat) assert result.flag, f"Expected OOD flag=True for far-out sample (score={result.score:.2f})" def test_batch_score_shape(self): from qdot.perception.ood import MahalanobisOOD ood = MahalanobisOOD(n_components=8) train_feats = self._make_features(200) ood.fit(train_feats) test_feats = self._make_features(50, seed=99) scores, flags = ood.score_batch(test_feats) assert scores.shape == (50,) assert flags.shape == (50,) assert flags.dtype == bool def test_save_and_load(self): from qdot.perception.ood import MahalanobisOOD ood = MahalanobisOOD(n_components=8) train_feats = self._make_features(200) ood.fit(train_feats) test_feat = self._make_features(1, seed=42)[0] score_before = ood.score(uuid.uuid4(), test_feat).score with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f: path = f.name ood.save(path) loaded = MahalanobisOOD.load(path) score_after = loaded.score(uuid.uuid4(), test_feat).score assert abs(score_before - score_after) < 1e-6 def test_unfitted_raises(self): from qdot.perception.ood import MahalanobisOOD ood = MahalanobisOOD() with pytest.raises(RuntimeError, match="not been fitted"): ood.score(uuid.uuid4(), np.zeros(32)) # --------------------------------------------------------------------------- # 6. InspectionAgent — full pipeline # --------------------------------------------------------------------------- class TestInspectionAgent: def setup_method(self): from qdot.perception.classifier import EnsembleCNN from qdot.perception.inspector import InspectionAgent self.ensemble = EnsembleCNN(device="cpu") self.agent = InspectionAgent(ensemble=self.ensemble) def test_inspect_2d_returns_classification_and_ood(self): arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) cls, ood = self.agent.inspect(m) assert cls.measurement_id == m.id assert cls.label in ChargeLabel assert 0.0 <= cls.confidence <= 1.0 assert 0.0 <= cls.ensemble_disagreement <= 1.0 assert ood.measurement_id == m.id assert isinstance(ood.flag, bool) def test_inspect_rejects_line_scan(self): arr = np.sin(np.linspace(0, np.pi, 128)).astype(np.float32) m = _make_measurement(array=arr, shape=(128,), modality=MeasurementModality.LINE_SCAN) with pytest.raises(ValueError, match="non-2D"): self.agent.inspect(m) def test_inspect_rejects_low_dqc(self): from qdot.core.types import DQCResult arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) bad_dqc = DQCResult( measurement_id=m.id, quality=DQCQuality.LOW, snr_db=5.0, dynamic_range=0.1, flatness_score=0.001, physically_plausible=False, notes="Test: forced LOW", ) with pytest.raises(RuntimeError, match="LOW-quality"): self.agent.inspect(m, dqc_result=bad_dqc) def test_nl_summary_is_valid_json(self): arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) cls, _ = self.agent.inspect(m) parsed = json.loads(cls.nl_summary) assert "classification" in parsed assert "uncertainty" in parsed assert "physics_reasoning" in parsed assert "ood" in parsed assert "recommended_executive_action" in parsed def test_features_dict_populated(self): arr = _make_double_dot_patch(32) m = _make_measurement(array=arr) cls, _ = self.agent.inspect(m) assert "fft_peak_ratio" in cls.features assert "diagonal_strength" in cls.features def test_physics_override_possible(self): """An array that looks like single-dot by FFT should be overridable.""" from qdot.perception.inspector import InspectionAgent from qdot.perception.classifier import EnsembleCNN from qdot.core.types import DQCResult, DQCQuality # Create agent with very permissive override thresholds agent = InspectionAgent( ensemble=EnsembleCNN(), peak_ratio_threshold=0.1, # extremely low → will always trigger SD override diagonal_strength_min=0.99, # extremely high → diagonal always "missing" ) arr = _make_double_dot_patch(32) # Use a valid patch instead of featureless m = _make_measurement(array=arr) # Bypass DQC check by providing a pre-approved MODERATE result dqc_override = DQCResult( measurement_id=m.id, quality=DQCQuality.MODERATE, snr_db=15.0, dynamic_range=0.4, flatness_score=0.1, physically_plausible=True, notes="Test override" ) cls, _ = agent.inspect(m, dqc_result=dqc_override) # We don't assert the specific label — just that it ran and has a result assert cls.label in ChargeLabel def test_inspect_array_quick_interface(self): arr = _make_double_dot_patch(32) label, conf, disagreement = self.agent.inspect_array(arr) assert label in ChargeLabel assert 0.0 <= conf <= 1.0 # --------------------------------------------------------------------------- # 7. CIMDataset (smoke tests — no full generation) # --------------------------------------------------------------------------- class TestCIMDataset: def test_small_dataset_generation(self): from qdot.perception.dataset import CIMDataset, DatasetConfig cfg = DatasetConfig(n_per_class=10, seed=0, augment=False) ds = CIMDataset(cfg) arrays, labels = ds.generate() assert arrays.shape == (30, 1, 64, 64) assert labels.shape == (30,) assert arrays.dtype == np.float32 assert labels.dtype == np.int64 def test_label_balance(self): from qdot.perception.dataset import CIMDataset, DatasetConfig cfg = DatasetConfig(n_per_class=15, seed=0, augment=False) ds = CIMDataset(cfg) arrays, labels = ds.generate() counts = np.bincount(labels) assert len(counts) == 3 assert all(c == 15 for c in counts), f"Unexpected counts: {counts}" def test_arrays_in_valid_range(self): from qdot.perception.dataset import CIMDataset, DatasetConfig cfg = DatasetConfig(n_per_class=5, seed=42, augment=True) ds = CIMDataset(cfg) arrays, _ = ds.generate() assert arrays.min() >= -0.01 # tiny tolerance for augment noise assert arrays.max() <= 1.01 def test_resolution_variety(self): from qdot.perception.dataset import CIMDataset, DatasetConfig # With resolutions dict and n_per_class=50 we expect some 16 and 64 samples cfg = DatasetConfig( n_per_class=50, resolutions={16: 0.4, 32: 0.3, 64: 0.3}, seed=7, augment=False, ) ds = CIMDataset(cfg) # generate_measurements() returns native-resolution arrays samples = ds.generate_measurements() shapes = {s[0].shape for s in samples} # Should see more than one resolution assert len(shapes) > 1, f"Only one resolution seen: {shapes}" def test_split_is_stratified(self): from qdot.perception.dataset import CIMDataset, DatasetConfig cfg = DatasetConfig(n_per_class=30, seed=0, augment=False) ds = CIMDataset(cfg) arrays, labels = ds.generate() X_tr, X_val, y_tr, y_val = CIMDataset.split(arrays, labels, val_frac=0.2) # Both splits should contain all 3 classes assert len(np.unique(y_tr)) == 3 assert len(np.unique(y_val)) == 3 assert len(X_tr) + len(X_val) == len(arrays)