|
|
|
|
|
import logging
import os
import re
from typing import Dict, List, Any

import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoConfig, AutoModelForSeq2SeqLM
|
|
|
|
|
|
|
|
class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension.

    Each vector along the last axis is divided by the square root of its
    mean squared value (plus ``eps``) and then multiplied element-wise by a
    learnable ``scale`` parameter. There is no mean-centering and no bias.
    """

    def __init__(self, dim: int, eps: float = 1e-6):
        """Create the layer.

        Args:
            dim: Size of the ``scale`` parameter (one gain per feature).
            eps: Constant added to the mean square before the square root.
        """
        super().__init__()
        self.dim, self.eps = dim, eps
        # Gain starts at one, so a fresh layer only normalizes.
        self.scale = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` normalized by its last-axis RMS and scaled by ``scale``."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        normalized = x / torch.sqrt(mean_square + self.eps)
        return normalized * self.scale
|
|
|
|
|
def replace_layernorm_with_rmsnorm(module: nn.Module):
    """Recursively swap every ``nn.LayerNorm`` submodule for a new ``RMSNorm``.

    The replacement is done in place on ``module``. Each new ``RMSNorm`` is
    freshly constructed with ``eps=1e-6``; the replaced LayerNorm's weight,
    bias and eps are not carried over. ``module`` itself is never replaced,
    only its descendants.

    Args:
        module: Root of the module tree to rewrite.
    """
    # Snapshot the children up front: setattr below rewrites the very
    # registry that named_children() iterates over.
    children = list(module.named_children())
    for child_name, submodule in children:
        if not isinstance(submodule, nn.LayerNorm):
            # Not a LayerNorm: descend and keep looking.
            replace_layernorm_with_rmsnorm(submodule)
            continue

        shape = submodule.normalized_shape
        if isinstance(shape, (tuple, list)):
            # Only the first entry of the normalized shape is used.
            width = shape[0]
        else:
            width = shape
        setattr(module, child_name, RMSNorm(dim=width, eps=1e-6))
|
|
|
|
|
|
|
|
# Arabic prefixes that should be written attached to the following word.
# They are spelled as \u escapes so the pattern cannot be corrupted by a
# wrong file encoding:
#   \u0627\u0644          alef + lam         ("al-")
#   \u0644\u0644          lam + lam          ("lil-")
#   \u0648\u0627\u0644    waw + alef + lam   ("wal-")
#   \u0628\u0627\u0644    beh + alef + lam   ("bil-")
# A prefix only counts when it starts the string or follows whitespace, and
# when some non-space character follows the gap.
_ARABIC_PREFIX_RE = re.compile(
    r"(^|\s)(\u0627\u0644|\u0644\u0644|\u0648\u0627\u0644|\u0628\u0627\u0644)\s+(?=\S)"
)

# Whitespace directly before the Arabic comma (U+060C), the Arabic question
# mark (U+061F), or one of "!", ".", ",".
_SPACE_BEFORE_PUNCT_RE = re.compile(r"\s+([\u060C\u061F!.,])")


def fix_arabic_output(text):
    """Clean up spacing artifacts in decoded Arabic text.

    Two repairs are applied:

    1. A detached prefix (al-, lil-, wal-, bil-) is glued to the word that
       follows it.
    2. Whitespace before a punctuation mark is removed.

    Args:
        text: Decoded model output. Falsy values (``None``, ``""``) are
            returned unchanged.

    Returns:
        The cleaned string with leading and trailing whitespace stripped.
    """
    if not text:
        return text

    # Two passes: a defensive repeat in case the first substitution leaves a
    # joinable prefix behind.
    for _ in range(2):
        text = _ARABIC_PREFIX_RE.sub(r"\1\2", text)

    text = _SPACE_BEFORE_PUNCT_RE.sub(r"\1", text)
    return text.strip()
|
|
|
|
|
class EndpointHandler:
    """Callable inference handler for a seq2seq model using RMSNorm layers.

    On construction the model and tokenizer are loaded from ``path`` and the
    model is moved to CUDA when available (otherwise CPU) and put in eval
    mode. Calling the handler tokenizes the request inputs, generates with
    beam search, decodes, and post-processes each result with
    ``fix_arabic_output``.
    """

    def __init__(self, path=""):
        """Load model weights and tokenizer from ``path``.

        The standard ``from_pretrained`` load is tried first, followed by the
        LayerNorm -> RMSNorm swap. If that raises, the architecture is built
        from the config and the weights are loaded from a raw state dict.

        Args:
            path: Directory containing the config, tokenizer and weights.
        """
        try:
            self.model = AutoModelForSeq2SeqLM.from_pretrained(path)
            # NOTE(review): the swap happens after the weights are loaded and
            # creates RMSNorm layers with scale initialised to ones. If the
            # checkpoint stores trained RMSNorm scales, confirm this path
            # actually yields the intended weights.
            replace_layernorm_with_rmsnorm(self.model)
        except Exception:
            # Deliberate best-effort fallback, but make the reason visible
            # instead of silently swallowing it.
            logging.getLogger(__name__).warning(
                "Standard from_pretrained load failed for %r; "
                "falling back to manual state-dict load",
                path,
                exc_info=True,
            )
            self.model = self._build_from_state_dict(path)

        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device).eval()

    @staticmethod
    def _build_from_state_dict(path):
        """Build the RMSNorm architecture from config and load raw weights into it."""
        config = AutoConfig.from_pretrained(path)
        model = AutoModelForSeq2SeqLM.from_config(config)
        # Swap before loading so RMSNorm parameters in the checkpoint have a
        # matching module to land in.
        replace_layernorm_with_rmsnorm(model)

        weights_path = os.path.join(path, "model.safetensors")
        if os.path.exists(weights_path):
            # Imported lazily: only needed when a safetensors file is present.
            from safetensors.torch import load_file

            state_dict = load_file(weights_path)
        else:
            # NOTE(security): torch.load unpickles the file, which can execute
            # arbitrary code. Only use trusted checkpoints here (consider
            # weights_only=True where the installed torch supports it).
            state_dict = torch.load(
                os.path.join(path, "pytorch_model.bin"), map_location="cpu"
            )

        # strict=False: keys that do not line up between checkpoint and
        # architecture are skipped rather than raising.
        model.load_state_dict(state_dict, strict=False)
        return model

    def __call__(self, data: Any) -> List[Dict[str, Any]]:
        """Generate text for the given request.

        Args:
            data: Request payload. Normally a dict with an ``"inputs"`` entry
                holding a string or a list of strings; a bare string or list
                is also accepted. The payload is not modified.

        Returns:
            One ``{"generated_text": ...}`` dict per input, in input order.
        """
        # .get instead of .pop so the caller's payload is left untouched.
        inputs = data.get("inputs", data) if isinstance(data, dict) else data
        if isinstance(inputs, str):
            inputs = [inputs]
        if isinstance(inputs, (list, tuple)) and not inputs:
            # Empty batch: nothing to generate.
            return []

        tokenized_inputs = self.tokenizer(
            inputs, return_tensors="pt", padding=True
        ).to(self.device)

        # Drop token_type_ids when the tokenizer emits them so they are not
        # forwarded to generate().
        if "token_type_ids" in tokenized_inputs:
            del tokenized_inputs["token_type_ids"]

        with torch.no_grad():
            generated_ids = self.model.generate(
                **tokenized_inputs,
                max_new_tokens=128,
                num_beams=5,
                early_stopping=True,
            )

        decoded_outputs = self.tokenizer.batch_decode(
            generated_ids, skip_special_tokens=True
        )
        return [
            {"generated_text": fix_arabic_output(text)} for text in decoded_outputs
        ]