Spaces:
Sleeping
Sleeping
| """Read and validate an Excel workbook for UofA import. | |
| Knows about Excel structure (sheets, rows, cells) but nothing about JSON-LD. | |
| Returns a clean intermediate dict that excel_mapper.py transforms into JSON-LD. | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| from uofa_cli.excel_constants import ( | |
| SHEET_NAMES, FACTOR_START_ROW, | |
| VV40_FACTOR_NAMES, NASA_ALL_FACTOR_NAMES, NASA_ONLY_FACTOR_NAMES, | |
| VV40_LEVEL_RANGE, NASA_LEVEL_RANGE, | |
| VALID_PROFILES, VALID_DECISION_OUTCOMES, VALID_FACTOR_STATUSES, | |
| VALID_DEVICE_CLASSES, VALID_ASSURANCE_LEVELS, | |
| EVIDENCE_TYPES, | |
| normalize_evidence_type, | |
| ) | |
| class ImportError(Exception): | |
| """Raised when an Excel workbook has validation errors.""" | |
| def __init__(self, errors: list[str]): | |
| self.errors = errors | |
| super().__init__("\n".join(errors)) | |
| def _cell_ref(col: int, row: int) -> str: | |
| """Convert 1-based column index to Excel cell reference like A3, B5.""" | |
| return f"{chr(64 + col)}{row}" | |
| def _fuzzy_normalize_enum(value: str, valid_values: list[str]) -> str | None: | |
| """Detect and normalize LLM enum-echo. Returns the canonical value if | |
| the input is an enumerator-text echo of the prompt (e.g. "Minimal or | |
| Complete", "Low / Medium / High", "Accepted / Not accepted / | |
| Conditional"); returns None otherwise. | |
| Intentionally NARROW — only normalizes inputs whose token split | |
| produces at least one canonical match. A genuinely-invalid value like | |
| "Approved" (not enum-echo, just wrong) returns None so the caller can | |
| still error loudly. This preserves the strict-mode contract that | |
| invalid-but-not-echo inputs are import errors. | |
| Mirrors the enum-split branch of excel_writer._fuzzy_match_dropdown. | |
| """ | |
| import re | |
| if value in valid_values: | |
| return value | |
| tokens = re.split(r"\s+or\s+|\s*[/,]\s*", value) | |
| for tok in tokens: | |
| tok = tok.strip() | |
| if tok in valid_values: | |
| return tok | |
| return None | |
| def _cell_value(ws, row: int, col: int) -> str | None: | |
| """Read a cell value, stripping whitespace. Returns None for empty cells.""" | |
| val = ws.cell(row=row, column=col).value | |
| if val is None: | |
| return None | |
| val = str(val).strip() | |
| return val if val else None | |
| def _parse_int(val: str | None) -> int | None: | |
| """Parse a value as integer, returning None if not parseable.""" | |
| if val is None: | |
| return None | |
| try: | |
| return int(float(val)) | |
| except (ValueError, TypeError): | |
| return None | |
| def _parse_date(val) -> str | None: | |
| """Normalize a date value to ISO 8601 string.""" | |
| if val is None: | |
| return None | |
| # openpyxl may return datetime objects directly | |
| import datetime | |
| if isinstance(val, (datetime.datetime, datetime.date)): | |
| return val.isoformat() | |
| s = str(val).strip() | |
| if not s: | |
| return None | |
| # Try ISO 8601 first | |
| for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%m/%d/%Y", "%d/%m/%Y"): | |
| try: | |
| return datetime.datetime.strptime(s, fmt).date().isoformat() | |
| except ValueError: | |
| continue | |
| # Excel serial date number | |
| try: | |
| serial = float(s) | |
| if 1 < serial < 100000: | |
| base = datetime.date(1899, 12, 30) | |
| return (base + datetime.timedelta(days=int(serial))).isoformat() | |
| except (ValueError, OverflowError): | |
| pass | |
| return s # Return as-is if we can't parse | |
| def read_workbook(xlsx_path: Path, packs: list[str]) -> dict: | |
| """Read and validate an Excel workbook. | |
| Returns an intermediate dict with keys: | |
| summary, entities, validation_results, factors, decision | |
| Raises ImportError with a list of validation errors. | |
| """ | |
| try: | |
| import openpyxl | |
| except ImportError: | |
| raise ImportError([ | |
| "openpyxl is required for Excel import. " | |
| "Install with: pip install uofa[excel]" | |
| ]) | |
| if not xlsx_path.exists(): | |
| raise ImportError([f"File not found: {xlsx_path}"]) | |
| try: | |
| wb = openpyxl.load_workbook(str(xlsx_path), data_only=True) | |
| except Exception as exc: | |
| raise ImportError([f"Cannot open workbook: {exc}"]) | |
| errors = [] | |
| warnings: list[str] = [] | |
| # ── Validate required sheets ───────────────────────────── | |
| required_sheets = [ | |
| SHEET_NAMES["summary"], | |
| SHEET_NAMES["model_data"], | |
| SHEET_NAMES["validation"], | |
| SHEET_NAMES["decision"], | |
| ] | |
| for sheet_name in required_sheets: | |
| if sheet_name not in wb.sheetnames: | |
| errors.append(f"Sheet '{sheet_name}' not found in workbook") | |
| if errors: | |
| raise ImportError(errors) | |
| # ── Read Assessment Summary (row 3) ────────────────────── | |
| ws = wb[SHEET_NAMES["summary"]] | |
| summary = _read_summary(ws, errors, warnings=warnings) | |
| profile = summary.get("profile", "Minimal") | |
| # Credibility Factors is only required for Complete profile | |
| has_factors_sheet = SHEET_NAMES["factors"] in wb.sheetnames | |
| if profile == "Complete" and not has_factors_sheet: | |
| errors.append(f"Sheet '{SHEET_NAMES['factors']}' not found (required for Complete profile)") | |
| # ── Read Model & Data ──────────────────────────────────── | |
| ws = wb[SHEET_NAMES["model_data"]] | |
| entities = _read_entities(ws, profile, errors, warnings=warnings, summary=summary) | |
| # ── Read Validation Results ────────────────────────────── | |
| ws = wb[SHEET_NAMES["validation"]] | |
| validation_results = _read_validation_results(ws, errors, warnings) | |
| # ── Read Credibility Factors ───────────────────────────── | |
| factors = [] | |
| if has_factors_sheet: | |
| ws = wb[SHEET_NAMES["factors"]] | |
| factors = _read_factors(ws, packs, errors) | |
| # ── Read Decision ──────────────────────────────────────── | |
| ws = wb[SHEET_NAMES["decision"]] | |
| decision = _read_decision(ws, errors, warnings=warnings) | |
| if errors: | |
| raise ImportError(errors) | |
| return { | |
| "summary": summary, | |
| "entities": entities, | |
| "validation_results": validation_results, | |
| "factors": factors, | |
| "decision": decision, | |
| "_warnings": warnings, | |
| } | |
| def _find_data_row(ws, header_keyword: str, search_col: int = 1, max_row: int = 10) -> int: | |
| """Find the first data row after a header row containing the keyword. | |
| Scans column search_col for a cell matching header_keyword (case-insensitive). | |
| Returns the row AFTER the last header/instruction row (i.e., the first row | |
| that doesn't look like a header). | |
| Heuristic: the data row is the last non-empty row in the first block. | |
| For templates with instruction rows, the data row follows the instructions. | |
| """ | |
| for r in range(1, max_row + 1): | |
| val = _cell_value(ws, r, search_col) | |
| if val and header_keyword.lower() in val.lower(): | |
| # Found header row. Data is the next non-instruction row. | |
| # Check if there's an instruction row right after | |
| for data_r in range(r + 1, max_row + 1): | |
| val = _cell_value(ws, data_r, search_col) | |
| if val and val not in ("", None): | |
| # Check if this looks like actual data vs instructions | |
| # Instructions tend to be long descriptions starting with verbs/articles | |
| # Data tends to be short identifiers | |
| # But the safest heuristic: the last row before an empty row | |
| return data_r | |
| return r + 1 | |
| # Fallback: assume row 3 has headers, row 4 has data | |
| return 4 | |
| def _find_header_row(ws, header_keyword: str, search_col: int = 1, max_row: int = 10) -> int: | |
| """Find the row containing column headers.""" | |
| for r in range(1, max_row + 1): | |
| val = _cell_value(ws, r, search_col) | |
| if val and header_keyword.lower() == val.lower(): | |
| return r | |
| return 2 # default | |
| def _read_summary(ws, errors: list, warnings: list | None = None) -> dict: | |
| """Read Assessment Summary sheet. | |
| Finds the header row (containing "Project Name") and reads data from | |
| the last populated row in the header block. | |
| If ``warnings`` is provided, enum fields (profile, assurance_level) | |
| that don't match the canonical set are normalized via | |
| ``_fuzzy_normalize_enum`` (handles "Minimal or Complete" → "Minimal" | |
| LLM-output gaps) and a per-field warning is appended instead of an | |
| error. If ``warnings`` is None, falls back to the strict legacy | |
| behavior of appending to ``errors``. | |
| """ | |
| sheet = SHEET_NAMES["summary"] | |
| # Find header row by looking for "Project Name" | |
| header_row = _find_header_row(ws, "Project Name") | |
| # Data row: scan forward from header to find actual data | |
| # In the starter template: row 2 = headers, row 3 = instructions, row 4 = data | |
| # In the spec: row 2 = headers, row 3 = data | |
| # Strategy: find the LAST non-empty row starting from header+1 | |
| row = header_row + 1 | |
| for r in range(header_row + 1, header_row + 4): | |
| val = _cell_value(ws, r, 1) | |
| if val: | |
| row = r # keep advancing to last non-empty row | |
| project_name = _cell_value(ws, row, 1) # A | |
| cou_name = _cell_value(ws, row, 2) # B | |
| cou_description = _cell_value(ws, row, 3) # C | |
| profile = _cell_value(ws, row, 4) # D | |
| device_class = _cell_value(ws, row, 5) # E | |
| mrl = _cell_value(ws, row, 6) # F | |
| assurance_level = _cell_value(ws, row, 7) # G | |
| standards_ref = _cell_value(ws, row, 8) # H | |
| assessor_name = _cell_value(ws, row, 9) # I | |
| assessment_date = ws.cell(row=row, column=10).value # J — raw for date parsing | |
| source_doc = _cell_value(ws, row, 11) # K | |
| has_uq = _cell_value(ws, row, 12) # L | |
| # Required for Minimal | |
| if not project_name: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(1, row)} (Project Name) is required for Minimal profile") | |
| if not cou_name: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(2, row)} (COU Name) is required") | |
| # Validate profile (lenient mode: normalize enum-echo via fuzzy match; | |
| # genuinely-invalid non-echo inputs still error) | |
| if profile and profile not in VALID_PROFILES: | |
| normalized = _fuzzy_normalize_enum(profile, VALID_PROFILES) if warnings is not None else None | |
| if normalized is not None: | |
| warnings.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(4, row)}: " | |
| f"'{profile}' is not a canonical profile — normalized to " | |
| f"'{normalized}'. Canonical set: {', '.join(VALID_PROFILES)}" | |
| ) | |
| profile = normalized | |
| else: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(4, row)}: '{profile}' is not a valid profile. Expected: {', '.join(VALID_PROFILES)}") | |
| if not profile: | |
| profile = "Minimal" | |
| # Validate device class | |
| if device_class and device_class not in VALID_DEVICE_CLASSES: | |
| # Allow free text (Category A-E, Other) per spec | |
| pass | |
| # Validate assurance level (lenient mode: normalize enum-echo only; | |
| # genuinely-invalid non-echo inputs still error) | |
| if assurance_level and assurance_level not in VALID_ASSURANCE_LEVELS: | |
| normalized = _fuzzy_normalize_enum(assurance_level, VALID_ASSURANCE_LEVELS) if warnings is not None else None | |
| if normalized is not None: | |
| warnings.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(7, row)}: " | |
| f"'{assurance_level}' is not a canonical assurance level — normalized to " | |
| f"'{normalized}'. Canonical set: {', '.join(VALID_ASSURANCE_LEVELS)}" | |
| ) | |
| assurance_level = normalized | |
| else: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(7, row)}: '{assurance_level}' is not a valid assurance level. Expected: {', '.join(VALID_ASSURANCE_LEVELS)}") | |
| # Parse MRL | |
| mrl_int = None | |
| if mrl: | |
| # Handle "MRL 3" format | |
| mrl_str = mrl.replace("MRL", "").strip() | |
| mrl_int = _parse_int(mrl_str) | |
| return { | |
| "project_name": project_name, | |
| "cou_name": cou_name, | |
| "cou_description": cou_description, | |
| "profile": profile, | |
| "device_class": device_class, | |
| "model_risk_level": mrl_int, | |
| "assurance_level": assurance_level, | |
| "standards_reference": standards_ref, | |
| "assessor_name": assessor_name, | |
| "assessment_date": _parse_date(assessment_date), | |
| "source_document": source_doc, | |
| "has_uq": has_uq, | |
| } | |
| def _read_entities(ws, profile: str, errors: list, | |
| warnings: list | None = None, | |
| summary: dict | None = None) -> list[dict]: | |
| """Read Model & Data sheet. | |
| If ``warnings`` is provided and the sheet has no Requirement entity, | |
| a placeholder Requirement is synthesized from the Assessment Summary | |
| and a warning is appended (lenient mode). If ``warnings`` is None, | |
| falls back to the strict behavior of appending to ``errors``. | |
| """ | |
| sheet = SHEET_NAMES["model_data"] | |
| entities = [] | |
| has_requirement = False | |
| # Find header row containing "Entity Type" | |
| header_row = _find_header_row(ws, "Entity Type") | |
| # Skip instruction row if present | |
| data_start = header_row + 1 | |
| for r in range(header_row + 1, header_row + 4): | |
| val = _cell_value(ws, r, 1) | |
| if val and val in ("Requirement", "Model", "Dataset"): | |
| data_start = r | |
| break | |
| for row in range(data_start, ws.max_row + 1): | |
| entity_type = _cell_value(ws, row, 1) # A | |
| if not entity_type: | |
| continue | |
| if entity_type not in ("Requirement", "Model", "Dataset"): | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(1, row)}: " | |
| f"'{entity_type}' is not a valid entity type. " | |
| f"Expected: Requirement, Model, Dataset" | |
| ) | |
| continue | |
| if entity_type == "Requirement": | |
| has_requirement = True | |
| name = _cell_value(ws, row, 2) # B | |
| uri = _cell_value(ws, row, 3) # C | |
| desc = _cell_value(ws, row, 4) # D | |
| version = _cell_value(ws, row, 5) # E | |
| source = _cell_value(ws, row, 6) # F | |
| if not name: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(2, row)}: Name is required") | |
| entities.append({ | |
| "entity_type": entity_type, | |
| "name": name, | |
| "uri": uri, | |
| "description": desc, | |
| "version": version, | |
| "source": source, | |
| }) | |
| if not has_requirement: | |
| if warnings is not None: | |
| # Lenient mode: synthesize a Requirement from the Assessment | |
| # Summary so the import succeeds. Handles the common case where | |
| # a small LLM dropped the Requirement on a continuation-of-prior | |
| # COU and the user shouldn't have to re-run extract just to add | |
| # one row by hand. | |
| cou_name = (summary or {}).get("cou_name") or None | |
| cou_desc = (summary or {}).get("cou_description") or None | |
| synth_name = cou_name or "Implicit COU Requirement" | |
| synth_desc = ( | |
| f"Auto-synthesized from COU context: {cou_desc or cou_name or 'unspecified'}. " | |
| "The extractor did not emit a Requirement entity. Review and " | |
| "replace with the explicit top-level requirement before signing." | |
| ) | |
| entities.insert(0, { | |
| "entity_type": "Requirement", | |
| "name": synth_name, | |
| "uri": None, | |
| "description": synth_desc, | |
| "version": None, | |
| "source": "auto-synthesized", | |
| }) | |
| warnings.append( | |
| f"Sheet '{sheet}' has no Requirement entity — synthesized " | |
| f"'{synth_name}' from the Assessment Summary. Review the " | |
| f"auto-filled row before signing." | |
| ) | |
| else: | |
| errors.append( | |
| f"Sheet '{sheet}' must have at least one row with Entity Type = 'Requirement'" | |
| ) | |
| return entities | |
| def _read_validation_results(ws, errors: list, warnings: list | None = None) -> list[dict]: | |
| """Read Validation Results sheet. | |
| Detects whether the Type column (v2) is present by checking headers. | |
| Old template: Result Name, Identifier/URI, Description, ... | |
| v2 template: Result Name, Type, Identifier/URI, Description, ... | |
| If ``warnings`` is provided, evidence_type values outside the canonical | |
| enum are normalized (via ``normalize_evidence_type``) and a per-row | |
| warning is appended instead of an error. If ``warnings`` is None, | |
| falls back to the legacy strict behavior (append to errors). | |
| """ | |
| sheet = SHEET_NAMES["validation"] | |
| results = [] | |
| # Detect column layout by scanning header rows for "Type" | |
| has_type_col = False | |
| header_row = 2 # default | |
| for r in range(1, 5): | |
| for c in range(1, 10): | |
| val = _cell_value(ws, r, c) | |
| if val and val.lower() == "type": | |
| has_type_col = True | |
| header_row = r | |
| break | |
| if val and val.lower() == "result name": | |
| header_row = r | |
| if has_type_col: | |
| break | |
| # Find first data row (skip instruction rows) | |
| data_start = header_row + 1 | |
| for r in range(header_row + 1, header_row + 4): | |
| val = _cell_value(ws, r, 1) | |
| if val and not val.lower().startswith("short name"): | |
| data_start = r | |
| break | |
| if has_type_col: | |
| # v2 layout: A=Name, B=Type, C=URI, D=Desc, E=ComparesTo, F=HasUQ, G=UQMethod, H=Metric, I=PassFail | |
| col_type, col_uri, col_desc, col_cmp, col_uq, col_uqm, col_met, col_pf = 2, 3, 4, 5, 6, 7, 8, 9 | |
| else: | |
| # Old layout: A=Name, B=URI, C=Desc, D=ComparesTo, E=HasUQ, F=UQMethod, G=Metric, H=PassFail | |
| col_type, col_uri, col_desc, col_cmp, col_uq, col_uqm, col_met, col_pf = None, 2, 3, 4, 5, 6, 7, 8 | |
| for row in range(data_start, ws.max_row + 1): | |
| name = _cell_value(ws, row, 1) # A — Result Name | |
| if not name: | |
| continue | |
| evidence_type = _cell_value(ws, row, col_type) if col_type else None | |
| uri = _cell_value(ws, row, col_uri) | |
| desc = _cell_value(ws, row, col_desc) | |
| compares_to = _cell_value(ws, row, col_cmp) | |
| has_uq = _cell_value(ws, row, col_uq) | |
| uq_method = _cell_value(ws, row, col_uqm) | |
| metric = _cell_value(ws, row, col_met) | |
| pass_fail = _cell_value(ws, row, col_pf) | |
| # Default evidence type to ValidationResult | |
| if not evidence_type: | |
| evidence_type = "ValidationResult" | |
| elif evidence_type not in EVIDENCE_TYPES: | |
| if warnings is not None: | |
| # Lenient mode: normalize against the canonical enum and warn. | |
| # Handles the common case where an LLM extractor emits a | |
| # descriptive domain label (GridConvergenceStudy, etc.) | |
| # instead of the constrained enum value. | |
| original = evidence_type | |
| normalized, _substituted = normalize_evidence_type(original) | |
| warnings.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(col_type, row)}: " | |
| f"'{original}' is not a canonical evidence type — " | |
| f"normalized to '{normalized}'. " | |
| f"Canonical set: {', '.join(EVIDENCE_TYPES)}" | |
| ) | |
| evidence_type = normalized | |
| else: | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(col_type, row)}: " | |
| f"'{evidence_type}' is not a valid evidence type. " | |
| f"Expected: {', '.join(EVIDENCE_TYPES)}" | |
| ) | |
| results.append({ | |
| "name": name, | |
| "evidence_type": evidence_type, | |
| "uri": uri, | |
| "description": desc, | |
| "compares_to": compares_to, | |
| "has_uq": has_uq, | |
| "uq_method": uq_method, | |
| "metric_value": metric, | |
| "pass_fail": pass_fail, | |
| }) | |
| return results | |
| def _read_factors(ws, packs: list[str], errors: list) -> list[dict]: | |
| """Read Credibility Factors sheet. | |
| Factor Type (col A) and Category (col B) are pre-populated and locked. | |
| User fills in Required Level (C), Achieved Level (D), Acceptance Criteria (E), | |
| Rationale (F), Factor Status (G). | |
| """ | |
| sheet = SHEET_NAMES["factors"] | |
| factors = [] | |
| # Determine valid factor names based on active packs | |
| if "nasa-7009b" in packs: | |
| valid_names = set(NASA_ALL_FACTOR_NAMES) | |
| else: | |
| valid_names = set(VV40_FACTOR_NAMES) | |
| for row in range(FACTOR_START_ROW, ws.max_row + 1): | |
| factor_type = _cell_value(ws, row, 1) # A | |
| if not factor_type: | |
| continue | |
| category = _cell_value(ws, row, 2) # B | |
| required_level = _cell_value(ws, row, 3) # C | |
| achieved_level = _cell_value(ws, row, 4) # D | |
| acceptance = _cell_value(ws, row, 5) # E | |
| rationale = _cell_value(ws, row, 6) # F | |
| status = _cell_value(ws, row, 7) # G | |
| linked_evidence = _cell_value(ws, row, 8) # H — Linked Evidence URI | |
| # Validate factor type | |
| if factor_type not in valid_names: | |
| if "nasa-7009b" in packs: | |
| errors.append( | |
| f"Sheet '{sheet}', row {row}: " | |
| f"'{factor_type}' is not a valid NASA-STD-7009B factor type" | |
| ) | |
| else: | |
| errors.append( | |
| f"Sheet '{sheet}', row {row}: " | |
| f"'{factor_type}' is not a valid V&V 40 factor type" | |
| ) | |
| continue | |
| # Validate factor status | |
| if status and status not in VALID_FACTOR_STATUSES: | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(7, row)}: " | |
| f"'{status}' is not a valid factor status. " | |
| f"Expected: {', '.join(VALID_FACTOR_STATUSES)}" | |
| ) | |
| # Parse and validate levels | |
| req_int = _parse_int(required_level) | |
| ach_int = _parse_int(achieved_level) | |
| # Determine level range based on factor standard | |
| is_nasa_only = factor_type in NASA_ONLY_FACTOR_NAMES | |
| if is_nasa_only: | |
| lo, hi = NASA_LEVEL_RANGE | |
| else: | |
| lo, hi = VV40_LEVEL_RANGE | |
| if req_int is not None and (req_int < lo or req_int > hi): | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(3, row)}: " | |
| f"Required Level {req_int} out of range {lo}-{hi}" | |
| ) | |
| if ach_int is not None and (ach_int < lo or ach_int > hi): | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(4, row)}: " | |
| f"Achieved Level {ach_int} out of range {lo}-{hi}" | |
| ) | |
| factors.append({ | |
| "factor_type": factor_type, | |
| "category": category, | |
| "required_level": req_int, | |
| "achieved_level": ach_int, | |
| "acceptance_criteria": acceptance, | |
| "rationale": rationale, | |
| "status": status or "not-assessed", | |
| "linked_evidence": linked_evidence, | |
| }) | |
| return factors | |
| def _read_decision(ws, errors: list, warnings: list | None = None) -> dict: | |
| """Read Decision sheet. | |
| Lenient mode (warnings provided) normalizes non-canonical outcome values | |
| via _fuzzy_normalize_enum — handles LLM enum-echo like | |
| "Accepted / Not accepted / Conditional". | |
| """ | |
| sheet = SHEET_NAMES["decision"] | |
| # Find header row containing "Decision Outcome" | |
| header_row = _find_header_row(ws, "Decision Outcome") | |
| # Data row: last non-empty row after header | |
| row = header_row + 1 | |
| for r in range(header_row + 1, header_row + 4): | |
| val = _cell_value(ws, r, 1) | |
| if val: | |
| row = r | |
| outcome = _cell_value(ws, row, 1) # A | |
| rationale = _cell_value(ws, row, 2) # B | |
| criteria_set = _cell_value(ws, row, 3) # C | |
| decided_by = _cell_value(ws, row, 4) # D | |
| decision_date = ws.cell(row=row, column=5).value # E — raw for date | |
| valid_outcomes = VALID_DECISION_OUTCOMES + ["Conditional"] | |
| if not outcome: | |
| errors.append(f"Sheet '{sheet}', cell {_cell_ref(1, row)}: Decision Outcome is required") | |
| elif outcome not in valid_outcomes: | |
| normalized = _fuzzy_normalize_enum(outcome, valid_outcomes) if warnings is not None else None | |
| if normalized is not None: | |
| warnings.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(1, row)}: " | |
| f"'{outcome}' is not a canonical decision outcome — normalized to " | |
| f"'{normalized}'. Canonical set: {', '.join(valid_outcomes)}" | |
| ) | |
| outcome = normalized | |
| else: | |
| errors.append( | |
| f"Sheet '{sheet}', cell {_cell_ref(1, row)}: " | |
| f"'{outcome}' is not a valid outcome. " | |
| f"Expected: {', '.join(valid_outcomes)}" | |
| ) | |
| return { | |
| "outcome": outcome, | |
| "rationale": rationale, | |
| "criteria_set": criteria_set, | |
| "decided_by": decided_by, | |
| "decision_date": _parse_date(decision_date), | |
| } | |