Spaces:
Sleeping
Sleeping
| # import gradio as gr | |
| # import time # 模拟处理耗时 | |
| # import os | |
| # import spacy | |
| # from spacy import displacy | |
| # import pandas as pd | |
| # nlp = spacy.load("en_core_web_md") | |
| # def process_api(input_text): | |
| # # 这里编写实际的后端处理逻辑 | |
| # return { | |
| # "status": "success", | |
| # # "result": f"Processed: {input_text.upper()}", | |
| # "result": f"Processed: {nlp(input_text).to_json()}", | |
| # "timestamp": time.time() | |
| # } | |
| # # 设置API格式为JSON | |
| # gr.Interface( | |
| # fn=process_api, | |
| # inputs="text", | |
| # outputs="json", | |
| # title="Backend API", | |
| # allow_flagging="never" | |
| # ).launch() | |
| import gradio as gr | |
| import time | |
| import spacy | |
| from spacy.tokens import Span, Doc, Token | |
| from spacy.language import Language | |
| import llm_ent_extract | |
| import regex_spatial | |
| import re | |
| colors = {'GPE': "#43c6fc", "LOC": "#fd9720", "RSE":"#a6e22d"} | |
| options = {"ents": ['GPE', 'LOC', "RSE"], "colors": colors} | |
| HTML_WRAPPER = """<div style="overflow-x: auto; border: none solid #a6e22d; border-radius: 0.25rem; padding: 1rem">{}</div>""" | |
| BASE_URL = "" | |
| model = "" | |
| types = "" | |
| nlp = spacy.load("en_core_web_md") | |
| gpe_selected = 'GPE' | |
| loc_selected = 'loc' | |
| rse_selected = 'rse' | |
| rse_id = "rse_id" | |
| def set_selected_entities(doc): | |
| global gpe_selected, loc_selected, rse_selected, model | |
| ents = [ent for ent in doc.ents if ent.label_ == gpe_selected or ent.label_ == loc_selected or ent.label_ == rse_selected] | |
| doc.ents = ents | |
| return doc | |
| def update_entities(doc, entity_texts, replace=True): | |
| """ | |
| 根据给定的文本内容标注实体,并直接修改 doc.ents。 | |
| :param doc: spaCy 解析后的 Doc 对象 | |
| :param entity_texts: 字典,键是要标注的实体文本,值是对应的实体类别 | |
| :param replace: 布尔值,True 则替换现有实体,False 则保留现有实体并添加新的 | |
| """ | |
| new_ents = list(doc.ents) if not replace else [] # 如果 replace=False,保留已有实体 | |
| for ent_text, ent_label in entity_texts.items(): | |
| start = doc.text.find(ent_text) # 在全文中查找文本位置 | |
| if start != -1: | |
| start_token = len(doc.text[:start].split()) # 计算起始 token 索引 | |
| end_token = start_token + len(ent_text.split()) # 计算结束 token 索引 | |
| if start_token < len(doc) and end_token <= len(doc): # 确保索引不越界 | |
| new_ent = Span(doc, start_token, end_token, label=ent_label) | |
| new_ents.append(new_ent) | |
| doc.set_ents(new_ents) # 更新 doc.ents | |
| def find_ent_by_regex(doc, sentence, ent, regex): | |
| global id | |
| if id == "": | |
| id = ent.text | |
| for match in re.finditer(regex, doc.text): | |
| start, end = match.span() | |
| if(start>= sentence.start_char and start<= sentence.end_char): | |
| span = doc.char_span(start, end) | |
| if span is not None: | |
| id = span.text +"_"+ id | |
| if(start > ent.end_char): | |
| ent.end_char = end | |
| else: | |
| ent.start_char = start | |
| return ent | |
| return ent | |
| def set_extension(): | |
| Span.set_extension(rse_id, default="", force=True) | |
| Doc.set_extension(rse_id, default="", force=True) | |
| Token.set_extension(rse_id, default="", force=True) | |
| def get_level1(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level1_regex()) | |
| def get_level2(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level2_regex()) | |
| def get_level3(doc, sentence, ent): | |
| return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level3_regex()) | |
| def get_relative_entity(doc, sentence, ent): | |
| global id | |
| id = "" | |
| rel_entity = get_level1(doc, sentence, ent) | |
| # print(1111 ,rel_entity) | |
| rel_entity = get_level2(doc, sentence, rel_entity) | |
| # print(2222 ,rel_entity) | |
| rel_entity = get_level3(doc, sentence, rel_entity) | |
| # print(3333 ,rel_entity) | |
| if("_" in id): | |
| rel_entity = doc.char_span(rel_entity.start_char, rel_entity.end_char, "RSE") | |
| rel_entity._.rse_id = id | |
| # print(id, 'idid') | |
| # print(rel_entity._.rse_id, '._._') | |
| return rel_entity | |
| rel_entity = doc.char_span(ent.start_char, ent.end_char, ent.label_) | |
| rel_entity._.rse_id = id | |
| # print(4444 ,rel_entity) | |
| return rel_entity | |
| def get_spatial_ent(doc): | |
| set_extension() | |
| new_ents = [] | |
| # ents = [ent for ent in doc.ents if ent.label_ == "GPE" or ent.label_ == "LOC"] # 筛选出ase | |
| # LLM 输出 | |
| # GPE = '[###Pyrmont###, ###Glebe###]' # LLM 输出的实体 | |
| GPE = llm_ent_extract.extract_GPE(doc.text) # LLM 输出的实体 | |
| print(doc.text, 'llmin') | |
| print(GPE, 'llout') | |
| GPE = llm_ent_extract.extract(GPE, 'GPE') | |
| print(GPE, 'llmout2') | |
| update_entities(doc, GPE, True) | |
| ents = doc.ents | |
| print(ents, 'eee') | |
| end = None | |
| for ent in ents: | |
| if ent.end != len(doc): | |
| next_token = doc[ent.end] | |
| if end is not None: | |
| start = end | |
| else: | |
| start = ent.sent.start | |
| if next_token.text.lower() in regex_spatial.get_keywords(): | |
| end = next_token.i | |
| else: | |
| end = ent.end | |
| else: | |
| start = ent.sent.start | |
| end = ent.end | |
| # print(doc, '//',start, '//', end, 999888) | |
| # print(doc[start],'//', doc[end]) | |
| # print(ents, 999) | |
| rsi_ent = get_relative_entity(doc,Span(doc, start, end), ent) | |
| # print(doc.ents[0]._.rse_id, '._._2') | |
| # print(rsi_ent.text, rsi_ent.label_, rsi_ent._.rse_id) | |
| new_ents.append(rsi_ent) | |
| doc.ents = new_ents | |
| return doc | |
| nlp.add_pipe("spatial_pipeline", after="ner") | |
| def extract_spatial_entities(text): | |
| doc = nlp(text) | |
| # 分句处理 | |
| sent_ents = [] | |
| sent_texts = [] | |
| sent_rse_id = [] | |
| offset = 0 # 记录当前 token 偏移量 | |
| sent_start_positions = [0] # 记录句子信息 | |
| doc_copy = doc.copy() # 用于展示方程组合 | |
| for sent in doc.sents: | |
| sent_doc = nlp(sent.text) # 逐句处理 | |
| sent_doc = set_selected_entities(sent_doc) # 这里处理实体 | |
| sent_texts.append(sent_doc.text) | |
| for ent in sent_doc.ents: | |
| sent_rse_id.append(ent._.rse_id) | |
| # **调整每个实体的索引,使其匹配完整文本** | |
| for ent in sent_doc.ents: | |
| new_ent = Span(doc, ent.start + offset, ent.end + offset, label=ent.label_) | |
| sent_ents.append(new_ent) | |
| offset += len(sent) # 更新偏移量 | |
| sent_start_positions.append(sent_start_positions[-1] + len(sent)) # 记录句子起点 | |
| # **创建新 Doc** | |
| final_doc = Doc(nlp.vocab, words=[token.text for token in doc], spaces=[token.whitespace_ for token in doc]) | |
| for i in sent_start_positions: # 手动标记句子起始点 | |
| if i < len(final_doc): | |
| final_doc[i].is_sent_start = True | |
| # **设置实体** | |
| final_doc.set_ents(sent_ents) | |
| for i in range(len(sent_rse_id)): | |
| final_doc.ents[i]._.rse_id = sent_rse_id[i] | |
| doc = final_doc | |
| ents_ext = [] | |
| for ent in doc.ents: | |
| ents_ext.append({ | |
| "start": ent.start_char, | |
| "end": ent.end_char, | |
| "label": ent.label_, | |
| "rse_id": ent._.rse_id # ✅ 加入扩展字段 | |
| }) | |
| return { | |
| "text": doc.text, | |
| "ents": [{"start": ent.start_char, "end": ent.end_char, "label": ent.label_} for ent in doc.ents], | |
| "tokens": [{"id": i, "start": token.idx, "end": token.idx + len(token)} for i, token in enumerate(doc)], | |
| "ents_ext": ents_ext # ✅ 添加扩展字段 | |
| } | |
| def process_api(input_text): | |
| # 这里编写实际的后端处理逻辑 | |
| # return { | |
| # "status": "success", | |
| # # "result": f"Processed: {input_text.upper()}", | |
| # # "result": f"Processed: {nlp(input_text).to_json()}", | |
| # "result": f"Processed: {extract_spatial_entities(input_text)}", | |
| # "timestamp": time.time() | |
| # } | |
| return extract_spatial_entities(input_text) | |
| # 设置API格式为JSON | |
| gr.Interface( | |
| fn=process_api, | |
| inputs="text", | |
| outputs="json", | |
| title="Backend API", | |
| allow_flagging="never" | |
| ).launch() |