uofa-demo / src /uofa_cli /excel_reader.py
cloudronin's picture
push build context (uofa source + packs + space app)
a28ec65 verified
Raw
History Blame Contribute Delete
25.8 kB
"""Read and validate an Excel workbook for UofA import.
Knows about Excel structure (sheets, rows, cells) but nothing about JSON-LD.
Returns a clean intermediate dict that excel_mapper.py transforms into JSON-LD.
"""
from __future__ import annotations
from pathlib import Path
from uofa_cli.excel_constants import (
SHEET_NAMES, FACTOR_START_ROW,
VV40_FACTOR_NAMES, NASA_ALL_FACTOR_NAMES, NASA_ONLY_FACTOR_NAMES,
VV40_LEVEL_RANGE, NASA_LEVEL_RANGE,
VALID_PROFILES, VALID_DECISION_OUTCOMES, VALID_FACTOR_STATUSES,
VALID_DEVICE_CLASSES, VALID_ASSURANCE_LEVELS,
EVIDENCE_TYPES,
normalize_evidence_type,
)
class ImportError(Exception):
"""Raised when an Excel workbook has validation errors."""
def __init__(self, errors: list[str]):
self.errors = errors
super().__init__("\n".join(errors))
def _cell_ref(col: int, row: int) -> str:
"""Convert 1-based column index to Excel cell reference like A3, B5."""
return f"{chr(64 + col)}{row}"
def _fuzzy_normalize_enum(value: str, valid_values: list[str]) -> str | None:
"""Detect and normalize LLM enum-echo. Returns the canonical value if
the input is an enumerator-text echo of the prompt (e.g. "Minimal or
Complete", "Low / Medium / High", "Accepted / Not accepted /
Conditional"); returns None otherwise.
Intentionally NARROW — only normalizes inputs whose token split
produces at least one canonical match. A genuinely-invalid value like
"Approved" (not enum-echo, just wrong) returns None so the caller can
still error loudly. This preserves the strict-mode contract that
invalid-but-not-echo inputs are import errors.
Mirrors the enum-split branch of excel_writer._fuzzy_match_dropdown.
"""
import re
if value in valid_values:
return value
tokens = re.split(r"\s+or\s+|\s*[/,]\s*", value)
for tok in tokens:
tok = tok.strip()
if tok in valid_values:
return tok
return None
def _cell_value(ws, row: int, col: int) -> str | None:
"""Read a cell value, stripping whitespace. Returns None for empty cells."""
val = ws.cell(row=row, column=col).value
if val is None:
return None
val = str(val).strip()
return val if val else None
def _parse_int(val: str | None) -> int | None:
"""Parse a value as integer, returning None if not parseable."""
if val is None:
return None
try:
return int(float(val))
except (ValueError, TypeError):
return None
def _parse_date(val) -> str | None:
"""Normalize a date value to ISO 8601 string."""
if val is None:
return None
# openpyxl may return datetime objects directly
import datetime
if isinstance(val, (datetime.datetime, datetime.date)):
return val.isoformat()
s = str(val).strip()
if not s:
return None
# Try ISO 8601 first
for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%m/%d/%Y", "%d/%m/%Y"):
try:
return datetime.datetime.strptime(s, fmt).date().isoformat()
except ValueError:
continue
# Excel serial date number
try:
serial = float(s)
if 1 < serial < 100000:
base = datetime.date(1899, 12, 30)
return (base + datetime.timedelta(days=int(serial))).isoformat()
except (ValueError, OverflowError):
pass
return s # Return as-is if we can't parse
def read_workbook(xlsx_path: Path, packs: list[str]) -> dict:
"""Read and validate an Excel workbook.
Returns an intermediate dict with keys:
summary, entities, validation_results, factors, decision
Raises ImportError with a list of validation errors.
"""
try:
import openpyxl
except ImportError:
raise ImportError([
"openpyxl is required for Excel import. "
"Install with: pip install uofa[excel]"
])
if not xlsx_path.exists():
raise ImportError([f"File not found: {xlsx_path}"])
try:
wb = openpyxl.load_workbook(str(xlsx_path), data_only=True)
except Exception as exc:
raise ImportError([f"Cannot open workbook: {exc}"])
errors = []
warnings: list[str] = []
# ── Validate required sheets ─────────────────────────────
required_sheets = [
SHEET_NAMES["summary"],
SHEET_NAMES["model_data"],
SHEET_NAMES["validation"],
SHEET_NAMES["decision"],
]
for sheet_name in required_sheets:
if sheet_name not in wb.sheetnames:
errors.append(f"Sheet '{sheet_name}' not found in workbook")
if errors:
raise ImportError(errors)
# ── Read Assessment Summary (row 3) ──────────────────────
ws = wb[SHEET_NAMES["summary"]]
summary = _read_summary(ws, errors, warnings=warnings)
profile = summary.get("profile", "Minimal")
# Credibility Factors is only required for Complete profile
has_factors_sheet = SHEET_NAMES["factors"] in wb.sheetnames
if profile == "Complete" and not has_factors_sheet:
errors.append(f"Sheet '{SHEET_NAMES['factors']}' not found (required for Complete profile)")
# ── Read Model & Data ────────────────────────────────────
ws = wb[SHEET_NAMES["model_data"]]
entities = _read_entities(ws, profile, errors, warnings=warnings, summary=summary)
# ── Read Validation Results ──────────────────────────────
ws = wb[SHEET_NAMES["validation"]]
validation_results = _read_validation_results(ws, errors, warnings)
# ── Read Credibility Factors ─────────────────────────────
factors = []
if has_factors_sheet:
ws = wb[SHEET_NAMES["factors"]]
factors = _read_factors(ws, packs, errors)
# ── Read Decision ────────────────────────────────────────
ws = wb[SHEET_NAMES["decision"]]
decision = _read_decision(ws, errors, warnings=warnings)
if errors:
raise ImportError(errors)
return {
"summary": summary,
"entities": entities,
"validation_results": validation_results,
"factors": factors,
"decision": decision,
"_warnings": warnings,
}
def _find_data_row(ws, header_keyword: str, search_col: int = 1, max_row: int = 10) -> int:
"""Find the first data row after a header row containing the keyword.
Scans column search_col for a cell matching header_keyword (case-insensitive).
Returns the row AFTER the last header/instruction row (i.e., the first row
that doesn't look like a header).
Heuristic: the data row is the last non-empty row in the first block.
For templates with instruction rows, the data row follows the instructions.
"""
for r in range(1, max_row + 1):
val = _cell_value(ws, r, search_col)
if val and header_keyword.lower() in val.lower():
# Found header row. Data is the next non-instruction row.
# Check if there's an instruction row right after
for data_r in range(r + 1, max_row + 1):
val = _cell_value(ws, data_r, search_col)
if val and val not in ("", None):
# Check if this looks like actual data vs instructions
# Instructions tend to be long descriptions starting with verbs/articles
# Data tends to be short identifiers
# But the safest heuristic: the last row before an empty row
return data_r
return r + 1
# Fallback: assume row 3 has headers, row 4 has data
return 4
def _find_header_row(ws, header_keyword: str, search_col: int = 1, max_row: int = 10) -> int:
"""Find the row containing column headers."""
for r in range(1, max_row + 1):
val = _cell_value(ws, r, search_col)
if val and header_keyword.lower() == val.lower():
return r
return 2 # default
def _read_summary(ws, errors: list, warnings: list | None = None) -> dict:
"""Read Assessment Summary sheet.
Finds the header row (containing "Project Name") and reads data from
the last populated row in the header block.
If ``warnings`` is provided, enum fields (profile, assurance_level)
that don't match the canonical set are normalized via
``_fuzzy_normalize_enum`` (handles "Minimal or Complete" → "Minimal"
LLM-output gaps) and a per-field warning is appended instead of an
error. If ``warnings`` is None, falls back to the strict legacy
behavior of appending to ``errors``.
"""
sheet = SHEET_NAMES["summary"]
# Find header row by looking for "Project Name"
header_row = _find_header_row(ws, "Project Name")
# Data row: scan forward from header to find actual data
# In the starter template: row 2 = headers, row 3 = instructions, row 4 = data
# In the spec: row 2 = headers, row 3 = data
# Strategy: find the LAST non-empty row starting from header+1
row = header_row + 1
for r in range(header_row + 1, header_row + 4):
val = _cell_value(ws, r, 1)
if val:
row = r # keep advancing to last non-empty row
project_name = _cell_value(ws, row, 1) # A
cou_name = _cell_value(ws, row, 2) # B
cou_description = _cell_value(ws, row, 3) # C
profile = _cell_value(ws, row, 4) # D
device_class = _cell_value(ws, row, 5) # E
mrl = _cell_value(ws, row, 6) # F
assurance_level = _cell_value(ws, row, 7) # G
standards_ref = _cell_value(ws, row, 8) # H
assessor_name = _cell_value(ws, row, 9) # I
assessment_date = ws.cell(row=row, column=10).value # J — raw for date parsing
source_doc = _cell_value(ws, row, 11) # K
has_uq = _cell_value(ws, row, 12) # L
# Required for Minimal
if not project_name:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(1, row)} (Project Name) is required for Minimal profile")
if not cou_name:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(2, row)} (COU Name) is required")
# Validate profile (lenient mode: normalize enum-echo via fuzzy match;
# genuinely-invalid non-echo inputs still error)
if profile and profile not in VALID_PROFILES:
normalized = _fuzzy_normalize_enum(profile, VALID_PROFILES) if warnings is not None else None
if normalized is not None:
warnings.append(
f"Sheet '{sheet}', cell {_cell_ref(4, row)}: "
f"'{profile}' is not a canonical profile — normalized to "
f"'{normalized}'. Canonical set: {', '.join(VALID_PROFILES)}"
)
profile = normalized
else:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(4, row)}: '{profile}' is not a valid profile. Expected: {', '.join(VALID_PROFILES)}")
if not profile:
profile = "Minimal"
# Validate device class
if device_class and device_class not in VALID_DEVICE_CLASSES:
# Allow free text (Category A-E, Other) per spec
pass
# Validate assurance level (lenient mode: normalize enum-echo only;
# genuinely-invalid non-echo inputs still error)
if assurance_level and assurance_level not in VALID_ASSURANCE_LEVELS:
normalized = _fuzzy_normalize_enum(assurance_level, VALID_ASSURANCE_LEVELS) if warnings is not None else None
if normalized is not None:
warnings.append(
f"Sheet '{sheet}', cell {_cell_ref(7, row)}: "
f"'{assurance_level}' is not a canonical assurance level — normalized to "
f"'{normalized}'. Canonical set: {', '.join(VALID_ASSURANCE_LEVELS)}"
)
assurance_level = normalized
else:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(7, row)}: '{assurance_level}' is not a valid assurance level. Expected: {', '.join(VALID_ASSURANCE_LEVELS)}")
# Parse MRL
mrl_int = None
if mrl:
# Handle "MRL 3" format
mrl_str = mrl.replace("MRL", "").strip()
mrl_int = _parse_int(mrl_str)
return {
"project_name": project_name,
"cou_name": cou_name,
"cou_description": cou_description,
"profile": profile,
"device_class": device_class,
"model_risk_level": mrl_int,
"assurance_level": assurance_level,
"standards_reference": standards_ref,
"assessor_name": assessor_name,
"assessment_date": _parse_date(assessment_date),
"source_document": source_doc,
"has_uq": has_uq,
}
def _read_entities(ws, profile: str, errors: list,
warnings: list | None = None,
summary: dict | None = None) -> list[dict]:
"""Read Model & Data sheet.
If ``warnings`` is provided and the sheet has no Requirement entity,
a placeholder Requirement is synthesized from the Assessment Summary
and a warning is appended (lenient mode). If ``warnings`` is None,
falls back to the strict behavior of appending to ``errors``.
"""
sheet = SHEET_NAMES["model_data"]
entities = []
has_requirement = False
# Find header row containing "Entity Type"
header_row = _find_header_row(ws, "Entity Type")
# Skip instruction row if present
data_start = header_row + 1
for r in range(header_row + 1, header_row + 4):
val = _cell_value(ws, r, 1)
if val and val in ("Requirement", "Model", "Dataset"):
data_start = r
break
for row in range(data_start, ws.max_row + 1):
entity_type = _cell_value(ws, row, 1) # A
if not entity_type:
continue
if entity_type not in ("Requirement", "Model", "Dataset"):
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(1, row)}: "
f"'{entity_type}' is not a valid entity type. "
f"Expected: Requirement, Model, Dataset"
)
continue
if entity_type == "Requirement":
has_requirement = True
name = _cell_value(ws, row, 2) # B
uri = _cell_value(ws, row, 3) # C
desc = _cell_value(ws, row, 4) # D
version = _cell_value(ws, row, 5) # E
source = _cell_value(ws, row, 6) # F
if not name:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(2, row)}: Name is required")
entities.append({
"entity_type": entity_type,
"name": name,
"uri": uri,
"description": desc,
"version": version,
"source": source,
})
if not has_requirement:
if warnings is not None:
# Lenient mode: synthesize a Requirement from the Assessment
# Summary so the import succeeds. Handles the common case where
# a small LLM dropped the Requirement on a continuation-of-prior
# COU and the user shouldn't have to re-run extract just to add
# one row by hand.
cou_name = (summary or {}).get("cou_name") or None
cou_desc = (summary or {}).get("cou_description") or None
synth_name = cou_name or "Implicit COU Requirement"
synth_desc = (
f"Auto-synthesized from COU context: {cou_desc or cou_name or 'unspecified'}. "
"The extractor did not emit a Requirement entity. Review and "
"replace with the explicit top-level requirement before signing."
)
entities.insert(0, {
"entity_type": "Requirement",
"name": synth_name,
"uri": None,
"description": synth_desc,
"version": None,
"source": "auto-synthesized",
})
warnings.append(
f"Sheet '{sheet}' has no Requirement entity — synthesized "
f"'{synth_name}' from the Assessment Summary. Review the "
f"auto-filled row before signing."
)
else:
errors.append(
f"Sheet '{sheet}' must have at least one row with Entity Type = 'Requirement'"
)
return entities
def _read_validation_results(ws, errors: list, warnings: list | None = None) -> list[dict]:
"""Read Validation Results sheet.
Detects whether the Type column (v2) is present by checking headers.
Old template: Result Name, Identifier/URI, Description, ...
v2 template: Result Name, Type, Identifier/URI, Description, ...
If ``warnings`` is provided, evidence_type values outside the canonical
enum are normalized (via ``normalize_evidence_type``) and a per-row
warning is appended instead of an error. If ``warnings`` is None,
falls back to the legacy strict behavior (append to errors).
"""
sheet = SHEET_NAMES["validation"]
results = []
# Detect column layout by scanning header rows for "Type"
has_type_col = False
header_row = 2 # default
for r in range(1, 5):
for c in range(1, 10):
val = _cell_value(ws, r, c)
if val and val.lower() == "type":
has_type_col = True
header_row = r
break
if val and val.lower() == "result name":
header_row = r
if has_type_col:
break
# Find first data row (skip instruction rows)
data_start = header_row + 1
for r in range(header_row + 1, header_row + 4):
val = _cell_value(ws, r, 1)
if val and not val.lower().startswith("short name"):
data_start = r
break
if has_type_col:
# v2 layout: A=Name, B=Type, C=URI, D=Desc, E=ComparesTo, F=HasUQ, G=UQMethod, H=Metric, I=PassFail
col_type, col_uri, col_desc, col_cmp, col_uq, col_uqm, col_met, col_pf = 2, 3, 4, 5, 6, 7, 8, 9
else:
# Old layout: A=Name, B=URI, C=Desc, D=ComparesTo, E=HasUQ, F=UQMethod, G=Metric, H=PassFail
col_type, col_uri, col_desc, col_cmp, col_uq, col_uqm, col_met, col_pf = None, 2, 3, 4, 5, 6, 7, 8
for row in range(data_start, ws.max_row + 1):
name = _cell_value(ws, row, 1) # A — Result Name
if not name:
continue
evidence_type = _cell_value(ws, row, col_type) if col_type else None
uri = _cell_value(ws, row, col_uri)
desc = _cell_value(ws, row, col_desc)
compares_to = _cell_value(ws, row, col_cmp)
has_uq = _cell_value(ws, row, col_uq)
uq_method = _cell_value(ws, row, col_uqm)
metric = _cell_value(ws, row, col_met)
pass_fail = _cell_value(ws, row, col_pf)
# Default evidence type to ValidationResult
if not evidence_type:
evidence_type = "ValidationResult"
elif evidence_type not in EVIDENCE_TYPES:
if warnings is not None:
# Lenient mode: normalize against the canonical enum and warn.
# Handles the common case where an LLM extractor emits a
# descriptive domain label (GridConvergenceStudy, etc.)
# instead of the constrained enum value.
original = evidence_type
normalized, _substituted = normalize_evidence_type(original)
warnings.append(
f"Sheet '{sheet}', cell {_cell_ref(col_type, row)}: "
f"'{original}' is not a canonical evidence type — "
f"normalized to '{normalized}'. "
f"Canonical set: {', '.join(EVIDENCE_TYPES)}"
)
evidence_type = normalized
else:
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(col_type, row)}: "
f"'{evidence_type}' is not a valid evidence type. "
f"Expected: {', '.join(EVIDENCE_TYPES)}"
)
results.append({
"name": name,
"evidence_type": evidence_type,
"uri": uri,
"description": desc,
"compares_to": compares_to,
"has_uq": has_uq,
"uq_method": uq_method,
"metric_value": metric,
"pass_fail": pass_fail,
})
return results
def _read_factors(ws, packs: list[str], errors: list) -> list[dict]:
"""Read Credibility Factors sheet.
Factor Type (col A) and Category (col B) are pre-populated and locked.
User fills in Required Level (C), Achieved Level (D), Acceptance Criteria (E),
Rationale (F), Factor Status (G).
"""
sheet = SHEET_NAMES["factors"]
factors = []
# Determine valid factor names based on active packs
if "nasa-7009b" in packs:
valid_names = set(NASA_ALL_FACTOR_NAMES)
else:
valid_names = set(VV40_FACTOR_NAMES)
for row in range(FACTOR_START_ROW, ws.max_row + 1):
factor_type = _cell_value(ws, row, 1) # A
if not factor_type:
continue
category = _cell_value(ws, row, 2) # B
required_level = _cell_value(ws, row, 3) # C
achieved_level = _cell_value(ws, row, 4) # D
acceptance = _cell_value(ws, row, 5) # E
rationale = _cell_value(ws, row, 6) # F
status = _cell_value(ws, row, 7) # G
linked_evidence = _cell_value(ws, row, 8) # H — Linked Evidence URI
# Validate factor type
if factor_type not in valid_names:
if "nasa-7009b" in packs:
errors.append(
f"Sheet '{sheet}', row {row}: "
f"'{factor_type}' is not a valid NASA-STD-7009B factor type"
)
else:
errors.append(
f"Sheet '{sheet}', row {row}: "
f"'{factor_type}' is not a valid V&V 40 factor type"
)
continue
# Validate factor status
if status and status not in VALID_FACTOR_STATUSES:
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(7, row)}: "
f"'{status}' is not a valid factor status. "
f"Expected: {', '.join(VALID_FACTOR_STATUSES)}"
)
# Parse and validate levels
req_int = _parse_int(required_level)
ach_int = _parse_int(achieved_level)
# Determine level range based on factor standard
is_nasa_only = factor_type in NASA_ONLY_FACTOR_NAMES
if is_nasa_only:
lo, hi = NASA_LEVEL_RANGE
else:
lo, hi = VV40_LEVEL_RANGE
if req_int is not None and (req_int < lo or req_int > hi):
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(3, row)}: "
f"Required Level {req_int} out of range {lo}-{hi}"
)
if ach_int is not None and (ach_int < lo or ach_int > hi):
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(4, row)}: "
f"Achieved Level {ach_int} out of range {lo}-{hi}"
)
factors.append({
"factor_type": factor_type,
"category": category,
"required_level": req_int,
"achieved_level": ach_int,
"acceptance_criteria": acceptance,
"rationale": rationale,
"status": status or "not-assessed",
"linked_evidence": linked_evidence,
})
return factors
def _read_decision(ws, errors: list, warnings: list | None = None) -> dict:
"""Read Decision sheet.
Lenient mode (warnings provided) normalizes non-canonical outcome values
via _fuzzy_normalize_enum — handles LLM enum-echo like
"Accepted / Not accepted / Conditional".
"""
sheet = SHEET_NAMES["decision"]
# Find header row containing "Decision Outcome"
header_row = _find_header_row(ws, "Decision Outcome")
# Data row: last non-empty row after header
row = header_row + 1
for r in range(header_row + 1, header_row + 4):
val = _cell_value(ws, r, 1)
if val:
row = r
outcome = _cell_value(ws, row, 1) # A
rationale = _cell_value(ws, row, 2) # B
criteria_set = _cell_value(ws, row, 3) # C
decided_by = _cell_value(ws, row, 4) # D
decision_date = ws.cell(row=row, column=5).value # E — raw for date
valid_outcomes = VALID_DECISION_OUTCOMES + ["Conditional"]
if not outcome:
errors.append(f"Sheet '{sheet}', cell {_cell_ref(1, row)}: Decision Outcome is required")
elif outcome not in valid_outcomes:
normalized = _fuzzy_normalize_enum(outcome, valid_outcomes) if warnings is not None else None
if normalized is not None:
warnings.append(
f"Sheet '{sheet}', cell {_cell_ref(1, row)}: "
f"'{outcome}' is not a canonical decision outcome — normalized to "
f"'{normalized}'. Canonical set: {', '.join(valid_outcomes)}"
)
outcome = normalized
else:
errors.append(
f"Sheet '{sheet}', cell {_cell_ref(1, row)}: "
f"'{outcome}' is not a valid outcome. "
f"Expected: {', '.join(valid_outcomes)}"
)
return {
"outcome": outcome,
"rationale": rationale,
"criteria_set": criteria_set,
"decided_by": decided_by,
"decision_date": _parse_date(decision_date),
}