from typing import Optional, Union
import os

import torch
import transformers
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

from .metrics import entropy
from .utils import assert_tokenizer_consistency

torch.set_grad_enabled(False)

huggingface_config = {
    # Only required for private models from Huggingface (e.g. LLaMA models)
    "TOKEN": os.environ.get("HF_TOKEN", None)
}


class DivScore:
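    """Paired-LM divergence score in the style of Binoculars.

    An "observer" (general) LM and a "performer" (enhanced) LM score the same
    text; ``compute_score`` returns the performer's token-level entropy divided
    by the observer->performer cross-entropy, plus both raw values.
    """
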
    def __init__(self,
                 generalLM_name_or_path: str = "",
                 enhancedLM_name_or_path: str = "",
                 use_bfloat16: bool = True,
                 max_token_observed: int = 2048 * 5,
                 device: str = "cuda:7",
                 ) -> None:
        # Both models are read through a single tokenizer below, so the two
        # checkpoints must tokenize identically.
        assert_tokenizer_consistency(generalLM_name_or_path, enhancedLM_name_or_path)

        self.DEVICE_1 = self.DEVICE_2 = device
        self.observer_model = AutoModelForCausalLM.from_pretrained(
            generalLM_name_or_path,
            device_map={"": self.DEVICE_1},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"],
        )
        self.performer_model = AutoModelForCausalLM.from_pretrained(
            enhancedLM_name_or_path,
            device_map={"": self.DEVICE_2},
            trust_remote_code=True,
            torch_dtype=torch.bfloat16 if use_bfloat16 else torch.float32,
            token=huggingface_config["TOKEN"],
        )
        self.observer_model.eval()
        self.performer_model.eval()

        self.tokenizer = AutoTokenizer.from_pretrained(generalLM_name_or_path,
                                                       token=huggingface_config["TOKEN"])
        if not self.tokenizer.pad_token:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.max_token_observed = max_token_observed

    def _load_model_with_lora(
            self,
            base_path: str,
            lora_path: Optional[str],
            device: str,
            use_bfloat16: bool,
    ) -> transformers.PreTrainedModel:
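        """Load a base causal LM; when ``lora_path`` is given, attach the LoRA
        adapter with ``PeftModel.from_pretrained`` and fold it into the base
        weights via ``merge_and_unload`` so callers get a plain model."""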
        torch_dtype = torch.bfloat16 if use_bfloat16 else torch.float32

        if lora_path:
            base_model = AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )
            merged_model = PeftModel.from_pretrained(
                base_model,
                lora_path
            ).merge_and_unload()
            return merged_model.to(device)  # make sure the merged model is on the target device
        else:
            return AutoModelForCausalLM.from_pretrained(
                base_path,
                device_map={"": device},
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                token=huggingface_config["TOKEN"]
            )

    def _tokenize(self, batch: list[str]) -> transformers.BatchEncoding:
        batch_size = len(batch)
        encodings = self.tokenizer(
            batch,
            return_tensors="pt",
            padding="longest" if batch_size > 1 else False,
            truncation=True,
            max_length=self.max_token_observed,
            return_token_type_ids=False).to(self.observer_model.device)
        return encodings

    @torch.inference_mode()
    def _get_logits(self, encodings: transformers.BatchEncoding) -> tuple[torch.Tensor, torch.Tensor]:
        observer_logits = self.observer_model(**encodings.to(self.DEVICE_1)).logits
        performer_logits = self.performer_model(**encodings.to(self.DEVICE_2)).logits

        if self.DEVICE_1 != "cpu":
            torch.cuda.synchronize()
        return observer_logits, performer_logits

    def compute_score(self, input_text: Union[str, list[str]]) -> tuple[float, float, float]:
        batch = [input_text] if isinstance(input_text, str) else input_text
        encodings = self._tokenize(batch)
        observer_logits, performer_logits = self._get_logits(encodings)

        # entropy(p, p) collapses to the performer's own token-level entropy;
        # entropy(observer, performer) is the observer->performer cross-entropy.
        entropy_score = entropy(performer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                                encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        ce_score = entropy(observer_logits.to(self.DEVICE_1), performer_logits.to(self.DEVICE_1),
                           encodings.to(self.DEVICE_1), self.tokenizer.pad_token_id)
        binoculars_scores = entropy_score / ce_score

        # Report the first (or only) sequence in the batch; .tolist() avoids
        # .item() raising when a multi-element batch is passed.
        return (binoculars_scores.tolist()[0],
                entropy_score.tolist()[0],
                ce_score.tolist()[0])
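

# A minimal usage sketch (hedged): the checkpoint names below are hypothetical
# placeholders for a general/observer LM and an enhanced counterpart that share
# a tokenizer. Because this module uses relative imports, run it as a module
# (python -m <package>.<this_module>) rather than as a script.
if __name__ == "__main__":
    scorer = DivScore(
        generalLM_name_or_path="org/general-lm",    # hypothetical checkpoint
        enhancedLM_name_or_path="org/enhanced-lm",  # hypothetical checkpoint
        device="cuda:0",
    )
    score, ent, ce = scorer.compute_score("Example text to score.")
    print(f"DivScore={score:.4f}  entropy={ent:.4f}  cross-entropy={ce:.4f}")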