"""Optuna search over quantum-kernel SVM settings on Random-Forest hard cases.

Run as a script. Steps, in order:

1. Load ``real_tokamak_data_v2.csv`` and build two feature views: the raw
   per-step signal columns (for the Random Forest) and a few FFT magnitudes
   of the ``n1`` / ``ip`` signals (for the quantum kernel).
2. Train a Random Forest on the first view and keep the training rows with
   the largest ``|label - predicted probability|``.
3. Use Optuna to tune the feature map and SVM ``C`` on those rows.
4. Retrain with the best parameters, blend with the Random Forest on the
   held-out test split, print the AUCs and save the parameters as JSON.
"""
import os
import sys

# M1 Optimization
# Exported before numpy / scikit-learn / qiskit-aer are imported: native
# threading runtimes commonly read these variables once, when first loaded,
# so assigning them after the imports may have no effect.
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["QISKIT_IN_PARALLEL"] = "TRUE"

import json
import time

import numpy as np
import pandas as pd

# Try to import Optuna (Install if missing)
try:
    import optuna
except ImportError:
    print("āš ļø Optuna not found. Installing...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "optuna"])
    import optuna

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import roc_auc_score

from qiskit import QuantumCircuit
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap
from qiskit_machine_learning.kernels import FidelityQuantumKernel
from qiskit_aer import AerSimulator


def _signal_columns(frame, prefix):
    """Return the columns of *frame* starting with ``prefix + '_'``.

    The columns are ordered by the integer found in the second
    ``'_'``-separated field of each name.
    """
    cols = [c for c in frame.columns if c.startswith(prefix + '_')]
    cols.sort(key=lambda name: int(name.split('_')[1]))
    return cols


def _build_feature_map(map_type, reps, entanglement, feature_dimension):
    """Build the data-encoding circuit for one hyper-parameter choice.

    ``map_type == 'ZZ'`` selects ``ZZFeatureMap``; any other value selects a
    ``PauliFeatureMap`` with ``paulis=['Z', 'XX']``.
    """
    if map_type == 'ZZ':
        return ZZFeatureMap(feature_dimension=feature_dimension, reps=reps,
                            entanglement=entanglement)
    return PauliFeatureMap(feature_dimension=feature_dimension, reps=reps,
                           paulis=['Z', 'XX'], entanglement=entanglement)


print("šŸš€ INITIATING 'HAIL MARY' OPTIMIZATION (OPTUNA)...")

# --- 1. DATA PREP (MULTI-MODAL) ---
possible_paths = ['vG.0.1/real_tokamak_data_v2.csv', 'real_tokamak_data_v2.csv']
df = None
for path in possible_paths:
    if os.path.exists(path):
        print(f" āœ… Found data at: {path}")
        df = pd.read_csv(path)
        break

if df is None:
    # Exit with a message and a non-zero status instead of stopping silently.
    sys.exit(f"Input data not found. Looked for: {', '.join(possible_paths)}")

# Non-finite and missing entries are both replaced by 0.
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.fillna(0, inplace=True)
y = df['label'].values

# View A: Time Domain (RF)
prefixes = ['ip', 'n1', 'beta', 'li', 'q95']
time_features = []
for p in prefixes:
    cols = _signal_columns(df, p)
    # Only signals with exactly 100 columns are used; others are skipped.
    if len(cols) == 100:
        time_features.append(df[cols].values)

if not time_features:
    # np.hstack([]) would otherwise fail with an unhelpful message.
    sys.exit("No signal prefix has exactly 100 columns; cannot build time-domain features.")
X_time = np.hstack(time_features)

# View B: Frequency Domain (Quantum)
fft_features = []
for p in ['n1', 'ip']:
    cols = _signal_columns(df, p)
    signal = df[cols].values
    # Magnitude spectrum per row, with the first (zero-frequency) bin dropped.
    fft_vals = np.abs(np.fft.rfft(signal, axis=1))[:, 1:]
    # Keep 4 evenly spaced bins per signal -> 2 signals x 4 bins = 8 features.
    indices = np.linspace(0, fft_vals.shape[1] - 1, 4, dtype=int)
    fft_features.append(fft_vals[:, indices])
X_freq = np.hstack(fft_features)

# Split
X_time_train, X_time_test, X_freq_train, X_freq_test, y_train, y_test = train_test_split(
    X_time, X_freq, y, test_size=0.2, stratify=y, random_state=42
)

# Scale the quantum features to [0, 2*pi]. The scaler is fitted on the
# training split only so that test-set statistics do not leak into training;
# test values may therefore fall slightly outside the range.
scaler_q = MinMaxScaler(feature_range=(0, 2 * np.pi))
X_freq_train = scaler_q.fit_transform(X_freq_train)
X_freq_test = scaler_q.transform(X_freq_test)

# --- 2. MINE HARD NEGATIVES ---
print(" 🌲 Training Baseline RF to find Hard Negatives...")
rf = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
rf.fit(X_time_train, y_train)
# NOTE(review): these are in-sample probabilities (the forest has seen these
# rows); confirm whether out-of-bag estimates were intended here.
probs_train = rf.predict_proba(X_time_train)[:, 1]

# Identify Hard Indices
errors = np.abs(y_train - probs_train)
hard_indices = np.argsort(errors)[-350:]  # Top 350 hardest cases

X_hard_train = X_freq_train[hard_indices]
y_hard_train = y_train[hard_indices]

# We need a validation set FOR OPTIMIZATION that is also "Hard"
# So we split the hard residuals into Train/Val
X_opt_train, X_opt_val, y_opt_train, y_opt_val = train_test_split(
    X_hard_train, y_hard_train, test_size=0.3, random_state=42
)

print(f" Optimization Dataset: {len(X_opt_train)} Train | {len(X_opt_val)} Val (All Hard Cases)")


# --- 3. DEFINE OPTUNA OBJECTIVE ---
def objective(trial):
    """Score one Optuna trial: validation AUC of a quantum-kernel SVM.

    Args:
        trial: the Optuna trial used to sample ``reps``, ``entanglement``,
            ``map_type`` and ``C``.

    Returns:
        ROC AUC on ``X_opt_val`` / ``y_opt_val`` after fitting on
        ``X_opt_train`` / ``y_opt_train``, or ``0.0`` if fitting or scoring
        raised an exception.
    """
    # A. Tune Circuit Physics
    reps = trial.suggest_int('reps', 1, 3)
    entanglement = trial.suggest_categorical('entanglement', ['linear', 'circular', 'full'])

    # B. Tune Feature Map Type
    # ZZFeatureMap is standard, Pauli allows more complex rotations
    map_type = trial.suggest_categorical('map_type', ['ZZ', 'Pauli'])
    # One qubit per input feature, taken from the data rather than hard-coded.
    feature_map = _build_feature_map(map_type, reps, entanglement, X_opt_train.shape[1])

    # C. Tune SVM Hyperparameters (The Classifier on top of the Kernel)
    c_value = trial.suggest_float('C', 0.1, 100.0, log=True)

    # Build Kernel
    kernel = FidelityQuantumKernel(feature_map=feature_map)

    # Train SVM
    try:
        # random_state makes the probability calibration repeatable.
        qsvc = SVC(kernel=kernel.evaluate, C=c_value, probability=True, random_state=42)
        qsvc.fit(X_opt_train, y_opt_train)

        # Evaluate on Validation Hard Negatives
        # We want to maximize AUC on the shots the RF got wrong
        preds = qsvc.predict_proba(X_opt_val)[:, 1]
        auc = roc_auc_score(y_opt_val, preds)
    except Exception as e:
        # Deliberate best-effort: a failing configuration gets the worst
        # score instead of aborting the whole study.
        print(f"Trial fail: {e}")
        return 0.0

    return auc


# --- 4. RUN OPTIMIZATION ---
print("\nāš›ļø STARTING OPTIMIZATION (20 TRIALS)...")
# Seeded sampler, in line with the random_state=42 used throughout.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=20)

print("\n" + "="*40)
print("šŸ† BEST PARAMETERS FOUND")
print("="*40)
print(study.best_params)
print(f"šŸš€ Best Residual AUC: {study.best_value:.4f}")

# --- 5. FINAL VERIFICATION ---
# Train the FINAL model with best params on ALL hard data
print("\n✨ Training Final Model with Best Params...")
best = study.best_params

fm = _build_feature_map(best['map_type'], best['reps'], best['entanglement'],
                        X_hard_train.shape[1])

kernel_final = FidelityQuantumKernel(feature_map=fm)
qsvc_final = SVC(kernel=kernel_final.evaluate, C=best['C'], probability=True, random_state=42)
qsvc_final.fit(X_hard_train, y_hard_train)

# Predict on Global Test Set
y_pred_q = qsvc_final.predict_proba(X_freq_test)[:, 1]

# RF Baseline
y_pred_rf = rf.predict_proba(X_time_test)[:, 1]
rf_auc = roc_auc_score(y_test, y_pred_rf)

# Blend
y_blend = (0.7 * y_pred_rf) + (0.3 * y_pred_q)
blend_auc = roc_auc_score(y_test, y_blend)

print("\n" + "="*40)
print("šŸ FINAL PROJECT SCORE (OPTIMIZED)")
print("="*40)
print(f"🌲 Classical Baseline: {rf_auc:.4f}")
print(f"šŸš€ Optimized Ensemble: {blend_auc:.4f}")
print(f"šŸ“ˆ Boost: {blend_auc - rf_auc:+.5f}")
print("="*40)

# Save Params
with open('best_quantum_params.json', 'w') as f:
    json.dump(best, f)
print("šŸ’¾ Saved parameters to 'best_quantum_params.json'.")