|
|
import os |
|
|
import asyncio |
|
|
import aiohttp |
|
|
import requests |
|
|
import pandas as pd |
|
|
|
|
|
from minio import Minio |
|
|
from loguru import logger |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
from entities.task import PubMedTask |
|
|
from utils.api_utils import ( |
|
|
retry_operation, |
|
|
get_chat_func, |
|
|
compare_chat_chocies |
|
|
) |
|
|
from utils.r2_utils import ( |
|
|
get_client, |
|
|
get_file_from_minio, |
|
|
get_dataframe_from_minio, |
|
|
upload_text_to_minio, |
|
|
upload_dataframe_to_minio, |
|
|
upload_task_json_to_minio, |
|
|
) |
|
|
from utils.common_utils import escape_csv_field |
|
|
from utils.paper_utils import ( |
|
|
process_papers, |
|
|
generate_subheadings, |
|
|
assign_subheadings_to_summaries, |
|
|
create_paragraphs_by_subheading, |
|
|
enhance_language_readability, |
|
|
translate_to_chinese_before_references |
|
|
) |
|
|
|
|
|
|
|
|
# Target MinIO/R2 bucket for every pipeline artifact (search strings, CSVs, reviews).
BUCKET_NAME = "ai-scientist"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def pubmed_pipeline(
    task: PubMedTask,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Run the full PubMed review pipeline for every model on the task, then
    optionally compare the per-model reviews and keep the best one.

    Args:
        task: PubMedTask object, containing basic information for the run
        client: Minio client; created on demand when None
        max_retries: max retries for each step
        delay: delay between each retry

    Returns:
        None
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid
    model_names = task.model_names

    task.status_string["overall"] = "processing"

    # One independent processing chain per model, executed concurrently.
    per_model_jobs = [
        process_pubmed_single_chat(task, name, client, max_retries, delay)
        for name in model_names
    ]
    await asyncio.gather(*per_model_jobs)

    logger.info("Check Compare...")
    # Comparison only makes sense with at least 3 candidate reviews.
    run_compare = task.do_compare and len(task.model_names) >= 3
    if run_compare and task.status.get("compare", 0) == 0:
        # Pull every model's finished review back out of object storage.
        raw_reviews = await asyncio.gather(
            *(get_file_from_minio(
                bucket_name=BUCKET_NAME,
                object_name=f"{customer_name}/{uuid}/{model_name}/review_paper.txt",
                client=client
            ) for model_name in model_names)
        )
        contents = [raw.decode("utf-8") for raw in raw_reviews]
        task.status_string["overall"] = "Start Compare"

        rank_scores = await compare_chat_chocies(
            contents=contents,
            model_names=model_names
        )
        # Lowest rank score wins; its key indexes into `contents`.
        # NOTE(review): object name below keeps the historical "reveiw" spelling
        # because downstream consumers may already read that exact path.
        best_content = contents[min(rank_scores, key=rank_scores.get)]
        await upload_text_to_minio(
            bucket_name=BUCKET_NAME,
            object_name=f"{customer_name}/{uuid}/compared_reveiw_paper.txt",
            file_content=best_content
        )
        task.status_string["overall"] = "Finished"
        task.status["compare"] = 1
        await upload_task_json_to_minio(task, client)
    else:
        if not run_compare:
            logger.info("No Compare.")
        task.status_string["overall"] = "Finished"
        await upload_task_json_to_minio(task, client)
|
|
|
|
|
|
|
|
async def process_pubmed_single_chat(
    task: PubMedTask,
    model_name: str,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Drive one model through every stage of the PubMed review pipeline,
    resuming from the last completed step recorded in ``task.status``.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used (and status key) for every step of this chain
        client: Minio client; created on demand when None
        max_retries: max retries for each step
        delay: delay between each retry

    Returns:
        None
    """
    if client is None:
        client = get_client()

    # First run for this model: start from step 0.
    if model_name not in task.status.keys():
        task.status[model_name] = 0

    task.status_string["overall"] = "processing"

    # Ordered pipeline stages paired with the status text recorded on success.
    # The list index is the step number persisted in task.status[model_name].
    pipeline = [
        (process_pubmed_generate_pubmed_string,
         "Finished pubmed string generation."),
        (process_pubmed_fetch_data,
         "Finished fetching data."),
        (process_pubmed_process_papers,
         "Finished paper processing."),
        (process_pubmed_generate_subheadings,
         "Finished subheading generation."),
        (process_pubmed_assign_subheadings_to_summaries,
         "Finished subheading assignment."),
        (process_pubmed_create_paragraphs_by_subheading,
         "Finished paragraph generation."),
        (process_pubmed_enhance_language_readability,
         "Finished review language readability enhancement."),
        (process_pubmed_translate,
         "Finished review translation."),
    ]

    for step_index in range(task.status[model_name], len(pipeline)):
        step_func, done_message = pipeline[step_index]
        await step_func(
            task=task,
            model_name=model_name,
            save_name=model_name,
            prev_name=model_name,
            client=client,
            max_retries=max_retries, delay=delay
        )
        # Persist progress after every step so a crashed run can resume here.
        task.status_string[model_name] = done_message
        task.status[model_name] = step_index + 1
        await upload_task_json_to_minio(task, client)

    task.status_string[model_name] = "Finished."
    await upload_task_json_to_minio(task, client)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def process_pubmed_generate_pubmed_string(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 0: generate the PubMed search string for the task query and store it.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: unused by the first step; kept for a uniform step signature
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when search-string generation still fails after retries.
    """
    if client is None:
        client = get_client()

    if prev_name is not None:
        logger.warning("For first step, prev_model_name is not used.")

    chat_func = get_chat_func(model_names=[model_name])[0]

    pubmed_search_string, exceptions = await retry_operation(
        generate_pubmed_search_string, task,
        query=task.query,
        max_retries=max_retries, delay=delay,
        chat_func=chat_func
    )
    if pubmed_search_string is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Search String Generation Failed.")

    logger.info(f"search string: {pubmed_search_string}")
    await upload_text_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{task.customer_name}/{task.uuid}/{save_name}/pubmed_search_string.txt",
        file_content=pubmed_search_string
    )
|
|
|
|
|
|
|
|
async def process_pubmed_fetch_data(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 1: run the stored search string against PubMed and persist the raw
    and derived result CSVs.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model name keying status updates for this chain
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        ConnectionError: when fetching still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    # The search string produced by the previous step.
    raw = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/pubmed_search_string.txt",
        client=client
    )
    search_string = raw.decode("utf-8")

    results, exceptions = await retry_operation(
        process_pubmed_data, task,
        query=search_string,
        model_name=save_name,
        start_year=task.start_year, end_year=task.end_year,
        size=task.size,
        uuid=uuid, customer_name=customer_name,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise ConnectionError("Pubmed Data Fetch Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_process_papers(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 2: summarize/filter the fetched non-review papers with the chat model.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when paper processing still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func(model_names=[model_name])[0]

    # Non-review articles produced by the fetch step.
    non_review_pubmed_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/pubmed_results_non_reviews.csv",
        client=client
    )

    results, exceptions = await retry_operation(
        process_papers, task,
        dataframe=non_review_pubmed_df,
        topic=task.query, direction=task.direction,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        max_retries=max_retries, delay=delay,
        chat_func=chat_func
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Paper Processing Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_generate_subheadings(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 3: generate review subheadings from the relevant-paper set.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when subheading generation still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func([model_name])[0]

    relevant_papers_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/relevant_papers.csv",
        client=client
    )

    results, exceptions = await retry_operation(
        generate_subheadings, task,
        relevant_papers_df=relevant_papers_df,
        main_topic=task.query,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        chat_func=chat_func,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Generate Subheadings Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_assign_subheadings_to_summaries(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 4: assign each paper summary to one of the generated subheadings.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when subheading assignment still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func([model_name])[0]

    # Subheadings were stored one per line by the generation step.
    raw_subheadings = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/generated_subheadings.txt",
        client=client
    )
    subheadings = raw_subheadings.decode("utf-8").split("\n")

    relevant_papers_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/relevant_papers.csv",
        client=client
    )

    results, exceptions = await retry_operation(
        assign_subheadings_to_summaries, task,
        subheadings=subheadings,
        relevant_papers_df=relevant_papers_df,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        chat_func=chat_func,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Assign Subheadings Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_create_paragraphs_by_subheading(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 5: write one review paragraph per subheading from the assigned summaries.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when paragraph creation still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func([model_name])[0]

    # Subheadings were stored one per line by the generation step.
    raw_subheadings = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/generated_subheadings.txt",
        client=client
    )
    subheadings = raw_subheadings.decode("utf-8").split("\n")

    # Summaries already tagged with their subheading by the previous step.
    relevant_papers_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/assigned_subheadings.csv",
        client=client
    )

    results, exceptions = await retry_operation(
        create_paragraphs_by_subheading, task,
        subheadings=subheadings, main_topic=task.query,
        relevant_papers_df=relevant_papers_df,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        chat_func=chat_func,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Create Paragraphs Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_enhance_language_readability(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 6: polish the draft review's language and readability.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when the readability pass still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func([model_name])[0]

    # The unrefined review assembled by the paragraph step.
    raw_review = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/review_non_refined.txt",
        client=client
    )
    review_content = raw_review.decode("utf-8")

    results, exceptions = await retry_operation(
        enhance_language_readability, task,
        content=review_content,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        chat_func=chat_func,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Enhance Language Readability Failed.")
|
|
|
|
|
|
|
|
async def process_pubmed_translate(
    task: PubMedTask,
    model_name: str,
    save_name: str,
    prev_name: str = None,
    client: Minio = None,
    max_retries: int = 5,
    delay: float = 0.5
):
    """
    Step 7: translate the finished review (up to the references) into Chinese.

    Args:
        task: PubMedTask object, containing basic information for the run
        model_name: model used at this step (also keys status updates)
        save_name: folder name for results in MinIO
        prev_name: folder name of the previous step's outputs
        client: Minio client; created on demand when None
        max_retries: max retries for the operation
        delay: delay between retries

    Raises:
        RuntimeError: when translation still fails after retries.
    """
    if client is None:
        client = get_client()

    customer_name = task.customer_name
    uuid = task.uuid

    chat_func = get_chat_func([model_name])[0]

    # The polished English review produced by the readability step.
    raw_review = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/{prev_name}/review_paper.txt",
        client=client
    )
    review_content = raw_review.decode("utf-8")

    results, exceptions = await retry_operation(
        translate_to_chinese_before_references, task,
        text=review_content,
        uuid=uuid, customer_name=customer_name, model_name=save_name,
        chat_func=chat_func,
        max_retries=max_retries, delay=delay
    )
    if results is None:
        # Surface the accumulated retry errors on the task before giving up.
        task.status_string[model_name] = exceptions
        await upload_task_json_to_minio(task, client)
        raise RuntimeError("Pubmed Translate Failed.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_pubmed_search_string(query: str, chat_func) -> str:
    """
    Ask the chat model to build a Boolean PubMed search string for *query*.

    Args:
        query: free-text research topic to be turned into a search string.
        chat_func: async chat-completion callable; assumed to return an
            OpenAI-style object exposing ``.choices[0].message.content``
            — TODO confirm against utils.api_utils.get_chat_func.

    Returns:
        The generated search string, stripped of surrounding whitespace.
    """
    # NOTE: the prompt below is sent verbatim to the model; keep its text intact.
    prompt = f'''
    ### Objective
    Your task is to generate a precise PubMed search string based on the input query: "{query}". You should:

    1. **Extract Critical Keywords**: Identify the main entities and concepts that have independent and specific meanings, avoiding overly general terms commonly found in many articles (e.g., "analysis", "study"). Focus on terms central to the topic.

    2. **Understand Keyword Relationships**: Analyze the logical relationship between keywords. If two or more keywords are conceptually similar or interchangeable, connect them using the OR operator. If they represent distinct concepts that must co-exist, connect them using the AND operator.

    3. **Expand Synonyms Thoughtfully**: For each critical keyword, generate at least 6 relevant English synonyms or related terms used in academic research. Ensure they align with the context of the query, including synonyms that may look different but are relevant based on the keyword's definition and hierarchy.

    4. **Include MeSH Terms**: Find the corresponding MeSH (Medical Subject Headings) terms for each critical keyword if available.

    5. **Construct the PubMed Search String**: Combine the critical keywords, their synonyms, and MeSH terms using Boolean operators. Ensure correct grouping using parentheses to reflect the logical relationships:
        - If a group of terms is interchangeable (e.g., synonyms), use OR within parentheses.
        - Use AND between distinct keyword groups.

    ### Instructions
    - **Language**: All words must be in English.
    - **Avoid Stop Words**: Do not include stop words (e.g., 'a', 'an', 'the').
    - **Synonym Requirement**: For each critical keyword, generate **at least 6 synonyms** or related terms.
    - **Logical Operator Selection**: Adjust the Boolean logic based on the relationship between terms to accurately represent (A OR B) AND C patterns.
    - **Term Length**: Each term should be concise, with phrases containing at most two words.
    - **Formatting**:
        - Use Boolean operators (AND, OR) to connect terms and use parentheses where necessary.
        - Format MeSH terms as: "Term"[MeSH Terms]
        - Format other terms as: "Term"[All Fields]

    ### Example
    **Input**: Role of AI in antimicrobial resistance and drug discovery

    **Process**:
    1. **Extract Critical Keywords**:
        - AI
        - Antimicrobial resistance
        - Drug discovery

    2. **Analyze Keyword Relationships**:
        - AI OR machine learning (similar concepts)
        - Antimicrobial resistance AND drug discovery (distinct concepts)

    3. **Expand Synonyms Thoughtfully**:
        - **AI**: machine learning, artificial intelligence, deep learning, neural networks, computational intelligence, data-driven algorithms
        - **Antimicrobial resistance**: antibiotic resistance, drug resistance, microbial resistance, bacterial resistance, pathogen resistance, multidrug resistance
        - **Drug discovery**: drug design, pharmaceutical research, drug development, lead discovery, molecular screening, target identification

    4. **Include MeSH Terms**:
        - **AI**: "Artificial Intelligence"[MeSH Terms]
        - **Antimicrobial resistance**: "Drug Resistance, Microbial"[MeSH Terms]
        - **Drug discovery**: "Drug Discovery"[MeSH Terms]

    5. **Construct the PubMed Search String**:

    '(("Artificial Intelligence"[MeSH Terms] OR "machine learning"[All Fields] OR "deep learning"[All Fields] OR "neural networks"[All Fields] OR "computational intelligence"[All Fields] OR "data-driven algorithms"[All Fields]) AND ("Drug Resistance, Microbial"[MeSH Terms] OR "antibiotic resistance"[All Fields] OR "microbial resistance"[All Fields] OR "bacterial resistance"[All Fields] OR "pathogen resistance"[All Fields] OR "multidrug resistance"[All Fields])) AND ("Drug Discovery"[MeSH Terms] OR "drug design"[All Fields] OR "pharmaceutical research"[All Fields] OR "drug development"[All Fields] OR "lead discovery"[All Fields] OR "molecular screening"[All Fields])'

    ### Now, generate the PubMed search string for the following query:

    **Query**: {query}

    Please provide only the final PubMed search string in the specified format.
    '''

    result = await chat_func(prompt)

    # OpenAI-style response: take the first choice's message text.
    pubmed_search_string = result.choices[0].message.content.strip()

    return pubmed_search_string
|
|
|
|
|
|
|
|
async def process_pubmed_data(
    query,
    model_name,
    start_year, end_year, size,
    uuid, customer_name
):
    """
    Fetch PubMed result pages for *query*, parse them into CSVs, attach
    full-text links and journal impact factors, and store every artifact
    under ``{customer_name}/{uuid}/{model_name}/`` in MinIO.

    Args:
        query: str, PubMed search string
        model_name: str, model name (used only as the output folder name)
        start_year: int, start year for the PubMed search
        end_year: int, end year for the PubMed search
        size: int, number of results per page
        uuid: str, uuid for the task
        customer_name: str, customer name for the task

    Returns:
        (pubmed_df, non_review_pubmed_df): the merged dataframe with impact
        factors and its subset of non-review articles.
    """
    # All artifacts share one MinIO "folder" per customer/task/model.
    prefix = f"{customer_name}/{uuid}/{model_name}/"
    output_folder = prefix

    combined_txt_filename = os.path.join(
        output_folder, f'pubmed_page_combined.txt')
    results_csv_filename = os.path.join(output_folder, f'pubmed_results.csv')
    results_with_links_csv_filename = os.path.join(
        output_folder, f'pubmed_results_with_full_text_links.csv')
    impact_factors_csv_filename = os.path.join(
        output_folder, f'pubmed_results_with_impact_factors.csv')
    non_review_csv_filename = os.path.join(
        output_folder, f'pubmed_results_non_reviews.csv')

    # 1) Download the relevance- and date-sorted result pages into one text blob.
    await save_combined_pubmed_page(query, start_year, end_year, size, output_filename=combined_txt_filename)

    # 2) Parse the raw page text into a CSV of article records.
    await process_pubmed_file(combined_txt_filename, results_csv_filename)

    # 3) Scrape one full-text link per PMID (synchronous HTTP per row —
    #    this is the slowest part of the step).
    pubmed_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=results_csv_filename
    )
    pubmed_df["Full_Text_Links"] = pubmed_df["PMID"].apply(get_full_text_links)
    await upload_dataframe_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=results_with_links_csv_filename,
        df=pubmed_df
    )

    # 4) Join journal impact factors on the (lower-cased) journal title.
    # NOTE(review): object is named .xlsx — presumably get_dataframe_from_minio
    # handles Excel as well as CSV; verify in utils.r2_utils.
    impact_factors_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name='2023-JCR.xlsx'
    )

    pubmed_df['JT'] = pubmed_df['JT'].str.lower()
    impact_factors_df['JT'] = impact_factors_df['JT'].str.lower()

    merged_df = pd.merge(pubmed_df, impact_factors_df, on='JT', how='left')

    await upload_dataframe_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=impact_factors_csv_filename,
        df=merged_df
    )
    logger.info(f"Merged data saved to {impact_factors_csv_filename}")

    # 5) Re-download the merged CSV and keep only non-review articles.
    # NOTE(review): this round-trip re-reads what was just uploaded; it may be
    # intentional (CSV-normalized dtypes) — confirm before simplifying.
    pubmed_df = await get_dataframe_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=impact_factors_csv_filename
    )
    non_review_pubmed_df = pubmed_df[pubmed_df["Review"] == "No"]
    await upload_dataframe_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=non_review_csv_filename,
        df=non_review_pubmed_df
    )

    # Log message (Chinese): "Non-review articles saved to <path>".
    logger.info(f"非评论类文章已保存到 {non_review_csv_filename}")

    return pubmed_df, non_review_pubmed_df
|
|
|
|
|
|
|
|
async def save_combined_pubmed_page(query, start_year, end_year, size=200, output_filename='pubmed_page_combined.txt'):
    """
    Fetch the relevance-sorted and date-sorted PubMed result pages for the
    same query, concatenate them, and store the combined text in MinIO.

    Args:
        query: str, PubMed search string
        start_year: int, start year filter
        end_year: int, end year filter
        size: int, number of results per page
        output_filename: MinIO object name for the combined page text
    """
    relevance_page = await save_pubmed_page(query, start_year, end_year, size)
    date_page = await save_pubmed_page_date(query, start_year, end_year, size)

    # Both sort orders are kept; the downstream parser de-duplicates by PMID.
    await upload_text_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=output_filename,
        file_content=relevance_page + "\n" + date_page
    )

    logger.info(f"Page content saved to {output_filename}")
|
|
|
|
|
|
|
|
async def save_pubmed_page(query, start_year, end_year, size=200):
    """
    Download one PubMed results page (relevance order, `pubmed` text format).

    Args:
        query: str, PubMed search string
        start_year: int, start year filter
        end_year: int, end year filter
        size: int, number of results per page

    Returns:
        The page body as text.

    Raises:
        ConnectionError: on any non-200 HTTP response.
    """
    base_url = "https://pubmed.ncbi.nlm.nih.gov/"
    params = {
        'term': query,
        'filter': f'years.{start_year}-{end_year}',
        'format': 'pubmed',
        'size': size
    }

    # Logged so the search can be reproduced manually in a browser.
    search_url = f"{base_url}?term={params['term']}&filter={params['filter']}&format={params['format']}&size={params['size']}"
    logger.info(f"检索网址: {search_url}")

    async with aiohttp.ClientSession() as session:
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                return await response.text()

    logger.error("Failed to retrieve data from save_pubmed_page")
    raise ConnectionError(
        "Failed to retrieve data from save_pubmed_page")
|
|
|
|
|
|
|
|
async def save_pubmed_page_date(query, start_year, end_year, size=200):
    """
    Download one PubMed results page sorted by date (`pubmed` text format).

    Args:
        query: str, PubMed search string
        start_year: int, start year filter
        end_year: int, end year filter
        size: int, number of results per page

    Returns:
        The page body as text.

    Raises:
        ConnectionError: on any non-200 HTTP response.
    """
    base_url = "https://pubmed.ncbi.nlm.nih.gov/"
    params = {
        'term': query,
        'filter': f'years.{start_year}-{end_year}',
        'format': 'pubmed',
        'size': size,
        'sort': 'date'
    }

    # Logged so the search can be reproduced manually in a browser.
    search_url = f"{base_url}?term={params['term']}&filter={params['filter']}&format={params['format']}&size={params['size']}&sort={params['sort']}"
    logger.info(f"检索网址: {search_url}")

    async with aiohttp.ClientSession() as session:
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                return await response.text()

    logger.error(
        "Failed to retrieve data from save_pubmed_page_date")
    raise ConnectionError(
        "Failed to retrieve data from save_pubmed_page_date")
|
|
|
|
|
|
|
|
async def process_pubmed_file(input_file, output_file):
    """
    Parse a scraped PubMed "pubmed" format page into a CSV of article records.

    Reads the combined page text from MinIO, splits it into per-article
    records keyed by PMID lines, extracts title/abstract/authors/journal
    fields, flags likely review articles, de-duplicates by PMID, and writes
    the result as CSV text back to MinIO.

    Args:
        input_file: MinIO object name of the combined page text.
        output_file: MinIO object name for the resulting CSV.
    """
    content = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=input_file
    )
    content = content.decode("utf-8")
    # The scraped HTML glues the first PMID onto the <pre> tag; force it onto
    # its own line so the line-based parser below can see it.
    content = content.replace(
        '<pre class="search-results-chunk">PMID-', '<pre class="search-results-chunk">\nPMID-')

    lines = content.split('\n')
    records = []
    current_record = {}
    # Multi-line field accumulation state: abstracts, titles and publication
    # types may wrap across several lines until a known field prefix appears.
    collecting_abstract = False
    collecting_title = False
    collecting_pt = False
    abstract_lines = []
    title_lines = []
    pt_lines = []
    first_author_recorded = False

    for line in lines:
        if line.startswith("PMID- "):
            # A new PMID line both starts a record and finalizes the previous one.
            if current_record:
                current_record['AB'] = ' '.join(
                    abstract_lines).replace('\n', ' ')
                current_record['TI'] = ' '.join(title_lines).replace('\n', ' ')
                current_record['PT'] = ' '.join(pt_lines).replace('\n', ' ')

                # Default: not a review unless a review marker was seen.
                if 'Review' not in current_record:
                    current_record['Review'] = 'No'

                # Ensure the recorded first author leads the author list.
                # NOTE(review): 'FAU-frist' is a long-standing typo of
                # "first", but it is also a CSV column name below — renaming
                # it would change the output schema.
                if 'FAU-frist' in current_record and 'FAU' in current_record and current_record['FAU']:
                    if current_record['FAU-frist'] != current_record['FAU'][0]:
                        current_record['FAU'].insert(
                            0, current_record['FAU-frist'])

                # Guarantee the CSV columns exist even when fields were absent.
                if 'JT' not in current_record:
                    current_record['JT'] = ''
                if 'DCOM' not in current_record:
                    current_record['DCOM'] = ''

                records.append(current_record)

            # Reset all per-record accumulation state.
            current_record = {'PMID': line.split("PMID- ")[1].strip()}
            collecting_abstract = False
            collecting_title = False
            collecting_pt = False
            abstract_lines = []
            title_lines = []
            pt_lines = []
            first_author_recorded = False

        elif line.startswith("FAU - "):
            # Full author name; the first one is remembered separately.
            author_name = line.split("FAU - ")[1].strip()
            if 'FAU' not in current_record:
                current_record['FAU'] = []
            current_record['FAU'].append(author_name)

            if not first_author_recorded:
                current_record['FAU-frist'] = author_name
                first_author_recorded = True

        elif line.startswith("JT - "):
            # Journal title (used later for the impact-factor join).
            current_record['JT'] = line.split("JT - ")[1].strip()

        elif line.startswith("DCOM- "):
            # Date of completion.
            current_record['DCOM'] = line.split("DCOM- ")[1].strip()

        elif line.startswith("TI - "):
            collecting_title = True
            title_lines.append(line.split("TI - ")[1].strip())

        elif collecting_title:
            # Title continues until one of these field prefixes appears.
            if any(line.startswith(prefix) for prefix in ["LID - ", "AB - ", "FAU - ", "PG - "]):
                collecting_title = False
            else:
                title_lines.append(line.strip())

        elif line.startswith("LID - "):
            # Location ID — usually a DOI; keep the longest candidate seen.
            lid = line.split("LID - ")[1].strip()
            if '[doi]' in lid:
                lid = lid.split(' [doi]')[0]

            if 'LID' in current_record:
                current_record['LID'] = lid if len(lid) > len(
                    current_record['LID']) else current_record['LID']
            else:
                current_record['LID'] = lid

        elif line.startswith("AB - "):
            collecting_abstract = True
            abstract_text = line.split("AB - ")[1].strip()
            abstract_lines.append(abstract_text)

            # Heuristic review flag: the word "review" anywhere in the abstract.
            if 'review' in abstract_text.lower():
                current_record['Review'] = 'Yes'

        elif collecting_abstract:
            # Abstract continues until one of these field prefixes appears.
            if any(line.startswith(prefix) for prefix in ["LID - ", "FAU - ", "PG - "]):
                collecting_abstract = False
            else:
                abstract_text = line.strip()
                abstract_lines.append(abstract_text)

                if 'review' in abstract_text.lower():
                    current_record['Review'] = 'Yes'

        elif line.startswith("PT - "):
            # Publication type; "Review" here is the strongest review signal.
            pt_line = line.split("PT - ")[1].strip()
            pt_lines.append(pt_line)

            if 'review' in pt_line.lower():
                current_record['Review'] = 'Yes'

        elif collecting_pt:
            # NOTE(review): collecting_pt is never set True anywhere, so this
            # branch is currently dead code — presumably the "PT - " branch was
            # meant to set it (mirroring TI/AB); confirm before changing.
            if any(line.startswith(prefix) for prefix in ["LID - ", "AB - ", "FAU - ", "PG - "]):
                collecting_pt = False
            else:
                pt_text = line.strip()
                pt_lines.append(pt_text)

                if 'review' in pt_text.lower():
                    current_record['Review'] = 'Yes'

    # Finalize the last record (same normalization as in the loop above).
    if current_record:
        current_record['AB'] = ' '.join(abstract_lines).replace('\n', ' ')
        current_record['TI'] = ' '.join(title_lines).replace('\n', ' ')
        current_record['PT'] = ' '.join(pt_lines).replace('\n', ' ')
        if 'Review' not in current_record:
            current_record['Review'] = 'No'

        if 'FAU-frist' in current_record and 'FAU' in current_record and current_record['FAU']:
            if current_record['FAU-frist'] != current_record['FAU'][0]:
                current_record['FAU'].insert(0, current_record['FAU-frist'])

        if 'JT' not in current_record:
            current_record['JT'] = ''
        if 'DCOM' not in current_record:
            current_record['DCOM'] = ''

        records.append(current_record)

    # De-duplicate by PMID, keeping the first occurrence (the two scraped
    # pages — relevance and date order — overlap heavily).
    unique_records = []
    seen_pmids = set()
    for record in records:
        if record['PMID'] not in seen_pmids:
            seen_pmids.add(record['PMID'])
            unique_records.append(record)

    # Build the CSV by hand; escape_csv_field handles quoting/commas.
    text = ""
    fieldnames = ['JT', 'DCOM', 'PMID', 'TI', 'LID',
                  'AB', 'FAU', 'FAU-frist', 'PT', 'Review']
    header = ','.join(fieldnames) + '\n'
    text += header

    for record in unique_records:
        # Flatten the author list into a single semicolon-separated cell.
        record['FAU'] = '; '.join(record.get('FAU', []))

        row = ','.join([escape_csv_field(str(record.get(field, '')))
                        for field in fieldnames]) + '\n'
        text += row

    await upload_text_to_minio(
        bucket_name=BUCKET_NAME,
        object_name=output_file,
        file_content=text
    )
|
|
|
|
|
|
|
|
def get_full_text_links(pmid):
    """
    Scrape the PubMed article page for *pmid* and return one full-text link.

    Args:
        pmid: PubMed identifier (string or int).

    Returns:
        The href of the 27th anchor on the page (historically the outbound
        full-text link), or None when the page has fewer than 27 links.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
    # A timeout is essential here: this runs once per PMID inside a
    # DataFrame.apply() sweep, so a single unresponsive request would
    # otherwise stall the whole pipeline indefinitely.
    response = requests.get(url, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')

    links = [anchor['href'] for anchor in soup.find_all('a', href=True)]

    # HACK: positional index into the page's anchors — brittle against any
    # PubMed layout change. TODO: select the "full-text-links" section of
    # the page explicitly instead of counting anchors.
    return links[26] if len(links) >= 27 else None
|
|
|