import numpy as np import os import time import gc # --- 1. ENVIRONMENT --- os.environ["OMP_NUM_THREADS"] = "4" os.environ["RAYON_NUM_THREADS"] = "4" os.environ["QISKIT_PARALLEL"] = "TRUE" from sklearn.svm import SVC from sklearn.metrics import roc_auc_score from qiskit import transpile from qiskit.circuit import ParameterVector from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes from qiskit_machine_learning.utils.loss_functions import SVCLoss from qiskit_algorithms.optimizers import COBYLA from qiskit_aer import AerSimulator # --- CONFIGURATION --- N_QUBITS = 8 TRAIN_SIZE = 600 TEST_SIZE = 300 MAX_ITERS = 50 OUTPUT_DIR = "optimized_kernel_bypass_results" if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) def main(): print(f"🚀 Starting Quantum Kernel Optimization (MATH BYPASS V4)...") print(f" ⚡ Strategy: Explicit 'save_statevector' instruction added.") # 1. LOAD DATA possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz'] data_path = next((p for p in possible_paths if os.path.exists(p)), None) if not data_path: print("❌ Error: qgan_data_optimized.npz not found.") return data = np.load(data_path) X_train_full = data['X_train'] y_train_full = data['y_train'] X_test_full = data['X_test'] y_test_full = data['y_test'] # 2. SUBSAMPLE pos_idx = np.where(y_train_full == 1)[0] neg_idx = np.where(y_train_full == 0)[0] n_pos = min(len(pos_idx), TRAIN_SIZE // 2) n_neg = min(len(neg_idx), TRAIN_SIZE // 2) train_idx = np.concatenate([ np.random.choice(pos_idx, n_pos, replace=False), np.random.choice(neg_idx, n_neg, replace=False) ]) np.random.shuffle(train_idx) X_train = X_train_full[train_idx] y_train = y_train_full[train_idx] X_test = X_test_full[:TEST_SIZE] y_test = y_test_full[:TEST_SIZE] # Free memory del data, X_train_full, y_train_full, X_test_full, y_test_full gc.collect() # 3. SETUP CIRCUIT & BACKEND print(" ⚛️ Building Circuit...") fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear') ansatz = RealAmplitudes(N_QUBITS, reps=1) combined = fm.compose(ansatz) # --- THE FIX: FORCE SIMULATOR TO SAVE DATA --- combined.save_statevector() # <--- CRITICAL LINE backend = AerSimulator(method='statevector', max_parallel_threads=4) transpiled = transpile(combined, backend, optimization_level=1) # 4. CUSTOM OBJECTIVE FUNCTION loss_func = SVCLoss(C=1.0, gamma='auto') def evaluate_kernel(theta): theta_batch = np.tile(theta, (len(X_train), 1)) full_batch_params = np.hstack([X_train, theta_batch]) # Wrapped lists for Aer safety binds = [] for row in full_batch_params: bind_dict = dict(zip(combined.parameters, [[float(x)] for x in row])) binds.append(bind_dict) circuits = [transpiled] * len(X_train) job = backend.run(circuits, parameter_binds=binds) result = job.result() # Now this will work because we added save_statevector() statevectors = np.array([result.get_statevector(i) for i in range(len(X_train))]) kernel_matrix = np.abs(statevectors @ statevectors.conj().T)**2 return kernel_matrix def objective(theta): try: K = evaluate_kernel(theta) svc = SVC(kernel='precomputed', C=1.0) svc.fit(K, y_train) score = svc.score(K, y_train) loss = 1.0 - score return loss except Exception as e: print(f" ⚠️ Warning: Objective failed ({e}). Returning high loss.") return 1.0 # 5. OPTIMIZATION LOOP print(" 🧠 Optimizing Geometry (COBYLA)...") optimizer = COBYLA(maxiter=MAX_ITERS) n_train_params = len(ansatz.parameters) initial_theta = np.random.random(n_train_params) * 0.1 start_time = time.time() history = [] def callback(x): pass res = optimizer.minimize(objective, x0=initial_theta) print(f" ✅ Optimization Complete in {time.time() - start_time:.1f}s") # 6. FINAL EVALUATION print(" 🏆 Evaluating Final Kernel...") optimal_theta = res.x # Helper to re-use logic def get_statevectors(data_x, theta): theta_batch = np.tile(theta, (len(data_x), 1)) full_params = np.hstack([data_x, theta_batch]) binds = [] for row in full_params: binds.append(dict(zip(combined.parameters, [[float(x)] for x in row]))) job = backend.run([transpiled] * len(data_x), parameter_binds=binds) return np.array([job.result().get_statevector(i) for i in range(len(data_x))]) statevectors_train = get_statevectors(X_train, optimal_theta) statevectors_test = get_statevectors(X_test, optimal_theta) K_train = np.abs(statevectors_train @ statevectors_train.conj().T)**2 K_test = np.abs(statevectors_test @ statevectors_train.conj().T)**2 qsvc = SVC(kernel='precomputed', probability=True) qsvc.fit(K_train, y_train) test_probs = qsvc.predict_proba(K_test)[:, 1] test_auc = roc_auc_score(y_test, test_probs) print("\n" + "="*40) print("🚀 MATH BYPASS KERNEL RESULTS") print("="*40) print(f"✅ Test AUC: {test_auc:.4f}") print(f"📉 Linear Base: 0.7500") print("="*40) np.save(f"{OUTPUT_DIR}/optimized_weights_bypass.npy", optimal_theta) if __name__ == "__main__": main()