| import os |
| import json |
| import time |
| import pickle |
| from tqdm import tqdm |
| import numpy as np |
| import torch |
| from torch.nn import CrossEntropyLoss |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
|
|
| |
# Registry of supported local models: short CLI-friendly key -> HF Hub repo id.
# Used both for the summarization model and the detection model; a key absent
# from this dict is treated as either an OpenAI model name or "no summarizer".
MODEL_ZOO = {
    'llama2-7b': 'meta-llama/Llama-2-7b-chat-hf',
    'llama2-13b': 'meta-llama/Llama-2-13b-chat-hf',
    'gemma-2b': 'google/gemma-1.1-2b-it',
    'gemma-7b': 'google/gemma-1.1-7b-it',

    'gpt2-xl': 'gpt2-xl',
    'qwen-7b': 'Qwen/Qwen2.5-7B',
    'llama3-8b': 'meta-llama/Meta-Llama-3-8B',
    'mistralai-7b': 'mistralai/Mistral-7B-Instruct-v0.2',
}
|
|
| |
# Prompt used when no summary is available: plain completion instruction.
COMPLETION_PROMPT_ONLY = "Complete the following text: "
# Prompt used when a summary was generated; `{prompt}` is filled with the summary.
COMPLETION_PROMPT = "Given the summary:\n{prompt}\n Complete the following text: "
|
|
def generate(model, tokenizer, input_ids, trigger_length, target_length):
    """
    Generate up to `target_length` new tokens and strip the prompt tokens.

    Parameters:
        model: the language model used for generation.
        tokenizer: associated tokenizer (supplies the pad token id).
        input_ids: input token IDs (1D or 2D tensor).
        trigger_length: number of leading (prompt) tokens to drop from the output.
        target_length: maximum number of new tokens to generate.

    Returns:
        A 1D tensor of token IDs with the first `trigger_length` tokens removed
        (the `[0]` below selects the single batch row, so the result is 1D).
    """
    # Ensure a batch dimension, then move the inputs to the model's device.
    if input_ids.dim() == 1:
        input_ids = input_ids.unsqueeze(0)
    input_ids = input_ids.to(model.device)

    # Every position is a real token (no padding), so the mask is all ones.
    attn_masks = torch.ones(input_ids.shape, device=input_ids.device)

    # Pass max_new_tokens directly rather than mutating model.generation_config:
    # the previous in-place edit leaked the generation length into the model's
    # shared config, silently affecting all later generate() calls.
    out = model.generate(
        input_ids,
        attention_mask=attn_masks,
        max_new_tokens=target_length,
        pad_token_id=tokenizer.pad_token_id,
    )[0]

    # `out` holds prompt + continuation; drop the prompt prefix.
    return out[trigger_length:]
|
|
|
|
def compute_fce_loss(logits, targets, text_slice):
    """
    Forward cross-entropy (FCE): score each target token against the logits
    emitted one position earlier (the standard next-token shift).

    Returns a 1D NumPy array with one loss value per target token.
    """
    # Shift the logit window back by one so position i predicts token i.
    shifted_logits = logits[0, text_slice.start - 1:text_slice.stop - 1, :]
    criterion = CrossEntropyLoss(reduction='none')
    per_token_loss = criterion(shifted_logits, targets)
    return per_token_loss.detach().cpu().numpy()
|
|
def compute_bce_loss(logits, targets, text_slice):
    """
    Backward cross-entropy (BCE): score each target token against the logits
    at its own position, i.e. without the next-token shift used by FCE.

    Returns a 1D NumPy array with one loss value per target token.
    """
    aligned_logits = logits[0, text_slice, :]
    criterion = CrossEntropyLoss(reduction='none')
    per_token_loss = criterion(aligned_logits, targets)
    return per_token_loss.detach().cpu().numpy()
|
|
def detect_single_sample(args, model, tokenizer, summary_model, summary_tokenizer, sample, device='cuda'):
    """
    Compute loss-based detection features for a single text sample.

    Builds a completion prompt (optionally conditioned on a generated summary),
    runs the detection model over prompt + sample, and derives per-token FCE
    (shifted) and BCE (unshifted) cross-entropy losses for the sample tokens.

    Parameters:
        args: namespace providing `summary_model` (OpenAI name, MODEL_ZOO key,
            or anything else for "no summary") and `sample_clip` (max token
            length used for truncation).
        model: detection language model used to score the sample.
        tokenizer: tokenizer paired with `model`.
        summary_model: local summarization model, or None when unused.
        summary_tokenizer: tokenizer paired with `summary_model`, or None.
        sample: the raw text to score.
        device: device for the input tensors (default 'cuda').

    Returns:
        A flat list of 72 statistics: for each of 9 suffix split points
        (10%..90% of the sample length), the mean/max/min/std of both the
        FCE and BCE per-token losses over the remaining tokens.
    """
    # --- Build the completion prompt -------------------------------------
    if 'gpt-' in args.summary_model:
        # Summarize via the OpenAI API; the key must be in the environment.
        from openai import OpenAI
        openai_key = os.environ.get('OPENAI_API_KEY')
        if not openai_key:
            raise ValueError("OPENAI_API_KEY not found in environment.")
        client = OpenAI(api_key=openai_key)
        from tenacity import (
            retry,
            stop_after_attempt,
            wait_random_exponential,
        )

        # Exponential backoff guards against transient API failures/rate limits.
        @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
        def openai_backoff(client, **kwargs):
            return client.chat.completions.create(**kwargs)

        summary_input = f"generate a very short and concise summary for the following text, just the summary: {sample}"
        response = openai_backoff(client, model=args.summary_model,
            messages=[{"role": "user", "content": summary_input}])
        summary_text = response.choices[0].message.content.strip()
        prompt_text = COMPLETION_PROMPT.format(prompt=summary_text)
    elif args.summary_model in MODEL_ZOO:
        # Summarize with a local model: ask for a title, keep only its first line.
        summary_input = f"Write a title for this text: {sample}\nJust output the title:"
        summary_ids = summary_tokenizer(summary_input, return_tensors='pt',
            max_length=args.sample_clip, truncation=True).input_ids.to(device)
        # Drops the first token — presumably the tokenizer's BOS token;
        # TODO(review): confirm this holds for every model in MODEL_ZOO.
        summary_ids = summary_ids[:, 1:]
        gen_ids = generate(summary_model, summary_tokenizer, summary_ids, summary_ids.shape[1], 64)
        summary_text = summary_tokenizer.decode(gen_ids, skip_special_tokens=True).strip().split('\n')[0]
        prompt_text = COMPLETION_PROMPT.format(prompt=summary_text)
    else:
        # No summarizer configured: fall back to the bare completion prompt.
        prompt_text = COMPLETION_PROMPT_ONLY

    # --- Score the sample under the detection model ----------------------
    prompt_ids = tokenizer(prompt_text, return_tensors='pt').input_ids.to(device)
    text_ids = tokenizer(sample, return_tensors='pt', max_length=args.sample_clip, truncation=True).input_ids.to(device)
    combined_ids = torch.cat([prompt_ids, text_ids], dim=1)
    # Slice covering the sample's tokens inside the combined sequence.
    text_slice = slice(prompt_ids.shape[1], combined_ids.shape[1])
    outputs = model(input_ids=combined_ids)
    logits = outputs.logits
    targets = combined_ids[0][text_slice]

    # --- Aggregate per-token losses into fixed-size features -------------
    fce_loss = compute_fce_loss(logits, targets, text_slice)
    bce_loss = compute_bce_loss(logits, targets, text_slice)
    features = []
    for p in range(1, 10):
        # Suffix statistics starting at p*10% of the sample length.
        split = len(fce_loss) * p // 10
        features.extend([
            np.mean(fce_loss[split:]), np.max(fce_loss[split:]),
            np.min(fce_loss[split:]), np.std(fce_loss[split:]),
            np.mean(bce_loss[split:]), np.max(bce_loss[split:]),
            np.min(bce_loss[split:]), np.std(bce_loss[split:])
        ])
    return features
|
|
def data_generation(args, out_dir, task, generative_model, base_dir):
    """
    Generate loss-based features for both human and GPT samples and save them to disk.

    Parameters:
        args: namespace with at least `summary_model`, `detect_model`,
            `cache_dir`, `sample_clip`, and optionally `use_hf_dataset`.
        out_dir: Output directory for the pickled feature files (created if missing).
        task: Task name (e.g., Arxiv, Code, Essay).
        generative_model: Key for the GPT samples.
        base_dir: Directory holding `<task>_<model>.raw_data.json` files; only
            used when `args.use_hf_dataset` is falsy.

    Returns:
        The output directory.

    Raises:
        ValueError: if `args.detect_model` is not a MODEL_ZOO key.
    """
    # Optional local summarization model; OpenAI/"none" cases are handled
    # inside detect_single_sample, so None is a valid sentinel here.
    if args.summary_model in MODEL_ZOO:
        summary_model = AutoModelForCausalLM.from_pretrained(
            MODEL_ZOO[args.summary_model],
            torch_dtype=torch.float16,
            device_map='auto',
            cache_dir=args.cache_dir,
        ).eval()
        summary_tokenizer = AutoTokenizer.from_pretrained(
            MODEL_ZOO[args.summary_model], padding_side='left', cache_dir=args.cache_dir,
        )
        summary_tokenizer.pad_token = summary_tokenizer.eos_token
    else:
        summary_model, summary_tokenizer = None, None

    # The detection model is mandatory.
    if args.detect_model in MODEL_ZOO:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ZOO[args.detect_model],
            torch_dtype=torch.float16,
            device_map='auto',
            cache_dir=args.cache_dir,
        ).eval()
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_ZOO[args.detect_model], padding_side='left', cache_dir=args.cache_dir,
        )
        tokenizer.pad_token = tokenizer.eos_token
    else:
        raise ValueError("Unknown detection model")

    # Load the samples: either the published HF dataset or local JSON files.
    if getattr(args, "use_hf_dataset", False):
        from datasets import load_dataset
        ds = load_dataset("HanxiGuo/BiScope_Data", split="train")
        paraphrased_flag = False  # only the non-paraphrased split is used here

        human_data = ds.filter(lambda x: x["task"] == task and x["source"].lower() == "human")
        human_data = [s["text"] for s in human_data]

        gpt_data = ds.filter(lambda x: x["task"] == task and x["paraphrased"] == paraphrased_flag and x["source"].lower() == generative_model.lower())
        gpt_data = [s["text"] for s in gpt_data]
    else:
        with open(f'{base_dir}/{task}_{generative_model}.raw_data.json', 'r') as f:
            dataset = json.load(f)

        human_data = dataset['original']
        gpt_data = dataset['sampled']

    # Ensure the output directory exists before any pickle is written;
    # previously a fresh out_dir made open(..., 'wb') raise FileNotFoundError.
    os.makedirs(out_dir, exist_ok=True)

    human_feat_path = os.path.join(out_dir, f"{task}_human_features.pkl")
    human_features = [detect_single_sample(args, model, tokenizer, summary_model, summary_tokenizer, s, device='cuda') for s in tqdm(human_data)]
    with open(human_feat_path, 'wb') as f:
        pickle.dump(human_features, f)

    gpt_feat_path = os.path.join(out_dir, f"{task}_GPT_features.pkl")
    gpt_features = [detect_single_sample(args, model, tokenizer, summary_model, summary_tokenizer, s, device='cuda') for s in tqdm(gpt_data)]
    with open(gpt_feat_path, 'wb') as f:
        pickle.dump(gpt_features, f)

    return out_dir
|
|