Spaces:
Sleeping
Sleeping
| """Workbook engine — load, edit, and validate Excel workbooks via openpyxl.""" | |
| from __future__ import annotations | |
| import copy | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| from datetime import date, datetime | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| import openpyxl | |
| from openpyxl.utils import get_column_letter, column_index_from_string | |
| from pydantic import BaseModel | |
| WORKBOOKS_DIR = os.getenv("WORKBOOKS_DIR", str(Path(__file__).resolve().parent.parent / "workbooks")) | |
| HIDDEN_TESTS_DIR = os.path.join(WORKBOOKS_DIR, "hidden_tests") | |
| FIXTURES_DIR = os.path.join(WORKBOOKS_DIR, "fixtures") | |
| TEMPLATES_DIR = os.path.join(WORKBOOKS_DIR, "templates") | |
| class WorkbookSession(BaseModel): | |
| session_id: str | |
| scenario_id: str | |
| workbook_path: str | |
| modified_cells: list[dict] = [] | |
| step_count: int = 0 | |
| solved: bool = False | |
| def _parse_cell_ref(ref: str) -> tuple[str, int]: | |
| """Parse 'A1' into (column_letter, row_number).""" | |
| m = re.match(r"^([A-Z]+)(\d+)$", ref.upper().strip()) | |
| if not m: | |
| raise ValueError(f"Invalid cell reference: {ref}") | |
| return m.group(1), int(m.group(2)) | |
| def _parse_range_ref(range_str: str) -> tuple[str, str]: | |
| """Parse 'A1:D10' into ('A1', 'D10').""" | |
| parts = range_str.upper().strip().split(":") | |
| if len(parts) == 1: | |
| return parts[0], parts[0] | |
| if len(parts) == 2: | |
| return parts[0], parts[1] | |
| raise ValueError(f"Invalid range reference: {range_str}") | |
| class WorkbookEngine: | |
| """In-memory workbook operations backed by openpyxl.""" | |
| def __init__(self): | |
| self._sessions: dict[str, WorkbookSession] = {} | |
| self._workbooks: dict[str, openpyxl.Workbook] = {} | |
| def load_workbook(self, session: WorkbookSession) -> None: | |
| """Load a workbook from disk into memory for a session.""" | |
| if not os.path.isfile(session.workbook_path): | |
| raise FileNotFoundError(f"Workbook not found: {session.workbook_path}") | |
| wb = openpyxl.load_workbook(session.workbook_path, data_only=False) | |
| self._sessions[session.session_id] = session | |
| self._workbooks[session.session_id] = wb | |
| def reset_workbook(self, session_id: str) -> None: | |
| """Reload the original workbook from disk, discarding all edits.""" | |
| session = self._get_session(session_id) | |
| session.modified_cells = [] | |
| session.step_count = 0 | |
| session.solved = False | |
| wb = openpyxl.load_workbook(session.workbook_path, data_only=False) | |
| self._workbooks[session_id] = wb | |
| def close_session(self, session_id: str) -> None: | |
| """Remove a session and free its workbook.""" | |
| self._sessions.pop(session_id, None) | |
| self._workbooks.pop(session_id, None) | |
| def list_sheets(self, session_id: str) -> list[dict]: | |
| """Return sheet names with basic metadata.""" | |
| wb = self._get_wb(session_id) | |
| sheets = [] | |
| for name in wb.sheetnames: | |
| ws = wb[name] | |
| sheets.append({ | |
| "name": name, | |
| "min_row": ws.min_row, | |
| "max_row": ws.max_row, | |
| "min_column": ws.min_column, | |
| "max_column": ws.max_column, | |
| "state": ws.sheet_state, | |
| }) | |
| return sheets | |
| def read_range(self, session_id: str, sheet: str, range_str: str) -> list[list[Any]]: | |
| """Read a rectangular range and return a 2D list of cell values.""" | |
| ws = self._get_sheet(session_id, sheet) | |
| start, end = _parse_range_ref(range_str) | |
| start_col, start_row = _parse_cell_ref(start) | |
| end_col, end_row = _parse_cell_ref(end) | |
| min_col = column_index_from_string(start_col) | |
| max_col = column_index_from_string(end_col) | |
| rows = [] | |
| for r in range(start_row, end_row + 1): | |
| row_data = [] | |
| for c in range(min_col, max_col + 1): | |
| cell = ws.cell(row=r, column=c) | |
| row_data.append(self._cell_display_value(cell)) | |
| rows.append(row_data) | |
| return rows | |
| def read_cell(self, session_id: str, sheet: str, cell_ref: str) -> dict: | |
| """Read a single cell and return value, formula, and type info.""" | |
| ws = self._get_sheet(session_id, sheet) | |
| col_letter, row_num = _parse_cell_ref(cell_ref) | |
| col_idx = column_index_from_string(col_letter) | |
| cell = ws.cell(row=row_num, column=col_idx) | |
| return { | |
| "cell": cell_ref.upper(), | |
| "value": self._cell_display_value(cell), | |
| "formula": cell.value if isinstance(cell.value, str) and cell.value.startswith("=") else None, | |
| "data_type": cell.data_type, | |
| "number_format": cell.number_format, | |
| } | |
| def inspect_formula(self, session_id: str, sheet: str, cell_ref: str) -> dict: | |
| """Return the raw formula string from a cell, or None if not a formula.""" | |
| ws = self._get_sheet(session_id, sheet) | |
| col_letter, row_num = _parse_cell_ref(cell_ref) | |
| col_idx = column_index_from_string(col_letter) | |
| cell = ws.cell(row=row_num, column=col_idx) | |
| raw = cell.value | |
| is_formula = isinstance(raw, str) and raw.startswith("=") | |
| return { | |
| "cell": cell_ref.upper(), | |
| "formula": raw if is_formula else None, | |
| "is_formula": is_formula, | |
| } | |
| def write_cell(self, session_id: str, sheet: str, cell_ref: str, value: Any) -> dict: | |
| """Write a value or formula to a single cell.""" | |
| session = self._get_session(session_id) | |
| ws = self._get_sheet(session_id, sheet) | |
| col_letter, row_num = _parse_cell_ref(cell_ref) | |
| col_idx = column_index_from_string(col_letter) | |
| ws.cell(row=row_num, column=col_idx, value=value) | |
| session.modified_cells.append({ | |
| "sheet": sheet, | |
| "cell": cell_ref.upper(), | |
| "value": str(value), | |
| "step": session.step_count, | |
| }) | |
| return {"written": cell_ref.upper(), "sheet": sheet, "value": str(value)} | |
| def write_range(self, session_id: str, sheet: str, start_cell: str, data: list[list[Any]]) -> dict: | |
| """Write a 2D block of values starting from start_cell.""" | |
| session = self._get_session(session_id) | |
| ws = self._get_sheet(session_id, sheet) | |
| col_letter, start_row = _parse_cell_ref(start_cell) | |
| start_col = column_index_from_string(col_letter) | |
| cells_written = 0 | |
| for r_offset, row_data in enumerate(data): | |
| for c_offset, val in enumerate(row_data): | |
| row_num = start_row + r_offset | |
| col_idx = start_col + c_offset | |
| ws.cell(row=row_num, column=col_idx, value=val) | |
| cell_ref = f"{get_column_letter(col_idx)}{row_num}" | |
| session.modified_cells.append({ | |
| "sheet": sheet, | |
| "cell": cell_ref, | |
| "value": str(val), | |
| "step": session.step_count, | |
| }) | |
| cells_written += 1 | |
| end_row = start_row + len(data) - 1 | |
| end_col = start_col + (max(len(r) for r in data) - 1 if data else 0) | |
| end_ref = f"{get_column_letter(end_col)}{end_row}" | |
| return { | |
| "range": f"{start_cell.upper()}:{end_ref}", | |
| "sheet": sheet, | |
| "cells_written": cells_written, | |
| } | |
| def copy_range( | |
| self, session_id: str, | |
| src_sheet: str, src_range: str, | |
| dst_sheet: str, dst_start: str, | |
| ) -> dict: | |
| """Copy a range of cells from one location to another (values and formulas).""" | |
| data = self.read_range(session_id, src_sheet, src_range) | |
| src_ws = self._get_sheet(session_id, src_sheet) | |
| start_ref, end_ref = _parse_range_ref(src_range) | |
| start_col_letter, start_row = _parse_cell_ref(start_ref) | |
| end_col_letter, end_row = _parse_cell_ref(end_ref) | |
| min_col = column_index_from_string(start_col_letter) | |
| max_col = column_index_from_string(end_col_letter) | |
| raw_data = [] | |
| for r in range(start_row, end_row + 1): | |
| row = [] | |
| for c in range(min_col, max_col + 1): | |
| cell = src_ws.cell(row=r, column=c) | |
| row.append(cell.value) | |
| raw_data.append(row) | |
| result = self.write_range(session_id, dst_sheet, dst_start, raw_data) | |
| return {"copied_from": f"{src_sheet}!{src_range}", **result} | |
| def get_edit_history(self, session_id: str) -> list[dict]: | |
| """Return the list of all edits made in this session.""" | |
| session = self._get_session(session_id) | |
| return list(session.modified_cells) | |
| def get_session_info(self, session_id: str) -> dict: | |
| """Return session metadata.""" | |
| session = self._get_session(session_id) | |
| return { | |
| "session_id": session.session_id, | |
| "scenario_id": session.scenario_id, | |
| "step_count": session.step_count, | |
| "edits_made": len(session.modified_cells), | |
| "solved": session.solved, | |
| } | |
| # ── Hidden test execution ────────────────────────────────────────── | |
| def run_hidden_tests(self, session_id: str) -> dict: | |
| """Run all hidden test checks for the current scenario and return results.""" | |
| session = self._get_session(session_id) | |
| wb = self._get_wb(session_id) | |
| test_path = os.path.join(HIDDEN_TESTS_DIR, f"{session.scenario_id}.json") | |
| if not os.path.isfile(test_path): | |
| return {"error": f"No hidden tests found for scenario {session.scenario_id}"} | |
| with open(test_path) as f: | |
| test_spec = json.load(f) | |
| checks = test_spec.get("checks", []) | |
| results = [] | |
| passed = 0 | |
| for check in checks: | |
| result = self._run_single_check(wb, check) | |
| results.append(result) | |
| if result["passed"]: | |
| passed += 1 | |
| total = len(checks) | |
| pass_rate = passed / total if total > 0 else 0.0 | |
| session.solved = pass_rate == 1.0 | |
| return { | |
| "scenario_id": session.scenario_id, | |
| "total_checks": total, | |
| "passed": passed, | |
| "failed": total - passed, | |
| "pass_rate": pass_rate, | |
| "results": results, | |
| } | |
| def validate_partial(self, session_id: str) -> dict: | |
| """Run hidden tests but return only pass/fail counts, not full answers.""" | |
| full = self.run_hidden_tests(session_id) | |
| if "error" in full: | |
| return full | |
| return { | |
| "scenario_id": full["scenario_id"], | |
| "total_checks": full["total_checks"], | |
| "passed": full["passed"], | |
| "failed": full["failed"], | |
| "pass_rate": full["pass_rate"], | |
| } | |
| # ── Target region helpers ────────────────────────────────────────── | |
| def get_named_targets(self, session_id: str) -> list[dict]: | |
| """Return scenario-defined target areas where the agent should write.""" | |
| session = self._get_session(session_id) | |
| test_path = os.path.join(HIDDEN_TESTS_DIR, f"{session.scenario_id}.json") | |
| if not os.path.isfile(test_path): | |
| return [] | |
| with open(test_path) as f: | |
| test_spec = json.load(f) | |
| return test_spec.get("target_regions", []) | |
| def is_in_target_region(self, session_id: str, sheet: str, cell_ref: str) -> bool: | |
| """Check if a cell is within a designated target region.""" | |
| targets = self.get_named_targets(session_id) | |
| if not targets: | |
| return True | |
| cell_ref = cell_ref.upper() | |
| for t in targets: | |
| if t.get("sheet") != sheet: | |
| continue | |
| t_range = t.get("range") | |
| if t_range and self._cell_in_range(cell_ref, t_range): | |
| return True | |
| return False | |
| # ── Private helpers ──────────────────────────────────────────────── | |
| def _get_session(self, session_id: str) -> WorkbookSession: | |
| if session_id not in self._sessions: | |
| raise KeyError(f"Session not found: {session_id}") | |
| return self._sessions[session_id] | |
| def _get_wb(self, session_id: str) -> openpyxl.Workbook: | |
| if session_id not in self._workbooks: | |
| raise KeyError(f"No workbook loaded for session: {session_id}") | |
| return self._workbooks[session_id] | |
| def _get_sheet(self, session_id: str, sheet_name: str): | |
| wb = self._get_wb(session_id) | |
| if sheet_name not in wb.sheetnames: | |
| raise ValueError(f"Sheet '{sheet_name}' not found. Available: {wb.sheetnames}") | |
| return wb[sheet_name] | |
| def _cell_display_value(self, cell) -> Any: | |
| """Return a JSON-safe display value for a cell.""" | |
| val = cell.value | |
| if val is None: | |
| return None | |
| if isinstance(val, str) and val.startswith("="): | |
| return val | |
| if isinstance(val, (datetime, date)): | |
| return val.isoformat() | |
| return val | |
| def _cell_in_range(self, cell_ref: str, range_str: str) -> bool: | |
| """Check if cell_ref falls within range_str (e.g. 'B2:D10').""" | |
| start, end = _parse_range_ref(range_str) | |
| s_col, s_row = _parse_cell_ref(start) | |
| e_col, e_row = _parse_cell_ref(end) | |
| c_col, c_row = _parse_cell_ref(cell_ref) | |
| return ( | |
| column_index_from_string(s_col) <= column_index_from_string(c_col) <= column_index_from_string(e_col) | |
| and s_row <= c_row <= e_row | |
| ) | |
| def _run_single_check(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| """Execute a single hidden test check against the workbook.""" | |
| check_type = self._determine_check_type(check) | |
| try: | |
| if check_type == "expected_formula": | |
| return self._check_expected_formula(wb, check) | |
| elif check_type == "expected_value_range": | |
| return self._check_expected_value_range(wb, check) | |
| elif check_type == "no_blanks": | |
| return self._check_no_blanks(wb, check) | |
| elif check_type == "row_count_equals": | |
| return self._check_row_count_equals(wb, check) | |
| elif check_type == "all_dates_iso_format": | |
| return self._check_all_dates_iso(wb, check) | |
| elif check_type == "constraint_satisfaction": | |
| return self._check_constraint_satisfaction(wb, check) | |
| else: | |
| return {"check": check_type, "passed": False, "reason": f"Unknown check type: {check_type}"} | |
| except Exception as e: | |
| return {"check": check_type, "passed": False, "reason": str(e)} | |
| def _determine_check_type(self, check: dict) -> str: | |
| if "expected_formula" in check: | |
| return "expected_formula" | |
| if "expected_value_range" in check: | |
| return "expected_value_range" | |
| return check.get("check", "unknown") | |
| def _check_expected_formula(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| ws = wb[check["sheet"]] | |
| col_letter, row_num = _parse_cell_ref(check["cell"]) | |
| col_idx = column_index_from_string(col_letter) | |
| cell = ws.cell(row=row_num, column=col_idx) | |
| actual = cell.value | |
| expected = check["expected_formula"] | |
| passed = isinstance(actual, str) and actual.strip().upper() == expected.strip().upper() | |
| return { | |
| "check": "expected_formula", | |
| "cell": f"{check['sheet']}!{check['cell']}", | |
| "passed": passed, | |
| "expected": expected, | |
| "actual": actual, | |
| } | |
| def _check_expected_value_range(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| ws = wb[check["sheet"]] | |
| col_letter, row_num = _parse_cell_ref(check["cell"]) | |
| col_idx = column_index_from_string(col_letter) | |
| cell = ws.cell(row=row_num, column=col_idx) | |
| val = cell.value | |
| lo, hi = check["expected_value_range"] | |
| if isinstance(val, str) and val.startswith("="): | |
| from .formula_utils import evaluate_formula | |
| val = evaluate_formula(wb, check["sheet"], check["cell"]) | |
| numeric = self._to_numeric(val) | |
| if numeric is None: | |
| return { | |
| "check": "expected_value_range", | |
| "cell": f"{check['sheet']}!{check['cell']}", | |
| "passed": False, | |
| "reason": f"Non-numeric value: {val}", | |
| } | |
| passed = lo <= numeric <= hi | |
| return { | |
| "check": "expected_value_range", | |
| "cell": f"{check['sheet']}!{check['cell']}", | |
| "passed": passed, | |
| "expected_range": [lo, hi], | |
| "actual": numeric, | |
| } | |
| def _check_no_blanks(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| ws = wb[check["sheet"]] | |
| range_str = check["range"] | |
| start, end = _parse_range_ref(range_str) | |
| s_col, s_row = _parse_cell_ref(start) | |
| e_col, e_row = _parse_cell_ref(end) | |
| min_col = column_index_from_string(s_col) | |
| max_col = column_index_from_string(e_col) | |
| blanks = [] | |
| for r in range(s_row, e_row + 1): | |
| for c in range(min_col, max_col + 1): | |
| if ws.cell(row=r, column=c).value is None: | |
| blanks.append(f"{get_column_letter(c)}{r}") | |
| return { | |
| "check": "no_blanks", | |
| "range": f"{check['sheet']}!{range_str}", | |
| "passed": len(blanks) == 0, | |
| "blank_count": len(blanks), | |
| } | |
| def _check_row_count_equals(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| ws = wb[check["sheet"]] | |
| expected = check["value"] | |
| actual = 0 | |
| for row in ws.iter_rows(min_row=2): | |
| if any(cell.value is not None for cell in row): | |
| actual += 1 | |
| return { | |
| "check": "row_count_equals", | |
| "sheet": check["sheet"], | |
| "passed": actual == expected, | |
| "expected": expected, | |
| "actual": actual, | |
| } | |
| def _check_all_dates_iso(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| ws = wb[check["sheet"]] | |
| col_letter = check["column"].upper() | |
| col_idx = column_index_from_string(col_letter) | |
| iso_re = re.compile(r"^\d{4}-\d{2}-\d{2}") | |
| non_iso = [] | |
| for r in range(2, ws.max_row + 1): | |
| val = ws.cell(row=r, column=col_idx).value | |
| if val is None: | |
| continue | |
| if isinstance(val, (datetime, date)): | |
| continue | |
| if isinstance(val, str) and iso_re.match(val): | |
| continue | |
| non_iso.append(f"{col_letter}{r}: {val}") | |
| return { | |
| "check": "all_dates_iso_format", | |
| "column": f"{check['sheet']}!{col_letter}", | |
| "passed": len(non_iso) == 0, | |
| "non_iso_count": len(non_iso), | |
| } | |
| def _check_constraint_satisfaction(self, wb: openpyxl.Workbook, check: dict) -> dict: | |
| """Evaluate domain constraints from a constraints sheet against an output sheet. | |
| Constraints are read from prose text in column A of the constraints sheet. | |
| Each row is a rule. The engine checks common patterns: | |
| - "No employee works >N days" | |
| - "Night→Morning gap required" | |
| These are matched via regex and evaluated against the output grid. | |
| """ | |
| output_sheet = check["sheet"] | |
| constraints_sheet = check.get("constraints_sheet", "Constraints") | |
| if output_sheet not in wb.sheetnames: | |
| return {"check": "constraint_satisfaction", "passed": False, "reason": f"Sheet '{output_sheet}' not found"} | |
| if constraints_sheet not in wb.sheetnames: | |
| return {"check": "constraint_satisfaction", "passed": False, "reason": f"Sheet '{constraints_sheet}' not found"} | |
| ws_out = wb[output_sheet] | |
| ws_con = wb[constraints_sheet] | |
| constraints = [] | |
| for row in ws_con.iter_rows(min_col=1, max_col=1, values_only=True): | |
| if row[0] and isinstance(row[0], str) and row[0].strip(): | |
| constraints.append(row[0].strip()) | |
| violations = [] | |
| for constraint_text in constraints: | |
| violation = self._evaluate_constraint(ws_out, constraint_text) | |
| if violation: | |
| violations.append(violation) | |
| return { | |
| "check": "constraint_satisfaction", | |
| "sheet": output_sheet, | |
| "passed": len(violations) == 0, | |
| "total_constraints": len(constraints), | |
| "violations": violations, | |
| } | |
| def _evaluate_constraint(self, ws, constraint_text: str) -> Optional[str]: | |
| """Evaluate a single prose constraint against the output sheet. | |
| Returns a violation description or None if satisfied.""" | |
| text_lower = constraint_text.lower() | |
| # "No employee works more than N days per week" | |
| max_days_match = re.search(r"no employee works?\s*(?:more than\s*)?(\d+)\s*days?", text_lower) | |
| if max_days_match and "more than" in text_lower: | |
| max_days = int(max_days_match.group(1)) | |
| for row in ws.iter_rows(min_row=2): | |
| working_days = sum( | |
| 1 for cell in row[1:] | |
| if cell.value is not None | |
| and str(cell.value).upper().strip() not in ("X", "", "OFF") | |
| ) | |
| emp = row[0].value | |
| if working_days > max_days: | |
| return f"{emp} works {working_days} days (max {max_days})" | |
| return None | |
| # "Night shift must not be followed by Morning shift" | |
| if "night" in text_lower and "morning" in text_lower and ("follow" in text_lower or "gap" in text_lower): | |
| for row in ws.iter_rows(min_row=2): | |
| emp = row[0].value | |
| shifts = [str(cell.value).upper().strip() if cell.value else "X" for cell in row[1:]] | |
| for i in range(len(shifts) - 1): | |
| if shifts[i] == "N" and shifts[i + 1] == "M": | |
| return f"{emp} has Night→Morning on days {i + 1}→{i + 2}" | |
| return None | |
| # "At least N employees must be on X shift every day" | |
| min_shift_daily = re.search( | |
| r"at least (\d+) employees?.*\bon\s+(\w+)\s+shift\s+every\s+day", text_lower | |
| ) | |
| if min_shift_daily: | |
| min_count = int(min_shift_daily.group(1)) | |
| shift_name = min_shift_daily.group(2).strip() | |
| shift_code = shift_name[0].upper() | |
| for col_idx in range(2, ws.max_column + 1): | |
| count = 0 | |
| for row_idx in range(2, ws.max_row + 1): | |
| val = ws.cell(row=row_idx, column=col_idx).value | |
| if val and str(val).upper().strip() == shift_code: | |
| count += 1 | |
| day_header = ws.cell(row=1, column=col_idx).value or f"Day{col_idx - 1}" | |
| if count < min_count: | |
| return f"{day_header}: only {count} on {shift_name} shift (need {min_count})" | |
| return None | |
| # "Each employee must work at least N days per week" | |
| min_days_match = re.search(r"each employee must work at least (\d+) days", text_lower) | |
| if min_days_match: | |
| min_days = int(min_days_match.group(1)) | |
| for row in ws.iter_rows(min_row=2): | |
| emp = row[0].value | |
| working = sum( | |
| 1 for cell in row[1:] | |
| if cell.value is not None | |
| and str(cell.value).upper().strip() not in ("X", "", "OFF") | |
| ) | |
| if working < min_days: | |
| return f"{emp} works only {working} days (min {min_days})" | |
| return None | |
| # "Saturday and Sunday must have at least N employees on X shift" | |
| weekend_match = re.search( | |
| r"saturday and sunday.*at least (\d+) employees?\s+on\s+(\w+)\s+shift", text_lower | |
| ) | |
| if weekend_match: | |
| min_count = int(weekend_match.group(1)) | |
| shift_name = weekend_match.group(2).strip() | |
| shift_code = shift_name[0].upper() | |
| headers = {} | |
| for c in range(1, ws.max_column + 1): | |
| h = ws.cell(row=1, column=c).value | |
| if h: | |
| headers[str(h).lower().strip()[:3]] = c | |
| for day_prefix in ["sat", "sun"]: | |
| col = headers.get(day_prefix) | |
| if not col: | |
| continue | |
| count = 0 | |
| for row_idx in range(2, ws.max_row + 1): | |
| val = ws.cell(row=row_idx, column=col).value | |
| if val and str(val).upper().strip() == shift_code: | |
| count += 1 | |
| day_header = ws.cell(row=1, column=col).value | |
| if count < min_count: | |
| return f"{day_header}: only {count} on {shift_name} shift (need {min_count})" | |
| return None | |
| return None | |
| def _to_numeric(self, val: Any) -> Optional[float]: | |
| if val is None: | |
| return None | |
| if isinstance(val, (int, float)): | |
| return float(val) | |
| if isinstance(val, str): | |
| cleaned = val.replace(",", "").replace("$", "").replace("%", "").strip() | |
| try: | |
| return float(cleaned) | |
| except (ValueError, TypeError): | |
| return None | |
| return None | |