| import numpy as np |
| import pandas as pd |
| import os |
| import sys |
| import time |
|
|
| |
| try: |
| import optuna |
| except ImportError: |
| print("β οΈ Optuna not found. Installing...") |
| import subprocess |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "optuna"]) |
| import optuna |
|
|
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.svm import SVC |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import MinMaxScaler |
| from sklearn.metrics import roc_auc_score |
|
|
| from qiskit import QuantumCircuit |
| from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap |
| from qiskit_machine_learning.kernels import FidelityQuantumKernel |
| from qiskit_aer import AerSimulator |
|
|
| |
# Runtime knobs for the numerical backends, applied in one update.
# NOTE(review): these are assigned after numpy / scikit-learn / qiskit have
# already been imported at the top of the file; whether those libraries still
# pick the values up at this point is not visible from here -- confirm.
os.environ.update(
    {
        "OMP_NUM_THREADS": "4",
        "QISKIT_IN_PARALLEL": "TRUE",
    }
)

# Startup banner.
print("π INITIATING 'HAIL MARY' OPTIMIZATION (OPTUNA)...")
|
|
| |
# Locate the dataset: try each candidate path in order and load the first
# one that exists on disk.
possible_paths = ['vG.0.1/real_tokamak_data_v2.csv', 'real_tokamak_data_v2.csv']
df = None
for path in possible_paths:
    if os.path.exists(path):
        print(f" ✅ Found data at: {path}")
        df = pd.read_csv(path)
        break

if df is None:
    # Nothing to work on: stop with an explanatory message on stderr and a
    # non-zero exit status instead of ending silently with status 0.
    sys.exit("Dataset not found. Looked for: " + ", ".join(possible_paths))
|
|
# Sanitise the table: turn +/-inf into NaN, then zero-fill every missing
# cell (this applies to all columns, 'label' included).
df = df.replace([np.inf, -np.inf], np.nan).fillna(0)

# Target vector as a plain array.
y = df['label'].values
|
|
| |
# Time-domain feature matrix: for every signal prefix, collect its
# `<prefix>_<index>` columns in index order and stack the signals side by side.
prefixes = ['ip', 'n1', 'beta', 'li', 'q95']
N_TIME_STEPS = 100  # a signal is used only when it has exactly this many columns

time_features = []
for p in prefixes:
    # Accept only columns whose suffix is a plain integer, so that a
    # same-prefix column such as `<prefix>_mean` is ignored instead of
    # crashing the integer sort key below.
    cols = [
        c for c in df.columns
        if c.startswith(p + '_') and c[len(p) + 1:].isdecimal()
    ]
    cols.sort(key=lambda name: int(name[len(p) + 1:]))
    if len(cols) == N_TIME_STEPS:
        time_features.append(df[cols].values)
    else:
        # Make the drop visible rather than silently losing a whole signal.
        print(
            f"Skipping signal '{p}': expected {N_TIME_STEPS} columns, "
            f"found {len(cols)}",
            file=sys.stderr,
        )

if not time_features:
    # np.hstack([]) would fail with an unrelated-looking message.
    raise ValueError(
        f"No signal among {prefixes} has exactly {N_TIME_STEPS} "
        "'<prefix>_<index>' columns; cannot build time-domain features."
    )
X_time = np.hstack(time_features)
|
|
| |
# Frequency-domain features: for each selected signal take the magnitude of
# its real FFT along the time axis, drop bin 0, and keep N_FFT_BINS bins
# spread evenly over the remaining spectrum.
N_FFT_BINS = 4
fft_features = []
for p in ['n1', 'ip']:
    # Only `<prefix>_<integer>` columns, ordered by that integer.
    cols = [
        c for c in df.columns
        if c.startswith(p + '_') and c[len(p) + 1:].isdecimal()
    ]
    cols.sort(key=lambda name: int(name[len(p) + 1:]))
    signal = df[cols].values
    fft_vals = np.abs(np.fft.rfft(signal, axis=1))[:, 1:]
    indices = np.linspace(0, fft_vals.shape[1] - 1, N_FFT_BINS, dtype=int)
    fft_features.append(fft_vals[:, indices])
X_freq = np.hstack(fft_features)

# Split first. The row assignment depends only on `y` and `random_state`,
# so the train/test membership is the same as when splitting scaled data.
X_time_train, X_time_test, X_freq_train, X_freq_test, y_train, y_test = train_test_split(
    X_time, X_freq, y, test_size=0.2, stratify=y, random_state=42
)

# Fit the [0, 2*pi] scaler on the training rows only, so that min/max
# statistics of the held-out rows do not leak into training; then apply the
# same transform everywhere. Held-out values may fall slightly outside the
# nominal range.
scaler_q = MinMaxScaler(feature_range=(0, 2 * np.pi))
X_freq_train = scaler_q.fit_transform(X_freq_train)
X_freq_test = scaler_q.transform(X_freq_test)
X_freq = scaler_q.transform(X_freq)
|
|
| |
# Baseline: a random forest on the time-domain features.
print(" π² Training Baseline RF to find Hard Negatives...")
rf = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1).fit(
    X_time_train, y_train
)

# Column 1 of predict_proba for every training row.
# NOTE(review): these are predictions on the same rows the forest was fitted
# on, so the residuals below are in-sample -- confirm this is intended.
probs_train = rf.predict_proba(X_time_train)[:, 1]

# Absolute residual |label - probability|. The tail of the ascending argsort
# holds the (up to) 350 rows with the largest residuals.
errors = np.abs(y_train - probs_train)
hard_indices = errors.argsort()[-350:]

# Frequency-domain features and labels of those rows.
X_hard_train, y_hard_train = X_freq_train[hard_indices], y_train[hard_indices]
|
|
| |
| |
# Hold out 30% of the hard cases for scoring trials.
# Stratify on the label when possible: roc_auc_score needs both classes in
# the validation part, and an unlucky unstratified split of this small subset
# would make every trial fail and score 0.0. Stratification itself requires
# at least two rows per class, so fall back to a plain split otherwise.
_hard_class_counts = np.unique(y_hard_train, return_counts=True)[1]
_can_stratify = len(_hard_class_counts) > 1 and _hard_class_counts.min() >= 2

X_opt_train, X_opt_val, y_opt_train, y_opt_val = train_test_split(
    X_hard_train,
    y_hard_train,
    test_size=0.3,
    random_state=42,
    stratify=y_hard_train if _can_stratify else None,
)

print(f" Optimization Dataset: {len(X_opt_train)} Train | {len(X_opt_val)} Val (All Hard Cases)")
|
|
| |
def objective(trial):
    """Optuna objective: validation ROC AUC of a quantum-kernel SVC.

    Samples a feature-map configuration (repetitions, entanglement pattern,
    map type) and the SVC regularisation strength ``C``, fits the classifier
    on the module-level ``X_opt_train`` / ``y_opt_train`` and scores it on
    ``X_opt_val`` / ``y_opt_val``.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The validation ROC AUC, or ``0.0`` when fitting or scoring raised.
    """
    # Sampling order is kept fixed: reps, entanglement, map_type, C.
    reps = trial.suggest_int('reps', 1, 3)
    entanglement = trial.suggest_categorical('entanglement', ['linear', 'circular', 'full'])
    map_type = trial.suggest_categorical('map_type', ['ZZ', 'Pauli'])

    # One qubit per input feature; taken from the data instead of a literal
    # so the circuit width cannot drift from the feature matrix.
    n_features = X_opt_train.shape[1]
    if map_type == 'ZZ':
        feature_map = ZZFeatureMap(
            feature_dimension=n_features, reps=reps, entanglement=entanglement
        )
    else:
        feature_map = PauliFeatureMap(
            feature_dimension=n_features,
            reps=reps,
            paulis=['Z', 'XX'],
            entanglement=entanglement,
        )

    c_value = trial.suggest_float('C', 0.1, 100.0, log=True)

    kernel = FidelityQuantumKernel(feature_map=feature_map)

    try:
        # random_state pins the internal shuffling used by probability=True,
        # matching the fixed seeds used elsewhere in this script.
        qsvc = SVC(kernel=kernel.evaluate, C=c_value, probability=True, random_state=42)
        qsvc.fit(X_opt_train, y_opt_train)

        preds = qsvc.predict_proba(X_opt_val)[:, 1]
        auc = roc_auc_score(y_opt_val, preds)
    except Exception as e:
        # Deliberate best-effort: a failing configuration is reported and
        # scored 0.0 so the remaining trials still run.
        print(f"Trial fail: {e}")
        return 0.0

    return auc
|
|
| |
# Hyper-parameter search. The trial count is defined once and reused in both
# the banner and the optimize() call.
N_TRIALS = 20
print(f"\nβοΈ STARTING OPTIMIZATION ({N_TRIALS} TRIALS)...")

# Seed the sampler so repeated runs explore the same configurations, in line
# with the fixed random_state=42 used for the splits and the forest.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=N_TRIALS)

# Report the best configuration and its validation score.
print("\n" + "="*40)
print("π BEST PARAMETERS FOUND")
print("="*40)
print(study.best_params)
print(f"π Best Residual AUC: {study.best_value:.4f}")
|
|
| |
| |
# Refit on the full hard-case set using the best configuration found.
print("\n⨠Training Final Model with Best Params...")
best = study.best_params

# Same feature-map construction as in `objective`; the circuit width follows
# the training matrix instead of a hard-coded literal.
_n_features = X_hard_train.shape[1]
if best['map_type'] == 'ZZ':
    fm = ZZFeatureMap(_n_features, reps=best['reps'], entanglement=best['entanglement'])
else:
    fm = PauliFeatureMap(
        _n_features,
        reps=best['reps'],
        paulis=['Z', 'XX'],
        entanglement=best['entanglement'],
    )

kernel_final = FidelityQuantumKernel(feature_map=fm)
# random_state pins the internal shuffling used by probability=True so the
# final probabilities are repeatable from run to run.
qsvc_final = SVC(
    kernel=kernel_final.evaluate, C=best['C'], probability=True, random_state=42
)
qsvc_final.fit(X_hard_train, y_hard_train)
|
|
| |
# Column-1 probabilities on the held-out rows from both models.
y_pred_rf = rf.predict_proba(X_time_test)[:, 1]
y_pred_q = qsvc_final.predict_proba(X_freq_test)[:, 1]

# Fixed-weight blend: 70% forest, 30% quantum-kernel SVC.
y_blend = (0.7 * y_pred_rf) + (0.3 * y_pred_q)

# ROC AUC of the forest alone and of the blend.
rf_auc = roc_auc_score(y_test, y_pred_rf)
blend_auc = roc_auc_score(y_test, y_blend)

# Final report, emitted in one call; the text is line-for-line the same as
# printing each entry separately.
_rule = "=" * 40
print(
    "\n".join(
        [
            "\n" + _rule,
            "π FINAL PROJECT SCORE (OPTIMIZED)",
            _rule,
            f"π² Classical Baseline: {rf_auc:.4f}",
            f"π Optimized Ensemble: {blend_auc:.4f}",
            f"π Boost: {blend_auc - rf_auc:+.5f}",
            _rule,
        ]
    )
)
|
|
| |
import json

# Persist the best hyper-parameters as JSON next to the script.
with open('best_quantum_params.json', 'w') as f:
    f.write(json.dumps(best))
print("πΎ Saved parameters to 'best_quantum_params.json'.")