| import numpy as np |
| import os |
| import time |
| import gc |
|
|
| |
| os.environ["OMP_NUM_THREADS"] = "4" |
| os.environ["RAYON_NUM_THREADS"] = "4" |
| os.environ["QISKIT_PARALLEL"] = "TRUE" |
|
|
| from sklearn.svm import SVC |
| from sklearn.metrics import roc_auc_score |
| from qiskit import transpile |
| from qiskit.circuit import ParameterVector |
| from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes |
| from qiskit_machine_learning.utils.loss_functions import SVCLoss |
| from qiskit_algorithms.optimizers import COBYLA |
| from qiskit_aer import AerSimulator |
|
|
| |
| N_QUBITS = 8 |
| TRAIN_SIZE = 600 |
| TEST_SIZE = 300 |
| MAX_ITERS = 50 |
|
|
| OUTPUT_DIR = "optimized_kernel_bypass_results" |
| if not os.path.exists(OUTPUT_DIR): |
| os.makedirs(OUTPUT_DIR) |
|
|
| def main(): |
| print(f"π Starting Quantum Kernel Optimization (MATH BYPASS V4)...") |
| print(f" β‘ Strategy: Explicit 'save_statevector' instruction added.") |
|
|
| |
| possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz'] |
| data_path = next((p for p in possible_paths if os.path.exists(p)), None) |
| if not data_path: |
| print("β Error: qgan_data_optimized.npz not found.") |
| return |
|
|
| data = np.load(data_path) |
| X_train_full = data['X_train'] |
| y_train_full = data['y_train'] |
| X_test_full = data['X_test'] |
| y_test_full = data['y_test'] |
|
|
| |
| pos_idx = np.where(y_train_full == 1)[0] |
| neg_idx = np.where(y_train_full == 0)[0] |
| n_pos = min(len(pos_idx), TRAIN_SIZE // 2) |
| n_neg = min(len(neg_idx), TRAIN_SIZE // 2) |
| |
| train_idx = np.concatenate([ |
| np.random.choice(pos_idx, n_pos, replace=False), |
| np.random.choice(neg_idx, n_neg, replace=False) |
| ]) |
| np.random.shuffle(train_idx) |
| |
| X_train = X_train_full[train_idx] |
| y_train = y_train_full[train_idx] |
| X_test = X_test_full[:TEST_SIZE] |
| y_test = y_test_full[:TEST_SIZE] |
|
|
| |
| del data, X_train_full, y_train_full, X_test_full, y_test_full |
| gc.collect() |
|
|
| |
| print(" βοΈ Building Circuit...") |
| fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear') |
| ansatz = RealAmplitudes(N_QUBITS, reps=1) |
| combined = fm.compose(ansatz) |
| |
| |
| combined.save_statevector() |
| |
| backend = AerSimulator(method='statevector', max_parallel_threads=4) |
| transpiled = transpile(combined, backend, optimization_level=1) |
|
|
| |
| loss_func = SVCLoss(C=1.0, gamma='auto') |
| |
| def evaluate_kernel(theta): |
| theta_batch = np.tile(theta, (len(X_train), 1)) |
| full_batch_params = np.hstack([X_train, theta_batch]) |
| |
| |
| binds = [] |
| for row in full_batch_params: |
| bind_dict = dict(zip(combined.parameters, [[float(x)] for x in row])) |
| binds.append(bind_dict) |
| |
| circuits = [transpiled] * len(X_train) |
| |
| job = backend.run(circuits, parameter_binds=binds) |
| result = job.result() |
| |
| |
| statevectors = np.array([result.get_statevector(i) for i in range(len(X_train))]) |
| |
| kernel_matrix = np.abs(statevectors @ statevectors.conj().T)**2 |
| return kernel_matrix |
|
|
| def objective(theta): |
| try: |
| K = evaluate_kernel(theta) |
| svc = SVC(kernel='precomputed', C=1.0) |
| svc.fit(K, y_train) |
| score = svc.score(K, y_train) |
| loss = 1.0 - score |
| return loss |
| except Exception as e: |
| print(f" β οΈ Warning: Objective failed ({e}). Returning high loss.") |
| return 1.0 |
|
|
| |
| print(" π§ Optimizing Geometry (COBYLA)...") |
| optimizer = COBYLA(maxiter=MAX_ITERS) |
| |
| n_train_params = len(ansatz.parameters) |
| initial_theta = np.random.random(n_train_params) * 0.1 |
| |
| start_time = time.time() |
| history = [] |
| |
| def callback(x): |
| pass |
|
|
| res = optimizer.minimize(objective, x0=initial_theta) |
| |
| print(f" β
Optimization Complete in {time.time() - start_time:.1f}s") |
| |
| |
| print(" π Evaluating Final Kernel...") |
| optimal_theta = res.x |
| |
| |
| def get_statevectors(data_x, theta): |
| theta_batch = np.tile(theta, (len(data_x), 1)) |
| full_params = np.hstack([data_x, theta_batch]) |
| |
| binds = [] |
| for row in full_params: |
| binds.append(dict(zip(combined.parameters, [[float(x)] for x in row]))) |
| |
| job = backend.run([transpiled] * len(data_x), parameter_binds=binds) |
| return np.array([job.result().get_statevector(i) for i in range(len(data_x))]) |
|
|
| statevectors_train = get_statevectors(X_train, optimal_theta) |
| statevectors_test = get_statevectors(X_test, optimal_theta) |
| |
| K_train = np.abs(statevectors_train @ statevectors_train.conj().T)**2 |
| K_test = np.abs(statevectors_test @ statevectors_train.conj().T)**2 |
| |
| qsvc = SVC(kernel='precomputed', probability=True) |
| qsvc.fit(K_train, y_train) |
| |
| test_probs = qsvc.predict_proba(K_test)[:, 1] |
| test_auc = roc_auc_score(y_test, test_probs) |
| |
| print("\n" + "="*40) |
| print("π MATH BYPASS KERNEL RESULTS") |
| print("="*40) |
| print(f"β
Test AUC: {test_auc:.4f}") |
| print(f"π Linear Base: 0.7500") |
| print("="*40) |
| |
| np.save(f"{OUTPUT_DIR}/optimized_weights_bypass.npy", optimal_theta) |
|
|
| if __name__ == "__main__": |
| main() |