Spaces:
Sleeping
Sleeping
| import argparse | |
| import os | |
| import torch | |
| import numpy as np | |
| import random | |
| import os.path as osp | |
| from scipy import stats | |
| from tqdm import tqdm | |
# Absolute directory containing this script; checkpoint paths are resolved against it.
ROOT = osp.abspath(osp.dirname(__file__))
def set_default_seed(seed=1000):
    """Seed every RNG used in the pipeline (python, numpy, torch CPU and CUDA)
    and pin cuDNN to deterministic kernels for reproducible runs."""
    for seeder in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,  # covers every visible GPU
    ):
        seeder(seed)
    # Trade cuDNN autotuning speed for run-to-run determinism.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"<--------------------------- seed:{seed} --------------------------->")
def get_args():
    """Parse CLI arguments for the watermark p-value comparison.

    Any of the three run paths that does not already contain a
    "checkpoints" component is expanded to
    <ROOT>/checkpoints/<name>/exp11_attentions.pth.

    Returns:
        argparse.Namespace with path_o/path_p/path_n resolved, ROOT attached,
        and model_name normalized ("opt-1.3b" -> "facebook/opt-1.3b").
        Unknown CLI arguments are ignored (parse_known_args).
    """
    parser = argparse.ArgumentParser(description="Build basic RemovalNet.")
    parser.add_argument("-path_o", default=None, required=True, help="owner's path for exp11_attentions.pth")
    parser.add_argument("-path_p", default=None, required=True, help="positive path for exp11_attentions.pth")
    parser.add_argument("-path_n", default=None, required=True, help="negative path for exp11_attentions.pth")
    parser.add_argument("-model_name", default=None, help="model_name")
    # type=int is required: without it a user-supplied "-seed 123" arrives as
    # the string "123", which np.random.seed() inside set_default_seed rejects.
    parser.add_argument("-seed", type=int, default=2233, help="seed")
    parser.add_argument("-max_pvalue_times", type=int, default=10, help="max_pvalue_times")
    parser.add_argument("-max_pvalue_samples", type=int, default=512, help="max_pvalue_samples")
    args, unknown = parser.parse_known_args()
    args.ROOT = ROOT
    # Expand bare run names into full checkpoint file paths.
    for attr in ("path_o", "path_p", "path_n"):
        path = getattr(args, attr)
        if "checkpoints" not in path:
            setattr(args, attr, osp.join(ROOT, "checkpoints", path, "exp11_attentions.pth"))
    if args.model_name is not None:
        if args.model_name == "opt-1.3b":
            args.model_name = "facebook/opt-1.3b"
    return args
def get_predict_token(result):
    """For each row of the watermark attention matrix, return the id of the
    most probable token, restricted to the union of clean and target labels.

    `result` is a dict holding "clean_labels", "target_labels" (id tensors)
    and "wmk_attentions" (a 2-D score tensor, rows = samples, cols = ids).
    Returns a 1-D numpy array of selected token ids.
    """
    clean_labels = result["clean_labels"]
    target_labels = result["target_labels"]
    attentions = result["wmk_attentions"]
    # Union of all label ids that are allowed to win the argmax.
    allowed = set(torch.cat([clean_labels.view(-1), target_labels.view(-1)]).tolist())
    # Every other column gets its probability zeroed out before the argmax.
    masked_ids = [i for i in range(len(attentions[0])) if i not in allowed]
    probs = torch.softmax(attentions, dim=1)
    probs[:, masked_ids] = 0.
    return probs.argmax(dim=1).numpy()
def main():
    """Compare watermark token predictions of the owner run against a
    positive and a negative run via repeated two-sample t-tests, printing
    per-round and mean p-values/statistics."""
    args = get_args()
    set_default_seed(args.seed)

    # exp11 result dicts; load on CPU so no GPU is required here.
    result_o = torch.load(args.path_o, map_location="cpu")
    result_p = torch.load(args.path_p, map_location="cpu")
    result_n = torch.load(args.path_n, map_location="cpu")
    print(f"-> load from: {args.path_n}")

    tokens_w = get_predict_token(result_o)  # watermarked
    tokens_p = get_predict_token(result_p)  # positive
    tokens_n = get_predict_token(result_n)  # negative

    # Human-readable tokens are only available when a tokenizer is configured.
    words_w, words_p, words_n = [], [], []
    if args.model_name is not None:
        if "llama" in args.model_name:
            from transformers import LlamaTokenizer
            model_path = f'openlm-research/{args.model_name}'
            tokenizer = LlamaTokenizer.from_pretrained(model_path)
        else:
            from transformers import AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(args.model_name)
        words_w = tokenizer.convert_ids_to_tokens(tokens_w[:10000])
        words_p = tokenizer.convert_ids_to_tokens(tokens_p[:10000])
        words_n = tokenizer.convert_ids_to_tokens(tokens_n[:10000])
    print("-> [watermarked] tokens", tokens_w[:20], words_w[:20], len(words_w))
    print("-> [positive] tokens", tokens_p[:20], words_p[:20], len(words_p))
    print("-> [negative] tokens", tokens_n[:20], words_n[:20], len(words_n))

    pvalue = np.zeros([2, args.max_pvalue_times])
    statistic = np.zeros([2, args.max_pvalue_times])
    per_size = args.max_pvalue_samples
    pbar = tqdm(range(args.max_pvalue_times))
    for step in pbar:
        # Sample indices from the token arrays themselves: words_w is empty
        # when model_name is None, so len(words_w) would make choice() crash.
        rand_idx = np.random.choice(np.arange(len(tokens_w)), per_size)
        _tokens_w = np.array(tokens_w[rand_idx], dtype=np.float32)
        _tokens_p = np.array(tokens_p[rand_idx], dtype=np.float32)
        _tokens_n = np.array(tokens_n[rand_idx], dtype=np.float32)
        # Nudge one element of the FLOAT copy so the sample variance is never
        # exactly zero (which would make ttest_ind return NaN); this does not
        # change the conclusion. The original nudged the int source array,
        # where the increment truncated away and tokens_w was mutated across
        # iterations.
        _tokens_w[-1] += 0.00001
        res_p = stats.ttest_ind(_tokens_w, _tokens_p, equal_var=True, nan_policy="omit")
        res_n = stats.ttest_ind(_tokens_w, _tokens_n, equal_var=True, nan_policy="omit")
        pvalue[0, step] = res_n.pvalue
        pvalue[1, step] = res_p.pvalue
        statistic[0, step] = res_n.statistic
        statistic[1, step] = res_p.statistic
        pbar.set_description(f"[{step}/{args.max_pvalue_times}] negative:{res_n.pvalue} positive:{res_p.pvalue}")

    print(f"-> pvalue:{pvalue}")
    print(f"-> [negative]-[{args.max_pvalue_samples}] pvalue:{pvalue.mean(axis=1)[0]} state:{statistic.mean(axis=1)[0]}")
    print(f"-> [positive]-[{args.max_pvalue_samples}] pvalue:{pvalue.mean(axis=1)[1]} state:{statistic.mean(axis=1)[1]}")
    print(args.path_o)


if __name__ == "__main__":
    main()