EphAsad committed on
Commit
93ba629
·
verified ·
1 Parent(s): b3bf186

Update engine/schema.py

Browse files
Files changed (1) hide show
  1. engine/schema.py +67 -42
engine/schema.py CHANGED
@@ -1,17 +1,29 @@
1
  # engine/schema.py
 
 
 
 
 
2
  from typing import Dict, List, Any, Tuple
 
 
 
 
 
 
3
 
4
  POS_NEG_VAR = ["Positive", "Negative", "Variable"]
5
- POS_NEG_VAR_UNKNOWN = ["Positive", "Negative", "Variable", "Unknown"]
6
  UNKNOWN = "Unknown"
7
  MULTI_SEPARATOR = ";"
8
 
 
9
  ENUMS = {
10
  "Gram Stain": ["Positive", "Negative", "Variable"],
11
  "Shape": ["Cocci", "Rods", "Bacilli", "Spiral", "Short Rods"],
12
  "Haemolysis Type": ["None", "Beta", "Gamma", "Alpha"],
13
  }
14
 
 
15
  SCHEMA: Dict[str, Dict[str, Any]] = {
16
  "Genus": {"type": "text", "required": True},
17
  "Species": {"type": "text", "required": False},
@@ -21,6 +33,7 @@ SCHEMA: Dict[str, Dict[str, Any]] = {
21
  "Colony Morphology": {"type": "multienum", "separator": MULTI_SEPARATOR},
22
  "Haemolysis": {"type": "enum", "allowed": POS_NEG_VAR},
23
  "Haemolysis Type": {"type": "multienum", "separator": MULTI_SEPARATOR, "allowed": ENUMS["Haemolysis Type"]},
 
24
  "Motility": {"type": "enum", "allowed": POS_NEG_VAR},
25
  "Capsule": {"type": "enum", "allowed": POS_NEG_VAR},
26
  "Spore Formation": {"type": "enum", "allowed": POS_NEG_VAR},
@@ -68,25 +81,49 @@ SCHEMA: Dict[str, Dict[str, Any]] = {
68
  "Extra Notes": {"type": "text"},
69
  }
70
 
 
71
  FIELDS_ORDER: List[str] = list(SCHEMA.keys())
72
 
73
  MULTI_FIELDS: List[str] = [
74
- k for k, v in SCHEMA.items() if v.get("type") == "multienum"
75
  ]
76
 
77
  PNV_FIELDS: List[str] = [
78
- k for k, v in SCHEMA.items()
79
- if v.get("type") == "enum" and v.get("allowed") == POS_NEG_VAR
80
  ]
81
 
82
- def is_enum_field(field: str) -> bool:
83
- return SCHEMA.get(field, {}).get("type") == "enum"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
 
85
- def is_multienum_field(field: str) -> bool:
86
- return SCHEMA.get(field, {}).get("type") == "multienum"
 
 
 
87
 
88
- def is_range_field(field: str) -> bool:
89
- return SCHEMA.get(field, {}).get("type") == "range"
 
 
90
 
91
  def normalize_value(field: str, value: str) -> str:
92
  if value is None or str(value).strip() == "":
@@ -105,11 +142,11 @@ def normalize_value(field: str, value: str) -> str:
105
  if v.lower() == a.lower():
106
  return a
107
  if v.lower() in ["+", "positive", "pos"]:
108
- return "Positive" if "Positive" in allowed else v
109
  if v.lower() in ["-", "negative", "neg"]:
110
- return "Negative" if "Negative" in allowed else v
111
  if v.lower() in ["variable", "var", "v"]:
112
- return "Variable" if "Variable" in allowed else v
113
  return v
114
 
115
  if ftype == "multienum":
@@ -117,64 +154,52 @@ def normalize_value(field: str, value: str) -> str:
117
  allowed = meta.get("allowed")
118
  normed = []
119
  for p in parts:
120
- if not allowed:
121
- normed.append(p)
122
- else:
123
  hit = next((a for a in allowed if a.lower() == p.lower()), None)
124
  normed.append(hit if hit else p)
125
- return f" {MULTI_SEPARATOR} ".join(normed) if normed else UNKNOWN
 
 
126
 
127
  if ftype == "range":
128
- txt = v.replace(" ", "")
129
- return txt
130
 
131
  return v
132
 
 
133
  def validate_record(rec: Dict[str, Any]) -> Tuple[bool, List[str]]:
134
- issues: List[str] = []
135
  for field in FIELDS_ORDER:
136
- meta = SCHEMA[field]
137
  if field not in rec:
138
  continue
139
  val = rec[field]
 
140
 
141
  if meta["type"] == "enum":
142
  allowed = meta.get("allowed", [])
143
  if str(val) not in allowed + [UNKNOWN]:
144
- issues.append(f"{field}: '{val}' not in {allowed + [UNKNOWN]}")
145
 
146
  elif meta["type"] == "multienum":
147
  if val == UNKNOWN:
148
  continue
149
- parts = [p.strip() for p in str(val).split(MULTI_SEPARATOR) if p.strip()]
150
  allowed = meta.get("allowed")
151
  if allowed:
152
  bad = [p for p in parts if p not in allowed]
153
  if bad:
154
- issues.append(f"{field}: invalid values {bad}; allowed {allowed}")
155
 
156
  elif meta["type"] == "range":
157
  if val == UNKNOWN:
158
  continue
159
- txt = str(val).replace(" ", "")
160
- if "//" not in txt:
161
- issues.append(f"{field}: expected 'low//high' got '{val}'")
162
- else:
163
- try:
164
- low, high = [float(x) for x in txt.split("//")]
165
- if low > high:
166
- issues.append(f"{field}: low {low} > high {high}")
167
- except Exception:
168
- issues.append(f"{field}: non-numeric bounds '{val}'")
169
 
170
- ok = len(issues) == 0
171
- return ok, issues
172
 
173
  def empty_record() -> Dict[str, str]:
174
  rec = {}
175
- for f, meta in SCHEMA.items():
176
- if f in ("Genus", "Species"):
177
- rec[f] = ""
178
- else:
179
- rec[f] = UNKNOWN
180
- return rec
 
1
  # engine/schema.py
2
+ # ------------------------------------------------------------
3
+ # Core schema + Extended schema support
4
+ # ------------------------------------------------------------
5
+
6
+ from __future__ import annotations
7
  from typing import Dict, List, Any, Tuple
8
+ import json
9
+ import os
10
+
11
+ # ============================
12
+ # CORE SCHEMA DEFINITIONS
13
+ # ============================
14
 
15
  POS_NEG_VAR = ["Positive", "Negative", "Variable"]
 
16
  UNKNOWN = "Unknown"
17
  MULTI_SEPARATOR = ";"
18
 
19
+
20
  ENUMS = {
21
  "Gram Stain": ["Positive", "Negative", "Variable"],
22
  "Shape": ["Cocci", "Rods", "Bacilli", "Spiral", "Short Rods"],
23
  "Haemolysis Type": ["None", "Beta", "Gamma", "Alpha"],
24
  }
25
 
26
+
27
  SCHEMA: Dict[str, Dict[str, Any]] = {
28
  "Genus": {"type": "text", "required": True},
29
  "Species": {"type": "text", "required": False},
 
33
  "Colony Morphology": {"type": "multienum", "separator": MULTI_SEPARATOR},
34
  "Haemolysis": {"type": "enum", "allowed": POS_NEG_VAR},
35
  "Haemolysis Type": {"type": "multienum", "separator": MULTI_SEPARATOR, "allowed": ENUMS["Haemolysis Type"]},
36
+
37
  "Motility": {"type": "enum", "allowed": POS_NEG_VAR},
38
  "Capsule": {"type": "enum", "allowed": POS_NEG_VAR},
39
  "Spore Formation": {"type": "enum", "allowed": POS_NEG_VAR},
 
81
  "Extra Notes": {"type": "text"},
82
  }
83
 
84
+
85
  FIELDS_ORDER: List[str] = list(SCHEMA.keys())
86
 
87
  MULTI_FIELDS: List[str] = [
88
+ f for f, meta in SCHEMA.items() if meta.get("type") == "multienum"
89
  ]
90
 
91
  PNV_FIELDS: List[str] = [
92
+ f for f, meta in SCHEMA.items()
93
+ if meta.get("type") == "enum" and meta.get("allowed") == POS_NEG_VAR
94
  ]
95
 
96
+ # ============================================================
97
+ # EXTENDED SCHEMA SUPPORT (needed for Stage 10C)
98
+ # ============================================================
99
+
100
def get_core_fields() -> List[str]:
    """Return the core schema field names as a fresh list.

    The names are the keys of ``SCHEMA`` in definition order; the caller
    may mutate the returned list without affecting ``SCHEMA``.
    """
    return list(SCHEMA)
103
+
104
+
105
def load_extended_schema(path: str = "data/extended_schema.json") -> Dict[str, Any]:
    """Load the extended schema from a JSON file.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed top-level JSON object.  An empty dict is returned when
        the file is missing or unreadable, is not valid UTF-8 JSON, or its
        top-level value is not a JSON object.
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open, and a missing file is just
    # another OSError here.
    try:
        with open(path, "r", encoding="utf-8") as f:
            obj = json.load(f)
    except (OSError, ValueError):
        # OSError covers missing/unreadable paths; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return obj if isinstance(obj, dict) else {}
115
+
116
 
117
def save_extended_schema(schema: Dict[str, Any], path: str = "data/extended_schema.json") -> None:
    """Write the extended schema to ``path`` as indented UTF-8 JSON.

    Args:
        schema: JSON-serialisable mapping to store.
        path: Destination file.  Its parent directory is created when it
            does not exist yet.

    Raises:
        OSError: If the directory or file cannot be created or written.
        TypeError: If ``schema`` contains values ``json`` cannot serialise.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename, and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(schema, f, indent=2, ensure_ascii=False)
122
 
123
+
124
+ # ============================================================
125
+ # NORMALIZATION / VALIDATION (your existing logic preserved)
126
+ # ============================================================
127
 
128
  def normalize_value(field: str, value: str) -> str:
129
  if value is None or str(value).strip() == "":
 
142
  if v.lower() == a.lower():
143
  return a
144
  if v.lower() in ["+", "positive", "pos"]:
145
+ return "Positive"
146
  if v.lower() in ["-", "negative", "neg"]:
147
+ return "Negative"
148
  if v.lower() in ["variable", "var", "v"]:
149
+ return "Variable"
150
  return v
151
 
152
  if ftype == "multienum":
 
154
  allowed = meta.get("allowed")
155
  normed = []
156
  for p in parts:
157
+ if allowed:
 
 
158
  hit = next((a for a in allowed if a.lower() == p.lower()), None)
159
  normed.append(hit if hit else p)
160
+ else:
161
+ normed.append(p)
162
+ return "; ".join(normed) if normed else UNKNOWN
163
 
164
  if ftype == "range":
165
+ return v.replace(" ", "")
 
166
 
167
  return v
168
 
169
+
170
def validate_record(rec: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Check the values in ``rec`` against ``SCHEMA``.

    Only fields that are present in ``rec`` are checked, in ``FIELDS_ORDER``
    order; keys of ``rec`` that are not schema fields are ignored.

    * ``enum``: ``str(value)`` must be one of the allowed values or
      ``UNKNOWN``.
    * ``multienum``: ``UNKNOWN`` is accepted as-is; otherwise the value is
      split on ``MULTI_SEPARATOR`` and, when the field declares allowed
      values, every non-empty part must be one of them.
    * ``range``: ``UNKNOWN`` is accepted as-is; otherwise the value must
      contain ``//``.
    * any other type is not checked.

    Returns:
        ``(ok, issues)`` where ``issues`` holds one message per problem and
        ``ok`` is True exactly when ``issues`` is empty.
    """
    issues: List[str] = []
    for field in FIELDS_ORDER:
        if field not in rec:
            continue
        val = rec[field]
        meta = SCHEMA[field]
        ftype = meta["type"]

        if ftype == "enum":
            allowed = meta.get("allowed", [])
            if str(val) not in allowed + [UNKNOWN]:
                issues.append(f"{field}: '{val}' invalid")

        elif ftype == "multienum":
            if val == UNKNOWN:
                continue
            # Coerce to str so non-string values do not raise on .split(),
            # and drop empty segments so a trailing or doubled separator is
            # not reported as an invalid '' value.
            parts = [
                p.strip()
                for p in str(val).split(MULTI_SEPARATOR)
                if p.strip()
            ]
            allowed = meta.get("allowed")
            if allowed:
                bad = [p for p in parts if p not in allowed]
                if bad:
                    issues.append(f"{field}: invalid values {bad}")

        elif ftype == "range":
            if val == UNKNOWN:
                continue
            if "//" not in str(val):
                issues.append(f"{field}: malformed range '{val}'")

    return (len(issues) == 0), issues
 
 
 
 
 
 
 
199
 
 
 
200
 
201
def empty_record() -> Dict[str, str]:
    """Build a blank record with one entry per ``SCHEMA`` field.

    "Genus" and "Species" start as empty strings; every other field starts
    as ``UNKNOWN``.
    """
    blank_fields = ("Genus", "Species")
    return {
        name: ("" if name in blank_fields else UNKNOWN)
        for name in SCHEMA
    }