# File size: 6,819 Bytes
# 5c389ab
# taxonomy.py
"""
FunGO — NCBI Taxonomy Service
===============================
Species name → taxon ID lookup and reverse lookup.
Fixes applied:
1. UID string/int consistency — result_map keys are always strings,
now explicitly uses str(uid) so 9606 never resolves to {}.
2. Species-rank preference — results sorted so "species" rank
appears before "genus". Prevents 9605 (Homo genus) showing
before 9606 (Homo sapiens species).
3. Exact-name boost — exact query match moved to position 0.
4. Cache key includes max_results to prevent stale smaller lists.
5. xml.etree.ElementTree replaces fragile regex XML parsing.
6. Retry logic — 3 attempts with 2s gap on connection errors.
"""
import logging
import time
import xml.etree.ElementTree as ET
import requests
from config import (
NCBI_SEARCH_URL, NCBI_SUMMARY_URL, NCBI_FETCH_URL,
NCBI_TOOL, NCBI_EMAIL,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Headers sent with every NCBI request; the User-Agent embeds the
# contact email imported from config.
HEADERS = {"User-Agent": f"FunGO/1.0 ({NCBI_EMAIL})"}
# Value passed as ``timeout=`` to requests.get (seconds).
TIMEOUT = 10
# Maximum number of attempts _ncbi_get makes for one request.
RETRIES = 3
# Value passed to time.sleep between failed attempts (seconds).
RETRY_DELAY = 2
# Sort weight per taxonomic rank: a lower number is listed earlier, so
# species-level hits come before genus-level and broader ones.
_RANK_PRIORITY = {
    "species": 0, "subspecies": 1, "varietas": 2,
    "forma": 3, "strain": 4, "no rank": 5,
    "genus": 6, "family": 7, "order": 8,
    "class": 9, "phylum": 10, "kingdom": 11, "superkingdom": 12,
}

# Weight for ranks that are missing from the table above.
_UNKNOWN_RANK_PRIORITY = 99


def _rank_priority(rank: str) -> int:
    """Return the sort weight for *rank* (lower sorts first).

    Matching ignores letter case and surrounding whitespace.

    Args:
        rank: Taxonomic rank name, e.g. ``"species"``.

    Returns:
        The weight from ``_RANK_PRIORITY``, or 99 when the rank is not in
        the table. ``None`` and other non-string values (for example a
        JSON ``null``) are treated as unknown instead of raising.
    """
    if not isinstance(rank, str):
        # A missing/null rank must not abort the caller's sort.
        return _UNKNOWN_RANK_PRIORITY
    return _RANK_PRIORITY.get(rank.lower().strip(), _UNKNOWN_RANK_PRIORITY)
# Memoised search_species results, keyed by (lower-cased query, max_results).
_search_cache: dict = {}
# Memoised successful get_taxon_info results, keyed by taxon ID.
_id_to_info_cache: dict = {}
def _ncbi_get(url: str, params: dict) -> requests.Response:
    """Issue a GET request to an NCBI endpoint, retrying on failure.

    Up to ``RETRIES`` attempts are made. Any ``requests.RequestException``
    (including the HTTP error raised by ``raise_for_status()``) counts as
    a failed attempt. After every failed attempt except the last, a
    warning is logged and the function sleeps for ``RETRY_DELAY`` seconds.

    Args:
        url: Endpoint to call.
        params: Query-string parameters for the request.

    Returns:
        The first response that passes the status check.

    Raises:
        requests.RequestException: The error from the final attempt, once
            every attempt has failed.
    """
    failure = None
    attempt = 0
    while attempt < RETRIES:
        attempt += 1
        try:
            response = requests.get(
                url, params=params, timeout=TIMEOUT, headers=HEADERS
            )
            response.raise_for_status()
        except requests.RequestException as err:
            failure = err
            if attempt >= RETRIES:
                # Out of attempts: no point in logging a retry or sleeping.
                break
            logger.warning(
                "[taxonomy] Request error (attempt %d/%d): %s β retrying in %ds",
                attempt, RETRIES, err, RETRY_DELAY,
            )
            time.sleep(RETRY_DELAY)
        else:
            return response
    raise failure
def search_species(query: str, max_results: int = 8) -> list:
    """Search the NCBI taxonomy database by name.

    Runs a search request for the matching taxon IDs, then a summary
    request for their details. Successful results (including an empty
    result) are cached per ``(lower-cased query, max_results)``; failures
    are not cached.

    Args:
        query: Free-text name. Surrounding whitespace is ignored, and
            queries shorter than two characters return ``[]`` without
            contacting NCBI.
        max_results: Value sent as ``retmax`` to the search request.

    Returns:
        A list of dicts with the keys ``taxon_id``, ``scientific_name``,
        ``common_name``, ``rank`` and ``division``. Entries whose
        scientific name equals the query (case-insensitively) come first;
        within each group entries are ordered by ``_rank_priority``, with
        ties kept in the order NCBI returned them. On any failure the
        return value is ``[{"error": "<message>"}]``.

        The returned list and its dicts are copies, so callers may modify
        them without corrupting the cache.
    """
    query = query.strip()
    if len(query) < 2:
        return []

    cache_key = (query.lower(), max_results)
    cached = _search_cache.get(cache_key)
    if cached is not None:
        # Hand out copies: returning the cached objects themselves would
        # let a caller's in-place edits leak into every later lookup.
        return [dict(entry) for entry in cached]

    try:
        search_resp = _ncbi_get(NCBI_SEARCH_URL, {
            "db": "taxonomy", "term": query,
            "retmax": max_results, "retmode": "json",
            "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })
        ids = search_resp.json().get("esearchresult", {}).get("idlist", [])
        if not ids:
            _search_cache[cache_key] = []
            return []

        summary_resp = _ncbi_get(NCBI_SUMMARY_URL, {
            "db": "taxonomy", "id": ",".join(ids),
            "retmode": "json", "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })
        result_map = summary_resp.json().get("result", {})
        uids = result_map.get("uids", ids)

        results = []
        for uid in uids:
            # result_map is keyed by string UIDs, so always look up str(uid).
            item = result_map.get(str(uid))
            if not item:
                continue
            # ``or ""`` turns a null field into an empty string, so one
            # incomplete record cannot make the whole search fail below.
            results.append({
                "taxon_id": int(uid),
                "scientific_name": item.get("scientificname") or "",
                "common_name": item.get("commonname") or "",
                "rank": item.get("rank") or "",
                "division": item.get("division") or "",
            })

        # One stable sort: exact-name matches first (False < True), then
        # by rank weight; ties keep the order NCBI returned.
        wanted = query.lower()
        results.sort(
            key=lambda r: (
                r["scientific_name"].lower() != wanted,
                _rank_priority(r["rank"]),
            )
        )

        _search_cache[cache_key] = results
        logger.info("[taxonomy] search %r β %d results", query, len(results))
        return [dict(entry) for entry in results]
    except Exception as exc:
        # Boundary handler: report the failure to the caller as data.
        logger.error("[taxonomy] search_species(%r) failed: %s", query, exc)
        return [{"error": str(exc)}]
def get_taxon_info(taxon_id: int) -> dict:
    """Look up a taxon ID and return its details, including lineage.

    Fetches the taxonomy record as XML and parses it with
    ``xml.etree.ElementTree``. Only successful lookups are cached.

    Args:
        taxon_id: NCBI taxonomy identifier.

    Returns:
        A dict with the keys ``taxon_id``, ``scientific_name``,
        ``common_name``, ``rank``, ``division``, ``lineage`` (ancestor
        names joined with ``" > "``) and ``verified``. On failure
        ``verified`` is ``False``, the text fields are empty and an
        ``error`` key holds the message.

        A cached result is returned as a copy, so callers may modify it
        without corrupting the cache.
    """
    cached = _id_to_info_cache.get(taxon_id)
    if cached is not None:
        return dict(cached)

    base = {
        "taxon_id": taxon_id, "scientific_name": "",
        "common_name": "", "rank": "", "division": "",
        "lineage": "", "verified": False,
    }
    try:
        resp = _ncbi_get(NCBI_FETCH_URL, {
            "db": "taxonomy", "id": taxon_id,
            "retmode": "xml", "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })
        # Parse the raw bytes so the XML parser applies the document's own
        # encoding declaration; ``resp.text`` would first be decoded with
        # a charset guessed from the HTTP headers.
        root = ET.fromstring(resp.content)
        taxon_el = root.find("Taxon")
        if taxon_el is None:
            base["error"] = "Taxon element not found in NCBI XML"
            return base

        def text_of(path: str) -> str:
            """Return the stripped text at *path*, or "" when absent."""
            return (taxon_el.findtext(path) or "").strip()

        ancestor_names = (
            (node.findtext("ScientificName") or "").strip()
            for node in taxon_el.findall("./LineageEx/Taxon")
        )
        # NOTE(review): GenbankCommonName is believed to be the element
        # NCBI uses for the preferred common name -- confirm against the
        # taxon DTD. It is tried last, so it only fills a common_name
        # that would otherwise be empty.
        common = (
            text_of("OtherNames/CommonName")
            or text_of("CommonName")
            or text_of("OtherNames/GenbankCommonName")
        )
        info = {
            **base,
            "scientific_name": text_of("ScientificName"),
            "common_name": common,
            "rank": text_of("Rank"),
            "division": text_of("Division"),
            "lineage": " > ".join(name for name in ancestor_names if name),
            "verified": True,
        }
        _id_to_info_cache[taxon_id] = info
        logger.info("[taxonomy] Resolved taxon %d β %s", taxon_id, info["scientific_name"])
        return dict(info)
    except ET.ParseError as exc:
        logger.error("[taxonomy] XML parse error for taxon %d: %s", taxon_id, exc)
        base["error"] = f"XML parse error: {exc}"
        return base
    except Exception as exc:
        # Boundary handler: report the failure to the caller as data.
        logger.error("[taxonomy] get_taxon_info(%d) failed: %s", taxon_id, exc)
        base["error"] = str(exc)
        return base
def resolve_taxon(taxon_id: int, top50_taxa: list) -> dict:
    """Return taxon details annotated with training-set membership.

    Args:
        taxon_id: NCBI taxonomy identifier to look up.
        top50_taxa: Collection of taxon IDs tested with ``in``.

    Returns:
        The ``get_taxon_info`` result plus ``in_training`` (bool) and
        ``training_status``, which is ``"in_training_data"`` for members
        and ``"unknown_species_fallback"`` otherwise.
    """
    record = dict(get_taxon_info(taxon_id))
    is_member = taxon_id in top50_taxa
    record["in_training"] = is_member
    if is_member:
        record["training_status"] = "in_training_data"
    else:
        record["training_status"] = "unknown_species_fallback"
    return record
|