| |
| import json, torch, torch.nn as nn, torch.nn.functional as F |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| from safetensors.torch import load_file |
| from huggingface_hub import hf_hub_download |
|
|
class EarlyExitClassifier(nn.Module):
    """Linear head that turns a hidden state into vocabulary logits.

    Args:
        hidden_size: Width of the incoming hidden state (last dimension).
        vocab_size: Number of output logits.
        dtype: Parameter dtype of the linear layer.
        device: Optional device the module is moved to after construction.
    """

    def __init__(self, hidden_size, vocab_size, dtype=torch.float16, device=None):
        super().__init__()
        # NOTE: the attribute name ``linear`` ends up in the state-dict keys
        # ("linear.weight", "linear.bias"), so it must not be renamed.
        projection = nn.Linear(hidden_size, vocab_size, dtype=dtype)
        self.linear = projection
        if device is None:
            return
        self.to(device)

    def forward(self, hidden_states):
        """Apply the linear projection to ``hidden_states`` and return the logits."""
        logits = self.linear(hidden_states)
        return logits
|
|
class EarlyExitModelWrapper(nn.Module):
    """Wrap a causal LM and attach one early-exit head per checked layer.

    The wrapped ``model`` is expected to expose ``config.hidden_size``,
    ``config.vocab_size`` and ``model.layers`` and, when called with
    ``output_hidden_states=True``, to return an object carrying
    ``hidden_states`` and ``logits``.

    NOTE: the full base model is always executed before the heads are
    consulted, so an "early exit" here selects which logits are returned;
    it does not skip any layer computation.

    Args:
        model: The base causal language model.
        confidence_threshold: Minimum top-1 probability a head must reach
            for its logits to be returned.
        num_layers_to_check: Number of heads to create (one for each of the
            first N layers). A falsy value (``None`` or ``0``) creates one
            head per layer.
        device: Device for the heads; also stored as ``_device``. Falls back
            to the device of the model's first parameter.
        dtype: Parameter dtype for the heads; hidden states are cast to it
            before being fed to a head.
    """

    def __init__(self, model, confidence_threshold=0.9, num_layers_to_check=None, device=None, dtype=torch.float16):
        super().__init__()
        self.model = model
        self.config = model.config
        self.confidence_threshold = confidence_threshold
        num_heads = num_layers_to_check or len(model.model.layers)
        self.classifiers = nn.ModuleList(
            EarlyExitClassifier(
                self.config.hidden_size,
                self.config.vocab_size,
                dtype=dtype,
                device=device,
            )
            for _ in range(num_heads)
        )
        self._device = device or next(model.parameters()).device
        self._dtype = dtype

    def forward(self, input_ids, attention_mask=None, **kwargs):
        """Run the base model and return logits for the last position.

        Returns:
            A dict with keys ``"logits"`` (last-position logits),
            ``"exit_layer"`` (1-based index of the layer whose head fired,
            or the index of the last hidden state when no head fired) and
            ``"hidden_states"`` (the hidden state of that layer).
        """
        # This wrapper always needs the hidden states; drop a caller-supplied
        # flag so it cannot collide with the explicit keyword below.
        kwargs.pop("output_hidden_states", None)
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
            **kwargs,
        )
        hidden_states = outputs.hidden_states

        # hidden_states[0] is skipped, so head i reads hidden_states[i + 1].
        # zip() stops at the shorter sequence, so having more heads than
        # hidden states falls through to the final logits instead of raising
        # IndexError.
        candidates = zip(self.classifiers, hidden_states[1:])
        for layer_idx, (classifier, h_state) in enumerate(candidates, start=1):
            logits = classifier(h_state[:, -1, :].to(self._dtype))
            max_prob = F.softmax(logits, dim=-1).max(dim=-1).values
            # Exit only when every sequence in the batch is confident. For a
            # batch of one this equals the scalar check; for larger batches
            # it avoids the error raised by .item() on a multi-element tensor.
            if bool((max_prob >= self.confidence_threshold).all()):
                return {"logits": logits, "exit_layer": layer_idx, "hidden_states": h_state}

        # No head was confident: reuse the logits the base model already
        # computed. Re-applying model.norm + lm_head to hidden_states[-1]
        # repeats work and, for HF decoders such as Llama where that entry
        # is typically already normalised, would normalise twice
        # (NOTE(review): confirm for the target architecture).
        final_logits = outputs.logits[:, -1, :]
        return {
            "logits": final_logits,
            "exit_layer": len(hidden_states) - 1,
            "hidden_states": hidden_states[-1],
        }
|
|
@torch.no_grad()
def generate_with_early_exit(prompt, model, tokenizer, max_new_tokens=64, temperature=0.7, top_p=0.9, device=None):
    """Autoregressively sample a continuation of ``prompt``.

    ``model`` is called as ``model(input_ids=..., attention_mask=...)`` and
    must return a mapping whose ``"logits"`` entry holds the logits for the
    next token, shaped ``(batch, vocab)``. The whole sequence is fed back
    in on every step (no key/value cache is used).

    Args:
        prompt: Text passed to ``tokenizer``; the EOS check assumes it
            yields a single sequence.
        model: Callable model as described above.
        tokenizer: Tokenizer providing ``__call__`` and ``eos_token_id``.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Softmax temperature. ``None`` or a value ``<= 0``
            selects greedy (argmax) decoding instead of dividing by zero.
        top_p: Nucleus-sampling mass; ``>= 1.0`` disables the filter.
        device: Device for the inputs; defaults to the device of the
            model's first parameter.

    Returns:
        Tensor of token ids containing the prompt followed by the generated
        tokens (including the EOS token if one was produced).
    """
    device = device or next(model.parameters()).device
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    generated_ids = inputs["input_ids"].clone()
    attention_mask = inputs.get("attention_mask", None)
    eos_token_id = tokenizer.eos_token_id
    greedy = temperature is None or temperature <= 0

    for _ in range(max_new_tokens):
        outputs = model(input_ids=generated_ids, attention_mask=attention_mask)
        # Sample in float32: a half-precision softmax/cumsum over a large
        # vocabulary is imprecise and not available on every CPU build.
        logits = outputs["logits"].float()

        if greedy:
            next_token_id = logits.argmax(dim=-1, keepdim=True)
        else:
            logits = logits / temperature
            if top_p < 1.0:
                sorted_logits, sorted_indices = torch.sort(logits, dim=-1, descending=True)
                cumulative = torch.softmax(sorted_logits, dim=-1).cumsum(dim=-1)
                sorted_mask = cumulative > top_p
                # Shift right so the first token that crosses top_p is kept,
                # and always keep the most likely token.
                sorted_mask[:, 1:] = sorted_mask[:, :-1].clone()
                sorted_mask[:, 0] = False
                # Map the mask from sorted order back to vocabulary order.
                remove_mask = sorted_mask.scatter(1, sorted_indices, sorted_mask)
                logits = logits.masked_fill(remove_mask, float("-inf"))
            probs = torch.softmax(logits, dim=-1)
            next_token_id = torch.multinomial(probs, num_samples=1)

        generated_ids = torch.cat([generated_ids, next_token_id], dim=-1)
        if attention_mask is not None:
            attention_mask = torch.cat([attention_mask, torch.ones_like(next_token_id)], dim=-1)
        if eos_token_id is not None and next_token_id.item() == eos_token_id:
            break
    return generated_ids
|
|
def load_early_exit_from_hub(repo_id: str, device: str = None):
    """Download an early-exit checkpoint from the Hub and build the wrapper.

    Loads:
    - early_exit_config.json
    - early_exit_heads.safetensors
    and returns (wrapped_model, tokenizer).

    The config must contain ``base_model`` and ``confidence_threshold``.
    ``dtype`` is optional (``"float16"`` by default; ``"bfloat16"`` is
    honoured, any other value selects float32). ``num_layers_to_check`` may
    be missing or ``null``, in which case the wrapper creates one head per
    layer.

    Args:
        repo_id: Hub repository holding the two files listed above.
        device: Target device string. When omitted, "mps" is preferred,
            then "cuda", then "cpu".

    Returns:
        Tuple ``(wrapped_model, tokenizer)``.
    """
    cfg_path = hf_hub_download(repo_id=repo_id, filename="early_exit_config.json")
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    base_id = cfg["base_model"]

    # Previously only "float16" was recognised, so a "bfloat16" checkpoint
    # was silently loaded as float32.
    dtype_name = cfg.get("dtype", "float16")
    if dtype_name == "float16":
        dtype = torch.float16
    elif dtype_name == "bfloat16":
        dtype = torch.bfloat16
    else:
        dtype = torch.float32

    device = device or ("mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu"))

    tokenizer = AutoTokenizer.from_pretrained(base_id)
    base_model = AutoModelForCausalLM.from_pretrained(
        base_id,
        torch_dtype=dtype,
        device_map={"": device} if device != "cpu" else None,
    )

    # The wrapper accepts None for "all layers"; do not crash on JSON null.
    num_layers = cfg.get("num_layers_to_check")
    wrapped = EarlyExitModelWrapper(
        base_model,
        confidence_threshold=float(cfg["confidence_threshold"]),
        num_layers_to_check=int(num_layers) if num_layers is not None else None,
        device=device,
        dtype=dtype,
    )

    heads_path = hf_hub_download(repo_id=repo_id, filename="early_exit_heads.safetensors")
    state = load_file(heads_path)
    # strict=True: the number of stored heads must match the wrapper exactly.
    wrapped.classifiers.load_state_dict(state, strict=True)

    return wrapped, tokenizer
|
|