| import numpy as np |
| import os |
| import time |
| import gc |
|
|
| |
# Thread-count and parallelism settings exported through the process
# environment. They are applied ahead of the scikit-learn / Qiskit imports
# further down -- presumably so those libraries see the values when they
# initialise (TODO confirm).
os.environ.update(
    {
        "OMP_NUM_THREADS": "4",
        "RAYON_NUM_THREADS": "4",
        "QISKIT_PARALLEL": "TRUE",
    }
)
|
|
| from sklearn.svm import SVC |
| from sklearn.metrics import roc_auc_score |
| from qiskit import transpile |
| from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes |
| from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel |
| from qiskit_machine_learning.utils.loss_functions import SVCLoss |
| from qiskit_algorithms.optimizers import COBYLA |
|
|
| |
| from qiskit_aer.primitives import SamplerV2 as AerSampler |
| from qiskit_aer import AerSimulator |
| from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity |
|
|
| |
# --- Experiment configuration ---------------------------------------------
N_QUBITS = 8       # width of the feature map and ansatz circuits
TRAIN_SIZE = 200   # training samples drawn in main(), half from each class
TEST_SIZE = 300    # number of leading test rows kept for evaluation
MAX_ITERS = 5      # iteration budget handed to the COBYLA optimizer
BATCH_SIZE = 5000  # circuit pairs sent to the wrapped fidelity per job
OPT_LEVEL = 1      # optimization level passed to transpile()

# Directory reserved for diagnostic output. ``exist_ok=True`` makes the
# creation idempotent without a separate existence check, so a concurrent
# run creating the same directory cannot make this line fail.
OUTPUT_DIR = "lite_diagnostics"
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
| |
class DummyResult:
    """Bare result container exposing a single ``fidelities`` attribute.

    Returned by ``DummyJob.result()`` so callers can read
    ``job.result().fidelities``.
    """

    def __init__(self, fidelities):
        # Kept exactly as passed in: no copy, no validation.
        self.fidelities = fidelities
|
|
class DummyJob:
    """Already-finished stand-in for a fidelity job.

    Holds precomputed fidelities and hands them back from ``result()``.
    Submitting is a no-op because there is nothing left to execute.
    """

    def __init__(self, fidelities):
        self._fidelities = fidelities

    def submit(self):
        """Do nothing; the fidelities were computed before construction."""
        return None

    # NOTE(review): some library releases appear to call the private
    # ``_submit()`` hook on the job returned by ``_run`` instead of
    # ``submit()``. Exposing both keeps this stand-in usable either way --
    # confirm against the installed qiskit-machine-learning version.
    _submit = submit

    def result(self):
        """Return the stored fidelities wrapped in a ``DummyResult``."""
        return DummyResult(self._fidelities)
|
|
class BatchedFidelity(BaseStateFidelity):
    """Fidelity wrapper that evaluates circuit pairs in fixed-size batches.

    Every request is split into slices of at most ``batch_size`` pairs, each
    slice is forwarded to the wrapped fidelity's ``run`` method, and the
    per-slice fidelities are concatenated into one array.

    Args:
        fidelity_inst: Fidelity object that does the actual work. It must
            provide ``create_fidelity_circuit`` and ``run``.
        batch_size: Maximum number of circuit pairs per wrapped ``run`` call.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(self, fidelity_inst, batch_size):
        # A zero step would make range() raise inside _run, and a negative
        # one would silently produce no batches; reject both up front.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
        self._fidelity = fidelity_inst
        self._batch_size = batch_size
        super().__init__()

    def create_fidelity_circuit(self, circuit_1, circuit_2):
        """Delegate circuit construction to the wrapped fidelity."""
        return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)

    def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
        """Evaluate all circuit pairs batch by batch.

        Args:
            circuits_1: Sequence of first circuits.
            circuits_2: Sequence of second circuits, sliced in step with
                ``circuits_1``.
            values_1: Optional parameter values for ``circuits_1``.
            values_2: Optional parameter values for ``circuits_2``.
            **options: Forwarded unchanged to the wrapped ``run`` call.

        Returns:
            A ``DummyJob`` whose result carries the concatenated fidelities
            (an empty array when no circuits were supplied).
        """
        n_samples = len(circuits_1)
        if n_samples == 0:
            # np.concatenate rejects an empty list, so short-circuit here.
            return DummyJob(np.array([]))

        results = []
        # Timer is restarted only when a progress line is printed, so each
        # report covers the time elapsed since the previous report.
        batch_start = time.time()

        for batch_no, start in enumerate(range(0, n_samples, self._batch_size)):
            stop = min(start + self._batch_size, n_samples)

            v1_batch = values_1[start:stop] if values_1 is not None else None
            v2_batch = values_2[start:stop] if values_2 is not None else None

            # The job object is not bound to a name, so it can be released
            # as soon as its fidelities have been extracted.
            results.append(
                self._fidelity.run(
                    circuits_1[start:stop],
                    circuits_2[start:stop],
                    v1_batch,
                    v2_batch,
                    **options,
                ).result().fidelities
            )

            # Report on every fifth batch; the number shown is the sample
            # offset at which the batch started.
            if batch_no % 5 == 0:
                print(f" Batch {start}/{n_samples} processed in {time.time() - batch_start:.2f}s")
                batch_start = time.time()

        return DummyJob(np.concatenate(results))
|
|
def _find_dataset(candidates):
    """Return the first path in *candidates* that exists, or ``None``."""
    return next((path for path in candidates if os.path.exists(path)), None)


def _load_subsets(data_path):
    """Load the ``.npz`` archive and draw the train/test subsets.

    The training subset holds ``TRAIN_SIZE // 2`` randomly chosen rows with
    label 1 and the same number with label 0, shuffled together. The test
    subset is the first ``TEST_SIZE`` rows of the test arrays.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)``.

    Raises:
        ValueError: Propagated from ``np.random.choice`` when a class has
            fewer than ``TRAIN_SIZE // 2`` rows.
    """
    # The context manager closes the archive once the arrays are read.
    with np.load(data_path) as data:
        X_train_full = data['X_train']
        y_train_full = data['y_train']
        X_test_full = data['X_test']
        y_test_full = data['y_test']

    pos_idx = np.where(y_train_full == 1)[0]
    neg_idx = np.where(y_train_full == 0)[0]
    per_class = TRAIN_SIZE // 2

    train_idx = np.concatenate([
        np.random.choice(pos_idx, per_class, replace=False),
        np.random.choice(neg_idx, per_class, replace=False),
    ])
    np.random.shuffle(train_idx)

    # Index-array selection already yields new arrays.
    X_train = X_train_full[train_idx]
    y_train = y_train_full[train_idx]
    # A plain slice is a view that would keep the full test arrays alive;
    # copy so they can be reclaimed once this function returns.
    X_test = X_test_full[:TEST_SIZE].copy()
    y_test = y_test_full[:TEST_SIZE].copy()
    return X_train, y_train, X_test, y_test


def main():
    """Train a quantum kernel on a small balanced subset and report test AUC.

    Steps: locate and load the dataset, build an Aer statevector sampler,
    assemble a trainable fidelity kernel (ZZFeatureMap + RealAmplitudes) that
    evaluates through ``BatchedFidelity``, optimise the kernel parameters with
    COBYLA, then fit an SVC on the trained kernel and print the ROC AUC on
    the test subset. Returns ``None``; returns early when no dataset file is
    found.
    """
    print("π Starting Base Lite Diagnostic...")
    print(f" βοΈ Config: Train={TRAIN_SIZE} | Batch={BATCH_SIZE} | Iters={MAX_ITERS} | Opt={OPT_LEVEL}")

    possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
    data_path = _find_dataset(possible_paths)
    if not data_path:
        # Say why nothing happens instead of exiting silently.
        print(f"Dataset not found; looked for: {possible_paths}")
        return

    X_train, y_train, X_test, y_test = _load_subsets(data_path)
    # The full arrays are unreferenced now; collect before simulating.
    gc.collect()

    backend = AerSimulator(
        method="statevector",
        precision="single",
        max_parallel_threads=4,
        max_memory_mb=12000,
    )
    aer_sampler = AerSampler.from_backend(backend)
    # NOTE(review): confirm the installed Aer SamplerV2 accepts
    # default_shots=None; the intended effect is not visible from here.
    aer_sampler.options.default_shots = None

    print(" βοΈ Initializing Kernel...")
    fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
    ansatz = RealAmplitudes(N_QUBITS, reps=1)
    combined_circuit = fm.compose(ansatz)

    print(f" π§ Transpiling (Level {OPT_LEVEL})...")
    transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=OPT_LEVEL)

    base_fidelity = ComputeUncompute(sampler=aer_sampler)
    batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE)

    # Only the ansatz parameters are declared trainable.
    quant_kernel = TrainableFidelityQuantumKernel(
        feature_map=transpiled_circuit,
        training_parameters=ansatz.parameters,
        fidelity=batched_fidelity,
    )

    print(" π§ Optimizing (COBYLA)...")

    loss_func = SVCLoss(C=1.0, gamma='auto')
    optimizer = COBYLA(maxiter=MAX_ITERS)

    # Imported here because the file's top-level imports do not include it.
    from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer

    trainer = QuantumKernelTrainer(
        quantum_kernel=quant_kernel,
        loss=loss_func,
        optimizer=optimizer,
        initial_point=[0.1] * len(ansatz.parameters),
    )

    start_time = time.time()
    results = trainer.fit(X_train, y_train)
    elapsed = time.time() - start_time

    # This literal was split across two lines by a damaged byte sequence;
    # it is rejoined here with the check-mark restored.
    print(f" ✅ Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
    print(f" π― Final Loss: {results.optimal_value:.4f}")

    print(" π Training Final SVM...")
    # The trained kernel's evaluate method is used as a callable kernel.
    qsvc = SVC(kernel=results.quantum_kernel.evaluate, probability=True)
    qsvc.fit(X_train, y_train)

    # Column 1 of predict_proba is scored against the test labels.
    test_probs = qsvc.predict_proba(X_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_probs)

    print("\n" + "="*40)
    print(f"π BASE LITE RESULTS (Batch={BATCH_SIZE})")
    print("="*40)
    print(f"✅ Test AUC: {test_auc:.4f}")
    print("="*40)
|
|
# Run the diagnostic only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()