""" Evaluate the Pocket Automator real-world benchmark locally. Run: python scripts/generate_pocket_benchmark.py python -m src.evaluate_pocket_benchmark python -m src.evaluate_pocket_benchmark --model-path ./trained_model/adapter """ from __future__ import annotations import argparse import json from pathlib import Path import torch from peft import PeftModel from transformers import AutoModelForCausalLM, AutoTokenizer from src.classifier_prompt import build_intent_messages from src.paths import TRAINED_MODEL_DIR from src.pocket_benchmark import ( BENCHMARK_PROMPTS_PATH, load_benchmark_prompts, record_result, save_benchmark_outputs, ) from src.skill_utils import extract_intent BASE_MODEL = "Qwen/Qwen2.5-3B-Instruct" MAX_SEQ_LENGTH = 2048 MAX_NEW_TOKENS = 128 DEFAULT_MODEL_PATH = TRAINED_MODEL_DIR / "adapter" FALLBACK_MODEL_PATH = TRAINED_MODEL_DIR / "merged" def pick_device() -> str: if torch.cuda.is_available(): return "cuda" if torch.backends.mps.is_available(): return "mps" return "cpu" def is_adapter_path(model_path: Path) -> bool: return (model_path / "adapter_config.json").exists() def is_complete_merged_model(model_path: Path) -> bool: config_path = model_path / "config.json" if not config_path.exists() or config_path.stat().st_size == 0: return False index_path = model_path / "model.safetensors.index.json" if index_path.exists(): index = json.loads(index_path.read_text(encoding="utf-8")) shard_names = set(index.get("weight_map", {}).values()) return all( (model_path / shard).exists() and (model_path / shard).stat().st_size > 0 for shard in shard_names ) single_shard = model_path / "model.safetensors" return single_shard.exists() and single_shard.stat().st_size > 0 def resolve_model_path(path: str) -> Path: model_path = Path(path) if model_path.exists(): if is_adapter_path(model_path): return model_path if is_complete_merged_model(model_path): return model_path print(f"Warning: {model_path} looks incomplete; trying adapter fallback.") adapter_path = DEFAULT_MODEL_PATH if adapter_path.exists() and is_adapter_path(adapter_path): print(f"Using LoRA adapter at {adapter_path}") return adapter_path merged_path = FALLBACK_MODEL_PATH if merged_path.exists() and is_complete_merged_model(merged_path): print(f"Using merged model at {merged_path}") return merged_path raise FileNotFoundError( "No usable trained model found. Expected a complete merged model or " f"LoRA adapter at {DEFAULT_MODEL_PATH}." ) def load_model(model_path: Path, device: str): dtype = torch.float16 if device in {"cuda", "mps"} else torch.float32 if is_adapter_path(model_path): print(f"Loading base model: {BASE_MODEL}") tokenizer = AutoTokenizer.from_pretrained(model_path) base_model = AutoModelForCausalLM.from_pretrained( BASE_MODEL, dtype=dtype, low_cpu_mem_usage=True, ) model = PeftModel.from_pretrained(base_model, str(model_path)) else: print("Loading merged model weights") tokenizer = AutoTokenizer.from_pretrained(model_path) model = AutoModelForCausalLM.from_pretrained( model_path, dtype=dtype, low_cpu_mem_usage=True, ) model.to(device) model.eval() return model, tokenizer def generate_intent(model, tokenizer, prompt: str, device: str) -> str: messages = build_intent_messages(prompt) inputs = tokenizer.apply_chat_template( messages, tokenize=True, add_generation_prompt=True, return_tensors="pt", ).to(device) with torch.inference_mode(): outputs = model.generate( input_ids=inputs, max_new_tokens=MAX_NEW_TOKENS, use_cache=True, do_sample=False, ) generated = outputs[0][inputs.shape[1] :] return tokenizer.decode(generated, skip_special_tokens=True).strip() def evaluate(model, tokenizer, cases: list[dict], device: str) -> list[dict]: results: list[dict] = [] for index, case in enumerate(cases, start=1): prompt = case["prompt"] raw_output = generate_intent(model, tokenizer, prompt, device) predicted = extract_intent(raw_output) result = record_result(case, raw_output, predicted) results.append(result) print(f"--- [{index}/{len(cases)}] {case.get('id', 'n/a')} ---") print(f"Prompt: {prompt}") print(f"Expected: {json.dumps(case['expected'], separators=(',', ':'))}") print( f"Predicted: {json.dumps(predicted, separators=(',', ':')) if predicted else raw_output}" ) print(f"Skill: {'PASS' if result['skill_correct'] else 'FAIL'}") print(f"Params: {'PASS' if result['parameter_correct'] else 'FAIL'}") print(f"Exact JSON:{'PASS' if result['exact_json_match'] else 'FAIL'}") print() return results def main() -> None: parser = argparse.ArgumentParser(description="Evaluate Pocket Automator benchmark.") parser.add_argument( "--model-path", default=str(DEFAULT_MODEL_PATH), help=f"Path to LoRA adapter or merged model (default: {DEFAULT_MODEL_PATH})", ) parser.add_argument( "--benchmark-path", default=str(BENCHMARK_PROMPTS_PATH), help=f"Path to benchmark prompts JSON (default: {BENCHMARK_PROMPTS_PATH})", ) args = parser.parse_args() benchmark_path = Path(args.benchmark_path) if not benchmark_path.exists(): raise FileNotFoundError( f"Benchmark prompts not found at {benchmark_path}. " "Run `python scripts/generate_pocket_benchmark.py` first." ) cases = load_benchmark_prompts(benchmark_path) device = pick_device() model_path = resolve_model_path(args.model_path) print(f"Device: {device}") print(f"Loading model from {model_path.resolve()}") model, tokenizer = load_model(model_path, device) print(f"Running Pocket Automator benchmark on {len(cases)} prompts...\n") results = evaluate(model, tokenizer, cases, device) metrics, report = save_benchmark_outputs(results) print("--- Pocket Automator Benchmark Summary ---") print(report) if __name__ == "__main__": main()