Spaces:
Build error
Build error
| """ | |
| batch_generate_matrices_with_gate.py - Enhanced with GATE predictions | |
| ======================================================================== | |
| Generates 5-channel unified probability matrices: | |
| - Channel 0: Autoencoder anomaly scores | |
| - Channel 1: Isolation Forest probabilities | |
| - Channel 2: K-Means probabilities | |
| - Channel 3: Archaeological similarity scores | |
| - Channel 4: GATE model final predictions (NEW!) | |
| Note: This version handles PyTorch import errors gracefully | |
| """ | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import List, Tuple, Optional | |
| import sys | |
| from tqdm import tqdm | |
| import joblib | |
| # Try to import PyTorch, but don't fail if it's not available | |
| PYTORCH_AVAILABLE = False | |
| try: | |
| import torch | |
| import torch.nn as nn | |
| PYTORCH_AVAILABLE = True | |
| print("β PyTorch available") | |
| except (ImportError, OSError) as e: | |
| print(f"β οΈ PyTorch not available: {e}") | |
| print(" Will use scikit-learn GATE model instead") | |
| torch = None | |
| nn = None | |
| # Import from your existing files | |
| from utils import ( | |
| ResUNetAutoencoder, | |
| ResUNetEncoder, | |
| load_patches, | |
| load_model, | |
| normalize_scores, | |
| broadcast_patch_scores_to_pixels, | |
| save_unified_probability_matrix, | |
| compute_autoencoder_probabilities, | |
| compute_iforest_probabilities, | |
| ) | |
| from arch_similarity_utils import compute_arch_similarity_channel | |
| try: | |
| from utils import load_kmeans_model, compute_kmeans_probabilities | |
| KMEANS_AVAILABLE = True | |
| except ImportError: | |
| KMEANS_AVAILABLE = False | |
| print("β οΈ K-Means functions not available, will skip Channel 2") | |
| # ======================== | |
| # GATE Model Definition (PyTorch - optional) | |
| # ======================== | |
| if PYTORCH_AVAILABLE: | |
| class GateMLP(nn.Module): | |
| """PyTorch version of the GATE MLP model""" | |
| def __init__(self): | |
| super(GateMLP, self).__init__() | |
| self.fc1 = nn.Linear(4, 16) | |
| self.fc2 = nn.Linear(16, 8) | |
| self.fc3 = nn.Linear(8, 1) | |
| self.relu = nn.ReLU() | |
| self.sigmoid = nn.Sigmoid() | |
| def forward(self, x): | |
| x = self.relu(self.fc1(x)) | |
| x = self.relu(self.fc2(x)) | |
| x = self.sigmoid(self.fc3(x)) | |
| return x | |
| def load_gate_model_pytorch(model_path: str, device) -> Tuple: | |
| """Load PyTorch GATE model (only if PyTorch is available)""" | |
| if not PYTORCH_AVAILABLE: | |
| raise RuntimeError("PyTorch not available") | |
| checkpoint = torch.load(model_path, map_location=device) | |
| model = GateMLP().to(device) | |
| model.load_state_dict(checkpoint['model_state_dict']) | |
| model.eval() | |
| scaler_mean = checkpoint['scaler_mean'] | |
| scaler_scale = checkpoint['scaler_scale'] | |
| return model, scaler_mean, scaler_scale | |
| def load_gate_model_sklearn(model_path: str, scaler_path: str) -> Tuple: | |
| """Load scikit-learn GATE model""" | |
| model = joblib.load(model_path) | |
| scaler = joblib.load(scaler_path) | |
| scaler_mean = scaler.mean_ | |
| scaler_scale = scaler.scale_ | |
| return model, scaler_mean, scaler_scale, scaler | |
| def compute_gate_predictions_sklearn( | |
| prob_autoencoder: np.ndarray, | |
| prob_iforest: np.ndarray, | |
| prob_kmeans: np.ndarray, | |
| arch_similarity: np.ndarray, | |
| gate_model, | |
| scaler | |
| ) -> np.ndarray: | |
| """ | |
| Compute GATE predictions using scikit-learn model | |
| Args: | |
| prob_autoencoder: (num_patches, 64, 64) autoencoder scores | |
| prob_iforest: (num_patches, 64, 64) iforest scores | |
| prob_kmeans: (num_patches, 64, 64) kmeans scores | |
| arch_similarity: (num_patches, 64, 64) similarity scores | |
| gate_model: Trained sklearn GATE model | |
| scaler: Fitted StandardScaler | |
| Returns: | |
| gate_predictions: (num_patches, 64, 64) GATE probability predictions | |
| """ | |
| num_patches = prob_autoencoder.shape[0] | |
| # Average pool each channel to get per-patch features | |
| feat1 = prob_autoencoder.mean(axis=(1, 2)) # Autoencoder | |
| feat2 = prob_iforest.mean(axis=(1, 2)) # IForest | |
| feat3 = prob_kmeans.mean(axis=(1, 2)) # KMeans | |
| feat4 = arch_similarity.mean(axis=(1, 2)) # Arch similarity | |
| # Stack features: (num_patches, 4) | |
| features = np.stack([feat1, feat2, feat3, feat4], axis=1) | |
| # Normalize features using the fitted scaler | |
| features_scaled = scaler.transform(features) | |
| # Sklearn inference - get probability of class 1 (archaeological) | |
| # In compute_gate_predictions_sklearn(): | |
| predictions = gate_model.predict_proba(features_scaled)[:, 1] | |
| binary_predictions = (predictions > 0.5).astype(float) # 0 or 1 | |
| gate_predictions = broadcast_patch_scores_to_pixels(binary_predictions, patch_size=64) # (num_patches,) | |
| return gate_predictions | |
| def compute_gate_predictions_pytorch( | |
| prob_autoencoder: np.ndarray, | |
| prob_iforest: np.ndarray, | |
| prob_kmeans: np.ndarray, | |
| arch_similarity: np.ndarray, | |
| gate_model, | |
| scaler_mean: np.ndarray, | |
| scaler_scale: np.ndarray, | |
| device | |
| ) -> np.ndarray: | |
| """ | |
| Compute GATE predictions using PyTorch model | |
| Returns: | |
| gate_predictions: (num_patches, 64, 64) GATE probability predictions | |
| """ | |
| num_patches = prob_autoencoder.shape[0] | |
| # Average pool each channel to get per-patch features | |
| feat1 = prob_autoencoder.mean(axis=(1, 2)) | |
| feat2 = prob_iforest.mean(axis=(1, 2)) | |
| feat3 = prob_kmeans.mean(axis=(1, 2)) | |
| feat4 = arch_similarity.mean(axis=(1, 2)) | |
| # Stack features: (num_patches, 4) | |
| features = np.stack([feat1, feat2, feat3, feat4], axis=1) | |
| # Normalize features | |
| features_scaled = (features - scaler_mean) / scaler_scale | |
| # PyTorch inference | |
| features_tensor = torch.FloatTensor(features_scaled).to(device) | |
| with torch.no_grad(): | |
| predictions = gate_model(features_tensor).cpu().numpy().squeeze() # (num_patches,) | |
| # Broadcast predictions back to pixel level | |
| gate_predictions = broadcast_patch_scores_to_pixels(predictions, patch_size=64) | |
| return gate_predictions | |
| class BatchConfig: | |
| """Configuration for batch generation with GATE model""" | |
| # Paths | |
| PATCHES_DIR = Path('D:/All_Coding_stuff/SONAR 2.0/Final Class/patches/patches_final') | |
| AUTOENCODER_PATH = Path('D:/All_Coding_stuff/SONAR 2.0/Final Class/models/best_model_aoi.pth') | |
| # Updated: Use new 128-dim models | |
| IFOREST_MODEL_PATH = Path('D:/All_Coding_stuff/SONAR 2.0/Final Class/models/isolation_forest_model_128dim.pkl') | |
| KMEANS_MODEL_PATH = Path('D:/All_Coding_stuff/SONAR 2.0/Final Class/models/kmeans_model_128dim.pkl') | |
| ARCH_EMBEDDINGS_CSV = Path('D:/All_Coding_stuff/SONAR 2.0/GATE/Arch_embedding_only_128dim.csv') | |
| # GATE model paths - sklearn version (always works) | |
| GATE_MODEL_PKL = Path('D:/All_Coding_stuff/SONAR 2.0/GATE/gate_mlp_model.pkl') | |
| GATE_SCALER_PATH = Path('D:/All_Coding_stuff/SONAR 2.0/GATE/gate_scaler.pkl') | |
| # PyTorch GATE model (optional, only if PyTorch works) | |
| GATE_MODEL_PT = Path('D:/All_Coding_stuff/SONAR 2.0/GATE/gate_mlp_model.pt') | |
| UNIFIED_PROB_DIR = Path('D:/All_Coding_stuff/SONAR 2.0/Final Class/src/unified_probability_matrices_with_gate') | |
| # Model parameters | |
| EMBEDDING_DIM = 128 | |
| BATCH_SIZE = 32 | |
| USE_SIGMOID_SCORES = True | |
| # GATE model preference (will auto-fallback to sklearn if PyTorch unavailable) | |
| PREFER_PYTORCH_GATE = True | |
| # Channel names (now 5 channels!) | |
| CHANNEL_NAMES = ['prob_autoencoder', 'prob_iforest', 'prob_kmeans', 'arch_similarity', 'gate_prediction'] | |
| def discover_aois(patches_dir: Path) -> List[str]: | |
| """Discover all AOI names from patches directory""" | |
| all_patch_files = sorted(list(patches_dir.glob("AOI_*_all_patches.npz"))) | |
| aoi_names = [f.stem.replace('_all_patches', '') for f in all_patch_files] | |
| return aoi_names | |
| def check_existing_matrices(aoi_names: List[str], output_dir: Path) -> tuple: | |
| """Check which matrices already exist""" | |
| existing = [] | |
| missing = [] | |
| for aoi_name in aoi_names: | |
| matrix_path = output_dir / f"{aoi_name}_unified_prob_matrix.npz" | |
| if matrix_path.exists(): | |
| existing.append(aoi_name) | |
| else: | |
| missing.append(aoi_name) | |
| return existing, missing | |
| def generate_single_matrix_with_gate( | |
| aoi_name: str, | |
| config: BatchConfig, | |
| device, | |
| autoencoder, | |
| encoder, | |
| iforest, | |
| scaler_iforest, | |
| gate_model, | |
| gate_scaler_or_params, | |
| use_pytorch_gate: bool, | |
| kmeans=None, | |
| scaler_kmeans=None | |
| ): | |
| """Generate unified probability matrix with GATE predictions for a single AOI""" | |
| # Load patches | |
| all_patches_file = config.PATCHES_DIR / f"{aoi_name}_all_patches.npz" | |
| if not all_patches_file.exists(): | |
| raise FileNotFoundError(f"Patches not found: {all_patches_file}") | |
| patches, metadata = load_patches(all_patches_file) | |
| num_patches = len(patches) | |
| # Initialize unified matrix: (num_patches, 64, 64, 5) - NOW 5 CHANNELS! | |
| unified_matrix = np.zeros((num_patches, 64, 64, 5), dtype=np.float32) | |
| # Channel 0: Autoencoder | |
| prob_scores_ae, pixel_scores_ae, reconstructed_patches = compute_autoencoder_probabilities( | |
| model=autoencoder, | |
| patches=patches, | |
| metadata=metadata, | |
| device=device, | |
| batch_size=config.BATCH_SIZE, | |
| use_sigmoid_scores=config.USE_SIGMOID_SCORES | |
| ) | |
| pixel_scores_ae_norm = normalize_scores(pixel_scores_ae) | |
| unified_matrix[:, :, :, 0] = pixel_scores_ae_norm | |
| del reconstructed_patches | |
| if PYTORCH_AVAILABLE and torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| # Channel 1: Isolation Forest | |
| prob_scores_if, embeddings_if, predictions_if = compute_iforest_probabilities( | |
| encoder, iforest, scaler_iforest, patches, metadata, device, config.BATCH_SIZE | |
| ) | |
| prob_scores_if_pixels = broadcast_patch_scores_to_pixels(prob_scores_if, patch_size=64) | |
| unified_matrix[:, :, :, 1] = prob_scores_if_pixels | |
| del embeddings_if, predictions_if | |
| # Channel 2: K-Means (if available) | |
| if KMEANS_AVAILABLE and kmeans is not None: | |
| try: | |
| prob_scores_km, similarity_scores_km, cluster_assignments = compute_kmeans_probabilities( | |
| encoder, kmeans, scaler_kmeans, patches, metadata, device, config.BATCH_SIZE | |
| ) | |
| prob_scores_km_pixels = broadcast_patch_scores_to_pixels(prob_scores_km, patch_size=64) | |
| unified_matrix[:, :, :, 2] = prob_scores_km_pixels | |
| except Exception as e: | |
| print(f" β οΈ K-Means failed: {e}, setting Channel 2 to zeros") | |
| unified_matrix[:, :, :, 2] = 0.0 | |
| else: | |
| unified_matrix[:, :, :, 2] = 0.0 | |
| # Channel 3: Archaeological Similarity | |
| similarity_channel, analysis_dict = compute_arch_similarity_channel( | |
| encoder=encoder, | |
| patches=patches, | |
| metadata=metadata, | |
| arch_csv_path=str(config.ARCH_EMBEDDINGS_CSV), | |
| device=device, | |
| batch_size=config.BATCH_SIZE, | |
| embedding_dim=config.EMBEDDING_DIM, | |
| patch_size=64 | |
| ) | |
| unified_matrix[:, :, :, 3] = similarity_channel | |
| # Channel 4: GATE Model Predictions (NEW!) | |
| print(f" Computing GATE predictions...") | |
| if use_pytorch_gate: | |
| # PyTorch version | |
| gate_predictions = compute_gate_predictions_pytorch( | |
| prob_autoencoder=unified_matrix[:, :, :, 0], | |
| prob_iforest=unified_matrix[:, :, :, 1], | |
| prob_kmeans=unified_matrix[:, :, :, 2], | |
| arch_similarity=unified_matrix[:, :, :, 3], | |
| gate_model=gate_model, | |
| scaler_mean=gate_scaler_or_params[0], | |
| scaler_scale=gate_scaler_or_params[1], | |
| device=device | |
| ) | |
| else: | |
| # Sklearn version | |
| gate_predictions = compute_gate_predictions_sklearn( | |
| prob_autoencoder=unified_matrix[:, :, :, 0], | |
| prob_iforest=unified_matrix[:, :, :, 1], | |
| prob_kmeans=unified_matrix[:, :, :, 2], | |
| arch_similarity=unified_matrix[:, :, :, 3], | |
| gate_model=gate_model, | |
| scaler=gate_scaler_or_params # scaler object | |
| ) | |
| unified_matrix[:, :, :, 4] = gate_predictions | |
| # Save with updated channel names | |
| save_unified_probability_matrix( | |
| aoi_name=aoi_name, | |
| unified_matrix=unified_matrix, | |
| metadata=metadata, | |
| save_dir=config.UNIFIED_PROB_DIR, | |
| channel_names=config.CHANNEL_NAMES | |
| ) | |
| return unified_matrix.shape | |
| def main(): | |
| """Main batch generation pipeline with GATE predictions""" | |
| config = BatchConfig() | |
| # Determine device | |
| if PYTORCH_AVAILABLE: | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| else: | |
| device = 'cpu' # String for sklearn compatibility | |
| print(f"\n{'#'*80}") | |
| print(f"BATCH UNIFIED PROBABILITY MATRIX GENERATION (WITH GATE MODEL)") | |
| print(f"{'#'*80}") | |
| print(f"Device: {device}") | |
| print(f"PyTorch available: {PYTORCH_AVAILABLE}") | |
| print(f"Embedding dimension: {config.EMBEDDING_DIM}") | |
| print(f"Patches directory: {config.PATCHES_DIR}") | |
| print(f"Output directory: {config.UNIFIED_PROB_DIR}") | |
| print(f"{'#'*80}\n") | |
| # Create output directory | |
| config.UNIFIED_PROB_DIR.mkdir(exist_ok=True, parents=True) | |
| # Verify critical files | |
| critical_files = [ | |
| config.AUTOENCODER_PATH, | |
| config.IFOREST_MODEL_PATH, | |
| config.ARCH_EMBEDDINGS_CSV, | |
| config.GATE_MODEL_PKL, | |
| config.GATE_SCALER_PATH | |
| ] | |
| for file_path in critical_files: | |
| if not file_path.exists(): | |
| print(f"β CRITICAL FILE MISSING: {file_path}") | |
| sys.exit(1) | |
| print("β All critical files found\n") | |
| # Discover AOIs | |
| print("π Discovering AOIs...") | |
| all_aoi_names = discover_aois(config.PATCHES_DIR) | |
| print(f" Found {len(all_aoi_names)} AOIs") | |
| # Check existing matrices | |
| existing, missing = check_existing_matrices(all_aoi_names, config.UNIFIED_PROB_DIR) | |
| print(f"\nπ Status:") | |
| print(f" Already generated: {len(existing)}") | |
| print(f" Missing: {len(missing)}") | |
| if len(missing) == 0: | |
| print("\nβ All matrices already generated!") | |
| return | |
| # Ask user | |
| print(f"\nπ Will generate {len(missing)} missing matrices with GATE predictions") | |
| response = input("Continue? (y/n): ").strip().lower() | |
| if response != 'y': | |
| print("Cancelled.") | |
| return | |
| # Load models ONCE | |
| print(f"\nπ₯ Loading models...") | |
| # 1. Load Autoencoder | |
| print(" Loading autoencoder...") | |
| if PYTORCH_AVAILABLE: | |
| autoencoder = ResUNetAutoencoder(in_channels=7).to(device) | |
| autoencoder.load_state_dict(torch.load(config.AUTOENCODER_PATH, map_location=device)) | |
| autoencoder.eval() | |
| print(f" β Autoencoder loaded (latent dim: 256)") | |
| else: | |
| print(" β Cannot load autoencoder without PyTorch") | |
| sys.exit(1) | |
| # 2. Load Encoder for IForest and K-Means | |
| print(f" Loading encoder (embedding_dim={config.EMBEDDING_DIM})...") | |
| encoder = ResUNetEncoder(in_channels=7, embedding_dim=config.EMBEDDING_DIM).to(device) | |
| encoder.load_from_autoencoder(str(config.AUTOENCODER_PATH)) | |
| encoder.eval() | |
| print(f" β Encoder loaded and ready") | |
| # 3. Load Isolation Forest | |
| print(" Loading Isolation Forest...") | |
| iforest, scaler_iforest = load_model(str(config.IFOREST_MODEL_PATH)) | |
| # 4. Load K-Means (optional) | |
| kmeans, scaler_kmeans = None, None | |
| if KMEANS_AVAILABLE and config.KMEANS_MODEL_PATH.exists(): | |
| print(" Loading K-Means...") | |
| try: | |
| kmeans, scaler_kmeans = load_kmeans_model(str(config.KMEANS_MODEL_PATH)) | |
| except Exception as e: | |
| print(f" β οΈ Could not load K-Means: {e}") | |
| # 5. Load GATE Model (NEW!) | |
| print(" Loading GATE model...") | |
| use_pytorch_gate = False | |
| gate_scaler_or_params = None | |
| # Try PyTorch first if available and preferred | |
| if PYTORCH_AVAILABLE and config.PREFER_PYTORCH_GATE and config.GATE_MODEL_PT.exists(): | |
| try: | |
| gate_model, scaler_mean, scaler_scale = load_gate_model_pytorch( | |
| str(config.GATE_MODEL_PT), device | |
| ) | |
| gate_scaler_or_params = (scaler_mean, scaler_scale) | |
| use_pytorch_gate = True | |
| print(f" β GATE PyTorch model loaded") | |
| except Exception as e: | |
| print(f" β οΈ Could not load PyTorch GATE model: {e}") | |
| print(f" Falling back to scikit-learn model...") | |
| # Fallback to sklearn | |
| if not use_pytorch_gate: | |
| gate_model, scaler_mean, scaler_scale, scaler = load_gate_model_sklearn( | |
| str(config.GATE_MODEL_PKL), str(config.GATE_SCALER_PATH) | |
| ) | |
| gate_scaler_or_params = scaler # Pass the whole scaler object | |
| print(f" β GATE scikit-learn model loaded") | |
| print("β All models loaded\n") | |
| print(f"Using {'PyTorch' if use_pytorch_gate else 'Scikit-learn'} GATE model\n") | |
| # Process missing AOIs | |
| print(f"{'='*80}") | |
| print(f"PROCESSING {len(missing)} AOIs") | |
| print(f"{'='*80}\n") | |
| success_count = 0 | |
| fail_count = 0 | |
| for i, aoi_name in enumerate(missing, 1): | |
| print(f"[{i}/{len(missing)}] {aoi_name}") | |
| try: | |
| # Generate matrix with GATE predictions | |
| shape = generate_single_matrix_with_gate( | |
| aoi_name, | |
| config, | |
| device, | |
| autoencoder, | |
| encoder, | |
| iforest, | |
| scaler_iforest, | |
| gate_model, | |
| gate_scaler_or_params, | |
| use_pytorch_gate, | |
| kmeans, | |
| scaler_kmeans | |
| ) | |
| print(f" β Success! Shape: {shape} (5 channels including GATE)\n") | |
| success_count += 1 | |
| except Exception as e: | |
| print(f" β Failed: {e}\n") | |
| fail_count += 1 | |
| import traceback | |
| traceback.print_exc() | |
| # Summary | |
| print(f"\n{'='*80}") | |
| print(f"BATCH GENERATION COMPLETE") | |
| print(f"{'='*80}") | |
| print(f"β Successful: {success_count}/{len(missing)}") | |
| print(f"β Failed: {fail_count}/{len(missing)}") | |
| print(f"{'='*80}\n") | |
| if success_count > 0: | |
| print(f"β Generated matrices saved to: {config.UNIFIED_PROB_DIR}") | |
| print(f"β Each matrix now has 5 channels:") | |
| print(f" - Channel 0: Autoencoder anomaly scores") | |
| print(f" - Channel 1: Isolation Forest probabilities") | |
| print(f" - Channel 2: K-Means probabilities") | |
| print(f" - Channel 3: Archaeological similarity") | |
| print(f" - Channel 4: GATE final predictions β") | |
| if __name__ == "__main__": | |
| main() |