from transformers import (
    AutoModel,
    AutoTokenizer
)
import torch
from huggingface_hub import hf_hub_download
import os
import importlib.util
import sys
import shutil
from safetensors.torch import load_model
import json
import re
import copy
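

# HumitTaggerModel wraps a NorBERT-style encoder with four prediction heads and
# provides high-level helpers for sentence splitting, tagging, lemmatization and
# language identification over strings, lists of sentences and whole directories.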
class HumitTaggerModel(torch.nn.Module):

    def register_for_auto_class(auto_class):
        # No-op: this model does not register itself for an auto class.
        return
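
    # from_pretrained() downloads this tagger's configuration, the lemma-rule module and
    # the base encoder's config/model code from the Hugging Face Hub, imports them
    # dynamically, and finally constructs a HumitTaggerModel from the collected kwargs.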
    def from_pretrained(repo_name, **kwargs):
        # Fetch this tagger's own configuration JSON from the Hub.
        this_model_config_path = hf_hub_download(repo_id=repo_name, filename=kwargs["config"].humit_tagger_configuration)
        with open(this_model_config_path, "r") as js:
            kwargs["this_model_config"] = json.load(js)

        # Download and dynamically import the lemma-rule module shipped with the repo.
        lemma_rules_path = hf_hub_download(repo_id=repo_name, filename=kwargs["config"].lemma_rules_py_file)
        sys.path.append(os.path.dirname(lemma_rules_path))
        spec = importlib.util.spec_from_file_location("lemma_rules", lemma_rules_path)
        lemma_rules = importlib.util.module_from_spec(spec)
        sys.modules["lemma_rules"] = lemma_rules
        spec.loader.exec_module(lemma_rules)

        # Download the base encoder's config class, model code and config JSON.
        base_config_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_config_file"])
        base_model_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_model_file"])
        base_model_config_json_file = hf_hub_download(repo_id=kwargs["this_model_config"]["base_model"], filename=kwargs["this_model_config"]["base_model_config_json_file"])

        # Copy the base config module next to this file so it can be imported by name.
        config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.path.basename(base_config_file))
        shutil.copyfile(base_config_file, config_file_path)

        # Copy the base model module as well, rewriting its relative imports to absolute ones.
        base_model_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.path.basename(base_model_file))
        with open(base_model_file, "r") as file:
            file_content = file.read().replace("from .", "from ")
        with open(base_model_file_path, "w") as file:
            file.write(file_content)

        # Dynamically import the base config module.
        sys.path.append(os.path.dirname(config_file_path))
        spec = importlib.util.spec_from_file_location("base_config", config_file_path)
        base_config = importlib.util.module_from_spec(spec)
        sys.modules["base_config"] = base_config
        spec.loader.exec_module(base_config)

        # Dynamically import the base model module.
        sys.path.append(os.path.dirname(base_model_file_path))
        spec = importlib.util.spec_from_file_location("base_model", base_model_file_path)
        base_model = importlib.util.module_from_spec(spec)
        sys.modules["base_model"] = base_model
        spec.loader.exec_module(base_model)

        # Fetch the fine-tuned tagger weights and the base model's JSON config.
        model_weights_path = hf_hub_download(repo_id=repo_name, filename=kwargs["this_model_config"]["model_weights"])
        with open(base_model_config_json_file, "r") as js:
            kwargs["base_model_json_cfg"] = json.load(js)

        kwargs["model_weights_path"] = model_weights_path
        kwargs["repo_name"] = repo_name
        return HumitTaggerModel(**kwargs)
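
    # __init__ expects the kwargs assembled by from_pretrained(): the tagger config,
    # the base model's JSON config, the safetensors weights path and the repo name used
    # to load the tokenizer. Optional kwargs: batch_size (default 8) and device.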
    def __init__(self, **kwargs):
        super(HumitTaggerModel, self).__init__()
        json_cfg = kwargs["base_model_json_cfg"]
        self.config = kwargs["this_model_config"]
        self.LemmaHandling = sys.modules["lemma_rules"].LemmaHandling
        self.LemmaHandling.load_lemma_rules_from_obj(self.config["lemma_rules"])
        cfg = sys.modules["base_config"].NorbertConfig(**json_cfg)
        self.bert = sys.modules["base_model"].NorbertModel(cfg, pooling_type="CLS")
        self.dropout = torch.nn.Dropout(self.bert.config.hidden_dropout_prob)
        self.classifier1 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels1"])
        self.classifier2 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels2"])
        self.classifier3 = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels3"])
        self.seq_classifier = torch.nn.Linear(self.bert.config.hidden_size, self.config["num_labels_seq"])
        self.ignore_index = self.config["ignore_index"]
        load_model(self, kwargs["model_weights_path"])
        self.tokenizer = AutoTokenizer.from_pretrained(kwargs["repo_name"])
        if "batch_size" in kwargs:
            self.batch_size = kwargs["batch_size"]
        else:
            self.batch_size = 8

        if "device" in kwargs:
            self.device = torch.device(kwargs["device"])
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.MAX_LENGTH_WITHOUT_CLS = self.bert.config.max_position_embeddings - 1
        self.tags = self.config["tags"]
        self.tags_str = [[" ".join(i) for i in self.config["tags"][0]], [" ".join(i) for i in self.config["tags"][1]]]
        self.to(self.device)
        self.REPLACE_DICT = self.config["replace_dict"]
        self.REPLACE_PATTERN = "|".join(sorted(re.escape(k) for k in self.REPLACE_DICT))
        self.MAX_LENGTH = self.bert.config.max_position_embeddings
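
    # Single encoder pass with four heads. Elsewhere in this class, logits1 provides
    # sentence-boundary labels, logits2 the tags, logits3 the lemma rules and
    # seq_logits the sequence-level language prediction.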
    def forward(self, input_ids=None, attention_mask=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, return_dict=True)
        sequence_output = self.dropout(outputs.last_hidden_state)
        logits1 = self.classifier1(sequence_output)
        logits2 = self.classifier2(sequence_output)
        logits3 = self.classifier3(sequence_output)
        seq_logits = self.seq_classifier(sequence_output)
        return {
            "logits1": logits1,
            "logits2": logits2,
            "logits3": logits3,
            "seq_logits": seq_logits,
        }
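
    # Apply the replacement dictionary from the config repeatedly until the text stops
    # changing, so that chained replacements are fully resolved.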
    def _preprocess_text(self, text):
        def _replace(m):
            # Fall back to the matched text itself if the upper-cased key is missing,
            # so re.sub never receives None as a replacement.
            return self.REPLACE_DICT.get(m.group(0).upper(), m.group(0))

        new_text = re.sub(self.REPLACE_PATTERN, _replace, text)
        while new_text != text:
            text = new_text
            new_text = re.sub(self.REPLACE_PATTERN, _replace, text)
        return new_text
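
    # Pack pre-tokenized sentences (lists of token ids, e.g. from _split_sentences)
    # into padded batches with matching attention masks.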
    def _batchify(self, lst):
        batched_sentences = []
        my_batch = []
        for sentence in lst:
            sentence.append(self.tokenizer.sep_token_id)
            my_batch.append(sentence)
            if len(my_batch) == self.batch_size:
                max_len = len(max(my_batch, key=len))
                if max_len > self.MAX_LENGTH:
                    max_len = self.MAX_LENGTH
                my_attentions = torch.LongTensor([[1] * len(i[0:max_len]) + [0] * (max_len - len(i[0:max_len])) for i in my_batch]).to("cpu")
                # Pad with the tokenizer's pad token id; the padded positions are masked out above.
                my_batch = [i[0:max_len] + [self.tokenizer.pad_token_id] * (max_len - len(i[0:max_len])) for i in my_batch]
                to_append = {
                    "input_ids": torch.LongTensor(my_batch).to("cpu"),
                    "attention_mask": my_attentions,
                }
                batched_sentences.append(to_append)
                my_batch = []
        # Flush the final, possibly smaller, batch.
        if len(my_batch) > 0:
            max_len = len(max(my_batch, key=len))
            if max_len > self.MAX_LENGTH:
                max_len = self.MAX_LENGTH
            my_attentions = torch.LongTensor([[1] * len(i[0:max_len]) + [0] * (max_len - len(i[0:max_len])) for i in my_batch]).to("cpu")
            my_batch = [i[0:max_len] + [self.tokenizer.pad_token_id] * (max_len - len(i[0:max_len])) for i in my_batch]
            to_append = {
                "input_ids": torch.LongTensor(my_batch).to("cpu"),
                "attention_mask": my_attentions,
            }
            batched_sentences.append(to_append)

        torch.cuda.empty_cache()

        return batched_sentences
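
    # Split a raw text into sentences: encode the whole text, reshape it into
    # MAX_LENGTH-sized rows, let the sentence-boundary head label every token, and
    # rebuild one token-id list per predicted sentence (each starting with CLS).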
    def _split_sentences(self, inp):
        # Encode the full text without special tokens; keep an untouched CPU copy for
        # rebuilding the sentences afterwards.
        encodings = self.tokenizer(inp, add_special_tokens=False, return_tensors="pt").to(self.device)

        original_encodings = copy.deepcopy(encodings)
        original_encodings = original_encodings.to("cpu")
        torch.cuda.empty_cache()

        old_size = encodings["input_ids"][0].size()[0]

        # Pad the token sequence so it can be reshaped into full rows of
        # MAX_LENGTH_WITHOUT_CLS tokens, then prepend a CLS column to every row.
        pad_size = self.MAX_LENGTH_WITHOUT_CLS - old_size % self.MAX_LENGTH_WITHOUT_CLS
        row_count = int(old_size / self.MAX_LENGTH_WITHOUT_CLS) + 1

        encodings["input_ids"] = torch.nn.functional.pad(input=encodings["input_ids"], pad=(0, pad_size), mode="constant", value=self.tokenizer.pad_token_id)
        encodings["input_ids"][0][old_size] = self.tokenizer.sep_token_id
        encodings["input_ids"] = torch.reshape(encodings["input_ids"], (row_count, self.MAX_LENGTH_WITHOUT_CLS))
        encodings["input_ids"] = torch.cat((torch.full((row_count, 1), self.tokenizer.cls_token_id, device=self.device), encodings["input_ids"]), dim=1)
        encodings["attention_mask"] = torch.ones_like(encodings["input_ids"], device=self.device)

        input_ids_batched = torch.split(encodings["input_ids"], self.batch_size)
        attention_mask_batched = torch.split(encodings["attention_mask"], self.batch_size)

        # Only the last row contains padding: CLS, the remaining tokens and SEP are valid,
        # everything after that is masked out.
        attention_mask_batched[-1][-1][self.MAX_LENGTH - pad_size + 1:] = 0

        encodings = encodings.to("cpu")

        labels_output = []

        input_ids_batched = [i.to("cpu") for i in input_ids_batched]
        attention_mask_batched = [i.to("cpu") for i in attention_mask_batched]
        torch.cuda.empty_cache()

        for input_ids, attention_masks in zip(input_ids_batched, attention_mask_batched):
            current_batch = {"input_ids": input_ids.to(self.device).long(), "attention_mask": attention_masks.to(self.device).long()}
            outputs = self(**current_batch)
            del current_batch
            torch.cuda.empty_cache()

            label_data = outputs["logits1"].argmax(-1)
            labels_output.extend(label_data)

        # Drop the CLS column and flatten the per-row labels back into one sequence.
        labels_output = torch.stack(labels_output, dim=0)
        labels_output = labels_output[:, 1:self.MAX_LENGTH]
        labels_output = torch.reshape(labels_output, (1, row_count * self.MAX_LENGTH_WITHOUT_CLS))
        torch.cuda.empty_cache()

        # A non-zero label marks the last token of a sentence.
        sentence_list = []
        this_sentence = [self.tokenizer.cls_token_id]
        for token, label in zip(original_encodings["input_ids"][0].tolist(), labels_output[0].tolist()):
            this_sentence.append(token)
            if label != 0:
                sentence_list.append(this_sentence)
                this_sentence = [self.tokenizer.cls_token_id]

        if len(this_sentence) > 1:
            sentence_list.append(this_sentence)
        del original_encodings
        del labels_output
        del attention_mask_batched
        del input_ids_batched
        del encodings
        del old_size
        del inp
        del outputs
        torch.cuda.empty_cache()

        return sentence_list
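
    # Sentence-splitting entry point: _matcher() is the re.sub callback used by
    # split_sentences() to turn paragraph-internal line breaks into hard breaks before
    # each chunk is handed to _split_sentences().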
    def _matcher(self, o):
        return o.group(0)[0] + "\n\n" + o.group(0)[2]

    def split_sentences(self, inp, **tag_config):
        inp = [i.replace("\n", " ") for i in re.sub(r"[^.!\?](\n)([^a-z,æ,ø,å,\\ ])", self._matcher, inp).split("\n\n")]
        sentences = []
        for i in inp:
            sentences.extend(self._split_sentences(i.strip()))
        return sentences
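
    # Tag a list of sentences (raw strings or token-id lists). Depending on tag_config,
    # the tagged sentences are returned as a list (write_output_to is None), written as
    # TSV (output_tsv), or written as JSON lines; the language is predicted per
    # sentence, taken from tag_config["lang"], or chosen by majority vote.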
    def tag_sentence_list(self, lst, **tag_config):
        # Tokenize raw strings here; token-id lists (from split_sentences) go through _batchify.
        tokenized_batches = []
        if isinstance(lst[0], str):
            tokenized_batches = []
            for i in range(0, len(lst), self.batch_size):
                batch_texts = lst[i:i + self.batch_size]
                encoded_batch = self.tokenizer(batch_texts, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False)
                encoded_batch["input_ids"].to("cpu")
                encoded_batch["attention_mask"].to("cpu")
                tokenized_batches.append(encoded_batch)
        else:
            tokenized_batches = self._batchify(lst)

        if tag_config["lang_per_sentence"]:
            id_to_lang = self.config["id_to_lang"]

            if tag_config["write_output_to"] is None:
                all_tagged_sentences = []
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch_langs = torch.argmax(all_out["seq_logits"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()):
                        this_sentence = []
                        # Merge sub-word pieces: lemma label 0 means "continuation of the previous word".
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                        all_tagged_sentences.append({"lang": id_to_lang[lang], "sent": [{"w": i["w"], "t": self.tags[lang][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]})

                return all_tagged_sentences

            elif tag_config["output_tsv"]:
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch_langs = torch.argmax(all_out["seq_logits"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()):
                        this_sentence = []
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                        this_sentence = [{"w": i["w"], "t": self.tags_str[lang][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]
                        tag_config["write_output_to"].write(id_to_lang[lang])
                        for lin in this_sentence:
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["w"])
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["l"])
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["t"])
                            tag_config["write_output_to"].write("\n")
                        tag_config["write_output_to"].write("\n")

            else:
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch_langs = torch.argmax(all_out["seq_logits"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas, lang in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist(), batch_langs[:, 0].tolist()):
                        this_sentence = []
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})

                        json.dump({"lang": id_to_lang[lang], "sent": [{"w": i["w"], "t": self.tags[lang][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]}, tag_config["write_output_to"])
                        tag_config["write_output_to"].write("\n")

        elif tag_config["lang"] != -1:
            LANG = tag_config["lang"]
            LANG_STR = self.config["id_to_lang"][LANG]

            if tag_config["write_output_to"] is None:
                all_tagged_sentences = []
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()):
                        this_sentence = []
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                        all_tagged_sentences.append({"lang": LANG_STR, "sent": [{"w": i["w"], "t": self.tags[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]})

                return all_tagged_sentences

            elif tag_config["output_tsv"]:
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()):
                        this_sentence = []
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                        this_sentence = [{"w": i["w"], "t": self.tags_str[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]
                        tag_config["write_output_to"].write(LANG_STR)
                        for lin in this_sentence:
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["w"])
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["l"])
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write(lin["t"])
                            tag_config["write_output_to"].write("\n")
                        tag_config["write_output_to"].write("\n")

            else:
                for batch in tokenized_batches:
                    all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                    batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                    batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                    batch["input_ids"].to("cpu")
                    batch["attention_mask"].to("cpu")

                    for input_ids, tags, lemmas in zip(batch["input_ids"].tolist(), batch_tags.tolist(), batch_lemmas.tolist()):
                        this_sentence = []
                        for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                            if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                                break
                            if lemma == 0:
                                if len(this_sentence) > 0:
                                    this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                                else:
                                    this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})

                        json.dump({"lang": LANG_STR, "sent": [{"w": i["w"], "t": self.tags[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]}, tag_config["write_output_to"])
                        tag_config["write_output_to"].write("\n")

        else:
            # No language given: run everything once, pick the majority language
            # (two-language setup), then decode with that language's tag set.
            all_tags = []
            all_lemmas = []
            all_langs = []
            all_input_ids = []

            for batch in tokenized_batches:
                all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
                batch_tags = torch.argmax(all_out["logits2"], dim=-1)
                batch_lemmas = torch.argmax(all_out["logits3"], dim=-1)
                batch_langs = torch.argmax(all_out["seq_logits"], dim=-1)
                all_input_ids.extend(batch["input_ids"].tolist())
                batch["input_ids"].to("cpu")
                batch["attention_mask"].to("cpu")
                all_langs.extend(batch_langs[:, 0].tolist())
                all_tags.extend(batch_tags.tolist())
                all_lemmas.extend(batch_lemmas.tolist())

            tag_config["lang"] = 1 if sum(all_langs) / len(all_langs) >= 0.5 else 0
            LANG = tag_config["lang"]
            LANG_STR = self.config["id_to_lang"][LANG]

            if tag_config["write_output_to"] is None:
                all_tagged_sentences = []
                for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas):
                    this_sentence = []
                    for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                        if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                            break
                        if lemma == 0:
                            if len(this_sentence) > 0:
                                this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                        else:
                            this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                    all_tagged_sentences.append({"lang": LANG_STR, "sent": [{"w": i["w"], "t": self.tags[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]})
                return all_tagged_sentences

            elif tag_config["output_tsv"]:
                for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas):
                    this_sentence = []
                    for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                        if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                            break
                        if lemma == 0:
                            if len(this_sentence) > 0:
                                this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                        else:
                            this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})
                    this_sentence = [{"w": i["w"], "t": self.tags_str[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]
                    tag_config["write_output_to"].write(LANG_STR)
                    for lin in this_sentence:
                        tag_config["write_output_to"].write("\t")
                        tag_config["write_output_to"].write(lin["w"])
                        tag_config["write_output_to"].write("\t")
                        tag_config["write_output_to"].write(lin["l"])
                        tag_config["write_output_to"].write("\t")
                        tag_config["write_output_to"].write(lin["t"])
                        tag_config["write_output_to"].write("\n")
                    tag_config["write_output_to"].write("\n")

            else:
                for input_ids, tags, lemmas in zip(all_input_ids, all_tags, all_lemmas):
                    this_sentence = []
                    for inps, tag, lemma in zip(input_ids[1:], tags[1:], lemmas[1:]):
                        if inps == self.tokenizer.sep_token_id or inps == self.tokenizer.pad_token_id:
                            break
                        if lemma == 0:
                            if len(this_sentence) > 0:
                                this_sentence[-1]["w"] += self.tokenizer.decode(inps)
                            else:
                                this_sentence.append({"w": self.tokenizer.decode(inps), "t": tag, "l": lemma})
                        else:
                            this_sentence.append({"w": self.tokenizer.decode(inps).strip(), "t": tag, "l": lemma})

                    json.dump({"lang": LANG_STR, "sent": [{"w": i["w"], "t": self.tags[LANG][i["t"]], "l": self.LemmaHandling.get_lemma_given_word_and_lemma_list_index(i["w"], i["l"])} for i in this_sentence]}, tag_config["write_output_to"])
                    tag_config["write_output_to"].write("\n")
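
    # Best-effort helper: return the file's content as text, or False if it cannot be
    # read as a text file.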
    def _check_if_text_file_and_return_content(self, filepath):
        try:
            with open(filepath, 'r') as f:
                return f.read()
        except Exception:
            return False
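
    # Public tagging entry point. Accepts a string, a list of sentences, or an
    # input_directory/output_directory pair, normalizes the options and forwards them
    # to tag_sentence_list(). write_output_to may be a file object, a path, "list"
    # (return Python objects) or None (stdout).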
    @torch.no_grad()
    def tag(self, inp=None, **tag_config):
        self.eval()
        if "one_sentence_per_line" not in tag_config:
            tag_config["one_sentence_per_line"] = False

        if "lang" not in tag_config:
            tag_config["lang"] = -1
        else:
            if tag_config["lang"] in self.config["lang_to_id"]:
                tag_config["lang"] = self.config["lang_to_id"][tag_config["lang"]]
            else:
                tag_config["lang"] = -1
        if "output_tsv" not in tag_config:
            tag_config["output_tsv"] = False

        if "lang_per_sentence" not in tag_config:
            tag_config["lang_per_sentence"] = False
        elif tag_config["lang_per_sentence"]:
            tag_config["lang_per_sentence"] = True

        if tag_config["lang"] != -1 and tag_config["lang_per_sentence"]:
            raise ValueError("lang_per_sentence and lang parameters cannot be set at the same time.")

        if "input_directory" in tag_config:
            if "output_directory" not in tag_config:
                raise ValueError("output_directory must be defined if input_directory is defined.")
            if "write_output_to" in tag_config and tag_config["write_output_to"] is not None:
                raise ValueError("If an input and output directory is given, then write_output_to cannot be used as the output will be written as files in output_directory.")

            write_to = sys.stderr if not sys.stderr.closed else sys.stdout if not sys.stdout.closed else open("tag.log", "w")

            for dir_path, _, files in os.walk(tag_config["input_directory"]):
                for f in files:
                    input_path = os.path.join(dir_path, f)
                    out_path = os.path.join(tag_config["output_directory"], os.path.relpath(dir_path, tag_config["input_directory"]), f + ".tagged")

                    file_content = self._check_if_text_file_and_return_content(input_path)

                    if isinstance(file_content, str):
                        file_content = self._preprocess_text(file_content)
                        print(f"Tagging {input_path} to {out_path}.")
                        os.makedirs(os.path.dirname(out_path), exist_ok=True)
                        if tag_config["one_sentence_per_line"]:
                            inp = [i for i in file_content.split("\n") if i != ""]
                            inp = [i for i in inp if i != ""]
                            with open(out_path, "w") as opened_file:
                                tag_config["write_output_to"] = opened_file
                                self.tag_sentence_list(inp, **tag_config)
                        else:
                            inp = self.split_sentences(file_content, **tag_config)
                            with open(out_path, "w") as opened_file:
                                tag_config["write_output_to"] = opened_file
                                self.tag_sentence_list(inp, **tag_config)
                    else:
                        print(f"Could not properly open and read {input_path}.")

            # Only close the fallback log file; never close sys.stdout or sys.stderr.
            if write_to not in (sys.stdout, sys.stderr):
                write_to.close()
            return

        else:
            if tag_config.get("write_output_to") is None:
                tag_config["write_output_to"] = sys.stdout
            elif isinstance(tag_config["write_output_to"], str) and tag_config["write_output_to"] == "list":
                tag_config["write_output_to"] = None
            elif isinstance(tag_config["write_output_to"], str):
                tag_config["write_output_to"] = open(tag_config["write_output_to"], "w")

            if inp is None:
                pass
            elif isinstance(inp, str):
                if tag_config["one_sentence_per_line"]:
                    inp = [i for i in inp.split("\n") if i != ""]
                    inp = [self._preprocess_text(i) for i in inp if i != ""]
                    return self.tag_sentence_list(inp, **tag_config)

                inp = self.split_sentences(inp, **tag_config)
                return self.tag_sentence_list(inp, **tag_config)

            elif isinstance(inp, list):
                inp = [i.strip() for i in inp]
                inp = [self._preprocess_text(i) for i in inp if i != ""]
                return self.tag_sentence_list(inp, **tag_config)
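
    # Predict a language id for every item in lst and map it through id_to_lang; with
    # lang_per_item unset, a single majority language is returned for all items.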
    def identify_language_sentence_list(self, lst, **tag_config):
        tokenized_batches = []
        if isinstance(lst[0], str):
            tokenized_batches = []
            for i in range(0, len(lst), self.batch_size):
                batch_texts = lst[i:i + self.batch_size]
                encoded_batch = self.tokenizer(batch_texts, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False)
                encoded_batch["input_ids"].to("cpu")
                encoded_batch["attention_mask"].to("cpu")
                tokenized_batches.append(encoded_batch)
        else:
            tokenized_batches = self._batchify(lst)

        all_tagged_sentences = []

        for batch in tokenized_batches:
            all_out = self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))
            batch_langs = torch.argmax(all_out["seq_logits"], dim=-1)
            batch["input_ids"].to("cpu")
            batch["attention_mask"].to("cpu")
            all_tagged_sentences.extend(batch_langs[:, 0].tolist())

        if tag_config["lang_per_item"]:
            return [self.config["id_to_lang"][i] for i in all_tagged_sentences]
        else:
            # Majority vote over all items (two-language setup).
            LANG = 1 if sum(all_tagged_sentences) / len(all_tagged_sentences) >= 0.5 else 0
            LANG_STR = self.config["id_to_lang"][LANG]
            return [LANG_STR] * len(lst)
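
    # Public language-identification entry point. Mirrors tag(): it accepts strings,
    # lists or an input_directory, plus a fast_mode that classifies whole files from
    # their first 3000 characters.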
    @torch.no_grad()
    def identify_language(self, inp=None, **tag_config):
        self.eval()
        if "one_sentence_per_line" not in tag_config:
            tag_config["one_sentence_per_line"] = False
        if "lang" in tag_config:
            del tag_config["lang"]

        if "output_tsv" not in tag_config:
            tag_config["output_tsv"] = False

        if "lang_per_sentence" not in tag_config:
            tag_config["lang_per_sentence"] = False
        elif tag_config["lang_per_sentence"]:
            tag_config["lang_per_sentence"] = True

        if "input_directory" in tag_config and "output_directory" in tag_config and "write_output_to" in tag_config and tag_config["write_output_to"] is not None:
            raise ValueError("If an input and output directory is given, then write_output_to cannot be used as the output will be written as files in output_directory.")

        if tag_config.get("write_output_to") is None:
            tag_config["write_output_to"] = sys.stdout
        elif isinstance(tag_config["write_output_to"], str) and tag_config["write_output_to"] == "list":
            if tag_config["output_tsv"]:
                raise ValueError("write_output_to cannot be set to list if output_tsv is set.")
            if "output_directory" in tag_config and tag_config["output_directory"]:
                raise ValueError("write_output_to cannot be set to list if output_directory is set.")
            tag_config["write_output_to"] = None
        elif isinstance(tag_config["write_output_to"], str):
            tag_config["write_output_to"] = open(tag_config["write_output_to"], "w")

        if "output_directory" in tag_config:
            tag_config["write_output_to"] = None

        if "split_sentences" not in tag_config:
            tag_config["split_sentences"] = False

        if "lang_per_item" not in tag_config:
            tag_config["lang_per_item"] = False

        if "fast_mode" in tag_config:
            # Fast mode: assign one language per file in input_directory, reading at
            # most the first 3000 characters of each file.
            if "input_directory" not in tag_config:
                raise ValueError("input_directory must be defined if fast_mode is set.")

            if tag_config["split_sentences"]:
                raise ValueError("fast_mode does not split sentences, so split_sentences cannot be set in this mode.")

            if tag_config["lang_per_item"]:
                raise ValueError("fast_mode does not identify languages of each line or sentence in a file, so lang_per_item cannot be set in this mode.")

            if tag_config["lang_per_sentence"]:
                raise ValueError("fast_mode does not identify languages of sentences in a file, so lang_per_sentence cannot be set in this mode.")

            general_output = []
            file_names = []
            contents = []

            for dir_path, _, files in os.walk(tag_config["input_directory"]):
                for f in files:
                    input_path = os.path.join(dir_path, f)
                    if len(file_names) == self.batch_size:
                        # A full batch has been collected: classify and flush it first.
                        batch = self.tokenizer(contents, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False)
                        langs = torch.argmax(self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))["seq_logits"], dim=-1)[:, 0].tolist()
                        del batch
                        torch.cuda.empty_cache()

                        if tag_config["write_output_to"] is None:
                            general_output.extend([{"f": i[0], "l": self.config["id_to_lang"][i[1]]} for i in zip(file_names, langs)])
                        elif tag_config["output_tsv"]:
                            for fil, lan in zip(file_names, langs):
                                tag_config["write_output_to"].write(fil)
                                tag_config["write_output_to"].write("\t")
                                tag_config["write_output_to"].write(self.config["id_to_lang"][lan])
                                tag_config["write_output_to"].write("\n")
                        else:
                            for fil, lan in zip(file_names, langs):
                                json.dump({"f": fil, "l": self.config["id_to_lang"][lan]}, tag_config["write_output_to"])
                                tag_config["write_output_to"].write("\n")
                        file_names = []
                        contents = []

                    # Read the current file as well, so it is carried into the next batch.
                    content = None
                    try:
                        with open(input_path, "r") as ff:
                            content = ff.read(3000).replace("\n", " ").replace("\r", "")
                    except Exception:
                        pass
                    if content is not None:
                        file_names.append(input_path)
                        contents.append(content)

            if len(file_names) > 0:
                batch = self.tokenizer(contents, padding=True, truncation=True, max_length=self.MAX_LENGTH, return_tensors="pt", return_token_type_ids=False)
                langs = torch.argmax(self(batch["input_ids"].to(self.device), batch["attention_mask"].to(self.device))["seq_logits"], dim=-1)[:, 0].tolist()
                del batch
                torch.cuda.empty_cache()

                if tag_config["write_output_to"] is None:
                    general_output.extend([{"f": i[0], "l": self.config["id_to_lang"][i[1]]} for i in zip(file_names, langs)])
                elif tag_config["output_tsv"]:
                    for fil, lan in zip(file_names, langs):
                        tag_config["write_output_to"].write(fil)
                        tag_config["write_output_to"].write("\t")
                        tag_config["write_output_to"].write(self.config["id_to_lang"][lan])
                        tag_config["write_output_to"].write("\n")
                else:
                    for fil, lan in zip(file_names, langs):
                        json.dump({"f": fil, "l": self.config["id_to_lang"][lan]}, tag_config["write_output_to"])
                        tag_config["write_output_to"].write("\n")

            return general_output if len(general_output) > 0 else None

        if "input_directory" in tag_config:
            general_output = []

            for dir_path, _, files in os.walk(tag_config["input_directory"]):
                for f in files:
                    input_path = os.path.join(dir_path, f)

                    file_content = self._check_if_text_file_and_return_content(input_path)

                    if isinstance(file_content, str):
                        file_content = self._preprocess_text(file_content)
                        new_inp = None
                        if tag_config["one_sentence_per_line"]:
                            inp = [i for i in file_content.split("\n") if i != ""]
                            inp = [i for i in inp if i != ""]
                            out = self.identify_language_sentence_list(inp, **tag_config)
                        else:
                            inp = self.split_sentences(file_content, **tag_config)
                            out = self.identify_language_sentence_list(inp, **tag_config)
                            new_inp = [self.tokenizer.decode(i[1:]).split("[SEP]")[0].strip() for i in inp]

                        if new_inp is not None:
                            inp = new_inp

                        if tag_config["write_output_to"] is None:
                            if "output_directory" in tag_config:
                                out_path = os.path.join(tag_config["output_directory"], os.path.relpath(dir_path, tag_config["input_directory"]), f + ".lang")
                                os.makedirs(os.path.dirname(out_path), exist_ok=True)
                                with open(out_path, "w") as opened_file:
                                    if tag_config["lang_per_sentence"]:
                                        if tag_config["output_tsv"]:
                                            for sen, lan in zip(inp, out):
                                                opened_file.write(sen)
                                                opened_file.write("\t")
                                                opened_file.write(lan)
                                                opened_file.write("\n")
                                        else:
                                            json.dump([{"s": sen, "l": lan} for sen, lan in zip(inp, out)], opened_file)
                                    else:
                                        if tag_config["output_tsv"]:
                                            opened_file.write(out[0])
                                        else:
                                            json.dump({"l": out[0]}, opened_file)
                            else:
                                if tag_config["lang_per_sentence"]:
                                    general_output.extend([{"s": sen, "l": lan} for sen, lan in zip(inp, out)])
                                else:
                                    general_output.append({"f": input_path, "l": out[0]})

                        else:
                            if tag_config["lang_per_sentence"]:
                                if tag_config["output_tsv"]:
                                    for sen, lan in zip(inp, out):
                                        tag_config["write_output_to"].write(sen)
                                        tag_config["write_output_to"].write("\t")
                                        tag_config["write_output_to"].write(lan)
                                        tag_config["write_output_to"].write("\n")
                                    tag_config["write_output_to"].write("\n")
                                else:
                                    json.dump([{"s": sen, "l": lan} for sen, lan in zip(inp, out)], tag_config["write_output_to"])
                                    tag_config["write_output_to"].write("\n")
                            else:
                                if tag_config["output_tsv"]:
                                    tag_config["write_output_to"].write(input_path)
                                    tag_config["write_output_to"].write("\t")
                                    tag_config["write_output_to"].write(out[0])
                                    tag_config["write_output_to"].write("\n")
                                else:
                                    json.dump({"f": input_path, "l": out[0]}, tag_config["write_output_to"])
                                    tag_config["write_output_to"].write("\n")

                    else:
                        if tag_config["output_tsv"]:
                            tag_config["write_output_to"].write(input_path)
                            tag_config["write_output_to"].write("\t")
                            tag_config["write_output_to"].write("err")
                            tag_config["write_output_to"].write("\n")
                        else:
                            json.dump({"f": input_path, "l": "err"}, tag_config["write_output_to"])
                            tag_config["write_output_to"].write("\n")

            if tag_config["write_output_to"] and tag_config["write_output_to"] != sys.stdout and tag_config["write_output_to"] != sys.stderr:
                tag_config["write_output_to"].close()

            return general_output if len(general_output) > 0 else None

        if inp is None:
            pass
        elif isinstance(inp, str):
            new_inp = None

            if tag_config["split_sentences"]:
                inp = self._preprocess_text(inp)
                inp = self.split_sentences(inp, **tag_config)
                new_inp = [self.tokenizer.decode(i[1:]).strip() for i in inp]
                if tag_config["lang_per_sentence"]:
                    tag_config["lang_per_item"] = True

            elif tag_config["one_sentence_per_line"]:
                inp = [i for i in inp.split("\n") if i != ""]
                inp = [self._preprocess_text(i) for i in inp if i != ""]
                if tag_config["lang_per_sentence"]:
                    tag_config["lang_per_item"] = True

            else:
                inp = [self._preprocess_text(inp)]

            out = self.identify_language_sentence_list(inp, **tag_config)

            if new_inp is not None:
                inp = new_inp

            if tag_config["write_output_to"] is None:
                return [{"s": i[0], "l": i[1]} for i in zip(inp, out)]

            if tag_config["output_tsv"]:
                for sen, lan in zip(inp, out):
                    tag_config["write_output_to"].write(sen)
                    tag_config["write_output_to"].write("\t")
                    tag_config["write_output_to"].write(lan)
                    tag_config["write_output_to"].write("\n")
            else:
                json.dump([{"s": sen, "l": lan} for sen, lan in zip(inp, out)], tag_config["write_output_to"])

            return

        elif isinstance(inp, list):
            inp = [i.strip() for i in inp]
            inp = [self._preprocess_text(i) for i in inp if i != ""]
            out = self.identify_language_sentence_list(inp, **tag_config)

            if tag_config["write_output_to"] is None:
                return [{"s": i[0], "l": i[1]} for i in zip(inp, out)]

            if tag_config["output_tsv"]:
                for sen, lan in zip(inp, out):
                    tag_config["write_output_to"].write(sen)
                    tag_config["write_output_to"].write("\t")
                    tag_config["write_output_to"].write(lan)
                    tag_config["write_output_to"].write("\n")
            else:
                json.dump([{"s": sen, "l": lan} for sen, lan in zip(inp, out)], tag_config["write_output_to"])

            return
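

# Minimal usage sketch (illustrative only): it assumes the tagger is published on the
# Hugging Face Hub with custom code enabled, that "<org>/<humit-tagger-repo>" is a
# placeholder repo id, and that AutoModel.from_pretrained(..., trust_remote_code=True)
# routes through HumitTaggerModel.from_pretrained() defined above.
if __name__ == "__main__":
    tagger = AutoModel.from_pretrained("<org>/<humit-tagger-repo>", trust_remote_code=True)

    # Tag a short text and get the result back as Python objects instead of stdout.
    tagged = tagger.tag("Dette er ei setning. Dette er ei anna setning.", write_output_to="list")
    print(tagged)

    # Identify the language of the text.
    langs = tagger.identify_language("Dette er ei setning.", write_output_to="list")
    print(langs)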