BactKing / engine /parser_ext.py
EphAsad's picture
Update engine/parser_ext.py
90ac78d verified
# engine/parser_ext.py
# ======================================================================
# Extended test parser — Stage 12C-fix4
#
# GOAL:
# • Explicit-only parsing
# • ML-safe
# • Deterministic
# • No inference
# • Schema-backed fallback for enum_PNV
# ======================================================================
from __future__ import annotations
import os, re, json
from typing import Dict, Any
# Relative path of the extended-schema JSON file; loaded by
# `_parse_schema_enum_pnv` through `_load_extended_schema`.
EXTENDED_SCHEMA_PATH = os.path.join("data", "extended_schema.json")
# Placeholder value: `_set_if_stronger` allows an entry equal to this
# string to be overwritten by a later value.
UNKNOWN = "Unknown"
# ======================================================================
# Core fields — excluded from the schema-driven enum_PNV fallback (section 13)
# ======================================================================
# Field names that the schema-driven fallback must never populate.
CORE_FIELDS = {
    # Identity
    "Genus",
    "Species",
    # Microscopy / morphology
    "Gram Stain",
    "Shape",
    "Colony Morphology",
    "Haemolysis",
    "Motility",
    "Capsule",
    "Spore Formation",
    # Growth conditions
    "Growth Temperature",
    "Oxygen Requirement",
    "Media Grown On",
    # Biochemical tests
    "Catalase",
    "Oxidase",
    "Indole",
    "Urease",
    "Citrate",
    "Methyl Red",
    "VP",
    "H2S",
    "DNase",
    "ONPG",
    "Coagulase",
    "Lipase Test",
    "Nitrate Reduction",
    "Lysine Decarboxylase",
    "Arginine dihydrolase",
    "Gelatin Hydrolysis",
    "Esculin Hydrolysis",
    # Fermentation panel
    "Glucose Fermentation",
    "Lactose Fermentation",
    "Sucrose Fermentation",
    "Mannitol Fermentation",
    "Sorbitol Fermentation",
    "Maltose Fermentation",
    "Xylose Fermentation",
    "Rhamnose Fermentation",
    "Arabinose Fermentation",
    "Raffinose Fermentation",
    "Trehalose Fermentation",
    "Inositol Fermentation",
}
# ======================================================================
# Helpers
# ======================================================================
def _clean_text(t: str) -> str:
    """Normalise raw text before the parsers run.

    Degree marks ("°", "º") are dropped, the subscript "₂" becomes "2",
    and every run of whitespace is collapsed to a single space with the
    ends trimmed. A falsy input yields the empty string.
    """
    if not t:
        return ""
    # (character to find, replacement) pairs applied in order.
    for needle, substitute in (("°", ""), ("º", ""), ("₂", "2")):
        t = t.replace(needle, substitute)
    words = t.split()
    return " ".join(words)
def _set_if_stronger(parsed: Dict[str, str], field: str, value: str):
    """Write *value* into ``parsed[field]`` without clobbering a real value.

    Falsy values are ignored. The write happens when *field* is absent or
    currently holds the ``UNKNOWN`` placeholder; any other existing value
    is left untouched.
    """
    if not value:
        return
    # Short-circuit on membership first, exactly like the original check.
    if field in parsed and parsed[field] != UNKNOWN:
        return
    parsed[field] = value
def _parse_pnv_after_anchor(text: str, parsed: Dict[str, str], field: str, anchor: str):
    """Record a positive/negative/variable/unknown value that follows *anchor*.

    The search is case-insensitive and expects the value word directly
    after the anchor, separated only by optional whitespace. On a match
    the capitalised value is handed to ``_set_if_stronger`` for *field*.

    Args:
        text: Text to search.
        parsed: Result mapping that is updated in place.
        field: Key to store the value under.
        anchor: Literal label to look for (escaped before use).
    """
    # An empty anchor would match any value word anywhere in the text.
    if not anchor:
        return
    # Lookarounds replace ``\b``: they are equivalent for anchors that start
    # and end with word characters, but still match anchors whose edges are
    # punctuation (for example a label ending in ")"), where ``\b`` never can.
    pattern = (
        rf"(?<!\w){re.escape(anchor)}(?!\w)"
        r"\s*(positive|negative|variable|unknown)"
    )
    m = re.search(pattern, text, re.IGNORECASE)
    if m:
        _set_if_stronger(parsed, field, m.group(1).capitalize())
def _load_extended_schema(path: str) -> Dict[str, Any]:
    """Read the JSON document at *path*, falling back to an empty dict.

    ``{}`` is returned when the path does not exist, when opening or
    decoding the file raises, or when the decoded top-level value is not
    a dict.
    """
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except Exception:
            # Best-effort by design: any failure means "no schema".
            loaded = None
        if isinstance(loaded, dict):
            return loaded
    return {}
# ======================================================================
# 1. Gram Stain Variable (explicit-only)
# ======================================================================
def _parse_gram_variable(text: str, parsed: Dict[str, str]):
    """Set "Gram Stain" to "Variable" when the text says so explicitly.

    Recognised phrasings (matched on the lower-cased text) are
    "gram-variable" / "gram variable", "gram stain variable" and
    "variable gram stain".
    """
    lowered = text.lower()
    phrasings = (
        r"\bgram[- ]variable\b",
        r"\bgram stain variable\b",
        r"\bvariable gram stain\b",
    )
    for phrasing in phrasings:
        if re.search(phrasing, lowered):
            _set_if_stronger(parsed, "Gram Stain", "Variable")
            return
# ======================================================================
# 2. Shape (yeast phrasing fix)
# ======================================================================
def _parse_shape_yeast(text: str, parsed: Dict[str, str]):
    """Set "Shape" to "Yeast" when "yeast cell" / "yeast cells" is mentioned."""
    mention = re.search(r"\byeast cells?\b", text.lower())
    if mention is None:
        return
    _set_if_stronger(parsed, "Shape", "Yeast")
# ======================================================================
# 3. Capsule (explicit Variable only)
# ======================================================================
def _parse_capsule_variable(text: str, parsed: Dict[str, str]):
    """Set "Capsule" to "Variable" for explicit variable-capsule wording.

    Matches "capsule variable" (optionally with ":" or "-" between the
    words), "capsule-variable" and "variable capsule" on the lower-cased
    text.
    """
    lowered = text.lower()
    for wording in (
        r"\bcapsule\s*[:\-]?\s*variable\b",
        r"\bcapsule-variable\b",
        r"\bvariable\s+capsule\b",
    ):
        if re.search(wording, lowered):
            _set_if_stronger(parsed, "Capsule", "Variable")
            break
# ======================================================================
# 4. Gas Production
# ======================================================================
def _parse_gas_production(text: str, parsed: Dict[str, str]):
    """Record "Gas Production" as Positive or Negative from explicit phrases.

    A positive phrase only counts when it is not directly preceded by a
    negation word ("no", "not", "without"). This keeps "no gas produced"
    and "no production of gas" from being read as Positive through the
    substrings "gas produced" / "production of gas"; such negated hits are
    treated as negative evidence instead. An un-negated positive phrase
    still takes priority over negative phrases elsewhere in the text.

    Args:
        text: Text to inspect (matched case-insensitively).
        parsed: Result mapping that is updated in place.
    """
    lowered = text.lower()
    positive_phrases = (
        "produces gas", "gas produced", "with gas",
        "gas production positive", "gas producer",
        "production of gas", "ferments glucose with gas",
    )
    negative_phrases = (
        "does not produce gas", "no gas",
        "absence of gas", "gas production negative",
    )
    # Matches a negation word sitting immediately before a phrase start.
    negation_before = re.compile(r"\b(?:no|not|without)\s+$")

    affirmed = False
    negated = False
    for phrase in positive_phrases:
        for hit in re.finditer(re.escape(phrase), lowered):
            if negation_before.search(lowered[:hit.start()]):
                negated = True
            else:
                affirmed = True

    if affirmed:
        _set_if_stronger(parsed, "Gas Production", "Positive")
    elif negated or any(phrase in lowered for phrase in negative_phrases):
        _set_if_stronger(parsed, "Gas Production", "Negative")
# ======================================================================
# 5. Motility Type (explicit)
# ======================================================================
# Recognised values for the "Motility Type" field.
MOTILITY_TYPES = {
    "Peritrichous", "Monotrichous", "Polytrichous", "Polar",
    "Swarming", "Tumbling", "Gliding", "Corkscrew", "Axial",
}


def _parse_motility_type(text: str, parsed: Dict[str, str]):
    """Record "Motility Type" from explicit wording in *text*.

    Resolution order:
      1. "motility type" followed by "negative" or "none" stores that word
         (capitalised).
      2. "motility type" followed by a word that is in ``MOTILITY_TYPES``
         stores that type.
      3. Otherwise the text is scanned for any known type name as a whole
         word, and the one mentioned earliest in the text is stored.

    Step 3 selects by position rather than by iterating the set and taking
    the first hit: set iteration order for strings can differ between
    interpreter runs, which made the result non-deterministic whenever
    more than one type was mentioned.

    Args:
        text: Text to inspect (matched case-insensitively).
        parsed: Result mapping that is updated in place.
    """
    t = text.lower()

    mneg = re.search(r"\bmotility type\b\s*[:\-]?\s*(negative|none)\b", t)
    if mneg:
        _set_if_stronger(parsed, "Motility Type", mneg.group(1).capitalize())
        return

    m = re.search(r"\bmotility type\b\s*[:\-]?\s*([a-z]+)", t)
    if m:
        val = m.group(1).capitalize()
        if val in MOTILITY_TYPES:
            _set_if_stronger(parsed, "Motility Type", val)
            return

    # (position, name) pairs; min() picks the earliest mention and falls
    # back to alphabetical order on the name, so the outcome is stable.
    mentions = []
    for mt in MOTILITY_TYPES:
        hit = re.search(rf"\b{re.escape(mt.lower())}\b", t)
        if hit:
            mentions.append((hit.start(), mt))
    if mentions:
        _set_if_stronger(parsed, "Motility Type", min(mentions)[1])
# ======================================================================
# 6. Pigment (EXPLICIT + SCIENTIFIC TERMS ONLY)
# ======================================================================
# Named pigments / traits accepted even without a "pigment" keyword.
SCIENTIFIC_PIGMENTS = {
    "Pyocyanin",
    "Pyoverdine",
    "Pyovacin",
    "Bioluminescent",
}
# Plain colour words; only honoured when a "pigment" keyword is present.
COLOUR_PIGMENTS = {
    "green",
    "yellow",
    "pink",
    "red",
    "orange",
    "brown",
    "black",
    "violet",
    "cream",
}


def _parse_pigment(text: str, parsed: Dict[str, str]):
    """Record "Pigment" (and "Odor" for the joint negative phrase).

    * "no pigmentation or odor/odour" stores "None" for both fields.
    * "no pigmentation" or "pigment none" stores "None" for "Pigment".
    * Otherwise every scientific pigment name found is collected, plus
      every colour word when the text also contains "pigment" or
      "pigmentation"; the names are stored sorted and joined with "; ".
    """
    lowered = text.lower()

    if re.search(r"\bno pigmentation or odou?r\b", lowered):
        for joint_field in ("Pigment", "Odor"):
            _set_if_stronger(parsed, joint_field, "None")
        return

    if re.search(r"\bno pigmentation\b|\bpigment none\b", lowered):
        _set_if_stronger(parsed, "Pigment", "None")
        return

    names = {
        sp for sp in SCIENTIFIC_PIGMENTS
        if re.search(rf"\b{sp.lower()}\b", lowered)
    }
    if re.search(r"\b(pigment|pigmentation)\b", lowered):
        names |= {
            cp.capitalize() for cp in COLOUR_PIGMENTS
            if re.search(rf"\b{cp}\b", lowered)
        }

    if names:
        _set_if_stronger(parsed, "Pigment", "; ".join(sorted(names)))
# ======================================================================
# 7. Colony Pattern (explicit only)
# ======================================================================
# Recognised values for the "Colony Pattern" field.
COLONY_PATTERNS = {
    "Mucoid",
    "Smooth",
    "Rough",
    "Filamentous",
    "Spreading",
    "Swarming",
    "Sticky",
    "Irregular",
    "Ground-glass",
    "Molar-tooth",
    "Dry",
    "Chalky",
    "Corroding",
}


def _parse_colony_pattern(text: str, parsed: Dict[str, str]):
    """Record "Colony Pattern" when it is given after the literal label.

    The word (letters and hyphens) following "colony pattern", with an
    optional ":" or "-" separator, is capitalised and stored only if it is
    a member of ``COLONY_PATTERNS``.
    """
    lowered = text.lower()
    if re.search(r"\bcolony pattern\b", lowered) is None:
        return
    labelled = re.search(r"\bcolony pattern\b\s*[:\-]?\s*([a-z\-]+)", lowered)
    if labelled is None:
        return
    candidate = labelled.group(1).capitalize()
    if candidate in COLONY_PATTERNS:
        _set_if_stronger(parsed, "Colony Pattern", candidate)
# ======================================================================
# 8. Odor (explicit anchor-based)
# ======================================================================
def _parse_odor(text: str, parsed: Dict[str, str]):
    """Record "Odor" from the text following "odor", "odour" or "smell".

    The run of lower-case letters, spaces and semicolons after the keyword
    (optional ":" or "-" separator) is split on ";"; each non-blank piece
    is trimmed and capitalised, and the pieces are stored joined by "; ".
    """
    found = re.search(
        r"\b(odor|odour|smell)\b\s*[:\-]?\s*([a-z; ]+)",
        text.lower(),
    )
    if found is None:
        return
    labels = []
    for piece in found.group(2).split(";"):
        trimmed = piece.strip()
        if trimmed:
            labels.append(trimmed.capitalize())
    if labels:
        _set_if_stronger(parsed, "Odor", "; ".join(labels))
# ======================================================================
# 9. TSI Pattern
# ======================================================================
def _parse_tsi(text: str, parsed: Dict[str, str]):
    """Record "TSI Pattern" from an explicit slant/butt notation or "unknown".

    Resolution order (on the upper-cased text):
      1. "TSI" directly followed by "UNKNOWN" (optionally via
         "PATTERN"/"RESULT"/"REACTION", a ":", "-" or "=" separator, or
         "IS"/"WAS") stores "Unknown".
      2. A K/A-style pattern (K/K, K/A, A/K, A/A), optionally followed by
         "+ H2S", stores that pattern, with "+H2S" appended when present.
      3. Only when no pattern was found: the text mentioning both "TSI"
         and "UNKNOWN" anywhere stores "Unknown".

    The loose check in step 3 used to run first, so an unrelated "unknown"
    (e.g. "oxidase unknown, TSI K/A") discarded an explicitly stated
    pattern. It is now a fallback behind the explicit pattern.

    Args:
        text: Text to inspect (matched case-insensitively).
        parsed: Result mapping that is updated in place.
    """
    t = text.upper()

    explicit_unknown = re.search(
        r"\bTSI\b(?:\s+(?:PATTERN|RESULT|REACTION))?"
        r"\s*[:\-=]?\s*(?:(?:IS|WAS)\s+)?UNKNOWN\b",
        t,
    )
    if explicit_unknown:
        _set_if_stronger(parsed, "TSI Pattern", "Unknown")
        return

    m = re.search(r"\b([KA]/[KA])(\s*\+\s*H2S)?\b", t)
    if m:
        base = m.group(1)
        _set_if_stronger(parsed, "TSI Pattern", f"{base}+H2S" if m.group(2) else base)
        return

    # Legacy loose fallback, kept for texts that state no pattern at all.
    if "TSI" in t and "UNKNOWN" in t:
        _set_if_stronger(parsed, "TSI Pattern", "Unknown")
# ======================================================================
# 10. NaCl Tolerant (>=6%)
# ======================================================================
def _parse_nacl(text: str, parsed: Dict[str, str]):
    """Record "NaCl Tolerant (>=6%)" from the full label or its short form.

    The full label "NaCl Tolerant (>=6%)" followed by a
    positive/negative/variable/unknown word is tried first
    (case-insensitive). If it is absent, the short anchor "NaCl Tolerant"
    is tried through ``_parse_pnv_after_anchor``.
    """
    target = "NaCl Tolerant (>=6%)"
    full_label = re.search(
        r"NaCl\s*Tolerant\s*\(>=\s*6%\)\s*(positive|negative|variable|unknown)",
        text,
        flags=re.IGNORECASE,
    )
    if full_label is None:
        _parse_pnv_after_anchor(text, parsed, target, "NaCl Tolerant")
    else:
        _set_if_stronger(parsed, target, full_label.group(1).capitalize())
# ======================================================================
# 11. Haemolysis Type
# ======================================================================
def _parse_haemolysis_type(text: str, parsed: Dict[str, str]):
    """Record "Haemolysis Type" when stated after the literal label.

    Accepts alpha, beta, gamma or none after "haemolysis type" (optional
    ":" or "-" separator, case-insensitive) and stores the capitalised
    word.
    """
    stated = re.search(
        r"\bhaemolysis type\b\s*[:\-]?\s*(alpha|beta|gamma|none)",
        text,
        flags=re.IGNORECASE,
    )
    if stated is None:
        return
    kind = stated.group(1).capitalize()
    _set_if_stronger(parsed, "Haemolysis Type", kind)
# ======================================================================
# 12. Ornithine Decarboxylase (both spellings)
# ======================================================================
def _parse_ornithine_dec(text: str, parsed: Dict[str, str]):
    """Record ornithine decarboxylase under both label spellings.

    Each spelling ("Ornithine ..." and "Ornitihine ...") is parsed into a
    field of the same name. When only the "Ornitihine" spelling produced a
    value, that value is also copied to the "Ornithine" field.
    """
    canonical = "Ornithine Decarboxylase"
    variant = "Ornitihine Decarboxylase"
    for label in (canonical, variant):
        _parse_pnv_after_anchor(text, parsed, label, label)
    if canonical not in parsed and variant in parsed:
        _set_if_stronger(parsed, canonical, parsed[variant])
# ======================================================================
# 13. Schema-driven enum_PNV fallback (SAFE)
# ======================================================================
def _parse_schema_enum_pnv(text: str, parsed: Dict[str, str]):
    """Fill remaining ``enum_PNV`` schema fields from "<name> <value>" text.

    The schema is loaded from ``EXTENDED_SCHEMA_PATH``. For every entry
    whose ``value_type`` is "enum_PNV", and which is neither in
    ``CORE_FIELDS`` nor already present in *parsed*, the field name and
    then its aliases are searched for in the lower-cased text. The first
    name followed by positive/negative/variable/unknown wins.

    Malformed schema entries are skipped instead of raising: a non-dict
    entry, a missing/null or non-list ``aliases`` value, and non-string or
    blank names are all ignored (a single string alias is accepted).

    Args:
        text: Text to inspect.
        parsed: Result mapping that is updated in place.
    """
    schema = _load_extended_schema(EXTENDED_SCHEMA_PATH)
    t = text.lower()

    for field, meta in schema.items():
        if field in CORE_FIELDS or field in parsed:
            continue
        if not isinstance(meta, dict) or meta.get("value_type") != "enum_PNV":
            continue

        aliases = meta.get("aliases") or []
        if isinstance(aliases, str):
            aliases = [aliases]
        elif not isinstance(aliases, (list, tuple)):
            aliases = []

        for name in [field, *aliases]:
            if not isinstance(name, str):
                continue
            # Collapse whitespace; a blank name would match any value word.
            needle = " ".join(name.lower().split())
            if not needle:
                continue
            # Lookarounds instead of ``\b`` so names that start or end with
            # punctuation (e.g. a trailing ")") can still be matched.
            m = re.search(
                rf"(?<!\w){re.escape(needle)}(?!\w)"
                r"\s*(positive|negative|variable|unknown)",
                t,
            )
            if m:
                _set_if_stronger(parsed, field, m.group(1).capitalize())
                break
# ======================================================================
# MAIN
# ======================================================================
def parse_text_extended(text: str) -> Dict[str, Any]:
    """Run every extended parser over *text* and return the result envelope.

    The returned dict has three keys: "parsed_fields" (field name to
    value), "source" (always "extended_parser") and "raw" (the input, or
    "" when the input is falsy). Blank input skips all parsing and yields
    an empty "parsed_fields".
    """
    raw = text or ""
    fields: Dict[str, str] = {}

    if raw.strip():
        cleaned = _clean_text(raw)
        # Order matters: earlier parsers win because later ones only fill
        # fields that are still missing or hold the unknown placeholder.
        pipeline = (
            _parse_gram_variable,
            _parse_shape_yeast,
            _parse_capsule_variable,
            _parse_gas_production,
            _parse_motility_type,
            _parse_pigment,
            _parse_colony_pattern,
            _parse_odor,
            _parse_tsi,
            _parse_nacl,
            _parse_haemolysis_type,
            _parse_ornithine_dec,
            _parse_schema_enum_pnv,
        )
        for step in pipeline:
            step(cleaned, fields)

    return {
        "parsed_fields": fields,
        "source": "extended_parser",
        "raw": raw,
    }