"""Codette LoRA Adapter Test Suite

Tests the newton and davinci adapters:
1. Weight inspection (no base model needed)
2. Full inference comparison (loads base model)

Hardware: Intel Arc 140V (8GB XPU) + 16GB RAM
Strategy: CPU float16 inference with LoRA merge
"""

import os, sys, json, time

# NOTE: these environment tweaks must run BEFORE `import torch` /
# tokenizer imports below — PATH controls which native DLLs torch picks up,
# and TOKENIZERS_PARALLELISM silences the HF tokenizers fork warning.
os.environ["PATH"] = r"J:\Lib\site-packages\Library\bin" + os.pathsep + os.environ.get("PATH", "")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import torch
import safetensors.torch as st
from pathlib import Path

# Locations of the downloaded LoRA adapters and the base checkpoint they
# were trained against (referenced by both test phases).
ADAPTER_DIR = Path("J:/codette-training-lab/adapters/hf_download")
NEWTON_DIR = ADAPTER_DIR / "newton"
DAVINCI_DIR = ADAPTER_DIR / "davinci"
BASE_MODEL = "meta-llama/Llama-3.1-8B-Instruct"
| |
def phase1_weight_inspection():
    """Inspect both adapters' weights on disk — no base model required.

    For each adapter: print its PEFT config, then parameter statistics
    grouped by LoRA matrix type (lora_A / lora_B).  Finally compare the two
    adapters' shared tensors and report mean absolute weight divergence.

    Returns:
        True on completion (results are reported via stdout; WARN conditions
        do not raise).
    """
    print("=" * 60)
    print("PHASE 1: Adapter Weight Inspection")
    print("=" * 60)

    for name, adapter_dir in [("newton", NEWTON_DIR), ("davinci", DAVINCI_DIR)]:
        print(f"\n--- {name.upper()} Adapter ---")
        _inspect_adapter(adapter_dir)

    _compare_adapters()
    return True


def _inspect_adapter(adapter_dir):
    """Print config summary and per-matrix-type weight stats for one adapter."""
    with open(adapter_dir / "adapter_config.json") as f:
        config = json.load(f)
    print(f"  Base model: {config['base_model_name_or_path']}")
    print(f"  LoRA rank: {config['r']}, alpha: {config['lora_alpha']}")
    print(f"  Targets: {config['target_modules']}")
    print(f"  PEFT version: {config['peft_version']}")

    weights = st.load_file(str(adapter_dir / "adapter_model.safetensors"))
    print(f"  Weight tensors: {len(weights)}")

    total_params = 0
    layer_stats = {}
    for key, tensor in sorted(weights.items()):
        total_params += tensor.numel()
        # Bucket statistics by LoRA matrix type (A = down-projection,
        # B = up-projection; anything unexpected lands in "other").
        if "lora_A" in key:
            ltype = "lora_A"
        elif "lora_B" in key:
            ltype = "lora_B"
        else:
            ltype = "other"
        stats = layer_stats.setdefault(
            ltype, {"count": 0, "means": [], "stds": [], "abs_means": []}
        )
        t = tensor.float()  # compute stats in fp32 regardless of stored dtype
        stats["count"] += 1
        stats["means"].append(t.mean().item())
        stats["stds"].append(t.std().item())
        stats["abs_means"].append(t.abs().mean().item())

    print(f"  Total LoRA params: {total_params:,}")
    size_mb = (adapter_dir / "adapter_model.safetensors").stat().st_size / 1024**2
    print(f"  File size: {size_mb:.1f} MB")

    for ltype, stats in layer_stats.items():
        avg_mean = sum(stats["means"]) / len(stats["means"])
        avg_std = sum(stats["stds"]) / len(stats["stds"])
        avg_abs = sum(stats["abs_means"]) / len(stats["abs_means"])
        print(f"  {ltype} ({stats['count']} tensors):")
        print(f"    avg mean={avg_mean:.6f}, avg std={avg_std:.6f}, avg |w|={avg_abs:.6f}")


def _compare_adapters():
    """Report mean |delta-w| between newton and davinci over shared tensor keys."""
    print(f"\n--- Weight Divergence (newton vs davinci) ---")
    newton_w = st.load_file(str(NEWTON_DIR / "adapter_model.safetensors"))
    davinci_w = st.load_file(str(DAVINCI_DIR / "adapter_model.safetensors"))

    divergences = []
    for key in sorted(newton_w.keys()):
        if key in davinci_w:
            diff = (newton_w[key].float() - davinci_w[key].float()).abs().mean().item()
            # FIX: the previous label used only the last two key components
            # ("lora_A.weight"), which is identical for every layer and made
            # the top-5 listing meaningless.  Keep enough trailing components
            # to identify the actual layer/module.
            label = ".".join(key.split(".")[-5:])
            divergences.append((label, diff))

    if not divergences:
        # FIX: previously this divided by len(divergences) and crashed with
        # ZeroDivisionError when the adapters shared no keys.
        print("  WARN: adapters share no tensor keys; skipping divergence report")
        return

    divergences.sort(key=lambda x: x[1], reverse=True)
    print(f"  Total shared keys: {len(divergences)}")
    print(f"  Top 5 most divergent layers:")
    for label, div in divergences[:5]:
        print(f"    {label}: {div:.6f}")
    avg_div = sum(d for _, d in divergences) / len(divergences)
    print(f"  Average divergence: {avg_div:.6f}")

    if avg_div > 0.001:
        print(f"  PASS: Adapters learned distinct representations (div={avg_div:.6f} >> 0)")
    else:
        print(f"  WARN: Adapters may be too similar (div={avg_div:.6f})")
|
|
|
|
| |
| |
| |
def phase2_inference_test():
    """Compare base model vs newton/davinci adapters on two prompts (CPU fp16).

    Loads the ~16GB base checkpoint onto CPU, then for each test prompt runs
    generation with no adapter / the newton adapter / the davinci adapter and
    prints the responses and timings.  Only invoked with --inference/--full.

    Returns:
        True on completion.
    """
    print(f"\n{'=' * 60}")
    print("PHASE 2: Full Inference Test")
    print("=" * 60)

    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel
    import gc

    print("Loading tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print("Loading base model (CPU + disk offload, float16)...")
    os.makedirs("J:/tmp/offload", exist_ok=True)
    start = time.time()
    model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL,
        dtype=torch.float16,
        device_map={"": "cpu"},  # keep every module on CPU; 8GB XPU is too small
        low_cpu_mem_usage=True,
    )
    print(f"  Base model loaded in {time.time()-start:.0f}s")

    gen_kwargs = dict(
        max_new_tokens=150,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    )

    def chat_inputs(prompt):
        """Tokenize a single-turn chat prompt with the generation header appended."""
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Answer concisely in 2-3 sentences."},
            {"role": "user", "content": prompt},
        ]
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        return tokenizer(text, return_tensors="pt")

    def generate(mdl, inputs):
        """Generate with `mdl`, returning (new-token text, elapsed seconds)."""
        t0 = time.time()
        with torch.no_grad():
            output = mdl.generate(**inputs, **gen_kwargs)
        text = tokenizer.decode(output[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
        return text, time.time() - t0

    def run_adapter(adapter_dir, inputs):
        """Mount one adapter on the shared base model, generate, then unload.

        FIX: PEFT injects LoRA modules into the base model IN PLACE, so merely
        `del`-ing the PeftModel wrapper (as the previous version did) left the
        first adapter's layers attached when the second adapter was loaded.
        Calling unload() restores the base model's original modules before the
        wrapper is discarded.
        """
        t0 = time.time()
        peft_model = PeftModel.from_pretrained(model, str(adapter_dir))
        peft_model.eval()
        print(f"  Adapter loaded in {time.time()-t0:.1f}s")
        text, secs = generate(peft_model, inputs)
        print(f"Response ({secs:.1f}s): {text}")
        peft_model.unload()  # strip LoRA layers, restoring base weights
        del peft_model
        gc.collect()
        return text

    # --- Test 1: physics prompt, base model vs both adapters ---
    test_prompt = "Explain why objects fall to the ground."
    inputs = chat_inputs(test_prompt)

    print(f"\n--- BASE MODEL (no adapter) ---")
    print(f"Prompt: {test_prompt}")
    base_response, secs = generate(model, inputs)
    print(f"Response ({secs:.1f}s): {base_response}")

    print(f"\n--- NEWTON ADAPTER ---")
    print("Loading newton adapter...")
    run_adapter(NEWTON_DIR, inputs)

    print(f"\n--- DAVINCI ADAPTER ---")
    print("Loading davinci adapter...")
    run_adapter(DAVINCI_DIR, inputs)

    # --- Test 2: philosophy prompt, adapters only ---
    test_prompt2 = "What is the relationship between consciousness and the physical world?"
    inputs2 = chat_inputs(test_prompt2)

    print(f"\n{'=' * 60}")
    print(f"TEST 2: {test_prompt2}")
    print(f"{'=' * 60}")

    print(f"\n--- NEWTON on consciousness ---")
    run_adapter(NEWTON_DIR, inputs2)

    print(f"\n--- DAVINCI on consciousness ---")
    run_adapter(DAVINCI_DIR, inputs2)

    # Release the ~16GB base model before returning to the caller.
    del model
    gc.collect()

    print(f"\n{'=' * 60}")
    print("INFERENCE TESTS COMPLETE")
    print(f"{'=' * 60}")
    return True
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    print("Codette LoRA Adapter Test Suite")
    print(f"PyTorch: {torch.__version__}")
    # FIX: `torch.xpu` only exists on XPU-enabled torch builds; on a plain
    # CPU/CUDA build the unguarded attribute access raised AttributeError
    # before any test ran.
    xpu_available = hasattr(torch, "xpu") and torch.xpu.is_available()
    print(f"XPU: {xpu_available}")
    print(f"Adapters: {ADAPTER_DIR}")
    print()

    # Phase 1 is cheap (reads only the adapter safetensors) and always runs.
    phase1_weight_inspection()

    # Phase 2 is opt-in: it downloads the base checkpoint and needs ~16GB RAM.
    print("\n" + "=" * 60)
    if "--inference" in sys.argv or "--full" in sys.argv:
        phase2_inference_test()
    else:
        print("Skipping inference test (run with --inference to enable)")
        print("  Note: Will download ~16GB base model and needs ~16GB RAM")
|