# File size: 5,685 Bytes
# 0f755ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import numpy as np
import os
import time
import gc

# --- 1. ENVIRONMENT ---
# Exported before the numerical / quantum libraries further down are imported.
# The variable names suggest thread-count and parallelism switches for the
# native backends -- presumably read at import time; confirm against the
# libraries' documentation.
os.environ.update({
    "OMP_NUM_THREADS": "4",
    "RAYON_NUM_THREADS": "4",
    "QISKIT_PARALLEL": "TRUE",
})

from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile 
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
from qiskit_aer import AerSimulator

# --- CONFIGURATION ---
N_QUBITS = 8       # qubit count handed to both the feature map and the ansatz
TRAIN_SIZE = 600   # upper bound on the class-balanced training subsample
TEST_SIZE = 300    # number of leading test rows used for evaluation
MAX_ITERS = 50     # COBYLA iteration budget

# Directory that receives the optimized weights; created at import time.
# `exist_ok=True` avoids the check-then-create race of a separate
# `os.path.exists` test.
OUTPUT_DIR = "optimized_kernel_bypass_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def _fidelity_kernel(states_a, states_b):
    """Return the fidelity kernel between two stacks of statevectors.

    Args:
        states_a: 2-D complex array, one statevector per row.
        states_b: 2-D complex array, one statevector per row.

    Returns:
        Real array whose entry ``[i, j]`` is the squared magnitude of the
        inner product between row ``i`` of ``states_a`` and row ``j`` of
        ``states_b``.
    """
    return np.abs(states_a @ states_b.conj().T) ** 2


def _simulate_statevectors(backend, circuit, ordered_params, data_x, theta):
    """Simulate ``circuit`` once per row of ``data_x`` and stack the results.

    Args:
        backend: Simulator exposing ``run(circuits, parameter_binds=...)``.
        circuit: Transpiled circuit that ends in a ``save_statevector``
            instruction.
        ordered_params: Circuit parameters in the exact order of the bound
            values: the data-encoding parameters first, then the trainable
            ones.
        data_x: 2-D array of samples, one row per circuit execution.
        theta: 1-D array of trainable angles shared by every sample.

    Returns:
        2-D complex array with one statevector per row of ``data_x``.
    """
    n_samples = len(data_x)
    rows = np.hstack([data_x, np.tile(theta, (n_samples, 1))])

    # One bind dictionary per circuit; every value is wrapped in a
    # single-element list, as in the original submission format.
    binds = [
        {param: [value] for param, value in zip(ordered_params, row.tolist())}
        for row in rows
    ]

    # Fetch the result object once instead of once per sample.
    result = backend.run([circuit] * n_samples, parameter_binds=binds).result()
    return np.array(
        [np.asarray(result.get_statevector(i)) for i in range(n_samples)]
    )


def main(seed=None):
    """Optimize a trainable quantum kernel and report its test ROC AUC.

    Pipeline: load ``qgan_data_optimized.npz`` (keys ``X_train``, ``y_train``,
    ``X_test``, ``y_test``), draw a class-balanced training subsample of at
    most ``TRAIN_SIZE`` rows, tune the ansatz angles with COBYLA by
    minimizing ``1 - training accuracy`` of a precomputed-kernel SVC, then
    fit a final SVC, print the test AUC and save the angles under
    ``OUTPUT_DIR``.

    Args:
        seed: Optional seed for the subsampling and the initial angles.
            ``None`` (the default) keeps runs non-deterministic.

    Returns:
        None. If the data file is not found an error is printed and the
        function returns early.

    Raises:
        ValueError: If one of the two classes is missing from the training
            labels, or if the feature width does not match the number of
            feature-map parameters.
    """
    print("πŸš€ Starting Quantum Kernel Optimization (MATH BYPASS V4)...")
    print("   ⚑ Strategy: Explicit 'save_statevector' instruction added.")

    # 1. LOAD DATA
    possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
    data_path = next((p for p in possible_paths if os.path.exists(p)), None)
    if data_path is None:
        print("❌ Error: qgan_data_optimized.npz not found.")
        return

    # The context manager closes the underlying archive handle.
    with np.load(data_path) as data:
        X_train_full = data['X_train']
        y_train_full = data['y_train']
        X_test_full = data['X_test']
        y_test_full = data['y_test']

    # 2. SUBSAMPLE (balanced: up to TRAIN_SIZE // 2 rows per class)
    rng = np.random.default_rng(seed)
    pos_idx = np.where(y_train_full == 1)[0]
    neg_idx = np.where(y_train_full == 0)[0]
    n_pos = min(len(pos_idx), TRAIN_SIZE // 2)
    n_neg = min(len(neg_idx), TRAIN_SIZE // 2)
    if n_pos == 0 or n_neg == 0:
        raise ValueError(
            "Training labels must contain both classes 0 and 1 "
            f"(found {len(pos_idx)} positive, {len(neg_idx)} negative)."
        )

    train_idx = np.concatenate([
        rng.choice(pos_idx, n_pos, replace=False),
        rng.choice(neg_idx, n_neg, replace=False),
    ])
    rng.shuffle(train_idx)

    X_train = X_train_full[train_idx]
    y_train = y_train_full[train_idx]
    # Copy the slices so that dropping the full arrays really frees them.
    X_test = X_test_full[:TEST_SIZE].copy()
    y_test = y_test_full[:TEST_SIZE].copy()

    # Free memory
    del X_train_full, y_train_full, X_test_full, y_test_full
    gc.collect()

    # 3. SETUP CIRCUIT & BACKEND
    print("   βš›οΈ  Building Circuit...")
    fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
    ansatz = RealAmplitudes(N_QUBITS, reps=1)

    # Bind values by an explicit parameter list (data first, then trainable
    # angles) rather than relying on the name-sorted order of the composed
    # circuit's parameters.
    ordered_params = list(fm.parameters) + list(ansatz.parameters)

    # Fail before the optimizer starts: a width mismatch would otherwise be
    # truncated silently by zip() or swallowed by the objective's handler.
    for label, block in (("X_train", X_train), ("X_test", X_test)):
        if block.ndim != 2 or block.shape[1] != len(fm.parameters):
            raise ValueError(
                f"{label} must have shape (n_samples, {len(fm.parameters)}), "
                f"got {block.shape}."
            )

    # The trainable layer has to act BEFORE the data encoding. If it is
    # appended after the feature map, every state is U(theta)|phi(x)> and the
    # fidelity |<phi(x_i)|U^dagger U|phi(x_j)>|^2 no longer depends on theta,
    # which makes the whole optimization a no-op.
    combined = ansatz.compose(fm)

    # The simulator only returns a statevector if the circuit asks for one.
    combined.save_statevector()

    backend = AerSimulator(method='statevector', max_parallel_threads=4)
    transpiled = transpile(combined, backend, optimization_level=1)

    # 4. CUSTOM OBJECTIVE FUNCTION
    def objective(theta):
        """Return 1 - training accuracy of an SVC on the kernel for theta."""
        try:
            states = _simulate_statevectors(
                backend, transpiled, ordered_params, X_train, theta
            )
            K = _fidelity_kernel(states, states)
            svc = SVC(kernel='precomputed', C=1.0)
            svc.fit(K, y_train)
            return 1.0 - svc.score(K, y_train)
        except Exception as e:
            # Deliberate best effort: a failed evaluation is reported and
            # scored as the worst possible loss so the optimizer continues.
            print(f"      ⚠️ Warning: Objective failed ({e}). Returning high loss.")
            return 1.0

    # 5. OPTIMIZATION LOOP
    print("   🧠 Optimizing Geometry (COBYLA)...")
    optimizer = COBYLA(maxiter=MAX_ITERS)
    initial_theta = rng.random(len(ansatz.parameters)) * 0.1

    start_time = time.time()
    res = optimizer.minimize(objective, x0=initial_theta)

    print(f"   βœ… Optimization Complete in {time.time() - start_time:.1f}s")

    # 6. FINAL EVALUATION
    print("   πŸ† Evaluating Final Kernel...")
    optimal_theta = res.x

    statevectors_train = _simulate_statevectors(
        backend, transpiled, ordered_params, X_train, optimal_theta
    )
    statevectors_test = _simulate_statevectors(
        backend, transpiled, ordered_params, X_test, optimal_theta
    )

    K_train = _fidelity_kernel(statevectors_train, statevectors_train)
    # Rows index test samples, columns index training samples.
    K_test = _fidelity_kernel(statevectors_test, statevectors_train)

    qsvc = SVC(kernel='precomputed', probability=True)
    qsvc.fit(K_train, y_train)

    test_probs = qsvc.predict_proba(K_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_probs)

    print("\n" + "="*40)
    print("πŸš€ MATH BYPASS KERNEL RESULTS")
    print("="*40)
    print(f"βœ… Test AUC:      {test_auc:.4f}")
    print("πŸ“‰ Linear Base:   0.7500")
    print("="*40)

    np.save(f"{OUTPUT_DIR}/optimized_weights_bypass.npy", optimal_theta)

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()