QGAN_Project / vGe0.1 /Foptimize_quantum_kernel_fullturbo.py
1bnjmn3's picture
Add files using upload-large-folder tool
0f755ec verified
import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT CONFIGURATION ---
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
# --- AER V2 IMPORTS ---
from qiskit_aer.primitives import SamplerV2 as AerSampler
from qiskit_aer import AerSimulator
from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity
# --- CONFIGURATION ---
N_QUBITS = 8
TRAIN_SIZE = 600
TEST_SIZE = 300
MAX_ITERS = 50
BATCH_SIZE = 2000
OUTPUT_DIR = "optimized_kernel_full_results"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# --- THE FIX: DUMMY JOB ARCHITECTURE ---
class DummyResult:
def __init__(self, fidelities):
self.fidelities = fidelities
class DummyJob:
def __init__(self, fidelities):
self._fidelities = fidelities
def submit(self):
return
def result(self):
return DummyResult(self._fidelities)
class BatchedFidelity(BaseStateFidelity):
def __init__(self, fidelity_inst, batch_size=2000):
self._fidelity = fidelity_inst
self._batch_size = batch_size
super().__init__()
def create_fidelity_circuit(self, circuit_1, circuit_2):
return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)
def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
n_samples = len(circuits_1)
results = []
# print(f" โฎ‘ Processing {n_samples} comparisons in batches...")
for i in range(0, n_samples, self._batch_size):
end = min(i + self._batch_size, n_samples)
c1_batch = circuits_1[i:end]
c2_batch = circuits_2[i:end]
v1_batch = values_1[i:end] if values_1 is not None else None
v2_batch = values_2[i:end] if values_2 is not None else None
# --- FIX STARTS HERE ---
# Correct API: Run job -> Get Result -> Extract Fidelities
job = self._fidelity.run(c1_batch, c2_batch, v1_batch, v2_batch, **options)
batch_values = job.result().fidelities
# --- FIX ENDS HERE ---
results.append(batch_values)
# Memory Cleanup
del job
del batch_values
final_fidelities = np.concatenate(results)
return DummyJob(final_fidelities)
def main():
print(f"๐Ÿš€ Starting Quantum Kernel Optimization (BATCHED V5 FINAL)...")
print(f" โšก Backend: Aer C++ Simulator (Statevector)")
print(f" ๐Ÿ›ก๏ธ Safety: BatchedFidelity Wrapper Active")
# 1. LOAD DATA
possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
data_path = next((p for p in possible_paths if os.path.exists(p)), None)
if not data_path:
print("โŒ Error: qgan_data_optimized.npz not found.")
return
data = np.load(data_path)
X_train_full = data['X_train']
y_train_full = data['y_train']
X_test_full = data['X_test']
y_test_full = data['y_test']
# 2. SUBSAMPLE
pos_idx = np.where(y_train_full == 1)[0]
neg_idx = np.where(y_train_full == 0)[0]
n_pos = min(len(pos_idx), TRAIN_SIZE // 2)
n_neg = min(len(neg_idx), TRAIN_SIZE // 2)
train_idx = np.concatenate([
np.random.choice(pos_idx, n_pos, replace=False),
np.random.choice(neg_idx, n_neg, replace=False)
])
np.random.shuffle(train_idx)
X_train = X_train_full[train_idx]
y_train = y_train_full[train_idx]
X_test = X_test_full[:TEST_SIZE]
y_test = y_test_full[:TEST_SIZE]
# --- 3. MEMORY FLUSH ---
print(" ๐Ÿงน Freeing unused memory...")
del data, X_train_full, y_train_full, X_test_full, y_test_full
gc.collect()
# 4. CONFIGURE AER BACKEND
backend = AerSimulator(
method="statevector",
precision="single",
max_parallel_threads=4,
max_memory_mb=12000
)
aer_sampler = AerSampler.from_backend(backend)
aer_sampler.options.default_shots = None
# 5. DEFINE KERNEL
print(" โš›๏ธ Initializing Trainable Kernel...")
fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
ansatz = RealAmplitudes(N_QUBITS, reps=1)
combined_circuit = fm.compose(ansatz)
print(" ๐Ÿ”ง Transpiling (Optimization Level 3)...")
transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=3)
base_fidelity = ComputeUncompute(sampler=aer_sampler)
batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE)
quant_kernel = TrainableFidelityQuantumKernel(
feature_map=transpiled_circuit,
training_parameters=ansatz.parameters,
fidelity=batched_fidelity
)
# 6. OPTIMIZE KERNEL
print(" ๐Ÿง  Optimizing Kernel Geometry (COBYLA)...")
loss_func = SVCLoss(C=1.0, gamma='auto')
optimizer = COBYLA(maxiter=MAX_ITERS)
from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer
trainer = QuantumKernelTrainer(
quantum_kernel=quant_kernel,
loss=loss_func,
optimizer=optimizer,
initial_point=[0.1] * len(ansatz.parameters)
)
start_time = time.time()
# Train
results = trainer.fit(X_train, y_train)
elapsed = time.time() - start_time
print(f" โœ… Optimization Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
print(f" ๐ŸŽฏ Final Loss: {results.optimal_value:.4f}")
# 7. RUN FINAL SVC
print(" ๐Ÿ† Training Final SVM with Optimized Kernel...")
optimized_kernel = results.quantum_kernel
qsvc = SVC(kernel=optimized_kernel.evaluate, probability=True)
qsvc.fit(X_train, y_train)
test_probs = qsvc.predict_proba(X_test)[:, 1]
test_auc = roc_auc_score(y_test, test_probs)
print("\n" + "="*40)
print("๐Ÿš€ FULL-SCALE OPTIMIZED KERNEL RESULTS (V5 FIXED)")
print("="*40)
print(f"โœ… Test AUC: {test_auc:.4f}")
print(f"๐Ÿ“‰ Linear Base: 0.7500")
print("="*40)
if test_auc > 0.75:
print("๐ŸŽ‰ SUCCESS: Optimization found a better topology.")
np.save(f"{OUTPUT_DIR}/optimized_weights_full.npy", results.optimal_point)
else:
print("๐Ÿ”ธ Result: Optimization matched or slightly underperformed baseline.")
if __name__ == "__main__":
main()