|
|
from Bio import Entrez, Medline

import core.data_preprocess
import core.mtdna_classifier
import core.pipeline
from core import data_preprocess, mtdna_classifier, pipeline
from core.NER.html import extractHTML
|
|
|
|
|
def fetch_ncbi(accession_number):
    """Fetch GenBank metadata for *accession_number* from NCBI Entrez.

    Queries the nucleotide database (GenBank XML) and extracts the fields
    used downstream: authors, institution/journal line, isolate name,
    sequence definition, publication title, sequence comment and a date
    (create date, falling back to update date).

    Args:
        accession_number: GenBank accession id (anything `str()`-able).

    Returns:
        dict with keys ``authors``, ``institution``, ``isolate``,
        ``definition``, ``title``, ``seq_comment``, ``collection_date``;
        any field that cannot be determined — or the whole record on a
        fetch/parse failure — is the string ``"unknown"``.
    """
    # Single source of truth for the fallback record; copied on each use so
    # callers can mutate the returned dict safely.
    unknown = {"authors": "unknown",
               "institution": "unknown",
               "isolate": "unknown",
               "definition": "unknown",
               "title": "unknown",
               "seq_comment": "unknown",
               "collection_date": "unknown"}
    outputs = dict(unknown)
    try:
        Entrez.email = "your.email@example.com"
        handle = Entrez.efetch(db="nucleotide", id=str(accession_number),
                               rettype="gb", retmode="xml")
        try:
            record = Entrez.read(handle)
        finally:
            # Close the network handle even if parsing fails.
            handle.close()

        gb_seq = None
        if isinstance(record, list) and len(record) > 0:
            if isinstance(record[0], dict):
                gb_seq = record[0]
            else:
                print(f"Warning: record[0] is not a dictionary for {accession_number}. Type: {type(record[0])}")
        else:
            print(f"Warning: No valid record or empty record list from NCBI for {accession_number}.")

        if gb_seq is None:
            # No usable record — degrade to the all-unknown result.
            return dict(unknown)

        # Prefer the create date; fall back to the update date.
        if "GBSeq_create-date" in gb_seq:
            outputs["collection_date"] = gb_seq["GBSeq_create-date"]
        elif "GBSeq_update-date" in gb_seq:
            outputs["collection_date"] = gb_seq["GBSeq_update-date"]

        if "GBSeq_definition" in gb_seq:
            outputs["definition"] = gb_seq["GBSeq_definition"]

        # First reference that carries each field wins.
        for ref in gb_seq.get("GBSeq_references", []):
            if "GBReference_authors" in ref and outputs["authors"] == "unknown":
                outputs["authors"] = "and ".join(ref["GBReference_authors"])
            if "GBReference_title" in ref and outputs["title"] == "unknown":
                outputs["title"] = ref["GBReference_title"]
            if "GBReference_journal" in ref and outputs["institution"] == "unknown":
                outputs["institution"] = ref["GBReference_journal"]

        if "GBSeq_comment" in gb_seq:
            outputs["seq_comment"] = gb_seq["GBSeq_comment"]

        # The isolate name lives in the qualifiers of the first feature.
        feature_table = gb_seq.get("GBSeq_feature-table")
        if feature_table and "GBFeature_quals" in feature_table[0]:
            for qual in feature_table[0]["GBFeature_quals"]:
                if qual["GBQualifier_name"] == "isolate" and outputs["isolate"] == "unknown":
                    outputs["isolate"] = qual["GBQualifier_value"]

        return outputs
    except Exception as e:
        # Network, parsing or unexpected-schema failures all degrade
        # gracefully to an all-unknown record (was a silent bare except).
        print(f"error in fetching ncbi data: {e}")
        return dict(unknown)
|
|
|
|
|
def google_accession_search(accession_id):
    """
    Search for metadata by accession ID using Google Custom Search.

    Falls back to known biological databases and archives.
    """
    # Each suffix turns the bare accession id into a more targeted query;
    # the empty suffix keeps the plain-id search first.
    suffixes = [
        "",
        " site:ncbi.nlm.nih.gov",
        " site:pubmed.ncbi.nlm.nih.gov",
        " site:europepmc.org",
        " site:researchgate.net",
        " mtDNA",
        " mitochondrial DNA",
    ]

    collected = []
    for suffix in suffixes:
        hits = mtdna_classifier.search_google_custom(f"{accession_id}{suffix}", 2)
        for hit in hits:
            # Preserve first-seen order while de-duplicating.
            if hit not in collected:
                collected.append(hit)
    return collected
|
|
|
|
|
|
|
|
def smart_google_queries(metadata: dict):
    """Build targeted Google queries from NCBI record metadata.

    Uses the isolate name, first author, submitting institution/journal
    line and publication title (when known) to construct search strings
    likely to locate the originating publication.

    Args:
        metadata: dict with (at least) the keys ``isolate``, ``authors``,
            ``institution`` and ``title``; unusable fields hold the
            placeholder strings ``"unknown"`` or ``"Unpublished"``.

    Returns:
        list[str]: query strings (possibly empty when nothing is usable).
    """
    queries = []

    isolate = metadata.get("isolate")
    author = metadata.get("authors")
    institution = metadata.get("institution")
    title = metadata.get("title")

    def _usable(value):
        # A field is usable when present and not a known placeholder.
        return value and value not in ("unknown", "Unpublished")

    if _usable(isolate):
        queries.append(f'"{isolate}" mitochondrial DNA')
        queries.append(f'"{isolate}" site:ncbi.nlm.nih.gov')

    if _usable(author):
        # Only the first author (text before the first comma) is distinctive
        # enough to search on.  (str.split never raises, so the original
        # try/except was dead code.)
        author_name = author.split(',')[0]
        queries.append(f'"{author_name}" mitochondrial DNA')
        queries.append(f'"{author_name}" mtDNA site:researchgate.net')

    if _usable(institution):
        # Keep at most the first two comma-separated parts of the
        # journal/affiliation line to avoid an over-specific query.
        short_inst = ",".join(institution.split(',')[:2])
        queries.append(f'"{short_inst}" mtDNA sequence')

    # "Direct Submission" is a GenBank placeholder, not a real paper title.
    if _usable(title) and title != "Direct Submission":
        queries.append(title)

    return queries
|
|
|
|
|
def filter_links_by_metadata(search_results, saveLinkFolder, accession=None, stop_flag=None):
    """Filter candidate links down to those relevant to an mtDNA record.

    Each link is scored by scanning its extracted article text (and the URL
    string itself) for mtDNA-related keywords, with the accession id — when
    given — as the strongest keyword.  Links matched on the accession itself
    are preferred: when any exist, only those are returned.

    Args:
        search_results: iterable of candidate URLs (falsy entries skipped).
        saveLinkFolder: folder handed to the text-extraction pipeline.
        accession: optional accession id to prioritise.
        stop_flag: optional shared flag object; when ``stop_flag.value``
            becomes truthy the function aborts and returns [].

    Returns:
        list of links (or ``[link, keyword]`` pairs for matches that also
        carried supplementary-material URLs) — possibly empty.
    """
    TRUSTED_DOMAINS = [
        "ncbi.nlm.nih.gov",
        "pubmed.ncbi.nlm.nih.gov",
        "pmc.ncbi.nlm.nih.gov",
        "biorxiv.org",
        "researchgate.net",
        "nature.com",
        "sciencedirect.com"
    ]
    if stop_flag is not None and stop_flag.value:
        print(f"π Stop detected {accession}, aborting early...")
        return []

    def is_trusted_link(link):
        # NOTE(review): defined but not used by the filtering below — kept
        # for interface stability; confirm whether callers rely on it.
        for domain in TRUSTED_DOMAINS:
            if domain in link:
                return True
        return False

    def is_relevant_title_snippet(link, saveLinkFolder, accession=None):
        """Collect relevance evidence for one link.

        Returns a list that may contain supplementary-material URLs plus, on
        the first keyword hit, a ``[link, keyword]`` pair; returns [] when a
        stop is requested mid-way.
        """
        output = []
        keywords = ["mtDNA", "mitochondrial", "accession", "isolate", "Homo sapiens", "sequence"]
        if accession:
            # The accession is the strongest signal, so match it first.
            keywords = [accession] + keywords
        title_snippet = link.lower()
        print("save link folder inside this filter function: ", saveLinkFolder)
        # Hard 60 s timeout so one slow page cannot stall the whole pass.
        success_process, output_process = pipeline.run_with_timeout(
            data_preprocess.extract_text, args=(link, saveLinkFolder), timeout=60)
        if stop_flag is not None and stop_flag.value:
            print(f"π Stop detected {accession}, aborting early...")
            return []
        if success_process:
            article_text = output_process
            print("yes succeed for getting article text")
        else:
            print("no suceed, fallback to no link")
            article_text = ""

        print("article text")

        if stop_flag is not None and stop_flag.value:
            print(f"π Stop detected {accession}, aborting early...")
            return []
        try:
            # HTML pages may reference supplementary material worth keeping.
            ext = link.split(".")[-1].lower()
            if ext not in ["pdf", "docx", "xlsx"]:
                html = extractHTML.HTML("", link)
                if stop_flag is not None and stop_flag.value:
                    print(f"π Stop detected {accession}, aborting early...")
                    return []
                jsonSM = html.getSupMaterial()
                if jsonSM:
                    output += sum((jsonSM[key] for key in jsonSM), [])
        except Exception:
            # Best-effort: supplementary-material scraping is optional.
            pass
        for keyword in keywords:
            # Stop at the first hit: one matching keyword is enough evidence.
            if keyword.lower() in article_text.lower():
                if link not in output:
                    output.append([link, keyword.lower()])
                    print("link and keyword for article text: ", link, keyword)
                    return output
            if keyword.lower() in title_snippet.lower():
                if link not in output:
                    output.append([link, keyword.lower()])
                    print("link and keyword for title: ", link, keyword)
                    return output
        return output

    filtered = []
    better_filter = []
    if len(search_results) > 0:
        for link in search_results:
            print(link)
            if stop_flag is not None and stop_flag.value:
                print(f"π Stop detected {accession}, aborting early...")
                return []
            if link:
                output_link = is_relevant_title_snippet(link, saveLinkFolder, accession)
                print("output link: ")
                print(output_link)
                for out_link in output_link:
                    if isinstance(out_link, list) and len(out_link) > 1:
                        print(out_link)
                        kw = out_link[1]
                        # BUG FIX: guard .lower() — accession may be None, and
                        # the original called accession.lower() unconditionally
                        # here, crashing on keyword-only matches.
                        print("kw and acc: ", kw, accession.lower() if accession else accession)
                        if accession and kw == accession.lower():
                            better_filter.append(out_link[0])
                        filtered.append(out_link[0])
                    else:
                        filtered.append(out_link)
            print("done with link and here is filter: ", filtered)
    # Accession-matched links supersede keyword-only matches.
    if better_filter:
        filtered = better_filter
    return filtered
|
|
|
|
|
def smart_google_search(metadata):
    """Run the metadata-derived queries and return de-duplicated links."""
    collected = []
    for query in smart_google_queries(metadata):
        for hit in mtdna_classifier.search_google_custom(query, 2):
            # Keep first-seen order; skip links already collected.
            if hit not in collected:
                collected.append(hit)
    return collected
|
|
|
|
|
|