File size: 6,819 Bytes
5c389ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# taxonomy.py
"""
FunGO — NCBI Taxonomy Service
=============================
Species name → taxon ID lookup and reverse lookup.

Fixes applied:
  1. UID string/int consistency — result_map keys are always strings,
     so lookups now explicitly use str(uid) and 9606 never resolves to {}.
  2. Species-rank preference — results are sorted so "species" rank
     appears before "genus". Prevents 9605 (Homo genus) showing
     before 9606 (Homo sapiens species).
  3. Exact-name boost — an exact query match is moved to position 0.
  4. Cache key includes max_results to prevent stale smaller lists.
  5. xml.etree.ElementTree replaces fragile regex XML parsing.
  6. Retry logic — 3 attempts with a 2s gap on request errors
     (any requests.RequestException, including HTTP error statuses).
"""

import logging
import time
import xml.etree.ElementTree as ET
import requests

from config import (
    NCBI_SEARCH_URL, NCBI_SUMMARY_URL, NCBI_FETCH_URL,
    NCBI_TOOL, NCBI_EMAIL,
)

# Module-level logger, named after this module.
logger  = logging.getLogger(__name__)
# Headers sent with every request; the User-Agent embeds the configured
# NCBI contact email.
HEADERS = {"User-Agent": f"FunGO/1.0 ({NCBI_EMAIL})"}
# Per-request timeout handed to requests.get(), in seconds.
TIMEOUT = 10
# Total number of attempts _ncbi_get makes before giving up.
RETRIES = 3
# Pause between attempts, handed to time.sleep(), in seconds.
RETRY_DELAY = 2

_RANK_PRIORITY = {
    "species": 0, "subspecies": 1, "varietas": 2,
    "forma": 3, "strain": 4, "no rank": 5,
    "genus": 6, "family": 7, "order": 8,
    "class": 9, "phylum": 10, "kingdom": 11, "superkingdom": 12,
}

def _rank_priority(rank: str) -> int:
    return _RANK_PRIORITY.get(rank.lower().strip(), 99)

# In-memory cache of search_species results, keyed by
# (lower-cased query, max_results). Nothing in this module evicts entries.
_search_cache: dict = {}
# In-memory cache of successful get_taxon_info results, keyed by taxon ID.
# Nothing in this module evicts entries.
_id_to_info_cache: dict = {}


def _ncbi_get(url: str, params: dict) -> requests.Response:
    """Send a GET request to an NCBI endpoint, retrying when it fails.

    Up to ``RETRIES`` attempts are made (always at least one), pausing
    ``RETRY_DELAY`` seconds between attempts. Each request uses ``TIMEOUT``
    and the module ``HEADERS``.

    Args:
        url: Endpoint URL.
        params: Query-string parameters sent with the request.

    Returns:
        The response of the first attempt that completes without a transport
        error and without an HTTP error status.

    Raises:
        requests.RequestException: The error from the final attempt when every
            attempt fails (connection error, timeout, or HTTP error status
            raised by ``raise_for_status``).
    """
    # Guarantee one attempt even if RETRIES is misconfigured to < 1; the
    # previous implementation would then reach ``raise None`` (a TypeError).
    attempts = max(1, RETRIES)
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.get(url, params=params, timeout=TIMEOUT, headers=HEADERS)
            resp.raise_for_status()
        except requests.RequestException as exc:
            if attempt == attempts:
                # Out of attempts: surface the last error to the caller.
                raise
            logger.warning("[taxonomy] Request error (attempt %d/%d): %s — retrying in %ds",
                           attempt, attempts, exc, RETRY_DELAY)
            time.sleep(RETRY_DELAY)
        else:
            return resp
    # Not reachable: every iteration returns, raises, or moves on to the next
    # attempt, and the final attempt always returns or raises.


def search_species(query: str, max_results: int = 8) -> list:
    """Search the NCBI taxonomy database by species name.

    Runs a search request for ``query`` and then a summary request for the
    IDs it returned.

    Args:
        query: Name to search for. Surrounding whitespace is ignored; ``None``
            or fewer than two characters yields ``[]`` without any request.
        max_results: Maximum number of hits requested (sent as ``retmax``).

    Returns:
        A list of dicts with keys ``taxon_id`` (int), ``scientific_name``,
        ``common_name``, ``rank`` and ``division``. Entries whose scientific
        name equals the query (case-insensitively) come first; within each
        group entries are ordered by rank priority (species first), keeping
        the order of the summary response among equal ranks.

        On any failure the result is ``[{"error": <message>}]`` and nothing is
        cached. Successful results are cached per
        ``(query.lower(), max_results)``; every call returns fresh copies, so
        callers may mutate what they receive without affecting the cache.
    """
    # Treat None like an empty query instead of crashing on .strip().
    query = (query or "").strip()
    if len(query) < 2:
        return []

    q_lower = query.lower()
    cache_key = (q_lower, max_results)
    cached = _search_cache.get(cache_key)
    if cached is not None:
        # Hand out copies so a caller cannot poison the cached entries.
        return [dict(entry) for entry in cached]

    try:
        search_resp = _ncbi_get(NCBI_SEARCH_URL, {
            "db": "taxonomy", "term": query,
            "retmax": max_results, "retmode": "json",
            "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })
        ids = search_resp.json().get("esearchresult", {}).get("idlist", [])

        if not ids:
            _search_cache[cache_key] = []
            return []

        summary_resp = _ncbi_get(NCBI_SUMMARY_URL, {
            "db": "taxonomy", "id": ",".join(ids),
            "retmode": "json", "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })
        result_map = summary_resp.json().get("result", {})
        uids = result_map.get("uids", ids)

        results = []
        for uid in uids:
            # JSON object keys are always strings, so look up by str(uid).
            item = result_map.get(str(uid), {})
            if not item:
                continue
            # ``or ""`` also covers JSON null, which would otherwise break the
            # .lower() calls used for ranking and exact-name matching below.
            results.append({
                "taxon_id":        int(uid),
                "scientific_name": item.get("scientificname") or "",
                "common_name":     item.get("commonname") or "",
                "rank":            item.get("rank") or "",
                "division":        item.get("division") or "",
            })

        # One stable sort: exact scientific-name matches first (False sorts
        # before True), then by rank priority so species precede genus.
        results.sort(key=lambda r: (
            r["scientific_name"].lower() != q_lower,
            _rank_priority(r["rank"]),
        ))

        _search_cache[cache_key] = results
        logger.info("[taxonomy] search %r → %d results", query, len(results))
        return [dict(entry) for entry in results]

    except Exception as exc:
        # Deliberate best-effort boundary: callers receive an error entry
        # rather than an exception.
        logger.error("[taxonomy] search_species(%r) failed: %s", query, exc)
        return [{"error": str(exc)}]


def get_taxon_info(taxon_id: int) -> dict:
    """Reverse lookup: resolve a taxon ID to species details with lineage.

    Fetches the taxonomy record as XML and parses it with
    ``xml.etree.ElementTree``.

    Args:
        taxon_id: NCBI taxonomy identifier to resolve.

    Returns:
        A dict with keys ``taxon_id``, ``scientific_name``, ``common_name``,
        ``rank``, ``division``, ``lineage`` (ancestor names joined by
        ``" > "``) and ``verified``. On success ``verified`` is ``True`` and
        the result is cached by ``taxon_id``. On failure (request error,
        unparsable XML, or no ``Taxon`` element) the text fields stay empty,
        ``verified`` is ``False``, an ``error`` message is added, and nothing
        is cached. A fresh dict is returned on every call.
    """
    cached = _id_to_info_cache.get(taxon_id)
    if cached is not None:
        # Return a copy so callers cannot alter the cached record.
        return dict(cached)

    base = {
        "taxon_id": taxon_id, "scientific_name": "",
        "common_name": "", "rank": "", "division": "",
        "lineage": "", "verified": False,
    }

    try:
        resp = _ncbi_get(NCBI_FETCH_URL, {
            "db": "taxonomy", "id": taxon_id,
            "retmode": "xml", "tool": NCBI_TOOL, "email": NCBI_EMAIL,
        })

        # Parse the raw bytes so the XML parser applies the document's own
        # encoding; resp.text depends on the HTTP charset header and can
        # mis-decode non-ASCII names when that header is missing.
        root     = ET.fromstring(resp.content)
        taxon_el = root.find("Taxon")

        if taxon_el is None:
            base["error"] = "Taxon element not found in NCBI XML"
            return base

        def child_text(tag: str) -> str:
            # Missing element and empty element both yield "".
            return (taxon_el.findtext(tag) or "").strip()

        lineage_parts = [
            (ancestor.findtext("ScientificName") or "").strip()
            for ancestor in taxon_el.findall("./LineageEx/Taxon")
        ]

        # Keep the original lookup order; GenbankCommonName is only a last
        # resort for records that carry no CommonName element, so results
        # that were non-empty before are unchanged.
        common = (
            taxon_el.findtext("OtherNames/CommonName")
            or taxon_el.findtext("CommonName")
            or taxon_el.findtext("OtherNames/GenbankCommonName")
            or ""
        )

        info = {
            **base,
            "scientific_name": child_text("ScientificName"),
            "common_name":     common.strip(),
            "rank":            child_text("Rank"),
            "division":        child_text("Division"),
            "lineage":         " > ".join(p for p in lineage_parts if p),
            "verified":        True,
        }

        _id_to_info_cache[taxon_id] = info
        # %s rather than %d: the ID may arrive as a string (e.g. from a
        # request parameter) and must not break log formatting.
        logger.info("[taxonomy] Resolved taxon %s → %s", taxon_id, info["scientific_name"])
        return dict(info)

    except ET.ParseError as exc:
        logger.error("[taxonomy] XML parse error for taxon %s: %s", taxon_id, exc)
        base["error"] = f"XML parse error: {exc}"
        return base
    except Exception as exc:
        # Deliberate best-effort boundary: report the failure in the result
        # instead of raising to the caller.
        logger.error("[taxonomy] get_taxon_info(%s) failed: %s", taxon_id, exc)
        base["error"] = str(exc)
        return base


def resolve_taxon(taxon_id: int, top50_taxa: list) -> dict:
    """Look up a taxon and flag whether it belongs to the training set.

    Returns the ``get_taxon_info`` result for ``taxon_id`` extended with
    ``in_training`` (whether ``taxon_id`` is contained in ``top50_taxa``) and
    a matching ``training_status`` label.
    """
    # Work on a copy so the looked-up record itself is left untouched.
    details = dict(get_taxon_info(taxon_id))
    if taxon_id in top50_taxa:
        details["in_training"] = True
        details["training_status"] = "in_training_data"
    else:
        details["in_training"] = False
        details["training_status"] = "unknown_species_fallback"
    return details