|
|
""" |
|
|
Comprehensive Model Evaluation Script for Code-Specialized Embedding Models. |
|
|
|
|
|
This script evaluates embedding models on both task performance and operational metrics: |
|
|
|
|
|
Task Performance: |
|
|
- CodeSearchNet evaluation (NDCG, MRR, Recall metrics) |
|
|
- Code search accuracy across programming languages |
|
|
|
|
|
Operational Performance: |
|
|
- Inference speed (latency and throughput) |
|
|
- Memory efficiency (RAM and GPU usage) |
|
|
- Model size and storage requirements |
|
|
- CPU vs GPU performance scaling |
|
|
|
|
|
Usage: |
|
|
distiller evaluate [--use-beam] [--skip-benchmark] # Run evaluation locally or on Beam |
|
|
""" |
|
|
|
|
|
|
|
|
import contextlib |
|
|
import importlib.util |
|
|
import json |
|
|
import logging |
|
|
import os |
|
|
import time |
|
|
import traceback |
|
|
from pathlib import Path |
|
|
from typing import Any |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import psutil |
|
|
import torch |
|
|
import typer |
|
|
from beam import function |
|
|
from datasets import Dataset, load_dataset |
|
|
from sentence_transformers import SentenceTransformer |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
from tqdm import tqdm |
|
|
|
|
|
from .beam_utils import download_specific_evaluation_file |
|
|
from .config import ( |
|
|
DEFAULT_EVALUATION_MODELS, |
|
|
codesearchnet_config, |
|
|
directories, |
|
|
get_evaluation_function_kwargs, |
|
|
get_safe_model_name, |
|
|
get_volume_config, |
|
|
languages_config, |
|
|
) |
|
|
|
|
|
|
|
|
# True when the optional ``flash_attn`` package can be located; the package is
# only looked up here, not imported.
FLASH_ATTN_AVAILABLE = importlib.util.find_spec("flash_attn") is not None

logger = logging.getLogger(__name__)

# Batch-size setting read by the evaluation encoder (which may cap it further).
BATCH_SIZE = 10
# Local result / model directories, taken from the project's ``directories`` config.
LOCAL_EVALUATION_DIR = directories.evaluation_results
LOCAL_BENCHMARK_DIR = directories.benchmark_results
LOCAL_MODELS_DIR = directories.final
# Volume configuration, resolved once at import time.
VOLUME_CONFIG = get_volume_config()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Base snippets for the benchmark corpora. Each corpus repeats its snippets so
# that batches of any benchmarked size can be sliced from it.
_SHORT_SNIPPETS = (
    "def add(a, b): return a + b",
    "function multiply(x, y) { return x * y; }",
    "class Calculator { public int subtract(int a, int b) { return a - b; } }",
)
_MEDIUM_SNIPPETS = (
    "def fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)",
    "function quickSort(arr) {\n if (arr.length <= 1) return arr;\n const pivot = arr[arr.length - 1];\n const left = [], right = [];\n for (let i = 0; i < arr.length - 1; i++) {\n if (arr[i] < pivot) left.push(arr[i]);\n else right.push(arr[i]);\n }\n return [...quickSort(left), pivot, ...quickSort(right)];\n}",
)
_LONG_SNIPPET = """
def complex_algorithm(data, config):
    '''
    Complex data processing algorithm with multiple steps.
    '''
    results = []
    # Data validation and processing steps...
    return results
""".strip()

# Sample code texts keyed by size class: 300 "short", 100 "medium" and 20 "long" entries.
BENCHMARK_TEXTS = {
    "short": list(_SHORT_SNIPPETS) * 100,
    "medium": list(_MEDIUM_SNIPPETS) * 50,
    "long": [_LONG_SNIPPET] * 20,
}
|
|
|
|
|
|
|
|
def reset_cuda_state() -> None:
    """Aggressively reset CUDA state after memory allocation errors.

    Best-effort helper: it does nothing when CUDA is unavailable, and any failure
    while resetting is logged as a warning instead of being raised.
    """
    if not torch.cuda.is_available():
        return

    import gc

    try:
        # Collect unreachable Python objects first: tensors kept alive only by
        # reference cycles are freed here, so the following empty_cache() can
        # actually hand their blocks back instead of leaving them cached.
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        torch.cuda.reset_peak_memory_stats()
        logger.info("🧹 CUDA state reset completed")
    except Exception as e:
        # Deliberately best-effort: this runs inside other error handlers and must
        # not replace the caller's original exception.
        logger.warning(f"⚠️ Could not fully reset CUDA state: {e}")
|
|
|
|
|
|
|
|
def configure_flash_attention() -> dict[str, Any]:
    """Configure flash-attention related process settings.

    When the optional ``flash_attn`` package is available this overwrites
    ``TOKENIZERS_PARALLELISM`` (``"false"``) and ``PYTORCH_CUDA_ALLOC_CONF``
    (``"expandable_segments:True"``) in ``os.environ``. It also sets
    ``TRANSFORMERS_FLASH_ATTENTION=1`` when the current CUDA device reports
    compute capability >= 7.5. Failures while probing the GPU are logged,
    not raised.

    Returns:
        Extra model kwargs; currently always an empty dict.
    """
    model_kwargs: dict[str, Any] = {}

    if not FLASH_ATTN_AVAILABLE:
        logger.info("⚠️ Flash attention not available - using standard attention")
        return model_kwargs

    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    # NOTE(review): PyTorch reads this allocator option when the CUDA caching
    # allocator is first initialised; it has no effect if CUDA was already used
    # in this process.
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    try:
        if not torch.cuda.is_available():
            logger.info("⚠️ No CUDA available - flash attention disabled")
        else:
            device_capability = torch.cuda.get_device_capability()
            # (major, minor) >= (7, 5) as a lexicographic tuple comparison.
            if tuple(device_capability) >= (7, 5):
                logger.info("✅ Flash attention available - compatible GPU detected")
                # NOTE(review): nothing in this function passes ``attn_implementation``
                # to a model; confirm the installed transformers version honours this variable.
                os.environ["TRANSFORMERS_FLASH_ATTENTION"] = "1"
            else:
                logger.info(f"⚠️ GPU compute capability {device_capability} < 7.5 - flash attention disabled")
    except Exception as e:
        logger.warning(f"⚠️ Failed to check GPU compatibility: {e} - flash attention disabled")

    return model_kwargs
|
|
|
|
|
|
|
|
def _is_cuda_oom(error: BaseException) -> bool:
    """Return True if *error* looks like a CUDA out-of-memory failure."""
    return isinstance(error, torch.OutOfMemoryError) or "out of memory" in str(error).lower()


def _load_after_gpu_oom(model_path: str, target_device: str, oom_error: BaseException) -> SentenceTransformer:
    """Recover from an OOM while loading onto *target_device*.

    Loads on CPU, then tries to move the model to *target_device*. If the move
    hits a CUDA allocator assertion, the model stays on CPU. Any other failure
    on this path falls back to a fresh CPU load.
    """
    logger.warning(f"⚠️ OOM loading directly to {target_device}, trying CPU first: {oom_error}")
    try:
        # Release whatever the failed direct load left in the CUDA caching allocator.
        reset_cuda_state()

        logger.info("🔄 Loading model on CPU first, then trying to move to GPU...")
        model = SentenceTransformer(model_path, device="cpu", trust_remote_code=True)
        logger.info("📦 Model loaded on CPU, attempting GPU transfer...")

        try:
            model = model.to(target_device)
        except (RuntimeError, AssertionError) as gpu_move_error:
            logger.warning(f"⚠️ GPU transfer failed: {gpu_move_error}")
            message = str(gpu_move_error)
            # Only allocator-corruption assertions are handled here; anything else
            # propagates to the fresh-CPU-load fallback below.
            if "INTERNAL ASSERT FAILED" not in message and "handles_" not in message:
                raise
            logger.warning("🔧 Detected CUDA allocator corruption, resetting and staying on CPU")
            reset_cuda_state()
            # NOTE(review): a partially completed .to() might leave some tensors on the
            # GPU; the model is returned as-is.
            logger.info("✅ Model will remain on CPU due to GPU memory issues")
            return model

        logger.info(f"✅ Model successfully moved to {target_device}")
        return model

    except Exception as fallback_error:
        logger.warning(f"⚠️ CPU fallback failed: {fallback_error}, loading fresh on CPU")
        reset_cuda_state()
        model = SentenceTransformer(model_path, device="cpu", trust_remote_code=True)
        logger.info("✅ Model loaded on CPU (all GPU attempts failed)")
        return model


def _load_with_similarity_workaround(model_path: str, target_device: str) -> SentenceTransformer:
    """Load a model whose saved config names an unsupported similarity function.

    Copies the local model directory (or downloads a Hub model) into a temporary
    directory. It then rewrites ``similarity_fn_name`` in
    ``config_sentence_transformers.json`` to ``"cosine"`` and loads the model from there.
    """
    import shutil
    import tempfile

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir) / "temp_model"

        if Path(model_path).exists():
            shutil.copytree(model_path, temp_path)
        else:
            # Not a local path: treat it as a Hugging Face Hub repo id (with or without a "/").
            from huggingface_hub import snapshot_download

            # NOTE(review): skipping *.bin means repos that ship only pytorch_model.bin
            # weights will be downloaded without weights.
            snapshot_download(model_path, local_dir=temp_path, ignore_patterns=["*.bin"])

        config_path = temp_path / "config_sentence_transformers.json"
        if config_path.exists():
            with config_path.open() as f:
                config = json.load(f)

            if "similarity_fn_name" in config:
                logger.info(f"🔧 Changing similarity function from '{config['similarity_fn_name']}' to 'cosine'")
                config["similarity_fn_name"] = "cosine"
                with config_path.open("w") as f:
                    json.dump(config, f, indent=2)

        model = SentenceTransformer(str(temp_path), device=target_device, trust_remote_code=True)
        logger.info("✅ Model loaded successfully with similarity function workaround")
        return model


def load_model_with_flash_attention(model_path: str, device: str = "auto") -> SentenceTransformer:
    """Load a SentenceTransformer model, with fallbacks for common loading failures.

    Args:
        model_path: Local model directory or Hugging Face Hub model id.
        device: Target device; ``"auto"`` picks ``"cuda"`` when available, else ``"cpu"``.

    Returns:
        The loaded model. After a GPU out-of-memory error it may be on CPU
        instead of the requested device.

    Raises:
        ValueError: Re-raised when the model uses the unsupported ``MaxSim``
            similarity function and every workaround fails, or for any other
            ``ValueError`` from loading.
        Exception: Any other loading error (including non-OOM ``RuntimeError``)
            is logged and re-raised.
    """
    target_device = device
    if device == "auto":
        target_device = "cuda" if torch.cuda.is_available() else "cpu"

    # Called for its environment-variable side effects; the returned kwargs are not used.
    configure_flash_attention()

    logger.info(f"🔄 Loading model: {Path(model_path).name}")

    try:
        model = SentenceTransformer(model_path, device=target_device, trust_remote_code=True)
        logger.info(f"✅ Model loaded successfully on {target_device}")
        return model

    except (torch.OutOfMemoryError, RuntimeError) as oom_error:
        if _is_cuda_oom(oom_error) and target_device != "cpu":
            return _load_after_gpu_oom(model_path, target_device, oom_error)
        raise

    except ValueError as e:
        if "'MaxSim' is not a valid SimilarityFunction" not in str(e):
            raise

        logger.warning(f"⚠️ Model {Path(model_path).name} uses unsupported MaxSim similarity function")
        logger.info("🔧 Attempting workaround by loading with custom config...")
        try:
            # Use the resolved device: "auto" is not a valid torch device string.
            return _load_with_similarity_workaround(model_path, target_device)
        except Exception as workaround_error:
            logger.warning(f"⚠️ Similarity function workaround failed: {workaround_error}")
            logger.info("🔧 Attempting direct model component loading...")

        try:
            from sentence_transformers.models import Pooling, Transformer

            logger.info("🔄 Loading model components directly...")
            # Rebuild a plain Transformer + Pooling pipeline, bypassing the saved
            # sentence-transformers config (and its similarity function).
            transformer = Transformer(model_path)
            pooling = Pooling(transformer.get_word_embedding_dimension())
            model = SentenceTransformer(modules=[transformer, pooling], device=target_device)
            logger.info("✅ Model loaded successfully with direct component loading")
            return model
        except Exception as direct_error:
            logger.warning(f"⚠️ Direct component loading failed: {direct_error}")
            logger.exception(f"❌ All loading methods failed for {Path(model_path).name}")
            raise e from direct_error

    except Exception:
        logger.exception(f"❌ Failed to load model {Path(model_path).name}")
        raise
|
|
|
|
|
|
|
|
class PerformanceBenchmark: |
|
|
"""Comprehensive performance benchmarking for embedding models.""" |
|
|
|
|
|
def __init__(self, model_path: str, model_name: str | None = None) -> None: |
|
|
"""Initialize benchmarker with model.""" |
|
|
self.model_path = model_path |
|
|
self.model_name = model_name or Path(model_path).name |
|
|
self.model: SentenceTransformer | None = None |
|
|
self.device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
self.results: dict[str, Any] = {} |
|
|
|
|
|
def load_model(self) -> None: |
|
|
"""Load the embedding model.""" |
|
|
logger.info(f"Loading model from {self.model_path}") |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
self.model = load_model_with_flash_attention(self.model_path, self.device) |
|
|
load_time = time.time() - start_time |
|
|
|
|
|
logger.info(f"β
Model loaded in {load_time:.2f}s on {self.device}") |
|
|
self.results["model_load_time"] = load_time |
|
|
|
|
|
except Exception: |
|
|
logger.exception("β Failed to load model") |
|
|
self.results["error"] = traceback.format_exc() |
|
|
raise |
|
|
|
|
|
def measure_model_size(self) -> dict[str, float]: |
|
|
"""Measure model size metrics.""" |
|
|
logger.info("π Measuring model size...") |
|
|
|
|
|
size_metrics: dict[str, Any] = {} |
|
|
|
|
|
|
|
|
try: |
|
|
if Path(self.model_path).is_dir(): |
|
|
|
|
|
model_extensions = {".safetensors", ".bin", ".json", ".txt", ".tokenizer"} |
|
|
total_size = 0 |
|
|
model_dir = Path(self.model_path) |
|
|
|
|
|
for file_path in model_dir.rglob("*"): |
|
|
if file_path.is_file() and ( |
|
|
file_path.suffix.lower() in model_extensions or "tokenizer" in file_path.name.lower() |
|
|
): |
|
|
total_size += file_path.stat().st_size |
|
|
|
|
|
size_metrics["disk_size_mb"] = total_size / (1024 * 1024) |
|
|
|
|
|
elif self.model is not None: |
|
|
param_count = sum(p.numel() for p in self.model.parameters()) |
|
|
|
|
|
estimated_size = param_count * 4 |
|
|
size_metrics["disk_size_mb"] = estimated_size / (1024 * 1024) |
|
|
else: |
|
|
size_metrics["disk_size_mb"] = 0.0 |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Could not calculate disk size: {e}") |
|
|
size_metrics["disk_size_mb"] = 0.0 |
|
|
|
|
|
|
|
|
if self.model is not None: |
|
|
try: |
|
|
|
|
|
param_count = sum(p.numel() for p in self.model.parameters()) |
|
|
size_metrics["parameter_count"] = param_count |
|
|
size_metrics["parameters_millions"] = param_count / 1e6 |
|
|
|
|
|
|
|
|
param_size = sum(p.numel() * p.element_size() for p in self.model.parameters()) |
|
|
buffer_size = sum(b.numel() * b.element_size() for b in self.model.buffers()) |
|
|
size_metrics["memory_size_mb"] = (param_size + buffer_size) / (1024 * 1024) |
|
|
size_metrics["ram_usage_mb"] = size_metrics["memory_size_mb"] |
|
|
|
|
|
|
|
|
if self.device == "cuda" and torch.cuda.is_available(): |
|
|
size_metrics["gpu_memory_mb"] = torch.cuda.memory_allocated() / (1024 * 1024) |
|
|
size_metrics["gpu_name"] = torch.cuda.get_device_name(0) |
|
|
|
|
|
|
|
|
if hasattr(self.model, "get_sentence_embedding_dimension"): |
|
|
size_metrics["embedding_dim"] = self.model.get_sentence_embedding_dimension() |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Could not calculate memory size: {e}") |
|
|
|
|
|
|
|
|
self.results["size_metrics"] = size_metrics |
|
|
return size_metrics |
|
|
|
|
|
def benchmark_inference_speed(self, batch_sizes: list[int] | None = None) -> dict[str, Any]: |
|
|
"""Benchmark inference speed with different batch sizes.""" |
|
|
if batch_sizes is None: |
|
|
batch_sizes = [1, 8, 16, 32] |
|
|
|
|
|
logger.info(f"β‘ Benchmarking inference speed with batch sizes: {batch_sizes}") |
|
|
|
|
|
if self.model is None: |
|
|
self.load_model() |
|
|
|
|
|
speed_results: dict[str, Any] = {"medium": {}} |
|
|
|
|
|
|
|
|
test_texts = BENCHMARK_TEXTS["medium"] |
|
|
|
|
|
for batch_size in batch_sizes: |
|
|
logger.info(f" π Testing batch size: {batch_size}") |
|
|
|
|
|
|
|
|
batch = ( |
|
|
test_texts[:batch_size] |
|
|
if batch_size <= len(test_texts) |
|
|
else test_texts * ((batch_size // len(test_texts)) + 1) |
|
|
) |
|
|
batch = batch[:batch_size] |
|
|
|
|
|
|
|
|
if self.model is not None: |
|
|
_ = self.model.encode(batch[: min(4, len(batch))], convert_to_tensor=False) |
|
|
|
|
|
|
|
|
latencies = [] |
|
|
num_runs = max(3, 20 // batch_size) |
|
|
|
|
|
for _ in range(num_runs): |
|
|
start_time = time.time() |
|
|
if self.model is not None: |
|
|
_ = self.model.encode(batch, convert_to_tensor=False, normalize_embeddings=True) |
|
|
end_time = time.time() |
|
|
latencies.append(end_time - start_time) |
|
|
|
|
|
|
|
|
avg_latency = sum(latencies) / len(latencies) |
|
|
throughput = batch_size / avg_latency |
|
|
time_per_text_ms = (avg_latency / batch_size) * 1000 |
|
|
|
|
|
batch_key = f"batch_{batch_size}" |
|
|
speed_results["medium"][batch_key] = { |
|
|
"time_per_text_ms": time_per_text_ms, |
|
|
"texts_per_second": throughput, |
|
|
"tokens_per_second": throughput * 50, |
|
|
} |
|
|
|
|
|
logger.info(f" β‘ Latency: {avg_latency:.3f}s, Throughput: {throughput:.1f} texts/sec") |
|
|
|
|
|
|
|
|
self.results["speed_benchmarks"] = speed_results |
|
|
return speed_results |
|
|
|
|
|
def benchmark_memory_scaling(self, batch_sizes: list[int] | None = None) -> dict[str, Any]: |
|
|
"""Benchmark memory usage scaling with batch size.""" |
|
|
if batch_sizes is None: |
|
|
batch_sizes = [1, 8, 16, 32] |
|
|
|
|
|
logger.info(f"π§ Benchmarking memory scaling with batch sizes: {batch_sizes}") |
|
|
|
|
|
if self.model is None: |
|
|
self.load_model() |
|
|
|
|
|
memory_results: dict[str, Any] = {} |
|
|
test_texts = BENCHMARK_TEXTS["medium"] |
|
|
|
|
|
for batch_size in batch_sizes: |
|
|
logger.info(f" π Testing memory with batch size: {batch_size}") |
|
|
|
|
|
|
|
|
batch = ( |
|
|
test_texts[:batch_size] |
|
|
if batch_size <= len(test_texts) |
|
|
else test_texts * ((batch_size // len(test_texts)) + 1) |
|
|
) |
|
|
batch = batch[:batch_size] |
|
|
|
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
torch.cuda.reset_peak_memory_stats() |
|
|
|
|
|
try: |
|
|
|
|
|
if self.model is not None: |
|
|
_ = self.model.encode(batch, convert_to_tensor=False) |
|
|
|
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
peak_memory = torch.cuda.max_memory_allocated() / (1024 * 1024) |
|
|
memory_per_text = peak_memory / batch_size |
|
|
else: |
|
|
|
|
|
peak_memory = psutil.virtual_memory().used / (1024 * 1024) |
|
|
memory_per_text = 0 |
|
|
|
|
|
batch_key = f"batch_{batch_size}" |
|
|
memory_results[batch_key] = { |
|
|
"memory_used_mb": peak_memory, |
|
|
"memory_per_text_mb": memory_per_text, |
|
|
"oom": False, |
|
|
} |
|
|
|
|
|
logger.info(f" π§ Peak memory: {peak_memory:.1f}MB, Per text: {memory_per_text:.2f}MB") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Memory benchmark failed for batch {batch_size}: {e}") |
|
|
batch_key = f"batch_{batch_size}" |
|
|
memory_results[batch_key] = { |
|
|
"oom": True, |
|
|
"error": str(e), |
|
|
} |
|
|
|
|
|
self.results["memory_benchmarks"] = memory_results |
|
|
return memory_results |
|
|
|
|
|
def benchmark_cpu_vs_gpu(self) -> dict[str, Any]: |
|
|
"""Compare CPU vs GPU performance.""" |
|
|
logger.info("βοΈ Benchmarking CPU vs GPU performance") |
|
|
|
|
|
if not torch.cuda.is_available(): |
|
|
logger.warning("β οΈ CUDA not available - skipping GPU benchmark") |
|
|
return {} |
|
|
|
|
|
comparison_results: dict[str, Any] = {} |
|
|
test_texts = BENCHMARK_TEXTS["medium"][:16] |
|
|
|
|
|
for device in ["cpu", "cuda"]: |
|
|
logger.info(f" π Testing on {device.upper()}") |
|
|
|
|
|
try: |
|
|
model = load_model_with_flash_attention(self.model_path, device) |
|
|
|
|
|
|
|
|
_ = model.encode(test_texts[:4], convert_to_tensor=False) |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
_ = model.encode(test_texts, convert_to_tensor=False, normalize_embeddings=True) |
|
|
end_time = time.time() |
|
|
|
|
|
latency = end_time - start_time |
|
|
throughput = len(test_texts) / latency |
|
|
|
|
|
comparison_results[device] = { |
|
|
"texts_per_second": throughput, |
|
|
} |
|
|
|
|
|
logger.info(f" β‘ {device.upper()}: {latency:.3f}s, {throughput:.1f} texts/sec") |
|
|
|
|
|
|
|
|
del model |
|
|
if device == "cuda": |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Failed to benchmark {device}: {e}") |
|
|
comparison_results[device] = {"error": str(e)} |
|
|
|
|
|
|
|
|
if "cpu" in comparison_results and "cuda" in comparison_results: |
|
|
cpu_throughput = comparison_results["cpu"].get("texts_per_second", 0) |
|
|
gpu_throughput = comparison_results["cuda"].get("texts_per_second", 0) |
|
|
if cpu_throughput > 0: |
|
|
speedup = gpu_throughput / cpu_throughput |
|
|
comparison_results["gpu_speedup"] = speedup |
|
|
logger.info(f" π GPU Speedup: {speedup:.1f}x") |
|
|
|
|
|
self.results["cpu_vs_gpu"] = comparison_results |
|
|
return comparison_results |
|
|
|
|
|
def run_comprehensive_benchmark(self) -> dict[str, Any]: |
|
|
"""Run all benchmarks and return comprehensive results.""" |
|
|
logger.info(f"π Starting comprehensive benchmark for {self.model_name}") |
|
|
|
|
|
|
|
|
self.results["model_name"] = self.model_name |
|
|
self.results["model_path"] = self.model_path |
|
|
self.results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
|
|
|
try: |
|
|
self.load_model() |
|
|
self.measure_model_size() |
|
|
self.benchmark_inference_speed([1, 8, 16, 32]) |
|
|
self.benchmark_memory_scaling([1, 8, 16, 32]) |
|
|
self.benchmark_cpu_vs_gpu() |
|
|
|
|
|
logger.info(f"β
Comprehensive benchmark completed for {self.model_name}") |
|
|
|
|
|
except Exception: |
|
|
logger.exception(f"β Benchmark failed for {self.model_name}") |
|
|
self.results["error"] = traceback.format_exc() |
|
|
|
|
|
return self.results |
|
|
|
|
|
|
|
|
class CodeSearchNetEvaluator: |
|
|
"""Evaluator for CodeSearchNet-style code search tasks.""" |
|
|
|
|
|
def __init__(self, model_path: str, model_name: str | None = None) -> None: |
|
|
"""Initialize the evaluator with a model.""" |
|
|
self.model_path = model_path |
|
|
self.model_name = model_name or Path(model_path).name |
|
|
self.model: SentenceTransformer | None = None |
|
|
self._load_model() |
|
|
|
|
|
def _load_model(self) -> None: |
|
|
"""Load the embedding model.""" |
|
|
logger.info(f"Loading model from {self.model_path}") |
|
|
try: |
|
|
self.model = load_model_with_flash_attention(self.model_path) |
|
|
logger.info(f"Successfully loaded model: {self.model_name}") |
|
|
except Exception: |
|
|
logger.exception(f"Failed to load model from {self.model_path}") |
|
|
raise |
|
|
|
|
|
def encode_texts(self, texts: list[str], desc: str = "Encoding") -> np.ndarray: |
|
|
"""Encode texts into embeddings with batching and memory management.""" |
|
|
if self.model is None: |
|
|
msg = "Model not loaded" |
|
|
raise RuntimeError(msg) |
|
|
|
|
|
embeddings = [] |
|
|
|
|
|
effective_batch_size = min(BATCH_SIZE, 5) |
|
|
|
|
|
for i in tqdm(range(0, len(texts), effective_batch_size), desc=desc): |
|
|
batch = texts[i : i + effective_batch_size] |
|
|
|
|
|
try: |
|
|
batch_embeddings = self.model.encode(batch, convert_to_tensor=False, normalize_embeddings=True) |
|
|
embeddings.append(batch_embeddings) |
|
|
|
|
|
|
|
|
if torch.cuda.is_available() and i > 0 and i % (effective_batch_size * 4) == 0: |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
except (torch.OutOfMemoryError, RuntimeError) as e: |
|
|
|
|
|
is_oom = isinstance(e, torch.OutOfMemoryError) or "out of memory" in str(e).lower() |
|
|
|
|
|
if is_oom: |
|
|
logger.warning( |
|
|
f"β οΈ OOM during encoding batch {i // effective_batch_size + 1}, trying smaller batch..." |
|
|
) |
|
|
|
|
|
for single_text in batch: |
|
|
try: |
|
|
single_embedding = self.model.encode( |
|
|
[single_text], convert_to_tensor=False, normalize_embeddings=True |
|
|
) |
|
|
embeddings.append(single_embedding) |
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
except (torch.OutOfMemoryError, RuntimeError) as single_e: |
|
|
if isinstance(single_e, torch.OutOfMemoryError) or "out of memory" in str(single_e).lower(): |
|
|
logger.exception("β Cannot encode even single text, model too large for GPU") |
|
|
raise |
|
|
raise |
|
|
else: |
|
|
raise |
|
|
|
|
|
return np.vstack(embeddings) |
|
|
|
|
|
def evaluate_language(self, language: str, max_queries: int = 100) -> dict[str, Any]: |
|
|
"""Evaluate on a specific programming language.""" |
|
|
logger.info(f"Evaluating on {language} language (max {max_queries} queries)") |
|
|
|
|
|
try: |
|
|
|
|
|
logger.info(f"π₯ Loading test split for {language}...") |
|
|
dataset = load_dataset( |
|
|
codesearchnet_config.dataset_name, |
|
|
language, |
|
|
split=f"test[:{max_queries * 10}]", |
|
|
trust_remote_code=True, |
|
|
) |
|
|
|
|
|
if not isinstance(dataset, Dataset): |
|
|
logger.error(f"Unexpected dataset type for {language}: {type(dataset)}") |
|
|
return {} |
|
|
|
|
|
logger.info(f"π Loaded {len(dataset)} examples from {language} test split") |
|
|
|
|
|
queries: list[str] = [] |
|
|
codes: list[str] = [] |
|
|
query_ids: list[str] = [] |
|
|
|
|
|
|
|
|
for i, example in enumerate(dataset): |
|
|
if len(queries) >= max_queries: |
|
|
break |
|
|
|
|
|
doc_string = example.get("func_documentation_string", "").strip() |
|
|
code_string = example.get("func_code_string", "").strip() |
|
|
|
|
|
if doc_string and code_string and len(doc_string.split()) >= 3: |
|
|
queries.append(doc_string) |
|
|
codes.append(code_string) |
|
|
query_ids.append(f"{language}_{i}") |
|
|
|
|
|
if len(queries) == 0: |
|
|
logger.warning(f"No valid query-code pairs found for {language}") |
|
|
return {} |
|
|
|
|
|
|
|
|
if len(queries) > max_queries: |
|
|
queries = queries[:max_queries] |
|
|
codes = codes[:max_queries] |
|
|
query_ids = query_ids[:max_queries] |
|
|
|
|
|
logger.info(f"Found {len(queries)} valid query-code pairs for {language}") |
|
|
|
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated() |
|
|
free_gb = free_memory / (1024**3) |
|
|
logger.info(f"πΎ Available GPU memory before encoding: {free_gb:.1f} GB") |
|
|
if free_gb < 2.0: |
|
|
logger.warning(f"β οΈ Low GPU memory ({free_gb:.1f} GB), using conservative encoding") |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
query_embeddings = self.encode_texts(queries, f"Encoding {language} queries") |
|
|
code_embeddings = self.encode_texts(codes, f"Encoding {language} code") |
|
|
encoding_time = time.time() - start_time |
|
|
|
|
|
|
|
|
similarities = cosine_similarity(query_embeddings, code_embeddings) |
|
|
metrics = self._compute_retrieval_metrics(similarities) |
|
|
|
|
|
|
|
|
results = { |
|
|
"language": language, |
|
|
"model_name": self.model_name, |
|
|
"num_queries": len(queries), |
|
|
"encoding_time_seconds": encoding_time, |
|
|
"metrics": metrics, |
|
|
} |
|
|
|
|
|
logger.info(f"β
{language} evaluation completed in {encoding_time:.2f}s") |
|
|
return results |
|
|
|
|
|
except Exception: |
|
|
logger.exception(f"β Failed to evaluate {language}") |
|
|
return {} |
|
|
|
|
|
def _compute_retrieval_metrics(self, similarities: np.ndarray) -> dict[str, float]: |
|
|
"""Compute retrieval metrics from similarity matrix.""" |
|
|
n_queries = similarities.shape[0] |
|
|
|
|
|
|
|
|
correct_indices = np.arange(n_queries) |
|
|
|
|
|
|
|
|
ranked_indices = np.argsort(similarities, axis=1)[:, ::-1] |
|
|
|
|
|
metrics = {} |
|
|
|
|
|
|
|
|
for k in [1, 5, 10]: |
|
|
if k <= similarities.shape[1]: |
|
|
|
|
|
recall_k = np.mean([correct_indices[i] in ranked_indices[i, :k] for i in range(n_queries)]) |
|
|
metrics[f"recall@{k}"] = recall_k |
|
|
|
|
|
|
|
|
ndcg_k = np.mean( |
|
|
[self._compute_ndcg(ranked_indices[i], correct_indices[i], k) for i in range(n_queries)] |
|
|
) |
|
|
metrics[f"ndcg@{k}"] = ndcg_k |
|
|
|
|
|
|
|
|
reciprocal_ranks = [] |
|
|
for i in range(n_queries): |
|
|
rank = np.where(ranked_indices[i] == correct_indices[i])[0] |
|
|
if len(rank) > 0: |
|
|
reciprocal_ranks.append(1.0 / (rank[0] + 1)) |
|
|
else: |
|
|
reciprocal_ranks.append(0.0) |
|
|
|
|
|
metrics["mrr"] = np.mean(reciprocal_ranks) |
|
|
|
|
|
|
|
|
mean_ranks = [] |
|
|
for i in range(n_queries): |
|
|
rank = np.where(ranked_indices[i] == correct_indices[i])[0] |
|
|
if len(rank) > 0: |
|
|
mean_ranks.append(rank[0] + 1) |
|
|
else: |
|
|
mean_ranks.append(similarities.shape[1]) |
|
|
|
|
|
metrics["mean_rank"] = np.mean(mean_ranks) |
|
|
metrics["median_rank"] = np.median(mean_ranks) |
|
|
|
|
|
|
|
|
return {k: float(v) for k, v in metrics.items()} |
|
|
|
|
|
def _compute_ndcg(self, ranked_indices: np.ndarray, correct_idx: int, k: int) -> float: |
|
|
"""Compute NDCG@k for a single query.""" |
|
|
if correct_idx in ranked_indices[:k]: |
|
|
rank = np.where(ranked_indices[:k] == correct_idx)[0][0] |
|
|
return 1.0 / np.log2(rank + 2) |
|
|
return 0.0 |
|
|
|
|
|
def evaluate_all_languages( |
|
|
self, max_queries_per_lang: int = 100, languages: list[str] | None = None |
|
|
) -> dict[str, Any]: |
|
|
"""Evaluate on all specified languages.""" |
|
|
eval_languages = languages or languages_config.all |
|
|
|
|
|
logger.info(f"π Starting evaluation on {len(eval_languages)} languages") |
|
|
logger.info(f"π Model: {self.model_name}") |
|
|
logger.info(f"π’ Max queries per language: {max_queries_per_lang}") |
|
|
|
|
|
start_time = time.time() |
|
|
results = { |
|
|
"model_name": self.model_name, |
|
|
"model_path": self.model_path, |
|
|
"languages": {}, |
|
|
"overall": {}, |
|
|
"evaluation_time_seconds": 0, |
|
|
} |
|
|
languages_dict: dict[str, Any] = {} |
|
|
|
|
|
|
|
|
for language in eval_languages: |
|
|
logger.info(f"\n{'=' * 50}") |
|
|
logger.info(f"π Evaluating {language}") |
|
|
logger.info(f"{'=' * 50}") |
|
|
|
|
|
lang_results = self.evaluate_language(language, max_queries_per_lang) |
|
|
if lang_results: |
|
|
languages_dict[language] = lang_results |
|
|
|
|
|
results["languages"] = languages_dict |
|
|
|
|
|
|
|
|
if languages_dict: |
|
|
overall_metrics = {} |
|
|
metric_names = list(next(iter(languages_dict.values()))["metrics"].keys()) |
|
|
|
|
|
for metric in metric_names: |
|
|
values = [languages_dict[lang]["metrics"][metric] for lang in languages_dict] |
|
|
overall_metrics[metric] = np.mean(values) |
|
|
|
|
|
results["overall"] = overall_metrics |
|
|
|
|
|
total_time = time.time() - start_time |
|
|
results["evaluation_time_seconds"] = total_time |
|
|
|
|
|
logger.info(f"Evaluation completed in {total_time:.2f} seconds") |
|
|
return results |
|
|
|
|
|
|
|
|
class ComprehensiveModelEvaluator:
    """Combined evaluator: CodeSearchNet task performance plus operational benchmarks."""

    def __init__(self, model_path: str, model_name: str | None = None) -> None:
        """Create both sub-evaluators for the model at ``model_path``.

        ``CodeSearchNetEvaluator`` loads its model during construction. The
        benchmarker loads its own copy later, when benchmarks run.
        """
        self.model_path = model_path
        self.model_name = model_name or Path(model_path).name

        self.codesearch_evaluator = CodeSearchNetEvaluator(model_path, model_name)
        self.performance_benchmarker = PerformanceBenchmark(model_path, model_name)

        self.results: dict[str, Any] = {}

    def run_comprehensive_evaluation(
        self,
        max_queries_per_lang: int = 100,
        languages: list[str] | None = None,
        skip_benchmark: bool = False,
    ) -> dict[str, Any]:
        """Run CodeSearchNet evaluation and (unless skipped) performance benchmarking.

        Errors are logged and recorded rather than raised. A failure in the
        evaluation flow is stored under ``error``. A failure recorded by the
        benchmarker itself is stored under ``benchmark_error``.

        Returns:
            ``self.results``, which also contains ``evaluation_time_seconds``.
        """
        logger.info(f"🚀 Starting comprehensive evaluation for {self.model_name}")
        start_time = time.perf_counter()

        self.results = {
            "model_name": self.model_name,
            "model_path": self.model_path,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "evaluation_time_seconds": 0,
        }

        try:
            logger.info("🔍 Running CodeSearchNet task evaluation...")
            codesearch_results = self.codesearch_evaluator.evaluate_all_languages(max_queries_per_lang, languages)
            self.results.update(
                {
                    "codesearch_languages": codesearch_results.get("languages", {}),
                    "codesearch_overall": codesearch_results.get("overall", {}),
                }
            )

            if not skip_benchmark:
                # NOTE(review): the evaluator's model stays loaded while the benchmarker
                # loads another copy; consider releasing one if GPU memory is tight.
                logger.info("⚡ Running operational performance benchmarking...")
                benchmark_results = self.performance_benchmarker.run_comprehensive_benchmark()
                self.results.update(
                    {
                        "size_metrics": benchmark_results.get("size_metrics", {}),
                        "speed_benchmarks": benchmark_results.get("speed_benchmarks", {}),
                        "memory_benchmarks": benchmark_results.get("memory_benchmarks", {}),
                        "cpu_vs_gpu": benchmark_results.get("cpu_vs_gpu", {}),
                    }
                )
                # The benchmarker records failures instead of raising; surface them so a
                # partial benchmark can be told apart from a clean one.
                if "error" in benchmark_results:
                    self.results["benchmark_error"] = benchmark_results["error"]
            else:
                logger.info("⏭️ Skipping performance benchmarking")
                self.results["benchmark_skipped"] = True

        except Exception as e:
            logger.exception(f"❌ Comprehensive evaluation failed for {self.model_name}")
            self.results["error"] = str(e)

        total_time = time.perf_counter() - start_time
        self.results["evaluation_time_seconds"] = total_time

        logger.info(f"✅ Comprehensive evaluation completed in {total_time:.2f} seconds")
        return self.results

    def print_summary(self) -> None:
        """Log a human-readable summary of ``self.results`` (call after an evaluation run)."""
        logger.info(f"\n{'=' * 60}")
        logger.info(f"📊 COMPREHENSIVE EVALUATION RESULTS: {self.model_name}")
        logger.info(f"{'=' * 60}")

        overall = self.results.get("codesearch_overall", {})
        if overall:
            logger.info("📈 CodeSearchNet Performance:")
            for metric, value in overall.items():
                logger.info(f" 🎯 {metric.upper()}: {value:.4f}")

        if not self.results.get("benchmark_skipped", False):
            size_metrics = self.results.get("size_metrics", {})
            if size_metrics:
                logger.info(f"\n📏 Model Size: {size_metrics.get('disk_size_mb', 0):.1f}MB")
                if "parameters_millions" in size_metrics:
                    logger.info(f"🔢 Parameters: {size_metrics['parameters_millions']:.1f}M")

            speed_benchmarks = self.results.get("speed_benchmarks", {})
            if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
                batch_32 = speed_benchmarks["medium"]["batch_32"]
                logger.info(f"⚡ Throughput (batch 32): {batch_32.get('texts_per_second', 0):.1f} texts/sec")

            cpu_vs_gpu = self.results.get("cpu_vs_gpu", {})
            if "gpu_speedup" in cpu_vs_gpu:
                speedup = cpu_vs_gpu["gpu_speedup"]
                logger.info(f"🚀 GPU speedup: {speedup:.1f}x")

        languages = self.results.get("codesearch_languages", {})
        if languages:
            logger.info("\n🌐 Language Breakdown:")
            for lang, lang_results in languages.items():
                metrics = lang_results.get("metrics", {})
                ndcg10 = metrics.get("ndcg@10", 0)
                mrr = metrics.get("mrr", 0)
                logger.info(f" {lang}: NDCG@10={ndcg10:.4f}, MRR={mrr:.4f}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_existing_results(model_name: str, local_dir: str = LOCAL_EVALUATION_DIR) -> dict[str, Any] | None:
    """Return previously saved evaluation results for ``model_name``, if any.

    Looks in ``local_dir`` for ``comprehensive_eval_<safe name>.json`` first and
    falls back to the legacy ``codesearchnet_eval_<safe name>.json``. A candidate
    file is skipped, with a warning, when it cannot be read, is not valid JSON,
    or does not contain a JSON object.

    Args:
        model_name: Model identifier; mapped to a file-name-safe form with
            ``get_safe_model_name``.
        local_dir: Directory that holds the result files.

    Returns:
        The parsed results dict, or ``None`` when no usable file exists.
    """
    local_path = Path(local_dir)
    safe_model_name = get_safe_model_name(model_name)

    # Ordered by preference: current comprehensive format, then legacy format.
    candidates = (
        ("comprehensive", local_path / f"comprehensive_eval_{safe_model_name}.json"),
        ("legacy", local_path / f"codesearchnet_eval_{safe_model_name}.json"),
    )

    for label, result_file in candidates:
        if not result_file.exists():
            continue

        try:
            with result_file.open("r", encoding="utf-8") as f:
                results = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"⚠️ Could not load existing {label} results for {model_name}: {e}")
            continue

        if not isinstance(results, dict):
            # Callers index the result like a dict; anything else is unusable.
            logger.warning(f"⚠️ Ignoring existing {label} results for {model_name}: expected a JSON object")
            continue

        logger.info(f"✅ Found existing {label} results for {model_name}")
        return results

    return None
|
|
|
|
|
|
|
|
def save_evaluation_results(results: dict[str, Any], local_dir: str = LOCAL_EVALUATION_DIR) -> bool:
    """Save one model's evaluation results as ``comprehensive_eval_<safe name>.json``.

    The file name is derived from ``results["model_name"]`` (``"unknown"`` when
    absent). The JSON is written to a sibling ``.tmp`` file and then renamed over
    the target, so an interrupted write never leaves a truncated results file
    behind. Values that are not JSON-serialisable are stored via ``str()``.

    Args:
        results: Evaluation results for a single model.
        local_dir: Destination directory; created if missing.

    Returns:
        ``True`` on success, ``False`` if anything failed (the error is logged).
    """
    try:
        local_path = Path(local_dir)
        local_path.mkdir(parents=True, exist_ok=True)

        model_name = results.get("model_name", "unknown")
        safe_model_name = get_safe_model_name(model_name)
        result_file = local_path / f"comprehensive_eval_{safe_model_name}.json"

        tmp_file = result_file.with_name(f"{result_file.name}.tmp")
        try:
            with tmp_file.open("w", encoding="utf-8") as f:
                json.dump(results, f, indent=2, default=str)
            tmp_file.replace(result_file)
        except BaseException:
            # Don't leave a half-written temp file behind; re-raise for the outer handler.
            tmp_file.unlink(missing_ok=True)
            raise

        logger.info(f"💾 Saved comprehensive evaluation results for {model_name}")
        return True

    except Exception:
        logger.exception("❌ Error saving evaluation results")
        return False
|
|
|
|
|
|
|
|
def discover_local_models(models_dir: str = LOCAL_MODELS_DIR) -> list[str]:
    """Return paths of model directories found directly under ``models_dir``.

    A sub-directory counts as a model when its top level contains at least one
    ``*.json``, ``*.bin`` or ``*.safetensors`` file. Directories are visited in
    sorted order so the evaluation order is reproducible (``Path.iterdir`` order
    depends on the filesystem).

    Args:
        models_dir: Directory to scan. A missing path, or one that is not a
            directory, yields an empty list.

    Returns:
        String paths of the discovered model directories.
    """
    models_path = Path(models_dir)
    discovered_models: list[str] = []

    if not models_path.is_dir():
        return discovered_models

    model_file_patterns = ("*.json", "*.bin", "*.safetensors")
    for model_dir in sorted(models_path.iterdir()):
        if model_dir.is_dir() and any(any(model_dir.glob(pattern)) for pattern in model_file_patterns):
            discovered_models.append(str(model_dir))
            logger.info(f"📁 Found local model: {model_dir.name}")

    return discovered_models
|
|
|
|
|
|
|
|
def print_results_summary(results: dict[str, Any]) -> None:
    """Log a formatted summary of one model's comprehensive evaluation results.

    Each section is logged only when its data is present in ``results``:

    * ``codesearch_overall``: every overall CodeSearchNet metric (4 d.p.).
    * Unless ``benchmark_skipped`` is truthy: disk size and parameter count from
      ``size_metrics``, batch-32 throughput on the ``"medium"`` texts from
      ``speed_benchmarks``, and ``gpu_speedup`` from ``cpu_vs_gpu``.
    * ``codesearch_languages``: NDCG@10 and MRR per language (0 when missing).

    Args:
        results: Results dict, typically loaded from or destined for
            ``comprehensive_eval_*.json``.
    """
    logger.info(f"\n{'=' * 60}")
    logger.info(f"📊 COMPREHENSIVE EVALUATION: {results.get('model_name', 'Unknown')}")
    logger.info(f"{'=' * 60}")

    # Task performance (CodeSearchNet aggregate metrics).
    overall = results.get("codesearch_overall", {})
    if overall:
        logger.info("📈 CodeSearchNet Performance:")
        for metric, value in overall.items():
            logger.info(f"  🎯 {metric.upper()}: {value:.4f}")

    # Operational metrics exist only when benchmarking was not skipped.
    if not results.get("benchmark_skipped", False):
        size_metrics = results.get("size_metrics", {})
        if size_metrics:
            logger.info(f"\n📏 Model Size: {size_metrics.get('disk_size_mb', 0):.1f}MB")
            if "parameters_millions" in size_metrics:
                logger.info(f"🔢 Parameters: {size_metrics['parameters_millions']:.1f}M")

        speed_benchmarks = results.get("speed_benchmarks", {})
        if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
            batch_32 = speed_benchmarks["medium"]["batch_32"]
            logger.info(f"⚡ Throughput (batch 32): {batch_32.get('texts_per_second', 0):.1f} texts/sec")

        # Same GPU-speedup line the evaluator's own summary reports.
        cpu_vs_gpu = results.get("cpu_vs_gpu", {})
        if "gpu_speedup" in cpu_vs_gpu:
            logger.info(f"🚀 GPU speedup: {cpu_vs_gpu['gpu_speedup']:.1f}x")

    # Per-language CodeSearchNet breakdown.
    languages = results.get("codesearch_languages", {})
    if languages:
        logger.info("\n🌍 Language Breakdown:")
        for lang, lang_results in languages.items():
            metrics = lang_results.get("metrics", {})
            ndcg10 = metrics.get("ndcg@10", 0)
            mrr = metrics.get("mrr", 0)
            logger.info(f"  {lang}: NDCG@10={ndcg10:.4f}, MRR={mrr:.4f}")
|
|
|
|
|
|
|
|
def _comparison_row(result: dict[str, Any]) -> dict[str, Any]:
    """Flatten one evaluation result into a single row of the comparison table.

    The row holds ``model_name`` and every ``codesearch_overall`` metric. Unless
    ``benchmark_skipped`` is truthy, it also holds ``disk_size_mb``,
    ``parameters_millions`` and ``best_throughput``: batch-32 texts/sec on the
    ``"medium"`` texts, 0 when absent.
    """
    row: dict[str, Any] = {"model_name": result.get("model_name", "Unknown")}
    row.update(result.get("codesearch_overall", {}))

    if not result.get("benchmark_skipped", False):
        size_metrics = result.get("size_metrics", {})
        batch_32 = result.get("speed_benchmarks", {}).get("medium", {}).get("batch_32", {})
        row.update(
            {
                "disk_size_mb": size_metrics.get("disk_size_mb", 0),
                "parameters_millions": size_metrics.get("parameters_millions", 0),
                "best_throughput": batch_32.get("texts_per_second", 0),
            }
        )
    return row


def create_comparison_report(all_results: list[dict[str, Any]], output_dir: str = LOCAL_EVALUATION_DIR) -> None:
    """Write a cross-model comparison CSV and a combined JSON dump.

    Outputs, both in ``output_dir`` (created if missing):

    * ``comprehensive_comparison.csv``: one row per result. Columns are
      ``model_name`` and the overall CodeSearchNet metrics, plus size and
      throughput columns for results whose benchmarks were not skipped.
      The CSV is written even when every result skipped benchmarking.
    * ``comprehensive_evaluation.json``: ``all_results`` as given.

    Args:
        all_results: Per-model result dicts. An empty list writes nothing.
        output_dir: Destination directory.
    """
    if not all_results:
        return

    logger.info("📊 Creating comprehensive comparison report...")

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Each result carries its own benchmark columns, so rows no longer depend
    # on at least one model having been benchmarked.
    combined_df = pd.DataFrame([_comparison_row(result) for result in all_results])
    combined_csv = output_path / "comprehensive_comparison.csv"
    combined_df.to_csv(combined_csv, index=False)
    logger.info(f"📊 Comprehensive comparison CSV saved: {combined_csv}")

    json_path = output_path / "comprehensive_evaluation.json"
    with json_path.open("w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, default=str)
    logger.info(f"📄 Comprehensive results JSON saved: {json_path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    use_beam: bool = False,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Evaluate ``models`` locally or on Beam, reusing results already on disk.

    A model is not re-evaluated when ``check_existing_results`` finds stored
    results for the last path component of its path. Those stored results are
    included in the return value. When more than one result is available
    afterwards, ``create_comparison_report`` is called.

    Args:
        models: Model identifiers or local model directory paths.
        max_queries: Maximum queries per language, passed to the evaluator.
        languages: Languages to evaluate, passed through unchanged. ``None`` is
            counted as ``languages_config.all`` in the log output.
        use_beam: Run new evaluations on Beam instead of in-process.
        skip_benchmark: Skip operational benchmarking.

    Returns:
        Existing results followed by newly produced ones. Models whose
        evaluation failed are absent.
    """
    mode = "Beam" if use_beam else "Local"
    benchmarking = "Disabled" if skip_benchmark else "Enabled"
    logger.info(f"🚀 Starting comprehensive evaluation ({mode})")
    logger.info(f"📋 Evaluating {len(models)} models on {len(languages or languages_config.all)} languages")
    logger.info(f"⚡ Benchmarking: {benchmarking}")

    # Split into already-evaluated (reuse stored results) and pending models.
    all_results: list[dict[str, Any]] = []
    models_to_evaluate: list[str] = []
    skipped_models: list[str] = []
    for model_path in models:
        model_name = Path(model_path).name
        existing_results = check_existing_results(model_name)
        if existing_results:
            logger.info(f"✅ Model {model_name} already evaluated, skipping")
            all_results.append(existing_results)
            skipped_models.append(model_name)
        else:
            models_to_evaluate.append(model_path)

    if not models_to_evaluate:
        logger.info("🎉 All models already evaluated!")
        return all_results

    logger.info(f"🔄 Need to evaluate {len(models_to_evaluate)} models")

    runner = _run_beam_evaluation if use_beam else _run_local_evaluation
    new_results = runner(models_to_evaluate, max_queries, languages, skip_benchmark)
    all_results.extend(new_results)

    if len(all_results) > 1:
        create_comparison_report(all_results)

    newly_evaluated = len(new_results)
    # Runners drop failed models, so the shortfall is the failure count.
    failed = len(models_to_evaluate) - newly_evaluated

    logger.info(f"\n{'=' * 60}")
    logger.info("📊 EVALUATION SUMMARY")
    logger.info(f"{'=' * 60}")
    logger.info(f"📊 Total models: {len(models)}")
    logger.info(f"✅ Newly evaluated: {newly_evaluated}")
    if failed:
        logger.warning(f"❌ Failed: {failed}")
    logger.info(f"⏭️ Skipped (already done): {len(skipped_models)}")
    logger.info(f"🎯 Total results: {len(all_results)}")
    logger.info(f"⚡ Benchmarking: {benchmarking}")

    return all_results
|
|
|
|
|
|
|
|
def _run_local_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Evaluate each model in-process, then save and summarise each success.

    A failure is logged and the loop moves on to the next model. After every
    model the evaluator reference is dropped, garbage is collected and the CUDA
    cache is emptied. This way the previous model's memory is not still held
    while the next one is being loaded.

    Args:
        models: Model identifiers or local model directory paths.
        max_queries: Maximum queries per language.
        languages: Languages to evaluate, passed through unchanged.
        skip_benchmark: Skip operational benchmarking.

    Returns:
        Result dicts for the models that evaluated successfully, in input order.
    """
    import gc

    logger.info("🖥️ Running local comprehensive evaluation")

    results: list[dict[str, Any]] = []
    for model_path in models:
        model_name = Path(model_path).name

        logger.info(f"\n{'=' * 60}")
        logger.info(f"🔍 Evaluating model: {model_name}")
        logger.info(f"{'=' * 60}")

        evaluator = None
        try:
            evaluator = ComprehensiveModelEvaluator(model_path, model_name)
            result = evaluator.run_comprehensive_evaluation(max_queries, languages, skip_benchmark)

            save_evaluation_results(result)
            print_results_summary(result)
            results.append(result)
        except Exception:
            logger.exception(f"❌ Failed to evaluate {model_name}")
        finally:
            # Release this model before the next one is constructed.
            del evaluator
            gc.collect()
            if torch.cuda.is_available():
                with contextlib.suppress(RuntimeError):
                    torch.cuda.empty_cache()

    return results
|
|
|
|
|
|
|
|
def _beam_error_result(model_name: str, error: str, **extra: Any) -> dict[str, Any]:
    """Build the failure payload returned by ``_beam_evaluate_single_model``."""
    return {
        "error": error,
        "model_name": model_name,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "evaluation_failed": True,
        **extra,
    }


def _save_beam_error_result(model_name: str, error_result: dict[str, Any]) -> None:
    """Best-effort write of ``error_result`` to ``evaluation_results/error_eval_<safe name>.json`` on the volume."""
    try:
        volume_results_dir = Path(VOLUME_CONFIG.mount_path) / "evaluation_results"
        volume_results_dir.mkdir(parents=True, exist_ok=True)

        safe_model_name = get_safe_model_name(model_name)
        error_file = volume_results_dir / f"error_eval_{safe_model_name}.json"
        with error_file.open("w", encoding="utf-8") as f:
            json.dump(error_result, f, indent=2, default=str)

        logger.info(f"💾 Saved error info to {error_file}")
    except Exception:
        logger.exception("❌ Could not save error info")


@function(**get_evaluation_function_kwargs())
def _beam_evaluate_single_model(
    model_path: str,
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> dict[str, Any]:
    """Comprehensively evaluate one model inside a Beam container.

    On success the results are written to
    ``<volume>/evaluation_results/comprehensive_eval_<safe name>.json`` and
    returned.

    On failure a dict with ``error``, ``model_name``, ``timestamp`` and
    ``evaluation_failed=True`` is returned instead. CUDA failures also carry
    ``oom`` and ``cuda_error``. A copy is written to ``error_eval_<safe name>.json``
    on a best-effort basis. Exceptions raised by the evaluation are not propagated.

    Args:
        model_path: Model identifier or path; its last component is the model name.
        max_queries: Maximum queries per language.
        languages: Languages to evaluate, passed through unchanged.
        skip_benchmark: Skip operational benchmarking.
    """
    # Configure the PyTorch CUDA caching allocator to use expandable segments.
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    model_name = Path(model_path).name
    logger.info(f"🚀 Beam comprehensive evaluation starting for {model_name}")

    # Best effort: start from an empty CUDA cache and fresh peak-memory stats.
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()
            free_bytes, total_bytes = torch.cuda.mem_get_info()
            logger.info(
                f"🧹 Cleared CUDA cache. Available memory: {free_bytes // (1024**3)} GB "
                f"(total {total_bytes // (1024**3)} GB)"
            )
    except Exception as e:
        logger.warning(f"⚠️ Could not clear CUDA cache: {e}")

    try:
        logger.info("🔧 Creating ComprehensiveModelEvaluator...")
        evaluator = ComprehensiveModelEvaluator(model_path, model_name)
        logger.info("✅ ComprehensiveModelEvaluator created successfully")

        logger.info("🚀 Starting comprehensive evaluation...")
        results = evaluator.run_comprehensive_evaluation(max_queries, languages, skip_benchmark)
        logger.info("✅ Comprehensive evaluation completed")

        if not results or "model_name" not in results:
            logger.error(f"❌ Invalid evaluation results for {model_name}: {results}")
            # Flag it as failed so the caller doesn't treat it as a success.
            error_result = _beam_error_result(model_name, "Invalid evaluation results")
            _save_beam_error_result(model_name, error_result)
            return error_result

        logger.info("💾 Saving results to Beam volume...")
        volume_results_dir = Path(VOLUME_CONFIG.mount_path) / "evaluation_results"
        volume_results_dir.mkdir(parents=True, exist_ok=True)

        safe_model_name = get_safe_model_name(model_name)
        result_file = volume_results_dir / f"comprehensive_eval_{safe_model_name}.json"
        with result_file.open("w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, default=str)

        logger.info(f"💾 Saved Beam comprehensive evaluation results for {model_name} to {result_file}")
        logger.info(f"🎯 Final results summary: {len(results.get('codesearch_languages', {}))} languages evaluated")
        return results

    except Exception as e:
        # One handler for everything: a bare `raise` inside a first `except`
        # would escape the function instead of reaching a sibling handler.
        is_oom = False
        is_cuda_error = False
        if isinstance(e, (RuntimeError, AssertionError)):
            # torch.cuda.OutOfMemoryError subclasses RuntimeError and exists in
            # both older torch and newer releases (where torch.OutOfMemoryError was added).
            text = str(e)
            is_oom = isinstance(e, torch.cuda.OutOfMemoryError) or "out of memory" in text.lower()
            is_cuda_error = (
                is_oom or "cuda" in text.lower() or "INTERNAL ASSERT FAILED" in text or "handles_" in text
            )

        if is_cuda_error:
            error_type = "CUDA OOM" if is_oom else "CUDA Error"
            logger.exception(f"❌ {error_type} during evaluation of {model_name}")
            # Try to leave the GPU usable; a failure here must not mask the original error.
            with contextlib.suppress(Exception):
                reset_cuda_state()
            error_result = _beam_error_result(model_name, f"{error_type}: {e!s}", oom=is_oom, cuda_error=True)
        else:
            logger.exception(f"❌ Beam comprehensive evaluation failed for {model_name}")
            error_result = _beam_error_result(model_name, str(e))

        _save_beam_error_result(model_name, error_result)
        return error_result
|
|
|
|
|
|
|
|
def _run_beam_evaluation(
    models: list[str],
    max_queries: int = 100,
    languages: list[str] | None = None,
    skip_benchmark: bool = False,
) -> list[dict[str, Any]]:
    """Evaluate each model remotely on Beam and collect the successful results.

    The Beam function is called once per model via ``.remote(...)``. Failed
    evaluations are logged and skipped. For a successful one, the results JSON is
    downloaded from the volume into ``LOCAL_EVALUATION_DIR``. If that download
    fails, the results returned by the remote call are saved locally with
    ``save_evaluation_results``, so the finished run is not lost.

    Args:
        models: Model identifiers or local model directory paths.
        max_queries: Maximum queries per language.
        languages: Languages to evaluate, passed through unchanged.
        skip_benchmark: Skip operational benchmarking.

    Returns:
        Result dicts for successfully evaluated models, in input order.
    """
    logger.info("☁️ Running Beam comprehensive evaluation")

    results: list[dict[str, Any]] = []
    for model_path in models:
        model_name = Path(model_path).name
        logger.info(f"🚀 Starting Beam comprehensive evaluation for {model_name}")

        try:
            result = _beam_evaluate_single_model.remote(model_path, max_queries, languages, skip_benchmark)

            if not result:
                logger.warning(f"⚠️ No result returned for {model_name}")
                continue

            # A bare top-level "error" also means failure: not every failure
            # payload is guaranteed to carry "evaluation_failed". Without this
            # check the local-save fallback below could persist an error dict
            # as if it were real results.
            if result.get("evaluation_failed", False) or "error" in result:
                logger.error(f"❌ Beam evaluation failed for {model_name}: {result.get('error', 'Unknown error')}")
                if result.get("oom", False):
                    logger.error("💥 Out of memory error - model may be too large for available GPU")
                continue

            downloaded = download_specific_evaluation_file(
                VOLUME_CONFIG.name,
                model_name,
                "evaluation_results",
                LOCAL_EVALUATION_DIR,
                file_prefix="comprehensive_eval",
            )
            if downloaded:
                logger.info(f"📥 Downloaded comprehensive results for {model_name}")
            else:
                logger.warning(f"⚠️ Could not download results for {model_name}")
                # The remote call already returned the full results; keep them.
                if not save_evaluation_results(result):
                    continue
                logger.info(f"💾 Saved returned results for {model_name} locally instead")

            print_results_summary(result)
            results.append(result)

        except Exception:
            logger.exception(f"❌ Beam comprehensive evaluation failed for {model_name}")

    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(
    use_beam: bool = typer.Option(default=False, help="Use Beam for evaluation"),
    skip_third_party: bool = typer.Option(default=False, help="Skip third-party models"),
    skip_benchmark: bool = typer.Option(default=False, help="Skip performance benchmarking"),
    max_queries: int = typer.Option(default=100, help="Maximum queries per language"),
) -> None:
    """CLI entry point: collect models and run the comprehensive evaluation.

    The model list is ``DEFAULT_EVALUATION_MODELS`` (unless ``skip_third_party``)
    followed by every local model found by ``discover_local_models``. It is then
    evaluated on all languages in ``languages_config.all``. The function returns
    early, with an error log, when no model is found.
    """
    logger.info("🚀 Starting comprehensive model evaluation (CodeSearchNet + Performance)")

    models: list[str] = []

    if skip_third_party:
        logger.info("⏭️ Skipping third-party models")
    else:
        logger.info("🌐 Including third-party peer models for comparison")
        models.extend(DEFAULT_EVALUATION_MODELS)

    logger.info("🔍 Discovering local distillation models...")
    local_models = discover_local_models()
    if local_models:
        logger.info(f"✅ Found {len(local_models)} local models:")
        for model_path in local_models:
            logger.info(f"  📁 {Path(model_path).name}")
        models.extend(local_models)
    else:
        logger.warning("⚠️ No local distillation models found")

    # Single exit path for "nothing to do", whether or not third-party models were skipped.
    if not models:
        logger.error("❌ No models to evaluate!")
        return

    logger.info(f"📋 Will evaluate {len(models)} models:")
    for i, model in enumerate(models, 1):
        logger.info(f"  {i}. {Path(model).name}")

    results = run_evaluation(
        models=models,
        max_queries=max_queries,
        languages=languages_config.all,
        use_beam=use_beam,
        skip_benchmark=skip_benchmark,
    )

    logger.info("🎉 Comprehensive evaluation workflow completed!")
    logger.info(f"📊 Successfully evaluated {len(results)} models")
    logger.info(f"💾 Results saved to: {LOCAL_EVALUATION_DIR}")
    logger.info("📄 Format: Single comprehensive JSON per model (CodeSearchNet + Benchmarks)")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Typer builds the CLI options from main()'s signature and dispatches argv to it.
    typer.run(main)
|
|
|