File size: 5,685 Bytes
0f755ec | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT ---
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
from qiskit_aer import AerSimulator
# --- CONFIGURATION ---
N_QUBITS = 8
TRAIN_SIZE = 600
TEST_SIZE = 300
MAX_ITERS = 50
OUTPUT_DIR = "optimized_kernel_bypass_results"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
def main():
print(f"π Starting Quantum Kernel Optimization (MATH BYPASS V4)...")
print(f" β‘ Strategy: Explicit 'save_statevector' instruction added.")
# 1. LOAD DATA
possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
data_path = next((p for p in possible_paths if os.path.exists(p)), None)
if not data_path:
print("β Error: qgan_data_optimized.npz not found.")
return
data = np.load(data_path)
X_train_full = data['X_train']
y_train_full = data['y_train']
X_test_full = data['X_test']
y_test_full = data['y_test']
# 2. SUBSAMPLE
pos_idx = np.where(y_train_full == 1)[0]
neg_idx = np.where(y_train_full == 0)[0]
n_pos = min(len(pos_idx), TRAIN_SIZE // 2)
n_neg = min(len(neg_idx), TRAIN_SIZE // 2)
train_idx = np.concatenate([
np.random.choice(pos_idx, n_pos, replace=False),
np.random.choice(neg_idx, n_neg, replace=False)
])
np.random.shuffle(train_idx)
X_train = X_train_full[train_idx]
y_train = y_train_full[train_idx]
X_test = X_test_full[:TEST_SIZE]
y_test = y_test_full[:TEST_SIZE]
# Free memory
del data, X_train_full, y_train_full, X_test_full, y_test_full
gc.collect()
# 3. SETUP CIRCUIT & BACKEND
print(" βοΈ Building Circuit...")
fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
ansatz = RealAmplitudes(N_QUBITS, reps=1)
combined = fm.compose(ansatz)
# --- THE FIX: FORCE SIMULATOR TO SAVE DATA ---
combined.save_statevector() # <--- CRITICAL LINE
backend = AerSimulator(method='statevector', max_parallel_threads=4)
transpiled = transpile(combined, backend, optimization_level=1)
# 4. CUSTOM OBJECTIVE FUNCTION
loss_func = SVCLoss(C=1.0, gamma='auto')
def evaluate_kernel(theta):
theta_batch = np.tile(theta, (len(X_train), 1))
full_batch_params = np.hstack([X_train, theta_batch])
# Wrapped lists for Aer safety
binds = []
for row in full_batch_params:
bind_dict = dict(zip(combined.parameters, [[float(x)] for x in row]))
binds.append(bind_dict)
circuits = [transpiled] * len(X_train)
job = backend.run(circuits, parameter_binds=binds)
result = job.result()
# Now this will work because we added save_statevector()
statevectors = np.array([result.get_statevector(i) for i in range(len(X_train))])
kernel_matrix = np.abs(statevectors @ statevectors.conj().T)**2
return kernel_matrix
def objective(theta):
try:
K = evaluate_kernel(theta)
svc = SVC(kernel='precomputed', C=1.0)
svc.fit(K, y_train)
score = svc.score(K, y_train)
loss = 1.0 - score
return loss
except Exception as e:
print(f" β οΈ Warning: Objective failed ({e}). Returning high loss.")
return 1.0
# 5. OPTIMIZATION LOOP
print(" π§ Optimizing Geometry (COBYLA)...")
optimizer = COBYLA(maxiter=MAX_ITERS)
n_train_params = len(ansatz.parameters)
initial_theta = np.random.random(n_train_params) * 0.1
start_time = time.time()
history = []
def callback(x):
pass
res = optimizer.minimize(objective, x0=initial_theta)
print(f" β
Optimization Complete in {time.time() - start_time:.1f}s")
# 6. FINAL EVALUATION
print(" π Evaluating Final Kernel...")
optimal_theta = res.x
# Helper to re-use logic
def get_statevectors(data_x, theta):
theta_batch = np.tile(theta, (len(data_x), 1))
full_params = np.hstack([data_x, theta_batch])
binds = []
for row in full_params:
binds.append(dict(zip(combined.parameters, [[float(x)] for x in row])))
job = backend.run([transpiled] * len(data_x), parameter_binds=binds)
return np.array([job.result().get_statevector(i) for i in range(len(data_x))])
statevectors_train = get_statevectors(X_train, optimal_theta)
statevectors_test = get_statevectors(X_test, optimal_theta)
K_train = np.abs(statevectors_train @ statevectors_train.conj().T)**2
K_test = np.abs(statevectors_test @ statevectors_train.conj().T)**2
qsvc = SVC(kernel='precomputed', probability=True)
qsvc.fit(K_train, y_train)
test_probs = qsvc.predict_proba(K_test)[:, 1]
test_auc = roc_auc_score(y_test, test_probs)
print("\n" + "="*40)
print("π MATH BYPASS KERNEL RESULTS")
print("="*40)
print(f"β
Test AUC: {test_auc:.4f}")
print(f"π Linear Base: 0.7500")
print("="*40)
np.save(f"{OUTPUT_DIR}/optimized_weights_bypass.npy", optimal_theta)
if __name__ == "__main__":
main() |