from trl.models.utils import unwrap_model_for_generation # %% import re import openai import torch from transformers import ( GenerationConfig, TrainerCallback, Qwen2TokenizerFast, ) import wandb import tqdm from accelerate.utils import gather_object import pandas as pd import io import numpy as np # Chat template for tabular models TABULAR_CHAT_TEMPLATE = """{% set image_count = namespace(value=0) %}{% set video_count = namespace(value=0) %}{% set tabular_count = namespace(value=0) %}{% for message in messages %}{% if loop.first and message['role'] != 'system' %}<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n{% endif %}<|im_start|>{{ message['role'] }}\n{% if message['content'] is string %}{{ message['content'] }}<|im_end|>\n{% else %}{% for content in message['content'] %}{% if content['type'] == 'image' or 'image' in content or 'image_url' in content %}{% set image_count.value = image_count.value + 1 %}{% if add_vision_id %}Picture {{ image_count.value }}: {% endif %}<|vision_start|><|image_pad|><|vision_end|>{% elif content['type'] == 'video' or 'video' in content %}{% set video_count.value = video_count.value + 1 %}{% if add_vision_id %}Video {{ video_count.value }}: {% endif %}<|vision_start|><|video_pad|><|vision_end|>{% elif content['type'] == 'tabular' or 'tabular' in content %}{% set tabular_count.value = tabular_count.value + 1 %}{% if add_vision_id %}Table {{ tabular_count.value }}: {% endif %}<|vision_start|><|tabular_pad|><|vision_end|>{% elif 'text' in content %}{{ content['text'] }}{% endif %}{% endfor %}<|im_end|>\n{% endif %}{% endfor %}{% if add_generation_prompt %}<|im_start|>assistant\n{% endif %}""" def load_model_and_processor( model_path: str, device: str = "cuda:0", torch_dtype=torch.bfloat16, ) -> tuple: """ Load a Qwen2_5_TabularModel and its processor. Args: model_path: Path to the model checkpoint or HuggingFace model name device: Device to load the model on (e.g., "cuda:0", "cuda:1", "cpu") torch_dtype: Torch dtype for the model (default: torch.bfloat16) Returns: tuple: (model, processor) ready to use """ from TabularModel import ( TabularPreprocessor, Qwen_2_5_TabularProcessor, Qwen2_5_TabularModel, ) # Create tabular preprocessor tabular_processor = TabularPreprocessor() # Create Qwen tabular processor qwen_tabular_processor = Qwen_2_5_TabularProcessor( tabular_processor=tabular_processor, tokenizer=Qwen2TokenizerFast.from_pretrained(model_path), ) # Add special tokens qwen_tabular_processor.tabular_token = "<|tabular_pad|>" qwen_tabular_processor.tokenizer.add_tokens([ qwen_tabular_processor.tabular_token, "<|tabular_row|>", "<|tabular_cell|>" ]) qwen_tabular_processor.tokenizer.chat_template = TABULAR_CHAT_TEMPLATE # Load model model = Qwen2_5_TabularModel.from_pretrained( model_path, torch_dtype=torch_dtype, ).to(device) # Set token IDs in config model.config.tabular_token_id = ( qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_pad|>") ) model.config.tabular_row_token_id = ( qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_row|>") ) model.config.tabular_cell_token_id = ( qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_cell|>") ) return model, qwen_tabular_processor def get_role_by_idx(convo: list[dict[str, str]], role: str, idx: int) -> str: found = 0 for message in convo: if message["role"] == role: if found == idx: return message["content"] found += 1 raise ValueError(f"Role {role} not found {idx} times") class LLMSampleCB(TrainerCallback): def __init__( self, trainer, test_dataset, num_samples=10, max_new_tokens=256, log_model="checkpoint", ): "A CallBack to log samples a wandb.Table during training" super().__init__() self._log_model = log_model self.trainer = trainer # Get unique tasks from the dataset tasks = set([i["task"] for i in test_dataset]) # Get num_samples from each task task_samples = [] for task in tasks: task_dataset = [i for i in test_dataset if i["task"] == task][:num_samples] task_samples.extend(task_dataset) # Combine samples from all tasks self.sample_dataset = task_samples self.model, self.tokenizer = trainer.model_wrapped, trainer.tokenizer self.tokenizer.padding_side = "left" self.gen_config = GenerationConfig.from_pretrained( trainer.model.name_or_path, temperature=0.001, max_new_tokens=max_new_tokens ) self.idx = 0 def generate(self, conversations: list[list[dict[str, str]]]) -> list[str]: accelerator = self.trainer.accelerator # Create original prompts before distribution to use as keys original_prompts = self.tokenizer.apply_chat_template(conversations, tokenize=False) original_prompt_to_idx = {self._normalize_string(prompt): idx for idx, prompt in enumerate(original_prompts)} completions = [None] * len(conversations) # Pre-allocate result array with accelerator.split_between_processes(conversations) as conversation_subset: model = self.trainer.model_wrapped with unwrap_model_for_generation(model, accelerator) as unwrapped_model: prompts = self.tokenizer.apply_chat_template(conversation_subset, tokenize=False) tokenized_prompts = self.tokenizer(prompts, return_tensors="pt", padding=True).to(model.device) with torch.inference_mode(): print("Generating...") generations = unwrapped_model.generate(**tokenized_prompts, generation_config=self.gen_config).cpu() print("Generated!") results = [] for prompt_str, prompt_tokens, generation in zip(prompts, tokenized_prompts.input_ids, generations): # Remove prompt from generation generation = generation[len(prompt_tokens) :] completion = self.tokenizer.decode(generation, skip_special_tokens=True) results.append((prompt_str, completion)) # Gather results from all processes all_results = gather_object(results) # Place completions in their original positions for prompt_str, completion in all_results: norm_prompt = self._normalize_string(prompt_str) if norm_prompt in original_prompt_to_idx: idx = original_prompt_to_idx[norm_prompt] completions[idx] = completion return completions def samples_filtering_table(self, examples): "Create a wandb.Table to store the generations" records_table = wandb.Table(columns=["full_prompt", "question", "generation", "real_answer", "points"]) max_num = [0] summary = [0] batch_size = 32 all_data = [] for i in tqdm.trange(0, len(examples), batch_size): batch = examples[i : i + batch_size] batch_data = [] # Prepare batch inputs batch_inputs = [] for row in batch: row = row["messages"] user = get_role_by_idx(row, "user", 0) real_answer = get_role_by_idx(row, "assistant", 0) # Extract the question from the user prompt question = user.split("Zapytanie brzmi:")[1].strip() if "Zapytanie brzmi:" in user else user prompt = user batch_inputs.append(row[:-1]) batch_data.append((prompt, question, real_answer)) # Generate all responses in a single pass generations = self.generate(batch_inputs) # Process results if self.trainer.accelerator.is_main_process: for idx, (prompt, question, real_answer) in enumerate(batch_data): generation = generations[idx] # Get points for this example try: _, points = self.compare_filtering_answer(question, generation, real_answer) max_num[0] += 1 summary[0] += points except Exception: points = 0 records_table.add_data(prompt, question, generation, real_answer, points) batch_data[idx] = (prompt, question, generation, real_answer) all_data.extend(batch_data) return records_table, summary[0] / max_num[0] if max_num[0] > 0 else 0 def compare_filtering_answer(self, question, answer, expected): client = openai.Client() system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na zadania filtrowania dokumentów prawniczych." user = f"Zapytanie: '{question}'.\nPoprawna odpowiedź: '{expected}'\nOdpowiedź modelu: '{answer}'." user += "\nOceń, czy odpowiedź modelu poprawnie identyfikuje powiązanie i zawiera odpowiednią argumentację, podobnie jak w poprawnej odpowiedzi." # noqa: E501 user += "\nOdpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to niepoprawna odpowiedź, a 1 to poprawna odpowiedź." # noqa: E501 response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], temperature=0.0, max_tokens=512, ) resp = response.choices[0].message.content.rstrip(".").strip() print(resp) try: return resp, int(resp.split(":")[-1].split()[0].strip()) except Exception: print("Error: ", resp) # Look for either 0 or 1 in the response score = 1 if "ocena: 1" in resp.lower() else 0 return resp, score def on_evaluate(self, *args, **kwargs): "Log the wandb.Table after calling trainer.evaluate" filtering_dataset = [i for i in self.sample_dataset if i["task"] == "filtering"] records_table, recall = self.samples_filtering_table(filtering_dataset) if self.trainer.accelerator.is_main_process: try: wandb.log({"filtering_predictions_" + str(self.idx): records_table}) wandb.log({"filtering_recall": recall}) except Exception: pass self.idx += 1 def compare_answer(self, question, answer, expected): client = openai.Client() system = "Jesteś sztuczną inteligencją do oceniania odpowiedzi na egzaminie. Oceniasz odpowiedzi jako poprawne (1 punkt) lub niepoprawne (0 punktów)." # noqa: E501 user = f"Pytanie: '{question}'.\n Poprawna odpowiedź: '{expected}'\n Odpowiedź użytkownika: '{answer}'." user += "\nCzy odpowiedź użytkownika jest poprawna? Przyznaj 1 punkt za poprawną odpowiedź lub 0 punktów za niepoprawną. Jeżeli poprawna odpowiedź sugeruje że nie da się odpowiedzieć na pytanie, to odpowiedź użytkownika powinna być taka sama. Nie dawaj punktów za chęci. Oceniaj odpowiedź tylko pod kątem poprawności." # noqa: E501 user += "\nPodkreślam: jeżeli poprawna odpowiedź sugeruje że nie da się udzielić odpowiedzi na podstawie źródeł, to odpowiedź użytkownika powinna być taka sama." # noqa: E501 user += ( "Odpowiedz w formacie 'Argumentacja: (...)\nOcena: 0 lub 1', gdzie 0 to brak punktów, a 1 to pełna ocena." ) response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], temperature=0.0, max_tokens=512, ) resp = response.choices[0].message.content.rstrip(".").strip() try: return resp, int(resp.split(":")[-1].split()[0].strip()) except Exception: print("Error: ", resp) # Look for either 0 or 1 in the response score = 1 if "1" in re.findall(r"\d+", resp) else 0 return resp, score def _normalize_string(self, s): """Normalize string to avoid whitespace/newline comparison issues""" if s is None: return "" # Remove all whitespace and convert to lowercase for more robust matching return re.sub(r'\s+', '', s).lower() def text_to_array(text): if '```' not in text: csv_text = text.strip() elif '```csv' not in text: csv_text = text.strip().split("```")[1].strip() else: csv_text = text.strip().split("```csv")[1].split("```")[0] # Parse CSV into a DataFrame df = pd.read_csv(io.StringIO(csv_text), header=None) # Convert DataFrame to numpy array for comparison generated_corr_matrix = df.values return generated_corr_matrix def generate_answer( model, processor, table: np.ndarray | torch.Tensor | list, question: str, max_new_tokens: int = 512, do_sample: bool = False, temperature: float | None = None, ) -> str: """ Generate an answer based on a table and a question. Args: model: The Qwen2_5_TabularModel instance processor: The Qwen_2_5_TabularProcessor instance table: The input table as numpy array (including dtype=object for mixed types), torch tensor, or list of lists question: The question to answer about the table max_new_tokens: Maximum number of tokens to generate do_sample: Whether to use sampling temperature: Sampling temperature (if do_sample=True) Returns: Generated answer as a string """ # Prepare messages in the expected format messages = [ { "role": "user", "content": [ {"type": "text", "text": "Consider this table:"}, {"index": 0, "type": "tabular"}, {"type": "text", "text": question}, ], } ] # Apply chat template preprocessed = processor.tokenizer.apply_chat_template( messages, tokenize=False ) # Process inputs processed = processor( [table], text=preprocessed, return_tensors="pt" ) # Move to model device device = next(model.parameters()).device processed = { key: value.to(device) if isinstance(value, torch.Tensor) else value for key, value in processed.items() } # Remove tabular_metadata as it's not a model parameter processed.pop('tabular_metadata', None) # Generate gen_kwargs = { "max_new_tokens": max_new_tokens, "do_sample": do_sample, } if temperature is not None: gen_kwargs["temperature"] = temperature with torch.inference_mode(): res = model.generate(**processed, **gen_kwargs) # Decode only the generated part (remove input) generated_ids = [ output_ids[len(input_ids):] for input_ids, output_ids in zip(processed["input_ids"], res, strict=True) ] output_text = processor.batch_decode( generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True ) return output_text[0]