| |
| """ |
| Zero-shot benchmark evaluation on Hugging Face infrastructure |
| """ |
| import torch |
| import json |
| import yaml |
| from transformers import ( |
| AutoTokenizer, |
| AutoModelForCausalLM, |
| pipeline, |
| BitsAndBytesConfig |
| ) |
| from datasets import load_dataset |
| import numpy as np |
| from typing import Dict, List, Tuple |
| import logging |
| import re |
| from pathlib import Path |
| import os |
|
|
| |
# Configure root logging once at import time; all classes/functions below
# log through this module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class HFZeroShotBenchmark:
    """Zero-shot multiple-choice benchmark runner for a fixed set of HF causal LMs.

    For each registered model: load it (4-bit NF4 quantized when CUDA is
    available), generate an answer for every test example, extract the
    predicted option letter (A-E), and compute overall and per-option accuracy.
    """

    # Answer-extraction patterns, ordered most-specific first.  Compiled once
    # at class creation instead of rebuilding the list on every call.
    _ANSWER_PATTERNS = (
        re.compile(r'[Tt]he correct answer is ([A-E])'),
        re.compile(r'[Aa]nswer: ([A-E])'),
        re.compile(r'([A-E])\.'),
        re.compile(r'^([A-E])\s*$'),
    )
    # Last-resort match: first capital A-E anywhere in the output.
    _ANY_OPTION = re.compile(r'([A-E])')

    def __init__(self):
        # Prefer GPU; the quantization config below is only applied on CUDA.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {self.device}")

        # Models under evaluation; all are decoder-only causal LMs.
        self.models = {
            "biomistral_7b": {
                "name": "BioMistral/BioMistral-7B",
                "type": "causal_lm"
            },
            "qwen3_7b": {
                "name": "Qwen/Qwen2.5-7B-Instruct",
                "type": "causal_lm"
            },
            "meditron_7b": {
                "name": "epfl-llm/meditron-7b",
                "type": "causal_lm"
            },
            "internist_7b": {
                "name": "internistai/internist-7b",
                "type": "causal_lm"
            }
        }

        # 4-bit NF4 double quantization keeps 7B models within a single-GPU budget.
        self.quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )

    def load_model(self, model_name: str, model_config: Dict) -> Tuple:
        """Load a model and its tokenizer.

        Args:
            model_name: Registry key, used only for logging.
            model_config: Entry from ``self.models`` with the HF repo id under 'name'.

        Returns:
            ``(model, tokenizer)`` on success, ``(None, None)`` on any failure
            (caller treats that as "skip this model").
        """
        logger.info(f"Loading model: {model_name}")

        try:
            tokenizer = AutoTokenizer.from_pretrained(
                model_config['name'],
                trust_remote_code=True
            )

            # Some base models (e.g. Mistral derivatives) ship without a pad
            # token; the generation pipeline needs one for batching/padding.
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # On CPU, skip bitsandbytes quantization and use full fp32.
            model = AutoModelForCausalLM.from_pretrained(
                model_config['name'],
                quantization_config=self.quantization_config if self.device == "cuda" else None,
                device_map="auto" if self.device == "cuda" else None,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                trust_remote_code=True
            )

            logger.info(f"Successfully loaded {model_name}")
            return model, tokenizer

        except Exception as e:
            logger.error(f"Failed to load {model_name}: {e}")
            return None, None

    def create_prompt(self, question: str, options: List[str], model_name: str) -> str:
        """Build a model-family-specific prompt for one multiple-choice question.

        Options are lettered A, B, C, ... in order.  Qwen models get ChatML
        markers, Mistral-family models get [INST] tags, everything else gets a
        plain Question/Answer template.
        """
        options_text = "\n".join([f"{chr(65+i)}. {opt}" for i, opt in enumerate(options)])

        if "qwen" in model_name.lower():
            return f"""<|im_start|>user
{question}

{options_text}

Please select the correct answer (A, B, C, D, or E).<|im_end|>
<|im_start|>assistant
The correct answer is"""

        # NOTE: "biomistral" contains "mistral", so a single substring test
        # covers both BioMistral and plain Mistral checkpoints.
        elif "mistral" in model_name.lower():
            return f"""<s>[INST] {question}

{options_text}

Please select the correct answer (A, B, C, D, or E). [/INST] The correct answer is"""

        else:
            return f"""Question: {question}

{options_text}

Answer:"""

    def extract_answer(self, text: str) -> str:
        """Extract the predicted option letter (A-E) from generated text.

        Tries the specific patterns first, then falls back to the first capital
        A-E anywhere in the text.  If nothing matches at all, returns "A" —
        a deliberate (if biased) guess so every example gets scored.
        """
        for pattern in self._ANSWER_PATTERNS:
            match = pattern.search(text)
            if match:
                return match.group(1)

        match = self._ANY_OPTION.search(text)
        if match:
            return match.group(1)

        return "A"

    def evaluate_model(self, model_name: str, model_config: Dict, test_dataset) -> Dict:
        """Evaluate a single model on the test dataset.

        Returns a metrics dict (accuracy, per-option accuracy, counts) or
        ``{"error": ...}`` when the model could not be loaded.  Per-example
        failures are logged and recorded but do not abort the run.
        """
        logger.info(f"Evaluating {model_name}")

        model, tokenizer = self.load_model(model_name, model_config)
        if model is None or tokenizer is None:
            return {"error": f"Failed to load {model_name}"}

        # Greedy decoding (do_sample=False) keeps results deterministic; a
        # temperature would be ignored in this mode, so none is passed.
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=50,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id
        )

        results = []
        correct = 0
        total = len(test_dataset)

        logger.info(f"Running evaluation on {total} examples")

        for i, example in enumerate(test_dataset):
            try:
                prompt = self.create_prompt(
                    example['question'],
                    example['options'],
                    model_name
                )

                # return_full_text=False strips the prompt, leaving only the
                # model's continuation for answer extraction.
                response = generator(prompt, return_full_text=False)
                generated_text = response[0]['generated_text']

                predicted_answer = self.extract_answer(generated_text)
                true_answer = example['answer']

                is_correct = predicted_answer == true_answer
                if is_correct:
                    correct += 1

                results.append({
                    'question_id': i,
                    'question': example['question'],
                    'options': example['options'],
                    'true_answer': true_answer,
                    'predicted_answer': predicted_answer,
                    'generated_text': generated_text,
                    'is_correct': is_correct
                })

            except Exception as e:
                # Record the failure and keep going; one bad example should
                # not abort a multi-hour evaluation run.
                logger.error(f"Error processing example {i}: {e}")
                results.append({
                    'question_id': i,
                    'error': str(e),
                    'is_correct': False
                })

        accuracy = correct / total if total > 0 else 0

        # Per-option accuracy highlights positional bias (e.g. a model that
        # always answers "A").  Errored examples have no 'true_answer' key and
        # therefore count toward no option.
        option_accuracies = {}
        for option in ['A', 'B', 'C', 'D', 'E']:
            option_correct = sum(1 for r in results if r.get('true_answer') == option and r.get('is_correct', False))
            option_total = sum(1 for r in results if r.get('true_answer') == option)
            option_accuracies[option] = option_correct / option_total if option_total > 0 else 0

        metrics = {
            'model_name': model_name,
            'total_examples': total,
            'correct_predictions': correct,
            'accuracy': accuracy,
            'option_accuracies': option_accuracies
        }

        logger.info(f"{model_name} - Accuracy: {accuracy:.4f}")

        # Free GPU memory before loading the next 7B model.
        del model, tokenizer, generator
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return metrics

    def run_benchmark(self, test_dataset) -> Dict:
        """Evaluate every registered model; returns {model_key: metrics_or_error}."""
        results = {}

        for model_name, model_config in self.models.items():
            logger.info(f"Starting evaluation for {model_name}")
            results[model_name] = self.evaluate_model(model_name, model_config, test_dataset)

        return results

    def save_results(self, results: Dict, output_path: str = "/tmp/zero_shot_results.json"):
        """Write summary metrics (not per-example records) to a JSON file.

        Only the scalar/summary fields are kept so the output stays small and
        JSON-serializable.  Returns the path written.
        """
        serializable_results = {}
        for model_name, result in results.items():
            if 'error' in result:
                serializable_results[model_name] = result
            else:
                serializable_results[model_name] = {
                    'model_name': result['model_name'],
                    'total_examples': result['total_examples'],
                    'correct_predictions': result['correct_predictions'],
                    'accuracy': result['accuracy'],
                    'option_accuracies': result['option_accuracies']
                }

        with open(output_path, 'w') as f:
            json.dump(serializable_results, f, indent=2)

        logger.info(f"Results saved to {output_path}")
        return output_path
|
|
|
|
def _normalize_example(example):
    """Map a raw MedQA example onto the {question, options, answer} schema.

    Different MedQA mirrors use different field names, so fall back through
    the known alternatives for each field.
    """
    if 'question' in example:
        question = example['question']
    elif 'text' in example:
        question = example['text']
    else:
        question = example['input']

    if 'options' in example:
        options = example['options']
    elif 'choices' in example:
        options = example['choices']
    else:
        # Some mirrors store one option per numbered column (option_0 / choice_0, ...).
        options = []
        for i in range(5):
            key = f'option_{i}' if f'option_{i}' in example else f'choice_{i}'
            if key in example:
                options.append(example[key])

    if 'answer' in example:
        answer = example['answer']
    elif 'label' in example:
        answer = example['label']
    else:
        answer = example['output']

    return {
        'question': question,
        'options': options,
        'answer': answer
    }


def _load_test_dataset():
    """Load and normalize the MedQA test split, trying known dataset ids in order.

    Returns:
        The processed test split, or None when no dataset id could be loaded
        or processing failed (errors are logged, not raised).
    """
    logger.info("Loading MedQA dataset...")
    try:
        dataset = None
        for name in ["bigbio/med_qa", "medqa", "medqa_usmle"]:
            try:
                dataset = load_dataset(name)
                logger.info(f"Loaded dataset: {name}")
                break
            except Exception:
                # This id is unavailable under the current hub; try the next alias.
                continue

        if dataset is None:
            logger.error("Could not load MedQA dataset")
            return None

        test_dataset = dataset['test'].map(_normalize_example)
        logger.info(f"Processed {len(test_dataset)} test examples")
        return test_dataset

    except Exception as e:
        logger.error(f"Error loading dataset: {e}")
        return None


def _upload_results(output_path: str):
    """Best-effort upload of the results file to a HF Hub dataset repo.

    Any failure (missing token, network, permissions) is logged as a warning
    and never aborts the run.
    """
    try:
        from huggingface_hub import HfApi
        api = HfApi()

        repo_name = "medical-benchmark-results"
        try:
            api.create_repo(repo_name, exist_ok=True)
        except Exception:
            # Repo likely exists or creation is forbidden; the upload below
            # will surface any real problem.
            pass

        api.upload_file(
            path_or_fileobj=output_path,
            path_in_repo="zero_shot_benchmark.json",
            repo_id=repo_name,
            repo_type="dataset"
        )
        logger.info(f"Results uploaded to {repo_name}/zero_shot_benchmark.json")

    except Exception as e:
        logger.warning(f"Could not upload results to HF Hub: {e}")


def main():
    """Main function for HF job: load data, evaluate all models, save and upload."""
    logger.info("Starting zero-shot benchmark on Hugging Face infrastructure")

    test_dataset = _load_test_dataset()
    if test_dataset is None:
        return

    benchmark = HFZeroShotBenchmark()

    logger.info("Starting zero-shot benchmark evaluation...")
    results = benchmark.run_benchmark(test_dataset)

    output_path = benchmark.save_results(results)

    # Human-readable console summary; models that failed to load are skipped.
    print("\n" + "="*50)
    print("ZERO-SHOT BENCHMARK RESULTS")
    print("="*50)
    for model_name, result in results.items():
        if 'error' not in result:
            print(f"{model_name}: {result['accuracy']:.4f} accuracy")

    _upload_results(output_path)

    logger.info("Zero-shot benchmark completed!")
|
|
|
|
# Script entry point: run the full benchmark only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
|
|