| from __future__ import annotations |
|
|
| import copy |
| import json |
| from typing import Any, Dict, List |
|
|
| from .fixes import cast_bool, cast_number, map_enum, parse_date_iso, rename_key |
| from .ml_model import SemanticReasoner |
| from .rules_engine import validate_with_jsonschema |
| from .schema_utils import collect_enums |
| from .types import Prediction, Report |
|
|
|
|
def _tokenize_jsonpath(path: str) -> List[str] | None:
    """Split a ``$``-rooted path into ``.name`` and ``[...]`` tokens.

    Characters that start neither a ``.name`` nor a ``[...]`` segment are
    skipped, and an empty name (as in ``..``) produces no token.

    Returns:
        The tokens in path order, or ``None`` when *path* does not start
        with ``$`` or contains a ``[`` with no closing ``]``.
    """
    if not path.startswith("$"):
        return None

    tokens: List[str] = []
    rest = path[1:]
    i = 0
    while i < len(rest):
        ch = rest[i]
        if ch == ".":
            j = i + 1
            while j < len(rest) and rest[j] not in ".[":
                j += 1
            if j > i + 1:
                tokens.append(rest[i:j])
            i = j
        elif ch == "[":
            j = rest.find("]", i)
            if j == -1:
                # Unterminated bracket: without this guard ``i`` would be
                # reset to 0 and the scan would never finish.
                return None
            tokens.append(rest[i : j + 1])
            i = j + 1
        else:
            i += 1
    return tokens


def _apply_fix(schema: Dict[str, Any], payload: Any, pred: Prediction) -> Any | None:
    """Apply the single fix described by *pred* to *payload*.

    The target location is read from ``pred["jsonpath"]`` (default ``"$"``)
    and tokenized; the operation is selected by ``pred["fix_action"]``.

    Args:
        schema: JSON schema; only consulted for ``map_enum`` (via
            ``collect_enums``).
        payload: The document handed to the fix helper.
        pred: Prediction mapping providing ``jsonpath``, ``fix_action`` and,
            for ``rename_key``, an optional ``fix_value`` (falls back to
            ``"_renamed"`` when missing or falsy).

    Returns:
        Whatever the selected fix helper returns, or ``None`` when the path
        is not a string, is not rooted at ``$``, is malformed, the action is
        not recognized, or ``rename_key`` raises.

    NOTE(review): whether the helpers mutate *payload* in place or return a
    new object is not visible from this function -- confirm in ``.fixes``.
    """
    path = pred.get("jsonpath", "$")
    if not isinstance(path, str):
        return None

    tokens = _tokenize_jsonpath(path)
    if tokens is None:
        return None

    action = pred.get("fix_action", "")
    if action == "rename_key":
        dst = pred.get("fix_value") or "_renamed"
        try:
            return rename_key(payload, tokens, dst)
        except Exception:
            # Best-effort: a failed rename is reported as "no fix applied".
            return None
    if action == "cast_number":
        return cast_number(payload, tokens)
    if action == "cast_bool":
        return cast_bool(payload, tokens)
    if action == "parse_date_iso":
        return parse_date_iso(payload, tokens)
    if action == "map_enum":
        enums = collect_enums(schema)
        # Strip only the leading root marker; property names may themselves
        # contain "$" (e.g. "$id").
        # NOTE(review): assumes collect_enums keys are the path without the
        # leading "$" -- confirm against schema_utils.
        allowed = enums.get(path[1:], [])
        return map_enum(payload, tokens, allowed)

    return None
|
|
|
|
def run_validation(
    schema: Dict[str, Any],
    payload: Any,
    *,
    apply_fixes: bool = True,
    max_fixes: int = 5,
    backend: str = "local",
) -> Report:
    """Validate *payload* against *schema* and optionally try predicted fixes.

    Flow:
      1. Validate with ``validate_with_jsonschema``; a valid payload is
         returned immediately with empty error/prediction lists.
      2. With ``backend == "rules-only"`` the rule errors are returned and
         no predictions are requested.
      3. Otherwise a ``SemanticReasoner`` is asked for predictions, given the
         JSON-serialized schema and payload plus the rule errors.
      4. When *apply_fixes* is true, up to *max_fixes* predictions are tried
         in order, each on a deep copy of the current corrected document. A
         fix is kept if the copy becomes valid (which ends the loop) or if it
         does not increase the number of validation errors.

    Args:
        schema: JSON schema to validate against.
        payload: Document to validate; it is deep-copied before fixes are
            tried, so fixes are applied to copies rather than to *payload*.
        apply_fixes: When false, predictions are reported but none applied.
        max_fixes: Upper bound on how many predictions are tried; values
            below zero are treated as zero.
        backend: Passed to ``SemanticReasoner``; ``"rules-only"`` skips it.

    Returns:
        A report mapping with keys ``valid``, ``rule_errors``,
        ``ml_predictions``, ``applied_fixes`` and ``corrected_json``.
    """
    is_valid, errors = validate_with_jsonschema(schema, payload)
    if is_valid:
        return {
            "valid": True,
            "rule_errors": [],
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    if backend == "rules-only":
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    reasoner = SemanticReasoner(backend=backend)
    preds = reasoner.predict(json.dumps(schema), json.dumps(payload), errors)
    corrected = copy.deepcopy(payload)

    if not apply_fixes:
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": preds,
            "applied_fixes": [],
            "corrected_json": corrected,
        }

    applied: List[Prediction] = []
    # Clamp so a negative max_fixes means "try nothing" rather than slicing
    # from the end of the prediction list.
    limit = max(0, max_fixes)
    for pred in preds[:limit]:
        candidate = copy.deepcopy(corrected)
        # The return value is only used as a success flag; the copy itself is
        # what gets validated.
        # NOTE(review): this assumes the fix helpers mutate their payload
        # argument in place -- confirm in ``.fixes``.
        if _apply_fix(schema, candidate, pred) is None:
            continue

        # Validate once and reuse both the verdict and the error list.
        now_valid, new_errs = validate_with_jsonschema(schema, candidate)
        if now_valid:
            corrected = candidate
            applied.append(pred)
            break
        if len(new_errs) <= len(errors):
            # Keep fixes that do not make things worse, and measure the next
            # candidate against the updated error count.
            corrected = candidate
            applied.append(pred)
            errors = new_errs

    final_valid, final_errors = validate_with_jsonschema(schema, corrected)
    return {
        "valid": final_valid,
        "rule_errors": final_errors if not final_valid else [],
        "ml_predictions": preds,
        "applied_fixes": applied,
        "corrected_json": corrected,
    }
|
|