File size: 6,352 Bytes
0f755ec | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT CONFIGURATION (M1 TURBO) ---
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
# --- AER V2 IMPORTS ---
from qiskit_aer.primitives import SamplerV2 as AerSampler
from qiskit_aer import AerSimulator
from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity
# --- BASE LITE CONFIGURATION (MODIFY THESE FOR VARIATIONS) ---
N_QUBITS = 8
TRAIN_SIZE = 200 # Reduced from 600
TEST_SIZE = 300
MAX_ITERS = 5 # Reduced from 50 (Smoke Test)
BATCH_SIZE = 5000 # Increased from 2000 (Test RAM usage)
OPT_LEVEL = 1 # Reduced from 3 (Test compilation speed)
OUTPUT_DIR = "lite_diagnostics"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# --- DUMMY JOB WRAPPER ---
class DummyResult:
def __init__(self, fidelities):
self.fidelities = fidelities
class DummyJob:
def __init__(self, fidelities):
self._fidelities = fidelities
def submit(self): return
def result(self): return DummyResult(self._fidelities)
class BatchedFidelity(BaseStateFidelity):
def __init__(self, fidelity_inst, batch_size):
self._fidelity = fidelity_inst
self._batch_size = batch_size
super().__init__()
def create_fidelity_circuit(self, circuit_1, circuit_2):
return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)
def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
n_samples = len(circuits_1)
results = []
# TIMING DIAGNOSTIC
batch_start = time.time()
for i in range(0, n_samples, self._batch_size):
end = min(i + self._batch_size, n_samples)
c1_batch = circuits_1[i:end]
c2_batch = circuits_2[i:end]
v1_batch = values_1[i:end] if values_1 is not None else None
v2_batch = values_2[i:end] if values_2 is not None else None
# Run Job
job = self._fidelity.run(c1_batch, c2_batch, v1_batch, v2_batch, **options)
batch_values = job.result().fidelities
results.append(batch_values)
# Print progress for every batch to debug speed
if (i // self._batch_size) % 5 == 0:
print(f" Batch {i}/{n_samples} processed in {time.time() - batch_start:.2f}s")
batch_start = time.time() # Reset timer
del job, batch_values
return DummyJob(np.concatenate(results))
def main():
print(f"π Starting Base Lite Diagnostic...")
print(f" βοΈ Config: Train={TRAIN_SIZE} | Batch={BATCH_SIZE} | Iters={MAX_ITERS} | Opt={OPT_LEVEL}")
# 1. LOAD DATA
possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
data_path = next((p for p in possible_paths if os.path.exists(p)), None)
if not data_path: return
data = np.load(data_path)
X_train_full = data['X_train']
y_train_full = data['y_train']
X_test_full = data['X_test']
y_test_full = data['y_test']
# 2. SUBSAMPLE
pos_idx = np.where(y_train_full == 1)[0]
neg_idx = np.where(y_train_full == 0)[0]
train_idx = np.concatenate([
np.random.choice(pos_idx, TRAIN_SIZE // 2, replace=False),
np.random.choice(neg_idx, TRAIN_SIZE // 2, replace=False)
])
np.random.shuffle(train_idx)
X_train = X_train_full[train_idx]
y_train = y_train_full[train_idx]
X_test = X_test_full[:TEST_SIZE]
y_test = y_test_full[:TEST_SIZE]
# --- 3. MEMORY FLUSH ---
del data, X_train_full, y_train_full, X_test_full, y_test_full
gc.collect()
# 4. CONFIGURE AER BACKEND
backend = AerSimulator(
method="statevector",
precision="single",
max_parallel_threads=4,
max_memory_mb=12000
)
aer_sampler = AerSampler.from_backend(backend)
aer_sampler.options.default_shots = None
# 5. DEFINE KERNEL
print(" βοΈ Initializing Kernel...")
fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
ansatz = RealAmplitudes(N_QUBITS, reps=1)
combined_circuit = fm.compose(ansatz)
print(f" π§ Transpiling (Level {OPT_LEVEL})...")
transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=OPT_LEVEL)
base_fidelity = ComputeUncompute(sampler=aer_sampler)
batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE)
quant_kernel = TrainableFidelityQuantumKernel(
feature_map=transpiled_circuit,
training_parameters=ansatz.parameters,
fidelity=batched_fidelity
)
# 6. OPTIMIZE
print(" π§ Optimizing (COBYLA)...")
loss_func = SVCLoss(C=1.0, gamma='auto')
optimizer = COBYLA(maxiter=MAX_ITERS)
from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer
trainer = QuantumKernelTrainer(
quantum_kernel=quant_kernel,
loss=loss_func,
optimizer=optimizer,
initial_point=[0.1] * len(ansatz.parameters)
)
start_time = time.time()
results = trainer.fit(X_train, y_train)
elapsed = time.time() - start_time
print(f" β
Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
print(f" π― Final Loss: {results.optimal_value:.4f}")
# 7. RUN FINAL SVC
print(" π Training Final SVM...")
qsvc = SVC(kernel=results.quantum_kernel.evaluate, probability=True)
qsvc.fit(X_train, y_train)
test_probs = qsvc.predict_proba(X_test)[:, 1]
test_auc = roc_auc_score(y_test, test_probs)
print("\n" + "="*40)
print(f"π BASE LITE RESULTS (Batch={BATCH_SIZE})")
print("="*40)
print(f"β
Test AUC: {test_auc:.4f}")
print("="*40)
if __name__ == "__main__":
main() |