|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
import os, re, json |
|
|
from typing import Dict, Any |
|
|
|
|
|
# Relative path of the optional JSON schema read by `_load_extended_schema`.
EXTENDED_SCHEMA_PATH = os.path.join("data", "extended_schema.json")

# Placeholder value; `_set_if_stronger` allows it to be overwritten later.
UNKNOWN = "Unknown"

# Field names that `_parse_schema_enum_pnv` skips even when the schema lists them.
CORE_FIELDS = {
    "Genus",
    "Species",
    "Gram Stain",
    "Shape",
    "Colony Morphology",
    "Haemolysis",
    "Motility",
    "Capsule",
    "Spore Formation",
    "Growth Temperature",
    "Oxygen Requirement",
    "Media Grown On",
    "Catalase",
    "Oxidase",
    "Indole",
    "Urease",
    "Citrate",
    "Methyl Red",
    "VP",
    "H2S",
    "DNase",
    "ONPG",
    "Coagulase",
    "Lipase Test",
    "Nitrate Reduction",
    "Lysine Decarboxylase",
    "Arginine dihydrolase",
    "Gelatin Hydrolysis",
    "Esculin Hydrolysis",
    "Glucose Fermentation",
    "Lactose Fermentation",
    "Sucrose Fermentation",
    "Mannitol Fermentation",
    "Sorbitol Fermentation",
    "Maltose Fermentation",
    "Xylose Fermentation",
    "Rhamnose Fermentation",
    "Arabinose Fermentation",
    "Raffinose Fermentation",
    "Trehalose Fermentation",
    "Inositol Fermentation",
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _clean_text(t: str) -> str: |
|
|
if not t: |
|
|
return "" |
|
|
t = t.replace("°", "").replace("º", "").replace("₂", "2") |
|
|
return " ".join(t.split()) |
|
|
|
|
|
def _set_if_stronger(parsed: Dict[str,str], field: str, value: str) -> None:
    """Record ``value`` for ``field`` unless a definite value is already stored.

    A falsy ``value`` is ignored. An existing entry is only replaced when it
    equals ``UNKNOWN``; any other stored value wins over the new one.

    Args:
        parsed: Mapping of field name to parsed value; mutated in place.
        field: Field name to write.
        value: Candidate value.
    """
    # A missing key is treated exactly like a stored UNKNOWN placeholder.
    if value and parsed.get(field, UNKNOWN) == UNKNOWN:
        parsed[field] = value
|
|
|
|
|
def _parse_pnv_after_anchor(text: str, parsed: Dict[str,str], field: str, anchor: str) -> None:
    """Read a positive/negative/variable/unknown status that follows ``anchor``.

    The anchor is matched literally (regex-escaped) as a whole word,
    case-insensitively, and only optional whitespace may separate it from
    the status word. The first match is capitalised and handed to
    ``_set_if_stronger``.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
        field: Key under which the status is stored.
        anchor: Label expected immediately before the status word.
    """
    pattern = rf"\b{re.escape(anchor)}\b\s*(positive|negative|variable|unknown)"
    hit = re.search(pattern, text, flags=re.IGNORECASE)
    if hit is None:
        return
    _set_if_stronger(parsed, field, hit.group(1).capitalize())
|
|
|
|
|
def _load_extended_schema(path: str) -> Dict[str, Any]: |
|
|
if not os.path.exists(path): |
|
|
return {} |
|
|
try: |
|
|
with open(path, "r", encoding="utf-8") as f: |
|
|
obj = json.load(f) |
|
|
return obj if isinstance(obj, dict) else {} |
|
|
except Exception: |
|
|
return {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_gram_variable(text: str, parsed: Dict[str,str]) -> None:
    """Set "Gram Stain" to "Variable" when the text says so.

    Recognised phrasings (case-insensitive): "gram-variable",
    "gram variable", "gram stain variable" and "variable gram stain".

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    lowered = text.lower()
    phrasings = (
        r"\bgram[- ]variable\b",
        r"\bgram stain variable\b",
        r"\bvariable gram stain\b",
    )
    if any(re.search(phrasing, lowered) for phrasing in phrasings):
        _set_if_stronger(parsed, "Gram Stain", "Variable")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_shape_yeast(text: str, parsed: Dict[str,str]) -> None:
    """Set "Shape" to "Yeast" when the text mentions "yeast cell(s)".

    Args:
        text: Text to search (matched case-insensitively).
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    mentions_yeast = re.search(r"\byeast cells?\b", text.lower()) is not None
    if mentions_yeast:
        _set_if_stronger(parsed, "Shape", "Yeast")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_capsule_variable(text: str, parsed: Dict[str,str]) -> None:
    """Set "Capsule" to "Variable" when the text describes it that way.

    Matches "capsule variable" (optionally separated by ":" or "-"),
    "capsule-variable" and "variable capsule", case-insensitively.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    lowered = text.lower()
    for pattern in (
        r"\bcapsule\s*[:\-]?\s*variable\b",
        r"\bcapsule-variable\b",
        r"\bvariable\s+capsule\b",
    ):
        if re.search(pattern, lowered):
            _set_if_stronger(parsed, "Capsule", "Variable")
            return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_gas_production(text: str, parsed: Dict[str,str]) -> None:
    """Derive "Gas Production" (Positive/Negative) from fixed phrases.

    Matching is by case-insensitive substring. Negative phrases are checked
    first because several of them contain a positive phrase as a substring
    (for example "no gas produced" contains "gas produced"), so testing the
    positives first would report such statements as Positive.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    lowered = text.lower()
    negative_phrases = (
        "does not produce gas",
        "no gas",
        "absence of gas",
        "gas production negative",
        "no production of gas",
        "without gas",
    )
    positive_phrases = (
        "produces gas",
        "gas produced",
        "with gas",
        "gas production positive",
        "gas producer",
        "production of gas",
        "ferments glucose with gas",
    )
    if any(phrase in lowered for phrase in negative_phrases):
        _set_if_stronger(parsed, "Gas Production", "Negative")
    elif any(phrase in lowered for phrase in positive_phrases):
        _set_if_stronger(parsed, "Gas Production", "Positive")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Values accepted for the "Motility Type" field by `_parse_motility_type`.
MOTILITY_TYPES = {
    "Peritrichous",
    "Monotrichous",
    "Polytrichous",
    "Polar",
    "Swarming",
    "Tumbling",
    "Gliding",
    "Corkscrew",
    "Axial",
}
|
|
|
|
|
def _parse_motility_type(text: str, parsed: Dict[str,str]) -> None:
    """Extract the "Motility Type" field from free text.

    Resolution order (all matching is case-insensitive):

    1. "motility type" followed by "negative" or "none" stores that word.
    2. "motility type" followed by a word from ``MOTILITY_TYPES`` stores it.
    3. Otherwise the ``MOTILITY_TYPES`` name mentioned earliest in the text
       is stored.

    Step 3 picks by position in the text rather than by iterating the set,
    whose order for strings can differ between interpreter runs and would
    make the result unstable when several types are mentioned.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    lowered = text.lower()

    explicit_absent = re.search(
        r"\bmotility type\b\s*[:\-]?\s*(negative|none)\b", lowered
    )
    if explicit_absent:
        _set_if_stronger(parsed, "Motility Type", explicit_absent.group(1).capitalize())
        return

    labelled = re.search(r"\bmotility type\b\s*[:\-]?\s*([a-z]+)", lowered)
    if labelled:
        candidate = labelled.group(1).capitalize()
        if candidate in MOTILITY_TYPES:
            _set_if_stronger(parsed, "Motility Type", candidate)
            return
        # An unrecognised labelled value falls through to the keyword scan.

    mentions = []
    for name in MOTILITY_TYPES:
        hit = re.search(rf"\b{re.escape(name.lower())}\b", lowered)
        if hit:
            mentions.append((hit.start(), name))
    if mentions:
        # min() orders by position first, then by name, so the pick is stable.
        _set_if_stronger(parsed, "Motility Type", min(mentions)[1])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Pigment names that `_parse_pigment` reports wherever they appear in the text.
SCIENTIFIC_PIGMENTS = {
    "Pyocyanin",
    "Pyoverdine",
    "Pyovacin",
    "Bioluminescent",
}

# Colour words that `_parse_pigment` reports only when the text also
# mentions "pigment" or "pigmentation".
COLOUR_PIGMENTS = {
    "green",
    "yellow",
    "pink",
    "red",
    "orange",
    "brown",
    "black",
    "violet",
    "cream",
}
|
|
|
|
|
def _parse_pigment(text: str, parsed: Dict[str,str]) -> None:
    """Extract the "Pigment" field (and sometimes "Odor") from free text.

    Behaviour, all case-insensitive:

    * "no pigmentation or odor/odour" stores "None" for both "Pigment" and
      "Odor" and stops.
    * "no pigmentation" or "pigment none" stores "None" for "Pigment".
    * Otherwise every ``SCIENTIFIC_PIGMENTS`` name found in the text is
      collected; ``COLOUR_PIGMENTS`` words are added (capitalised) only when
      the text also contains "pigment" or "pigmentation". The sorted names
      are stored joined with "; ".

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    lowered = text.lower()

    if re.search(r"\bno pigmentation or odou?r\b", lowered):
        for field in ("Pigment", "Odor"):
            _set_if_stronger(parsed, field, "None")
        return

    if re.search(r"\bno pigmentation\b|\bpigment none\b", lowered):
        _set_if_stronger(parsed, "Pigment", "None")
        return

    found = {
        sp for sp in SCIENTIFIC_PIGMENTS
        if re.search(rf"\b{sp.lower()}\b", lowered)
    }
    # Bare colour words are too common to trust without a pigment keyword.
    if re.search(r"\b(pigment|pigmentation)\b", lowered):
        found |= {
            cp.capitalize() for cp in COLOUR_PIGMENTS
            if re.search(rf"\b{cp}\b", lowered)
        }

    if found:
        _set_if_stronger(parsed, "Pigment", "; ".join(sorted(found)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Values accepted for the "Colony Pattern" field by `_parse_colony_pattern`.
COLONY_PATTERNS = {
    "Mucoid",
    "Smooth",
    "Rough",
    "Filamentous",
    "Spreading",
    "Swarming",
    "Sticky",
    "Irregular",
    "Ground-glass",
    "Molar-tooth",
    "Dry",
    "Chalky",
    "Corroding",
}
|
|
|
|
|
def _parse_colony_pattern(text: str, parsed: Dict[str,str]) -> None:
    """Extract "Colony Pattern" from a "colony pattern <value>" statement.

    The value is the run of letters/hyphens after the label (an optional ":"
    or "-" separator is allowed). It is stored only when its capitalised
    form is a member of ``COLONY_PATTERNS``.

    Args:
        text: Text to search (matched case-insensitively).
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    hit = re.search(r"\bcolony pattern\b\s*[:\-]?\s*([a-z\-]+)", text.lower())
    if hit is None:
        return
    label = hit.group(1).capitalize()
    if label in COLONY_PATTERNS:
        _set_if_stronger(parsed, "Colony Pattern", label)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_odor(text: str, parsed: Dict[str,str]) -> None:
    """Extract the "Odor" field from an "odor/odour/smell <values>" statement.

    Everything after the label (and an optional ":" or "-") is captured up
    to the first character that is not a lowercase letter, a space or ";".
    The capture is split on ";", each non-empty piece is trimmed and
    capitalised, and the pieces are stored joined with "; ".

    Args:
        text: Text to search (matched case-insensitively).
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    hit = re.search(r"\b(odor|odour|smell)\b\s*[:\-]?\s*([a-z; ]+)", text.lower())
    if hit is None:
        return
    descriptors = []
    for piece in hit.group(2).split(";"):
        piece = piece.strip()
        if piece:
            descriptors.append(piece.capitalize())
    if descriptors:
        _set_if_stronger(parsed, "Odor", "; ".join(descriptors))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_tsi(text: str, parsed: Dict[str,str]) -> None:
    """Extract the "TSI Pattern" field from free text.

    An explicit reading of the form ``K/A``, ``A/A``, ``K/K`` or ``A/K``
    (optionally followed by ``+ H2S``) is looked for first and stored as
    e.g. "K/A" or "K/A+H2S". Only when no such reading exists does the
    combination of "TSI" and "UNKNOWN" anywhere in the text store "Unknown".

    The explicit reading is checked first because the "unknown" test is a
    loose substring check: an unrelated "unknown" elsewhere in the text
    (say, for another test) would otherwise hide a stated TSI result.

    Args:
        text: Text to search (matched case-insensitively via upper-casing).
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    upper = text.upper()

    reading = re.search(r"\b([KA]/[KA])(\s*\+\s*H2S)?\b", upper)
    if reading:
        base = reading.group(1)
        value = f"{base}+H2S" if reading.group(2) else base
        _set_if_stronger(parsed, "TSI Pattern", value)
        return

    if "TSI" in upper and "UNKNOWN" in upper:
        _set_if_stronger(parsed, "TSI Pattern", "Unknown")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_nacl(text: str, parsed: Dict[str,str]) -> None:
    """Extract the "NaCl Tolerant (>=6%)" field from free text.

    The full label "NaCl Tolerant (>=6%)" followed by a
    positive/negative/variable/unknown status is tried first; if it is not
    present, the shorter anchor "NaCl Tolerant" is tried via
    ``_parse_pnv_after_anchor``. Matching is case-insensitive.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    field = "NaCl Tolerant (>=6%)"
    explicit = re.search(
        r"NaCl\s*Tolerant\s*\(>=\s*6%\)\s*(positive|negative|variable|unknown)",
        text,
        flags=re.IGNORECASE,
    )
    if explicit is None:
        _parse_pnv_after_anchor(text, parsed, field, "NaCl Tolerant")
    else:
        _set_if_stronger(parsed, field, explicit.group(1).capitalize())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_haemolysis_type(text: str, parsed: Dict[str,str]) -> None:
    """Extract "Haemolysis Type" from a "haemolysis type <value>" statement.

    Accepted values are alpha, beta, gamma and none (case-insensitive,
    optional ":" or "-" after the label); the value is stored capitalised.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    pattern = r"\bhaemolysis type\b\s*[:\-]?\s*(alpha|beta|gamma|none)"
    hit = re.search(pattern, text, flags=re.IGNORECASE)
    if hit is None:
        return
    _set_if_stronger(parsed, "Haemolysis Type", hit.group(1).capitalize())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_ornithine_dec(text: str, parsed: Dict[str,str]) -> None:
    """Extract the ornithine decarboxylase status under both label spellings.

    Both "Ornithine Decarboxylase" and the variant spelling
    "Ornitihine Decarboxylase" are parsed into keys of the same name. When
    only the variant key ends up present, its value is copied to the
    "Ornithine Decarboxylase" key as well.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    canonical = "Ornithine Decarboxylase"
    variant = "Ornitihine Decarboxylase"
    for label in (canonical, variant):
        _parse_pnv_after_anchor(text, parsed, label, label)
    if variant in parsed and canonical not in parsed:
        _set_if_stronger(parsed, canonical, parsed[variant])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_schema_enum_pnv(text: str, parsed: Dict[str,str]) -> None:
    """Parse every ``enum_PNV`` field declared in the extended schema.

    The schema is loaded from ``EXTENDED_SCHEMA_PATH`` on each call. For
    each entry whose ``value_type`` is ``"enum_PNV"`` and which is neither in
    ``CORE_FIELDS`` nor already present in ``parsed``, the field name and
    then its ``aliases`` are tried in order; the first name that is followed
    by positive/negative/variable/unknown (case-insensitive) provides the
    value.

    Malformed schema entries are skipped instead of raising: a non-dict
    entry, a missing/null/non-list ``aliases`` value, and aliases that are
    not non-blank strings. A single string ``aliases`` value is treated as
    one alias.

    Args:
        text: Text to search.
        parsed: Mapping of field name to parsed value; mutated in place.
    """
    schema = _load_extended_schema(EXTENDED_SCHEMA_PATH)
    lowered = text.lower()

    for field, meta in schema.items():
        if field in CORE_FIELDS or field in parsed:
            continue
        if not isinstance(meta, dict) or meta.get("value_type") != "enum_PNV":
            continue

        aliases = meta.get("aliases") or []
        if isinstance(aliases, str):
            aliases = [aliases]
        elif not isinstance(aliases, (list, tuple)):
            aliases = []

        # A blank alias would reduce the pattern to "any status word".
        names = [field] + [
            alias for alias in aliases
            if isinstance(alias, str) and alias.strip()
        ]
        for name in names:
            hit = re.search(
                rf"\b{re.escape(name.lower())}\b\s*(positive|negative|variable|unknown)",
                lowered,
            )
            if hit:
                _set_if_stronger(parsed, field, hit.group(1).capitalize())
                break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_text_extended(text: str) -> Dict[str,Any]:
    """Run every extended-field parser over ``text`` and collect the results.

    Args:
        text: Free-text description; ``None`` or blank text is accepted.

    Returns:
        A dict with three keys: ``"parsed_fields"`` (field name to value, empty
        for blank input), ``"source"`` (always ``"extended_parser"``) and
        ``"raw"`` (the input text, or ``""`` when it was falsy).
    """
    raw = text or ""
    fields: Dict[str, str] = {}

    if raw.strip():
        cleaned = _clean_text(raw)
        # Order matters: an earlier parser's definite value is never
        # overwritten by a later one (see _set_if_stronger).
        pipeline = (
            _parse_gram_variable,
            _parse_shape_yeast,
            _parse_capsule_variable,
            _parse_gas_production,
            _parse_motility_type,
            _parse_pigment,
            _parse_colony_pattern,
            _parse_odor,
            _parse_tsi,
            _parse_nacl,
            _parse_haemolysis_type,
            _parse_ornithine_dec,
            _parse_schema_enum_pnv,
        )
        for parser in pipeline:
            parser(cleaned, fields)

    return {"parsed_fields": fields, "source": "extended_parser", "raw": raw}