| | |
| | |
| | |
| |
|
| | import os |
| | import re |
| | import torch |
| | from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig |
| | from peft import PeftModel |
| | from human_eval.data import write_jsonl, read_problems |
| | from human_eval.evaluation import evaluate_functional_correctness |
| | import tempfile |
| | import json |
| | from tqdm import tqdm |
| |
|
# Startup banner: four lines on stdout, framed by 60-character rules.
print(
    "\n".join(
        (
            "=" * 60,
            "EVALUATION v2: Base vs Fine-tuned on HumanEval",
            "Using correct Instruct format for fine-tuned model",
            "=" * 60,
        )
    )
)
| |
|
| | |
# --- Evaluation configuration ------------------------------------------------
# Hub id of the checkpoint loaded for both the base and the fine-tuned run.
BASE_MODEL = "mistralai/Devstral-Small-2505"
# Hub id of the LoRA adapter applied on top of BASE_MODEL for the second run.
FINETUNED_MODEL = "stmasson/alizee-coder-devstral-1-small"
# Completions generated per HumanEval task.
NUM_SAMPLES = 1
# Sampling temperature; sampling is enabled only when this is > 0.
TEMPERATURE = 0.1
# Generation budget for the fine-tuned (instruct) run; the base run uses its
# own hard-coded limit.
MAX_NEW_TOKENS = 1024
| |
|
| | |
# Shared bitsandbytes settings used for every model load in this script:
# 4-bit weights in the "nf4" format with double quantization, computing in
# bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)
| |
|
def load_model(model_name, adapter_name=None):
    """Load a quantized causal LM plus tokenizer, optionally merging a LoRA adapter.

    Args:
        model_name: Hub id or path of the base checkpoint; the tokenizer is
            loaded from the same location.
        adapter_name: Optional PEFT adapter id or path. When given, the adapter
            is loaded on top of the base model and merged into it.

    Returns:
        A ``(model, tokenizer)`` tuple with the model switched to eval mode.
    """
    print(f"\nLoading model: {model_name}")
    if adapter_name:
        print(f"With adapter: {adapter_name}")

    tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    # generate() needs a pad token id; fall back to EOS when none is defined.
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    load_kwargs = {
        "quantization_config": bnb_config,
        "device_map": "auto",
        "trust_remote_code": True,
        "torch_dtype": torch.bfloat16,
    }
    net = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)

    if adapter_name:
        # Fold the adapter weights into the base model so that inference runs
        # on a plain model object rather than a PEFT wrapper.
        net = PeftModel.from_pretrained(net, adapter_name).merge_and_unload()

    net.eval()
    return net, tok
| |
|
# A fence opener directly followed by an info string (language tag) that runs
# to the end of the line, e.g. "```python" or "```py". A bare "```" (including
# every closing fence) does not match because at least one tag character is
# required.
_FENCE_INFO_RE = re.compile(r"```[ \t]*[A-Za-z0-9_+#.-]+[ \t]*(?=\r?\n)")


def extract_python_code(text):
    """Extract Python code from a model response.

    Strategies are tried in order; the first one that yields something wins:

    1. The last closed fenced block tagged ``python``.
    2. The last closed fenced block of any kind (its language tag, if any,
       is dropped).
    3. Everything after the last ``**Solution:**`` / ``Solution:`` / fence
       marker. This covers responses cut off before the closing fence.
    4. The whole response.

    Args:
        text: Raw decoded model output.

    Returns:
        The extracted code with surrounding whitespace stripped.
    """
    # 1. Explicitly python-tagged, properly closed blocks take priority.
    tagged = re.findall(r'```python\s*(.*?)\s*```', text, re.DOTALL)
    if tagged:
        return tagged[-1].strip()

    # From here on, the language tag on a fence opener is noise. Remove it so
    # it cannot end up as the first line of the returned code, which happened
    # for "```py ... ```" blocks and for an unclosed "```python" fence.
    untagged = _FENCE_INFO_RE.sub("```", text)

    # 2. Any closed fenced block.
    generic = re.findall(r'```\s*(.*?)\s*```', untagged, re.DOTALL)
    if generic:
        return generic[-1].strip()

    # 3. No closed block: take whatever follows the first marker that is present.
    for marker in ("**Solution:**", "Solution:", "```"):
        if marker in untagged:
            tail = untagged.split(marker)[-1].replace("```", "").strip()
            if tail:
                return tail

    # 4. Nothing recognisable: hand back the response itself.
    return text.strip()
| |
|
def generate_completion_base(model, tokenizer, prompt):
    """Continue ``prompt`` directly with the base model (no chat template).

    Args:
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        prompt: Text to continue.

    Returns:
        The newly generated text only, truncated at the stop sequences applied
        below.
    """
    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = encoded['input_ids'].shape[1]

    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=512,
            temperature=TEMPERATURE,
            do_sample=TEMPERATURE > 0,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode only the tokens produced after the prompt.
    completion = tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)

    # Cut at each stop sequence in turn; partition() returns the text
    # unchanged when the sequence does not occur.
    for stop in ("\ndef ", "\nclass ", "\nif __name__", "\n\n\n"):
        completion = completion.partition(stop)[0]

    return completion
| |
|
def _completion_from_code(code):
    """Turn extracted code into a string that can be appended to a HumanEval prompt.

    If ``code`` contains a function definition at column 0, it is treated as a
    complete program and returned whole, preceded by a newline. Appended after
    the prompt (whose function then consists of its docstring only), this
    simply re-defines the function and keeps any imports and helper functions
    the model wrote. Otherwise ``code`` is assumed to be a bare function body
    and is returned unchanged.
    """
    if re.search(r"^(?:async[ \t]+)?def[ \t]", code, re.MULTILINE):
        return "\n" + code
    return code


def generate_completion_finetuned(model, tokenizer, prompt, problem_text):
    """Generate a HumanEval completion with the fine-tuned model (Instruct format).

    The task is wrapped in an ``[INST] ... [/INST]`` prompt, the response is
    generated with up to ``MAX_NEW_TOKENS`` new tokens, and the code is pulled
    out of the response with ``extract_python_code``.

    Args:
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        prompt: HumanEval function stub the model must complete.
        problem_text: Problem description placed before the stub.

    Returns:
        Text to append to ``prompt``: either a full re-definition of the
        function (with its imports and helpers) or a bare body.
    """
    # NOTE(review): the literal "<s>" may yield a duplicate BOS token if the
    # tokenizer also adds one automatically -- confirm against the format used
    # during fine-tuning before changing it.
    instruct_prompt = f"<s>[INST] Solve this programming problem with detailed reasoning:\n\n{problem_text}\n\nComplete the following function:\n{prompt}\n[/INST]"

    inputs = tokenizer(instruct_prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs['input_ids'].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            do_sample=TEMPERATURE > 0,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode only the tokens produced after the prompt.
    full_response = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)

    # The previous implementation dropped every "def" line and kept all other
    # lines after the first one, which lost imports placed before the function
    # and merged helper or nested functions into invalid code.
    return _completion_from_code(extract_python_code(full_response))
| |
|
def evaluate_model(model, tokenizer, problems, model_name, is_finetuned=False):
    """Generate completions for every problem and score them with HumanEval.

    Args:
        model: Model to evaluate.
        tokenizer: Tokenizer matching ``model``.
        problems: Mapping of task id to problem dict; each problem must have a
            ``"prompt"`` entry.
        model_name: Label used in progress output.
        is_finetuned: When true, use the Instruct-format generation path;
            otherwise use direct completion.

    Returns:
        The result of ``evaluate_functional_correctness`` for ``k=[1]``.
    """
    print(f"\nEvaluating {model_name}...")
    samples = []

    for task_id, problem in tqdm(problems.items(), desc=f"Generating ({model_name})"):
        prompt = problem["prompt"]

        for _ in range(NUM_SAMPLES):
            if is_finetuned:
                # NOTE(review): the prompt is passed as the problem
                # description too, so the instruct prompt contains it twice.
                # Kept as-is; confirm this matches the fine-tuning format.
                completion = generate_completion_finetuned(model, tokenizer, prompt, prompt)
            else:
                completion = generate_completion_base(model, tokenizer, prompt)

            samples.append({"task_id": task_id, "completion": completion})

    # Work inside a throw-away directory so that the sample file and the
    # "<sample_file>_results.jsonl" file the harness writes next to it are
    # both removed, even when evaluation raises.
    with tempfile.TemporaryDirectory() as workdir:
        sample_file = os.path.join(workdir, "samples.jsonl")
        write_jsonl(sample_file, samples)
        return evaluate_functional_correctness(sample_file, k=[1])
| |
|
def main():
    """Evaluate the base and fine-tuned models on HumanEval and report pass@1.

    Prints a comparison table and writes the scores to
    ``eval_results_v2.json`` in the current working directory.
    """
    import gc

    print("\nLoading HumanEval problems...")
    problems = read_problems()
    print(f"Total problems: {len(problems)}")

    results = {}
    rule = "=" * 60

    # --- Base model --------------------------------------------------------
    print("\n" + rule)
    print("EVALUATING BASE MODEL (direct completion)")
    print(rule)
    base_model, base_tokenizer = load_model(BASE_MODEL)
    results["base"] = evaluate_model(base_model, base_tokenizer, problems, "Devstral-Small (Base)", is_finetuned=False)
    print(f"\nBase Model Results: {results['base']}")

    # Release the base model before loading the second one. Collect garbage
    # first so that the CUDA allocator can actually return the memory.
    del base_model, base_tokenizer
    gc.collect()
    torch.cuda.empty_cache()

    # --- Fine-tuned model --------------------------------------------------
    print("\n" + rule)
    print("EVALUATING FINE-TUNED MODEL (Instruct format)")
    print(rule)
    ft_model, ft_tokenizer = load_model(BASE_MODEL, FINETUNED_MODEL)
    results["finetuned"] = evaluate_model(ft_model, ft_tokenizer, problems, "Alizee-Coder (Fine-tuned)", is_finetuned=True)
    print(f"\nFine-tuned Model Results: {results['finetuned']}")

    # --- Summary -----------------------------------------------------------
    base_score = results['base']['pass@1']
    ft_score = results['finetuned']['pass@1']
    # Difference in percentage points.
    improvement = (ft_score - base_score) * 100

    print("\n" + rule)
    print("COMPARISON SUMMARY (v2 - Correct Prompt Format)")
    print(rule)
    print(f"\n{'Model':<45} {'pass@1':>10}")
    print("-"*57)
    print(f"{'Devstral-Small-2505 (Base)':<45} {base_score*100:>9.2f}%")
    print(f"{'Alizee-Coder-Devstral (Fine-tuned+Instruct)':<45} {ft_score*100:>9.2f}%")
    # The "+" format flag keeps the sign attached to the number; previously
    # the sign was printed before the padding ("+     5.00%").
    print(f"\n{'Improvement:':<45} {improvement:>+9.2f}%")

    with open("eval_results_v2.json", "w", encoding="utf-8") as f:
        json.dump({
            "base_pass@1": float(base_score),
            "finetuned_pass@1": float(ft_score),
            "improvement": float(improvement)
        }, f, indent=2)
    print("\nResults saved to eval_results_v2.json")
| |
|
# Run the evaluation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
| |
|