|
|
| import os, json, torch |
| from torch import nn |
| from transformers import AutoModel, AutoTokenizer |
|
|
| def mean_pool(last_hidden_state, attention_mask): |
| mask = attention_mask.unsqueeze(-1).type_as(last_hidden_state) |
| summed = (last_hidden_state * mask).sum(dim=1) |
| counts = mask.sum(dim=1).clamp(min=1e-9) |
| return summed / counts |
|
|
| class SummaryEvaluatorModule(nn.Module): |
| def __init__(self, base_model_name_or_path, head_config_path, regressor_path): |
| super().__init__() |
| self.model = AutoModel.from_pretrained(base_model_name_or_path) |
| with open(head_config_path, "r", encoding="utf-8") as f: |
| head_cfg = json.load(f) |
| hidden = head_cfg["in_features"] |
| self.regressor = nn.Sequential( |
| nn.Linear(hidden, 256), |
| nn.ReLU(), |
| nn.Linear(256, 3) |
| ) |
| self.regressor.load_state_dict(torch.load(regressor_path, map_location="cpu")) |
| self.regressor.eval() |
|
|
| @torch.no_grad() |
| def forward(self, input_ids, attention_mask): |
| out = self.model(input_ids=input_ids, attention_mask=attention_mask) |
| pooled = mean_pool(out.last_hidden_state, attention_mask) |
| return self.regressor(pooled) |
|
|
| def from_pretrained_custom(repo_dir_or_id, device=None): |
| |
| base = repo_dir_or_id |
| tok = AutoTokenizer.from_pretrained(base, use_fast=True) |
| mdl = SummaryEvaluatorModule( |
| base_model_name_or_path=base, |
| head_config_path=os.path.join(base, "head_config.json"), |
| regressor_path=os.path.join(base, "regressor.pt"), |
| ) |
| if device is None: |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| mdl.to(device).eval() |
| return mdl, tok, device |
|
|