import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT CONFIGURATION ---
# Export the thread-count / parallelism settings in a single call. This sits
# ahead of the sklearn and qiskit imports that follow it in the file
# (presumably so those libraries see the values when they load -- confirm).
os.environ.update(
    {
        "OMP_NUM_THREADS": "4",
        "RAYON_NUM_THREADS": "4",
        "QISKIT_PARALLEL": "TRUE",
    }
)
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
# --- AER V2 IMPORTS ---
from qiskit_aer.primitives import SamplerV2 as AerSampler
from qiskit_aer import AerSimulator
from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity
# --- CONFIGURATION ---
N_QUBITS = 8        # width of the feature map and ansatz circuits
TRAIN_SIZE = 600    # upper bound on the class-balanced training subset
TEST_SIZE = 300     # number of leading test samples that are evaluated
MAX_ITERS = 50      # COBYLA iteration budget
BATCH_SIZE = 2000   # circuit pairs per call made by BatchedFidelity
OUTPUT_DIR = "optimized_kernel_full_results"

# exist_ok=True replaces the separate "exists?" check, which could race with
# another process creating the directory between the check and the call.
os.makedirs(OUTPUT_DIR, exist_ok=True)
# --- THE FIX: DUMMY JOB ARCHITECTURE ---
class DummyResult:
    """Minimal result object that exposes a ``fidelities`` attribute."""

    def __init__(self, fidelities):
        """Store *fidelities* unchanged for later retrieval.

        Args:
            fidelities: The fidelity values to expose; kept by reference.
        """
        self.fidelities = fidelities
class DummyJob:
    """Already-finished stand-in for a job: holds precomputed fidelities."""

    def __init__(self, fidelities):
        """Remember the fidelities that ``result()`` will hand back.

        Args:
            fidelities: Precomputed fidelity values; kept by reference.
        """
        self._fidelities = fidelities

    def submit(self):
        """Do nothing; the values were computed before this job was created."""
        return None

    def result(self):
        """Return a ``DummyResult`` wrapping the stored fidelities."""
        return DummyResult(self._fidelities)
class BatchedFidelity(BaseStateFidelity):
    """Fidelity wrapper that forwards circuit pairs to another fidelity in batches.

    Each ``_run`` call is cut into slices of at most ``batch_size`` pairs, every
    slice is sent to the wrapped instance's ``run``, and the per-slice
    fidelities are joined back together in input order.
    """

    def __init__(self, fidelity_inst, batch_size=2000):
        """Wrap *fidelity_inst*.

        Args:
            fidelity_inst: Object providing ``create_fidelity_circuit`` and a
                ``run`` method whose returned job has ``result().fidelities``.
            batch_size: Maximum number of circuit pairs per ``run`` call.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. A zero step would
                otherwise fail inside ``range`` and a negative one would
                silently process nothing.
        """
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )
        self._fidelity = fidelity_inst
        self._batch_size = batch_size
        super().__init__()

    def create_fidelity_circuit(self, circuit_1, circuit_2):
        """Delegate circuit construction to the wrapped fidelity instance."""
        return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)

    def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
        """Evaluate all pairs batch by batch and return a completed job.

        Args:
            circuits_1: First circuits of each pair (sliceable, supports ``len``).
            circuits_2: Second circuits of each pair, sliced with the same bounds.
            values_1: Optional parameter values for ``circuits_1``.
            values_2: Optional parameter values for ``circuits_2``.
            **options: Passed through unchanged to every wrapped ``run`` call.

        Returns:
            A ``DummyJob`` whose ``result().fidelities`` is one NumPy array
            holding the fidelities of every batch in order; an empty array
            when no pairs were supplied.
        """
        total = len(circuits_1)
        chunks = []
        for start in range(0, total, self._batch_size):
            # Slicing clamps at the end of the sequence, so the last batch
            # needs no explicit bound.
            window = slice(start, start + self._batch_size)
            # The job is never bound to a name, so it can be released as soon
            # as its fidelities are extracted and before the next batch starts.
            chunks.append(
                self._fidelity.run(
                    circuits_1[window],
                    circuits_2[window],
                    None if values_1 is None else values_1[window],
                    None if values_2 is None else values_2[window],
                    **options,
                ).result().fidelities
            )

        # np.concatenate rejects an empty list; zero pairs means zero fidelities.
        merged = np.concatenate(chunks) if chunks else np.empty(0)
        return DummyJob(merged)
def _load_balanced_split(data_path, rng):
    """Read the dataset and return a class-balanced training subset plus a test slice.

    Args:
        data_path: Path to the ``.npz`` archive holding ``X_train``,
            ``y_train``, ``X_test`` and ``y_test``.
        rng: ``numpy.random.Generator`` used to draw and shuffle the subset.

    Returns:
        ``(X_train, y_train, X_test, y_test)`` where the training arrays hold
        at most ``TRAIN_SIZE // 2`` samples of label 1 and of label 0, and the
        test arrays are the first ``TEST_SIZE`` test samples.
    """
    # The context manager closes the archive as soon as the arrays are read.
    with np.load(data_path) as data:
        X_train_full = data['X_train']
        y_train_full = data['y_train']
        # Copy the slices: a plain slice is a view that would keep the whole
        # test arrays alive after this function returns.
        X_test = data['X_test'][:TEST_SIZE].copy()
        y_test = data['y_test'][:TEST_SIZE].copy()

    pos_idx = np.where(y_train_full == 1)[0]
    neg_idx = np.where(y_train_full == 0)[0]
    per_class = TRAIN_SIZE // 2
    train_idx = np.concatenate([
        rng.choice(pos_idx, min(len(pos_idx), per_class), replace=False),
        rng.choice(neg_idx, min(len(neg_idx), per_class), replace=False),
    ])
    rng.shuffle(train_idx)

    # Fancy indexing copies, so the full training arrays become collectable
    # once this frame is gone.
    return X_train_full[train_idx], y_train_full[train_idx], X_test, y_test


def _build_trainable_kernel():
    """Create the Aer-backed trainable fidelity kernel.

    Returns:
        ``(quant_kernel, ansatz)``: the kernel routed through
        ``BatchedFidelity`` and the ansatz whose parameters it trains.
    """
    backend = AerSimulator(
        method="statevector",
        precision="single",
        max_parallel_threads=4,
        max_memory_mb=12000,
    )
    aer_sampler = AerSampler.from_backend(backend)
    # NOTE(review): kept from the original. Confirm that Aer's SamplerV2
    # actually reads `options.default_shots`; it may only honour the
    # `default_shots` constructor argument, making this assignment a no-op.
    aer_sampler.options.default_shots = None

    print(" ⚙️ Initializing Trainable Kernel...")
    fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
    ansatz = RealAmplitudes(N_QUBITS, reps=1)
    combined_circuit = fm.compose(ansatz)

    print(" 🔧 Transpiling (Optimization Level 3)...")
    transpiled_circuit = transpile(
        combined_circuit, backend=backend, optimization_level=3
    )

    batched_fidelity = BatchedFidelity(
        ComputeUncompute(sampler=aer_sampler), batch_size=BATCH_SIZE
    )
    quant_kernel = TrainableFidelityQuantumKernel(
        feature_map=transpiled_circuit,
        training_parameters=ansatz.parameters,
        fidelity=batched_fidelity,
    )
    return quant_kernel, ansatz


def main(seed=None):
    """Train the quantum kernel, fit an SVM with it and report the test AUC.

    Looks for ``qgan_data_optimized.npz`` in the known locations, optimizes
    the kernel parameters with COBYLA, fits an ``SVC`` on the optimized kernel
    and prints the test ROC AUC. The optimal parameters are saved under
    ``OUTPUT_DIR`` when the AUC exceeds the 0.75 baseline.

    Args:
        seed: Optional seed for the NumPy generator that draws the training
            subset. ``None`` (the default) gives a different subset each run.

    Returns:
        None. Returns early, after printing an error, if the data file is
        not found.
    """
    print("🚀 Starting Quantum Kernel Optimization (BATCHED V5 FINAL)...")
    print(" ⚡ Backend: Aer C++ Simulator (Statevector)")
    print(" 🛡️ Safety: BatchedFidelity Wrapper Active")

    # 1. LOAD DATA
    possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
    data_path = next((p for p in possible_paths if os.path.exists(p)), None)
    if not data_path:
        print("❌ Error: qgan_data_optimized.npz not found.")
        return

    # 2. SUBSAMPLE
    rng = np.random.default_rng(seed)
    X_train, y_train, X_test, y_test = _load_balanced_split(data_path, rng)

    # 3. MEMORY FLUSH -- the full arrays went out of scope with the helper.
    print(" 🧹 Freeing unused memory...")
    gc.collect()

    # 4-5. CONFIGURE AER BACKEND AND DEFINE KERNEL
    quant_kernel, ansatz = _build_trainable_kernel()

    # 6. OPTIMIZE KERNEL
    print(" 🧠 Optimizing Kernel Geometry (COBYLA)...")
    from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer

    trainer = QuantumKernelTrainer(
        quantum_kernel=quant_kernel,
        loss=SVCLoss(C=1.0, gamma='auto'),
        optimizer=COBYLA(maxiter=MAX_ITERS),
        initial_point=[0.1] * len(ansatz.parameters),
    )
    # perf_counter is monotonic, so the duration cannot be skewed by
    # wall-clock adjustments during a long run.
    start_time = time.perf_counter()
    results = trainer.fit(X_train, y_train)
    elapsed = time.perf_counter() - start_time
    print(f" ✅ Optimization Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
    print(f" 🎯 Final Loss: {results.optimal_value:.4f}")

    # 7. RUN FINAL SVC
    print(" 🚀 Training Final SVM with Optimized Kernel...")
    optimized_kernel = results.quantum_kernel
    qsvc = SVC(kernel=optimized_kernel.evaluate, probability=True)
    qsvc.fit(X_train, y_train)
    test_probs = qsvc.predict_proba(X_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_probs)

    # One definition of the baseline feeds both the report and the comparison.
    baseline_auc = 0.75
    rule = "=" * 40
    print("\n" + rule)
    print("📊 FULL-SCALE OPTIMIZED KERNEL RESULTS (V5 FIXED)")
    print(rule)
    print(f"✅ Test AUC: {test_auc:.4f}")
    print(f"📉 Linear Base: {baseline_auc:.4f}")
    print(rule)

    if test_auc > baseline_auc:
        print("🏆 SUCCESS: Optimization found a better topology.")
        # Re-ensure the directory so a long optimization is not lost if it
        # was removed while the run was in progress.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        np.save(
            os.path.join(OUTPUT_DIR, "optimized_weights_full.npy"),
            results.optimal_point,
        )
    else:
        print("🔸 Result: Optimization matched or slightly underperformed baseline.")
if __name__ == "__main__":
    # Run the experiment only when executed as a script, not on import.
    main()