from transformers import ( AutoModel, AutoTokenizer ) import torch from huggingface_hub import hf_hub_download import os import importlib.util import sys import shutil from safetensors.torch import load_model import json import re import copy class HumitTaggerModel(torch.nn.Module): # We do not need to do anything to register our class as this class will only be used # for easily getting humit-tagger worki def register_for_auto_class(auto_class): pass return # Define our own from-pretrained to load the weights and other files needed for the tagger to work def from_pretrained(repo_name, **kwargs): # Download this model's config: this_model_config_path = hf_hub_download(repo_id=repo_name, filename=kwargs["config"].humit_tagger_configuration) # load this model's config with open(this_model_config_path,"r") as js: kwargs["this_model_config"]=json.load(js) # Download this model's config: lemma_rules_path = hf_hub_download(repo_id=repo_name, filename=kwargs["config"].lemma_rules_py_file) # load lemma rules class sys.path.append(os.path.dirname(lemma_rules_path)) spec = importlib.util.spec_from_file_location("lemma_rules", lemma_rules_path) lemma_rules = importlib.util.module_from_spec(spec) sys.modules["lemma_rules"] = lemma_rules spec.loader.exec_module(lemma_rules) # Download base_model files into cache base_config_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_config_file"]) base_model_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_model_file"]) base_model_config_json_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_config_json_file"]) # Copy base model's configuration python file into our working directory config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)) , os.path.basename(base_config_file)) shutil.copyfile(base_config_file, config_file_path) # HACK: Modify base model main file since __init.py__ has already been read and the new file must not contain relative imports base_model_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)) , os.path.basename(base_model_file)) with open(base_model_file, 'r') as file: file_content = file.read().replace("from .", "from ") with open(base_model_file_path, 'w') as file: file.write(file_content) # Register the new files: # First register the base model config file sys.path.append(os.path.dirname(config_file_path)) spec = importlib.util.spec_from_file_location("base_config", config_file_path) base_config = importlib.util.module_from_spec(spec) sys.modules["base_config"] = base_config spec.loader.exec_module(base_config) # Then register the base model file sys.path.append(os.path.dirname(base_model_file_path)) spec = importlib.util.spec_from_file_location("base_model", base_model_file_path) base_model = importlib.util.module_from_spec(spec) sys.modules["base_model"] = base_model spec.loader.exec_module(base_model) # Download model weights model_weights_path = hf_hub_download(repo_id=repo_name, filename=kwargs["this_model_config"]["model_weights"]) # load base model config with open(base_model_config_json_file,"r") as js: kwargs["base_model_json_cfg"] = json.load(js) kwargs["model_weights_path"] = model_weights_path kwargs["repo_name"] = repo_name return HumitTaggerModel(**kwargs) def __init__(self, **kwargs ): super(HumitTaggerModel, self).__init__() json_cfg = kwargs["base_model_json_cfg"] self.config=kwargs["this_model_config"] self.LemmaHandling = sys.modules["lemma_rules"].LemmaHandling self.LemmaHandling.load_lemma_rules_from_obj(self.config["lemma_rules"]) cfg=sys.modules["base_config"].NorbertConfig(**json_cfg) self.bert=sys.modules["base_model"].NorbertModel(cfg, pooling_type="CLS") self.dropout = torch.nn.Dropout(self.bert.config.hidden_dropout_prob) self.classifier1 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels1"]) self.classifier2 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels2"]) self.classifier3 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels3"]) self.seq_classifier = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels_seq"]) self.ignore_index = self.config["ignore_index"] load_model(self, kwargs["model_weights_path"]) self.tokenizer=AutoTokenizer.from_pretrained(kwargs["repo_name"]) if "batch_size" in kwargs: self.batch_size=kwargs["batch_size"] else: self.batch_size=8 if "device" in kwargs: self.device = torch.device(kwargs["device"]) else: self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.MAX_LENGTH_WITHOUT_CLS = self.bert.config.max_position_embeddings -1 self.tags=self.config["tags"] self.tags_str=[[" ".join(i) for i in self.config["tags"][0]], [" ".join(i) for i in self.config["tags"][1]]] self.to(self.device) self.REPLACE_DICT = self.config["replace_dict"] self.REPLACE_PATTERN = '|'.join(sorted(re.escape(k) for k in self.REPLACE_DICT)) self.MAX_LENGTH = self.bert.config.max_position_embeddings def forward(self, input_ids=None, attention_mask=None ): outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, return_dict=True ) sequence_output = self.dropout(outputs.last_hidden_state) logits1 = self.classifier1(sequence_output) logits2 = self.classifier2(sequence_output) logits3 = self.classifier3(sequence_output) seq_logits = self.seq_classifier(sequence_output) total_loss = 0 return { "logits1": logits1, "logits2": logits2, "logits3": logits3, "seq_logits": seq_logits, } def _preprocess_text(self,text): new_text = re.sub(self.REPLACE_PATTERN, lambda m: self.REPLACE_DICT.get(m.group(0).upper()), text) while new_text != text: text = new_text new_text = re.sub(self.REPLACE_PATTERN, lambda m: self.REPLACE_DICT.get(m.group(0).upper()), text) return new_text def _batchify(self, lst): # Create batches batched_sentences=[] my_batch=[] for sentence in lst: sentence.append(self.tokenizer.sep_token_id) my_batch.append(sentence) if len(my_batch)==self.batch_size: max_len=len(max(my_batch, key=len)) if max_len > self.MAX_LENGTH: max_len = self.MAX_LENGTH my_attentions=torch.LongTensor([[1] * len(i[0:max_len]) + [0]*(max_len-len(i[0:max_len])) for i in my_batch]).to("cpu") my_batch=[i[0:max_len] + [0]*(max_len-len(i[0:max_len])) for i in my_batch] to_append={ "input_ids": torch.LongTensor(my_batch).to("cpu"), "attention_mask": my_attentions, } batched_sentences.append(to_append) my_batch=[] if len(my_batch)>0: max_len=len(max(my_batch, key=len)) if max_len > self.MAX_LENGTH: max_len = self.MAX_LENGTH my_attentions=torch.LongTensor([[1] * len(i[0:max_len]) + [0]*(max_len-len(i[0:max_len])) for i in my_batch]).to("cpu") my_batch=[i[0:max_len] + [0]*(max_len-len(i[0:max_len])) for i in my_batch] to_append={ "input_ids": torch.LongTensor(my_batch).to("cpu"), "attention_mask": my_attentions, } batched_sentences.append(to_append) torch.cuda.empty_cache() return batched_sentences def _split_sentences(self, inp): # Here we get the whole text tokenized. encodings = self.tokenizer(inp,add_special_tokens=False, return_tensors="pt").to(self.device) # Save a copy of the tokenization original_encodings=copy.deepcopy(encodings) original_encodings=original_encodings.to("cpu") torch.cuda.empty_cache() # Pad to the complete size (model max_size -1 (-1 to add CLS)) old_size=encodings["input_ids"][0].size()[0] # Pad size pad_size=self.MAX_LENGTH_WITHOUT_CLS - old_size % self.MAX_LENGTH_WITHOUT_CLS # Number of rows row_count=int(old_size/self.MAX_LENGTH_WITHOUT_CLS) + 1 # Do padding with pad_id to the pad_size that we have calculated. encodings["input_ids"] = torch.nn.functional.pad(input=encodings["input_ids"], pad=(0, pad_size), mode="constant", value=self.tokenizer.pad_token_id) # Set the last token as SENTENCE END (SEP) encodings["input_ids"][0][old_size]=self.tokenizer.sep_token_id # Chunk into max_length items encodings["input_ids"]=torch.reshape(encodings["input_ids"],(row_count,self.MAX_LENGTH_WITHOUT_CLS)) # Add CLS to each item encodings["input_ids"]=torch.cat(( torch.full((row_count,1), self.tokenizer.cls_token_id, device=self.device) ,encodings["input_ids"]),dim=1) # Create attention mask encodings["attention_mask"]=torch.ones_like(encodings["input_ids"], device=self.device) # Create batches input_ids_batched=torch.split(encodings["input_ids"], self.batch_size) attention_mask_batched=torch.split(encodings["attention_mask"], self.batch_size) # Set the last chunk's attention mask according to its size attention_mask_batched[-1][-1][pad_size +1:] = 0 encodings=encodings.to("cpu") # Now pass all chunks through the model and get the labels # While passing, we count the number of bokmal and nynorsk markers labels_output=[] # First get them back to CPU to open space on GPU input_ids_batched=[i.to("cpu") for i in input_ids_batched] attention_mask_batched=[i.to("cpu") for i in attention_mask_batched] torch.cuda.empty_cache() for input_ids, attention_masks in zip(input_ids_batched, attention_mask_batched): current_batch={"input_ids":input_ids.to(self.device).long(), "attention_mask":attention_masks.to(self.device).long()} outputs = self(**current_batch) del current_batch torch.cuda.empty_cache() label_data=outputs["logits1"].argmax(-1) labels_output.extend(label_data) # Serialize back labels_output=torch.stack(labels_output ,dim=0) labels_output=labels_output[:, range(1,self.MAX_LENGTH)] labels_output=torch.reshape(labels_output,(1,row_count * self.MAX_LENGTH_WITHOUT_CLS)) torch.cuda.empty_cache() # Now the data is split into sentences # So, now create sentence data as list so that this could be used # in torch operations and can be input to the models sentence_list=[] this_sentence=[self.tokenizer.cls_token_id] for token, label in zip(original_encodings["input_ids"][0].tolist(), labels_output[0].tolist()): if label==0: this_sentence.append(token) else: this_sentence.append(token) sentence_list.append(this_sentence) this_sentence=[self.tokenizer.cls_token_id] if len(this_sentence)>1: sentence_list.append(this_sentence) del original_encodings del labels_output del attention_mask_batched del input_ids_batched del encodings del old_size del inp del outputs torch.cuda.empty_cache() return sentence_list def _matcher(self, o): return o.group(0)[0] + "\n\n" + o.group(0)[2] def split_sentences(self, inp, **tag_config): inp = [i.replace("\n"," ") for i in re.sub(r"[^.!\?](\n)([^a-z,æ,ø,å,\\ ])", self._matcher, inp).split("\n\n")] sentences = [] for i in inp: sentences.extend(self._split_sentences(i.strip())) return sentences def tag_sentence_list(self, lst, **tag_config): # If the sentences are not tokenized, tokenize while batching: tokenized_batches = [] if type(lst[0])==str: tokenized_batches = [] for i in range(0, len(lst), self.batch_size): batch_texts = lst[i:i + self.batch_size] encoded_batch = self.tokenizer(batch_texts, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False) encoded_batch["input_ids"].to("cpu") encoded_batch["attention_mask"].to("cpu") tokenized_batches.append(encoded_batch) # sentences are already tokenized, then batchify them: else: tokenized_batches = self._batchify(lst) # If language will be identified per sentence if tag_config["lang_per_sentence"]: id_to_lang = self.config["id_to_lang"] # If the output will be to a python list if tag_config["write_output_to"]==None: all_tagged_sentences = [] for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch_langs = torch.argmax(all_out["seq_logits"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) all_tagged_sentences.append({"lang":id_to_lang[lang], "sent": [ {"w":i["w"], "t":self.tags[lang][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence]}) return all_tagged_sentences # If the output is in TSV format to a pipe (stdout or a file handle) elif tag_config["output_tsv"]: for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch_langs = torch.argmax(all_out["seq_logits"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) this_sentence=[ {"w":i["w"], "t":self.tags_str[lang][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence] tag_config["write_output_to"].write(id_to_lang[lang]) for lin in this_sentence: tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["w"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["l"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["t"]) tag_config["write_output_to"].write("\n") tag_config["write_output_to"].write("\n") # If output format will be json to a pipe (stdout or a file handle) else: for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch_langs = torch.argmax(all_out["seq_logits"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) json.dump({"lang":id_to_lang[lang], "sent":[ {"w":i["w"], "t":self.tags[lang][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence]}, tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") # If the language is set as parameter elif tag_config["lang"] != -1: LANG = tag_config["lang"] LANG_STR = self.config["id_to_lang"][LANG] # If the output will be to a python list if tag_config["write_output_to"]==None: all_tagged_sentences = [] for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) all_tagged_sentences.append({"lang":LANG_STR, "sent": [ {"w":i["w"], "t":self.tags[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence]}) return all_tagged_sentences # If the output is in TSV format to a pipe (stdout or a file handle) elif tag_config["output_tsv"]: for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) this_sentence=[ {"w":i["w"], "t":self.tags_str[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence] tag_config["write_output_to"].write(LANG_STR) for lin in this_sentence: tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["w"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["l"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["t"]) tag_config["write_output_to"].write("\n") tag_config["write_output_to"].write("\n") # If output format will be json to a pipe (stdout or a file handle) else: for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) json.dump({"lang":LANG_STR, "sent": [ {"w":i["w"], "t":self.tags[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence]}, tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") # If language will be identified according to the majority of all sentences: else: all_tags=[] all_lemmas=[] all_langs=[] all_input_ids=[] # Go over all batches and each sentence in each batch for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_tags = torch.argmax(all_out["logits2"], dim=-1) batch_lemmas = torch.argmax(all_out["logits3"], dim=-1) batch_langs = torch.argmax(all_out["seq_logits"], dim=-1) all_input_ids.extend(batch["input_ids"].tolist()) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") all_langs.extend(batch_langs[:, 0].tolist()) all_tags.extend(batch_tags.tolist()) all_lemmas.extend(batch_lemmas.tolist()) # Identify the language tag_config["lang"] = 1 if sum(all_langs)/len(all_langs)>=0.5 else 0 LANG = tag_config["lang"] LANG_STR = self.config["id_to_lang"][LANG] # If the output will be returned as python list: if tag_config["write_output_to"]==None: all_tagged_sentences = [] for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) all_tagged_sentences.append({"lang":LANG_STR, "sent": [ {"w":i["w"], "t":self.tags[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence] }) return all_tagged_sentences # If the output is in TSV format elif tag_config["output_tsv"]: for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) this_sentence=[ {"w":i["w"], "t":self.tags_str[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence] tag_config["write_output_to"].write(LANG_STR) for lin in this_sentence: tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["w"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["l"]) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lin["t"]) tag_config["write_output_to"].write("\n") tag_config["write_output_to"].write("\n") # If output format will be json else: for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas): this_sentence=[] for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]): if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id: break if lemma == 0: # If there is no lemma here, that means we haven't reached the end of the word if len(this_sentence)>0: this_sentence[-1]["w"] += self.tokenizer.decode(inps) else: this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l":lemma}) else: this_sentence.append({"w":self.tokenizer.decode(inps).strip(), "t":tag, "l":lemma}) json.dump({"lang":LANG_STR, "sent":[ {"w":i["w"], "t":self.tags[LANG][i["t"]], "l":self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"],i["l"])} for i in this_sentence]}, tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") def _check_if_text_file_and_return_content(self, filepath): try: with open(filepath, 'r') as f: return f.read() except Exception as e: return False @torch.no_grad() def tag(self, inp=None, **tag_config): self.eval() if "one_sentence_per_line" not in tag_config: tag_config["one_sentence_per_line"]=False if "lang" not in tag_config: tag_config["lang"]=-1 else: if tag_config["lang"] in self.config["lang_to_id"]: tag_config["lang"] = self.config["lang_to_id"][tag_config["lang"]] else: tag_config["lang"]=-1 if "output_tsv" not in tag_config: tag_config["output_tsv"] = False if "lang_per_sentence" not in tag_config: tag_config["lang_per_sentence"] = False elif tag_config["lang_per_sentence"]: tag_config["lang_per_sentence"] = True if tag_config["lang"]!=-1 and tag_config["lang_per_sentence"]: raise ValueError("lang_per_sentence and lang parameters cannot be set at the same time. ") if "input_directory" in tag_config: if not "output_directory" in tag_config: raise ValueError("output_directory must be defined if input_directory is defined. ") if "write_output_to" in tag_config and tag_config["write_output_to"]!=None: raise ValueError("If an input and output directory is given, then write_output_to cannot be used as the output will be written to as files in output_directory.") write_to = sys.stderr if not sys.stderr.closed else sys.stdout if not sys.stdout.closed else open("tag.log","w") # Process directory for dir_path, _, files in os.walk(tag_config["input_directory"]): for f in files: input_path = os.path.join(dir_path, f) out_path = os.path.join(tag_config["output_directory"], os.path.relpath(dir_path, tag_config["input_directory"]), f+".tagged") file_content=self._check_if_text_file_and_return_content(input_path) if type(file_content)==str: file_content=self._preprocess_text(file_content) print (f"Tagging {input_path} to {out_path}.") os.makedirs(os.path.dirname(out_path), exist_ok=True) if tag_config["one_sentence_per_line"]: inp = [i for i in file_content.split("\n") if i!=""] inp = [i for i in inp if i!=""] with open(out_path, "w") as opened_file: tag_config["write_output_to"] = opened_file self.tag_sentence_list(inp, **tag_config) else: inp = self.split_sentences(file_content, **tag_config) with open(out_path, "w") as opened_file: tag_config["write_output_to"] = opened_file self.tag_sentence_list(inp, **tag_config) else: print (f"Could not properly open and read {input_path}.") write_to.close() return else: if "write_output_to" not in tag_config or "write_output_to" in tag_config and tag_config["write_output_to"]== None: tag_config["write_output_to"] = sys.stdout elif type(tag_config["write_output_to"]) == str and tag_config["write_output_to"]=="list": tag_config["write_output_to"] = None elif type(tag_config["write_output_to"]) == str: tag_config["write_output_to"] = open(tag_config["write_output_to"], "w") if inp==None: pass elif type(inp) == str: # Tag one sentence per line in a string if tag_config["one_sentence_per_line"]: inp = [i for i in inp.split("\n") if i!=""] inp = [self._preprocess_text(i) for i in inp if i!=""] return self.tag_sentence_list(inp, **tag_config) # identify sentences inp = self.split_sentences(inp, **tag_config) return self.tag_sentence_list(inp, **tag_config) # Tag one sentence per list item elif type(inp) == list: inp=[i.strip() for i in inp] inp=[self._preprocess_text(i) for i in inp if i!=""] return self.tag_sentence_list(inp, **tag_config) def identify_language_sentence_list(self, lst, **tag_config): # If the sentences are not tokenized, tokenize while batching: tokenized_batches = [] if type(lst[0])==str: tokenized_batches = [] for i in range(0, len(lst), self.batch_size): batch_texts = lst[i:i + self.batch_size] encoded_batch = self.tokenizer(batch_texts, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False) encoded_batch["input_ids"].to("cpu") encoded_batch["attention_mask"].to("cpu") tokenized_batches.append(encoded_batch) # sentences are already tokenized, then batchify them: else: tokenized_batches = self._batchify(lst) all_tagged_sentences = [] # Go over all batches and each sentence in each batch for batch in tokenized_batches: all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device)) batch_langs = torch.argmax(all_out["seq_logits"], dim=-1) batch["input_ids"].to("cpu") batch["attention_mask"].to("cpu") all_tagged_sentences.extend(batch_langs[:, 0].tolist()) # If language will be identified per item if tag_config["lang_per_item"]: return [self.config["id_to_lang"][i] for i in all_tagged_sentences] # If language will be identified according to the majority of all sentences: else: LANG = 1 if sum(all_tagged_sentences)/len(all_tagged_sentences)>=0.5 else 0 LANG_STR = self.config["id_to_lang"][LANG] return [LANG_STR] * len(lst) @torch.no_grad() def identify_language(self, inp=None, **tag_config): self.eval() if "one_sentence_per_line" not in tag_config: tag_config["one_sentence_per_line"]=False if "lang" in tag_config: del tag_config["lang"] if "output_tsv" not in tag_config: tag_config["output_tsv"] = False if "lang_per_sentence" not in tag_config: tag_config["lang_per_sentence"] = False elif tag_config["lang_per_sentence"]: tag_config["lang_per_sentence"] = True if "input_directory" in tag_config and "output_directory" in tag_config and "write_output_to" in tag_config and tag_config["write_output_to"]!=None: raise ValueError("If an input and output directory is given, then write_output_to cannot be used as the output will be written to as files in output_directory.") if "write_output_to" not in tag_config or "write_output_to" in tag_config and tag_config["write_output_to"]== None: tag_config["write_output_to"] = sys.stdout elif type(tag_config["write_output_to"]) == str and tag_config["write_output_to"]=="list": if tag_config["output_tsv"]: raise ValueError("write_output_to cannot be set to list if output_tsv is set.") if "output_directory" in tag_config and tag_config["output_directory"]: raise ValueError("write_output_to cannot be set to list if output_directory is set.") tag_config["write_output_to"] = None elif type(tag_config["write_output_to"]) == str: tag_config["write_output_to"] = open(tag_config["write_output_to"], "w") if "output_directory" in tag_config: tag_config["write_output_to"] = None if "split_sentences" not in tag_config: tag_config["split_sentences"] = False if "lang_per_item" not in tag_config: tag_config["lang_per_item"] = False if "fast_mode" in tag_config: if "input_directory" not in tag_config: raise ValueError("input_directory must be defined if fast_mode is set.") if tag_config["split_sentences"]: raise ValueError("fast_mode does not split sentences, so split_sentences cannot be set in this mode.") if tag_config["lang_per_item"]: raise ValueError("fast_mode does not identify languages of each line or sentence in a file, so lang_per_item cannot be set in this mode.") if tag_config["lang_per_sentence"]: raise ValueError("fast_mode does not identify languages of sentence in a file, so lang_per_sentence cannot be set in this mode.") general_output=[] file_names=[] contents=[] # Process directory for dir_path, _, files in os.walk(tag_config["input_directory"]): for f in files: input_path = os.path.join(dir_path, f) if len(file_names) == self.batch_size: batch = self.tokenizer(contents, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False) langs = torch.argmax( self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))["seq_logits"], dim=-1)[:, 0].tolist() del batch torch.cuda.empty_cache() if tag_config["write_output_to"]==None: general_output.extend([{"f":i[0], "l":self.config["id_to_lang"][i[1]]} for i in zip(file_names, langs)]) elif tag_config["output_tsv"]: for fil,lan in zip(file_names, langs): tag_config["write_output_to"].write(fil) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(self.config["id_to_lang"][lan]) tag_config["write_output_to"].write("\n") else: for fil,lan in zip(file_names, langs): json.dump({"f":fil, "l":self.config["id_to_lang"][lan]}) file_names=[] contents=[] else: content=None try: with open(input_path,"r") as ff: content=ff.read(3000).replace("\n"," ").replace("\r","") except: pass if content!=None: file_names.append(input_path) contents.append(content) if len(file_names)>0: batch = self.tokenizer(contents, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False) langs = torch.argmax( self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))["seq_logits"], dim=-1)[:, 0].tolist() del batch torch.cuda.empty_cache() if tag_config["write_output_to"]==None: general_output.extend([{"f":i[0], "l":self.config["id_to_lang"][i[1]]} for i in zip(file_names, langs)]) elif tag_config["output_tsv"]: for fil,lan in zip(file_names, langs): tag_config["write_output_to"].write(fil) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(self.config["id_to_lang"][lan]) tag_config["write_output_to"].write("\n") else: for fil,lan in zip(file_names, langs): json.dump({"f":fil, "l":self.config["id_to_lang"][lan]}) return general_output if len(general_output)>0 else None if "input_directory" in tag_config: general_output=[] # Process directory for dir_path, _, files in os.walk(tag_config["input_directory"]): for f in files: input_path = os.path.join(dir_path, f) file_content=self._check_if_text_file_and_return_content(input_path) if type(file_content)==str: file_content=self._preprocess_text(file_content) new_inp=None if tag_config["one_sentence_per_line"]: inp = [i for i in file_content.split("\n") if i!=""] inp = [i for i in inp if i!=""] out = self.identify_language_sentence_list(inp, **tag_config) else: inp = self.split_sentences(file_content, **tag_config) out = self.identify_language_sentence_list(inp, **tag_config) new_inp=[self.tokenizer.decode(i[1:]).split("[SEP]")[0].strip() for i in inp] if new_inp!=None: inp=new_inp # If no output pipe is available than write to if tag_config["write_output_to"]==None: if "output_directory" in tag_config: out_path = os.path.join(tag_config["output_directory"], os.path.relpath(dir_path, tag_config["input_directory"]), f+".lang") os.makedirs(os.path.dirname(out_path), exist_ok=True) with open(out_path, "w") as opened_file: if tag_config["lang_per_sentence"]: if tag_config["output_tsv"]: for sen,lan in zip(inp, out): opened_file.write(sen) opened_file.write("\t") opened_file.write(lan) opened_file.write("\n") else: json.dump([{"s":sen, "l":lan} for sen,lan in zip(inp, out) ] , opened_file) else: if tag_config["output_tsv"]: opened_file.write(out[0]) else: json.dump({"l":out[0]} , opened_file) else: if tag_config["lang_per_sentence"]: general_output.extend([{"s":sen, "l":lan} for sen,lan in zip(inp, out) ]) else: general_output.append({"f":input_path, "l":out[0]}) # If there is an opened pipe already else: if tag_config["lang_per_sentence"]: if tag_config["output_tsv"]: for sen,lan in zip(inp, out): tag_config["write_output_to"].write(sen) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lan) tag_config["write_output_to"].write("\n") tag_config["write_output_to"].write("\n") else: json.dump([{"s":sen, "l":lan} for sen,lan in zip(inp, out) ] , tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") else: if tag_config["output_tsv"]: tag_config["write_output_to"].write(input_path) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(out[0]) tag_config["write_output_to"].write("\n") else: json.dump({"f":input_path, "l":out[0]} , tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") else: if tag_config["output_tsv"]: tag_config["write_output_to"].write(input_path) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write("err") tag_config["write_output_to"].write("\n") else: json.dump({"f":input_path, "l":"err"} , tag_config["write_output_to"]) tag_config["write_output_to"].write("\n") if tag_config["write_output_to"] and tag_config["write_output_to"]!=sys.stdout and tag_config["write_output_to"]!=sys.stderr: tag_config["write_output_to"].close() return general_output if len(general_output)>0 else None if inp==None: pass elif type(inp) == str: new_inp=None # if split sentences is set if tag_config["split_sentences"]: inp = self._preprocess_text(inp) inp = self.split_sentences(inp, **tag_config) new_inp=[self.tokenizer.decode(i[1:]).strip() for i in inp] if tag_config["lang_per_sentence"]: tag_config["lang_per_item"] = True # if tag one sentence per line in a string elif tag_config["one_sentence_per_line"]: inp = [i for i in inp.split("\n") if i!=""] inp = [self._preprocess_text(i) for i in inp if i!=""] if tag_config["lang_per_sentence"]: tag_config["lang_per_item"] = True # Otherwise identify the language of the input string as a whole else: inp = [self._preprocess_text(inp)] # Identify language out = self.identify_language_sentence_list(inp, **tag_config) if new_inp!=None: inp=new_inp # If return as list if tag_config["write_output_to"]==None: return [{"s":i[0], "l": i[1]} for i in zip(inp, out)] if tag_config["output_tsv"]: for sen,lan in zip(inp, out): tag_config["write_output_to"].write(sen) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(out) tag_config["write_output_to"].write("\n") else: json.dump([{"s":sen, "l":lan} for sen,lan in zip(inp, out) ] , tag_config["write_output_to"]) return # Tag one sentence per list item elif type(inp) == list: inp=[i.strip() for i in inp] inp=[self._preprocess_text(i) for i in inp if i!=""] out = self.identify_language_sentence_list(inp, **tag_config) # If return as list if tag_config["write_output_to"]==None: return [{"s":i[0], "l": i[1]} for i in zip(inp, out)] if tag_config["output_tsv"]: for sen,lan in zip(inp, out): tag_config["write_output_to"].write(sen) tag_config["write_output_to"].write("\t") tag_config["write_output_to"].write(lan) tag_config["write_output_to"].write("\n") else: json.dump([{"s":sen, "l":lan} for sen,lan in zip(inp, out) ] , tag_config["write_output_to"]) return