import os
from typing import Optional, Union

import numpy as np
import torch
import transformers
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

from .metrics import entropy
from .utils import assert_tokenizer_consistency

torch.set_grad_enabled(False)

huggingface_config = {
    # Only required for private models from Hugging Face (e.g. LLaMA models)
    "TOKEN": os.environ.get("HF_TOKEN", None)
}


class DivScore(object):
    """Score texts with an observer (general) LM and a performer (domain-enhanced) LM.

    compute_score returns the ratio of the performer's entropy to the
    observer/performer cross-entropy, in the style of a Binoculars score.
    """

    def __init__(self,
                 generalLM_name_or_path: str = "",
                 enhancedLM_name_or_path: str = "",
                 use_bfloat16: bool = True,
                 max_token_observed: int = 2048 * 5,
                 device: str = "cuda:7",
                 ) -> None:
        self.DEVICE_1 = self.DEVICE_2 = device
        self.observer_model = AutoModelForCausalLM.from_pretrained(
            generalLM_name_or_path,
            device_map={"": self.DEVICE_1},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"]
        )
        self.performer_model = AutoModelForCausalLM.from_pretrained(
            enhancedLM_name_or_path,
            device_map={"": self.DEVICE_2},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"]
        )
        self.observer_model.eval()
        self.performer_model.eval()
        self.tokenizer = AutoTokenizer.from_pretrained(generalLM_name_or_path)
        if not self.tokenizer.pad_token:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.max_token_observed = max_token_observed

    def _load_model_with_lora(
            self,
            base_path: str,
            lora_path: Optional[str],
            device: str,
            use_bfloat16: bool
    ) -> AutoModelForCausalLM:
        torch_dtype = torch.bfloat16 if use_bfloat16 else torch.float32
        if lora_path:
            base_model = AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )
            merged_model = PeftModel.from_pretrained(
                base_model,
                lora_path
            ).merge_and_unload()
            return merged_model.to(device)  # make sure the merged model sits on the target device
        else:
            return AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )

    def _tokenize(self, batch: list[str]) -> transformers.BatchEncoding:
        batch_size = len(batch)
        encodings = self.tokenizer(
            batch,
            return_tensors="pt",
            padding="longest" if batch_size > 1 else False,
            truncation=True,
            max_length=self.max_token_observed,
            return_token_type_ids=False).to(self.observer_model.device)
        return encodings

    def _get_logits(self, encodings: transformers.BatchEncoding) -> tuple[torch.Tensor, torch.Tensor]:
        observer_logits = self.observer_model(**encodings.to(self.DEVICE_1)).logits
        performer_logits = self.performer_model(**encodings.to(self.DEVICE_2)).logits
        if self.DEVICE_1 != "cpu":
            torch.cuda.synchronize()
        return observer_logits, performer_logits

    def compute_score(self, input_text: Union[str, list[str]]):
        batch = [input_text] if isinstance(input_text, str) else input_text
        encodings = self._tokenize(batch)
        observer_logits, performer_logits = self._get_logits(encodings)
        # Entropy term: the performer's logits are passed as both arguments.
        entropy_score = entropy(performer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                                encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        # Cross-entropy term: observer logits against performer logits.
        ce_score = entropy(observer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                           encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        binoculars_scores = entropy_score / ce_score
        binoculars_scores = binoculars_scores.tolist()
        return binoculars_scores[0], entropy_score.item(), ce_score.item()
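

if __name__ == "__main__":
    # Minimal usage sketch. The checkpoint paths below are placeholders, not the
    # models this repository was built around; substitute any general-purpose
    # causal LM and a domain-enhanced counterpart that shares its tokenizer.
    scorer = DivScore(
        generalLM_name_or_path="path/to/general-base-model",       # hypothetical path
        enhancedLM_name_or_path="path/to/domain-enhanced-model",   # hypothetical path
        device="cuda:0",
    )
    score, ent, ce = scorer.compute_score("Example passage to score for detection.")
    print(f"score={score:.4f}  entropy={ent:.4f}  cross_entropy={ce:.4f}")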