Spaces:
Runtime error
Runtime error
| import json | |
| import subprocess | |
| from cltk.core.data_types import Process | |
| from dataclasses import dataclass | |
| from copy import deepcopy | |
| from boltons.cacheutils import cachedproperty | |
| from cltk.core.data_types import Doc, Word | |
| import subprocess | |
| import re | |
| import string | |
| from cltk.tokenizers.lat.lat import LatinWordTokenizer | |
| from cltk.core.data_types import Process, Pipeline | |
| from cltk.languages.utils import get_lang | |
| from cltk.alphabet.processes import LatinNormalizeProcess | |
| from cltk.nlp import NLP | |
| from cltk.text.processes import DefaultPunctuationRemovalProcess | |
| from fastapi import FastAPI | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Optional | |
| import json | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
| import morph_simplifier | |
| import json | |
| import os | |
# ASGI application object; everything below attaches to it.
app = FastAPI()

# Origins accepted by the CORS middleware ("*" = any origin).
origins = ["*"]

# Fully permissive CORS: any origin, method and header, credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
class LatinWhitakersWordsMorphology(Process):
    """A simple ``Process`` for giving the stem and morphological features
    of a latin word using Whitakers Words.

    ``run`` pipes every word of the document through the ``./words``
    executable (run from ``./bin/``), rewrites the program's free-text output
    into one lookup block per word, and ``parse_word`` turns each block into
    ``stem`` / ``morph`` / ``case`` attributes on the word object.

    NOTE(review): the block nesting of ``parse_word`` was reconstructed from a
    listing whose indentation had been lost; confirm it against the original
    file, in particular that the fallback to the third output line only
    applies when the looked-up headword does not match the token.
    """

    language: str = None

    # Literal rewrites applied, in this exact order, to the raw program output
    # before the regex passes.  Explanatory banner lines are either removed or
    # turned into the SPECIAL_REPLACE / WORD_EDIT markers that parse_word
    # looks for.  Order matters: later entries and the regexes in
    # _split_lookups see the result of the earlier ones.
    _LITERAL_REWRITES = (
        ("MORE - hit RETURN/ENTER to continue\nUnexpected exception in PAUSE", ""),
        ("\n*", "\n"),
        ("PERF PASSIVE PPL + verb TO_BE => PASSIVE perfect system", "\n\nSPECIAL_REPLACE"),
        ("FUT PASSIVE PPL + esse => PRES PASSIVE INF", "\n\nSPECIAL_REPLACE"),
        ("\nFUT PASSIVE PPL + verb TO_BE => PASSIVE Periphrastic - should/ought/had to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT ACTIVE PPL + verb TO_BE => ACTIVE Periphrastic - about to, going to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT PASSIVE PPL + esse => PASSIVE Periphrastic - should/ought/had to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT ACT PPL+fuisse => PERF ACT INF Periphrastic - to have been about/going to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT PASSIVE PPL + fuisse => PERF PASSIVE INF Periphrastic - about to, going to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT ACTIVE PPL + esse => ACTIVE Periphrastic - about to, going to", "\n\nSPECIAL_REPLACE"),
        ("\nFUT ACTIVE PPL + esse => PRES Periphastic/FUT ACTIVE INF - be about/going to", "\n\nSPECIAL_REPLACE"),
        ("Syncope s => vis\n\n", "WORD_EDIT"),
        ("Syncope s => vis \n\n", "WORD_EDIT"),
        ("\nSyncope ii => ivi \nSyncopated perfect ivi can drop 'v' without contracting vowel", "WORD_EDIT"),
        ("Syncope s => vis \nSyncopated perfect often drops the 'v' and contracts vowel", "WORD_EDIT"),
        ("\nPERF PASSIVE PPL + esse => PERF PASSIVE INF", "\n\nSPECIAL_REPLACE"),
    )

    # Literal rewrites applied after the two regex passes.
    _TRAILING_REWRITES = (
        ("\nSlur sub/su~ \nAn initial 'sub' may be rendered by su~", "WORD_EDIT"),
        ("\nSyncope r => v.r \n\n", "WORD_EDIT"),
    )

    def algorithm(self):
        return None

    @staticmethod
    def _morph_code(word_lookup):
        """Return the morphology code taken from the first line of a lookup.

        Everything after the first space-separated field of the first line is
        concatenated, then the ``Late`` / ``Early`` markers are removed and
        ``N98XXM`` is rewritten to ``ADV``.
        """
        fields = word_lookup.split("\n")[0].split(" ")[1:]
        return "".join(fields).replace("Late", "").replace("Early", "").replace("N98XXM", "ADV")

    @classmethod
    def _split_lookups(cls, raw_output):
        """Turn the raw program output into a list of per-word lookup blocks.

        Raises ``IndexError`` when the output contains no ``=>`` prompt marker
        to split on.
        """
        # Keep what follows the first "=>" prompt and drop the last six lines.
        body = re.split(r"\n=>|=>\n", raw_output, maxsplit=1)[1]
        text = "\n".join(body.split("\n")[:-6])
        for old, new in cls._LITERAL_REWRITES:
            text = text.replace(old, new)
        text = re.sub(r"PPL\+sunt.*\n\nsum|Syncope s => vis *\n\n", "", text)
        text = re.sub(r"SUPINE \+ iri.*\n", "\n\nIRI_SPECIAL ", text)
        for old, new in cls._TRAILING_REWRITES:
            text = text.replace(old, new)
        # Blocks are separated by a blank line.
        return text.split("\n\n")

    def parse_word(self, tup):
        """Fill ``stem``, ``morph`` and ``case`` on one word from its lookup.

        Args:
            tup: ``(index, (word_obj, word_lookup))`` as produced by
                ``enumerate(zip(words, lookups))`` in ``run``.

        Returns:
            The same ``word_obj``, mutated in place.  ``word_lookup`` is also
            stored on it for inspection.
        """
        index, (word_obj, word_lookup) = tup
        word_obj.word_lookup = word_lookup
        word_lookup = word_lookup.strip()

        # Punctuation, unknown words and empty lookups are passed through.
        if word_obj.string in [',', ":", "'", '"', ".", ";"] or "UNKNOWN" in word_lookup or "" == word_lookup:
            word_obj.stem = word_obj.string
            word_obj.morph = "OTHER"
            word_obj.case = ""
            return word_obj

        letter_swap = False

        # Malformed numerals such as XIIX are flagged by the program.
        if "Bad Roman Numeral?" in word_lookup:
            word_obj.stem = word_obj.string
            word_obj.morph = "NUM20XXXCARD"
            word_obj.case = ""
            return word_obj

        if "WORD_EDIT" in word_lookup:
            letter_swap = True
            word_lookup = word_lookup.replace("WORD_EDIT\n", "")

        # A lookup starting with "." is treated as a form of sum/esse.
        if word_lookup.startswith("."):
            word_obj.stem = "esse"
            word_obj.morph = self._morph_code(word_lookup)
            word_obj.case = word_obj.string
            return word_obj

        # e.g. alicuius: a "[XXXAO]" line sits between the form line and the
        # next one; drop it.  With fewer than three lines nothing changes
        # (the original reached the same result via a caught IndexError that
        # was printed to stdout).
        lines = word_lookup.split("\n")
        if len(lines) > 2 and lines[1].strip().startswith("["):
            word_lookup = lines[0] + "\n" + lines[2]

        # i/j, u/v, d/t swap notice: the real entry starts two lines further.
        if word_lookup.split(" ")[0].split(".")[0] == "Word":
            word_lookup = "\n".join(word_lookup.split("\n")[2:])
            letter_swap = True

        head = word_lookup.split(" ")[0]

        # Cardinal number
        if "CARD" in word_lookup and "." not in head:
            word_obj.stem = head
            word_obj.morph = self._morph_code(word_lookup)
            word_obj.case = ""
            return word_obj

        # Compare the looked-up form with the token after normalising
        # i/j, u/v and ivi/ii spelling variants.
        looked_up = head.replace(".", "").replace("ivi", "ii").replace("v", "u").replace("j", "i").strip().lower()
        surface = word_obj.string.lower().replace("j", "i").replace("v.i", "").replace("ivi", "ii").replace("-", "").replace("v", "u")
        if looked_up != surface:
            marker = head.replace(".", "").strip().lower()
            if marker == 'special_replace':
                word_obj.stem = word_obj.string
                word_obj.morph = "V51PRESACTIVEIND3P"
                word_obj.case = ""
                return word_obj
            elif marker == 'iri_special':
                word_obj.stem = word_obj.string
                word_obj.morph = "V31FUTPASSIVEINF0X"
                word_obj.case = ""
                return word_obj
            if index != self.l - 1 and not letter_swap:
                # Fall back to the third line when there is one; either way
                # record the lookup text that is actually parsed below.
                lines = word_lookup.split("\n")
                if len(lines) > 2:
                    word_lookup = lines[2]
                word_obj.word_lookup = word_lookup
            head = word_lookup.split(" ")[0]

        word_obj.stem = head.split(".")[0]
        word_obj.morph = self._morph_code(word_lookup)
        word_obj.case = head.split(".")[1] if "." in head else ""
        return word_obj

    def run(self, input_doc: Doc) -> list:
        """Look up every word of ``input_doc`` and return the parsed words.

        Despite the ``Process`` convention this returns a plain list of word
        objects, not a ``Doc``; the ``process_line_*`` helpers in this file
        iterate over that list.

        Raises:
            subprocess.CalledProcessError: if ``./words`` exits non-zero.
            IndexError: if its output lacks the ``=>`` prompt marker.
        """
        output_doc = deepcopy(input_doc)
        output_doc.words = [word for word in output_doc.words if word is not None and word.string != '-']
        # parse_word needs the word count to recognise the last word.
        self.l = len(output_doc.words)
        raw_output = subprocess.check_output(
            ["./words"],
            input=" ".join(word.string.replace("j", "i") for word in output_doc.words),
            cwd='./bin/',
            text=True,
        )
        lookups = self._split_lookups(raw_output)
        # zip stops at the shorter sequence, so surplus words or lookup
        # blocks are silently ignored.
        return list(map(self.parse_word, enumerate(zip(output_doc.words, lookups))))
class LatinTokenizationProcessWithPropers(Process):
    """Tokenization ``Process`` using ``LatinWordTokenizer`` with an extended
    list of enclitic exceptions, so that the listed names and words are not
    split at a trailing ``-n`` / ``-ne`` / ``-ue`` style ending.
    """

    # ``run`` reads ``self.algorithm`` as a value, so this has to be a
    # property; as a plain method ``self.algorithm.tokenize`` would fail on
    # the bound method.  ``cachedproperty`` is already imported by this file.
    @cachedproperty
    def algorithm(self):
        return LatinWordTokenizer()

    def run(self, input_doc: Doc) -> Doc:
        """Return a copy of ``input_doc`` whose ``words`` are fresh ``Word``
        objects, one per token of ``input_doc.raw``, with character offsets.
        """
        output_doc = deepcopy(input_doc)
        tokenizer_obj = self.algorithm
        enclitics_exceptions = LatinWordTokenizer.EXCEPTIONS + [
            "beniamin", "mosen", "hegesian", "bitumen", "aaron", "aristomene",
            'disan', 'aran', 'lothan', 'amdan', 'amdan', 'esban', 'iethran',
            'charan', "restitue", "resen",
        ]
        tokens = tokenizer_obj.tokenize(
            output_doc.raw,
            enclitics_exceptions=enclitics_exceptions,
            enclitics=['que', 'n', 'ne', 'ue', 've', 'st'],
        )
        indices = tokenizer_obj.compute_indices(output_doc.raw, tokens)
        output_doc.words = [
            Word(
                string=token,
                index_token=index,
                index_char_start=indices[index],
                index_char_stop=indices[index] + len(token),
            )
            for index, token in enumerate(tokens)
        ]
        return output_doc
# Custom CLTK pipeline: normalise, tokenize, strip punctuation, then look up
# every word with Whitaker's Words.  The processes run in list order.
pipe_morph = Pipeline(
    description="A custom Latin pipeline",
    processes=[
        LatinNormalizeProcess,
        LatinTokenizationProcessWithPropers,
        DefaultPunctuationRemovalProcess,
        LatinWhitakersWordsMorphology,
    ],
    language=get_lang("lat"),
)

# Analyzer shared by the process_line_* helpers.
nlp_morph = NLP(
    language='lat',
    custom_pipeline=pipe_morph,
    suppress_banner=True,
)
def process_line_morph(line):
    """Render ``line`` as space-separated ``stem MORPH`` pairs.

    The line is stripped of punctuation, analysed with ``nlp_morph``, and each
    resulting word contributes its ``stem`` followed by its ``morph`` code
    (the code is omitted when it is empty).

    Returns:
        The joined string without a trailing separator and without newlines.
    """
    # One pass: ASCII punctuation (which already includes "(" and ")") and
    # curly quotes are deleted, an em dash becomes ":".
    cleanup = str.maketrans("—", ":", string.punctuation + "“”")
    fragments = []
    for word in nlp_morph.analyze(line.translate(cleanup)):
        if not word:
            continue
        fragments.append(word.stem + (" " + word.morph + " " if word.morph != "" else " "))
    # Drop the trailing blank, remove newlines, then collapse doubled blanks.
    # As listed the last replace mapped a single blank to a single blank,
    # which does nothing; a doubled blank is the pattern that can occur here.
    return "".join(fragments)[:-1].replace("\n", "").replace(" " * 2, " ")
def process_line_morph_simplified(line):
    """Render ``line`` as space-separated ``stem SIMPLIFIED_MORPH`` pairs.

    The line is stripped of punctuation, analysed with ``nlp_morph``, and each
    resulting word contributes its ``stem`` followed by
    ``morph_simplifier.simplify_form(word.morph)`` (nothing is appended when
    ``word.morph`` is empty).

    Returns:
        The joined string without a trailing separator and without newlines.
    """
    # One pass: ASCII punctuation (which already includes "(" and ")") and
    # curly quotes are deleted, an em dash becomes ":".
    cleanup = str.maketrans("—", ":", string.punctuation + "“”")
    fragments = []
    for word in nlp_morph.analyze(line.translate(cleanup)):
        if not word:
            continue
        fragments.append(
            word.stem
            + (" " + morph_simplifier.simplify_form(word.morph) + " " if word.morph != "" else " ")
        )
    # Drop the trailing blank, remove newlines, then collapse doubled blanks
    # (e.g. when the simplified form is empty).  As listed the last replace
    # mapped a single blank to a single blank, which does nothing.
    return "".join(fragments)[:-1].replace("\n", "").replace(" " * 2, " ")
def process_line_case(line):
    """Render ``line`` as space-separated ``stem CASE_<ending>`` pairs.

    The line is stripped of punctuation, analysed with ``nlp_morph``, and each
    resulting word contributes its ``stem`` followed by ``CASE_`` plus its
    ``case`` attribute (nothing is appended when ``case`` is empty).

    Returns:
        The joined string without a trailing separator and without newlines.
    """
    # One pass: ASCII punctuation (which already includes "(" and ")") and
    # curly quotes are deleted, an em dash becomes ":".
    cleanup = str.maketrans("—", ":", string.punctuation + "“”")
    fragments = []
    for word in nlp_morph.analyze(line.translate(cleanup)):
        if not word:
            continue
        fragments.append(word.stem + (" CASE_" + word.case + " " if word.case != "" else " "))
    # Drop the trailing blank, remove newlines, then collapse doubled blanks.
    # As listed the last replace mapped a single blank to a single blank,
    # which does nothing; a doubled blank is the pattern that can occur here.
    return "".join(fragments)[:-1].replace("\n", "").replace(" " * 2, " ")
# One tokenizer per input representation, loaded in the order
# base, morphology, simplified morphology, case.
base_tokenizer, morph_tokenizer, morph_simplified_tokenizer, case_tokenizer = (
    AutoTokenizer.from_pretrained(checkpoint)
    for checkpoint in (
        "grosenthal/la_en_base",
        "grosenthal/la_en_morphology",
        "grosenthal/la_en_morph_simplified",
        "grosenthal/la_en_case",
    )
)

# The matching sequence-to-sequence models, loaded after all tokenizers.
base_model, morph_model, morph_simplified_model, case_model = (
    AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    for checkpoint in (
        "grosenthal/la_en_base",
        "grosenthal/la_en_morphology",
        "grosenthal/la_en_morph_simplified",
        "grosenthal/la_en_case",
    )
)
def tokenize(tokenizer, text):
    """Tokenize ``text`` and return both the token strings and their ids.

    Args:
        tokenizer: object offering ``tokenize(text, ...)`` and being callable
            as ``tokenizer(text, ...)`` with an ``'input_ids'`` entry in the
            result.
        text: the string to tokenize; both calls truncate at 128 tokens.

    Returns:
        ``{"text": <token strings>, "ids": <input ids>}``.
    """
    limits = {"truncation": True, "max_length": 128}
    pieces = tokenizer.tokenize(text, **limits)
    encoding = tokenizer(text, **limits)
    return dict(text=pieces, ids=encoding['input_ids'])
# Per-model tokenization shortcuts.  Written as functions rather than lambdas
# bound to names so they carry a real name in tracebacks and a docstring.
def tokenize_base(t):
    """Tokenize ``t`` with ``base_tokenizer``."""
    return tokenize(base_tokenizer, t)


def tokenize_morph(t):
    """Tokenize ``t`` with ``morph_tokenizer``."""
    return tokenize(morph_tokenizer, t)


def tokenize_morph_simplified(t):
    """Tokenize ``t`` with ``morph_simplified_tokenizer``."""
    return tokenize(morph_simplified_tokenizer, t)


def tokenize_case(t):
    """Tokenize ``t`` with ``case_tokenizer``."""
    return tokenize(case_tokenizer, t)
def translate(model, tokenizer, text):
    """Run ``text`` through ``model`` and decode every generated sequence.

    Args:
        model: object whose ``generate`` accepts the encoded batch as keyword
            arguments.
        tokenizer: callable encoder that also provides ``decode``.
        text: input passed to the tokenizer (padded and truncated).

    Returns:
        A list with one decoded string per generated sequence, special tokens
        skipped.
    """
    batch = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    generated = model.generate(**batch)
    decoded = []
    for sequence in generated:
        decoded.append(tokenizer.decode(sequence, skip_special_tokens=True))
    return decoded
# Per-model translation shortcuts.  Written as functions rather than lambdas
# bound to names so they carry a real name in tracebacks and a docstring.
def translate_base(t):
    """Translate ``t`` with ``base_model`` / ``base_tokenizer``."""
    return translate(base_model, base_tokenizer, t)


def translate_morph(t):
    """Translate ``t`` with ``morph_model`` / ``morph_tokenizer``."""
    return translate(morph_model, morph_tokenizer, t)


def translate_morph_simplified(t):
    """Translate ``t`` with ``morph_simplified_model`` / ``morph_simplified_tokenizer``."""
    return translate(morph_simplified_model, morph_simplified_tokenizer, t)


def translate_case(t):
    """Translate ``t`` with ``case_model`` / ``case_tokenizer``."""
    return translate(case_model, case_tokenizer, t)
def process_handler(text):
    """Build every processed variant of ``text`` and tokenize each of them.

    Returns:
        A dict with two entries, both keyed by ``base``, ``morph``,
        ``morph_simplified`` and ``case``: ``processed_texts`` holds the
        strings, ``tokenized`` holds the matching ``tokenize_*`` results.
    """
    print("in handler")

    # The raw text is the "base" variant; the others come from the three
    # process_line_* helpers, each of which analyses the text on its own.
    processed = {
        'base': text,
        'morph': process_line_morph(text),
        'morph_simplified': process_line_morph_simplified(text),
        'case': process_line_case(text),
    }

    # Each variant goes through the tokenizer of the model trained on it.
    tokenizers = (
        ('base', tokenize_base),
        ('morph', tokenize_morph),
        ('morph_simplified', tokenize_morph_simplified),
        ('case', tokenize_case),
    )
    tokenized = {variant: run(processed[variant]) for variant, run in tokenizers}

    return {'processed_texts': processed, 'tokenized': tokenized}
async def process(text: Optional[str] = None):
    """Return the JSON-encoded result of ``process_handler(text)``.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )
    # Success payload is left as a JSON string, as before.
    return json.dumps(process_handler(text))
async def translate_base_http(text: Optional[str] = None):
    """Return the JSON-encoded result of ``translate_base(text)``.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )
    # Success payload is left as a JSON string, as before.
    return json.dumps(translate_base(text))
async def translate_case_http(text: Optional[str] = None):
    """Return the JSON-encoded result of ``translate_case(text)``.

    ``text`` is handed to the model as given; no case preprocessing is
    applied here.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )
    # Success payload is left as a JSON string, as before.
    return json.dumps(translate_case(text))
async def translate_morph_http(text: Optional[str] = None):
    """Return the JSON-encoded result of ``translate_morph(text)``.

    ``text`` is handed to the model as given; no morphological preprocessing
    is applied here.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )
    # Success payload is left as a JSON string, as before.
    return json.dumps(translate_morph(text))
async def translate_morph_simplified_http(text: Optional[str] = None):
    """Return the JSON-encoded result of ``translate_morph_simplified(text)``.

    ``text`` is handed to the model as given; no morphological preprocessing
    is applied here.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )
    # Success payload is left as a JSON string, as before.
    return json.dumps(translate_morph_simplified(text))
async def translate_all(text: Optional[str] = None):
    """Translate ``text`` with all four models and return the results as JSON.

    The base model receives ``text`` unchanged; the case, morph and
    simplified-morph models receive the output of the matching
    ``process_line_*`` helper.

    When ``text`` is missing the reply is an HTTP 400 with an ``error``
    message.  The previous ``return body, 400`` is a Flask idiom: FastAPI
    serialises such a tuple as a two-element JSON array and still answers
    with status 200, so the status code has to travel on a response object.

    NOTE(review): no route decorator is visible on this coroutine in this
    listing -- confirm it is registered on ``app``.
    """
    if text is None:
        # Local import keeps this handler self-contained.
        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=400,
            content={"error": "Missing required parameter 'text'"},
        )

    base_result = translate_base(text)
    case_result = translate_case(process_line_case(text))
    morph_result = translate_morph(process_line_morph(text))
    morph_simplified_result = translate_morph_simplified(process_line_morph_simplified(text))
    # Success payload is left as a JSON string, as before.
    return json.dumps({
        'base': base_result,
        'case': case_result,
        'morph': morph_result,
        'morph_simplified': morph_simplified_result,
    })
# Serve the built front-end from the site root; html=True lets the mount
# answer directory requests with the directory's index.html.
app.mount(
    "/",
    StaticFiles(directory="src/aineid/build", html=True),
    name="static",
)
def index() -> FileResponse:
    """Return the static index page as an HTML file response.

    NOTE(review): no route decorator is visible on this function in this
    listing, and the path below differs from the directory mounted at "/"
    -- confirm both are intended.
    """
    index_page = "/app/static/index.html"
    return FileResponse(path=index_page, media_type="text/html")