"""Base Lite diagnostic for a trainable quantum-kernel SVM.

Smoke-test configuration of a QSVM pipeline: loads a prepared dataset,
subsamples a balanced training set, trains a TrainableFidelityQuantumKernel
with COBYLA on an Aer statevector simulator, then fits a classical SVC on
the learned kernel and reports test AUC.
"""

import numpy as np
import os
import time
import gc

# --- 1. ENVIRONMENT CONFIGURATION (M1 TURBO) ---
# Thread caps must be set BEFORE the qiskit/sklearn imports below so the
# native libraries pick them up at load time.
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"

from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA

# --- AER V2 IMPORTS ---
from qiskit_aer.primitives import SamplerV2 as AerSampler
from qiskit_aer import AerSimulator
from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity

# --- BASE LITE CONFIGURATION (MODIFY THESE FOR VARIATIONS) ---
N_QUBITS = 8
TRAIN_SIZE = 200   # Reduced from 600
TEST_SIZE = 300
MAX_ITERS = 5      # Reduced from 50 (Smoke Test)
BATCH_SIZE = 5000  # Increased from 2000 (Test RAM usage)
OPT_LEVEL = 1      # Reduced from 3 (Test compilation speed)

OUTPUT_DIR = "lite_diagnostics"
# exist_ok avoids the check-then-create race of os.path.exists + makedirs.
os.makedirs(OUTPUT_DIR, exist_ok=True)


# --- DUMMY JOB WRAPPER ---
class DummyResult:
    """Minimal result object exposing the `.fidelities` attribute that
    TrainableFidelityQuantumKernel reads from a fidelity job result."""

    def __init__(self, fidelities):
        self.fidelities = fidelities


class DummyJob:
    """Minimal job object wrapping precomputed fidelities.

    Mimics the job interface expected by the kernel: `submit()` is a no-op
    (the work was already done eagerly) and `result()` returns a
    DummyResult holding the concatenated fidelity values.
    """

    def __init__(self, fidelities):
        self._fidelities = fidelities

    def submit(self):
        return

    def result(self):
        return DummyResult(self._fidelities)


class BatchedFidelity(BaseStateFidelity):
    """State-fidelity wrapper that splits large circuit lists into batches.

    Delegates the actual fidelity evaluation to a wrapped fidelity
    instance (e.g. ComputeUncompute) but runs it in chunks of
    `batch_size` circuit pairs to bound peak memory, printing timing
    diagnostics as batches complete.
    """

    def __init__(self, fidelity_inst, batch_size):
        # Set our own attributes before the base-class init so any base
        # machinery that runs during __init__ sees a fully-formed object.
        self._fidelity = fidelity_inst
        self._batch_size = batch_size
        super().__init__()

    def create_fidelity_circuit(self, circuit_1, circuit_2):
        """Delegate circuit construction to the wrapped fidelity."""
        return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)

    def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
        """Evaluate fidelities batch-by-batch and return a DummyJob with
        the concatenated results (synchronous; no real job is deferred)."""
        n_samples = len(circuits_1)
        results = []
        # TIMING DIAGNOSTIC
        batch_start = time.time()
        for i in range(0, n_samples, self._batch_size):
            end = min(i + self._batch_size, n_samples)
            c1_batch = circuits_1[i:end]
            c2_batch = circuits_2[i:end]
            v1_batch = values_1[i:end] if values_1 is not None else None
            v2_batch = values_2[i:end] if values_2 is not None else None
            # Run Job
            job = self._fidelity.run(c1_batch, c2_batch, v1_batch, v2_batch, **options)
            batch_values = job.result().fidelities
            results.append(batch_values)
            # Print progress for every batch to debug speed
            if (i // self._batch_size) % 5 == 0:
                print(f" Batch {i}/{n_samples} processed in {time.time() - batch_start:.2f}s")
                batch_start = time.time()  # Reset timer
            del job, batch_values
        return DummyJob(np.concatenate(results))


def main():
    print("🚀 Starting Base Lite Diagnostic...")
    print(f" ⚙️ Config: Train={TRAIN_SIZE} | Batch={BATCH_SIZE} | Iters={MAX_ITERS} | Opt={OPT_LEVEL}")

    # 1. LOAD DATA
    possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
    data_path = next((p for p in possible_paths if os.path.exists(p)), None)
    if not data_path:
        # Previously this returned silently; report the failure explicitly.
        print(f"❌ Data file not found in any of: {possible_paths}")
        return
    data = np.load(data_path)
    X_train_full = data['X_train']
    y_train_full = data['y_train']
    X_test_full = data['X_test']
    y_test_full = data['y_test']

    # 2. SUBSAMPLE — balanced draw of TRAIN_SIZE//2 per class, then shuffle.
    # NOTE(review): np.random is unseeded, so runs are not reproducible;
    # assumes each class has at least TRAIN_SIZE//2 samples (choice raises
    # otherwise) — confirm against the dataset.
    pos_idx = np.where(y_train_full == 1)[0]
    neg_idx = np.where(y_train_full == 0)[0]
    train_idx = np.concatenate([
        np.random.choice(pos_idx, TRAIN_SIZE // 2, replace=False),
        np.random.choice(neg_idx, TRAIN_SIZE // 2, replace=False)
    ])
    np.random.shuffle(train_idx)
    X_train = X_train_full[train_idx]
    y_train = y_train_full[train_idx]
    X_test = X_test_full[:TEST_SIZE]
    y_test = y_test_full[:TEST_SIZE]

    # --- 3. MEMORY FLUSH ---
    # Drop the full arrays before the simulator allocates its buffers.
    del data, X_train_full, y_train_full, X_test_full, y_test_full
    gc.collect()

    # 4. CONFIGURE AER BACKEND
    backend = AerSimulator(
        method="statevector",
        precision="single",
        max_parallel_threads=4,
        max_memory_mb=12000
    )
    aer_sampler = AerSampler.from_backend(backend)
    # None shots => exact (statevector) probabilities rather than sampling.
    aer_sampler.options.default_shots = None

    # 5. DEFINE KERNEL
    print(" ⚛️ Initializing Kernel...")
    fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
    ansatz = RealAmplitudes(N_QUBITS, reps=1)
    combined_circuit = fm.compose(ansatz)
    print(f" 🔧 Transpiling (Level {OPT_LEVEL})...")
    transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=OPT_LEVEL)
    base_fidelity = ComputeUncompute(sampler=aer_sampler)
    batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE)
    quant_kernel = TrainableFidelityQuantumKernel(
        feature_map=transpiled_circuit,
        training_parameters=ansatz.parameters,
        fidelity=batched_fidelity
    )

    # 6. OPTIMIZE — train the ansatz parameters against the SVC loss.
    print(" 🧠 Optimizing (COBYLA)...")
    loss_func = SVCLoss(C=1.0, gamma='auto')
    optimizer = COBYLA(maxiter=MAX_ITERS)
    from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer
    trainer = QuantumKernelTrainer(
        quantum_kernel=quant_kernel,
        loss=loss_func,
        optimizer=optimizer,
        initial_point=[0.1] * len(ansatz.parameters)
    )
    start_time = time.time()
    results = trainer.fit(X_train, y_train)
    elapsed = time.time() - start_time
    print(f" ✅ Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
    print(f" 🎯 Final Loss: {results.optimal_value:.4f}")

    # 7. RUN FINAL SVC — classical SVM over the trained quantum kernel.
    print(" 🏆 Training Final SVM...")
    qsvc = SVC(kernel=results.quantum_kernel.evaluate, probability=True)
    qsvc.fit(X_train, y_train)
    test_probs = qsvc.predict_proba(X_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_probs)

    print("\n" + "=" * 40)
    print(f"🚀 BASE LITE RESULTS (Batch={BATCH_SIZE})")
    print("=" * 40)
    print(f"✅ Test AUC: {test_auc:.4f}")
    print("=" * 40)


if __name__ == "__main__":
    main()