|
|
|
|
|
import io
import re

import numpy as np
import openai
import pandas as pd
import torch
import tqdm
import wandb
from accelerate.utils import gather_object
from transformers import (
    GenerationConfig,
    Qwen2TokenizerFast,
    TrainerCallback,
)
from trl.models.utils import unwrap_model_for_generation
|
|
|
|
|
|
|
|
TABULAR_CHAT_TEMPLATE = """{% set image_count = namespace(value=0) %}{% set video_count = namespace(value=0) %}{% set tabular_count = namespace(value=0) %}{% for message in messages %}{% if loop.first and message['role'] != 'system' %}<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n{% endif %}<|im_start|>{{ message['role'] }}\n{% if message['content'] is string %}{{ message['content'] }}<|im_end|>\n{% else %}{% for content in message['content'] %}{% if content['type'] == 'image' or 'image' in content or 'image_url' in content %}{% set image_count.value = image_count.value + 1 %}{% if add_vision_id %}Picture {{ image_count.value }}: {% endif %}<|vision_start|><|image_pad|><|vision_end|>{% elif content['type'] == 'video' or 'video' in content %}{% set video_count.value = video_count.value + 1 %}{% if add_vision_id %}Video {{ video_count.value }}: {% endif %}<|vision_start|><|video_pad|><|vision_end|>{% elif content['type'] == 'tabular' or 'tabular' in content %}{% set tabular_count.value = tabular_count.value + 1 %}{% if add_vision_id %}Table {{ tabular_count.value }}: {% endif %}<|vision_start|><|tabular_pad|><|vision_end|>{% elif 'text' in content %}{{ content['text'] }}{% endif %}{% endfor %}<|im_end|>\n{% endif %}{% endfor %}{% if add_generation_prompt %}<|im_start|>assistant\n{% endif %}""" |
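# The template above extends the Qwen2.5 chat template with a "tabular" content type that renders as
# <|vision_start|><|tabular_pad|><|vision_end|>. A conversation it is expected to handle looks like the
# messages built in generate_answer below, for example:
#
#     [{"role": "user", "content": [
#         {"type": "text", "text": "Consider this table:"},
#         {"index": 0, "type": "tabular"},
#         {"type": "text", "text": "What is the largest value?"},
#     ]}]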
|
|
|
|
|
def load_model_and_processor( |
|
|
model_path: str, |
|
|
device: str = "cuda:0", |
|
|
torch_dtype=torch.bfloat16, |
|
|
) -> tuple: |
|
|
""" |
|
|
Load a Qwen2_5_TabularModel and its processor. |
|
|
|
|
|
Args: |
|
|
model_path: Path to the model checkpoint or HuggingFace model name |
|
|
device: Device to load the model on (e.g., "cuda:0", "cuda:1", "cpu") |
|
|
torch_dtype: Torch dtype for the model (default: torch.bfloat16) |
|
|
|
|
|
Returns: |
|
|
tuple: (model, processor) ready to use |
|
|
""" |
|
|
from TabularModel import ( |
|
|
TabularPreprocessor, |
|
|
Qwen_2_5_TabularProcessor, |
|
|
Qwen2_5_TabularModel, |
|
|
) |
|
|
|
|
|
|
|
|
tabular_processor = TabularPreprocessor() |
|
|
|
|
|
|
|
|
qwen_tabular_processor = Qwen_2_5_TabularProcessor( |
|
|
tabular_processor=tabular_processor, |
|
|
tokenizer=Qwen2TokenizerFast.from_pretrained(model_path), |
|
|
) |
|
|
|
|
|
|
|
|
qwen_tabular_processor.tabular_token = "<|tabular_pad|>" |
|
|
qwen_tabular_processor.tokenizer.add_tokens([ |
|
|
qwen_tabular_processor.tabular_token, |
|
|
"<|tabular_row|>", |
|
|
"<|tabular_cell|>" |
|
|
]) |
|
|
qwen_tabular_processor.tokenizer.chat_template = TABULAR_CHAT_TEMPLATE |
|
|
|
|
|
|
|
|
model = Qwen2_5_TabularModel.from_pretrained( |
|
|
model_path, |
|
|
torch_dtype=torch_dtype, |
|
|
).to(device) |
|
|
|
|
|
|
|
|
model.config.tabular_token_id = ( |
|
|
qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_pad|>") |
|
|
) |
|
|
model.config.tabular_row_token_id = ( |
|
|
qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_row|>") |
|
|
) |
|
|
model.config.tabular_cell_token_id = ( |
|
|
qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_cell|>") |
|
|
) |
|
|
|
|
|
return model, qwen_tabular_processor |
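# Example usage (illustrative; the checkpoint path is a placeholder):
#
#     model, processor = load_model_and_processor("path/to/qwen2_5_tabular_checkpoint", device="cuda:0")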
|
|
|
|
|
def get_role_by_idx(convo: list[dict[str, str]], role: str, idx: int) -> str:
    """Return the content of the idx-th (zero-based) message with the given role, e.g. get_role_by_idx(convo, "user", 0)."""
|
|
found = 0 |
|
|
for message in convo: |
|
|
if message["role"] == role: |
|
|
if found == idx: |
|
|
return message["content"] |
|
|
found += 1 |
|
|
    raise ValueError(f"Message with role {role!r} and index {idx} not found in conversation")
|
|
|
|
|
|
|
|
class LLMSampleCB(TrainerCallback): |
|
|
def __init__( |
|
|
self, |
|
|
trainer, |
|
|
test_dataset, |
|
|
num_samples=10, |
|
|
max_new_tokens=256, |
|
|
log_model="checkpoint", |
|
|
): |
|
|
"A CallBack to log samples a wandb.Table during training" |
|
|
super().__init__() |
|
|
self._log_model = log_model |
|
|
self.trainer = trainer |
|
|
|
|
|
|
|
|
        # Take up to num_samples examples per task so every task is represented in the logged table.
        tasks = {example["task"] for example in test_dataset}
|
|
|
|
|
|
|
|
task_samples = [] |
|
|
for task in tasks: |
|
|
task_dataset = [i for i in test_dataset if i["task"] == task][:num_samples] |
|
|
task_samples.extend(task_dataset) |
|
|
|
|
|
|
|
|
self.sample_dataset = task_samples |
|
|
|
|
|
self.model, self.tokenizer = trainer.model_wrapped, trainer.tokenizer |
|
|
|
|
|
self.tokenizer.padding_side = "left" |
|
|
|
|
|
self.gen_config = GenerationConfig.from_pretrained( |
|
|
trainer.model.name_or_path, temperature=0.001, max_new_tokens=max_new_tokens |
|
|
) |
|
|
self.idx = 0 |
|
|
|
|
|
    def generate(self, conversations: list[list[dict[str, str]]]) -> list[str]:
        """Generate a completion per conversation, sharding across processes and restoring input order after gathering."""
|
|
accelerator = self.trainer.accelerator |
|
|
|
|
|
|
|
|
        # Map each normalized prompt to its original index so gathered results can be restored to input order.
        original_prompts = self.tokenizer.apply_chat_template(conversations, tokenize=False)
        original_prompt_to_idx = {self._normalize_string(prompt): idx for idx, prompt in enumerate(original_prompts)}
|
|
|
|
|
completions = [None] * len(conversations) |
|
|
|
|
|
with accelerator.split_between_processes(conversations) as conversation_subset: |
|
|
model = self.trainer.model_wrapped |
|
|
with unwrap_model_for_generation(model, accelerator) as unwrapped_model: |
|
|
prompts = self.tokenizer.apply_chat_template(conversation_subset, tokenize=False) |
|
|
|
|
|
tokenized_prompts = self.tokenizer(prompts, return_tensors="pt", padding=True).to(model.device) |
|
|
with torch.inference_mode(): |
|
|
print("Generating...") |
|
|
generations = unwrapped_model.generate(**tokenized_prompts, generation_config=self.gen_config).cpu() |
|
|
print("Generated!") |
|
|
|
|
|
results = [] |
|
|
for prompt_str, prompt_tokens, generation in zip(prompts, tokenized_prompts.input_ids, generations): |
|
|
|
|
|
generation = generation[len(prompt_tokens) :] |
|
|
completion = self.tokenizer.decode(generation, skip_special_tokens=True) |
|
|
results.append((prompt_str, completion)) |
|
|
|
|
|
|
|
|
all_results = gather_object(results) |
|
|
|
|
|
|
|
|
for prompt_str, completion in all_results: |
|
|
norm_prompt = self._normalize_string(prompt_str) |
|
|
if norm_prompt in original_prompt_to_idx: |
|
|
idx = original_prompt_to_idx[norm_prompt] |
|
|
completions[idx] = completion |
|
|
|
|
|
return completions |
|
|
|
|
|
    def samples_filtering_table(self, examples):
        """Generate completions for the filtering examples, judge them, and collect the results in a wandb.Table."""
        records_table = wandb.Table(columns=["full_prompt", "question", "generation", "real_answer", "points"])
|
|
        # Single-element lists used as mutable counters: number of judged examples and summed points.
        max_num = [0]
        summary = [0]
|
|
|
|
|
batch_size = 32 |
|
|
all_data = [] |
|
|
|
|
|
for i in tqdm.trange(0, len(examples), batch_size): |
|
|
batch = examples[i : i + batch_size] |
|
|
batch_data = [] |
|
|
|
|
|
|
|
|
batch_inputs = [] |
|
|
for row in batch: |
|
|
row = row["messages"] |
|
|
user = get_role_by_idx(row, "user", 0) |
|
|
real_answer = get_role_by_idx(row, "assistant", 0) |
|
|
|
|
|
|
|
|
                # The user turn embeds the actual query after the Polish marker "Zapytanie brzmi:" ("The query is:").
                question = user.split("Zapytanie brzmi:")[1].strip() if "Zapytanie brzmi:" in user else user
|
|
prompt = user |
|
|
|
|
|
                # Drop the reference assistant turn so the model generates its own completion.
                batch_inputs.append(row[:-1])
                batch_data.append((prompt, question, real_answer))
|
|
|
|
|
|
|
|
generations = self.generate(batch_inputs) |
|
|
|
|
|
|
|
|
if self.trainer.accelerator.is_main_process: |
|
|
for idx, (prompt, question, real_answer) in enumerate(batch_data): |
|
|
generation = generations[idx] |
|
|
|
|
|
|
|
|
                    try:
                        _, points = self.compare_filtering_answer(question, generation, real_answer)
                        max_num[0] += 1
                        summary[0] += points
                    except Exception:
                        # If the LLM judge call fails, score the example as 0 rather than aborting evaluation.
                        points = 0
|
|
|
|
|
records_table.add_data(prompt, question, generation, real_answer, points) |
|
|
batch_data[idx] = (prompt, question, generation, real_answer) |
|
|
|
|
|
all_data.extend(batch_data) |
|
|
|
|
|
        return records_table, (summary[0] / max_num[0] if max_num[0] > 0 else 0)
|
|
|
|
|
    def compare_filtering_answer(self, question, answer, expected):
        """Score a filtering answer as 0 or 1 using GPT-4o as a judge; the prompts are in Polish to match the task."""
        client = openai.Client()
|
|
system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na zadania filtrowania dokumentów prawniczych." |
|
|
user = f"Zapytanie: '{question}'.\nPoprawna odpowiedź: '{expected}'\nOdpowiedź modelu: '{answer}'." |
|
|
user += "\nOceń, czy odpowiedź modelu poprawnie identyfikuje powiązanie i zawiera odpowiednią argumentację, podobnie jak w poprawnej odpowiedzi." |
|
|
user += "\nOdpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to niepoprawna odpowiedź, a 1 to poprawna odpowiedź." |
|
|
|
|
|
response = client.chat.completions.create( |
|
|
model="gpt-4o", |
|
|
messages=[ |
|
|
{"role": "system", "content": system}, |
|
|
{"role": "user", "content": user}, |
|
|
], |
|
|
temperature=0.0, |
|
|
max_tokens=512, |
|
|
) |
|
|
resp = response.choices[0].message.content.rstrip(".").strip() |
|
|
print(resp) |
|
|
try: |
|
|
return resp, int(resp.split(":")[-1].split()[0].strip()) |
|
|
except Exception: |
|
|
print("Error: ", resp) |
|
|
|
|
|
score = 1 if "ocena: 1" in resp.lower() else 0 |
|
|
return resp, score |
|
|
|
|
|
    def on_evaluate(self, *args, **kwargs):
        """Log the filtering predictions table and recall to wandb after each trainer.evaluate call."""
|
|
filtering_dataset = [i for i in self.sample_dataset if i["task"] == "filtering"] |
|
|
records_table, recall = self.samples_filtering_table(filtering_dataset) |
|
|
|
|
|
if self.trainer.accelerator.is_main_process: |
|
|
try: |
|
|
wandb.log({"filtering_predictions_" + str(self.idx): records_table}) |
|
|
wandb.log({"filtering_recall": recall}) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
self.idx += 1 |
|
|
|
|
|
    def compare_answer(self, question, answer, expected):
        """Score an exam-style answer as 0 or 1 using GPT-4o as a judge; the prompts are in Polish to match the task."""
        client = openai.Client()
|
|
system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na egzaminie. Oceniasz odpowiedzi jako poprawne (1 punkt) lub niepoprawne (0 punktów)." |
|
|
user = f"Pytanie: '{question}'.\n Poprawna odpowiedź: '{expected}'\n Odpowiedź użytkownika: '{answer}'." |
|
|
user += "\nCzy odpowiedź użytkownika jest poprawna? Przyznaj 1 punkt za poprawną odpowiedź lub 0 punktów za niepoprawną. Jeżeli poprawna odpowiedź sugeruje że nie da się odpowiedzieć na pytanie, to odpowiedź użytkownika powinna być taka sama. Nie dawaj punktów za chęci. Oceniaj odpowiedź tylko pod kątem poprawności." |
|
|
user += "\nPodkreślam: jeżeli poprawna odpowiedź sugeruje że nie da się udzielić odpowiedzi na podstawie źródeł, to odpowiedź użytkownika powinna być taka sama." |
|
|
user += ( |
|
|
"Odpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to brak punktów, a 1 to pełna ocena." |
|
|
) |
|
|
response = client.chat.completions.create( |
|
|
model="gpt-4o", |
|
|
messages=[ |
|
|
{"role": "system", "content": system}, |
|
|
{"role": "user", "content": user}, |
|
|
], |
|
|
temperature=0.0, |
|
|
max_tokens=512, |
|
|
) |
|
|
resp = response.choices[0].message.content.rstrip(".").strip() |
|
|
try: |
|
|
return resp, int(resp.split(":")[-1].split()[0].strip()) |
|
|
except Exception: |
|
|
print("Error: ", resp) |
|
|
|
|
|
score = 1 if "1" in re.findall(r"\d+", resp) else 0 |
|
|
return resp, score |
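    # The judge is prompted to answer as "Argumentacja: (...)\nOcena: 0 lub 1", so splitting on the last ":"
    # and taking the first token recovers the score. Illustrative sketch (the response text is made up):
    #
    #     resp = "Argumentacja: odpowiedź zgodna z wzorcem\nOcena: 1"
    #     int(resp.split(":")[-1].split()[0].strip())  # -> 1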
|
|
|
|
|
def _normalize_string(self, s): |
|
|
"""Normalize string to avoid whitespace/newline comparison issues""" |
|
|
if s is None: |
|
|
return "" |
|
|
|
|
|
return re.sub(r'\s+', '', s).lower() |
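# Typical wiring of the callback into a transformers Trainer (illustrative; the eval dataset is assumed
# to contain "task" and "messages" fields, as read above):
#
#     callback = LLMSampleCB(trainer, test_dataset, num_samples=10, max_new_tokens=256)
#     trainer.add_callback(callback)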
|
|
|
|
|
def text_to_array(text):
    """Parse a CSV table from model output, handling bare CSV as well as ``` and ```csv fenced blocks."""
|
|
if '```' not in text: |
|
|
csv_text = text.strip() |
|
|
elif '```csv' not in text: |
|
|
csv_text = text.strip().split("```")[1].strip() |
|
|
else: |
|
|
csv_text = text.strip().split("```csv")[1].split("```")[0] |
|
|
|
|
|
df = pd.read_csv(io.StringIO(csv_text), header=None) |
|
|
|
|
|
|
|
|
generated_corr_matrix = df.values |
|
|
return generated_corr_matrix |
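# text_to_array accepts bare CSV as well as fenced blocks. A minimal sketch:
#
#     text_to_array("1,2\n3,4")                # -> array([[1, 2], [3, 4]])
#     text_to_array("```csv\n1,2\n3,4\n```")   # same result for a ```csv fenced block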
|
|
|
|
|
def generate_answer( |
|
|
model, |
|
|
processor, |
|
|
table: np.ndarray | torch.Tensor | list, |
|
|
question: str, |
|
|
max_new_tokens: int = 512, |
|
|
do_sample: bool = False, |
|
|
temperature: float | None = None, |
|
|
) -> str: |
|
|
""" |
|
|
Generate an answer based on a table and a question. |
|
|
|
|
|
Args: |
|
|
model: The Qwen2_5_TabularModel instance |
|
|
processor: The Qwen_2_5_TabularProcessor instance |
|
|
table: The input table as numpy array (including dtype=object for mixed types), |
|
|
torch tensor, or list of lists |
|
|
question: The question to answer about the table |
|
|
max_new_tokens: Maximum number of tokens to generate |
|
|
do_sample: Whether to use sampling |
|
|
temperature: Sampling temperature (if do_sample=True) |
|
|
|
|
|
Returns: |
|
|
Generated answer as a string |
|
|
""" |
|
|
|
|
|
messages = [ |
|
|
{ |
|
|
"role": "user", |
|
|
"content": [ |
|
|
{"type": "text", "text": "Consider this table:"}, |
|
|
{"index": 0, "type": "tabular"}, |
|
|
{"type": "text", "text": question}, |
|
|
], |
|
|
} |
|
|
] |
|
|
|
|
|
|
|
|
    # Render the chat text only; the processor below injects the table features. add_generation_prompt=True
    # appends the assistant turn marker so the model continues with an answer.
    preprocessed = processor.tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
|
|
|
|
|
|
|
|
processed = processor( |
|
|
[table], text=preprocessed, return_tensors="pt" |
|
|
) |
|
|
|
|
|
|
|
|
device = next(model.parameters()).device |
|
|
processed = { |
|
|
key: value.to(device) if isinstance(value, torch.Tensor) else value |
|
|
for key, value in processed.items() |
|
|
} |
|
|
|
|
|
|
|
|
    # Drop processor-side metadata before calling generate(), which does not expect it as a keyword argument.
    processed.pop('tabular_metadata', None)
|
|
|
|
|
|
|
|
gen_kwargs = { |
|
|
"max_new_tokens": max_new_tokens, |
|
|
"do_sample": do_sample, |
|
|
} |
|
|
if temperature is not None: |
|
|
gen_kwargs["temperature"] = temperature |
|
|
|
|
|
with torch.inference_mode(): |
|
|
res = model.generate(**processed, **gen_kwargs) |
|
|
|
|
|
|
|
|
generated_ids = [ |
|
|
output_ids[len(input_ids):] |
|
|
for input_ids, output_ids in zip(processed["input_ids"], res, strict=True) |
|
|
] |
|
|
output_text = processor.batch_decode( |
|
|
generated_ids, |
|
|
skip_special_tokens=True, |
|
|
clean_up_tokenization_spaces=True |
|
|
) |
|
|
|
|
|
return output_text[0] |
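# End-to-end sketch combining the helpers above (the checkpoint path and table are illustrative placeholders):
#
#     model, processor = load_model_and_processor("path/to/qwen2_5_tabular_checkpoint")
#     table = np.array([[1, 2, 3], [4, 5, 6]])
#     answer = generate_answer(model, processor, table, "What is the sum of the second row?")
#     print(answer)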