import numpy as np import os import time import gc # --- 1. ENVIRONMENT CONFIGURATION --- os.environ["OMP_NUM_THREADS"] = "4" os.environ["RAYON_NUM_THREADS"] = "4" os.environ["QISKIT_PARALLEL"] = "TRUE" from sklearn.svm import SVC from sklearn.metrics import roc_auc_score from qiskit import transpile from qiskit.circuit import ParameterVector from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel from qiskit_machine_learning.utils.loss_functions import SVCLoss from qiskit_algorithms.optimizers import COBYLA # --- AER V2 IMPORTS --- from qiskit_aer.primitives import SamplerV2 as AerSampler from qiskit_aer import AerSimulator from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity # --- CONFIGURATION --- N_QUBITS = 8 TRAIN_SIZE = 600 TEST_SIZE = 300 MAX_ITERS = 50 BATCH_SIZE = 2000 OUTPUT_DIR = "optimized_kernel_full_results" if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) # --- THE FIX: DUMMY JOB ARCHITECTURE --- class DummyResult: def __init__(self, fidelities): self.fidelities = fidelities class DummyJob: def __init__(self, fidelities): self._fidelities = fidelities def submit(self): return def result(self): return DummyResult(self._fidelities) class BatchedFidelity(BaseStateFidelity): def __init__(self, fidelity_inst, batch_size=2000): self._fidelity = fidelity_inst self._batch_size = batch_size super().__init__() def create_fidelity_circuit(self, circuit_1, circuit_2): return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2) def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options): n_samples = len(circuits_1) results = [] # print(f" โฎ‘ Processing {n_samples} comparisons in batches...") for i in range(0, n_samples, self._batch_size): end = min(i + self._batch_size, n_samples) c1_batch = circuits_1[i:end] c2_batch = circuits_2[i:end] v1_batch = values_1[i:end] if values_1 is not None else None v2_batch = values_2[i:end] if values_2 is not None else None # --- FIX STARTS HERE --- # Correct API: Run job -> Get Result -> Extract Fidelities job = self._fidelity.run(c1_batch, c2_batch, v1_batch, v2_batch, **options) batch_values = job.result().fidelities # --- FIX ENDS HERE --- results.append(batch_values) # Memory Cleanup del job del batch_values final_fidelities = np.concatenate(results) return DummyJob(final_fidelities) def main(): print(f"๐Ÿš€ Starting Quantum Kernel Optimization (BATCHED V5 FINAL)...") print(f" โšก Backend: Aer C++ Simulator (Statevector)") print(f" ๐Ÿ›ก๏ธ Safety: BatchedFidelity Wrapper Active") # 1. LOAD DATA possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz'] data_path = next((p for p in possible_paths if os.path.exists(p)), None) if not data_path: print("โŒ Error: qgan_data_optimized.npz not found.") return data = np.load(data_path) X_train_full = data['X_train'] y_train_full = data['y_train'] X_test_full = data['X_test'] y_test_full = data['y_test'] # 2. SUBSAMPLE pos_idx = np.where(y_train_full == 1)[0] neg_idx = np.where(y_train_full == 0)[0] n_pos = min(len(pos_idx), TRAIN_SIZE // 2) n_neg = min(len(neg_idx), TRAIN_SIZE // 2) train_idx = np.concatenate([ np.random.choice(pos_idx, n_pos, replace=False), np.random.choice(neg_idx, n_neg, replace=False) ]) np.random.shuffle(train_idx) X_train = X_train_full[train_idx] y_train = y_train_full[train_idx] X_test = X_test_full[:TEST_SIZE] y_test = y_test_full[:TEST_SIZE] # --- 3. MEMORY FLUSH --- print(" ๐Ÿงน Freeing unused memory...") del data, X_train_full, y_train_full, X_test_full, y_test_full gc.collect() # 4. CONFIGURE AER BACKEND backend = AerSimulator( method="statevector", precision="single", max_parallel_threads=4, max_memory_mb=12000 ) aer_sampler = AerSampler.from_backend(backend) aer_sampler.options.default_shots = None # 5. DEFINE KERNEL print(" โš›๏ธ Initializing Trainable Kernel...") fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear') ansatz = RealAmplitudes(N_QUBITS, reps=1) combined_circuit = fm.compose(ansatz) print(" ๐Ÿ”ง Transpiling (Optimization Level 3)...") transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=3) base_fidelity = ComputeUncompute(sampler=aer_sampler) batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE) quant_kernel = TrainableFidelityQuantumKernel( feature_map=transpiled_circuit, training_parameters=ansatz.parameters, fidelity=batched_fidelity ) # 6. OPTIMIZE KERNEL print(" ๐Ÿง  Optimizing Kernel Geometry (COBYLA)...") loss_func = SVCLoss(C=1.0, gamma='auto') optimizer = COBYLA(maxiter=MAX_ITERS) from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer trainer = QuantumKernelTrainer( quantum_kernel=quant_kernel, loss=loss_func, optimizer=optimizer, initial_point=[0.1] * len(ansatz.parameters) ) start_time = time.time() # Train results = trainer.fit(X_train, y_train) elapsed = time.time() - start_time print(f" โœ… Optimization Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)") print(f" ๐ŸŽฏ Final Loss: {results.optimal_value:.4f}") # 7. RUN FINAL SVC print(" ๐Ÿ† Training Final SVM with Optimized Kernel...") optimized_kernel = results.quantum_kernel qsvc = SVC(kernel=optimized_kernel.evaluate, probability=True) qsvc.fit(X_train, y_train) test_probs = qsvc.predict_proba(X_test)[:, 1] test_auc = roc_auc_score(y_test, test_probs) print("\n" + "="*40) print("๐Ÿš€ FULL-SCALE OPTIMIZED KERNEL RESULTS (V5 FIXED)") print("="*40) print(f"โœ… Test AUC: {test_auc:.4f}") print(f"๐Ÿ“‰ Linear Base: 0.7500") print("="*40) if test_auc > 0.75: print("๐ŸŽ‰ SUCCESS: Optimization found a better topology.") np.save(f"{OUTPUT_DIR}/optimized_weights_full.npy", results.optimal_point) else: print("๐Ÿ”ธ Result: Optimization matched or slightly underperformed baseline.") if __name__ == "__main__": main()