| """ |
| Evaluate the Pocket Automator real-world benchmark locally. |
| |
| Run: |
| python scripts/generate_pocket_benchmark.py |
| python -m src.evaluate_pocket_benchmark |
| python -m src.evaluate_pocket_benchmark --model-path ./trained_model/adapter |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from pathlib import Path |
|
|
| import torch |
| from peft import PeftModel |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
|
|
| from src.classifier_prompt import build_intent_messages |
| from src.paths import TRAINED_MODEL_DIR |
| from src.pocket_benchmark import ( |
| BENCHMARK_PROMPTS_PATH, |
| load_benchmark_prompts, |
| record_result, |
| save_benchmark_outputs, |
| ) |
| from src.skill_utils import extract_intent |
|
|
# Hub id of the base checkpoint that a LoRA adapter is applied on top of.
BASE_MODEL = "Qwen/Qwen2.5-3B-Instruct"
# Not referenced in this module; kept because other modules may import it.
MAX_SEQ_LENGTH = 2048
# Upper bound on the number of tokens generated per benchmark prompt.
MAX_NEW_TOKENS = 128
# Default value of --model-path and the first fallback location tried.
DEFAULT_MODEL_PATH = TRAINED_MODEL_DIR / "adapter"
# Second fallback location: a directory holding fully merged weights.
FALLBACK_MODEL_PATH = TRAINED_MODEL_DIR / "merged"
|
|
|
|
def pick_device() -> str:
    """Return the preferred torch device name for this machine.

    CUDA is preferred, then Apple MPS, and CPU is the last resort.
    """
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.backends.mps.is_available():
        device = "mps"
    else:
        device = "cpu"
    return device
|
|
|
|
def is_adapter_path(model_path: Path) -> bool:
    """Tell whether *model_path* contains an ``adapter_config.json`` entry.

    Args:
        model_path: Directory to inspect.

    Returns:
        True when ``adapter_config.json`` exists directly under *model_path*.
    """
    adapter_config = model_path.joinpath("adapter_config.json")
    return adapter_config.exists()
|
|
|
|
def _has_content(candidate: Path) -> bool:
    """Return True when *candidate* exists and reports a size above zero."""
    try:
        return candidate.stat().st_size > 0
    except OSError:
        return False


def _indexed_shards(index_path: Path) -> set[str]:
    """Return the shard file names listed in a safetensors index.

    An empty set means the index is missing, unreadable, not valid JSON, or
    does not carry a non-empty ``weight_map`` of string file names.
    """
    try:
        index = json.loads(index_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return set()

    weight_map = index.get("weight_map") if isinstance(index, dict) else None
    if not isinstance(weight_map, dict):
        return set()

    names = list(weight_map.values())
    if not all(isinstance(name, str) and name for name in names):
        return set()
    return set(names)


def is_complete_merged_model(model_path: Path) -> bool:
    """Check that *model_path* holds a config and every weight file it needs.

    The directory counts as complete when ``config.json`` is non-empty and
    either every shard named in ``model.safetensors.index.json`` is present
    and non-empty, or a non-empty single ``model.safetensors`` exists.

    An index that lists no shards (or cannot be parsed) cannot vouch for any
    weights, so it is ignored and the single-file check decides instead of
    reporting an empty checkpoint as complete or raising.

    Args:
        model_path: Directory expected to contain a merged model.

    Returns:
        True when the config and all required weight files are present.
    """
    if not _has_content(model_path / "config.json"):
        return False

    shard_names = _indexed_shards(model_path / "model.safetensors.index.json")
    if shard_names:
        return all(_has_content(model_path / shard) for shard in shard_names)

    return _has_content(model_path / "model.safetensors")
|
|
|
|
def resolve_model_path(path: str) -> Path:
    """Pick the model directory to evaluate, falling back to known locations.

    The requested path is used when it is a LoRA adapter directory or a
    complete merged model. Otherwise a warning is printed (so a mistyped path
    never silently evaluates a different model) and the default adapter
    location, then the merged-model location, are tried in that order.

    Args:
        path: Model location requested by the caller.

    Returns:
        The first usable model directory.

    Raises:
        FileNotFoundError: If none of the candidate locations is usable.
    """
    requested = Path(path)
    if requested.exists():
        if is_adapter_path(requested) or is_complete_merged_model(requested):
            return requested
        print(f"Warning: {requested} looks incomplete; trying adapter fallback.")
    else:
        print(f"Warning: {requested} does not exist; trying fallback locations.")

    if DEFAULT_MODEL_PATH.exists() and is_adapter_path(DEFAULT_MODEL_PATH):
        print(f"Using LoRA adapter at {DEFAULT_MODEL_PATH}")
        return DEFAULT_MODEL_PATH

    if FALLBACK_MODEL_PATH.exists() and is_complete_merged_model(FALLBACK_MODEL_PATH):
        print(f"Using merged model at {FALLBACK_MODEL_PATH}")
        return FALLBACK_MODEL_PATH

    # Name every location that was searched so the user can see what is missing.
    raise FileNotFoundError(
        "No usable trained model found. Looked for a LoRA adapter or complete "
        f"merged model at {requested}, a LoRA adapter at {DEFAULT_MODEL_PATH}, "
        f"and a complete merged model at {FALLBACK_MODEL_PATH}."
    )
|
|
|
|
def load_model(model_path: Path, device: str):
    """Load the model and tokenizer found at *model_path* onto *device*.

    When *model_path* is a LoRA adapter directory, the weights come from
    ``BASE_MODEL`` and the adapter is applied on top; otherwise the weights
    are read from *model_path* itself. The tokenizer is read from
    *model_path* in both cases.

    Args:
        model_path: Adapter directory or merged-model directory.
        device: Target device name; "cuda" and "mps" select float16,
            anything else float32.

    Returns:
        A ``(model, tokenizer)`` tuple with the model moved to *device* and
        switched to eval mode.
    """
    half_precision = device in ("cuda", "mps")
    dtype = torch.float16 if half_precision else torch.float32

    uses_adapter = is_adapter_path(model_path)
    if uses_adapter:
        print(f"Loading base model: {BASE_MODEL}")
    else:
        print("Loading merged model weights")

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    weights_source = BASE_MODEL if uses_adapter else model_path
    model = AutoModelForCausalLM.from_pretrained(
        weights_source,
        dtype=dtype,
        low_cpu_mem_usage=True,
    )
    if uses_adapter:
        # Wrap the base weights with the LoRA adapter stored at model_path.
        model = PeftModel.from_pretrained(model, str(model_path))

    model.to(device)
    model.eval()
    return model, tokenizer
|
|
|
|
def generate_intent(model, tokenizer, prompt: str, device: str) -> str:
    """Greedily generate the model's reply to *prompt* and return its text.

    Args:
        model: Causal language model exposing ``generate``.
        tokenizer: Tokenizer exposing ``apply_chat_template`` and ``decode``.
        prompt: User prompt passed to ``build_intent_messages``.
        device: Device the encoded inputs are moved to before generation.

    Returns:
        The newly generated text only (prompt tokens removed), decoded
        without special tokens and stripped of surrounding whitespace.
    """
    messages = build_intent_messages(prompt)
    # Ask for a dict explicitly so the result does not depend on the
    # library's default return type, and so the attention mask comes along.
    encoded = tokenizer.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
    ).to(device)
    input_ids = encoded["input_ids"]
    prompt_length = input_ids.shape[1]

    with torch.inference_mode():
        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=encoded.get("attention_mask"),
            max_new_tokens=MAX_NEW_TOKENS,
            use_cache=True,
            do_sample=False,
        )

    # generate() returns prompt + completion; keep only the completion.
    completion = outputs[0][prompt_length:]
    return tokenizer.decode(completion, skip_special_tokens=True).strip()
|
|
|
|
def evaluate(model, tokenizer, cases: list[dict], device: str) -> list[dict]:
    """Run every benchmark case through the model and print a per-case report.

    Each case must provide ``"prompt"`` and ``"expected"``; ``"id"`` is
    optional and shown as ``n/a`` when absent. The value returned by
    ``record_result`` is read for the ``skill_correct``,
    ``parameter_correct`` and ``exact_json_match`` keys.

    Args:
        model: Model handed to ``generate_intent``.
        tokenizer: Tokenizer handed to ``generate_intent``.
        cases: Benchmark cases to evaluate, in order.
        device: Device name handed to ``generate_intent``.

    Returns:
        One ``record_result`` entry per case, in input order.
    """

    def _compact(payload) -> str:
        return json.dumps(payload, separators=(",", ":"))

    def _verdict(passed) -> str:
        return "PASS" if passed else "FAIL"

    total = len(cases)
    collected: list[dict] = []

    for position, case in enumerate(cases, start=1):
        prompt = case["prompt"]
        raw_output = generate_intent(model, tokenizer, prompt, device)
        predicted = extract_intent(raw_output)
        outcome = record_result(case, raw_output, predicted)
        collected.append(outcome)

        # Show the raw model text when no intent could be extracted.
        shown_prediction = _compact(predicted) if predicted else raw_output

        print(f"--- [{position}/{total}] {case.get('id', 'n/a')} ---")
        print(f"Prompt: {prompt}")
        print(f"Expected: {_compact(case['expected'])}")
        print(f"Predicted: {shown_prediction}")
        print(f"Skill: {_verdict(outcome['skill_correct'])}")
        print(f"Params: {_verdict(outcome['parameter_correct'])}")
        print(f"Exact JSON:{_verdict(outcome['exact_json_match'])}")
        print()

    return collected
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the benchmark runner."""
    cli = argparse.ArgumentParser(description="Evaluate Pocket Automator benchmark.")
    cli.add_argument(
        "--model-path",
        default=str(DEFAULT_MODEL_PATH),
        help=f"Path to LoRA adapter or merged model (default: {DEFAULT_MODEL_PATH})",
    )
    cli.add_argument(
        "--benchmark-path",
        default=str(BENCHMARK_PROMPTS_PATH),
        help=f"Path to benchmark prompts JSON (default: {BENCHMARK_PROMPTS_PATH})",
    )
    return cli


def main() -> None:
    """Parse CLI options, run the benchmark and print the summary report.

    Raises:
        FileNotFoundError: If the benchmark prompts file does not exist, or
            if ``resolve_model_path`` finds no usable model.
    """
    options = _build_arg_parser().parse_args()

    prompts_file = Path(options.benchmark_path)
    if not prompts_file.exists():
        raise FileNotFoundError(
            f"Benchmark prompts not found at {prompts_file}. "
            "Run `python scripts/generate_pocket_benchmark.py` first."
        )

    cases = load_benchmark_prompts(prompts_file)
    device = pick_device()
    model_path = resolve_model_path(options.model_path)
    print(f"Device: {device}")
    print(f"Loading model from {model_path.resolve()}")
    model, tokenizer = load_model(model_path, device)
    print(f"Running Pocket Automator benchmark on {len(cases)} prompts...\n")

    results = evaluate(model, tokenizer, cases, device)
    # Only the rendered report is printed; the metrics value is not used here.
    _metrics, report = save_benchmark_outputs(results)

    print("--- Pocket Automator Benchmark Summary ---")
    print(report)


if __name__ == "__main__":
    main()
|
|