# File size: 6,131 Bytes
# 17313b4
import numpy as np
import pandas as pd
import os
import sys
import time
# Try to import Optuna (install if missing).
# The fallback installs the package with pip and then retries the import.
try:
    import optuna
except ImportError:
    print("⚠️ Optuna not found. Installing...")
    import subprocess

    # sys.executable makes pip target the interpreter running this script.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "optuna"])
    import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import roc_auc_score
from qiskit import QuantumCircuit
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap
from qiskit_machine_learning.kernels import FidelityQuantumKernel
from qiskit_aer import AerSimulator
# M1 Optimization: thread / parallelism hints exported through the environment.
# NOTE(review): these are assigned after numpy, sklearn and qiskit have already
# been imported above; confirm the runtimes still pick them up at this point.
# NOTE(review): presumably QISKIT_IN_PARALLEL="TRUE" tells Qiskit a parallel
# region is already active -- verify this is the intended effect.
# NOTE(review): the leading glyph in the banner looks like a mis-decoded emoji;
# the original character is unknown, so the text is kept as found.
os.environ.update(
    {
        "OMP_NUM_THREADS": "4",
        "QISKIT_IN_PARALLEL": "TRUE",
    }
)
print("π INITIATING 'HAIL MARY' OPTIMIZATION (OPTUNA)...")
# --- 1. DATA PREP (MULTI-MODAL) ---
# Load the first CSV that exists among the candidate locations.
possible_paths = ['vG.0.1/real_tokamak_data_v2.csv', 'real_tokamak_data_v2.csv']
df = None
for path in possible_paths:
    if os.path.exists(path):
        print(f" ✅ Found data at: {path}")
        df = pd.read_csv(path)
        break
if df is None:
    # Stop with a non-zero status and a reason instead of exiting silently.
    sys.exit(f"No dataset found; looked in: {', '.join(possible_paths)}")

# Infinite values are turned into NaN first so that a single fillna() call
# zero-fills both kinds of invalid entries.
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.fillna(0, inplace=True)

# Target vector, read from the 'label' column.
y = df['label'].values
# View A: Time Domain (RF)
# For every signal prefix, gather its '<prefix>_<k>' columns ordered by the
# integer k; only signals with exactly 100 such columns are kept. The kept
# blocks are concatenated side by side into X_time.
prefixes = ['ip', 'n1', 'beta', 'li', 'q95']
ordered_columns = [
    sorted(
        (name for name in df.columns if name.startswith(prefix + '_')),
        key=lambda name: int(name.split('_')[1]),
    )
    for prefix in prefixes
]
time_features = [
    df[column_group].values
    for column_group in ordered_columns
    if len(column_group) == 100
]
X_time = np.hstack(time_features)
# View B: Frequency Domain (Quantum)
# For each of the two signals: take the magnitude of the real FFT along the
# sample axis, drop the first (zero-frequency) bin, and keep four evenly spaced
# bins. Two signals x four bins gives eight features per row.
fft_features = []
for p in ['n1', 'ip']:
    cols = [c for c in df.columns if c.startswith(p + '_')]
    cols.sort(key=lambda x: int(x.split('_')[1]))
    signal = df[cols].values
    fft_vals = np.abs(np.fft.rfft(signal, axis=1))[:, 1:]
    indices = np.linspace(0, fft_vals.shape[1] - 1, 4, dtype=int)
    fft_features.append(fft_vals[:, indices])
X_freq = np.hstack(fft_features)

# Split
# The split happens on the unscaled features so the scaler below can be fitted
# on the training rows only.
X_time_train, X_time_test, X_freq_train, X_freq_test, y_train, y_test = train_test_split(
    X_time, X_freq, y, test_size=0.2, stratify=y, random_state=42
)

# Scale to [0, 2*pi] using statistics from the training split only; fitting on
# all rows would leak test-set minima/maxima into training. clip=True keeps
# held-out values inside the same range.
scaler_q = MinMaxScaler(feature_range=(0, 2 * np.pi), clip=True)
X_freq_train = scaler_q.fit_transform(X_freq_train)
X_freq_test = scaler_q.transform(X_freq_test)
# Keep X_freq as the scaled full matrix, as before, but via the train-fitted scaler.
X_freq = scaler_q.transform(X_freq)
# --- 2. MINE HARD NEGATIVES ---
print(" 🌲 Training Baseline RF to find Hard Negatives...")
rf = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
rf.fit(X_time_train, y_train)

# NOTE(review): these probabilities are computed on the same rows the forest
# was fitted on, so the residuals are in-sample; consider out-of-bag or
# cross-validated predictions if truly "hard" cases are wanted.
probs_train = rf.predict_proba(X_time_train)[:, 1]

# Identify Hard Indices
# assumes labels are numeric 0/1 so |label - P(class 1)| is an error size -- TODO confirm
errors = np.abs(y_train - probs_train)
hard_indices = np.argsort(errors)[-350:]  # Top 350 hardest cases
X_hard_train = X_freq_train[hard_indices]
y_hard_train = y_train[hard_indices]

# We need a validation set FOR OPTIMIZATION that is also "Hard",
# so we split the hard residuals into Train/Val.
# Stratify whenever every class has at least two members: an unstratified split
# can leave the validation fold with a single class, which makes ROC AUC
# undefined and every trial fail.
_, _hard_class_counts = np.unique(y_hard_train, return_counts=True)
_can_stratify = len(_hard_class_counts) > 1 and _hard_class_counts.min() >= 2
X_opt_train, X_opt_val, y_opt_train, y_opt_val = train_test_split(
    X_hard_train,
    y_hard_train,
    test_size=0.3,
    random_state=42,
    stratify=y_hard_train if _can_stratify else None,
)
print(f" Optimization Dataset: {len(X_opt_train)} Train | {len(X_opt_val)} Val (All Hard Cases)")
# --- 3. DEFINE OPTUNA OBJECTIVE ---
def objective(trial):
    """Score one Optuna trial of the quantum-kernel SVM.

    Samples the feature-map settings (repetitions, entanglement pattern,
    ZZ vs. Pauli map) and the SVM regularisation strength C, fits an SVC with
    the resulting fidelity kernel on the module-level ``X_opt_train`` /
    ``y_opt_train`` data and scores it on ``X_opt_val`` / ``y_opt_val``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.

    Returns:
        The validation ROC AUC, or 0.0 when building, fitting or scoring the
        model raises.
    """
    # A. Tune Circuit Physics
    reps = trial.suggest_int('reps', 1, 3)
    entanglement = trial.suggest_categorical('entanglement', ['linear', 'circular', 'full'])

    # B. Tune Feature Map Type
    # ZZFeatureMap is standard, Pauli allows more complex rotations
    map_type = trial.suggest_categorical('map_type', ['ZZ', 'Pauli'])

    # The feature map width follows the data instead of a hard-coded 8.
    n_features = X_opt_train.shape[1]
    if map_type == 'ZZ':
        feature_map = ZZFeatureMap(
            feature_dimension=n_features, reps=reps, entanglement=entanglement
        )
    else:
        feature_map = PauliFeatureMap(
            feature_dimension=n_features,
            reps=reps,
            paulis=['Z', 'XX'],
            entanglement=entanglement,
        )

    # C. Tune SVM Hyperparameters (The Classifier on top of the Kernel)
    c_value = trial.suggest_float('C', 0.1, 100.0, log=True)

    # Build Kernel
    kernel = FidelityQuantumKernel(feature_map=feature_map)

    # Train SVM
    try:
        # random_state pins the internal shuffling used for probability
        # estimates so repeated runs of the same trial give the same score.
        qsvc = SVC(kernel=kernel.evaluate, C=c_value, probability=True, random_state=42)
        qsvc.fit(X_opt_train, y_opt_train)

        # Evaluate on Validation Hard Negatives
        # We want to maximize AUC on the shots the RF got wrong
        preds = qsvc.predict_proba(X_opt_val)[:, 1]
        auc = roc_auc_score(y_opt_val, preds)
    except Exception as e:
        # Deliberate best-effort: a failed trial is reported and scored 0.0
        # so the study keeps running.
        print(f"Trial fail: {e}")
        return 0.0
    return auc
# --- 4. RUN OPTIMIZATION ---
# One constant feeds both the banner and the study so they cannot disagree.
N_TRIALS = 20
# NOTE(review): the leading glyphs in these banners look like mis-decoded
# emoji; the original characters are unknown, so the text is kept as found.
print(f"\nβοΈ STARTING OPTIMIZATION ({N_TRIALS} TRIALS)...")

# Seed the sampler so the search is repeatable, matching the fixed
# random_state=42 used by the rest of the script.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=N_TRIALS)

print("\n" + "=" * 40)
print("π BEST PARAMETERS FOUND")
print("=" * 40)
print(study.best_params)
print(f"π Best Residual AUC: {study.best_value:.4f}")
# --- 5. FINAL VERIFICATION ---
# Train the FINAL model with best params on ALL hard data
# NOTE(review): the leading glyph in the banner looks like a mis-decoded emoji;
# the original character is unknown, so the text is kept as found.
print("\n⨠Training Final Model with Best Params...")
best = study.best_params

# The feature map width follows the data instead of a hard-coded 8.
n_features_final = X_hard_train.shape[1]
if best['map_type'] == 'ZZ':
    fm = ZZFeatureMap(
        n_features_final, reps=best['reps'], entanglement=best['entanglement']
    )
else:
    fm = PauliFeatureMap(
        n_features_final,
        reps=best['reps'],
        paulis=['Z', 'XX'],
        entanglement=best['entanglement'],
    )
kernel_final = FidelityQuantumKernel(feature_map=fm)

# random_state pins the internal shuffling used for probability estimates so
# the final model is repeatable.
qsvc_final = SVC(
    kernel=kernel_final.evaluate, C=best['C'], probability=True, random_state=42
)
qsvc_final.fit(X_hard_train, y_hard_train)
# Predict on Global Test Set
y_pred_q = qsvc_final.predict_proba(X_freq_test)[:, 1]

# RF Baseline
y_pred_rf = rf.predict_proba(X_time_test)[:, 1]
rf_auc = roc_auc_score(y_test, y_pred_rf)

# Blend: fixed 70/30 weighting of the RF and quantum-kernel probabilities.
y_blend = (0.7 * y_pred_rf) + (0.3 * y_pred_q)
blend_auc = roc_auc_score(y_test, y_blend)

# NOTE(review): the glyphs still shown as 'π' look like mis-decoded emoji whose
# original characters are unknown; they are kept as found.
print("\n" + "=" * 40)
print("π FINAL PROJECT SCORE (OPTIMIZED)")
print("=" * 40)
print(f"🌲 Classical Baseline: {rf_auc:.4f}")
print(f"π Optimized Ensemble: {blend_auc:.4f}")
print(f"π Boost: {blend_auc - rf_auc:+.5f}")
print("=" * 40)
# Save Params
# Persist the best hyperparameters as JSON in the working directory.
import json

with open('best_quantum_params.json', 'w', encoding='utf-8') as f:
    json.dump(best, f)
print("💾 Saved parameters to 'best_quantum_params.json'.")