from transformers import ( AutoConfig, AutoProcessor, ProcessorMixin, Qwen2TokenizerFast, BaseImageProcessor, Qwen2_5_VLForConditionalGeneration, ) from transformers.models.qwen2_5_vl.modeling_qwen2_5_vl import ( Qwen2_5_VLCausalLMOutputWithPast, Qwen2RMSNorm, ) from transformers.tokenization_utils_base import PreTokenizedInput, TextInput from transformers.processing_utils import Unpack from transformers.feature_extraction_sequence_utils import BatchFeature from typing import List, Optional, TypedDict # from tabpfn_extensions import TabPFNRegressor # from tabpfn_extensions.embedding import TabPFNEmbedding import numpy as np import torch from torch import nn from torch.nn import CrossEntropyLoss from pprint import pprint class TabularProcessorKwargs(TypedDict): """ Keyword arguments for tabular processing. """ pass class TabularPreprocessor(BaseImageProcessor): def __call__(self, X: list | np.ndarray | torch.Tensor) -> torch.Tensor: if not isinstance(X, list): X = [X] res = [] for X_sample in X: if isinstance(X_sample, torch.Tensor): X_sample = X_sample.cpu().numpy() res.append(X_sample) res = np.array(res) return BatchFeature(data={"tabular_values": torch.from_numpy(res).to(torch.float32)}) AutoProcessor.register("TabularPreprocessor", TabularPreprocessor) class TabularProcessor(nn.Module): def __init__(self, **kwargs: Unpack[TabularProcessorKwargs]): super().__init__(**kwargs) self.tabpfn = TabPFNRegressor( n_estimators=1, model_path="./tabpfn-v2-regressor.ckpt", device="cuda:1" ) def __call__(self, X: np.ndarray | torch.Tensor) -> torch.Tensor: # Will convert specified categorical indices to category dtype, as well # as handle `np.object` arrays or otherwise `object` dtype pandas columns. if len(X.shape) == 2: X = [X] res = [] for X_sample in X: if isinstance(X_sample, torch.Tensor): X_sample = X_sample.cpu().numpy() X_sample = X_sample[0] self.tabpfn.fit(X_sample, np.random.random(X_sample.shape[0])) embs = self.tabpfn.get_embeddings(X_sample) embs_t = torch.from_numpy(embs).to(self.tabpfn.device) embs_t = embs_t.mean(dim=0) res.append(embs_t) res = torch.stack(res) res = res.view(-1, 192) return res class TabularBlock(nn.Module): def __init__(self, input_dim: int, hidden_dim: int = 192): super().__init__() self.linear1 = nn.Linear(input_dim, hidden_dim) self.activation = nn.GELU() self.linear2 = nn.Linear(hidden_dim, input_dim) def forward(self, x: torch.Tensor) -> torch.Tensor: residual = x x = self.linear1(x) x = self.activation(x) x = self.linear2(x) return x + residual class TabularLearnableProcessor(nn.Module): def __init__(self, num_features: int = 1): super().__init__() # Each cell is processed individually as a scalar self.input_proj = nn.Linear(num_features, 192) self.nodes = nn.Sequential( nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), nn.GELU(), TabularBlock(192, 64), ) def forward(self, X: np.ndarray | torch.Tensor) -> torch.Tensor: if isinstance(X, np.ndarray): X = torch.from_numpy(X) param_dtype = self.input_proj.weight.dtype X = X.to(param_dtype) # Flatten the table - each cell becomes a separate token # X shape: (batch_size, rows, cols) -> (batch_size * rows * cols, 1) batch_size = X.shape[0] X_flat = X.reshape(-1, 1) # Flatten to individual cells # RMS normalization per cell for stability # X_normalized = X_flat * torch.rsqrt(X_flat.pow(2) + 1e-5) projected = self.input_proj(X_flat) # res = self.nodes(projected) return projected class Qwen_2_5_TabularProcessor(ProcessorMixin): r""" Constructs a Qwen2.5-VL processor which wraps a Qwen2.5-VL image processor and a Qwen2 tokenizer into a single processor. [`Qwen2_5_VLProcessor`] offers all the functionalities of [`Qwen2VLImageProcessor`] and [`Qwen2TokenizerFast`]. See the [`~Qwen2_5_VLProcessor.__call__`] and [`~Qwen2_5_VLProcessor.decode`] for more information. Args: image_processor ([`Qwen2VLImageProcessor`], *optional*): The image processor is a required input. tokenizer ([`Qwen2TokenizerFast`], *optional*): The tokenizer is a required input. chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages in a chat into a tokenizable string. """ attributes = ["tokenizer"] valid_kwargs = ["chat_template"] tokenizer_class = ("Qwen2Tokenizer", "Qwen2TokenizerFast") def __init__( self, tabular_processor: TabularPreprocessor | None = None, tokenizer=None, chat_template=None, **kwargs, ): self.tabular_token = ( "<|tabular_pad|>" if not hasattr(tokenizer, "tabular_token") else tokenizer.tabular_token ) self.tabular_processor = tabular_processor super().__init__(tokenizer, chat_template=chat_template) def __call__( self, tabular_values: np.ndarray | torch.Tensor | None = None, text: TextInput | PreTokenizedInput | list[TextInput] | list[PreTokenizedInput] | None = None, **kwargs: Unpack[TabularProcessorKwargs], ) -> BatchFeature: """ Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to encode the text. To prepare the vision inputs, this method forwards the `vision_infos` and `kwrags` arguments to Qwen2VLImageProcessor's [`~Qwen2VLImageProcessor.__call__`] if `vision_infos` is not `None`. Args: images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`): The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch tensor. Both channels-first and channels-last formats are supported. text (`str`, `List[str]`, `List[List[str]]`): The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set `is_split_into_words=True` (to lift the ambiguity with a batch of sequences). videos (`np.ndarray`, `torch.Tensor`, `List[np.ndarray]`, `List[torch.Tensor]`): The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported. return_tensors (`str` or [`~utils.TensorType`], *optional*): If set, will return tensors of a particular framework. Acceptable values are: - `'tf'`: Return TensorFlow `tf.constant` objects. - `'pt'`: Return PyTorch `torch.Tensor` objects. - `'np'`: Return NumPy `np.ndarray` objects. - `'jax'`: Return JAX `jnp.ndarray` objects. Returns: [`BatchFeature`]: A [`BatchFeature`] with the following fields: - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not `None`). - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`. - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`. - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`. - **second_per_grid_ts** -- List of video seconds per time grid. Returned when `videos` is not `None`. """ # print("Tabular values: ", tabular_values) if tabular_values is not None: tabular_inputs = self.tabular_processor(tabular_values) else: print("Warning! No tabular values provided!") tabular_inputs = {} if not isinstance(text, list): text = [text] if tabular_values is not None: index = 0 for i in range(len(text)): while self.tabular_token in text[i]: # Each cell becomes a token: num_tokens = rows * cols table_shape = tabular_inputs["tabular_values"][index].shape rows, cols = table_shape[0], table_shape[1] # Build pattern: for each row, add col tokens + row separator row_pattern = "<|placeholder|>" * cols + "<|tabular_row|>" replacement = row_pattern * rows text[i] = text[i].replace( self.tabular_token, replacement, 1, ) index += 1 text[i] = text[i].replace("<|placeholder|>", self.tabular_token) text_inputs = self.tokenizer(text, **kwargs) return BatchFeature(data={**text_inputs, **tabular_inputs}) def batch_decode(self, *args, **kwargs): """ This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.batch_decode(*args, **kwargs) def decode(self, *args, **kwargs): """ This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.decode(*args, **kwargs) def post_process_image_text_to_text( self, generated_outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False, **kwargs, ): """ Post-process the output of the model to decode the text. Args: generated_outputs (`torch.Tensor` or `np.ndarray`): The output of the model `generate` function. The output is expected to be a tensor of shape `(batch_size, sequence_length)` or `(sequence_length,)`. skip_special_tokens (`bool`, *optional*, defaults to `True`): Whether or not to remove special tokens in the output. Argument passed to the tokenizer's `batch_decode` method. Clean_up_tokenization_spaces (`bool`, *optional*, defaults to `False`): Whether or not to clean up the tokenization spaces. Argument passed to the tokenizer's `batch_decode` method. **kwargs: Additional arguments to be passed to the tokenizer's `batch_decode method`. Returns: `List[str]`: The decoded text. """ return self.tokenizer.batch_decode( generated_outputs, skip_special_tokens=skip_special_tokens, clean_up_tokenization_spaces=clean_up_tokenization_spaces, **kwargs, ) @property def model_input_names(self): tokenizer_input_names = self.tokenizer.model_input_names tabular_processor_input_names = self.tabular_processor.model_input_names if hasattr(self.tabular_processor, 'model_input_names') else [] names_from_processor = list( dict.fromkeys(tokenizer_input_names + tabular_processor_input_names) ) return names_from_processor + ["tabular_values"] class Qwen2_5_TabularModel(Qwen2_5_VLForConditionalGeneration): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tabular_processor = TabularLearnableProcessor(num_features=1) self.tabular_projection = nn.Sequential( nn.Linear(192, self.config.hidden_size), nn.ReLU(), TabularBlock(self.config.hidden_size, self.config.hidden_size), nn.ReLU(), TabularBlock(self.config.hidden_size, self.config.hidden_size), nn.ReLU(), TabularBlock(self.config.hidden_size, self.config.hidden_size), ) def forward( self, input_ids: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[List[torch.FloatTensor]] = None, inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, pixel_values: Optional[torch.Tensor] = None, pixel_values_videos: Optional[torch.FloatTensor] = None, tabular_values: Optional[torch.Tensor] = None, image_grid_thw: Optional[torch.LongTensor] = None, video_grid_thw: Optional[torch.LongTensor] = None, rope_deltas: Optional[torch.LongTensor] = None, cache_position: Optional[torch.LongTensor] = None, second_per_grid_ts: Optional[torch.Tensor] = None, ): r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. Returns: Example: ```python >>> from PIL import Image >>> import requests >>> from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration >>> model = Qwen2_5_VLForConditionalGeneration.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct") >>> processor = AutoProcessor.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct") >>> messages = [ { "role": "user", "content": [ {"type": "image"}, {"type": "text", "text": "What is shown in this image?"}, ], }, ] >>> url = "https://www.ilankelman.org/stopsigns/australia.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) >>> inputs = processor(text=[text], images=[image], vision_infos=[vision_infos]) >>> # Generate >>> generate_ids = model.generate(inputs.input_ids, max_length=30) >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] "The image shows a street scene with a red stop sign in the foreground. In the background, there is a large red gate with Chinese characters ..." ```""" output_attentions = ( output_attentions if output_attentions is not None else self.config.output_attentions ) output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = ( return_dict if return_dict is not None else self.config.use_return_dict ) if inputs_embeds is None: inputs_embeds = self.language_model.embed_tokens(input_ids) if pixel_values is not None: pixel_values = pixel_values.type(self.visual.dtype) image_embeds = self.visual(pixel_values, grid_thw=image_grid_thw) n_image_tokens = (input_ids == self.config.image_token_id).sum().item() n_image_features = image_embeds.shape[0] if n_image_tokens != n_image_features: raise ValueError( f"Image features and image tokens do not match: tokens: {n_image_tokens}, features {n_image_features}" ) mask = input_ids == self.config.image_token_id mask_unsqueezed = mask.unsqueeze(-1) mask_expanded = mask_unsqueezed.expand_as(inputs_embeds) image_mask = mask_expanded.to(inputs_embeds.device) image_embeds = image_embeds.to( inputs_embeds.device, inputs_embeds.dtype ) inputs_embeds = inputs_embeds.masked_scatter(image_mask, image_embeds) if pixel_values_videos is not None: pixel_values_videos = pixel_values_videos.type(self.visual.dtype) video_embeds = self.visual(pixel_values_videos, grid_thw=video_grid_thw) n_video_tokens = (input_ids == self.config.video_token_id).sum().item() n_video_features = video_embeds.shape[0] if n_video_tokens != n_video_features: raise ValueError( f"Video features and video tokens do not match: tokens: {n_video_tokens}, features {n_video_features}" ) mask = input_ids == self.config.video_token_id mask_unsqueezed = mask.unsqueeze(-1) mask_expanded = mask_unsqueezed.expand_as(inputs_embeds) video_mask = mask_expanded.to(inputs_embeds.device) video_embeds = video_embeds.to( inputs_embeds.device, inputs_embeds.dtype ) inputs_embeds = inputs_embeds.masked_scatter(video_mask, video_embeds) if tabular_values is not None: proc_feats = self.tabular_processor(tabular_values.to(self.device, torch.float32)) proc_feats = proc_feats.to(inputs_embeds.dtype).to(self.device) tabular_embeds = self.tabular_projection(proc_feats) tabular_token_id = getattr(self.config, "tabular_token_id", None) if tabular_token_id is None: raise ValueError("Tabular token id (config.tabular_token_id) is not set.") mask = (input_ids == int(tabular_token_id)) tabular_no_mask = mask.sum().item() if tabular_no_mask != tabular_embeds.shape[0]: raise ValueError( f"Tabular features and tabular tokens do not match: tokens: {tabular_no_mask}, features {tabular_embeds.shape[0]}" ) mask_unsqueezed = mask.unsqueeze(-1) mask_expanded = mask_unsqueezed.expand_as(inputs_embeds) tabular_mask = mask_expanded.to(inputs_embeds.device) tabular_embeds = tabular_embeds.to( inputs_embeds.device, inputs_embeds.dtype ) inputs_embeds = inputs_embeds.masked_scatter( tabular_mask, tabular_embeds ) if attention_mask is not None: attention_mask = attention_mask.to(inputs_embeds.device) # if we get 4D attention mask we cannot calculate rope deltas anymore. TODO @raushan fixme if position_ids is None and ( attention_mask is None or attention_mask.ndim == 2 ): # calculate RoPE index once per generation in the pre-fill stage only if ( (cache_position is not None and cache_position[0] == 0) or self.rope_deltas is None or (past_key_values is None or past_key_values.get_seq_length() == 0) ): position_ids, rope_deltas = self.model.get_rope_index( input_ids, image_grid_thw, video_grid_thw, second_per_grid_ts, attention_mask, ) self.rope_deltas = rope_deltas # then use the prev pre-calculated rope-deltas to get the correct position ids else: batch_size, seq_length, _ = inputs_embeds.shape delta = ( (cache_position[0] + self.rope_deltas).to(inputs_embeds.device) if cache_position is not None else 0 ) position_ids = torch.arange(seq_length, device=inputs_embeds.device) position_ids = position_ids.view(1, -1).expand(batch_size, -1) if cache_position is not None: # otherwise `deltas` is an int `0` delta = delta.repeat_interleave(batch_size // delta.shape[0], dim=0) position_ids = position_ids.add(delta) position_ids = position_ids.unsqueeze(0).expand(3, -1, -1) outputs = self.model( input_ids=None, position_ids=position_ids, attention_mask=attention_mask, past_key_values=past_key_values, inputs_embeds=inputs_embeds, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, cache_position=cache_position, ) hidden_states = outputs[0] logits = self.lm_head(hidden_states) loss = None if labels is not None: # Upcast to float if we need to compute the loss to avoid potential precision issues logits = logits.float() # Shift so that tokens < n predict n shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten the tokens loss_fct = CrossEntropyLoss() shift_logits = shift_logits.view(-1, self.config.vocab_size) shift_labels = shift_labels.view(-1) # Enable model parallelism shift_labels = shift_labels.to(shift_logits.device) loss = loss_fct(shift_logits, shift_labels) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return Qwen2_5_VLCausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=outputs.past_key_values, hidden_states=outputs.hidden_states, attentions=outputs.attentions, rope_deltas=self.rope_deltas, ) def prepare_inputs_for_generation( self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, cache_position=None, position_ids=None, use_cache=True, pixel_values=None, pixel_values_videos=None, image_grid_thw=None, video_grid_thw=None, second_per_grid_ts=None, **kwargs, ): # Overwritten -- in specific circumstances we don't want to forward image inputs to the model model_inputs = super().prepare_inputs_for_generation( input_ids, past_key_values=past_key_values, attention_mask=attention_mask, inputs_embeds=inputs_embeds, cache_position=cache_position, position_ids=position_ids, pixel_values=pixel_values, pixel_values_videos=pixel_values_videos, image_grid_thw=image_grid_thw, video_grid_thw=video_grid_thw, second_per_grid_ts=second_per_grid_ts, use_cache=use_cache, **kwargs, ) # Qwen2-5-VL position_ids are prepareed with rope_deltas in forward model_inputs["position_ids"] = None if cache_position[0] != 0: model_inputs["pixel_values"] = None model_inputs["pixel_values_videos"] = None model_inputs["tabular_values"] = None return model_inputs if __name__ == "__main__": template = """"{% set image_count = namespace(value=0) %}{% set video_count = namespace(value=0) %}{% set tabular_count = namespace(value=0) %}{% for message in messages %}{% if loop.first and message['role'] != 'system' %}<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n{% endif %}<|im_start|>{{ message['role'] }}\n{% if message['content'] is string %}{{ message['content'] }}<|im_end|>\n{% else %}{% for content in message['content'] %}{% if content['type'] == 'image' or 'image' in content or 'image_url' in content %}{% set image_count.value = image_count.value + 1 %}{% if add_vision_id %}Picture {{ image_count.value }}: {% endif %}<|vision_start|><|image_pad|><|vision_end|>{% elif content['type'] == 'video' or 'video' in content %}{% set video_count.value = video_count.value + 1 %}{% if add_vision_id %}Video {{ video_count.value }}: {% endif %}<|vision_start|><|video_pad|><|vision_end|>{% elif content['type'] == 'tabular' or 'tabular' in content %}{% set tabular_count.value = tabular_count.value + 1 %}{% if add_vision_id %}Table {{ tabular_count.value }}: {% endif %}<|vision_start|><|tabular_pad|><|vision_end|>{% elif 'text' in content %}{{ content['text'] }}{% endif %}{% endfor %}<|im_end|>\n{% endif %}{% endfor %}{% if add_generation_prompt %}<|im_start|>assistant\n{% endif %}""" MODE = "reconstruction_variable" model_name_trained = f"./models/Tabular-LM-v0.1-{MODE}" # model_name_trained = "Qwen/Qwen2.5-VL-3B-Instruct" # model_name_trained = "./models/checkpoints/checkpoint-1000" tabular_processor = TabularPreprocessor() qwen_tabular_processor = Qwen_2_5_TabularProcessor( tabular_processor=tabular_processor, tokenizer=Qwen2TokenizerFast.from_pretrained(model_name_trained), ) qwen_tabular_processor.tabular_token = "<|tabular_pad|>" qwen_tabular_processor.tokenizer.add_tokens([qwen_tabular_processor.tabular_token, "<|tabular_row|>"]) qwen_tabular_processor.tokenizer.chat_template = template tabular_data = np.random.randn(4,6).round(2) messages = [ { "role": "user", "content": [ {"type": "text", "text": "This is a table."}, {"index": 0, "type": "tabular"}, {"type": "text", "text": "Give me its content in csv format."}, # {"type": "text", "text": "Give me a statistical summary."}, # {"type": "text", "text": "Give me the correlation matrix in csv format"}, # {"type": "text", "text": "Give me the content of the table"}, ], } ] preprocessed = qwen_tabular_processor.tokenizer.apply_chat_template( messages, tokenize=False ) processed = qwen_tabular_processor( [tabular_data], text=preprocessed, return_tensors="pt" ) model = Qwen2_5_TabularModel.from_pretrained(model_name_trained).to("cuda:1") model.config.tabular_token_id = ( qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_pad|>") ) model.config.tabular_row_token_id = ( qwen_tabular_processor.tokenizer.convert_tokens_to_ids("<|tabular_row|>") ) processed = {key: value.to("cuda:1") for key, value in processed.items()} res = model.generate(**processed, max_new_tokens=512, do_sample=False) generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(processed["input_ids"], res, strict=True)] output_text = qwen_tabular_processor.batch_decode(generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True) print("="*80) print("Original table:") print(tabular_data) print("\nModel output:") print(output_text[0]) print("="*80) if MODE in ["reconstruction", "reconstruction_variable"]: # Try to evaluate reconstruction quality from utils import text_to_array generated_array = text_to_array(output_text[0]) # Round original to match expected precision tabular_data_rounded = tabular_data.round(1) print("\nReconstruction evaluation:") print(f"Original shape: {tabular_data_rounded.shape}") print(f"Generated shape: {generated_array.shape}") if generated_array.shape == tabular_data_rounded.shape: mse = np.mean((generated_array - tabular_data_rounded) ** 2) mae = np.mean(np.abs(generated_array - tabular_data_rounded)) print(f"MSE: {mse:.4f}") print(f"MAE: {mae:.4f}") else: print(f"Shape mismatch - cannot compute metrics") if MODE == "summary": summary_parts = [] # Podstawowe statystyki summary_parts.append(f"Mean: {tabular_data.mean():.2f}") summary_parts.append(f"Median: {np.median(tabular_data):.2f}") summary_parts.append(f"Std: {tabular_data.std():.2f}") summary_parts.append(f"Min: {tabular_data.min():.2f}") summary_parts.append(f"Max: {tabular_data.max():.2f}") # Średnie po wierszach row_means = tabular_data.mean(axis=1) row_means_str = ", ".join([f"{m:.2f}" for m in row_means]) summary_parts.append(f"Row means: [{row_means_str}]") # Średnie po kolumnach col_means = tabular_data.mean(axis=0) col_means_str = ", ".join([f"{m:.2f}" for m in col_means]) summary_parts.append(f"Column means: [{col_means_str}]") # Macierz korelacji (jeśli mamy więcej niż 1 kolumnę) if tabular_data.shape[1] > 1: try: corrcoef = np.corrcoef(tabular_data.T) corr_str = "Correlation matrix:\n" for i in range(corrcoef.shape[0]): corr_row = ", ".join([f"{corrcoef[i, j]:.2f}" for j in range(corrcoef.shape[1])]) corr_str += f" [{corr_row}]\n" summary_parts.append(corr_str.strip()) except: pass summary_text = "\n".join(summary_parts) print("True summary:") print(summary_text)