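"""DivScore: a Binoculars-style divergence score between two causal LMs.

Scores a text with a general "observer" model and an enhanced "performer"
model, then returns the ratio of the performer's predictive entropy to the
observer-vs-performer cross-entropy (see compute_score below).
"""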
from typing import Union, Optional
import os
import numpy as np
import torch
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer
from .utils import assert_tokenizer_consistency
from .metrics import entropy
from peft import PeftModel
torch.set_grad_enabled(False)

huggingface_config = {
    # Only required for private models from Huggingface (e.g. LLaMA models)
    "TOKEN": os.environ.get("HF_TOKEN", None)
}

class DivScore(object):
    def __init__(self,
                 generalLM_name_or_path: str = "",
                 enhancedLM_name_or_path: str = "",
                 use_bfloat16: bool = True,
                 max_token_observed: int = 2048 * 5,
                 device: str = "cuda:7",
                 ) -> None:
        # Observer (general LM) and performer (enhanced LM) share one device
        # by default; DEVICE_1/DEVICE_2 are kept separate so they can be split
        # across GPUs if needed.
        self.DEVICE_1 = self.DEVICE_2 = device
        self.observer_model = AutoModelForCausalLM.from_pretrained(
            generalLM_name_or_path,
            device_map={"": self.DEVICE_1},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"],
        )
        self.performer_model = AutoModelForCausalLM.from_pretrained(
            enhancedLM_name_or_path,
            device_map={"": self.DEVICE_2},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"],
        )
        self.observer_model.eval()
        self.performer_model.eval()
        # Both models are assumed to share a tokenizer; load it from the general LM.
        self.tokenizer = AutoTokenizer.from_pretrained(generalLM_name_or_path)
        if not self.tokenizer.pad_token:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.max_token_observed = max_token_observed

    def _load_model_with_lora(
            self,
            base_path: str,
            lora_path: Optional[str],
            device: str,
            use_bfloat16: bool
    ) -> transformers.PreTrainedModel:
        torch_dtype = torch.bfloat16 if use_bfloat16 else torch.float32
        if lora_path:
            base_model = AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )
            # Fold the LoRA weights into the base model so inference runs
            # without the PEFT wrapper.
            merged_model = PeftModel.from_pretrained(
                base_model,
                lora_path
            ).merge_and_unload()
            return merged_model.to(device)  # ensure the model is on the target device
        else:
            return AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )
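
    # Sketch: how the helper above could supply a LoRA-tuned performer.
    # __init__ does not call it itself, and "path/to/lora_adapter" is a
    # placeholder, not a path shipped with this repo:
    #
    #   self.performer_model = self._load_model_with_lora(
    #       base_path=enhancedLM_name_or_path,
    #       lora_path="path/to/lora_adapter",
    #       device=self.DEVICE_2,
    #       use_bfloat16=True,
    #   )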

    def _tokenize(self, batch: list[str]) -> transformers.BatchEncoding:
        batch_size = len(batch)
        encodings = self.tokenizer(
            batch,
            return_tensors="pt",
            padding="longest" if batch_size > 1 else False,
            truncation=True,
            max_length=self.max_token_observed,
            return_token_type_ids=False).to(self.observer_model.device)
        return encodings

    @torch.inference_mode()
    def _get_logits(self, encodings: transformers.BatchEncoding) -> tuple[torch.Tensor, torch.Tensor]:
        observer_logits = self.observer_model(**encodings.to(self.DEVICE_1)).logits
        performer_logits = self.performer_model(**encodings.to(self.DEVICE_2)).logits
        if self.DEVICE_1 != "cpu":
            torch.cuda.synchronize()
        return observer_logits, performer_logits

    def compute_score(self, input_text: Union[str, list[str]]):
        batch = [input_text] if isinstance(input_text, str) else input_text
        encodings = self._tokenize(batch)
        observer_logits, performer_logits = self._get_logits(encodings)
        # Predictive entropy of the performer (performer logits passed as both arguments).
        entropy_score = entropy(performer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                                encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        # Cross-entropy of the observer's distribution against the performer's logits.
        ce_score = entropy(observer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                           encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        # Binoculars-style ratio of entropy to cross-entropy.
        binoculars_scores = entropy_score / ce_score
        binoculars_scores = binoculars_scores.tolist()
        # Note: .item() assumes a single input text; score one string at a time.
        return binoculars_scores[0], entropy_score.item(), ce_score.item()
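

# Minimal usage sketch (assumptions: the model paths below are placeholders,
# not models bundled with this repo; any causal-LM pair with a shared
# tokenizer should work, e.g. a base model and its fine-tuned variant).
if __name__ == "__main__":
    scorer = DivScore(
        generalLM_name_or_path="path/to/general-lm",    # placeholder observer
        enhancedLM_name_or_path="path/to/enhanced-lm",  # placeholder performer
        device="cuda:0",
    )
    score, ent, ce = scorer.compute_score("Some text to score.")
    print(f"score={score:.4f} entropy={ent:.4f} cross-entropy={ce:.4f}")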