|
|
import asyncio |
|
|
import time |
|
|
from typing import Dict, List |
|
|
import aiohttp |
|
|
|
|
|
from config.global_storage import get_model_config |
|
|
from dto.bio_document import PubMedDocument |
|
|
from service.pubmed_xml_parse import PubmedXmlParse |
|
|
from utils.bio_logger import bio_logger as logger |
|
|
|
|
|
# Pool of NCBI E-utilities accounts. esearch/fetch_details pick one entry
# pseudo-randomly (wall-clock millisecond modulo pool size) so request load
# is spread across the api keys.
# NOTE(review): API keys are committed in source — consider loading them from
# environment variables or a secrets store instead.
PUBMED_ACCOUNT = [
    {"email": "email1@gmail.com", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
    {"email": "email2@gmail.com", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
    {"email": "email3@gmail.com", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
    {"email": "email4@gmail.com", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]
|
|
|
|
|
|
|
|
class PubMedAsyncApi:
    """Async client for the NCBI E-utilities (esearch/efetch) PubMed API.

    Each outbound request pseudo-randomly picks one of the PUBMED_ACCOUNT
    api keys to spread request load across keys.
    """

    # Article types excluded from "keyword" searches (editorials, letters,
    # news, etc. — non-research material).
    _EXCLUDED_ARTICLE_TYPES = [
        "Address",
        "Bibliography",
        "Biography",
        "Books and Documents",
        "Clinical Conference",
        "Clinical Study",
        "Collected Works",
        "Comment",
        "Congress",
        "Consensus Development Conference",
        "Consensus Development Conference, NIH",
        "Dictionary",
        "Directory",
        "Duplicate Publication",
        "Editorial",
        "Festschrift",
        "Government Document",
        "Guideline",
        "Interactive Tutorial",
        "Interview",
        "Lecture",
        "Legal Case",
        "Legislation",
        "Letter",
        "News",
        "Newspaper Article",
        "Patient Education Handout",
        "Periodical Index",
        "Personal Narrative",
        "Practice Guideline",
        "Published Erratum",
        "Technical Report",
        "Video-Audio Media",
        "Webcast",
    ]

    # Maximum number of HTTP attempts before giving up on a request.
    _MAX_RETRIES = 3

    def __init__(self):
        # Parser for the XML bodies returned by efetch.
        self.pubmed_xml_parse = PubmedXmlParse()
        # Loaded once at construction; kept for parity with the service layer.
        self.model_config = get_model_config()

    async def pubmed_search_function(
        self, query: str, top_k: int, search_type: str
    ) -> List["PubMedDocument"]:
        """Search PubMed for `query` and return up to `top_k` documents.

        Args:
            query: Free-text or advanced PubMed query string.
            top_k: Maximum number of results to request.
            search_type: "keyword" (adds full-text / article-type filters)
                or "advanced" (query passed through unchanged).

        Returns:
            A list of PubMedDocument built from the fetched article details
            (empty when the search matched nothing or detail fetch failed).

        Raises:
            ValueError: If `search_type` is invalid.
            Exception: If the esearch request ultimately fails.
        """
        try:
            start_time = time.time()
            logger.info(
                f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
            )
            id_list = await self.search_database(
                query, db="pubmed", retmax=top_k, search_type=search_type
            )
            articles = await self.fetch_details(
                id_list, db="pubmed", rettype="abstract"
            )

            end_search_pubmed_time = time.time()
            logger.info(
                f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
            )

            return [
                PubMedDocument(
                    title=result["title"],
                    abstract=result["abstract"],
                    authors=self.process_authors(result["authors"]),
                    doi=result["doi"],
                    source="pubmed",
                    source_id=result["pmid"],
                    pub_date=result["pub_date"],
                    journal=result["journal"],
                )
                for result in articles
            ]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            # Bare raise preserves the original traceback (`raise e` re-raises
            # from this frame instead).
            raise

    def process_authors(self, author_list: List[Dict]) -> str:
        """Format parsed author dicts as "Forename Lastname, Forename Lastname".

        NOTE(review): assumes every author dict carries 'forename' and
        'lastname' keys (presumably guaranteed by PubmedXmlParse — confirm);
        a missing key raises KeyError, surfaced by the caller as a failure.
        """
        return ", ".join(
            [f"{author['forename']} {author['lastname']}" for author in author_list]
        )

    async def search_database(
        self, query: str, db: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
        """Resolve `query` to a list of PMIDs via esearch.

        Args:
            query: PubMed query string.
            db: Kept for interface compatibility; esearch always targets pubmed.
            retmax: Maximum number of ids to return.
            search_type: "keyword" wraps the query with a free-full-text
                filter and excludes non-research article types; "advanced"
                sends the query unchanged.

        Returns:
            List of PMID strings (possibly empty). (Annotation fixed: the
            previous `List[Dict]` did not match the actual string ids.)

        Raises:
            ValueError: If `search_type` is not "keyword" or "advanced".
        """
        if search_type not in ["keyword", "advanced"]:
            raise ValueError("search_type must be one of 'keyword' or 'advanced'")

        if search_type == "keyword":
            art_type = (
                "("
                + " OR ".join(f'"{j}"[Filter]' for j in self._EXCLUDED_ARTICLE_TYPES)
                + ")"
            )
            # fha[Filter] restricts to free-full-text articles; the NOT clause
            # drops the excluded (non-research) article types.
            query = "( " + query + ")"
            query += " AND (fha[Filter]) NOT " + art_type

        return await self.esearch(query=query, retmax=retmax)

    @staticmethod
    def _pick_account() -> Dict:
        """Pick a PubMed account pseudo-randomly (wall-clock millisecond mod pool size)."""
        return PUBMED_ACCOUNT[int((time.time() * 1000) % len(PUBMED_ACCOUNT))]

    async def esearch(self, query=None, retmax=10):
        """Call the esearch endpoint and return the raw PMID id list.

        NOTE(review): `query` is interpolated without explicit URL-encoding;
        aiohttp quotes unsafe characters on send, but an already-encoded
        query could be double-encoded — confirm against callers.
        """
        start_time = time.time()
        db = "pubmed"
        server = "esearch"
        api_key = self._pick_account()["api_key"]
        url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&term={query}&retmode=json&api_key={api_key}&sort=relevance&retmax={retmax}"
        response = await self.async_http_get(url=url)

        id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed_async_http get id_list, search Time taken: {time.time() - start_time}s"
        )

        return id_list

    async def async_http_get(self, url: str):
        """GET `url` and return the body parsed as JSON (with retries)."""
        return await self._get_with_retry(url, as_json=True)

    async def async_http_get_text(self, url: str, params=None):
        """GET `url` and return the body as text (with retries)."""
        return await self._get_with_retry(url, params=params, as_json=False)

    async def _get_with_retry(self, url: str, params=None, as_json: bool = False):
        """GET with up to _MAX_RETRIES attempts on non-200 status.

        Sleeps 0.5s after each failed attempt and raises once all attempts
        are exhausted. Consolidates the retry loop previously duplicated
        between async_http_get and async_http_get_text.
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(1, self._MAX_RETRIES + 1):
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        if as_json:
                            return await response.json()
                        return await response.text()
                    logger.error(
                        f"{url},try_time:{attempt},Error: {response.status}"
                    )
                await asyncio.sleep(0.5)
        raise Exception(f"Failed to fetch data from {url} after 3 attempts")

    async def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
        """Fetch article details for the given PMIDs via efetch and parse the XML.

        Returns [] for an empty id_list (previously this built an efetch URL
        with an empty `id` parameter — a guaranteed failed request) and []
        on any fetch/parse error (deliberate best-effort: a failed detail
        fetch degrades to zero results rather than failing the whole search).
        """
        if not id_list:
            return []
        start_time = time.time()
        try:
            ids = ",".join(id_list)
            server = "efetch"
            api_key = self._pick_account()["api_key"]
            url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
            response = await self.async_http_get_text(url=url)
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed_async_http fetch detail, Time taken: {time.time() - start_time}"
            )
            return articles
        except Exception as e:
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")

            return []
|
|
|