| import numpy as np |
| import matplotlib.pyplot as plt |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.decomposition import PCA |
| import joblib |
| import os |
|
|
| |
# Number of principal components kept by the PCA projection.
N_DIM = 2
# Number of histogram bins along each PCA axis of the target grid.
BINS_PER_DIM = 4
# Directory that receives the plot, the processed arrays and the fitted models.
OUTPUT_DIR = "v2_pipeline_output"

# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
def _grid_distribution(points, bins):
    """Return (probabilities, x_edges, y_edges) for a 2-D histogram of *points*.

    Only the first two columns of *points* are binned. The flattened bin
    counts are divided by their sum, so the probabilities add up to 1.
    """
    # Raw counts are normalised directly; density=True would additionally
    # divide by bin area, which the renormalisation has to cancel again.
    counts, x_edges, y_edges = np.histogram2d(
        points[:, 0],
        points[:, 1],
        bins=bins,
    )
    flat_counts = counts.flatten()
    return flat_counts / np.sum(flat_counts), x_edges, y_edges


def _save_topology_plot(healthy, anomalous, x_edges, y_edges, n_features, out_path):
    """Scatter both projected classes over the histogram grid and save to *out_path*."""
    fig = plt.figure(figsize=(10, 8))
    try:
        plt.scatter(healthy[:, 0], healthy[:, 1],
                    c='blue', alpha=0.3, s=10,
                    label=f'Healthy (Top {n_features} Physics)')
        plt.scatter(anomalous[:, 0], anomalous[:, 1],
                    c='red', alpha=0.3, s=10, label='Disruptions')

        # Overlay the bin edges so the grid cells are visible on the scatter.
        for x in x_edges:
            plt.axvline(x, color='gray', linestyle='--', alpha=0.3)
        for y in y_edges:
            plt.axhline(y, color='gray', linestyle='--', alpha=0.3)

        plt.title(
            f"V3 Topology: Optimized Physics Features ({n_features}->{N_DIM} Dim)"
        )
        plt.xlabel("PC 1")
        plt.ylabel("PC 2")
        plt.legend()
        fig.savefig(out_path)
    finally:
        # Release the figure even if drawing or saving fails.
        plt.close(fig)


def process_optimized_data():
    """Build the PCA-projected data set and the grid target distribution.

    Loads 'qgan_data_optimized.npz' (looked up under 'vG.0.1/' first, then in
    the current directory), merges the train and test splits, and separates
    rows labelled 0 (healthy) from rows labelled 1 (anomalous). A
    StandardScaler and a PCA with N_DIM components are fitted on the healthy
    rows only and then applied to the anomalous rows. The healthy projection
    is binned on a BINS_PER_DIM x BINS_PER_DIM grid and normalised into a
    probability vector.

    Writes into OUTPUT_DIR: 'real_data_topology_v3.png', 'processed_data.npz',
    'scaler.pkl' and 'pca.pkl'. Progress is reported with print().

    Returns:
        None. If the input archive is not found, an error message is printed
        and the function returns without writing anything.
    """
    print("Starting Phase I: Data Pipeline (V3 - Optimized Input)...")

    primary_path = 'vG.0.1/qgan_data_optimized.npz'
    candidates = (primary_path, 'qgan_data_optimized.npz')
    file_path = next((p for p in candidates if os.path.exists(p)), None)
    if file_path is None:
        print(f"Error: '{primary_path}' not found.")
        return

    print(f"  Loading: {file_path}")
    # The context manager closes the underlying archive once the arrays are read.
    with np.load(file_path) as data:
        X_full = np.concatenate([data['X_train'], data['X_test']])
        y_full = np.concatenate([data['y_train'], data['y_test']])

    X_healthy = X_full[y_full == 0]
    X_anomalous = X_full[y_full == 1]

    print(f"  Total Healthy Samples (The Map): {len(X_healthy)}")
    print(f"  Total Anomalous Samples (The Trap): {len(X_anomalous)}")

    # Scaling statistics come from the healthy rows only; the anomalous rows
    # are transformed with those same statistics.
    print("  Re-Normalizing for PCA...")
    scaler = StandardScaler()
    X_healthy_scaled = scaler.fit_transform(X_healthy)
    X_anomalous_scaled = scaler.transform(X_anomalous)

    n_features = X_healthy.shape[1]
    print(f"  Compressing {n_features} Features -> {N_DIM} Dimensions...")
    pca = PCA(n_components=N_DIM)
    X_healthy_pca = pca.fit_transform(X_healthy_scaled)
    X_anomalous_pca = pca.transform(X_anomalous_scaled)

    print(f"    Explained Variance: {pca.explained_variance_ratio_}")
    print(f"    Total Information Retained: {sum(pca.explained_variance_ratio_):.2%}")

    print(
        "  Generating Quantum Target Distribution "
        f"({BINS_PER_DIM}x{BINS_PER_DIM} Grid)..."
    )
    target_distribution, x_edges, y_edges = _grid_distribution(
        X_healthy_pca, BINS_PER_DIM
    )

    # Do not rely on the directory having been created at import time.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    plot_path = f"{OUTPUT_DIR}/real_data_topology_v3.png"
    _save_topology_plot(
        X_healthy_pca, X_anomalous_pca, x_edges, y_edges, n_features, plot_path
    )
    print(f"  Topology map saved to '{plot_path}'")

    output_file = f"{OUTPUT_DIR}/processed_data.npz"
    np.savez(output_file,
             target_distribution=target_distribution,
             grid_bounds=(x_edges, y_edges),
             X_healthy_pca=X_healthy_pca,
             X_anomalous_pca=X_anomalous_pca)

    joblib.dump(scaler, f"{OUTPUT_DIR}/scaler.pkl")
    joblib.dump(pca, f"{OUTPUT_DIR}/pca.pkl")

    print("\nSUCCESS. V3 Pipeline Complete. Ready for Quantum Training.")
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    process_optimized_data()