| import inspect
|
| from typing import Optional, Tuple, Union
|
|
|
| import torch
|
| import torch.nn as nn
|
| from transformers import (
|
| AutoConfig,
|
| AutoModel,
|
| DebertaV2Config,
|
| PreTrainedModel,
|
| RobertaConfig,
|
| )
|
|
|
| try:
|
| from transformers import EuroBertModel
|
| except ImportError:
|
| EuroBertModel = None
|
| from transformers.modeling_outputs import TokenClassifierOutput
|
|
|
|
|
| class MultiLabelTokenClassificationModelCustom(PreTrainedModel):
|
| """
|
| Custom multi-label token classification model with configurable classifier head.
|
| This model can be loaded with trust_remote_code=True for HuggingFace Hub compatibility.
|
| """
|
|
|
|
|
|
|
|
|
|
|
| def __init__(
|
| self,
|
| config,
|
| base_model=None,
|
| freeze_backbone=False,
|
| classifier_hidden_layers=None,
|
| classifier_dropout=0.1,
|
| ):
|
| super().__init__(config)
|
| self.__class__.config_class = config.__class__
|
| self.config = config
|
| self.num_labels = config.num_labels
|
|
|
|
|
| freeze_backbone = getattr(config, "freeze_backbone", False)
|
| classifier_hidden_layers = getattr(config, "classifier_hidden_layers", None)
|
| classifier_dropout = getattr(config, "classifier_dropout", 0.1)
|
|
|
|
|
|
|
| if base_model is None:
|
|
|
|
|
| backbone_name = getattr(config, "backbone_model_name", None)
|
| if backbone_name is None:
|
|
|
| backbone_name = getattr(config, "_name_or_path", None)
|
| if backbone_name is None:
|
| raise ValueError(
|
| "config.backbone_model_name (or config.name_or_path) is required to load pretrained backbone"
|
| )
|
|
|
|
|
| backbone_config = AutoConfig.from_pretrained(
|
| backbone_name, trust_remote_code=True
|
| )
|
|
|
| backbone_config.hidden_dropout_prob = getattr(
|
| config, "hidden_dropout_prob", 0.1
|
| )
|
| if "eurobert" in backbone_name.lower() and EuroBertModel is not None:
|
| self.backbone = EuroBertModel(backbone_config)
|
| else:
|
| self.backbone = AutoModel.from_config(
|
| backbone_config,
|
| trust_remote_code=True,
|
| )
|
| else:
|
| self.backbone = base_model
|
|
|
|
|
| if (
|
| not hasattr(config, "backbone_model_name")
|
| or config.backbone_model_name is None
|
| ):
|
|
|
| if base_model is not None and hasattr(base_model.config, "_name_or_path"):
|
| config.backbone_model_name = base_model.config._name_or_path
|
| elif hasattr(config, "_name_or_path"):
|
| config.backbone_model_name = config._name_or_path
|
|
|
|
|
| self.lm_output_size = self.backbone.config.hidden_size
|
|
|
| self.config.freeze_backbone = freeze_backbone
|
| self.config.classifier_hidden_layers = classifier_hidden_layers
|
| self.config.classifier_dropout = classifier_dropout
|
|
|
| if freeze_backbone:
|
|
|
| for param in self.backbone.parameters():
|
| param.requires_grad = False
|
| self.backbone.eval()
|
| else:
|
| self.backbone.train(True)
|
|
|
| self.dropout = nn.Dropout(
|
| config.hidden_dropout_prob
|
| if hasattr(config, "hidden_dropout_prob")
|
| else 0.1
|
| )
|
| self._build_classifier_head(classifier_hidden_layers, classifier_dropout)
|
| self.post_init()
|
|
|
| @classmethod
|
| def from_config(cls, config):
|
| return cls(
|
| config=config,
|
| freeze_backbone=getattr(config, "freeze_backbone", False),
|
| classifier_hidden_layers=getattr(config, "classifier_hidden_layers", None),
|
| classifier_dropout=getattr(config, "classifier_dropout", 0.1),
|
| )
|
|
|
| def _set_backbone_model_name(self, name: str):
|
| """Explicitly set the backbone model name in config (call before saving)."""
|
| self.config.backbone_model_name = name
|
|
|
| def _build_classifier_head(self, hidden_layers, dropout_rate):
|
| """
|
| Build a flexible classifier head with configurable hidden layers and dropout.
|
|
|
| Args:
|
| hidden_layers: Tuple of integers representing the number of neurons in each hidden layer.
|
| None or empty tuple means a simple linear layer.
|
| dropout_rate: Dropout probability between layers
|
| """
|
| layers = []
|
| input_size = self.lm_output_size
|
|
|
|
|
| if not hidden_layers:
|
| self.classifier = nn.Sequential(
|
| nn.Dropout(dropout_rate), nn.Linear(input_size, self.num_labels)
|
| )
|
| return
|
|
|
|
|
| for hidden_size in hidden_layers:
|
| layers.append(nn.Linear(input_size, hidden_size))
|
| layers.append(nn.ReLU())
|
| layers.append(nn.Dropout(dropout_rate))
|
| input_size = hidden_size
|
|
|
|
|
| layers.append(nn.Linear(input_size, self.num_labels))
|
|
|
|
|
| self.classifier = nn.Sequential(*layers)
|
|
|
| def forward(
|
| self,
|
| input_ids: Optional[torch.LongTensor] = None,
|
| attention_mask: Optional[torch.FloatTensor] = None,
|
| token_type_ids: Optional[torch.LongTensor] = None,
|
| position_ids: Optional[torch.LongTensor] = None,
|
| head_mask: Optional[torch.FloatTensor] = None,
|
| inputs_embeds: Optional[torch.FloatTensor] = None,
|
| labels: Optional[torch.LongTensor] = None,
|
| output_attentions: Optional[bool] = None,
|
| output_hidden_states: Optional[bool] = None,
|
| return_dict: Optional[bool] = None,
|
| **kwargs,
|
| ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
|
| return_dict = (
|
| return_dict if return_dict is not None else self.config.use_return_dict
|
| )
|
|
|
| forward_kwargs = {
|
| "attention_mask": attention_mask,
|
| "token_type_ids": token_type_ids,
|
| "position_ids": position_ids,
|
| "inputs_embeds": inputs_embeds,
|
| "output_attentions": output_attentions,
|
| "output_hidden_states": output_hidden_states,
|
| "return_dict": return_dict,
|
| "head_mask": head_mask,
|
| }
|
|
|
|
|
|
|
| sig = inspect.signature(
|
| getattr(self.backbone, "forward", self.backbone.__call__)
|
| )
|
| allowed = sig.parameters.keys()
|
| forward_kwargs = {
|
| k: v for k, v in forward_kwargs.items() if k in allowed and v is not None
|
| }
|
|
|
| outputs = self.backbone(input_ids, **forward_kwargs)
|
|
|
| sequence_output = outputs[0]
|
| sequence_output = self.dropout(sequence_output)
|
| logits = self.classifier(sequence_output)
|
|
|
| loss = None
|
| if labels is not None:
|
|
|
| loss_fct = nn.BCEWithLogitsLoss(reduction="mean")
|
|
|
| mask = (labels != -100).float()
|
|
|
| labels_masked = torch.where(
|
| labels == -100, torch.zeros_like(labels), labels
|
| )
|
|
|
| loss_tensor = loss_fct(logits, labels_masked.float())
|
| loss = (loss_tensor * mask).sum() / mask.sum()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| if not return_dict:
|
| output = (logits,) + outputs[2:]
|
| return ((loss,) + output) if loss is not None else output
|
|
|
| return TokenClassifierOutput(
|
| loss=loss,
|
| logits=logits,
|
| hidden_states=outputs.hidden_states,
|
| attentions=outputs.attentions,
|
| )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def load_custom_cardioner_model(model_path: str, device: str = "auto"):
|
| """
|
| Utility function to easily load a custom CardioNER model.
|
|
|
| Args:
|
| model_path: Path to the saved model directory
|
| device: Device to load model on ("auto", "cpu", "cuda", etc.)
|
|
|
| Returns:
|
| tuple: (model, tokenizer, config)
|
| """
|
|
|
| import os
|
|
|
| import torch
|
| from transformers import AutoModelForTokenClassification, AutoTokenizer
|
|
|
| required_files = ["config.json", "modeling.py", "pytorch_model.bin"]
|
| missing_files = [
|
| f for f in required_files if not os.path.exists(os.path.join(model_path, f))
|
| ]
|
|
|
| if missing_files:
|
| raise FileNotFoundError(
|
| f"Missing required files in {model_path}: {missing_files}"
|
| )
|
|
|
| print(f"Loading custom CardioNER model from: {model_path}")
|
|
|
|
|
| tokenizer = AutoTokenizer.from_pretrained(model_path)
|
|
|
|
|
| model = AutoModelForTokenClassification.from_pretrained(
|
| model_path, trust_remote_code=True, use_safetensors=True
|
| )
|
|
|
|
|
| if device == "auto":
|
| device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
|
| model = model.to(device)
|
|
|
| print(f"Model loaded successfully on {device}")
|
| print(f"Model type: {type(model).__name__}")
|
| print(f"Number of labels: {model.num_labels}")
|
|
|
| return model, tokenizer, model.config
|
|
|
|
|
| def validate_custom_model_directory(model_path: str) -> dict:
|
| """
|
| Validate that a model directory contains all necessary files for custom model loading.
|
|
|
| Args:
|
| model_path: Path to the model directory
|
|
|
| Returns:
|
| dict: Validation results with status and details
|
| """
|
| import json
|
| import os
|
|
|
| validation_results = {
|
| "valid": True,
|
| "errors": [],
|
| "warnings": [],
|
| "files_found": [],
|
| "model_info": {},
|
| }
|
|
|
|
|
| required_files = {
|
| "config.json": "Model configuration",
|
| "modeling.py": "Custom model class definition",
|
| "pytorch_model.bin": "Model weights",
|
| }
|
|
|
|
|
| optional_files = {
|
| "tokenizer.json": "Tokenizer vocabulary",
|
| "tokenizer_config.json": "Tokenizer configuration",
|
| "training_args.json": "Training arguments",
|
| }
|
|
|
|
|
| for filename, description in required_files.items():
|
| filepath = os.path.join(model_path, filename)
|
| if os.path.exists(filepath):
|
| validation_results["files_found"].append(f"{filename} ({description})")
|
| else:
|
| validation_results["valid"] = False
|
| validation_results["errors"].append(
|
| f"Missing required file: {filename} - {description}"
|
| )
|
|
|
|
|
| for filename, description in optional_files.items():
|
| filepath = os.path.join(model_path, filename)
|
| if os.path.exists(filepath):
|
| validation_results["files_found"].append(f"{filename} ({description})")
|
| else:
|
| validation_results["warnings"].append(
|
| f"Missing optional file: {filename} - {description}"
|
| )
|
|
|
|
|
| config_path = os.path.join(model_path, "config.json")
|
| if os.path.exists(config_path):
|
| try:
|
| with open(config_path, "r") as f:
|
| config = json.load(f)
|
|
|
| validation_results["model_info"]["num_labels"] = config.get(
|
| "num_labels", "Unknown"
|
| )
|
| validation_results["model_info"]["model_type"] = config.get(
|
| "model_type", "Unknown"
|
| )
|
| validation_results["model_info"]["has_auto_map"] = "auto_map" in config
|
| validation_results["model_info"]["classifier_hidden_layers"] = config.get(
|
| "classifier_hidden_layers", None
|
| )
|
| validation_results["model_info"]["freeze_backbone"] = config.get(
|
| "freeze_backbone", None
|
| )
|
|
|
| if not config.get("auto_map"):
|
| validation_results["warnings"].append(
|
| "No auto_map found in config - may not load correctly with trust_remote_code=True"
|
| )
|
|
|
| except json.JSONDecodeError as e:
|
| validation_results["valid"] = False
|
| validation_results["errors"].append(f"Invalid config.json: {str(e)}")
|
|
|
|
|
| modeling_path = os.path.join(model_path, "modeling.py")
|
| if os.path.exists(modeling_path):
|
| try:
|
| with open(modeling_path, "r") as f:
|
| content = f.read()
|
|
|
| if "MultiLabelTokenClassificationModelCustom" not in content:
|
| validation_results["valid"] = False
|
| validation_results["errors"].append(
|
| "modeling.py does not contain MultiLabelTokenClassificationModelCustom class"
|
| )
|
|
|
| except Exception as e:
|
| validation_results["warnings"].append(
|
| f"Could not read modeling.py: {str(e)}"
|
| )
|
|
|
| return validation_results
|
|
|