import os
import re
import shutil

import numpy as np
import torch
from datasets import Dataset, DatasetDict
from transformers import GPT2Tokenizer, GPT2LMHeadModel


# Resolve paths relative to this file so the script behaves the same regardless of
# the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))


def resolve_path(*parts):
    return os.path.abspath(os.path.join(BASE_DIR, *parts))


def contains_special_characters(text):
    # True if the text contains any non-ASCII character.
    return bool(re.search(r'[^\x00-\x7F]', text))


def check_texts_for_special_characters(texts):
    # Report every text that still contains non-ASCII characters.
    results = []
    for i, text in enumerate(texts):
        if contains_special_characters(text):
            results.append(f"Text {i}: Contains special characters")
    return results


def clean_text(text):
    # Drop non-ASCII characters, collapse runs of whitespace, and strip a trailing period.
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()
    if text.endswith("."):
        text = text[:-1]
    return text


def clean_texts(texts):
    return [clean_text(text) for text in texts]
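
# Illustrative example (hypothetical caption): clean_text("A photo  of a café.")
# returns "A photo of a caf" -- non-ASCII characters are removed rather than
# transliterated, repeated spaces are collapsed, and the trailing period is dropped.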


def tokenizing_data_percentile3(tokenizer, data_dict):
    dataset = Dataset.from_dict(data_dict)

    # Percentile boundaries (0th, 33rd, 66th, 100th) used to bucket every score
    # into one of three levels: low / medium / high.
    sim_percentiles = np.percentile(np.array(dataset["similarity"]), [0, 33, 66, 100])
    aes_percentiles = np.percentile(np.array(dataset["aesthetics_score"]), [0, 33, 66, 100])
    iqa_percentiles = np.percentile(np.array(dataset["IQAs"]), [0, 33, 66, 100])
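
    # For intuition (assumed values, not from the data): if similarity scores were spread
    # uniformly over [0, 1], the 33rd/66th percentiles would sit near 0.33/0.66, so a
    # score of 0.2 would be bucketed "low", 0.5 "medium", and 0.9 "high".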

    def categorize_percentiles(score, percentiles):
        if score <= percentiles[1]:
            return "low"
        elif score <= percentiles[2]:
            return "medium"
        else:
            return "high"

    prompt = (
        "<|startoftext|>Similarity: {sim}, Aesthetic: {aes}, DeQA Quality: {iqa}, Query: "
    )
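
    # Once formatted, a prompt looks like (buckets depend on the sample's scores):
    #   "<|startoftext|>Similarity: high, Aesthetic: medium, DeQA Quality: low, Query: "
    # The caption itself follows "Query: " once prompt and query tokens are concatenated below.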

    def apply_prompt_template(sample, sim_percentiles, aes_percentiles, iqa_percentiles):
        # Bucket each score and build the conditioning prompt for this sample.
        sim = categorize_percentiles(sample["similarity"], sim_percentiles)
        aes = categorize_percentiles(sample["aesthetics_score"], aes_percentiles)
        iqa = categorize_percentiles(sample["IQAs"], iqa_percentiles)
        return {
            "prompt": prompt.format(sim=sim, aes=aes, iqa=iqa),
            "query": sample["text"],
        }

    dataset = dataset.map(apply_prompt_template,
                          fn_kwargs={"sim_percentiles": sim_percentiles,
                                     "aes_percentiles": aes_percentiles,
                                     "iqa_percentiles": iqa_percentiles
                                     })

    def tokenize_add_label(sample):
        # Encode prompt and query separately so the prompt length is known for label masking.
        prompt_ids = tokenizer.encode(sample["prompt"], add_special_tokens=False)
        query_ids = tokenizer.encode(sample["query"], add_special_tokens=False)

        text = prompt_ids + query_ids

        # Pad every sequence to a fixed length of 65 tokens (truncated below if longer).
        tokenized_inputs = tokenizer.pad({"input_ids": text}, padding="max_length", max_length=65, return_tensors="pt")

        if tokenized_inputs["input_ids"].shape[0] > 65:
            tokenized_inputs["input_ids"] = tokenized_inputs["input_ids"][:65]
            if tokenizer.eos_token_id is not None:
                # Make sure a truncated sequence still ends with the eos token.
                tokenized_inputs["input_ids"][-1] = tokenizer.eos_token_id
            if "attention_mask" in tokenized_inputs:
                tokenized_inputs["attention_mask"] = tokenized_inputs["attention_mask"][:65]

        # Supervise only the caption: mask prompt, padding, and cls positions with -100,
        # the index ignored by the cross-entropy loss.
        tokenized_inputs["labels"] = tokenized_inputs["input_ids"].clone()
        num_tokens_in_prompt = len(prompt_ids)
        tokenized_inputs["labels"][:num_tokens_in_prompt] = -100
        tokenized_inputs["labels"][tokenized_inputs["labels"] == tokenizer.pad_token_id] = -100
        tokenized_inputs["labels"][tokenized_inputs["labels"] == tokenizer.cls_token_id] = -100

        # Keep the raw scores alongside the tokenized fields.
        tokenized_inputs["similarity"] = sample["similarity"]
        tokenized_inputs["aesthetics_score"] = sample["aesthetics_score"]
        tokenized_inputs["IQAs"] = sample["IQAs"]

        return tokenized_inputs

    tokenized_datasets = dataset.map(tokenize_add_label, remove_columns=["text"])

    tokenized_datasets.set_format("torch", columns=["input_ids", "attention_mask", "labels", "aesthetics_score", "similarity", "IQAs", "prompt", "query"])

    return tokenized_datasets
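
# Purely illustrative (hypothetical values) sketch of the dict schema that
# tokenizing_data_percentile3 expects, and of the columns it returns:
#
#     toy_dict = {
#         "text": ["a dog on a sofa<|endoftext|>", "a red car<|endoftext|>"],
#         "similarity": [0.31, 0.72],
#         "aesthetics_score": [5.2, 6.8],
#         "IQAs": [3.9, 4.4],
#     }
#     toy_ds = tokenizing_data_percentile3(tokenizer, toy_dict)
#     # toy_ds columns: input_ids, attention_mask, labels, prompt, query,
#     # similarity, aesthetics_score, IQAs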


def tokenize_split_save(text_dir, tokenized_data_path, tokenizer):
    # Load the raw captions together with their per-sample scores.
    data_path = os.path.join(text_dir, "data.pt")
    loaded_data = torch.load(data_path, weights_only=False)

    texts = loaded_data["texts"]
    faiss_sim = loaded_data['faiss_sim']
    aesthetics_score = torch.tensor(loaded_data["aesthetics"])
    IQAs = torch.tensor(loaded_data["IQAs"])

    print(f"Data loaded successfully from {data_path}!")

    cleaned_texts = [clean_text(text) for text in texts]

    print("Adding eos token at the end of each text...")
    texts_with_eos = [f"{text}<|endoftext|>" for text in cleaned_texts]
    for ii in range(0, 10):
        print(texts_with_eos[ii])

    # Inspect the longest caption (by character count) and how many tokens it produces.
    lengths = [len(text) for text in texts_with_eos]
    max_index = lengths.index(max(lengths))
    longest_text = texts_with_eos[max_index]
    longest_text_token = tokenizer.encode(longest_text, return_tensors="pt")

    print("Longest text:", longest_text)
    print("Longest text token:", longest_text_token, longest_text_token.shape)

    data_dict = {'text': texts_with_eos,
                 'similarity': faiss_sim,
                 'aesthetics_score': aesthetics_score,
                 'IQAs': IQAs
                 }

    tokenized_datasets = tokenizing_data_percentile3(tokenizer, data_dict)

    # 80/20 train/test split with a fixed seed for reproducibility.
    tokenized_datasets = tokenized_datasets.train_test_split(test_size=0.2, shuffle=True, seed=42)
    tokenized_datasets = DatasetDict({
        'train': tokenized_datasets['train'],
        'test': tokenized_datasets['test']
    })

    # Overwrite any previously saved copy of the tokenized data.
    if os.path.exists(tokenized_data_path):
        shutil.rmtree(tokenized_data_path)
    tokenized_datasets.save_to_disk(tokenized_data_path)
    print(f"Tokenized data saved to {tokenized_data_path}!")

    return tokenized_datasets
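

# Hypothetical helper, not part of the original pipeline: a minimal sketch of how a
# downstream training script could reload the DatasetDict saved by tokenize_split_save
# and wrap its splits in PyTorch DataLoaders. The function name and batch_size are assumptions.
def load_tokenized_dataloaders(tokenized_data_path, batch_size=32):
    from datasets import load_from_disk
    from torch.utils.data import DataLoader

    tokenized = load_from_disk(tokenized_data_path)  # DatasetDict with "train" / "test" splits
    train_loader = DataLoader(tokenized["train"], batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(tokenized["test"], batch_size=batch_size)
    return train_loader, test_loader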


if __name__ == '__main__':
    text_dir = resolve_path('../', 'processed_data', 'coco')
    model_name = "gpt2"
    data_save_path = os.path.join(text_dir, model_name)

    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    model = GPT2LMHeadModel.from_pretrained(model_name)

    # GPT-2 ships without cls or pad tokens; register them (plus eos) and resize the
    # embedding matrix so the new token ids are valid.
    tokenizer.add_special_tokens({'cls_token': '<|startoftext|>', 'eos_token': '<|endoftext|>', 'pad_token': '<pad>'})

    model.config.cls_token_id = tokenizer.cls_token_id
    model.config.eos_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.pad_token_id
    model.resize_token_embeddings(len(tokenizer))

    tokenize_split_save(text_dir, data_save_path, tokenizer)