QGAN_Project / vGe0.1 /optimize_qkernel_vare.py
1bnjmn3's picture
Add files using upload-large-folder tool
0f755ec verified
import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT ---
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
from qiskit_aer import AerSimulator
# --- CONFIGURATION ---
N_QUBITS = 8
TRAIN_SIZE = 600
TEST_SIZE = 300
MAX_ITERS = 50
OUTPUT_DIR = "optimized_kernel_bypass_results"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
def main():
print(f"πŸš€ Starting Quantum Kernel Optimization (MATH BYPASS V4)...")
print(f" ⚑ Strategy: Explicit 'save_statevector' instruction added.")
# 1. LOAD DATA
possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
data_path = next((p for p in possible_paths if os.path.exists(p)), None)
if not data_path:
print("❌ Error: qgan_data_optimized.npz not found.")
return
data = np.load(data_path)
X_train_full = data['X_train']
y_train_full = data['y_train']
X_test_full = data['X_test']
y_test_full = data['y_test']
# 2. SUBSAMPLE
pos_idx = np.where(y_train_full == 1)[0]
neg_idx = np.where(y_train_full == 0)[0]
n_pos = min(len(pos_idx), TRAIN_SIZE // 2)
n_neg = min(len(neg_idx), TRAIN_SIZE // 2)
train_idx = np.concatenate([
np.random.choice(pos_idx, n_pos, replace=False),
np.random.choice(neg_idx, n_neg, replace=False)
])
np.random.shuffle(train_idx)
X_train = X_train_full[train_idx]
y_train = y_train_full[train_idx]
X_test = X_test_full[:TEST_SIZE]
y_test = y_test_full[:TEST_SIZE]
# Free memory
del data, X_train_full, y_train_full, X_test_full, y_test_full
gc.collect()
# 3. SETUP CIRCUIT & BACKEND
print(" βš›οΈ Building Circuit...")
fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
ansatz = RealAmplitudes(N_QUBITS, reps=1)
combined = fm.compose(ansatz)
# --- THE FIX: FORCE SIMULATOR TO SAVE DATA ---
combined.save_statevector() # <--- CRITICAL LINE
backend = AerSimulator(method='statevector', max_parallel_threads=4)
transpiled = transpile(combined, backend, optimization_level=1)
# 4. CUSTOM OBJECTIVE FUNCTION
loss_func = SVCLoss(C=1.0, gamma='auto')
def evaluate_kernel(theta):
theta_batch = np.tile(theta, (len(X_train), 1))
full_batch_params = np.hstack([X_train, theta_batch])
# Wrapped lists for Aer safety
binds = []
for row in full_batch_params:
bind_dict = dict(zip(combined.parameters, [[float(x)] for x in row]))
binds.append(bind_dict)
circuits = [transpiled] * len(X_train)
job = backend.run(circuits, parameter_binds=binds)
result = job.result()
# Now this will work because we added save_statevector()
statevectors = np.array([result.get_statevector(i) for i in range(len(X_train))])
kernel_matrix = np.abs(statevectors @ statevectors.conj().T)**2
return kernel_matrix
def objective(theta):
try:
K = evaluate_kernel(theta)
svc = SVC(kernel='precomputed', C=1.0)
svc.fit(K, y_train)
score = svc.score(K, y_train)
loss = 1.0 - score
return loss
except Exception as e:
print(f" ⚠️ Warning: Objective failed ({e}). Returning high loss.")
return 1.0
# 5. OPTIMIZATION LOOP
print(" 🧠 Optimizing Geometry (COBYLA)...")
optimizer = COBYLA(maxiter=MAX_ITERS)
n_train_params = len(ansatz.parameters)
initial_theta = np.random.random(n_train_params) * 0.1
start_time = time.time()
history = []
def callback(x):
pass
res = optimizer.minimize(objective, x0=initial_theta)
print(f" βœ… Optimization Complete in {time.time() - start_time:.1f}s")
# 6. FINAL EVALUATION
print(" πŸ† Evaluating Final Kernel...")
optimal_theta = res.x
# Helper to re-use logic
def get_statevectors(data_x, theta):
theta_batch = np.tile(theta, (len(data_x), 1))
full_params = np.hstack([data_x, theta_batch])
binds = []
for row in full_params:
binds.append(dict(zip(combined.parameters, [[float(x)] for x in row])))
job = backend.run([transpiled] * len(data_x), parameter_binds=binds)
return np.array([job.result().get_statevector(i) for i in range(len(data_x))])
statevectors_train = get_statevectors(X_train, optimal_theta)
statevectors_test = get_statevectors(X_test, optimal_theta)
K_train = np.abs(statevectors_train @ statevectors_train.conj().T)**2
K_test = np.abs(statevectors_test @ statevectors_train.conj().T)**2
qsvc = SVC(kernel='precomputed', probability=True)
qsvc.fit(K_train, y_train)
test_probs = qsvc.predict_proba(K_test)[:, 1]
test_auc = roc_auc_score(y_test, test_probs)
print("\n" + "="*40)
print("πŸš€ MATH BYPASS KERNEL RESULTS")
print("="*40)
print(f"βœ… Test AUC: {test_auc:.4f}")
print(f"πŸ“‰ Linear Base: 0.7500")
print("="*40)
np.save(f"{OUTPUT_DIR}/optimized_weights_bypass.npy", optimal_theta)
if __name__ == "__main__":
main()