import os
import asyncio
import aiohttp
import requests
import pandas as pd
from minio import Minio
from loguru import logger
from bs4 import BeautifulSoup
from entities.task import PubMedTask
from utils.api_utils import (
retry_operation,
get_chat_func,
compare_chat_chocies
)
from utils.r2_utils import (
get_client,
get_file_from_minio,
get_dataframe_from_minio,
upload_text_to_minio,
upload_dataframe_to_minio,
upload_task_json_to_minio,
)
from utils.common_utils import escape_csv_field
from utils.paper_utils import (
process_papers,
generate_subheadings,
assign_subheadings_to_summaries,
create_paragraphs_by_subheading,
enhance_language_readability,
translate_to_chinese_before_references
)
BUCKET_NAME = "ai-scientist"
# =================================
# Function Groups: Pipeline for PubMed
#
# 1. pipeline
# 2. single model chat
# =================================
async def pubmed_pipeline(
task: PubMedTask,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Run the full PubMed pipeline: one review-generation chain per selected
    model, followed by an optional cross-model comparison.
    Args:
        task: PubMedTask object, containing basic information for the task
client: Minio, minio client
max_retries: int, max retries for each step
delay: float, delay between each retry
Returns:
None
"""
if client is None:
client = get_client()
customer_name = task.customer_name
uuid = task.uuid
model_names = task.model_names
task.status_string["overall"] = "processing"
await asyncio.gather(
*(process_pubmed_single_chat(
task, model_name, client, max_retries, delay
) for model_name in model_names)
)
    # Compare across models only when requested and when at least 3 models
    # are selected
logger.info("Check Compare...")
if task.do_compare and len(task.model_names) >= 3:
if task.status.get("compare", 0) == 0:
contents = await asyncio.gather(
*(get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{model_name}/review_paper.txt",
client=client
) for model_name in model_names)
)
contents = [c.decode("utf-8") for c in contents]
task.status_string["overall"] = "Start Compare"
rank_scores = await compare_chat_chocies(
contents=contents,
model_names=model_names
)
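            # compare_chat_chocies is expected to map each content index to a
            # rank score; the lowest-scored content is kept as the best review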
best_content = contents[min(rank_scores, key=rank_scores.get)]
await upload_text_to_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/compared_reveiw_paper.txt",
file_content=best_content
)
task.status_string["overall"] = "Finished"
task.status["compare"] = 1
await upload_task_json_to_minio(task, client)
else:
task.status_string["overall"] = "Finished"
await upload_task_json_to_minio(task, client)
else:
logger.info("No Compare.")
task.status_string["overall"] = "Finished"
await upload_task_json_to_minio(task, client)
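# Example usage (a minimal sketch; PubMedTask is defined in entities.task and
# its exact constructor may differ -- the field names below are inferred from
# how they are read in this module):
#
#   task = PubMedTask(
#       query="Role of AI in antimicrobial resistance",
#       customer_name="demo_customer",
#       uuid="0000-demo",
#       model_names=["model_a", "model_b", "model_c"],
#       do_compare=True,
#   )
#   asyncio.run(pubmed_pipeline(task))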
async def process_pubmed_single_chat(
task: PubMedTask,
model_name: str,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Run every pipeline step for a single model, resuming from the last
    completed step recorded in task.status.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at every step of this chain
client: Minio, minio client
max_retries: int, max retries for each step
delay: float, delay between each retry
Returns:
None
"""
# get minio client
if client is None:
client = get_client()
# add status for <model_name>
    if model_name not in task.status:
task.status[model_name] = 0
# set task status string
task.status_string["overall"] = "processing"
process_steps = {
0: process_pubmed_generate_pubmed_string,
1: process_pubmed_fetch_data,
2: process_pubmed_process_papers,
3: process_pubmed_generate_subheadings,
4: process_pubmed_assign_subheadings_to_summaries,
5: process_pubmed_create_paragraphs_by_subheading,
6: process_pubmed_enhance_language_readability,
7: process_pubmed_translate
}
state_description = {
0: "Finished pubmed string generation.",
1: "Finished fetching data.",
2: "Finished paper processing.",
3: "Finished subheading generation.",
4: "Finished subheading assignment.",
5: "Finished paragraph generation.",
6: "Finished review language readability enhancement.",
7: "Finished review translation."
}
# Execute Phase
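    # Resume from the last completed step; task.status[model_name] is
    # persisted to MinIO after every step, so an interrupted task picks up
    # where it left off.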
current_state = task.status[model_name]
    for state in range(current_state, len(process_steps)):
await process_steps[state](
task=task,
model_name=model_name,
save_name=model_name,
prev_name=model_name,
client=client,
max_retries=max_retries, delay=delay
)
task.status_string[model_name] = state_description[state]
task.status[model_name] = state + 1
await upload_task_json_to_minio(task, client)
task.status_string[model_name] = "Finished."
await upload_task_json_to_minio(task, client)
# =================================
# Function Groups: process_pubmed_*
# 1. _generate_pubmed_string
# 2. _fetch_data
# 3. _process_papers
# 4. _generate_subheadings
# 5. _assign_subheadings_to_summaries
# 6. _create_paragraphs_by_subheading
# 7. _enhance_language_readability
# 8. _translate
# =================================
async def process_pubmed_generate_pubmed_string(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Generate the PubMed search string and upload it to MinIO.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (unused in this first step)
        client: Minio, minio client
        max_retries: int, max retries for each step
        delay: float, delay between each retry
    Returns:
        None; the search string is saved to
        {customer_name}/{uuid}/{save_name}/pubmed_search_string.txt
"""
if client is None:
client = get_client()
    if prev_name is not None:
        logger.warning("prev_name is not used in the first step.")
query = task.query
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func(model_names=[model_name])[0]
pubmed_search_string, exceptions = await retry_operation(
generate_pubmed_search_string, task,
query=query,
max_retries=max_retries, delay=delay,
chat_func=chat_func
)
if pubmed_search_string is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Search String Generation Failed.") # exit
logger.info(f"search string: {pubmed_search_string}")
await upload_text_to_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{save_name}/pubmed_search_string.txt",
file_content=pubmed_search_string
)
async def process_pubmed_fetch_data(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Fetch PubMed data using the previously generated search string.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the search string lives)
        client: Minio, minio client
        max_retries: int, max retries for each step
        delay: float, delay between each retry
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
customer_name = task.customer_name
uuid = task.uuid
start_year = task.start_year
end_year = task.end_year
size = task.size
pubmed_search_string = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/pubmed_search_string.txt",
client=client
)
pubmed_search_string = pubmed_search_string.decode("utf-8")
results, exceptions = await retry_operation(
process_pubmed_data, task,
query=pubmed_search_string,
model_name=save_name,
start_year=start_year, end_year=end_year,
size=size,
uuid=uuid, customer_name=customer_name,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise ConnectionError("Pubmed Data Fetch Failed.") # exit
async def process_pubmed_process_papers(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Run process_papers over the fetched non-review papers.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the fetched CSV lives)
        client: Minio, minio client
        max_retries: int, max retries for each step
        delay: float, delay between each retry
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
query = task.query
direction = task.direction
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func(model_names=[model_name])[0]
non_review_pubmed_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/pubmed_results_non_reviews.csv",
client=client
)
results, exceptions = await retry_operation(
process_papers, task,
dataframe=non_review_pubmed_df,
topic=query, direction=direction,
uuid=uuid, customer_name=customer_name, model_name=save_name,
max_retries=max_retries, delay=delay,
chat_func=chat_func
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Paper Processing Failed.") # exit
async def process_pubmed_generate_subheadings(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Generate review subheadings from the relevant papers.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where relevant_papers.csv lives)
        client: Minio, minio client
        max_retries: int, max retries for each step
        delay: float, delay between each retry
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
query = task.query
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func([model_name])[0]
relevant_papers_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/relevant_papers.csv",
client=client
)
results, exceptions = await retry_operation(
generate_subheadings, task,
relevant_papers_df=relevant_papers_df,
main_topic=query,
uuid=uuid, customer_name=customer_name, model_name=save_name,
chat_func=chat_func,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Generate Subheadings Failed.") # exit
async def process_pubmed_assign_subheadings_to_summaries(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Assign the generated subheadings to the paper summaries.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the subheadings live)
        client: Minio, minio client
        max_retries: int, max retries for each step
        delay: float, delay between each retry
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func([model_name])[0]
subheadings = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/generated_subheadings.txt",
client=client
)
subheadings = subheadings.decode("utf-8").split("\n")
relevant_papers_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/relevant_papers.csv",
client=client
)
results, exceptions = await retry_operation(
assign_subheadings_to_summaries, task,
subheadings=subheadings,
relevant_papers_df=relevant_papers_df,
uuid=uuid, customer_name=customer_name, model_name=save_name,
chat_func=chat_func,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Assign Subheadings Failed.") # exit
async def process_pubmed_create_paragraphs_by_subheading(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Create review paragraphs for each subheading.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the assignments live)
        client: Minio, minio client
        max_retries: int, max retries for the operation
        delay: float, delay between retries
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
query = task.query
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func([model_name])[0]
subheadings = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/generated_subheadings.txt",
client=client
)
subheadings = subheadings.decode("utf-8").split("\n")
relevant_papers_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/assigned_subheadings.csv",
client=client
)
results, exceptions = await retry_operation(
create_paragraphs_by_subheading, task,
subheadings=subheadings, main_topic=query,
relevant_papers_df=relevant_papers_df,
uuid=uuid, customer_name=customer_name, model_name=save_name,
chat_func=chat_func,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Create Paragraphs Failed.") # exit
async def process_pubmed_enhance_language_readability(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Enhance the language and readability of the drafted review.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the draft review lives)
        client: Minio, minio client
        max_retries: int, max retries for the operation
        delay: float, delay between retries
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func([model_name])[0]
review_content = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/review_non_refined.txt",
client=client
)
review_content = review_content.decode("utf-8")
results, exceptions = await retry_operation(
enhance_language_readability, task,
content=review_content,
uuid=uuid, customer_name=customer_name, model_name=save_name,
chat_func=chat_func,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Enhance Language Readability Failed.") # exit
async def process_pubmed_translate(
task: PubMedTask,
model_name: str,
save_name: str,
prev_name: str = None,
client: Minio = None,
max_retries: int = 5,
delay: float = 0.5
):
"""
    Translate the review (up to the references section) into Chinese.
    Args:
        task: PubMedTask object, containing basic information for the task
        model_name: str, model used at this step
        save_name: str, save name for the MinIO path
        prev_name: str, previous step's save name (where the review paper lives)
        client: Minio, minio client
        max_retries: int, max retries for the operation
        delay: float, delay between retries
    Returns:
        None; results are saved to MinIO
"""
if client is None:
client = get_client()
customer_name = task.customer_name
uuid = task.uuid
chat_func = get_chat_func([model_name])[0]
review_content = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=f"{customer_name}/{uuid}/{prev_name}/review_paper.txt",
client=client
)
review_content = review_content.decode("utf-8")
results, exceptions = await retry_operation(
translate_to_chinese_before_references, task,
text=review_content,
uuid=uuid, customer_name=customer_name, model_name=save_name,
chat_func=chat_func,
max_retries=max_retries, delay=delay
)
if results is None: # no valid result after max retries
# store exception strings in status
task.status_string[model_name] = exceptions
await upload_task_json_to_minio(task, client)
raise RuntimeError("Pubmed Translate Failed.") # exit
# =================================
# Function Groups: PubMed Task
#
# functions specific for pubmed task
# =================================
async def generate_pubmed_search_string(query: str, chat_func) -> str:
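    """
    Ask the chat model to build a PubMed search string for the given query.
    Args:
        query: str, the user's research topic
        chat_func: async chat function returning an OpenAI-style completion
            (the code below reads result.choices[0].message.content)
    Returns:
        str, the generated PubMed search string
    """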
    # Build the prompt guiding the model through keyword extraction,
    # synonym expansion, MeSH lookup, and Boolean assembly
prompt = f'''
### Objective
Your task is to generate a precise PubMed search string based on the input query: "{query}". You should:
1. **Extract Critical Keywords**: Identify the main entities and concepts that have independent and specific meanings, avoiding overly general terms commonly found in many articles (e.g., "analysis", "study"). Focus on terms central to the topic.
2. **Understand Keyword Relationships**: Analyze the logical relationship between keywords. If two or more keywords are conceptually similar or interchangeable, connect them using the OR operator. If they represent distinct concepts that must co-exist, connect them using the AND operator.
3. **Expand Synonyms Thoughtfully**: For each critical keyword, generate at least 6 relevant English synonyms or related terms used in academic research. Ensure they align with the context of the query, including synonyms that may look different but are relevant based on the keyword's definition and hierarchy.
4. **Include MeSH Terms**: Find the corresponding MeSH (Medical Subject Headings) terms for each critical keyword if available.
5. **Construct the PubMed Search String**: Combine the critical keywords, their synonyms, and MeSH terms using Boolean operators. Ensure correct grouping using parentheses to reflect the logical relationships:
- If a group of terms is interchangeable (e.g., synonyms), use OR within parentheses.
- Use AND between distinct keyword groups.
### Instructions
- **Language**: All words must be in English.
- **Avoid Stop Words**: Do not include stop words (e.g., 'a', 'an', 'the').
- **Synonym Requirement**: For each critical keyword, generate **at least 6 synonyms** or related terms.
- **Logical Operator Selection**: Adjust the Boolean logic based on the relationship between terms to accurately represent (A OR B) AND C patterns.
- **Term Length**: Each term should be concise, with phrases containing at most two words.
- **Formatting**:
- Use Boolean operators (AND, OR) to connect terms and use parentheses where necessary.
- Format MeSH terms as: "Term"[MeSH Terms]
- Format other terms as: "Term"[All Fields]
### Example
**Input**: Role of AI in antimicrobial resistance and drug discovery
**Process**:
1. **Extract Critical Keywords**:
- AI
- Antimicrobial resistance
- Drug discovery
2. **Analyze Keyword Relationships**:
- AI OR machine learning (similar concepts)
- Antimicrobial resistance AND drug discovery (distinct concepts)
3. **Expand Synonyms Thoughtfully**:
- **AI**: machine learning, artificial intelligence, deep learning, neural networks, computational intelligence, data-driven algorithms
- **Antimicrobial resistance**: antibiotic resistance, drug resistance, microbial resistance, bacterial resistance, pathogen resistance, multidrug resistance
- **Drug discovery**: drug design, pharmaceutical research, drug development, lead discovery, molecular screening, target identification
4. **Include MeSH Terms**:
- **AI**: "Artificial Intelligence"[MeSH Terms]
- **Antimicrobial resistance**: "Drug Resistance, Microbial"[MeSH Terms]
- **Drug discovery**: "Drug Discovery"[MeSH Terms]
5. **Construct the PubMed Search String**:
'(("Artificial Intelligence"[MeSH Terms] OR "machine learning"[All Fields] OR "deep learning"[All Fields] OR "neural networks"[All Fields] OR "computational intelligence"[All Fields] OR "data-driven algorithms"[All Fields]) AND ("Drug Resistance, Microbial"[MeSH Terms] OR "antibiotic resistance"[All Fields] OR "microbial resistance"[All Fields] OR "bacterial resistance"[All Fields] OR "pathogen resistance"[All Fields] OR "multidrug resistance"[All Fields])) AND ("Drug Discovery"[MeSH Terms] OR "drug design"[All Fields] OR "pharmaceutical research"[All Fields] OR "drug development"[All Fields] OR "lead discovery"[All Fields] OR "molecular screening"[All Fields])'
### Now, generate the PubMed search string for the following query:
**Query**: {query}
Please provide only the final PubMed search string in the specified format.
'''
# Call the language model to get the PubMed search string
result = await chat_func(prompt)
# Extract the PubMed search string from the model's response
pubmed_search_string = result.choices[0].message.content.strip()
return pubmed_search_string
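# Example (a minimal sketch; assumes a chat function obtained from
# utils.api_utils.get_chat_func and a hypothetical model name):
#   chat_func = get_chat_func(model_names=["some-model"])[0]
#   search_string = await generate_pubmed_search_string(
#       query="Role of AI in antimicrobial resistance", chat_func=chat_func
#   )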
async def process_pubmed_data(
query,
model_name,
start_year, end_year, size,
uuid, customer_name
):
"""
    Fetch and post-process PubMed data: download the result pages, parse
    them into CSVs, add full-text links, merge impact factors, and filter
    out review papers.
    Args:
        query: str, PubMed search string
        model_name: str, model name (used as the MinIO path segment)
        start_year: int, start year for the PubMed search
        end_year: int, end year for the PubMed search
        size: int, number of results per page
        uuid: str, uuid of the task
        customer_name: str, customer name of the task
    Returns:
        (pubmed_df, non_review_pubmed_df): the full merged dataframe and
        its non-review subset
"""
    # MinIO object prefix for this task/model
    output_folder = f"{customer_name}/{uuid}/{model_name}/"
# set file paths
    combined_txt_filename = os.path.join(
        output_folder, 'pubmed_page_combined.txt')
    results_csv_filename = os.path.join(output_folder, 'pubmed_results.csv')
    results_with_links_csv_filename = os.path.join(
        output_folder, 'pubmed_results_with_full_text_links.csv')
    impact_factors_csv_filename = os.path.join(
        output_folder, 'pubmed_results_with_impact_factors.csv')
    non_review_csv_filename = os.path.join(
        output_folder, 'pubmed_results_non_reviews.csv')
# step 1: save pubmed pages
await save_combined_pubmed_page(query, start_year, end_year, size, output_filename=combined_txt_filename)
# step 2: process pubmed files
await process_pubmed_file(combined_txt_filename, results_csv_filename)
    # step 3: add full-text links to each record
pubmed_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=results_csv_filename
)
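    # NOTE: get_full_text_links makes one blocking HTTP request per row, so
    # this apply() runs serially and scales with the number of results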
pubmed_df["Full_Text_Links"] = pubmed_df["PMID"].apply(get_full_text_links)
await upload_dataframe_to_minio(
bucket_name=BUCKET_NAME,
object_name=results_with_links_csv_filename,
df=pubmed_df
)
    # step 4: merge 2023 JCR impact factors
impact_factors_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name='2023-JCR.xlsx'
)
# Standardize the case of the JT column in both dataframes to lowercase
pubmed_df['JT'] = pubmed_df['JT'].str.lower()
impact_factors_df['JT'] = impact_factors_df['JT'].str.lower()
# Perform the merge based on the JT column
merged_df = pd.merge(pubmed_df, impact_factors_df, on='JT', how='left')
# Save the merged dataframe to a new CSV file
await upload_dataframe_to_minio(
bucket_name=BUCKET_NAME,
object_name=impact_factors_csv_filename,
df=merged_df
)
logger.info(f"Merged data saved to {impact_factors_csv_filename}")
# step 5: filter non review papers
pubmed_df = await get_dataframe_from_minio(
bucket_name=BUCKET_NAME,
object_name=impact_factors_csv_filename
)
non_review_pubmed_df = pubmed_df[pubmed_df["Review"] == "No"]
await upload_dataframe_to_minio(
bucket_name=BUCKET_NAME,
object_name=non_review_csv_filename,
df=non_review_pubmed_df
)
logger.info(f"非评论类文章已保存到 {non_review_csv_filename}")
return pubmed_df, non_review_pubmed_df
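# After process_pubmed_data completes, the following objects exist under
# {customer_name}/{uuid}/{model_name}/ in the ai-scientist bucket:
#   pubmed_page_combined.txt                 raw result pages (two sort orders)
#   pubmed_results.csv                       parsed, deduplicated records
#   pubmed_results_with_full_text_links.csv  + Full_Text_Links column
#   pubmed_results_with_impact_factors.csv   + 2023 JCR impact factors
#   pubmed_results_non_reviews.csv           non-review subset used downstream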
async def save_combined_pubmed_page(query, start_year, end_year, size=200, output_filename='pubmed_page_combined.txt'):
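    # Fetch the same query twice -- once with the default ordering and once
    # sorted by date -- to widen coverage; duplicate PMIDs are removed later
    # in process_pubmed_file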
content1 = await save_pubmed_page(query, start_year, end_year, size)
content2 = await save_pubmed_page_date(query, start_year, end_year, size)
combined_content = content1 + "\n" + content2
    # Upload the combined page content to the given MinIO object
await upload_text_to_minio(
bucket_name=BUCKET_NAME,
object_name=output_filename,
file_content=combined_content
)
logger.info(f"Page content saved to {output_filename}")
async def save_pubmed_page(query, start_year, end_year, size=200):
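    """Fetch one page of PubMed results in 'pubmed' (MEDLINE) display format
    using the default ordering and return the raw page HTML."""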
base_url = "https://pubmed.ncbi.nlm.nih.gov/"
params = {
'term': query,
'filter': f'years.{start_year}-{end_year}',
'format': 'pubmed',
'size': size
}
    # Build the search URL for logging; the actual request passes params
    search_url = f"{base_url}?term={params['term']}&filter={params['filter']}&format={params['format']}&size={params['size']}"
    logger.info(f"Search URL: {search_url}")
async with aiohttp.ClientSession() as session:
async with session.get(base_url, params=params) as response:
if response.status != 200:
logger.error("Failed to retrieve data from save_pubmed_page")
raise ConnectionError(
"Failed to retrieve data from save_pubmed_page")
return await response.text()
async def save_pubmed_page_date(query, start_year, end_year, size=200):
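    """Fetch one page of PubMed results in 'pubmed' (MEDLINE) display format
    sorted by date and return the raw page HTML."""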
base_url = "https://pubmed.ncbi.nlm.nih.gov/"
params = {
'term': query,
'filter': f'years.{start_year}-{end_year}',
'format': 'pubmed',
'size': size,
'sort': 'date'
}
    # Build the search URL for logging; the actual request passes params
    search_url = f"{base_url}?term={params['term']}&filter={params['filter']}&format={params['format']}&size={params['size']}&sort={params['sort']}"
    logger.info(f"Search URL: {search_url}")
async with aiohttp.ClientSession() as session:
async with session.get(base_url, params=params) as response:
if response.status != 200:
logger.error(
"Failed to retrieve data from save_pubmed_page_date")
raise ConnectionError(
"Failed to retrieve data from save_pubmed_page_date")
return await response.text()
async def process_pubmed_file(input_file, output_file):
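    """
    Parse the combined MEDLINE-format dump into a deduplicated CSV.
    Recognized MEDLINE tags:
        PMID  PubMed identifier
        TI    title (may span multiple lines)
        AB    abstract (may span lines; flags 'Review' if it mentions one)
        FAU   full author name (repeated; the first is also kept as FAU-frist)
        JT    journal title
        DCOM  date completed
        LID   location identifier (the ' [doi]' suffix is stripped)
        PT    publication type (flags 'Review' when it contains 'review')
    """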
    # Read the combined dump from MinIO and make sure every record starts
    # on its own line (PMID- marks the start of a record)
content = await get_file_from_minio(
bucket_name=BUCKET_NAME,
object_name=input_file
)
content = content.decode("utf-8")
content = content.replace(
'<pre class="search-results-chunk">PMID-', '<pre class="search-results-chunk">\nPMID-')
# Split the content into lines
lines = content.split('\n')
records = []
current_record = {}
collecting_abstract = False
collecting_title = False
collecting_pt = False
abstract_lines = []
title_lines = []
pt_lines = []
first_author_recorded = False # Flag to capture the first occurrence of FAU
for line in lines:
if line.startswith("PMID- "):
if current_record:
# Finalize the current record before starting a new one
current_record['AB'] = ' '.join(
abstract_lines).replace('\n', ' ')
current_record['TI'] = ' '.join(title_lines).replace('\n', ' ')
current_record['PT'] = ' '.join(pt_lines).replace('\n', ' ')
# Default Review to 'No' if not set to 'Yes' during PT or AB processing
if 'Review' not in current_record:
current_record['Review'] = 'No'
# Check for mismatches between FAU-frist and the first entry in FAU list
if 'FAU-frist' in current_record and 'FAU' in current_record and current_record['FAU']:
if current_record['FAU-frist'] != current_record['FAU'][0]:
current_record['FAU'].insert(
0, current_record['FAU-frist'])
if 'JT' not in current_record:
# Ensure JT is present even if not found
current_record['JT'] = ''
if 'DCOM' not in current_record:
# Ensure DCOM is present even if not found
current_record['DCOM'] = ''
# Add current record to list of records
records.append(current_record)
# Start a new record
current_record = {'PMID': line.split("PMID- ")[1].strip()}
collecting_abstract = False
collecting_title = False
collecting_pt = False
abstract_lines = []
title_lines = []
pt_lines = []
first_author_recorded = False # Reset the flag for a new record
elif line.startswith("FAU - "):
# Append each FAU to a list and capture the first occurrence
author_name = line.split("FAU - ")[1].strip()
if 'FAU' not in current_record:
current_record['FAU'] = []
current_record['FAU'].append(author_name)
# Record the first occurrence in FAU-frist
if not first_author_recorded:
current_record['FAU-frist'] = author_name
first_author_recorded = True
elif line.startswith("JT - "):
current_record['JT'] = line.split("JT - ")[1].strip()
elif line.startswith("DCOM- "):
current_record['DCOM'] = line.split("DCOM- ")[1].strip()
elif line.startswith("TI - "):
collecting_title = True
title_lines.append(line.split("TI - ")[1].strip())
elif collecting_title:
if any(line.startswith(prefix) for prefix in ["LID - ", "AB - ", "FAU - ", "PG - "]):
collecting_title = False
else:
title_lines.append(line.strip())
elif line.startswith("LID - "):
lid = line.split("LID - ")[1].strip()
if '[doi]' in lid:
lid = lid.split(' [doi]')[0]
            # Keep the longer LID value
if 'LID' in current_record:
current_record['LID'] = lid if len(lid) > len(
current_record['LID']) else current_record['LID']
else:
current_record['LID'] = lid
elif line.startswith("AB - "):
collecting_abstract = True
abstract_text = line.split("AB - ")[1].strip()
abstract_lines.append(abstract_text)
# Check if 'review' is in AB line (case insensitive)
if 'review' in abstract_text.lower():
current_record['Review'] = 'Yes'
elif collecting_abstract:
if any(line.startswith(prefix) for prefix in ["LID - ", "FAU - ", "PG - "]):
collecting_abstract = False
else:
abstract_text = line.strip()
abstract_lines.append(abstract_text)
# Check if 'review' is in AB line (case insensitive)
if 'review' in abstract_text.lower():
current_record['Review'] = 'Yes'
elif line.startswith("PT - "):
pt_line = line.split("PT - ")[1].strip()
pt_lines.append(pt_line)
# Check if 'review' is in PT line (case insensitive)
if 'review' in pt_line.lower():
current_record['Review'] = 'Yes'
elif collecting_pt:
if any(line.startswith(prefix) for prefix in ["LID - ", "AB - ", "FAU - ", "PG - "]):
collecting_pt = False
else:
pt_text = line.strip()
pt_lines.append(pt_text)
# Check if 'review' is in PT line (case insensitive)
if 'review' in pt_text.lower():
current_record['Review'] = 'Yes'
# Final record handling after loop ends
if current_record:
current_record['AB'] = ' '.join(abstract_lines).replace('\n', ' ')
current_record['TI'] = ' '.join(title_lines).replace('\n', ' ')
current_record['PT'] = ' '.join(pt_lines).replace('\n', ' ')
if 'Review' not in current_record:
current_record['Review'] = 'No'
# Check for mismatches between FAU-frist and the first entry in FAU list
if 'FAU-frist' in current_record and 'FAU' in current_record and current_record['FAU']:
if current_record['FAU-frist'] != current_record['FAU'][0]:
current_record['FAU'].insert(0, current_record['FAU-frist'])
if 'JT' not in current_record:
current_record['JT'] = ''
if 'DCOM' not in current_record:
current_record['DCOM'] = ''
records.append(current_record)
# Remove duplicate records by PMID
unique_records = []
seen_pmids = set()
for record in records:
if record['PMID'] not in seen_pmids:
seen_pmids.add(record['PMID'])
unique_records.append(record)
    # Write unique records to the output CSV (built as text and uploaded to MinIO)
    text = ""
fieldnames = ['JT', 'DCOM', 'PMID', 'TI', 'LID',
'AB', 'FAU', 'FAU-frist', 'PT', 'Review']
header = ','.join(fieldnames) + '\n'
text += header
# Write each record
for record in unique_records:
# Join the FAU list as a single string
record['FAU'] = '; '.join(record.get('FAU', [])) # Safely get 'FAU'
# Prepare the row as a CSV string
row = ','.join([escape_csv_field(str(record.get(field, '')))
for field in fieldnames]) + '\n'
text += row
await upload_text_to_minio(
bucket_name=BUCKET_NAME,
object_name=output_file,
file_content=text
)
def get_full_text_links(pmid):
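    """Scrape the PubMed article page for a PMID and return the assumed
    full-text link, or None if it cannot be found."""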
url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
    # Extract all links from the page
    links = [link['href'] for link in soup.find_all('a', href=True)]
    # Return the 27th link if present, otherwise None; the index is a
    # layout-dependent heuristic for the full-text link
    return links[26] if len(links) >= 27 else None