""" Comprehensive Model Evaluation Script for Code-Specialized Embedding Models. This script evaluates embedding models on both task performance and operational metrics: Task Performance: - CodeSearchNet evaluation (NDCG, MRR, Recall metrics) - Code search accuracy across programming languages Operational Performance: - Inference speed (latency and throughput) - Memory efficiency (RAM and GPU usage) - Model size and storage requirements - CPU vs GPU performance scaling Usage: distiller evaluate [--use-beam] [--skip-benchmark] # Run evaluation locally or on Beam """ # Try to import flash_attn to check if it's available import contextlib import importlib.util import json import logging import os import time import traceback from pathlib import Path from typing import Any import numpy as np import pandas as pd import psutil import torch import typer from beam import function from datasets import Dataset, load_dataset from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from tqdm import tqdm from .beam_utils import download_specific_evaluation_file from .config import ( DEFAULT_EVALUATION_MODELS, codesearchnet_config, directories, get_evaluation_function_kwargs, get_safe_model_name, get_volume_config, languages_config, ) # Check if flash_attn is available and compatible FLASH_ATTN_AVAILABLE = importlib.util.find_spec("flash_attn") is not None logger = logging.getLogger(__name__) # ============================================================================= # EVALUATION CONFIGURATION # ============================================================================= BATCH_SIZE = 10 LOCAL_EVALUATION_DIR = directories.evaluation_results LOCAL_BENCHMARK_DIR = directories.benchmark_results LOCAL_MODELS_DIR = directories.final VOLUME_CONFIG = get_volume_config() # ============================================================================= # CORE EVALUATION CLASSES # ============================================================================= # Sample texts for benchmarking (various lengths) BENCHMARK_TEXTS = { "short": [ "def add(a, b): return a + b", "function multiply(x, y) { return x * y; }", "class Calculator { public int subtract(int a, int b) { return a - b; } }", ] * 100, # 300 short texts "medium": [ "def fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)", "function quickSort(arr) {\n if (arr.length <= 1) return arr;\n const pivot = arr[arr.length - 1];\n const left = [], right = [];\n for (let i = 0; i < arr.length - 1; i++) {\n if (arr[i] < pivot) left.push(arr[i]);\n else right.push(arr[i]);\n }\n return [...quickSort(left), pivot, ...quickSort(right)];\n}", ] * 50, # 100 medium texts "long": [ """ def complex_algorithm(data, config): ''' Complex data processing algorithm with multiple steps. ''' results = [] # Data validation and processing steps... 
return results """.strip(), ] * 20, # 20 long texts } def reset_cuda_state() -> None: """Aggressively reset CUDA state after memory allocation errors.""" if not torch.cuda.is_available(): return try: # Clear all CUDA caches torch.cuda.empty_cache() torch.cuda.ipc_collect() torch.cuda.reset_peak_memory_stats() # Try to force garbage collection import gc gc.collect() logger.info("๐Ÿงน CUDA state reset completed") except Exception as e: logger.warning(f"โš ๏ธ Could not fully reset CUDA state: {e}") def configure_flash_attention() -> dict[str, Any]: """Configure flash attention settings and return model kwargs.""" model_kwargs: dict[str, Any] = {} if not FLASH_ATTN_AVAILABLE: logger.info("โš ๏ธ Flash attention not available - using standard attention") return model_kwargs # Set environment variables for flash attention and CUDA memory management os.environ["TOKENIZERS_PARALLELISM"] = "false" os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" # Check if we're on a compatible GPU try: if torch.cuda.is_available(): device_capability = torch.cuda.get_device_capability() # Flash attention requires compute capability >= 7.5 (Turing, Ampere, Ada, Hopper) if device_capability[0] >= 7 and (device_capability[0] > 7 or device_capability[1] >= 5): logger.info("โœ… Flash attention available - compatible GPU detected") # For SentenceTransformer, we'll use environment variables to enable flash attention os.environ["TRANSFORMERS_FLASH_ATTENTION"] = "1" else: logger.info(f"โš ๏ธ GPU compute capability {device_capability} < 7.5 - flash attention disabled") else: logger.info("โš ๏ธ No CUDA available - flash attention disabled") except Exception as e: logger.warning(f"โš ๏ธ Failed to check GPU compatibility: {e} - flash attention disabled") return model_kwargs def load_model_with_flash_attention(model_path: str, device: str = "auto") -> SentenceTransformer: """Load a SentenceTransformer model with flash attention if available.""" # Convert "auto" device to actual device target_device = "cuda" if device == "auto" and torch.cuda.is_available() else device if device == "auto" and not torch.cuda.is_available(): target_device = "cpu" # Configure flash attention via environment variables configure_flash_attention() # Load model with standard SentenceTransformer initialization logger.info(f"๐Ÿ“‚ Loading model: {Path(model_path).name}") try: # Try loading directly to target device first model = SentenceTransformer(model_path, device=target_device, trust_remote_code=True) logger.info(f"โœ… Model loaded successfully on {target_device}") return model except (torch.OutOfMemoryError, RuntimeError) as oom_error: # Handle both torch.OutOfMemoryError and RuntimeError (CUDA driver errors) is_oom = isinstance(oom_error, torch.OutOfMemoryError) or "out of memory" in str(oom_error).lower() if is_oom and target_device != "cpu": logger.warning(f"โš ๏ธ OOM loading directly to {target_device}, trying CPU first: {oom_error}") try: # Clear CUDA cache more aggressively after OOM reset_cuda_state() logger.info("๐Ÿ”„ Loading model on CPU first, then trying to move to GPU...") model = SentenceTransformer(model_path, device="cpu", trust_remote_code=True) logger.info("๐Ÿ“ฆ Model loaded on CPU, attempting GPU transfer...") # Try moving to GPU with additional error handling try: model = model.to(target_device) logger.info(f"โœ… Model successfully moved to {target_device}") return model except (RuntimeError, AssertionError) as gpu_move_error: # Handle PyTorch internal errors and CUDA allocator issues logger.warning(f"โš ๏ธ 
def load_model_with_flash_attention(model_path: str, device: str = "auto") -> SentenceTransformer:
    """Load a SentenceTransformer model with flash attention if available."""
    # Convert "auto" device to actual device
    target_device = "cuda" if device == "auto" and torch.cuda.is_available() else device
    if device == "auto" and not torch.cuda.is_available():
        target_device = "cpu"

    # Configure flash attention via environment variables
    configure_flash_attention()

    # Load model with standard SentenceTransformer initialization
    logger.info(f"📂 Loading model: {Path(model_path).name}")

    try:
        # Try loading directly to target device first
        model = SentenceTransformer(model_path, device=target_device, trust_remote_code=True)
        logger.info(f"✅ Model loaded successfully on {target_device}")
        return model
    except (torch.OutOfMemoryError, RuntimeError) as oom_error:
        # Handle both torch.OutOfMemoryError and RuntimeError (CUDA driver errors)
        is_oom = isinstance(oom_error, torch.OutOfMemoryError) or "out of memory" in str(oom_error).lower()
        if is_oom and target_device != "cpu":
            logger.warning(f"⚠️ OOM loading directly to {target_device}, trying CPU first: {oom_error}")
            try:
                # Clear CUDA cache more aggressively after OOM
                reset_cuda_state()

                logger.info("🔄 Loading model on CPU first, then trying to move to GPU...")
                model = SentenceTransformer(model_path, device="cpu", trust_remote_code=True)
                logger.info("📦 Model loaded on CPU, attempting GPU transfer...")

                # Try moving to GPU with additional error handling
                try:
                    model = model.to(target_device)
                    logger.info(f"✅ Model successfully moved to {target_device}")
                    return model
                except (RuntimeError, AssertionError) as gpu_move_error:
                    # Handle PyTorch internal errors and CUDA allocator issues
                    logger.warning(f"⚠️ GPU transfer failed: {gpu_move_error}")
                    if "INTERNAL ASSERT FAILED" in str(gpu_move_error) or "handles_" in str(gpu_move_error):
                        logger.warning("🔧 Detected CUDA allocator corruption, resetting and staying on CPU")
                        # Try to reset CUDA context
                        reset_cuda_state()
                    else:
                        # Re-raise unexpected GPU transfer errors
                        raise

                    logger.info("✅ Model will remain on CPU due to GPU memory issues")
                    return model
            except Exception as fallback_error:
                logger.warning(f"⚠️ CPU fallback failed: {fallback_error}, loading fresh on CPU")
                # Clear any remaining CUDA state
                reset_cuda_state()
                model = SentenceTransformer(model_path, device="cpu", trust_remote_code=True)
                logger.info("✅ Model loaded on CPU (all GPU attempts failed)")
                return model
        else:
            # Re-raise if not OOM or already on CPU
            raise
    except ValueError as e:
        if "'MaxSim' is not a valid SimilarityFunction" in str(e):
            logger.warning(f"⚠️ Model {Path(model_path).name} uses unsupported MaxSim similarity function")
            logger.info("🔧 Attempting workaround by loading with custom config...")

            # Try loading with similarity function override
            try:
                # Load model components manually and override similarity function
                import json
                import tempfile
                from pathlib import Path as PathLib

                # Create temporary directory for modified config
                with tempfile.TemporaryDirectory() as temp_dir:
                    temp_path = PathLib(temp_dir) / "temp_model"

                    # Download/copy model files
                    if model_path.startswith("http") or ("/" in model_path and not PathLib(model_path).exists()):
                        # It's a HuggingFace model ID
                        from huggingface_hub import snapshot_download

                        snapshot_download(model_path, local_dir=temp_path, ignore_patterns=["*.bin"])
                    else:
                        # It's a local path
                        import shutil

                        shutil.copytree(model_path, temp_path)

                    # Modify config to use supported similarity function
                    config_path = temp_path / "config_sentence_transformers.json"
                    if config_path.exists():
                        with config_path.open() as f:
                            config = json.load(f)

                        # Override similarity function to 'cosine' (supported)
                        if "similarity_fn_name" in config:
                            logger.info(
                                f"🔧 Changing similarity function from '{config['similarity_fn_name']}' to 'cosine'"
                            )
                            config["similarity_fn_name"] = "cosine"

                        with config_path.open("w") as f:
                            json.dump(config, f, indent=2)

                    # Load model with modified config (use the resolved device, never the raw "auto" string)
                    model = SentenceTransformer(str(temp_path), device=target_device, trust_remote_code=True)
                    logger.info("✅ Model loaded successfully with similarity function workaround")
                    return model

            except Exception as workaround_error:
                logger.warning(f"⚠️ Similarity function workaround failed: {workaround_error}")
                logger.info("🔧 Attempting direct model component loading...")

                # Last resort: try loading model components directly
                try:
                    from sentence_transformers.models import Pooling, Transformer

                    # Load model components manually
                    logger.info("🔄 Loading model components directly...")

                    # Create SentenceTransformer components using model path
                    transformer = Transformer(model_path)
                    pooling = Pooling(transformer.get_word_embedding_dimension())

                    # Create SentenceTransformer with manual components
                    model = SentenceTransformer(modules=[transformer, pooling], device=target_device)
                    logger.info("✅ Model loaded successfully with direct component loading")
                    return model
                except Exception as direct_error:
                    logger.warning(f"⚠️ Direct component loading failed: {direct_error}")
                    logger.exception(f"❌ All loading methods failed for {Path(model_path).name}")
                    raise e from direct_error
        else:
            raise
    except Exception:
        logger.exception(f"❌ Failed to load model {Path(model_path).name}")
        raise

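
# Illustrative use of the loader above (a sketch, not executed by this module); the model path
# is a placeholder, and "auto" resolves to CUDA when available with CPU fallback on OOM:
#
#     model = load_model_with_flash_attention("path/to/model-or-hf-id", device="auto")
#     vectors = model.encode(["def add(a, b): return a + b"], normalize_embeddings=True)
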
class PerformanceBenchmark:
    """Comprehensive performance benchmarking for embedding models."""

    def __init__(self, model_path: str, model_name: str | None = None) -> None:
        """Initialize benchmarker with model."""
        self.model_path = model_path
        self.model_name = model_name or Path(model_path).name
        self.model: SentenceTransformer | None = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.results: dict[str, Any] = {}

    def load_model(self) -> None:
        """Load the embedding model."""
        logger.info(f"Loading model from {self.model_path}")
        start_time = time.time()

        try:
            self.model = load_model_with_flash_attention(self.model_path, self.device)
            load_time = time.time() - start_time
            logger.info(f"✅ Model loaded in {load_time:.2f}s on {self.device}")
            self.results["model_load_time"] = load_time
        except Exception:
            logger.exception("❌ Failed to load model")
            self.results["error"] = traceback.format_exc()
            raise

    def measure_model_size(self) -> dict[str, Any]:
        """Measure model size metrics."""
        logger.info("📏 Measuring model size...")
        size_metrics: dict[str, Any] = {}

        # Disk size
        try:
            if Path(self.model_path).is_dir():
                # Local directory - calculate size of model files only
                model_extensions = {".safetensors", ".bin", ".json", ".txt", ".tokenizer"}
                total_size = 0
                model_dir = Path(self.model_path)
                for file_path in model_dir.rglob("*"):
                    if file_path.is_file() and (
                        file_path.suffix.lower() in model_extensions or "tokenizer" in file_path.name.lower()
                    ):
                        total_size += file_path.stat().st_size
                size_metrics["disk_size_mb"] = total_size / (1024 * 1024)
            # HuggingFace model - estimate based on model parameters
            elif self.model is not None:
                param_count = sum(p.numel() for p in self.model.parameters())
                # Rough estimate: 4 bytes per parameter (float32)
                estimated_size = param_count * 4
                size_metrics["disk_size_mb"] = estimated_size / (1024 * 1024)
            else:
                size_metrics["disk_size_mb"] = 0.0
        except Exception as e:
            logger.warning(f"⚠️ Could not calculate disk size: {e}")
            size_metrics["disk_size_mb"] = 0.0

        # Memory size (if model is loaded)
        if self.model is not None:
            try:
                # Parameter count
                param_count = sum(p.numel() for p in self.model.parameters())
                size_metrics["parameter_count"] = param_count
                size_metrics["parameters_millions"] = param_count / 1e6

                # Memory usage estimate
                param_size = sum(p.numel() * p.element_size() for p in self.model.parameters())
                buffer_size = sum(b.numel() * b.element_size() for b in self.model.buffers())
                size_metrics["memory_size_mb"] = (param_size + buffer_size) / (1024 * 1024)
                size_metrics["ram_usage_mb"] = size_metrics["memory_size_mb"]

                # GPU memory if using CUDA
                if self.device == "cuda" and torch.cuda.is_available():
                    size_metrics["gpu_memory_mb"] = torch.cuda.memory_allocated() / (1024 * 1024)
                    size_metrics["gpu_name"] = torch.cuda.get_device_name(0)

                # Embedding dimension if available
                if hasattr(self.model, "get_sentence_embedding_dimension"):
                    size_metrics["embedding_dim"] = self.model.get_sentence_embedding_dimension()
            except Exception as e:
                logger.warning(f"⚠️ Could not calculate memory size: {e}")

        # Update results
        self.results["size_metrics"] = size_metrics
        return size_metrics

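    # Sizing note (illustrative, using the float32 assumption above): a 110M-parameter model
    # is estimated at roughly 110e6 * 4 bytes ≈ 420 MB, so "disk_size_mb" for Hub models is a
    # float32 upper bound and can overstate fp16/safetensors checkpoints by about 2x.
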
BENCHMARK_TEXTS["medium"] for batch_size in batch_sizes: logger.info(f" ๐Ÿ“Š Testing batch size: {batch_size}") # Prepare batch batch = ( test_texts[:batch_size] if batch_size <= len(test_texts) else test_texts * ((batch_size // len(test_texts)) + 1) ) batch = batch[:batch_size] # Warmup if self.model is not None: _ = self.model.encode(batch[: min(4, len(batch))], convert_to_tensor=False) # Benchmark multiple runs latencies = [] num_runs = max(3, 20 // batch_size) # More runs for smaller batches for _ in range(num_runs): start_time = time.time() if self.model is not None: _ = self.model.encode(batch, convert_to_tensor=False, normalize_embeddings=True) end_time = time.time() latencies.append(end_time - start_time) # Calculate metrics avg_latency = sum(latencies) / len(latencies) throughput = batch_size / avg_latency time_per_text_ms = (avg_latency / batch_size) * 1000 batch_key = f"batch_{batch_size}" speed_results["medium"][batch_key] = { "time_per_text_ms": time_per_text_ms, "texts_per_second": throughput, "tokens_per_second": throughput * 50, # Estimate 50 tokens per text } logger.info(f" โšก Latency: {avg_latency:.3f}s, Throughput: {throughput:.1f} texts/sec") # Update results self.results["speed_benchmarks"] = speed_results return speed_results def benchmark_memory_scaling(self, batch_sizes: list[int] | None = None) -> dict[str, Any]: """Benchmark memory usage scaling with batch size.""" if batch_sizes is None: batch_sizes = [1, 8, 16, 32] logger.info(f"๐Ÿง  Benchmarking memory scaling with batch sizes: {batch_sizes}") if self.model is None: self.load_model() memory_results: dict[str, Any] = {} test_texts = BENCHMARK_TEXTS["medium"] for batch_size in batch_sizes: logger.info(f" ๐Ÿ“Š Testing memory with batch size: {batch_size}") # Prepare batch batch = ( test_texts[:batch_size] if batch_size <= len(test_texts) else test_texts * ((batch_size // len(test_texts)) + 1) ) batch = batch[:batch_size] # Clear GPU cache if using CUDA if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.reset_peak_memory_stats() try: # Run inference if self.model is not None: _ = self.model.encode(batch, convert_to_tensor=False) # Measure peak memory if torch.cuda.is_available(): peak_memory = torch.cuda.max_memory_allocated() / (1024 * 1024) memory_per_text = peak_memory / batch_size else: # Use psutil for CPU memory (less accurate) peak_memory = psutil.virtual_memory().used / (1024 * 1024) memory_per_text = 0 # Can't accurately measure per-text on CPU batch_key = f"batch_{batch_size}" memory_results[batch_key] = { "memory_used_mb": peak_memory, "memory_per_text_mb": memory_per_text, "oom": False, } logger.info(f" ๐Ÿง  Peak memory: {peak_memory:.1f}MB, Per text: {memory_per_text:.2f}MB") except Exception as e: logger.warning(f"โš ๏ธ Memory benchmark failed for batch {batch_size}: {e}") batch_key = f"batch_{batch_size}" memory_results[batch_key] = { "oom": True, "error": str(e), } self.results["memory_benchmarks"] = memory_results return memory_results def benchmark_cpu_vs_gpu(self) -> dict[str, Any]: """Compare CPU vs GPU performance.""" logger.info("โš–๏ธ Benchmarking CPU vs GPU performance") if not torch.cuda.is_available(): logger.warning("โš ๏ธ CUDA not available - skipping GPU benchmark") return {} comparison_results: dict[str, Any] = {} test_texts = BENCHMARK_TEXTS["medium"][:16] # Use 16 texts for comparison for device in ["cpu", "cuda"]: logger.info(f" ๐Ÿ“Š Testing on {device.upper()}") try: model = load_model_with_flash_attention(self.model_path, device) # Warmup _ = 
                _ = model.encode(test_texts[:4], convert_to_tensor=False)

                # Benchmark
                start_time = time.time()
                _ = model.encode(test_texts, convert_to_tensor=False, normalize_embeddings=True)
                end_time = time.time()

                latency = end_time - start_time
                throughput = len(test_texts) / latency

                comparison_results[device] = {
                    "texts_per_second": throughput,
                }

                logger.info(f" ⚡ {device.upper()}: {latency:.3f}s, {throughput:.1f} texts/sec")

                # Clean up
                del model
                if device == "cuda":
                    torch.cuda.empty_cache()

            except Exception as e:
                logger.warning(f"⚠️ Failed to benchmark {device}: {e}")
                comparison_results[device] = {"error": str(e)}

        # Calculate speedup
        if "cpu" in comparison_results and "cuda" in comparison_results:
            cpu_throughput = comparison_results["cpu"].get("texts_per_second", 0)
            gpu_throughput = comparison_results["cuda"].get("texts_per_second", 0)
            if cpu_throughput > 0:
                speedup = gpu_throughput / cpu_throughput
                comparison_results["gpu_speedup"] = speedup
                logger.info(f" 🚀 GPU Speedup: {speedup:.1f}x")

        self.results["cpu_vs_gpu"] = comparison_results
        return comparison_results

    def run_comprehensive_benchmark(self) -> dict[str, Any]:
        """Run all benchmarks and return comprehensive results."""
        logger.info(f"🏁 Starting comprehensive benchmark for {self.model_name}")

        # Model information
        self.results["model_name"] = self.model_name
        self.results["model_path"] = self.model_path
        self.results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")

        # Run all benchmarks
        try:
            self.load_model()
            self.measure_model_size()
            self.benchmark_inference_speed([1, 8, 16, 32])
            self.benchmark_memory_scaling([1, 8, 16, 32])
            self.benchmark_cpu_vs_gpu()

            logger.info(f"✅ Comprehensive benchmark completed for {self.model_name}")
        except Exception:
            logger.exception(f"❌ Benchmark failed for {self.model_name}")
            self.results["error"] = traceback.format_exc()

        return self.results


class CodeSearchNetEvaluator:
    """Evaluator for CodeSearchNet-style code search tasks."""

    def __init__(self, model_path: str, model_name: str | None = None) -> None:
        """Initialize the evaluator with a model."""
        self.model_path = model_path
        self.model_name = model_name or Path(model_path).name
        self.model: SentenceTransformer | None = None
        self._load_model()

    def _load_model(self) -> None:
        """Load the embedding model."""
        logger.info(f"Loading model from {self.model_path}")
        try:
            self.model = load_model_with_flash_attention(self.model_path)
            logger.info(f"Successfully loaded model: {self.model_name}")
        except Exception:
            logger.exception(f"Failed to load model from {self.model_path}")
            raise

    def encode_texts(self, texts: list[str], desc: str = "Encoding") -> np.ndarray:
        """Encode texts into embeddings with batching and memory management."""
        if self.model is None:
            msg = "Model not loaded"
            raise RuntimeError(msg)

        embeddings = []
        # Use smaller batch size to avoid OOM
        effective_batch_size = min(BATCH_SIZE, 5)  # Limit to 5 for large models

        for i in tqdm(range(0, len(texts), effective_batch_size), desc=desc):
            batch = texts[i : i + effective_batch_size]

            try:
                batch_embeddings = self.model.encode(batch, convert_to_tensor=False, normalize_embeddings=True)
                embeddings.append(batch_embeddings)

                # Clear CUDA cache periodically to prevent memory buildup
                if torch.cuda.is_available() and i > 0 and i % (effective_batch_size * 4) == 0:
                    torch.cuda.empty_cache()

            except (torch.OutOfMemoryError, RuntimeError) as e:
                # Handle both torch.OutOfMemoryError and RuntimeError (CUDA driver errors)
                is_oom = isinstance(e, torch.OutOfMemoryError) or "out of memory" in str(e).lower()
                if is_oom:
                    logger.warning(
                        f"⚠️ OOM during encoding batch {i // effective_batch_size + 1}, trying smaller batch..."
                    )
                    # Try encoding one at a time
                    for single_text in batch:
                        try:
                            single_embedding = self.model.encode(
                                [single_text], convert_to_tensor=False, normalize_embeddings=True
                            )
                            embeddings.append(single_embedding)
                            if torch.cuda.is_available():
                                torch.cuda.empty_cache()
                        except (torch.OutOfMemoryError, RuntimeError) as single_e:
                            if isinstance(single_e, torch.OutOfMemoryError) or "out of memory" in str(single_e).lower():
                                logger.exception("❌ Cannot encode even single text, model too large for GPU")
                                raise
                            raise
                else:
                    raise

        return np.vstack(embeddings)

    def evaluate_language(self, language: str, max_queries: int = 100) -> dict[str, Any]:
        """Evaluate on a specific programming language."""
        logger.info(f"Evaluating on {language} language (max {max_queries} queries)")

        try:
            # Load only a slice of the test split for the language to avoid loading the full dataset
            logger.info(f"📥 Loading test split for {language}...")
            dataset = load_dataset(
                codesearchnet_config.dataset_name,
                language,
                split=f"test[:{max_queries * 10}]",  # Load 10x more than needed to ensure we get enough valid pairs
                trust_remote_code=True,
            )

            if not isinstance(dataset, Dataset):
                logger.error(f"Unexpected dataset type for {language}: {type(dataset)}")
                return {}

            logger.info(f"📊 Loaded {len(dataset)} examples from {language} test split")

            queries: list[str] = []
            codes: list[str] = []
            query_ids: list[str] = []

            # Process examples and stop once we have enough valid pairs
            for i, example in enumerate(dataset):
                if len(queries) >= max_queries:  # Stop once we have enough
                    break

                doc_string = example.get("func_documentation_string", "").strip()
                code_string = example.get("func_code_string", "").strip()

                if doc_string and code_string and len(doc_string.split()) >= 3:
                    queries.append(doc_string)
                    codes.append(code_string)
                    query_ids.append(f"{language}_{i}")

            if len(queries) == 0:
                logger.warning(f"No valid query-code pairs found for {language}")
                return {}

            # Truncate to exactly max_queries if we have more
            if len(queries) > max_queries:
                queries = queries[:max_queries]
                codes = codes[:max_queries]
                query_ids = query_ids[:max_queries]

            logger.info(f"Found {len(queries)} valid query-code pairs for {language}")

            # Check available memory before encoding
            if torch.cuda.is_available():
                free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()
                free_gb = free_memory / (1024**3)
                logger.info(f"💾 Available GPU memory before encoding: {free_gb:.1f} GB")

                if free_gb < 2.0:  # Less than 2GB free
                    logger.warning(f"⚠️ Low GPU memory ({free_gb:.1f} GB), using conservative encoding")
                    torch.cuda.empty_cache()

            # Encode queries and codes
            start_time = time.time()
            query_embeddings = self.encode_texts(queries, f"Encoding {language} queries")
            code_embeddings = self.encode_texts(codes, f"Encoding {language} code")
            encoding_time = time.time() - start_time

            # Compute similarities and metrics
            similarities = cosine_similarity(query_embeddings, code_embeddings)
            metrics = self._compute_retrieval_metrics(similarities)

            # Prepare results
            results = {
                "language": language,
                "model_name": self.model_name,
                "num_queries": len(queries),
                "encoding_time_seconds": encoding_time,
                "metrics": metrics,
            }

            logger.info(f"✅ {language} evaluation completed in {encoding_time:.2f}s")
            return results

        except Exception:
            logger.exception(f"❌ Failed to evaluate {language}")
            return {}

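    # Note on the metrics below: queries and code snippets are aligned by index, so row i of the
    # similarity matrix is scored against gold column i. Worked example: if the correct snippet
    # for a query is ranked 2nd, its reciprocal rank is 1/2 and its NDCG@k contribution is
    # 1 / log2(1 + 2) ≈ 0.63; a rank-1 hit contributes 1.0 to both MRR and NDCG@k.
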
matrix.""" n_queries = similarities.shape[0] # For each query, the correct code is at the same index correct_indices = np.arange(n_queries) # Rank all codes for each query ranked_indices = np.argsort(similarities, axis=1)[:, ::-1] metrics = {} # Compute metrics for different k values for k in [1, 5, 10]: if k <= similarities.shape[1]: # Recall@k recall_k = np.mean([correct_indices[i] in ranked_indices[i, :k] for i in range(n_queries)]) metrics[f"recall@{k}"] = recall_k # NDCG@k ndcg_k = np.mean( [self._compute_ndcg(ranked_indices[i], correct_indices[i], k) for i in range(n_queries)] ) metrics[f"ndcg@{k}"] = ndcg_k # Mean Reciprocal Rank reciprocal_ranks = [] for i in range(n_queries): rank = np.where(ranked_indices[i] == correct_indices[i])[0] if len(rank) > 0: reciprocal_ranks.append(1.0 / (rank[0] + 1)) else: reciprocal_ranks.append(0.0) metrics["mrr"] = np.mean(reciprocal_ranks) # Add mean rank and median rank mean_ranks = [] for i in range(n_queries): rank = np.where(ranked_indices[i] == correct_indices[i])[0] if len(rank) > 0: mean_ranks.append(rank[0] + 1) # 1-indexed else: mean_ranks.append(similarities.shape[1]) # Worst possible rank metrics["mean_rank"] = np.mean(mean_ranks) metrics["median_rank"] = np.median(mean_ranks) # Ensure all values are float return {k: float(v) for k, v in metrics.items()} def _compute_ndcg(self, ranked_indices: np.ndarray, correct_idx: int, k: int) -> float: """Compute NDCG@k for a single query.""" if correct_idx in ranked_indices[:k]: rank = np.where(ranked_indices[:k] == correct_idx)[0][0] return 1.0 / np.log2(rank + 2) return 0.0 def evaluate_all_languages( self, max_queries_per_lang: int = 100, languages: list[str] | None = None ) -> dict[str, Any]: """Evaluate on all specified languages.""" eval_languages = languages or languages_config.all logger.info(f"๐Ÿš€ Starting evaluation on {len(eval_languages)} languages") logger.info(f"๐Ÿ“Š Model: {self.model_name}") logger.info(f"๐Ÿ”ข Max queries per language: {max_queries_per_lang}") start_time = time.time() results = { "model_name": self.model_name, "model_path": self.model_path, "languages": {}, "overall": {}, "evaluation_time_seconds": 0, } languages_dict: dict[str, Any] = {} # Evaluate each language for language in eval_languages: logger.info(f"\n{'=' * 50}") logger.info(f"๐Ÿ” Evaluating {language}") logger.info(f"{'=' * 50}") lang_results = self.evaluate_language(language, max_queries_per_lang) if lang_results: languages_dict[language] = lang_results results["languages"] = languages_dict # Compute overall metrics if languages_dict: overall_metrics = {} metric_names = list(next(iter(languages_dict.values()))["metrics"].keys()) for metric in metric_names: values = [languages_dict[lang]["metrics"][metric] for lang in languages_dict] overall_metrics[metric] = np.mean(values) results["overall"] = overall_metrics total_time = time.time() - start_time results["evaluation_time_seconds"] = total_time logger.info(f"Evaluation completed in {total_time:.2f} seconds") return results class ComprehensiveModelEvaluator: """Combined evaluator for both task performance and operational benchmarks.""" def __init__(self, model_path: str, model_name: str | None = None) -> None: """Initialize the comprehensive evaluator with a model.""" self.model_path = model_path self.model_name = model_name or Path(model_path).name # Initialize sub-evaluators self.codesearch_evaluator = CodeSearchNetEvaluator(model_path, model_name) self.performance_benchmarker = PerformanceBenchmark(model_path, model_name) self.results: dict[str, 
    def run_comprehensive_evaluation(
        self,
        max_queries_per_lang: int = 100,
        languages: list[str] | None = None,
        skip_benchmark: bool = False,
    ) -> dict[str, Any]:
        """Run both CodeSearchNet evaluation and performance benchmarking."""
        logger.info(f"🚀 Starting comprehensive evaluation for {self.model_name}")
        start_time = time.time()

        # Initialize results structure
        self.results = {
            "model_name": self.model_name,
            "model_path": self.model_path,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "evaluation_time_seconds": 0,
        }

        try:
            # 1. Run CodeSearchNet evaluation
            logger.info("🔍 Running CodeSearchNet task evaluation...")
            codesearch_results = self.codesearch_evaluator.evaluate_all_languages(max_queries_per_lang, languages)

            # Extract CodeSearchNet metrics
            self.results.update(
                {
                    "codesearch_languages": codesearch_results.get("languages", {}),
                    "codesearch_overall": codesearch_results.get("overall", {}),
                }
            )

            # 2. Run performance benchmarking (unless skipped)
            if not skip_benchmark:
                logger.info("⚡ Running operational performance benchmarking...")
                benchmark_results = self.performance_benchmarker.run_comprehensive_benchmark()

                # Extract benchmark metrics
                self.results.update(
                    {
                        "size_metrics": benchmark_results.get("size_metrics", {}),
                        "speed_benchmarks": benchmark_results.get("speed_benchmarks", {}),
                        "memory_benchmarks": benchmark_results.get("memory_benchmarks", {}),
                        "cpu_vs_gpu": benchmark_results.get("cpu_vs_gpu", {}),
                    }
                )
            else:
                logger.info("⏭️ Skipping performance benchmarking")
                self.results["benchmark_skipped"] = True

        except Exception as e:
            logger.exception(f"❌ Comprehensive evaluation failed for {self.model_name}")
            self.results["error"] = str(e)

        # Calculate total time
        total_time = time.time() - start_time
        self.results["evaluation_time_seconds"] = total_time

        logger.info(f"✅ Comprehensive evaluation completed in {total_time:.2f} seconds")
        return self.results

    def print_summary(self) -> None:
        """Print a comprehensive summary of all results."""
        logger.info(f"\n{'=' * 60}")
        logger.info(f"📊 COMPREHENSIVE EVALUATION RESULTS: {self.model_name}")
        logger.info(f"{'=' * 60}")

        # CodeSearchNet results
        overall = self.results.get("codesearch_overall", {})
        if overall:
            logger.info("🔍 CodeSearchNet Performance:")
            for metric, value in overall.items():
                logger.info(f" 🎯 {metric.upper()}: {value:.4f}")

        # Benchmark results
        if not self.results.get("benchmark_skipped", False):
            size_metrics = self.results.get("size_metrics", {})
            if size_metrics:
                logger.info(f"\n📏 Model Size: {size_metrics.get('disk_size_mb', 0):.1f}MB")
                if "parameters_millions" in size_metrics:
                    logger.info(f"🔢 Parameters: {size_metrics['parameters_millions']:.1f}M")

            speed_benchmarks = self.results.get("speed_benchmarks", {})
            if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
                batch_32 = speed_benchmarks["medium"]["batch_32"]
                logger.info(f"⚡ Throughput (batch 32): {batch_32.get('texts_per_second', 0):.1f} texts/sec")

            cpu_vs_gpu = self.results.get("cpu_vs_gpu", {})
            if "gpu_speedup" in cpu_vs_gpu:
                speedup = cpu_vs_gpu["gpu_speedup"]
                logger.info(f"🚀 GPU speedup: {speedup:.1f}x")

        # Language breakdown
        languages = self.results.get("codesearch_languages", {})
        if languages:
            logger.info("\n📋 Language Breakdown:")
            for lang, lang_results in languages.items():
                metrics = lang_results.get("metrics", {})
                ndcg10 = metrics.get("ndcg@10", 0)
                mrr = metrics.get("mrr", 0)
                logger.info(f" {lang}: NDCG@10={ndcg10:.4f}, MRR={mrr:.4f}")

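
# Sketch of programmatic use (the CLI at the bottom of this module is the supported entry
# point; this is illustrative only, with a placeholder path):
#
#     evaluator = ComprehensiveModelEvaluator("path/to/model")
#     results = evaluator.run_comprehensive_evaluation(max_queries_per_lang=50)
#     evaluator.print_summary()
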
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================


def check_existing_results(model_name: str, local_dir: str = LOCAL_EVALUATION_DIR) -> dict[str, Any] | None:
    """Check if comprehensive evaluation results already exist for a model."""
    local_path = Path(local_dir)
    safe_model_name = get_safe_model_name(model_name)

    # Check for new comprehensive format first
    comprehensive_file = local_path / f"comprehensive_eval_{safe_model_name}.json"
    if comprehensive_file.exists():
        try:
            with comprehensive_file.open("r") as f:
                results = json.load(f)
            logger.info(f"✅ Found existing comprehensive results for {model_name}")
            return results
        except Exception as e:
            logger.warning(f"⚠️ Could not load existing comprehensive results for {model_name}: {e}")

    # Fallback to legacy codesearchnet format for backward compatibility
    legacy_file = local_path / f"codesearchnet_eval_{safe_model_name}.json"
    if legacy_file.exists():
        try:
            with legacy_file.open("r") as f:
                results = json.load(f)
            logger.info(f"✅ Found existing legacy results for {model_name}")
            return results
        except Exception as e:
            logger.warning(f"⚠️ Could not load existing legacy results for {model_name}: {e}")

    return None


def save_evaluation_results(results: dict[str, Any], local_dir: str = LOCAL_EVALUATION_DIR) -> bool:
    """Save comprehensive evaluation results to local directory as a single JSON file."""
    try:
        local_path = Path(local_dir)
        local_path.mkdir(parents=True, exist_ok=True)

        model_name = results.get("model_name", "unknown")
        safe_model_name = get_safe_model_name(model_name)

        # Save single comprehensive results file (CodeSearchNet + Benchmark combined)
        result_file = local_path / f"comprehensive_eval_{safe_model_name}.json"
        with result_file.open("w") as f:
            json.dump(results, f, indent=2, default=str)

        logger.info(f"💾 Saved comprehensive evaluation results for {model_name}")
        return True
    except Exception:
        logger.exception("❌ Error saving evaluation results")
        return False


def discover_local_models(models_dir: str = LOCAL_MODELS_DIR) -> list[str]:
    """Discover models in the local models directory."""
    models_path = Path(models_dir)
    discovered_models = []

    if models_path.exists():
        for model_dir in models_path.iterdir():
            if model_dir.is_dir() and (
                any(model_dir.glob("*.json"))
                or any(model_dir.glob("*.bin"))
                or any(model_dir.glob("*.safetensors"))
            ):
                discovered_models.append(str(model_dir))
                logger.info(f"📁 Found local model: {model_dir.name}")

    return discovered_models

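
# Discovery assumption (illustrative): LOCAL_MODELS_DIR (directories.final, i.e. the
# code_model2vec/final directory referenced in main()) is expected to hold one subdirectory per
# model containing its *.safetensors / *.bin weights and *.json configs; other entries are ignored.
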
def print_results_summary(results: dict[str, Any]) -> None:
    """Print a formatted summary of comprehensive evaluation results."""
    logger.info(f"\n{'=' * 60}")
    logger.info(f"📊 COMPREHENSIVE EVALUATION: {results.get('model_name', 'Unknown')}")
    logger.info(f"{'=' * 60}")

    # CodeSearchNet results
    overall = results.get("codesearch_overall", {})
    if overall:
        logger.info("🔍 CodeSearchNet Performance:")
        for metric, value in overall.items():
            logger.info(f" 🎯 {metric.upper()}: {value:.4f}")

    # Benchmark results
    if not results.get("benchmark_skipped", False):
        size_metrics = results.get("size_metrics", {})
        if size_metrics:
            logger.info(f"\n📏 Model Size: {size_metrics.get('disk_size_mb', 0):.1f}MB")
            if "parameters_millions" in size_metrics:
                logger.info(f"🔢 Parameters: {size_metrics['parameters_millions']:.1f}M")

        speed_benchmarks = results.get("speed_benchmarks", {})
        if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
            batch_32 = speed_benchmarks["medium"]["batch_32"]
            logger.info(f"⚡ Throughput (batch 32): {batch_32.get('texts_per_second', 0):.1f} texts/sec")

    # Language breakdown
    languages = results.get("codesearch_languages", {})
    if languages:
        logger.info("\n📋 Language Breakdown:")
        for lang, lang_results in languages.items():
            metrics = lang_results.get("metrics", {})
            ndcg10 = metrics.get("ndcg@10", 0)
            mrr = metrics.get("mrr", 0)
            logger.info(f" {lang}: NDCG@10={ndcg10:.4f}, MRR={mrr:.4f}")


def create_comparison_report(all_results: list[dict[str, Any]], output_dir: str = LOCAL_EVALUATION_DIR) -> None:
    """Create a comprehensive comparison report with both CodeSearchNet and benchmark data."""
    if not all_results:
        return

    logger.info("📊 Creating comprehensive comparison report...")

    # Create evaluation comparison dataframe
    evaluation_data = []
    benchmark_data = []

    for result in all_results:
        model_name = result.get("model_name", "Unknown")

        # CodeSearchNet data
        overall = result.get("codesearch_overall", {})
        eval_row = {"model_name": model_name}
        eval_row.update(overall)
        evaluation_data.append(eval_row)

        # Benchmark data (if available)
        if not result.get("benchmark_skipped", False):
            benchmark_row = {"model_name": model_name}
            size_metrics = result.get("size_metrics", {})
            speed_benchmarks = result.get("speed_benchmarks", {})

            benchmark_row.update(size_metrics)

            if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
                batch_32 = speed_benchmarks["medium"]["batch_32"]
                benchmark_row["best_throughput"] = batch_32.get("texts_per_second", 0)

            benchmark_data.append(benchmark_row)

    # Save comparison results
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Combined evaluation comparison CSV (includes both CodeSearchNet and key benchmark metrics)
    if evaluation_data and benchmark_data:
        # Merge evaluation and benchmark data
        combined_data = []
        benchmark_dict = {row["model_name"]: row for row in benchmark_data}

        for eval_row in evaluation_data:
            model_name = eval_row["model_name"]
            combined_row = eval_row.copy()

            # Add benchmark metrics if available
            if model_name in benchmark_dict:
                benchmark_row = benchmark_dict[model_name]
                combined_row.update(
                    {
                        "disk_size_mb": benchmark_row.get("disk_size_mb", 0),
                        "parameters_millions": benchmark_row.get("parameters_millions", 0),
                        "best_throughput": benchmark_row.get("best_throughput", 0),
                    }
                )

            combined_data.append(combined_row)

        combined_df = pd.DataFrame(combined_data)
        combined_csv = output_path / "comprehensive_comparison.csv"
        combined_df.to_csv(combined_csv, index=False)
        logger.info(f"📄 Comprehensive comparison CSV saved: {combined_csv}")

    # Detailed JSON export
    json_path = output_path / "comprehensive_evaluation.json"
    with json_path.open("w") as f:
        json.dump(all_results, f, indent=2, default=str)
    logger.info(f"📄 Comprehensive results JSON saved: {json_path}")

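
# Expected shape of comprehensive_comparison.csv written above (columns depend on the metrics
# actually produced): model_name plus the averaged recall@k / ndcg@k / mrr / mean_rank /
# median_rank values from the CodeSearchNet overall block, and disk_size_mb,
# parameters_millions, and best_throughput when benchmarking was not skipped.
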
# =============================================================================
# MAIN EVALUATION FUNCTIONS
# =============================================================================


def run_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    use_beam: bool = False,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Main evaluation function that handles both local and Beam execution."""
    logger.info(f"🚀 Starting comprehensive evaluation ({'Beam' if use_beam else 'Local'})")
    logger.info(f"📊 Evaluating {len(models)} models on {len(languages or languages_config.all)} languages")
    logger.info(f"⚡ Benchmarking: {'Disabled' if skip_benchmark else 'Enabled'}")

    # Check for existing results and skip already evaluated models
    models_to_evaluate = []
    skipped_models = []
    all_results = []

    for model_path in models:
        model_name = Path(model_path).name
        existing_results = check_existing_results(model_name)

        if existing_results:
            logger.info(f"✅ Model {model_name} already evaluated, skipping")
            all_results.append(existing_results)
            skipped_models.append(model_name)
        else:
            models_to_evaluate.append(model_path)

    if not models_to_evaluate:
        logger.info("🎉 All models already evaluated!")
        return all_results

    logger.info(f"📊 Need to evaluate {len(models_to_evaluate)} models")

    if use_beam:
        # Run on Beam
        new_results = _run_beam_evaluation(models_to_evaluate, max_queries, languages, skip_benchmark)
    else:
        # Run locally
        new_results = _run_local_evaluation(models_to_evaluate, max_queries, languages, skip_benchmark)

    all_results.extend(new_results)

    # Create comparison report
    if len(all_results) > 1:
        create_comparison_report(all_results)

    # Print summary
    newly_evaluated = len(new_results)
    logger.info(f"\n{'=' * 60}")
    logger.info("📊 EVALUATION SUMMARY")
    logger.info(f"{'=' * 60}")
    logger.info(f"📊 Total models: {len(models)}")
    logger.info(f"✅ Newly evaluated: {newly_evaluated}")
    logger.info(f"⏭️ Skipped (already done): {len(skipped_models)}")
    logger.info(f"🎯 Total results: {len(all_results)}")
    logger.info(f"⚡ Benchmarking: {'Disabled' if skip_benchmark else 'Enabled'}")

    return all_results


def _run_local_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Run comprehensive evaluation locally."""
    logger.info("🖥️ Running local comprehensive evaluation")

    results = []
    for model_path in models:
        model_name = Path(model_path).name

        logger.info(f"\n{'=' * 60}")
        logger.info(f"🔍 Evaluating model: {model_name}")
        logger.info(f"{'=' * 60}")

        try:
            evaluator = ComprehensiveModelEvaluator(model_path, model_name)
            result = evaluator.run_comprehensive_evaluation(max_queries, languages, skip_benchmark)

            # Save results locally
            save_evaluation_results(result)
            print_results_summary(result)
            results.append(result)

        except Exception:
            logger.exception(f"❌ Failed to evaluate {model_name}")
            continue

    return results


@function(**get_evaluation_function_kwargs())
def _beam_evaluate_single_model(
    model_path: str,
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> dict[str, Any]:
    """Beam function to comprehensively evaluate a single model."""
    # Set CUDA memory settings BEFORE any CUDA operations
    import os

    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    model_name = Path(model_path).name
    logger.info(f"🚀 Beam comprehensive evaluation starting for {model_name}")

    # Clear CUDA cache if available
    try:
        import torch

        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()
            logger.info(
                f"🧹 Cleared CUDA cache. Available memory: {torch.cuda.get_device_properties(0).total_memory // (1024**3)} GB"
            )
    except Exception as e:
        logger.warning(f"⚠️ Could not clear CUDA cache: {e}")

    try:
        logger.info("🔧 Creating ComprehensiveModelEvaluator...")
        evaluator = ComprehensiveModelEvaluator(model_path, model_name)
        logger.info("✅ ComprehensiveModelEvaluator created successfully")

        logger.info("🚀 Starting comprehensive evaluation...")
        results = evaluator.run_comprehensive_evaluation(max_queries, languages, skip_benchmark)
        logger.info("✅ Comprehensive evaluation completed")

        # Validate results
        if not results or "model_name" not in results:
            logger.error(f"❌ Invalid evaluation results for {model_name}: {results}")
            return {"error": "Invalid evaluation results", "model_name": model_name}

        # Save to Beam volume as single comprehensive file
        logger.info("💾 Saving results to Beam volume...")
        volume_results_dir = Path(VOLUME_CONFIG.mount_path) / "evaluation_results"
        volume_results_dir.mkdir(parents=True, exist_ok=True)

        safe_model_name = get_safe_model_name(model_name)
        result_file = volume_results_dir / f"comprehensive_eval_{safe_model_name}.json"
        with result_file.open("w") as f:
            json.dump(results, f, indent=2, default=str)

        logger.info(f"💾 Saved Beam comprehensive evaluation results for {model_name} to {result_file}")
        logger.info(f"🎯 Final results summary: {len(results.get('codesearch_languages', {}))} languages evaluated")

        return results

    except (torch.OutOfMemoryError, RuntimeError, AssertionError) as e:
        # Handle CUDA errors including OOM, driver errors, and PyTorch internal assertion failures
        is_oom = isinstance(e, torch.OutOfMemoryError) or "out of memory" in str(e).lower()
        is_cuda_error = is_oom or "cuda" in str(e).lower() or "INTERNAL ASSERT FAILED" in str(e) or "handles_" in str(e)

        if is_cuda_error:
            error_type = "CUDA OOM" if is_oom else "CUDA Error"
            logger.exception(f"❌ {error_type} during evaluation of {model_name}")

            # Try to clear memory and reset CUDA state more aggressively
            with contextlib.suppress(Exception):
                reset_cuda_state()

            return {
                "error": f"{error_type}: {e!s}",
                "model_name": model_name,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "evaluation_failed": True,
                "oom": is_oom,
                "cuda_error": True,
            }

        # Re-raise if not a CUDA-related error
        raise

    except Exception as e:
        logger.exception(f"❌ Beam comprehensive evaluation failed for {model_name}")

        # Return error info in a structured way
        error_result = {
            "error": str(e),
            "model_name": model_name,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "evaluation_failed": True,
        }

        # Try to save error result to volume
        try:
            volume_results_dir = Path(VOLUME_CONFIG.mount_path) / "evaluation_results"
            volume_results_dir.mkdir(parents=True, exist_ok=True)

            safe_model_name = get_safe_model_name(model_name)
            error_file = volume_results_dir / f"error_eval_{safe_model_name}.json"
            with error_file.open("w") as f:
                json.dump(error_result, f, indent=2, default=str)
            logger.info(f"💾 Saved error info to {error_file}")
        except Exception:
            logger.exception("❌ Could not save error info")

        return error_result

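
# The Beam-decorated function above is invoked remotely via .remote(...) in _run_beam_evaluation
# below; per the code above, each run writes comprehensive_eval_<safe_name>.json (or
# error_eval_<safe_name>.json on failure) under <volume mount>/evaluation_results, which
# download_specific_evaluation_file() then copies into LOCAL_EVALUATION_DIR.
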
def _run_beam_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Run comprehensive evaluation on Beam and download results."""
    logger.info("☁️ Running Beam comprehensive evaluation")

    results = []
    for model_path in models:
        model_name = Path(model_path).name
        logger.info(f"🚀 Starting Beam comprehensive evaluation for {model_name}")

        try:
            # Run evaluation on Beam
            result = _beam_evaluate_single_model.remote(model_path, max_queries, languages, skip_benchmark)

            if result:
                # Check if this is an error result
                if result.get("evaluation_failed", False):
                    logger.error(f"❌ Beam evaluation failed for {model_name}: {result.get('error', 'Unknown error')}")
                    if result.get("oom", False):
                        logger.error("💥 Out of memory error - model may be too large for available GPU")
                    continue

                # Download the comprehensive result file from Beam
                success = download_specific_evaluation_file(
                    VOLUME_CONFIG.name,
                    model_name,
                    "evaluation_results",
                    LOCAL_EVALUATION_DIR,
                    file_prefix="comprehensive_eval",
                )

                if success:
                    logger.info(f"📥 Downloaded comprehensive results for {model_name}")
                    print_results_summary(result)
                    results.append(result)
                else:
                    logger.warning(f"⚠️ Could not download results for {model_name}")
            else:
                logger.warning(f"⚠️ No result returned for {model_name}")

        except Exception:
            logger.exception(f"❌ Beam comprehensive evaluation failed for {model_name}")
            continue

    return results


# =============================================================================
# CLI INTERFACE
# =============================================================================


def main(
    use_beam: bool = typer.Option(default=False, help="Use Beam for evaluation"),
    skip_third_party: bool = typer.Option(default=False, help="Skip third-party models"),
    skip_benchmark: bool = typer.Option(default=False, help="Skip performance benchmarking"),
    max_queries: int = typer.Option(default=100, help="Maximum queries per language"),
) -> None:
    """Main comprehensive evaluation function."""
    logger.info("🚀 Starting comprehensive model evaluation (CodeSearchNet + Performance)")

    # Build model list
    models = []

    # Add third-party models if not skipped
    if not skip_third_party:
        logger.info("📊 Including third-party peer models for comparison")
        models.extend(DEFAULT_EVALUATION_MODELS)
    else:
        logger.info("⏭️ Skipping third-party models")

    # Discover local models from code_model2vec/final
    logger.info("🔍 Discovering local distillation models...")
    local_models = discover_local_models()

    if local_models:
        logger.info(f"✅ Found {len(local_models)} local models:")
        for model_path in local_models:
            models.append(model_path)
            logger.info(f" 📁 {Path(model_path).name}")
    else:
        logger.warning("⚠️ No local distillation models found")
        if skip_third_party:
            logger.error("❌ No models to evaluate!")
            return

    if not models:
        logger.error("❌ No models to evaluate!")
        return

    logger.info(f"📊 Will evaluate {len(models)} models:")
    for i, model in enumerate(models, 1):
        logger.info(f" {i}. {Path(model).name}")

    # Run evaluation
    results = run_evaluation(
        models=models,
        max_queries=max_queries,
        languages=languages_config.all,
        use_beam=use_beam,
        skip_benchmark=skip_benchmark,
    )

    logger.info("🎉 Comprehensive evaluation workflow completed!")
    logger.info(f"📊 Successfully evaluated {len(results)} models")
    logger.info(f"💾 Results saved to: {LOCAL_EVALUATION_DIR}")
    logger.info("📄 Format: Single comprehensive JSON per model (CodeSearchNet + Benchmarks)")


if __name__ == "__main__":
    typer.run(main)