Spaces:
Sleeping
Sleeping
| # -*- coding:utf-8 -*- | |
| from io import BytesIO | |
| import re | |
| from zipfile import ZipFile | |
| import os | |
| from pathlib import Path | |
| import streamlit as st | |
| from cassis import load_typesystem, load_cas_from_xmi | |
| def st_pb(method): | |
| """streamlit decorator to display | |
| progress bar | |
| """ | |
| def progress_bar(ref): | |
| container = st.empty() | |
| bar = st.progress(0) | |
| pg_gen = method(ref) | |
| try: | |
| while True: | |
| progress = next(pg_gen) | |
| bar.progress(progress[0]) | |
| if progress[2]: | |
| container.write("✅ Processing... " + progress[1]) | |
| else: | |
| container.write("❌️ Errror with..." + progress[1]) | |
| except StopIteration as result: | |
| return result.value | |
| return progress_bar | |
| class Project: | |
| def __init__(self, zip_project, type, remote): | |
| # zip container that contains XMI and typesystem | |
| self.zip_project = zip_project | |
| self.remote = remote | |
| # 'iaa' or 'global' | |
| self.type = type | |
| # store source filename | |
| self.documents = [] | |
| # store XMI representation | |
| self.xmi_documents = [] | |
| # store typesystem file | |
| self.typesystem = None # cassis.load_typesystem(BytesIO(annotation_zip.read('TypeSystem.xml'))) | |
| # set annotators | |
| self.annotators = [] | |
| # set annotations | |
| """ | |
| { | |
| "Filename.xmi": { | |
| mentions: [], | |
| labels: [] | |
| }, ... | |
| } | |
| """ | |
| self.annotations = {} | |
| if isinstance(self.zip_project, ZipFile) and self.remote and self.type == "global": | |
| for fp in self.zip_project.namelist(): | |
| if self.typesystem is None: | |
| self.typesystem = load_typesystem(BytesIO(self.zip_project.open('TypeSystem.xml').read())) | |
| if fp.endswith('.xmi'): | |
| self.documents.append(fp) | |
| self.xmi_documents.append(str(self.zip_project.open(fp).read().decode("utf-8"))) | |
| else: | |
| with ZipFile(self.zip_project) as project_zip: | |
| if self.type == "global": | |
| regex = re.compile('.*curation/.*/(?!\._).*zip$') | |
| elif self.type == "iaa": | |
| regex = re.compile('.*xm[il]$') | |
| annotation_fps = (fp for fp in project_zip.namelist() if regex.match(fp)) | |
| for fp in annotation_fps: | |
| if self.type == "global": | |
| with ZipFile(BytesIO(project_zip.read(fp))) as annotation_zip: | |
| if self.typesystem is None: | |
| self.typesystem = load_typesystem(BytesIO(annotation_zip.read('TypeSystem.xml'))) | |
| for f in annotation_zip.namelist(): | |
| if f.endswith('.xmi'): | |
| # store source filename | |
| self.documents.append(Path(fp).parent.name) | |
| # annotators = [] | |
| # store XMI representation | |
| self.xmi_documents.append(str(annotation_zip.read(f).decode("utf-8"))) | |
| elif self.type == "iaa": | |
| if self.typesystem is None and fp.endswith('.xml'): | |
| self.typesystem = load_typesystem(BytesIO(project_zip.read('TypeSystem.xml'))) | |
| else: | |
| if fp.endswith('.xmi'): | |
| # store source filename | |
| self.documents.append(fp) | |
| # set annotators | |
| self.annotators.append(os.path.splitext(fp)[0]) | |
| # store XMI representation | |
| self.xmi_documents.append(str(project_zip.read(fp).decode("utf-8"))) | |
| self.extract_ne() | |
| def extract_ne(self): | |
| count = 0 | |
| for xmi, src in zip(self.xmi_documents, self.documents): | |
| doc_flag = True | |
| try: | |
| cas = load_cas_from_xmi(xmi, typesystem=self.typesystem) | |
| self.annotations[src] = { | |
| "mentions": [], | |
| "labels": [] | |
| } | |
| for ne in cas.select('de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity'): | |
| self.annotations[src]["mentions"].append(ne.get_covered_text()) | |
| self.annotations[src]["labels"].append(ne.value) | |
| except: | |
| doc_flag = False | |
| count += 1 | |
| yield (count / len(self.documents)) * 1.0, src, doc_flag | |