# QGAN_Project / vG0.2 / onelastdance_optimized.py
# 1bnjmn3's picture
# Add files using upload-large-folder tool
# 17313b4 verified
import os
import sys
import time

# M1 Optimization
# These must be in the environment BEFORE numpy / qiskit are imported:
# OpenMP/BLAS runtimes typically read OMP_NUM_THREADS when they are first
# loaded, so setting it after the imports (as before) may have no effect.
os.environ["OMP_NUM_THREADS"] = "4"
# NOTE(review): Qiskit appears to read QISKIT_IN_PARALLEL=TRUE as "already
# running inside a parallel worker", which would switch its own parallel_map
# off -- confirm this is the intended macOS/M1 workaround.
os.environ["QISKIT_IN_PARALLEL"] = "TRUE"

import numpy as np
import pandas as pd

# Try to import Optuna (Install if missing)
try:
    import optuna
except ImportError:
    print("⚠️ Optuna not found. Installing...")
    import subprocess
    # Install into the interpreter that is running this script.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "optuna"])
    import optuna

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import roc_auc_score
from qiskit import QuantumCircuit
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap
from qiskit_machine_learning.kernels import FidelityQuantumKernel
from qiskit_aer import AerSimulator

print("πŸš€ INITIATING 'HAIL MARY' OPTIMIZATION (OPTUNA)...")
# --- 1. DATA PREP (MULTI-MODAL) ---
def _signal_columns(frame, prefix):
    """Return the columns of *frame* that start with ``<prefix>_``.

    The result is sorted by the integer found in the second
    ``_``-separated field of each column name.
    """
    cols = [c for c in frame.columns if c.startswith(prefix + '_')]
    cols.sort(key=lambda name: int(name.split('_')[1]))
    return cols


possible_paths = ['vG.0.1/real_tokamak_data_v2.csv', 'real_tokamak_data_v2.csv']
df = None
for path in possible_paths:
    if os.path.exists(path):
        print(f" βœ… Found data at: {path}")
        df = pd.read_csv(path)
        break
if df is None:
    # Fail loudly (non-zero exit status) instead of exiting silently.
    sys.exit(f"Dataset not found. Looked in: {possible_paths}")

# Non-finite readings and missing values are both replaced by 0.
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.fillna(0, inplace=True)
y = df['label'].values

# View A: Time Domain (RF)
prefixes = ['ip', 'n1', 'beta', 'li', 'q95']
time_features = []
for p in prefixes:
    cols = _signal_columns(df, p)
    # Only signals with exactly 100 samples are stacked into the RF view.
    if len(cols) == 100:
        time_features.append(df[cols].values)
    else:
        print(f" Skipping '{p}': expected 100 columns, found {len(cols)}")
if not time_features:
    # np.hstack([]) would otherwise raise an opaque ValueError.
    sys.exit("No signal with 100 time samples found; cannot build time-domain features.")
X_time = np.hstack(time_features)

# View B: Frequency Domain (Quantum)
fft_features = []
for p in ['n1', 'ip']:
    cols = _signal_columns(df, p)
    signal = df[cols].values
    # Magnitude spectrum per row, with the first (zero-frequency) bin dropped.
    fft_vals = np.abs(np.fft.rfft(signal, axis=1))[:, 1:]
    # Keep 4 evenly spaced bins per signal.
    indices = np.linspace(0, fft_vals.shape[1] - 1, 4, dtype=int)
    fft_features.append(fft_vals[:, indices])
X_freq = np.hstack(fft_features)

# Split
X_time_train, X_time_test, X_freq_train, X_freq_test, y_train, y_test = train_test_split(
    X_time, X_freq, y, test_size=0.2, stratify=y, random_state=42
)

# Fit the angle scaler on the training rows only so that test-set statistics
# do not leak into the features. Test values may therefore fall slightly
# outside [0, 2*pi].
scaler_q = MinMaxScaler(feature_range=(0, 2 * np.pi))
X_freq_train = scaler_q.fit_transform(X_freq_train)
X_freq_test = scaler_q.transform(X_freq_test)
# Keep the full-matrix name pointing at scaled data, as it did before.
X_freq = scaler_q.transform(X_freq)
# --- 2. MINE HARD NEGATIVES ---
print(" 🌲 Training Baseline RF to find Hard Negatives...")
# oob_score=True makes the forest also record, for every training row, the
# averaged vote of the trees that did NOT see that row during bootstrapping.
rf = RandomForestClassifier(n_estimators=200, oob_score=True, random_state=42, n_jobs=-1)
rf.fit(X_time_train, y_train)

# Rank difficulty on out-of-bag probabilities: in-sample probabilities come
# from trees that were fit on the very rows being scored, so they understate
# how hard a row is for the forest.
probs_train = rf.oob_decision_function_[:, 1]
never_oob = np.isnan(probs_train)
if never_oob.any():
    # A row that was in every bootstrap sample has no OOB estimate; fall back
    # to the in-sample probability for those rows only.
    probs_train = np.where(never_oob, rf.predict_proba(X_time_train)[:, 1], probs_train)

# Identify Hard Indices
errors = np.abs(y_train - probs_train)
n_hard = 350
hard_indices = np.argsort(errors)[-n_hard:]  # Top 350 hardest cases
X_hard_train = X_freq_train[hard_indices]
y_hard_train = y_train[hard_indices]

# We need a validation set FOR OPTIMIZATION that is also "Hard"
# So we split the hard residuals into Train/Val.
# Stratify when every class has at least two members, so the validation fold
# is less likely to end up with a single class (ROC AUC is undefined there).
_, class_counts = np.unique(y_hard_train, return_counts=True)
stratify_labels = y_hard_train if class_counts.min() >= 2 else None
X_opt_train, X_opt_val, y_opt_train, y_opt_val = train_test_split(
    X_hard_train, y_hard_train, test_size=0.3, random_state=42, stratify=stratify_labels
)
print(f" Optimization Dataset: {len(X_opt_train)} Train | {len(X_opt_val)} Val (All Hard Cases)")
# --- 3. DEFINE OPTUNA OBJECTIVE ---
def objective(trial):
    """Score one quantum-kernel SVM configuration on the hard validation set.

    Reads the module-level arrays ``X_opt_train``, ``y_opt_train``,
    ``X_opt_val`` and ``y_opt_val``.

    Args:
        trial: Optuna trial used to sample ``reps``, ``entanglement``,
            ``map_type`` and ``C``.

    Returns:
        ROC AUC of the fitted SVM on the validation split, or ``0.0`` when
        building, fitting or scoring the model raises.
    """
    # A. Tune Circuit Physics
    reps = trial.suggest_int('reps', 1, 3)
    entanglement = trial.suggest_categorical('entanglement', ['linear', 'circular', 'full'])

    # B. Tune Feature Map Type
    # ZZFeatureMap is standard, Pauli allows more complex rotations
    map_type = trial.suggest_categorical('map_type', ['ZZ', 'Pauli'])

    # One qubit per input feature; taken from the data instead of a literal 8
    # so the map cannot drift out of sync with the feature matrix.
    n_features = X_opt_train.shape[1]
    if map_type == 'ZZ':
        feature_map = ZZFeatureMap(
            feature_dimension=n_features, reps=reps, entanglement=entanglement
        )
    else:
        feature_map = PauliFeatureMap(
            feature_dimension=n_features, reps=reps, paulis=['Z', 'XX'],
            entanglement=entanglement,
        )

    # C. Tune SVM Hyperparameters (The Classifier on top of the Kernel)
    c_value = trial.suggest_float('C', 0.1, 100.0, log=True)

    # Build Kernel
    # The kernel is built from the feature map alone, as before. The
    # AerSimulator that used to be created here on every trial was never
    # passed to the kernel, so it has been removed.
    kernel = FidelityQuantumKernel(feature_map=feature_map)

    # Train SVM
    try:
        # random_state pins the internal shuffling used for probability
        # calibration so repeated runs of one configuration agree.
        qsvc = SVC(kernel=kernel.evaluate, C=c_value, probability=True, random_state=42)
        qsvc.fit(X_opt_train, y_opt_train)
        # Evaluate on Validation Hard Negatives
        # We want to maximize AUC on the shots the RF got wrong
        preds = qsvc.predict_proba(X_opt_val)[:, 1]
        auc = roc_auc_score(y_opt_val, preds)
    except Exception as e:
        # Deliberate best-effort: a failing configuration is reported and
        # scored as the worst possible AUC instead of aborting the study.
        print(f"Trial fail: {e}")
        return 0.0
    return auc
# --- 4. RUN OPTIMIZATION ---
# Single source of truth for the trial budget (used by the banner and the call).
n_trials = 20
print(f"\nβš›οΈ STARTING OPTIMIZATION ({n_trials} TRIALS)...")
# Seed the sampler so the sequence of suggested hyperparameters is repeatable,
# in line with the random_state=42 used by the rest of the script.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=n_trials)

print("\n" + "="*40)
print("πŸ† BEST PARAMETERS FOUND")
print("="*40)
print(study.best_params)
print(f"πŸš€ Best Residual AUC: {study.best_value:.4f}")
# --- 5. FINAL VERIFICATION ---
# Train the FINAL model with best params on ALL hard data
print("\n✨ Training Final Model with Best Params...")
best = study.best_params

# One qubit per frequency feature; derived from the data rather than a
# hard-coded 8 so the feature map always matches the matrix it encodes.
n_features = X_hard_train.shape[1]
if best['map_type'] == 'ZZ':
    fm = ZZFeatureMap(n_features, reps=best['reps'], entanglement=best['entanglement'])
else:
    fm = PauliFeatureMap(
        n_features, reps=best['reps'], paulis=['Z', 'XX'],
        entanglement=best['entanglement'],
    )
kernel_final = FidelityQuantumKernel(feature_map=fm)
# random_state pins the shuffling used for probability calibration so the
# reported scores are repeatable.
qsvc_final = SVC(kernel=kernel_final.evaluate, C=best['C'], probability=True, random_state=42)
qsvc_final.fit(X_hard_train, y_hard_train)

# Predict on Global Test Set
y_pred_q = qsvc_final.predict_proba(X_freq_test)[:, 1]

# RF Baseline
y_pred_rf = rf.predict_proba(X_time_test)[:, 1]
rf_auc = roc_auc_score(y_test, y_pred_rf)

# Blend: fixed-weight average of the two positive-class probabilities.
rf_weight, q_weight = 0.7, 0.3
y_blend = (rf_weight * y_pred_rf) + (q_weight * y_pred_q)
blend_auc = roc_auc_score(y_test, y_blend)

print("\n" + "="*40)
print("🏁 FINAL PROJECT SCORE (OPTIMIZED)")
print("="*40)
print(f"🌲 Classical Baseline: {rf_auc:.4f}")
print(f"πŸš€ Optimized Ensemble: {blend_auc:.4f}")
print(f"πŸ“ˆ Boost: {blend_auc - rf_auc:+.5f}")
print("="*40)
# Persist the winning hyperparameters as JSON next to the script.
import json

params_path = 'best_quantum_params.json'
with open(params_path, 'w') as handle:
    handle.write(json.dumps(best))
print("πŸ’Ύ Saved parameters to 'best_quantum_params.json'.")