# Source: Hugging Face Hub upload by wwydmanski
# ("Upload folder using huggingface_hub"), revision 03d6533 (verified).
from trl.models.utils import unwrap_model_for_generation
# %%
import re
import openai
import torch
from transformers import (
GenerationConfig,
TrainerCallback,
Qwen2TokenizerFast,
)
import wandb
import tqdm
from accelerate.utils import gather_object
import pandas as pd
import io
import numpy as np
# Chat template for tabular models (Jinja source assigned to the tokenizer's
# `chat_template` in `load_model_and_processor`).
#
# What the template renders:
#   * If the first message is not a system message, a default system turn
#     ("You are a helpful assistant.") is emitted first.
#   * Every message is wrapped as `<|im_start|>{role}\n ... <|im_end|>\n`.
#   * String content is emitted verbatim. List content is walked entry by entry:
#       - image entries  -> `<|vision_start|><|image_pad|><|vision_end|>`
#       - video entries  -> `<|vision_start|><|video_pad|><|vision_end|>`
#       - tabular entries -> `<|vision_start|><|tabular_pad|><|vision_end|>`
#       - entries with a 'text' key -> the text itself
#     When `add_vision_id` is truthy, each placeholder is prefixed with
#     "Picture N: ", "Video N: " or "Table N: " using per-kind running counters.
#   * When `add_generation_prompt` is truthy, `<|im_start|>assistant\n` is appended.
#
# The literal is a normal (non-raw) string, so each `\n` below is a real newline
# in the template text.
TABULAR_CHAT_TEMPLATE = """{% set image_count = namespace(value=0) %}{% set video_count = namespace(value=0) %}{% set tabular_count = namespace(value=0) %}{% for message in messages %}{% if loop.first and message['role'] != 'system' %}<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n{% endif %}<|im_start|>{{ message['role'] }}\n{% if message['content'] is string %}{{ message['content'] }}<|im_end|>\n{% else %}{% for content in message['content'] %}{% if content['type'] == 'image' or 'image' in content or 'image_url' in content %}{% set image_count.value = image_count.value + 1 %}{% if add_vision_id %}Picture {{ image_count.value }}: {% endif %}<|vision_start|><|image_pad|><|vision_end|>{% elif content['type'] == 'video' or 'video' in content %}{% set video_count.value = video_count.value + 1 %}{% if add_vision_id %}Video {{ video_count.value }}: {% endif %}<|vision_start|><|video_pad|><|vision_end|>{% elif content['type'] == 'tabular' or 'tabular' in content %}{% set tabular_count.value = tabular_count.value + 1 %}{% if add_vision_id %}Table {{ tabular_count.value }}: {% endif %}<|vision_start|><|tabular_pad|><|vision_end|>{% elif 'text' in content %}{{ content['text'] }}{% endif %}{% endfor %}<|im_end|>\n{% endif %}{% endfor %}{% if add_generation_prompt %}<|im_start|>assistant\n{% endif %}"""
def load_model_and_processor(
    model_path: str,
    device: str = "cuda:0",
    torch_dtype=torch.bfloat16,
) -> tuple:
    """
    Build a Qwen2_5_TabularModel together with a matching tabular processor.

    The processor's tokenizer is extended with the three tabular marker tokens
    and given the tabular chat template; the ids of those tokens are then
    written into the model config.

    Args:
        model_path: Path to the model checkpoint or HuggingFace model name
        device: Device the model is moved to (e.g., "cuda:0", "cuda:1", "cpu")
        torch_dtype: Torch dtype passed to ``from_pretrained`` (default: torch.bfloat16)

    Returns:
        tuple: ``(model, processor)``
    """
    # Project-local package; resolved only when this function is called.
    from TabularModel import (
        Qwen2_5_TabularModel,
        Qwen_2_5_TabularProcessor,
        TabularPreprocessor,
    )

    pad_token = "<|tabular_pad|>"
    row_token = "<|tabular_row|>"
    cell_token = "<|tabular_cell|>"

    # Processor = tabular preprocessor + Qwen tokenizer loaded from the checkpoint.
    table_preprocessor = TabularPreprocessor()
    processor = Qwen_2_5_TabularProcessor(
        tabular_processor=table_preprocessor,
        tokenizer=Qwen2TokenizerFast.from_pretrained(model_path),
    )

    # Register the tabular marker tokens and switch to the tabular chat template.
    processor.tabular_token = pad_token
    tokenizer = processor.tokenizer
    tokenizer.add_tokens([processor.tabular_token, row_token, cell_token])
    tokenizer.chat_template = TABULAR_CHAT_TEMPLATE

    model = Qwen2_5_TabularModel.from_pretrained(model_path, torch_dtype=torch_dtype)
    model = model.to(device)

    # Tell the model which vocabulary ids stand for the tabular markers.
    config_fields = (
        ("tabular_token_id", pad_token),
        ("tabular_row_token_id", row_token),
        ("tabular_cell_token_id", cell_token),
    )
    for field_name, token in config_fields:
        setattr(model.config, field_name, tokenizer.convert_tokens_to_ids(token))

    return model, processor
def get_role_by_idx(convo: list[dict[str, str]], role: str, idx: int) -> str:
    """Return the content of the ``idx``-th (0-based) message in ``convo`` whose role is ``role``.

    Raises:
        ValueError: if fewer than ``idx + 1`` messages with that role exist.
    """
    same_role = (entry for entry in convo if entry["role"] == role)
    for position, entry in enumerate(same_role):
        if position == idx:
            return entry["content"]
    raise ValueError(f"Role {role} not found {idx} times")
class LLMSampleCB(TrainerCallback):
    """Callback that logs graded sample generations to a wandb.Table after evaluation.

    On every ``on_evaluate`` call the callback generates answers for the stored
    "filtering" samples, has an OpenAI model grade them, and logs the resulting
    table and the mean score to Weights & Biases from the main process.
    """

    def __init__(
        self,
        trainer,
        test_dataset,
        num_samples=10,
        max_new_tokens=256,
        log_model="checkpoint",
    ):
        """Select the samples to monitor and prepare generation settings.

        Args:
            trainer: Trainer whose ``model_wrapped``, ``tokenizer`` and
                ``accelerator`` are used for generation.
            test_dataset: Iterable of examples; each must have a ``"task"`` key.
            num_samples: Number of leading examples kept per task.
            max_new_tokens: Generation length limit.
            log_model: Stored on the instance; not otherwise used in this class.
        """
        super().__init__()
        self._log_model = log_model
        self.trainer = trainer

        # Group examples by task in one pass. Using a dict (insertion-ordered)
        # instead of a set keeps the sample order deterministic across runs
        # and across processes.
        examples_by_task = {}
        for example in test_dataset:
            examples_by_task.setdefault(example["task"], []).append(example)

        # Keep the first `num_samples` examples of every task.
        self.sample_dataset = [
            example
            for task_examples in examples_by_task.values()
            for example in task_examples[:num_samples]
        ]

        self.model, self.tokenizer = trainer.model_wrapped, trainer.tokenizer
        # Left padding lets `generate` strip the prompt by slicing off a fixed
        # prefix length. Note this mutates the trainer's own tokenizer object.
        self.tokenizer.padding_side = "left"
        self.gen_config = GenerationConfig.from_pretrained(
            trainer.model.name_or_path, temperature=0.001, max_new_tokens=max_new_tokens
        )
        self.idx = 0

    def generate(self, conversations: list[list[dict[str, str]]]) -> list[str]:
        """Generate one completion per conversation, sharded across processes.

        Each conversation is tagged with its position before being split between
        processes, and results are put back by that position after gathering.
        This keeps the output aligned with the input even when two conversations
        render to the same prompt text.

        Args:
            conversations: Chat-format conversations to complete.

        Returns:
            Completions in the same order as ``conversations``. A slot stays
            ``None`` only if no process returned a result for it.
        """
        if not conversations:
            return []

        accelerator = self.trainer.accelerator
        completions = [None] * len(conversations)
        indexed_conversations = list(enumerate(conversations))
        results = []

        with accelerator.split_between_processes(indexed_conversations) as indexed_subset:
            model = self.trainer.model_wrapped
            with unwrap_model_for_generation(model, accelerator) as unwrapped_model:
                positions = [position for position, _ in indexed_subset]
                # NOTE(review): add_generation_prompt is not passed, so prompts end
                # after the last given turn — confirm this is intended.
                prompts = self.tokenizer.apply_chat_template(
                    [conversation for _, conversation in indexed_subset], tokenize=False
                )
                tokenized_prompts = self.tokenizer(prompts, return_tensors="pt", padding=True).to(model.device)
                with torch.inference_mode():
                    print("Generating...")
                    generations = unwrapped_model.generate(
                        **tokenized_prompts, generation_config=self.gen_config
                    ).cpu()
                    print("Generated!")

            # All prompts are padded to the same length, so the prompt occupies
            # the same prefix of every generated sequence.
            prompt_length = tokenized_prompts.input_ids.shape[1]
            for position, generation in zip(positions, generations):
                completion = self.tokenizer.decode(generation[prompt_length:], skip_special_tokens=True)
                results.append((position, completion))

        # Gather (position, completion) pairs from all processes and restore order.
        for position, completion in gather_object(results):
            completions[position] = completion
        return completions

    def samples_filtering_table(self, examples):
        """Generate and grade answers for ``examples`` and collect them in a wandb.Table.

        Args:
            examples: Examples with a ``"messages"`` conversation whose last
                message is dropped before generation.

        Returns:
            ``(records_table, mean_points)`` where ``mean_points`` is the average
            grade over successfully graded examples (0 when none were graded).
            Grading and table filling happen on the main process only.
        """
        records_table = wandb.Table(columns=["full_prompt", "question", "generation", "real_answer", "points"])
        graded_count = 0
        total_points = 0
        batch_size = 32

        for start in tqdm.trange(0, len(examples), batch_size):
            batch = examples[start : start + batch_size]
            batch_inputs = []
            batch_data = []
            for row in batch:
                messages = row["messages"]
                user = get_role_by_idx(messages, "user", 0)
                real_answer = get_role_by_idx(messages, "assistant", 0)
                # Extract the question from the user prompt
                question = user.split("Zapytanie brzmi:")[1].strip() if "Zapytanie brzmi:" in user else user
                batch_inputs.append(messages[:-1])
                batch_data.append((user, question, real_answer))

            # Every process must take part in generation; only the main one grades.
            generations = self.generate(batch_inputs)
            if not self.trainer.accelerator.is_main_process:
                continue

            for (prompt, question, real_answer), generation in zip(batch_data, generations):
                try:
                    _, points = self.compare_filtering_answer(question, generation, real_answer)
                except Exception as err:
                    # Best effort: a grader failure must not abort evaluation.
                    # The example is logged with 0 points and left out of the mean.
                    print(f"Grading failed, logging 0 points: {err!r}")
                    points = 0
                else:
                    graded_count += 1
                    total_points += points
                records_table.add_data(prompt, question, generation, real_answer, points)

        mean_points = total_points / graded_count if graded_count > 0 else 0
        return records_table, mean_points

    def _ask_judge(self, system, user):
        """Send one system + user exchange to the OpenAI judge and return its reply text.

        The reply has trailing dots and surrounding whitespace removed.
        """
        client = openai.Client()
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user},
            ],
            temperature=0.0,
            max_tokens=512,
        )
        return response.choices[0].message.content.rstrip(".").strip()

    @staticmethod
    def _parse_score(resp):
        """Return the integer formed by the first token after the last ':' in ``resp``.

        Raises:
            ValueError: if that token is not an integer.
            IndexError: if nothing follows the last ':'.
        """
        return int(resp.split(":")[-1].split()[0].strip())

    def compare_filtering_answer(self, question, answer, expected):
        """Grade a filtering-task answer with the OpenAI judge.

        Returns:
            ``(judge_reply, score)``. If the score cannot be parsed from the end
            of the reply, it is 1 when the reply contains "ocena: 1"
            (case-insensitive) and 0 otherwise.
        """
        system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na zadania filtrowania dokumentów prawniczych."
        user = f"Zapytanie: '{question}'.\nPoprawna odpowiedź: '{expected}'\nOdpowiedź modelu: '{answer}'."
        user += "\nOceń, czy odpowiedź modelu poprawnie identyfikuje powiązanie i zawiera odpowiednią argumentację, podobnie jak w poprawnej odpowiedzi."  # noqa: E501
        user += "\nOdpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to niepoprawna odpowiedź, a 1 to poprawna odpowiedź."  # noqa: E501

        resp = self._ask_judge(system, user)
        print(resp)
        try:
            return resp, self._parse_score(resp)
        except (ValueError, IndexError):
            print("Error: ", resp)
            # Look for an explicit "ocena: 1" anywhere in the response
            score = 1 if "ocena: 1" in resp.lower() else 0
            return resp, score

    def on_evaluate(self, *args, **kwargs):
        """Log the wandb.Table and mean score after ``trainer.evaluate``."""
        filtering_dataset = [example for example in self.sample_dataset if example["task"] == "filtering"]
        records_table, recall = self.samples_filtering_table(filtering_dataset)

        if self.trainer.accelerator.is_main_process:
            try:
                wandb.log({"filtering_predictions_" + str(self.idx): records_table})
                wandb.log({"filtering_recall": recall})
            except Exception as err:
                # Logging is best effort; report the problem instead of hiding it.
                print(f"Failed to log filtering samples to wandb: {err!r}")
        self.idx += 1

    def compare_answer(self, question, answer, expected):
        """Grade an exam-style answer with the OpenAI judge.

        Returns:
            ``(judge_reply, score)``. If the score cannot be parsed from the end
            of the reply, it is 1 when the reply contains the standalone number
            "1" anywhere and 0 otherwise.
        """
        system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na egzaminie. Oceniasz odpowiedzi jako poprawne (1 punkt) lub niepoprawne (0 punktów)."  # noqa: E501
        user = f"Pytanie: '{question}'.\n Poprawna odpowiedź: '{expected}'\n Odpowiedź użytkownika: '{answer}'."
        user += "\nCzy odpowiedź użytkownika jest poprawna? Przyznaj 1 punkt za poprawną odpowiedź lub 0 punktów za niepoprawną. Jeżeli poprawna odpowiedź sugeruje że nie da się odpowiedzieć na pytanie, to odpowiedź użytkownika powinna być taka sama. Nie dawaj punktów za chęci. Oceniaj odpowiedź tylko pod kątem poprawności."  # noqa: E501
        user += "\nPodkreślam: jeżeli poprawna odpowiedź sugeruje że nie da się udzielić odpowiedzi na podstawie źródeł, to odpowiedź użytkownika powinna być taka sama."  # noqa: E501
        user += (
            "Odpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to brak punktów, a 1 to pełna ocena."
        )

        resp = self._ask_judge(system, user)
        try:
            return resp, self._parse_score(resp)
        except (ValueError, IndexError):
            print("Error: ", resp)
            # Look for a standalone 1 among the numbers in the response
            score = 1 if "1" in re.findall(r"\d+", resp) else 0
            return resp, score

    def _normalize_string(self, s):
        """Return ``s`` lower-cased with all whitespace removed ("" for None).

        No longer used by ``generate``; kept so existing callers keep working.
        """
        if s is None:
            return ""
        return re.sub(r'\s+', '', s).lower()
def text_to_array(text):
    """Parse CSV text, optionally wrapped in a ``` code fence, into a 2-D numpy array.

    Three input shapes are handled:
      * a ```csv fence: the text between that fence and the next ``` is used;
      * any other ``` fence: the stripped text after the first ``` (up to the next one) is used;
      * no fence: the whole stripped text is used.

    The CSV is read without a header row and returned as ``DataFrame.values``.
    """
    stripped = text.strip()
    if "```csv" in text:
        after_open = stripped.partition("```csv")[2]
        csv_text = after_open.partition("```")[0]
    elif "```" in text:
        after_open = stripped.partition("```")[2]
        csv_text = after_open.partition("```")[0].strip()
    else:
        csv_text = stripped

    # Every row is data, so no header line is consumed.
    frame = pd.read_csv(io.StringIO(csv_text), header=None)
    return frame.values
def generate_answer(
    model,
    processor,
    table: np.ndarray | torch.Tensor | list,
    question: str,
    max_new_tokens: int = 512,
    do_sample: bool = False,
    temperature: float | None = None,
) -> str:
    """
    Ask the model a question about a single table and return its text answer.

    Args:
        model: The Qwen2_5_TabularModel instance
        processor: The Qwen_2_5_TabularProcessor instance
        table: The input table as numpy array (including dtype=object for mixed types),
            torch tensor, or list of lists
        question: The question to answer about the table
        max_new_tokens: Maximum number of tokens to generate
        do_sample: Whether to use sampling
        temperature: Sampling temperature; forwarded to ``generate`` only when not None

    Returns:
        The decoded continuation (prompt tokens removed) as a string
    """
    # One user turn: intro text, a placeholder for table 0, then the question.
    user_turn = {
        "role": "user",
        "content": [
            {"type": "text", "text": "Consider this table:"},
            {"index": 0, "type": "tabular"},
            {"type": "text", "text": question},
        ],
    }

    # NOTE(review): add_generation_prompt is not passed, so the rendered prompt
    # ends after the user turn — confirm this is intended.
    prompt_text = processor.tokenizer.apply_chat_template([user_turn], tokenize=False)
    batch = processor([table], text=prompt_text, return_tensors="pt")

    # Put tensor inputs on the same device as the model's parameters;
    # non-tensor entries pass through unchanged.
    target_device = next(model.parameters()).device
    model_inputs = {}
    for name, item in batch.items():
        model_inputs[name] = item.to(target_device) if isinstance(item, torch.Tensor) else item

    # tabular_metadata is not a model parameter, so it must not reach generate().
    model_inputs.pop('tabular_metadata', None)

    generation_options = {"max_new_tokens": max_new_tokens, "do_sample": do_sample}
    if temperature is not None:
        generation_options["temperature"] = temperature

    with torch.inference_mode():
        sequences = model.generate(**model_inputs, **generation_options)

    # Drop the echoed prompt so only newly generated tokens are decoded.
    continuations = []
    for prompt_ids, full_ids in zip(model_inputs["input_ids"], sequences, strict=True):
        continuations.append(full_ids[len(prompt_ids):])

    decoded = processor.batch_decode(
        continuations,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )
    return decoded[0]