Spaces:
Running
Running
| # engine/parser_ext.py | |
| # ====================================================================== | |
| # Extended test parser — Stage 12C-fix4 | |
| # | |
| # GOAL: | |
| # • Explicit-only parsing | |
| # • ML-safe | |
| # • Deterministic | |
| # • No inference | |
| # • Schema-backed fallback for enum_PNV | |
| # ====================================================================== | |
| from __future__ import annotations | |
| import os, re, json | |
| from typing import Dict, Any | |
# Relative path of the JSON schema file; _parse_schema_enum_pnv passes it to
# _load_extended_schema. Being relative, it resolves against the process's
# current working directory.
EXTENDED_SCHEMA_PATH = os.path.join("data", "extended_schema.json")
# Placeholder field value; _set_if_stronger allows a later value to replace it.
UNKNOWN = "Unknown"
| # ====================================================================== | |
| # Fields NOT parsed here | |
| # ====================================================================== | |
# Field names that this module deliberately does not parse. The schema-driven
# fallback (_parse_schema_enum_pnv) skips any schema entry whose name is here.
CORE_FIELDS = {
    # Identity
    "Genus",
    "Species",
    # Morphology and growth
    "Gram Stain",
    "Shape",
    "Colony Morphology",
    "Haemolysis",
    "Motility",
    "Capsule",
    "Spore Formation",
    "Growth Temperature",
    "Oxygen Requirement",
    "Media Grown On",
    # Single-result tests
    "Catalase",
    "Oxidase",
    "Indole",
    "Urease",
    "Citrate",
    "Methyl Red",
    "VP",
    "H2S",
    "DNase",
    "ONPG",
    "Coagulase",
    "Lipase Test",
    "Nitrate Reduction",
    "Lysine Decarboxylase",
    "Arginine dihydrolase",
    "Gelatin Hydrolysis",
    "Esculin Hydrolysis",
    # Fermentation tests
    "Glucose Fermentation",
    "Lactose Fermentation",
    "Sucrose Fermentation",
    "Mannitol Fermentation",
    "Sorbitol Fermentation",
    "Maltose Fermentation",
    "Xylose Fermentation",
    "Rhamnose Fermentation",
    "Arabinose Fermentation",
    "Raffinose Fermentation",
    "Trehalose Fermentation",
    "Inositol Fermentation",
}
| # ====================================================================== | |
| # Helpers | |
| # ====================================================================== | |
| def _clean_text(t: str) -> str: | |
| if not t: | |
| return "" | |
| t = t.replace("°", "").replace("º", "").replace("₂", "2") | |
| return " ".join(t.split()) | |
| def _set_if_stronger(parsed: Dict[str,str], field: str, value: str): | |
| if not value: | |
| return | |
| if field not in parsed or parsed[field] == UNKNOWN: | |
| parsed[field] = value | |
def _parse_pnv_after_anchor(text: str, parsed: Dict[str,str], field: str, anchor: str):
    """Record a positive/negative/variable/unknown result that follows *anchor*.

    *anchor* is matched literally (regex-escaped), case-insensitively and
    between word boundaries. Only optional whitespace may separate it from
    the result word. On a match the capitalised result is passed to
    _set_if_stronger for *field*; otherwise *parsed* is left untouched.
    """
    pattern = rf"\b{re.escape(anchor)}\b\s*(positive|negative|variable|unknown)"
    hit = re.search(pattern, text, flags=re.IGNORECASE)
    if hit is None:
        return
    _set_if_stronger(parsed, field, hit.group(1).capitalize())
| def _load_extended_schema(path: str) -> Dict[str, Any]: | |
| if not os.path.exists(path): | |
| return {} | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| obj = json.load(f) | |
| return obj if isinstance(obj, dict) else {} | |
| except Exception: | |
| return {} | |
| # ====================================================================== | |
| # 1. Gram Stain Variable (explicit-only) | |
| # ====================================================================== | |
def _parse_gram_variable(text: str, parsed: Dict[str,str]):
    """Set "Gram Stain" to "Variable" when the text says so explicitly.

    Recognised wordings (case-insensitive): "gram-variable" or
    "gram variable", "gram stain variable", and "variable gram stain".
    """
    lowered = text.lower()
    explicit_forms = (
        r"\bgram[- ]variable\b",
        r"\bgram stain variable\b",
        r"\bvariable gram stain\b",
    )
    for form in explicit_forms:
        if re.search(form, lowered):
            _set_if_stronger(parsed, "Gram Stain", "Variable")
            return
| # ====================================================================== | |
| # 2. Shape (yeast phrasing fix) | |
| # ====================================================================== | |
def _parse_shape_yeast(text: str, parsed: Dict[str,str]):
    """Set "Shape" to "Yeast" when the text mentions "yeast cell" or "yeast cells"."""
    mentions_yeast_cells = re.search(r"\byeast cells?\b", text.lower()) is not None
    if mentions_yeast_cells:
        _set_if_stronger(parsed, "Shape", "Yeast")
| # ====================================================================== | |
| # 3. Capsule (explicit Variable only) | |
| # ====================================================================== | |
def _parse_capsule_variable(text: str, parsed: Dict[str,str]):
    """Set "Capsule" to "Variable" for explicit variable-capsule wording.

    Accepted forms (case-insensitive): "capsule variable" with an optional
    ':' or '-' separator, "capsule-variable", and "variable capsule".
    """
    lowered = text.lower()
    for wording in (
        r"\bcapsule\s*[:\-]?\s*variable\b",
        r"\bcapsule-variable\b",
        r"\bvariable\s+capsule\b",
    ):
        if re.search(wording, lowered):
            _set_if_stronger(parsed, "Capsule", "Variable")
            break
| # ====================================================================== | |
| # 4. Gas Production | |
| # ====================================================================== | |
def _parse_gas_production(text: str, parsed: Dict[str,str]):
    """Set "Gas Production" to "Positive" or "Negative" from fixed phrases.

    Matching is a case-insensitive substring search of *text* against the
    two phrase lists below. If no phrase occurs, *parsed* is left untouched.

    Negative phrases are tested first. Several negative statements contain a
    positive phrase as a substring (for example "no gas produced" contains
    "gas produced"), so testing positives first would record such text as
    Positive.
    """
    lowered = text.lower()
    negative_phrases = (
        "does not produce gas", "no gas",
        "absence of gas", "gas production negative",
    )
    positive_phrases = (
        "produces gas", "gas produced", "with gas",
        "gas production positive", "gas producer",
        "production of gas", "ferments glucose with gas",
    )
    if any(phrase in lowered for phrase in negative_phrases):
        verdict = "Negative"
    elif any(phrase in lowered for phrase in positive_phrases):
        verdict = "Positive"
    else:
        return
    _set_if_stronger(parsed, "Gas Production", verdict)
| # ====================================================================== | |
| # 5. Motility Type (explicit) | |
| # ====================================================================== | |
# Accepted "Motility Type" values, capitalised as they are stored.
MOTILITY_TYPES = {
    "Peritrichous","Monotrichous","Polytrichous","Polar",
    "Swarming","Tumbling","Gliding","Corkscrew","Axial",
}


def _parse_motility_type(text: str, parsed: Dict[str,str]):
    """Extract "Motility Type" from *text* (case-insensitive).

    Three stages are tried in order, and the first to yield a value wins:

    1. "motility type" followed by "negative" or "none" (optional ':' or
       '-' separator) stores "Negative" or "None".
    2. "motility type" followed by a word listed in MOTILITY_TYPES stores
       that word.
    3. Otherwise any MOTILITY_TYPES keyword found as a whole word anywhere
       in the text is accepted. When several keywords occur, the one that
       appears earliest in the text is chosen.

    Stage 3 must not depend on the iteration order of MOTILITY_TYPES: it is
    a set of strings, whose order can change between interpreter runs, so
    returning on the first hit while iterating would make the result
    non-deterministic for text containing two keywords.
    """
    t = text.lower()

    mneg = re.search(r"\bmotility type\b\s*[:\-]?\s*(negative|none)\b", t)
    if mneg:
        _set_if_stronger(parsed, "Motility Type", mneg.group(1).capitalize())
        return

    m = re.search(r"\bmotility type\b\s*[:\-]?\s*([a-z]+)", t)
    if m:
        val = m.group(1).capitalize()
        if val in MOTILITY_TYPES:
            _set_if_stronger(parsed, "Motility Type", val)
            return
        # An anchored but unrecognised word falls through to the keyword scan.

    earliest_pos = None
    earliest_type = None
    for mt in MOTILITY_TYPES:
        hit = re.search(rf"\b{mt.lower()}\b", t)
        if hit is None:
            continue
        # Two different whole words cannot start at the same offset, so the
        # minimum start position is unique.
        if earliest_pos is None or hit.start() < earliest_pos:
            earliest_pos = hit.start()
            earliest_type = mt
    if earliest_type is not None:
        _set_if_stronger(parsed, "Motility Type", earliest_type)
| # ====================================================================== | |
| # 6. Pigment (scientific terms anywhere; colour names only with an explicit pigment anchor) | |
| # ====================================================================== | |
# Pigment terms accepted anywhere in the text, with no "pigment" anchor needed.
SCIENTIFIC_PIGMENTS = {
    "Pyocyanin",
    "Pyoverdine",
    "Pyovacin",
    "Bioluminescent",
}
# Colour words accepted only when the text also mentions pigment/pigmentation.
COLOUR_PIGMENTS = {
    "green",
    "yellow",
    "pink",
    "red",
    "orange",
    "brown",
    "black",
    "violet",
    "cream",
}


def _parse_pigment(text: str, parsed: Dict[str,str]):
    """Extract "Pigment" (and, for one joint phrase, "Odor") from *text*.

    Checks run in this order, all case-insensitive:

    * "no pigmentation or odor/odour" sets both Pigment and Odor to "None".
    * "no pigmentation" or "pigment none" sets Pigment to "None".
    * Otherwise every SCIENTIFIC_PIGMENTS term found as a whole word is
      collected. COLOUR_PIGMENTS words are added only when "pigment" or
      "pigmentation" also occurs. The sorted names are joined with "; ".
    """
    lowered = text.lower()

    if re.search(r"\bno pigmentation or odou?r\b", lowered):
        for key in ("Pigment", "Odor"):
            _set_if_stronger(parsed, key, "None")
        return

    if re.search(r"\bno pigmentation\b|\bpigment none\b", lowered):
        _set_if_stronger(parsed, "Pigment", "None")
        return

    names = {
        sp for sp in SCIENTIFIC_PIGMENTS
        if re.search(rf"\b{sp.lower()}\b", lowered)
    }
    if re.search(r"\b(pigment|pigmentation)\b", lowered):
        names |= {
            cp.capitalize() for cp in COLOUR_PIGMENTS
            if re.search(rf"\b{cp}\b", lowered)
        }

    if names:
        # Sorting keeps the joined value independent of set iteration order.
        _set_if_stronger(parsed, "Pigment", "; ".join(sorted(names)))
| # ====================================================================== | |
| # 7. Colony Pattern (explicit only) | |
| # ====================================================================== | |
# Accepted "Colony Pattern" values, spelled as str.capitalize() produces them.
COLONY_PATTERNS = {
    "Mucoid",
    "Smooth",
    "Rough",
    "Filamentous",
    "Spreading",
    "Swarming",
    "Sticky",
    "Irregular",
    "Ground-glass",
    "Molar-tooth",
    "Dry",
    "Chalky",
    "Corroding",
}


def _parse_colony_pattern(text: str, parsed: Dict[str,str]):
    """Extract "Colony Pattern" when it follows an explicit anchor.

    The text must contain "colony pattern" (case-insensitive). The word that
    follows it, after an optional ':' or '-' separator, is capitalised and
    stored only if it is listed in COLONY_PATTERNS.
    """
    lowered = text.lower()
    if re.search(r"\bcolony pattern\b", lowered) is None:
        return
    hit = re.search(r"\bcolony pattern\b\s*[:\-]?\s*([a-z\-]+)", lowered)
    candidate = hit.group(1).capitalize() if hit else None
    if candidate in COLONY_PATTERNS:
        _set_if_stronger(parsed, "Colony Pattern", candidate)
| # ====================================================================== | |
| # 8. Odor (explicit anchor-based) | |
| # ====================================================================== | |
def _parse_odor(text: str, parsed: Dict[str,str]):
    """Extract "Odor" from the text following "odor", "odour" or "smell".

    After the anchor and an optional ':' or '-' separator, the longest run
    of lowercase letters, spaces and semicolons is captured. It is split on
    ';', each non-blank piece is stripped and capitalised, and the pieces
    are re-joined with "; ".
    """
    hit = re.search(r"\b(odor|odour|smell)\b\s*[:\-]?\s*([a-z; ]+)", text.lower())
    if hit is None:
        return
    pieces = (chunk.strip() for chunk in hit.group(2).split(";"))
    labels = [piece.capitalize() for piece in pieces if piece]
    if not labels:
        return
    _set_if_stronger(parsed, "Odor", "; ".join(labels))
| # ====================================================================== | |
| # 9. TSI Pattern | |
| # ====================================================================== | |
def _parse_tsi(text: str, parsed: Dict[str,str]):
    """Extract "TSI Pattern" from *text* (matched on the upper-cased text).

    An explicit slant/butt reading such as "K/A" or "A/A", optionally
    followed by "+ H2S", is stored as "K/A" or "K/A+H2S". Only when no
    explicit reading is present does the looser rule apply: text containing
    both "TSI" and "UNKNOWN" stores "Unknown".

    The explicit reading is tried first because the loose rule looks for
    "UNKNOWN" anywhere in the text. Tested first, it would turn input such
    as "TSI Pattern K/A Urease Unknown" into "Unknown" and discard the
    stated K/A value.
    """
    upper = text.upper()

    m = re.search(r"\b([KA]/[KA])(\s*\+\s*H2S)?\b", upper)
    if m:
        base = m.group(1)
        _set_if_stronger(parsed, "TSI Pattern", f"{base}+H2S" if m.group(2) else base)
        return

    if "TSI" in upper and "UNKNOWN" in upper:
        _set_if_stronger(parsed, "TSI Pattern", "Unknown")
| # ====================================================================== | |
| # 10. NaCl Tolerant (>=6%) | |
| # ====================================================================== | |
def _parse_nacl(text: str, parsed: Dict[str,str]):
    """Extract "NaCl Tolerant (>=6%)" as Positive/Negative/Variable/Unknown.

    The full label "NaCl Tolerant (>=6%)" followed by a result word is tried
    first. If it is absent, the shorter anchor "NaCl Tolerant" is tried via
    _parse_pnv_after_anchor. Both are case-insensitive.
    """
    field = "NaCl Tolerant (>=6%)"
    full_label = re.search(
        r"NaCl\s*Tolerant\s*\(>=\s*6%\)\s*(positive|negative|variable|unknown)",
        text,
        flags=re.IGNORECASE,
    )
    if full_label is None:
        _parse_pnv_after_anchor(text, parsed, field, "NaCl Tolerant")
    else:
        _set_if_stronger(parsed, field, full_label.group(1).capitalize())
| # ====================================================================== | |
| # 11. Haemolysis Type | |
| # ====================================================================== | |
def _parse_haemolysis_type(text: str, parsed: Dict[str,str]):
    """Extract "Haemolysis Type" (Alpha/Beta/Gamma/None) after its anchor.

    The anchor "haemolysis type" is matched case-insensitively and may be
    followed by an optional ':' or '-' before the value.
    """
    pattern = r"\bhaemolysis type\b\s*[:\-]?\s*(alpha|beta|gamma|none)"
    hit = re.search(pattern, text, flags=re.IGNORECASE)
    if hit is None:
        return
    _set_if_stronger(parsed, "Haemolysis Type", hit.group(1).capitalize())
| # ====================================================================== | |
| # 12. Ornithine Decarboxylase (both spellings) | |
| # ====================================================================== | |
def _parse_ornithine_dec(text: str, parsed: Dict[str,str]):
    """Parse Ornithine Decarboxylase under both spellings of the label.

    Each spelling is parsed into a field of the same name. If only the
    misspelt "Ornitihine Decarboxylase" field was filled, its value is also
    copied to the correctly spelt "Ornithine Decarboxylase" field.
    """
    canonical = "Ornithine Decarboxylase"
    misspelt = "Ornitihine Decarboxylase"
    for label in (canonical, misspelt):
        _parse_pnv_after_anchor(text, parsed, label, label)
    if misspelt in parsed and canonical not in parsed:
        _set_if_stronger(parsed, canonical, parsed[misspelt])
| # ====================================================================== | |
| # 13. Schema-driven enum_PNV fallback (SAFE) | |
| # ====================================================================== | |
def _parse_schema_enum_pnv(text: str, parsed: Dict[str,str]):
    """Schema-driven fallback for positive/negative/variable/unknown fields.

    The schema is loaded from EXTENDED_SCHEMA_PATH on every call. For each
    entry whose ``value_type`` is ``"enum_PNV"``, that is not in CORE_FIELDS
    and is not already in *parsed*, the field name and then each alias are
    searched for (case-insensitive, whole word), followed by optional
    whitespace and a result word. The first name that matches sets the field.

    Malformed schema content is skipped rather than raised:

    * an entry whose metadata is not a dict is ignored;
    * ``aliases`` may be missing, null, a single string or a list;
    * non-string or blank aliases are dropped. A blank alias would reduce
      the pattern to the bare result word and match any "positive" or
      "negative" in the text.
    """
    schema = _load_extended_schema(EXTENDED_SCHEMA_PATH)
    t = text.lower()
    for field, meta in schema.items():
        if field in CORE_FIELDS or field in parsed:
            continue
        if not isinstance(meta, dict) or meta.get("value_type") != "enum_PNV":
            continue

        aliases = meta.get("aliases") or []
        if isinstance(aliases, str):
            aliases = [aliases]
        elif not isinstance(aliases, (list, tuple)):
            aliases = []

        names = [field] + [a for a in aliases if isinstance(a, str)]
        for name in names:
            if not name.strip():
                continue
            m = re.search(
                rf"\b{re.escape(name.lower())}\b\s*(positive|negative|variable|unknown)",
                t,
            )
            if m:
                _set_if_stronger(parsed, field, m.group(1).capitalize())
                break
| # ====================================================================== | |
| # MAIN | |
| # ====================================================================== | |
def parse_text_extended(text: str) -> Dict[str,Any]:
    """Run every extended-field parser over *text* and return the result.

    Returns a dict with three keys: ``"parsed_fields"`` (field name to
    value), ``"source"`` (always ``"extended_parser"``) and ``"raw"`` (the
    input as given, or ``""`` when it was falsy). Blank input skips parsing
    and yields empty ``parsed_fields``.

    The parsers run in a fixed order on the cleaned text. Each writes via
    _set_if_stronger, so an earlier parser's concrete value is not replaced
    by a later one.
    """
    raw = text or ""
    fields: Dict[str,str] = {}

    if raw.strip():
        cleaned = _clean_text(raw)
        pipeline = (
            _parse_gram_variable,
            _parse_shape_yeast,
            _parse_capsule_variable,
            _parse_gas_production,
            _parse_motility_type,
            _parse_pigment,
            _parse_colony_pattern,
            _parse_odor,
            _parse_tsi,
            _parse_nacl,
            _parse_haemolysis_type,
            _parse_ornithine_dec,
            _parse_schema_enum_pnv,
        )
        for step in pipeline:
            step(cleaned, fields)

    return {"parsed_fields": fields, "source": "extended_parser", "raw": raw}