|
|
|
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
import asyncio |
|
|
|
|
|
from langchain.llms import OpenAI |
|
|
from langchain.prompts import PromptTemplate |
|
|
from langchain.chains import LLMChain |
|
|
|
|
|
|
|
|
from chains import * |
|
|
from cloud_db import * |
|
|
from cloud_storage import * |
|
|
from supplier import * |
|
|
from utility import list_dict_to_dict |
|
|
|
|
|
|
|
|
@terminal_print
def init_app_data():
    '''
    Initialize the application data from the cloud backend.

    All cloud data is stored in the module-level ``app_data`` dictionary:
    prompts (keyed by prompt_name), terms, articles (keyed by name),
    summary (keyed by term), and aggregate-instruction prompts loaded from
    a local JSON file (keyed by assessment).

    Parameters
    ----------
    None

    Returns
    -------
    None
    '''
    app_data["prompts"] = list_dict_to_dict(get_table("prompts"),key="prompt_name")
    app_data["terms"] = get_table("terms")
    app_data["articles"] = list_dict_to_dict(get_table("articles"),key="name")
    app_data["summary"] = list_dict_to_dict(get_table("summary"),key="term")
    # NOTE(review): `json` is not imported in this file's visible import block —
    # presumably supplied by one of the star imports; confirm.
    with open(".data/instruction_agg_performance.json","r") as f:
        prompts_agg_json = json.load(f)
    app_data["prompts_agg"] = list_dict_to_dict(prompts_agg_json,key="assessment")
|
|
|
|
|
@terminal_print
def get_existing_article(
    article_name,
):
    '''
    Look up a cached article by name, make it the current article, and
    return its rendered overview and detail views.

    Parameters
    ----------
    article_name : str
        name of the article

    Returns
    -------
    tuple
        (overview, detail_views) — gradio updates rendered from the article
        (the previous docstring incorrectly said a dict was returned)
    '''
    article = app_data["articles"][article_name]
    app_data["current_article"] = article

    return create_overview(article), create_detail_views(article)
|
|
|
|
|
@terminal_print
def generate_summary():
    '''
    Placeholder: intended to build a summary across the user's selected
    articles. Not implemented yet.
    '''
    # NOTE(review): app_data["user"] is never populated anywhere in this file —
    # this lookup will raise KeyError if called as-is; confirm before wiring up.
    articles = app_data["user"]["summary"]["articles"]

    pass
|
|
|
|
|
@terminal_print
def process_study(
    domain,
    study_file_obj,
    study_content,
):
    '''
    Ingest one study (uploaded file or raw text), segment it, run the
    extraction prompts and post-processing, cache it locally, and persist it.

    Parameters
    ----------
    domain : str
        subject domain of the article
    study_file_obj : file object or None
        uploaded PDF file object; takes precedence over study_content
    study_content : str or None
        raw article text, used when no file object was supplied

    Returns
    -------
    tuple
        (overview, detail_views) gradio updates, or a pair of error
        messages when neither input was provided
    '''
    if study_file_obj:
        article = add_article(domain, study_file_obj)
    elif study_content:
        article = add_article(domain, study_content, file_object=False)
    else:
        # Bug fix: the success path returns 2 values, but this branch used to
        # return 3, giving callers an inconsistent output arity.
        msg = "No file or content provided"
        return msg, msg

    update_article_segment(article)

    process_prompts(article=article)

    post_process(article)

    app_data["current_article"] = article
    app_data["articles"][article["name"]] = article

    # Persisting is best-effort; a cloud failure must not lose the rendered views.
    try:
        update_article(article)
    except Exception as e:
        print(e)

    detail_views = create_detail_views(article)
    overview = create_overview(article)

    return overview, detail_views
|
|
|
|
|
@terminal_print
def process_studies(
        domain,
        file_objs):
    '''
    Process each uploaded file for the domain, then refresh the articles
    markdown table shown in the UI.
    '''
    for uploaded in file_objs:
        process_study(domain, uploaded, None)
    return gr.update(value=create_md_tables(app_data["articles"]))
|
|
|
|
|
@terminal_print |
|
|
def create_md_tables(articles):
    '''
    Build a markdown table (article name, authors, domain, upload time)
    for the given mapping of article name -> article dict.
    '''
    header = "| Article Name | Authors | Domain | Upload Time |\n| --- | --- | --- | --- |\n"
    rows = [
        f"| {name} | {info['Authors']} |{info['domain']} | {info['upload_time']} | \n"
        for name, info in articles.items()
    ]
    return header + "".join(rows)
|
|
|
|
|
@terminal_print
def update_article_segment(article):
    '''
    Split the article's raw text into standard sections (Abstract,
    Introduction, Material and Methods, Results, Meta Content), derive
    key_content and the prompt logic, then generate LLM segments.
    '''
    raw_content = article["raw"]
    # Everything before the first "discussion" heading counts as meta content.
    index_discussion = raw_content.lower().index("discussion") if "discussion" in raw_content.lower() else len(raw_content)

    meta_content = raw_content[:index_discussion]
    # Each call consumes text up to the next section marker and passes the
    # remainder on to the following call.
    abstract, next_content = get_key_content(raw_content,"objective","key")
    introduction, next_content = get_key_content(next_content,"key","methods")
    materials_and_methods, next_content = get_key_content(next_content,"methods","results")
    results, _ = get_key_content(next_content,"results","discussion")

    article.update({
        "Abstract": abstract,
        "Introduction": introduction,
        "Material and Methods": materials_and_methods,
        "Results": results,
        "Meta Content": meta_content,
        "tables": ""
    })

    # key_content is the concatenation used for downstream term matching.
    article.update({
        "key_content": article["Abstract"] + article["Material and Methods"] + article["Results"],
    })

    article.update(identify_logic(article["key_content"]))

    # Run segment generation on a fresh event loop; fall back to the current
    # loop when one already exists (e.g. inside an async server).
    try:
        pre_loop = asyncio.new_event_loop()
        pre_loop.run_until_complete(get_segments(article,article_prompts))
        pre_loop.close()
    # NOTE(review): bare except hides real errors, and the fallback only
    # schedules gather() without awaiting it, so those tasks may never
    # actually run — confirm this path is exercised.
    except:
        pre_loop = asyncio.get_event_loop()
        tasks = []
        tasks.append(get_segments(article,article_prompts))
        asyncio.gather(*tasks,return_exceptions=True)
|
|
|
|
@aterminal_print |
|
|
async def gen_segment(article, name, chain):
    '''
    Invoke the chain once and store its text content on the article
    under the given segment name.
    '''
    response = await chain.ainvoke({"term": ""})
    article[name] = response.content
|
|
|
|
|
@aterminal_print
async def get_segments(article, prompts):
    '''
    Concurrently generate one article segment per prompt with the chat model.

    Each prompt is paired with the article's "Meta Content" and run through
    gen_segment; results are written onto the article dict in place.
    '''
    llm = ChatOpenAI(
        temperature=0.0,
        model_name="gpt-3.5-turbo-16k",
        openai_api_key=openai.api_key)
    tasks = []

    for name,p in prompts.items():
        # Article text goes in as the human turn, the instruction as system.
        prompt = ChatPromptTemplate.from_messages([
            ("human",article["Meta Content"]),
            ("system","From the text above "+p),
        ])
        chain = prompt | llm
        tasks.append(gen_segment(article,name,chain))

    await asyncio.gather(*tasks)
|
|
|
|
|
@terminal_print
def refresh():
    '''
    Reload application data from the cloud backend and re-run prompt
    extraction on the current article, returning refreshed views.
    '''
    init_app_data()

    article = app_data["current_article"]
    if not article:
        return "No file or content provided"

    process_prompts(article)

    overview = create_overview(article)
    detail_views = create_detail_views(article)

    update_article(article=article)

    return overview, detail_views
|
|
|
|
|
@terminal_print
def create_overview(article):
    '''
    Render the article's "overview" extraction components as markdown and
    wrap the result in a gradio update.
    '''
    section = "overview"
    parts = [f"## Overview\n\n"]
    for component in article["extraction"][section]:
        parts.append(f"#### {section} - {component}\n\n")
        if component in article:
            parts.append(article[component] + "\n\n")
        else:
            parts.append("No content found\n\n")

    return gr.update(value="".join(parts))
|
|
|
|
|
@terminal_print
def create_detail_views(article):
    '''
    Render the article's performance extractions (clinical, radiologic,
    safety, other) as markdown and wrap the result in a gradio update.
    '''
    parts = ["## Performance\n\n"]

    for a in ["clinical", "radiologic", "safety", "other"]:
        if a not in article["extraction"]:
            continue
        parts.append(f"### {a.capitalize()}\n\n")
        for component in article["extraction"][a]:
            parts.append(f"#### {a} - {component}\n\n")
            if component in article:
                parts.append(article[component] + "\n\n")
            else:
                parts.append("No content found\n\n")

    return gr.update(value="".join(parts))
|
|
|
|
|
@terminal_print |
|
|
def get_key_content(text:str, start, end:str, case_sensitive:bool=False):
    '''
    Extract the content between `start` and `end` markers.

    If `start` is not found, extraction begins at the start of the text.
    If `end` is not found, the text from `start` onward is returned as both
    tuple elements (preserved from the original behavior).

    Parameters
    ----------
    text : str
        text of the article
    start : str or int
        start substring, or an explicit start index
    end : str
        end substring
    case_sensitive : bool, optional
        match markers case-sensitively, by default False. Bug fix: this
        flag was previously accepted but ignored (text was always lowered).

    Returns
    -------
    tuple of str
        (content between start and end, remaining text from end onward)
    '''
    if not case_sensitive:
        text = text.lower()
        end = end.lower()

    if type(start) is str:
        if not case_sensitive:
            start = start.lower()
        start_index = text.find(start)
    else:
        # Caller supplied a numeric start index directly.
        start_index = start

    end_index = text.find(end)

    if start_index == -1:
        start_index = 0

    if end_index == -1:
        # End marker missing: return the tail twice (dead `end_index = 0`
        # assignment from the original removed).
        return text[start_index:], text[start_index:]

    return text[start_index:end_index], text[end_index:]
|
|
|
|
|
@terminal_print
def get_articles(update_local=True):
    '''
    Fetch all articles from the cloud table.

    Parameters
    ----------
    update_local : bool, optional
        also refresh the local app_data cache, by default True

    Returns
    -------
    list
        list of article records as returned by the cloud table
    '''
    records = get_table("articles")
    if update_local:
        app_data["articles"] = list_dict_to_dict(records)

    return records
|
|
|
|
|
@terminal_print
def get_article(domain, name):
    '''
    Fetch a single article record from the cloud table.

    Parameters
    ----------
    domain : str
        subject domain of the article
    name : str
        name of the article

    Returns
    -------
    dict
        article object
    '''
    return get_item("articles", {"domain": domain, "name": name})
|
|
|
|
|
@terminal_print
def add_article(domain,file,add_to_s3=True, add_to_local=True, file_object=True):
    '''
    Add an article to the cloud DB, S3 and local memory.

    Parameters
    ----------
    domain : str
        subject domain of the article
    file : str or file object
        raw article text (when a str) or an uploaded PDF file object
        (the previous docstring documented a non-existent `file_obj` param)
    add_to_s3 : bool, optional
        add article to s3 bucket, by default True.
        NOTE(review): currently ignored — uploads happen unconditionally.
    add_to_local : bool, optional
        add article to local memory, by default True
    file_object : bool, optional
        NOTE(review): unused here; callers pass False when `file` is raw text.

    Returns
    -------
    dict
        article object, or the error response from the cloud DB on failure
    '''
    if type(file) is str:
        # Raw-text upload: the content string doubles as the S3 object name.
        content = file
        filename = file
        upload_file(file,default_s3_bucket,filename)
    else:
        content, _ = read_pdf(file)
        # Normalize Windows and POSIX paths down to the bare file name.
        if "\\" in file.name:
            filename = file.name.split("\\")[-1]
        elif "/" in file.name:
            filename = file.name.split("/")[-1]
        else:
            filename = file.name

        # Bug fix: use a context manager so the handle is closed even if the
        # upload raises (previously open/close with no try/finally).
        with open(file.name, 'rb') as pdf_obj:
            upload_fileobj(pdf_obj,default_s3_bucket,filename)

    article ={
        "domain":domain,
        "name":filename,
        "raw":content,
        "upload_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    if add_to_local:
        app_data["articles"][article["name"]]=article

    res = post_item("articles",article)
    if "Error" in res:
        print(res["Error"])
        return res

    return article
|
|
|
|
|
@terminal_print
def remove_article(domain,name,remove_from_s3=True, remove_from_local=True):
    '''
    Remove the article from the cloud DB and, optionally, S3 and local memory.

    Parameters
    ----------
    domain : str
        subject domain of the article
    name : str
        name of the article
    remove_from_s3 : bool, optional
        remove article from s3 bucket, by default True
    remove_from_local : bool, optional
        remove article from local memory, by default True

    Returns
    -------
    bool
        True on completion
    '''
    # Bug fix: delete_item was previously called twice with identical
    # arguments (once before and once after the optional removals).
    delete_item("articles",{"domain":domain,"name":name})
    if remove_from_s3:
        # NOTE(review): uploads go to default_s3_bucket, but deletion passes
        # `domain` as the first argument — confirm delete_file's signature.
        delete_file(domain,name)
    if remove_from_local:
        # pop() tolerates an article missing from the local cache
        # (the original `del` would raise KeyError); stray `pass` removed.
        app_data["articles"].pop(name, None)

    return True
|
|
|
|
|
@terminal_print
def update_article(article,file_obj=None,update_local=True):
    '''
    Persist an article object to the cloud DB, and optionally to S3 and
    the local cache.

    Parameters
    ----------
    article : dict
        article object
    file_obj : file object, optional
        file object of the article; uploaded to S3 when provided
    update_local : bool, optional
        refresh the local app_data cache, by default True

    Returns
    -------
    dict
        the same article object
    '''
    if file_obj:
        upload_fileobj(file_obj, article["domain"], article["name"])

    if update_local:
        app_data["articles"][article["name"]] = article

    post_item("articles", article)
    return article
|
|
|
|
|
@terminal_print
def identify_logic(text,logic_keywords=logic_keywords,case_sensitive=False):
    '''
    Decide which prompt variant fits the article text based on keyword counts.

    Parameters
    ----------
    text : str
        text of the article
    logic_keywords : dict
        keyword lists under "groups" and "preoperatives"
    case_sensitive : bool, optional
        count keywords case-sensitively, by default False

    Returns
    -------
    dict
        {"logic": ...} naming the prompt variant (groups, preoperative,
        both or neither)
    '''
    if not case_sensitive:
        text = text.lower()

    group_hits = sum(text.count(kw) for kw in logic_keywords["groups"])
    preop_hits = sum(text.count(kw) for kw in logic_keywords["preoperatives"])

    variant_by_observation = {
        (True,True):"prompt_p_g",
        (True,False):"prompt_np_g",
        (False,True):"prompt_p_ng",
        (False,False):"prompt_np_ng",
    }

    # Thresholds preserved from the original: groups strictly > 3,
    # preoperatives >= 3.
    observation = (group_hits > 3, preop_hits >= 3)
    return {"logic": variant_by_observation[observation]}
|
|
|
|
|
|
|
|
@terminal_print
def select_overview_prompts(article):
    '''
    Collect the prompts of every term valid for the overview step, record
    their display order on the article, and return them keyed by name.
    '''
    valid_prompts = set()
    for term in app_data["terms"]:
        if validate_term(article, term, "overview"):
            valid_prompts.update(term["prompts_list"])

    article["extraction"]["overview"] = sorted(
        valid_prompts,
        key=lambda prompt_name: app_data["prompts"][prompt_name]["section_sequence"],
    )

    return {prompt_name: app_data["prompts"][prompt_name] for prompt_name in valid_prompts}
|
|
|
|
|
@terminal_print
def select_performance_prompts(article,performance_assessment):
    '''
    Select the prompts whose terms validate for the given performance
    assessment, attach the matching terms to each prompt, and record the
    prompt names in article["extraction"][performance_assessment].

    Returns
    -------
    dict
        prompt_name -> prompt object for all valid prompts
    '''
    valid_terms = []
    # NOTE(review): search_text is computed here but never used below —
    # validate_term builds its own search text; candidate for removal.
    search_text = article["key_content"]+article["Authors"]+article["Acceptance Month"]+article["Acceptance Year"]+"\n".join(article["tables"])
    search_text = search_text.lower()

    for t in app_data["terms"]:
        if validate_term(article,t,performance_assessment):
            valid_terms.append(t)

    valid_prompts = {}
    for t in valid_terms:
        if any([p not in valid_prompts for p in t["prompts_list"]]):
            for p in t["prompts_list"]:
                # NOTE(review): valid_prompts[p] aliases the shared
                # app_data["prompts"] entry, so the "term" mutation below also
                # mutates the global prompt object — confirm this is intended.
                prompt = app_data["prompts"][p]
                valid_prompts[p] = prompt
                if "term" not in valid_prompts[p]:
                    valid_prompts[p]["term"] = {t["term"]:t}
                else:
                    valid_prompts[p]["term"].update({t["term"]:t})
                if performance_assessment not in article["extraction"]:
                    article["extraction"][performance_assessment] = set()
                article["extraction"][performance_assessment].add(prompt["prompt_name"])

    return valid_prompts
|
|
|
|
|
@terminal_print
def process_prompts(article):
    '''
    Identify the prompts to use for the article (selected from the terms
    and the article's attributes), then execute them to extract content —
    first the overview batch, then each performance assessment batch.

    Parameters
    ----------
    article : dict
        article object; its "extraction" mapping is reset and repopulated

    Returns
    -------
    None
        (the previous docstring incorrectly claimed a list was returned)
    '''
    article["extraction"] = {}

    overview_prompts = select_overview_prompts(article)
    performance_assessments = ["clinical","radiologic","safety","other"]

    performance_prompts = {}
    for assessment in performance_assessments:
        performance_prompts[assessment] = select_performance_prompts(article,assessment)

    # Each batch runs on its own short-lived event loop.
    overview = asyncio.new_event_loop()
    overview.run_until_complete(execute_concurrent(article,overview_prompts))
    overview.close()
    for assessment in performance_assessments:
        performance = asyncio.new_event_loop()
        performance.run_until_complete(execute_concurrent(article,performance_prompts[assessment]))
        performance.close()
|
|
|
|
|
|
|
|
def validate_term(article,term,assessment_step):
    '''
    Decide whether a term applies to this article at the given step.

    Overview terms match by region alone; other terms additionally require
    every comma-separated keyword of the term to appear in the article's
    searchable text (key content + authors + acceptance date + tables).

    Parameters
    ----------
    article : dict
        article object
    term : dict
        term record with "region", "assessment_step", "term" fields
    assessment_step : str
        step being evaluated ("overview", "clinical", ...)

    Returns
    -------
    bool
        True when the term is valid for this article and step
    '''
    if term["region"] != "all" and term["region"] != article["domain"].lower():
        return False

    if assessment_step == "overview" and term["assessment_step"] == "overview":
        return True

    if term["assessment_step"] == assessment_step:
        key_text = (article["key_content"]+article["Authors"]+article["Acceptance Month"]+article["Acceptance Year"]+"\n".join(article["tables"]))
        # Bug fix: replace real newlines with spaces so multi-word keywords
        # match across line breaks (was the literal two characters "/n").
        key_text = key_text.replace("\n"," ")
        key_text = key_text.lower()
        keywords = [kw.strip().lower() for kw in term["term"].split(",")]

        return all([kw in key_text for kw in keywords])

    return False
|
|
|
|
|
@terminal_print |
|
|
def keyword_search(keywords,full_text):
    '''
    Check each keyword for presence in the text.

    A tuple/list/set entry means "any of these": it is satisfied when any of
    its members (recursively) is found.

    Parameters
    ----------
    keywords : iterable
        strings, or nested tuples/lists/sets of strings
    full_text : str
        text to search in

    Returns
    -------
    dict
        keyword -> bool presence flag
    '''
    keywords_result = {}
    for k in keywords:
        # NOTE(review): a `set` entry cannot be used as a dict key and will
        # raise TypeError here, as it did in the original — confirm sets are
        # never actually passed.
        if type(k) is tuple or type(k) is list or type(k) is set:
            # Bug fix: the original called any() over the recursive *dicts*,
            # which are truthy whenever non-empty, so grouped keywords were
            # always reported as found. Check the boolean values instead.
            keywords_result[k] = any(keyword_search(k, full_text).values())
        else:
            keywords_result[k] = k in full_text
    return keywords_result
|
|
|
|
|
@terminal_print
def execute_prompts(article,prompt):
    '''
    Resolve the prompt's input dependencies depth-first (running any prompt
    whose output is not yet on the article), then run this prompt's executor.
    '''
    for dep in prompt["input_list"]:
        dep_name = dep.strip()
        if dep_name not in article:
            execute_prompts(article, app_data["prompts"][dep_name])

    run_executor(article, prompt)
|
|
|
|
|
@terminal_print
def run_gpt(article,prompt):
    '''
    Build the instruction stream for this prompt from the article's inputs,
    send it to the model, and store the response on the article under the
    prompt's name.
    '''
    instructions = [
        prompt[article["logic"]],
        prompt["reformat_inst"],
    ]
    text_in = "\n".join(article[i.strip()] for i in prompt["input_list"])
    inst_stream = create_inst(text_in, instructions)
    print(prompt["prompt_name"])

    response = send_inst(inst_stream)

    article[prompt["prompt_name"]] = response
|
|
|
|
|
|
|
|
@terminal_print
def f_replacement_term(article,prompt):
    '''
    Replace each summary term in the prompt's first input text with its
    replacement, storing the result under the prompt's name on the article.
    '''
    result = article[prompt["input_list"][0]]
    # Bug fix: replacements now accumulate across iterations; previously each
    # pass restarted from the original input, so only the last term's
    # replacement survived.
    # NOTE(review): init_app_data builds app_data["summary"] as a dict keyed
    # by term, so iterating it would yield string keys and t["term"] would
    # fail — confirm the intended container type.
    for t in app_data["summary"]:
        result = result.replace(t["term"], t["term_replacement"])
    article[prompt["prompt_name"]] = result
|
|
|
|
|
@terminal_print
def f_summary_term(article,prompt):
    '''
    Replace each summary term in the prompt's first input text with its
    summary form, storing the result under the prompt's name on the article.
    '''
    result = article[prompt["input_list"][0]]
    # Bug fix: replacements now accumulate across iterations; previously each
    # pass restarted from the original input, so only the last term's
    # replacement survived.
    # NOTE(review): see f_replacement_term — app_data["summary"] may be a dict
    # keyed by term, in which case this iteration yields keys; confirm.
    for t in app_data["summary"]:
        result = result.replace(t["term"], t["term_summary"])
    article[prompt["prompt_name"]] = result
|
|
|
|
|
@terminal_print
def run_executor(article,prompt):
    '''
    Dispatch the prompt to the executor named in its "executed by" field.
    Unknown executor names are silently ignored (same as the original
    match statement with no default case).
    '''
    executors = {
        "gpt-3.5-turbo-16k": run_gpt,
        "f_replacement_term": f_replacement_term,
        "f_summary_term": f_summary_term,
    }
    executor = executors.get(prompt["executed by"])
    if executor is not None:
        executor(article, prompt)
|
|
|
|
|
@retry_decorator
@terminal_print
def post_process(article):
    '''
    Aggregate each non-overview assessment's extracted segments into a
    single summary via the chat model, store it on the article under the
    aggregate prompt's name, and add that name to the assessment's
    extraction set.
    '''
    post_inputs = {}
    for assessment,segements in article["extraction"].items():
        # The overview is rendered separately and is not aggregated.
        if assessment == "overview":
            continue
        post_inputs[assessment] = "\n".join([article[s] for s in segements])

    template = ChatPromptTemplate.from_messages([
        ("human","{text}"),
        ("system","From the text above {instruction}"),
    ])

    llm = ChatOpenAI(
        temperature=0.0,
        model_name="gpt-3.5-turbo-16k",
        openai_api_key=openai.api_key)

    chain = template | llm

    for assessment,post_input in post_inputs.items():
        instruction_agg = app_data["prompts_agg"][assessment]
        # The first entry of the agg prompt's "chain" holds the instruction.
        article[instruction_agg["name"]] = chain.invoke({"text":post_input,"instruction":instruction_agg["chain"][0]}).content
        article["extraction"][assessment].add(instruction_agg["name"])
|
|
|
|
|
|
|
|
def add_inst(instructions, prompt):
    '''
    Return `instructions` with `prompt` appended via `+` (works for any
    compatible operands, e.g. two strings or two lists).
    '''
    combined = instructions + prompt
    return combined