# Copyright (c) Guangsheng Bao.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import json
import time
from types import SimpleNamespace
from detector_base import DetectorBase
class OpenAIGPT:
    """Thin client around the OpenAI (or Azure OpenAI) completions API.

    The API is called in echo mode (max_tokens=0, echo=True) so that it
    returns per-token logprobs and top-K alternatives for the supplied
    prompt+text, which the detector consumes as its scoring signal.
    """
    def __init__(self, config):
        self.config = config
        # An 'azure.com' endpoint selects the Azure client; otherwise plain OpenAI.
        if config.api_base.find('azure.com') > 0:
            self.client = self.create_client_azure()
        else:
            self.client = self.create_client_openai()
        # predefined prompts, selected by config.prompt in eval()
        self.prompts = {
            "prompt0": "",
            "prompt1": f"You serve as a valuable aide, capable of generating clear and persuasive pieces of writing given a certain context. Now, assume the role of an author and strive to finalize this article.\n",
            "prompt2": f"You serve as a valuable aide, capable of generating clear and persuasive pieces of writing given a certain context. Now, assume the role of an author and strive to finalize this article.\nI operate as an entity utilizing GPT as the foundational large language model. I function in the capacity of a writer, authoring articles on a daily basis. Presented below is an example of an article I have crafted.\n",
            "prompt3": f"System:\nYou serve as a valuable aide, capable of generating clear and persuasive pieces of writing given a certain context. Now, assume the role of an author and strive to finalize this article.\nAssistant:\nI operate as an entity utilizing GPT as the foundational large language model. I function in the capacity of a writer, authoring articles on a daily basis. Presented below is an example of an article I have crafted.\n",
            "prompt4": f"Assistant:\nYou serve as a valuable aide, capable of generating clear and persuasive pieces of writing given a certain context. Now, assume the role of an author and strive to finalize this article.\nUser:\nI operate as an entity utilizing GPT as the foundational large language model. I function in the capacity of a writer, authoring articles on a daily basis. Presented below is an example of an article I have crafted.\n",
        }
        # number of top-token logprobs requested per position
        self.max_topk = 10

    def create_client_azure(self):
        """Build an Azure OpenAI client from config (endpoint, key, API version)."""
        from openai import AzureOpenAI
        return AzureOpenAI(
            azure_endpoint=self.config.api_base,
            api_key=self.config.api_key,
            api_version=self.config.api_version)

    def create_client_openai(self):
        """Build a standard OpenAI-compatible client from config (base url, key)."""
        from openai import OpenAI
        return OpenAI(
            base_url=self.config.api_base,
            api_key=self.config.api_key)

    def evaluate(self, prompt, text):
        """Score prompt+text in echo mode and return the logprobs object
        (fields used downstream: tokens, token_logprobs, top_logprobs).

        Retries once on any API error (with a 5s pause), then re-raises.
        """
        model_name = self.config.scoring_model_name
        kwargs = {"model": model_name,
                  "prompt": f"<|endoftext|>{prompt}{text}",
                  "max_tokens": 0, "echo": True, "logprobs": self.max_topk}
        # retry 1 time
        ntry = 2
        for idx in range(ntry):
            try:
                response = self.client.completions.create(**kwargs)
                # fix: removed the unreachable `break` that followed this return
                return response.choices[0].logprobs
            except Exception as e:
                if idx < ntry - 1:
                    print(f'{model_name}, {kwargs}: {e}. Retrying ...')
                    time.sleep(5)
                    continue
                raise e

    def eval(self, text):
        """Return (tokens, logprobs, toplogprobs) for `text`, with the tokens
        that belong to the configured prompt prefix stripped off.
        """
        prompt = self.prompts[self.config.prompt]
        # get top tokens
        result = self.evaluate(prompt, text)
        # decide the prefix length: token 0 is '<|endoftext|>', then accumulate
        # tokens until they spell out the prompt exactly
        prefix = ""
        nprefix = 1
        while len(prefix) < len(prompt):
            prefix += result.tokens[nprefix]
            nprefix += 1
        assert prefix == prompt, f"Mismatch: {prompt} .vs. {prefix}"
        tokens = result.tokens[nprefix:]
        logprobs = result.token_logprobs[nprefix:]
        toplogprobs = result.top_logprobs[nprefix:]
        toplogprobs = [dict(item) for item in toplogprobs]
        assert len(tokens) == len(logprobs), f"Expected {len(tokens)} logprobs, got {len(logprobs)}"
        assert len(tokens) == len(toplogprobs), f"Expected {len(tokens)} toplogprobs, got {len(toplogprobs)}"
        return tokens, logprobs, toplogprobs
# probability distribution estimation
def safe_log(prob):
    """Elementwise natural log with a small additive floor (1e-8) to avoid log(0)."""
    shifted = np.array(prob) + 1e-8
    return np.log(shifted)
class GeometricDistribution:
    '''
    Top-K probabilities: p_1, p_2, ..., p_K
    Estimated probabilities: Pr(X=k) = p_K * lambda ^ (k - K), for k > K.
    '''
    def __init__(self, top_k, rank_size):
        # Name doubles as part of the cache key used by PdeBase.
        self.name = "GeometricDistribution"
        self.top_k = top_k  # K: number of observed top-token logprobs per position
        self.rank_size = rank_size  # M: assumed size of the full token rank list
    def estimate_distrib_token(self, toplogprobs):
        """Expand one token's observed top-K logprobs (a dict of token->logprob)
        into a full distribution over M ranks, filling ranks K+1..M with a
        geometric tail p_K * lambda^(k-K). Returns a list of M probabilities
        normalized to sum to 1.
        """
        M = self.rank_size # assuming rank list size
        K = self.top_k # assuming top-K tokens
        assert K <= M
        # Keep only the K largest observed probabilities, in rank order.
        toplogprobs = sorted(toplogprobs.values(), reverse=True)
        assert len(toplogprobs) >= K
        toplogprobs = toplogprobs[:K]
        probs = np.exp(toplogprobs) # distribution over ranks
        if probs.sum() > 1.0:
            # print(f'Warnining: Probability {probs.sum()} excels 1.0')
            # Renormalize slightly below 1 so the tail mass p_rest stays non-negative.
            probs = probs / (probs.sum() + 1e-6)
        p_K = probs[-1] # the k-th top token
        p_rest = 1 - probs.sum() # the rest probability mass
        # Closed-form decay from an infinite geometric series; ignores the
        # truncation of the tail at rank M.
        _lambda = p_rest / (p_K + p_rest) # approximate the decay factor
        if _lambda ** (M - K + 1) > 1e-6:
            # If the condition was not satisfied, use the following code to calculate the decay factor iteratively
            # NOTE(review): the truncated tail mass lambda^(M-K+1) is non-negligible
            # here, so lambda is refined by fixed-point iteration; presumably this
            # solves the finite-series constraint for p_rest — derivation not shown,
            # confirm against the paper/repo. Iteration stops on convergence, on a
            # negative iterate, or when the step size stops shrinking (divergence).
            _lambda_old = _lambda
            last_diff = 1.0
            while True:
                _lambda0 = _lambda
                minor = _lambda ** (M - K + 1) # the minor part
                assert p_rest > 0, f'Error: Invalid p_rest={p_rest}'
                _lambda = 1 - (_lambda - minor) * p_K / p_rest
                # check convergence
                diff = abs(_lambda - _lambda0)
                if _lambda < 0 or diff < 1e-6 or diff >= last_diff:
                    # Fall back to the previous iterate when the update misbehaves.
                    _lambda = _lambda0
                    break
                last_diff = diff
            # print(f'Warnining: Invalid lambda={_lambda_old}, re-calculate lambda={_lambda}')
        assert p_rest >= 0, f'Error: Invalid p_rest={p_rest}'
        assert 0 <= _lambda <= 1, f'Error: Invalid lambda={_lambda} calculated by p_K={p_K} and p_rest={p_rest}.'
        # estimate the probabilities of the rest tokens
        # (computed in log space via safe_log to tolerate p_K or lambda == 0)
        probs_rest = np.exp(safe_log(p_K) + np.arange(1, M - K + 1) * safe_log(_lambda))
        probs = np.concatenate([probs, probs_rest])
        # check total probability
        # if abs(probs.sum() - 1.0) >= 1e-2:
        #     print(f'Warnining: Invalid total probability: {probs.sum()}')
        probs = probs / probs.sum()
        return probs.tolist()
class PdeBase:
def __init__(self, distrib):
self.distrib = distrib
def estimate_distrib_sequence(self, item):
key = f'{self.distrib.name}-top{self.distrib.top_k}'
if key in item:
probs = item[key]
else:
toplogprobs = item["toplogprobs"]
probs = [self.distrib.estimate_distrib_token(v) for v in toplogprobs]
item[key] = probs
return np.array(probs)
class PdeFastDetectGPT(PdeBase):
    """Fast-DetectGPT-style criterion: normalized discrepancy between the
    observed log-likelihood and its expectation under the estimated
    per-token distributions."""

    def __call__(self, item):
        observed = np.array(item["logprobs"])
        dists = self.estimate_distrib_sequence(item)
        # Per-rank log-probs; nan_to_num maps log(0) = -inf to a large finite negative.
        log_dists = np.nan_to_num(np.log(dists))
        # Per-token mean and variance of log-prob under each distribution.
        mean_ref = (dists * log_dists).sum(axis=-1)
        log_dists_sq = np.nan_to_num(np.square(log_dists))
        var_ref = (dists * log_dists_sq).sum(axis=-1) - np.square(mean_ref)
        # Normalize the total deviation by the total standard deviation.
        deviation = observed.sum(axis=-1) - mean_ref.sum(axis=-1)
        crit = deviation / np.sqrt(var_ref.sum(axis=-1))
        crit = crit.mean()
        return crit.item()
# the detector
class Glimpse(DetectorBase):
    """Detector that scores text through a completions API: per-token top-K
    logprobs are expanded into full distributions (geometric tail) and fed
    to the Fast-DetectGPT criterion."""

    def __init__(self, config_name):
        super().__init__(config_name)
        self.gpt = OpenAIGPT(self.config)
        distrib = GeometricDistribution(self.config.top_k, self.config.rank_size)
        self.criterion_fn = PdeFastDetectGPT(distrib)

    def compute_crit(self, text):
        """Return (criterion value, number of scored tokens) for `text`."""
        tokens, logprobs, toplogprobs = self.gpt.eval(text)
        item = {
            'text': text,
            'tokens': tokens,
            'logprobs': logprobs,
            'toplogprobs': toplogprobs,
        }
        return self.criterion_fn(item), len(tokens)