#!/usr/bin/env python3 """ Offline evaluation for DFlash-LoRA-Inject: measure accepted length & speedup. Aligned with official DFlash benchmark.py methodology. Unlike DFlash-b16 which uses a small 5-layer draft model with fc/hidden_norm, LoRA-Inject uses a full Qwen3-8B with LoRA adapters that receives target hidden states via layer-by-layer injection. Usage: conda activate spec # 8 GPU parallel (default, all 10 benchmarks) torchrun --nproc_per_node 8 eval_dflash_lora_inject.py # single GPU python3 eval_dflash_lora_inject.py # specific checkpoint / benchmark torchrun --nproc_per_node 8 eval_dflash_lora_inject.py --ckpt epoch_0_step_1000 --datasets humaneval # quick test torchrun --nproc_per_node 8 eval_dflash_lora_inject.py --max-samples 20 """ import argparse import json import os import random import sys import time import warnings from itertools import chain from types import SimpleNamespace from typing import List, Optional, Tuple import numpy as np import torch import torch.nn as nn import torch.distributed as dist from peft import PeftModel from tqdm import tqdm from transformers import AutoModelForCausalLM, AutoTokenizer, DynamicCache # Import official dataset loader sys.path.insert(0, "/workspace/hanrui/dflash") from model.utils import load_and_process_dataset # ────────────────────────────────────────────────────────────────── # Config defaults # ────────────────────────────────────────────────────────────────── BASE_MODEL = "/workspace/models/Qwen3-8B" ADAPTER_ROOT = "/workspace/hanrui/syxin/Specforge/outputs/qwen3-8b-dflash-lora-inject" DEFAULT_CKPT = "epoch_3_step_1400" MASK_TOKEN_ID = 151669 # Qwen3 <|mask|> BLOCK_SIZE = 16 RESULT_DIR = "/workspace/hanrui/syxin/Specforge/benchmarks/results" # Official benchmark tasks (from run_benchmark.sh) OFFICIAL_TASKS = { "gsm8k": 128, "math500": 128, "aime24": 30, "aime25": 30, "humaneval": 164, "mbpp": 128, "livecodebench": 128, "swe-bench": 128, "mt-bench": 80, "alpaca": 128, } # ────────────────────────────────────────────────────────────────── # CUDA-synchronised timer (matches official benchmark.py) # ────────────────────────────────────────────────────────────────── def cuda_time() -> float: torch.cuda.synchronize() return time.perf_counter() def has_flash_attn() -> bool: try: import flash_attn # noqa: F401 return True except ImportError: print("[WARN] flash_attn not installed, falling back to sdpa.") return False # ────────────────────────────────────────────────────────────────── # Distributed helpers (mirrors official distributed.py) # ────────────────────────────────────────────────────────────────── def dist_init(): if "RANK" not in os.environ: warnings.warn("RANK not set. Skipping distributed init.") return dist.init_process_group(backend="nccl", init_method="env://") def dist_rank(): return int(os.environ.get("RANK", 0)) def dist_size(): return int(os.environ.get("WORLD_SIZE", 1)) def dist_local_rank(): return int(os.environ.get("LOCAL_RANK", 0)) def dist_is_main(): return dist_rank() == 0 def dist_gather(obj, dst=0): if not dist.is_initialized(): return [obj] if dist_is_main(): objs = [None for _ in range(dist_size())] dist.gather_object(obj, objs, dst=dst) return objs else: dist.gather_object(obj, dst=dst) return None def print_rank0(*args, **kwargs): if dist_is_main(): print(*args, **kwargs) # ────────────────────────────────────────────────────────────────── # Sampling (matches official model/utils.py::sample) # ────────────────────────────────────────────────────────────────── def sample(logits: torch.Tensor, temperature: float = 0.0) -> torch.Tensor: if temperature < 1e-5: return torch.argmax(logits, dim=-1) bsz, seq_len, vocab_size = logits.shape logits = logits.view(-1, vocab_size) logits = logits / temperature probs = torch.softmax(logits, dim=-1) return torch.multinomial(probs, num_samples=1).view(bsz, seq_len) # ────────────────────────────────────────────────────────────────── # Build DFlash attention mask (vectorized, no Python loops) # ────────────────────────────────────────────────────────────────── def build_dflash_mask(ctx_len: int, block_size: int, device, dtype=torch.bfloat16): """ Build DFlash attention mask for [context | block] sequence. - Context part: standard causal - Block part: each token sees all context + all tokens in same block (bidirectional) """ full_len = ctx_len + block_size neg_inf = torch.finfo(dtype).min mask = torch.full((1, 1, full_len, full_len), neg_inf, device=device, dtype=dtype) if ctx_len > 0: ctx_rows = torch.arange(ctx_len, device=device) ctx_cols = torch.arange(ctx_len, device=device) causal = ctx_cols.unsqueeze(0) <= ctx_rows.unsqueeze(1) mask[0, 0, :ctx_len, :ctx_len].masked_fill_(causal, 0) if ctx_len > 0: mask[0, 0, ctx_len:, :ctx_len] = 0 mask[0, 0, ctx_len:, ctx_len:] = 0 return mask # ────────────────────────────────────────────────────────────────── # Pure autoregressive generation (target model only, no draft) # Used for AR baseline timing -- avoids inflating AR time with draft overhead. # ────────────────────────────────────────────────────────────────── @torch.inference_mode() def ar_generate( target_model: nn.Module, input_ids: torch.LongTensor, max_new_tokens: int = 2048, mask_token_id: int = MASK_TOKEN_ID, temperature: float = 0.0, stop_token_ids: Optional[List[int]] = None, ) -> SimpleNamespace: """ Pure autoregressive generation using only the target model. Mirrors official benchmark.py with block_size=1 (no draft model involved). Returns SimpleNamespace matching official dflash_generate output format. """ device = input_ids.device num_input_tokens = input_ids.shape[1] max_length = num_input_tokens + max_new_tokens output_ids = torch.full( (1, max_length + 1), mask_token_id, dtype=torch.long, device=device, ) output_ids[:, :num_input_tokens] = input_ids position_ids = torch.arange(output_ids.shape[1], device=device).unsqueeze(0) past_key_values = DynamicCache() # Prefill prefill_start = cuda_time() output = target_model( input_ids, position_ids=position_ids[:, :num_input_tokens], past_key_values=past_key_values, use_cache=True, logits_to_keep=1, output_hidden_states=False, ) first_token = sample(output.logits, temperature) output_ids[:, num_input_tokens:num_input_tokens + 1] = first_token time_to_first_token = cuda_time() - prefill_start # Decode (autoregressive, one token at a time) decode_start = cuda_time() start = num_input_tokens while start < max_length: cur_token = output_ids[:, start:start + 1] cur_pos = position_ids[:, start:start + 1] output = target_model( cur_token, position_ids=cur_pos, past_key_values=past_key_values, use_cache=True, output_hidden_states=False, ) next_token = sample(output.logits, temperature) start += 1 output_ids[:, start:start + 1] = next_token past_key_values.crop(start) # Check stop tokens (matches official: check all generated) if stop_token_ids is not None and any( sid in output_ids[:, num_input_tokens:] for sid in stop_token_ids ): break output_ids = output_ids[:, :max_length] output_ids = output_ids[:, output_ids[0] != mask_token_id] if stop_token_ids is not None: stop_t = torch.tensor(stop_token_ids, device=output_ids.device) stop_idx = torch.isin(output_ids[0][num_input_tokens:], stop_t).nonzero(as_tuple=True)[0] if stop_idx.numel() > 0: output_ids = output_ids[:, :num_input_tokens + stop_idx[0] + 1] num_output_tokens = output_ids.shape[1] - num_input_tokens total_decode_time = cuda_time() - decode_start time_per_output_token = total_decode_time / max(num_output_tokens, 1) return SimpleNamespace( output_ids=output_ids, num_input_tokens=num_input_tokens, num_output_tokens=num_output_tokens, time_to_first_token=time_to_first_token, time_per_output_token=time_per_output_token, acceptance_lengths=[1] * max(num_output_tokens, 0), # AR: always 1 ) # ────────────────────────────────────────────────────────────────── # Core: spec_generate with layer-by-layer injection (KV-cached) # ────────────────────────────────────────────────────────────────── @torch.inference_mode() def spec_generate_inject( target_model: nn.Module, draft_model: nn.Module, input_ids: torch.LongTensor, max_new_tokens: int = 2048, block_size: int = 16, mask_token_id: int = MASK_TOKEN_ID, temperature: float = 0.0, stop_token_ids: Optional[List[int]] = None, ) -> SimpleNamespace: """ Speculative generation using DFlash-LoRA-Inject inference pattern. Returns SimpleNamespace matching official dflash_generate output format. """ device = input_ids.device num_input_tokens = input_ids.shape[1] max_length = num_input_tokens + max_new_tokens draft_layers = draft_model.model.layers draft_norm = draft_model.model.norm draft_lm_head = draft_model.lm_head rotary_emb = draft_model.model.rotary_emb num_layers = len(draft_layers) output_ids = torch.full( (1, max_length + block_size), mask_token_id, dtype=torch.long, device=device, ) output_ids[:, :num_input_tokens] = input_ids # ── Prefill: target with KV cache + hidden states ── prefill_start = cuda_time() target_kv = DynamicCache() target_output = target_model( input_ids, past_key_values=target_kv, use_cache=True, output_hidden_states=True, ) first_token = sample(target_output.logits[:, -1:, :], temperature) output_ids[:, num_input_tokens] = first_token.squeeze() ctx_hidden_per_layer = [ target_output.hidden_states[i + 1] for i in range(num_layers) ] time_to_first_token = cuda_time() - prefill_start # Decode decode_start = cuda_time() acceptance_lengths = [] start = num_input_tokens draft_prefill = True while start < max_length: end = min(start + block_size, max_length) actual_block_size = end - start block_ids = output_ids[:, start:end].clone() # ── Draft: forward with layer-by-layer injection ── draft_hidden = draft_model.model.embed_tokens(block_ids) ctx_len = ctx_hidden_per_layer[0].shape[1] dflash_mask = build_dflash_mask(ctx_len, actual_block_size, device) combined_pos = torch.arange(ctx_len + actual_block_size, device=device).unsqueeze(0) dummy_combined = torch.empty(1, ctx_len + actual_block_size, draft_hidden.shape[-1], device=device, dtype=torch.bfloat16) position_embeddings = rotary_emb(dummy_combined, combined_pos) for layer_idx in range(num_layers): target_ctx = ctx_hidden_per_layer[layer_idx] combined = torch.cat([target_ctx, draft_hidden], dim=1) layer_output = draft_layers[layer_idx]( combined, attention_mask=dflash_mask, position_ids=combined_pos, position_embeddings=position_embeddings, ) if isinstance(layer_output, tuple): layer_output = layer_output[0] draft_hidden = layer_output[:, ctx_len:, :] draft_hidden = draft_norm(draft_hidden) draft_logits = draft_lm_head(draft_hidden) draft_predictions = sample(draft_logits[:, :-1, :], temperature) block_ids[:, 1:actual_block_size] = draft_predictions[:, :actual_block_size - 1] # Exclude draft's first prefill from decode timing (matches official pattern) if draft_prefill: draft_prefill = False decode_start = cuda_time() # ── Verify: target forward on block tokens (with KV cache) ── position_ids_block = torch.arange( start, start + actual_block_size, device=device ).unsqueeze(0) target_verify = target_model( block_ids, position_ids=position_ids_block, past_key_values=target_kv, use_cache=True, output_hidden_states=True, ) target_tokens = sample(target_verify.logits, temperature) # Acceptance matches = (block_ids[:, 1:actual_block_size] == target_tokens[:, :actual_block_size - 1]) acceptance_length = int(matches.cumprod(dim=1).sum(dim=1)[0].item()) output_ids[:, start:start + acceptance_length + 1] = block_ids[:, :acceptance_length + 1] output_ids[:, start + acceptance_length + 1] = target_tokens[:, acceptance_length] accepted_end = start + acceptance_length + 1 target_kv.crop(accepted_end) for i in range(num_layers): new_hidden = target_verify.hidden_states[i + 1][:, :acceptance_length + 1, :] ctx_hidden_per_layer[i] = torch.cat([ctx_hidden_per_layer[i], new_hidden], dim=1) start += acceptance_length + 1 acceptance_lengths.append(acceptance_length + 1) # Official: check ALL generated tokens if stop_token_ids is not None and any( sid in output_ids[:, num_input_tokens:] for sid in stop_token_ids ): break output_ids = output_ids[:, :min(start, max_length)] output_ids = output_ids[:, output_ids[0] != mask_token_id] if stop_token_ids is not None: stop_t = torch.tensor(stop_token_ids, device=output_ids.device) stop_idx = torch.isin(output_ids[0][num_input_tokens:], stop_t).nonzero(as_tuple=True)[0] if stop_idx.numel() > 0: output_ids = output_ids[:, :num_input_tokens + stop_idx[0] + 1] num_output_tokens = output_ids.shape[1] - num_input_tokens total_decode_time = cuda_time() - decode_start time_per_output_token = total_decode_time / max(num_output_tokens, 1) return SimpleNamespace( output_ids=output_ids, num_input_tokens=num_input_tokens, num_output_tokens=num_output_tokens, time_to_first_token=time_to_first_token, time_per_output_token=time_per_output_token, acceptance_lengths=acceptance_lengths, ) # ────────────────────────────────────────────────────────────────── # Main # ────────────────────────────────────────────────────────────────── def parse_args(): p = argparse.ArgumentParser(description="Offline eval for DFlash-LoRA-Inject (aligned with official)") p.add_argument("--base-model", default=BASE_MODEL) p.add_argument("--adapter-root", default=ADAPTER_ROOT) p.add_argument("--ckpt", default=DEFAULT_CKPT, help="Checkpoint folder name") p.add_argument("--merged-path", default="/workspace/hanrui/syxin/Specforge/outputs/qwen3-8b-dflash-lora-inject-merged", help="Path to pre-merged model. If None, will merge on the fly.") p.add_argument("--block-size", type=int, default=BLOCK_SIZE) p.add_argument("--max-new-tokens", type=int, default=2048, help="Max new tokens per turn (official shell uses 2048)") p.add_argument("--temperature", type=float, default=0.0) p.add_argument("--datasets", nargs="+", default=list(OFFICIAL_TASKS.keys()), help="Benchmarks to run (default: all 10 official tasks)") p.add_argument("--max-samples", type=int, default=None, help="Override max samples per dataset (None = use official per-task counts)") p.add_argument("--output-dir", default=RESULT_DIR) return p.parse_args() def main(): args = parse_args() # Fix random seeds (matches official) random.seed(0) np.random.seed(0) torch.manual_seed(0) torch.cuda.manual_seed_all(0) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # ── Init distributed ── dist_init() torch.cuda.set_device(dist_local_rank()) device = torch.device(f"cuda:{dist_local_rank()}") print_rank0(f"Running on {dist_size()} GPU(s)") # Detect flash_attn (only for target model; draft needs sdpa for custom DFlash mask) installed_flash_attn = has_flash_attn() target_attn_impl = "flash_attention_2" if installed_flash_attn else "sdpa" draft_attn_impl = "sdpa" # DFlash injection uses custom attention mask print_rank0(f"Using attn_implementation: target={target_attn_impl}, draft={draft_attn_impl}") # ── Load models ── print_rank0(f"Loading target model: {args.base_model}") target_model = AutoModelForCausalLM.from_pretrained( args.base_model, torch_dtype=torch.bfloat16, attn_implementation=target_attn_impl, device_map=device, trust_remote_code=True, ) target_model.eval() if args.merged_path and os.path.isdir(args.merged_path): print_rank0(f"Loading pre-merged draft model: {args.merged_path}") draft_model = AutoModelForCausalLM.from_pretrained( args.merged_path, torch_dtype=torch.bfloat16, attn_implementation=draft_attn_impl, device_map=device, trust_remote_code=True, ) else: adapter_path = os.path.join(args.adapter_root, args.ckpt) print_rank0(f"Loading base + LoRA adapter: {adapter_path}") draft_model = AutoModelForCausalLM.from_pretrained( args.base_model, torch_dtype=torch.bfloat16, attn_implementation=draft_attn_impl, device_map=device, trust_remote_code=True, ) draft_model = PeftModel.from_pretrained(draft_model, adapter_path) draft_model = draft_model.merge_and_unload() draft_model.eval() tokenizer = AutoTokenizer.from_pretrained(args.base_model, trust_remote_code=True) stop_token_ids = [tokenizer.eos_token_id] block_size = args.block_size # ── Run benchmarks ── all_results = {"model": f"dflash-lora-inject/{args.ckpt}", "block_size": block_size} for dataset_name in args.datasets: print_rank0(f"\n{'=' * 60}") print_rank0(f"Benchmark: {dataset_name} ({dist_size()} GPUs)") print_rank0(f"{'=' * 60}") # Load dataset using official loader dataset = load_and_process_dataset(dataset_name) # Sample selection: official uses shuffle(seed=0).select() max_samples = args.max_samples if args.max_samples is not None else OFFICIAL_TASKS.get(dataset_name) if max_samples is not None and len(dataset) > max_samples: dataset = dataset.shuffle(seed=0).select(range(max_samples)) print_rank0(f"Total {len(dataset)} samples, distributed across {dist_size()} GPUs") responses = [] indices = range(dist_rank(), len(dataset), dist_size()) iterator = tqdm(indices, desc=f"[GPU{dist_rank()}] {dataset_name}", unit="sample", disable=not dist_is_main()) for idx in iterator: instance = dataset[idx] # Multi-turn support (matches official benchmark.py) messages = [] for turn_index, user_content in enumerate(instance["turns"]): messages.append({"role": "user", "content": user_content}) input_text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False, ) input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device) response = {} # AR baseline: pure target-only autoregressive (no draft overhead) response[1] = ar_generate( target_model=target_model, input_ids=input_ids, max_new_tokens=args.max_new_tokens, mask_token_id=MASK_TOKEN_ID, temperature=args.temperature, stop_token_ids=stop_token_ids, ) # Speculative: DFlash-LoRA-Inject response[block_size] = spec_generate_inject( target_model=target_model, draft_model=draft_model, input_ids=input_ids, max_new_tokens=args.max_new_tokens, block_size=block_size, mask_token_id=MASK_TOKEN_ID, temperature=args.temperature, stop_token_ids=stop_token_ids, ) # Append assistant response for multi-turn context spec_response = response[block_size] generated_ids = spec_response.output_ids[0, spec_response.num_input_tokens:] output_text = tokenizer.decode(generated_ids, skip_special_tokens=True) messages.append({"role": "assistant", "content": output_text}) responses.append(response) if dist_is_main() and responses: recent_tau = np.mean([np.mean(r[block_size].acceptance_lengths) for r in responses[-5:]]) iterator.set_postfix(accept_len=f"{recent_tau:.2f}") # ── Gather to rank 0 (matches official) ── if dist_size() > 1: gathered = dist_gather(responses, dst=0) if not dist_is_main(): continue responses = list(chain(*gathered)) elif not dist_is_main(): continue # ── Compute metrics (exact official formulas) ── t1 = np.mean([r[1].time_per_output_token for r in responses]) tb = np.mean([r[block_size].time_per_output_token for r in responses]) speedup = t1 / tb if tb > 0 else 0 # Acceptance length: per-sample mean, then mean of means (official) tau = np.mean([np.mean(r[block_size].acceptance_lengths) for r in responses]) # Histogram acceptance_lengths = list(chain(*[r[block_size].acceptance_lengths for r in responses])) histogram = [acceptance_lengths.count(b) / len(acceptance_lengths) for b in range(block_size + 1)] print_rank0(f"\n{dataset_name} Results:") print_rank0(f" Decoding speedup: {speedup:.2f}x") print_rank0(f" Average Acceptance length: {tau:.2f}") print_rank0(f" Acceptance length histogram: {[f'{x * 100:.1f}%' for x in histogram]}") print_rank0(f" Num responses: {len(responses)}") all_results[dataset_name] = { "decoding_speedup": speedup, "avg_accept_length": tau, "acceptance_histogram": histogram, "num_responses": len(responses), "num_gpus": dist_size(), } # ── Save results ── if dist_is_main(): os.makedirs(args.output_dir, exist_ok=True) timestamp = time.strftime("%Y%m%d_%H%M%S") result_file = os.path.join( args.output_dir, f"dflash_lora_inject_offline_{args.ckpt}_{timestamp}.json", ) with open(result_file, "w") as f: json.dump(all_results, f, indent=2) print(f"\nResults saved to: {result_file}") if __name__ == "__main__": main()