# thearnabsarkar's picture
# Upload json_semval/pipeline.py with huggingface_hub
# 46ae221 verified
from __future__ import annotations
import copy
import json
from typing import Any, Dict, List
from .fixes import cast_bool, cast_number, map_enum, parse_date_iso, rename_key
from .ml_model import SemanticReasoner
from .rules_engine import validate_with_jsonschema
from .schema_utils import collect_enums
from .types import Prediction, Report
def _apply_fix(schema: Dict[str, Any], payload: Any, pred: Prediction) -> Any | None:
path = pred.get("jsonpath", "$")
if not path.startswith("$"):
return None
# convert to tokens
tokens: List[str] = []
rest = path[1:]
i = 0
while i < len(rest):
ch = rest[i]
if ch == ".":
j = i + 1
name = []
while j < len(rest) and rest[j] not in ".[":
name.append(rest[j])
j += 1
if name:
tokens.append("." + "".join(name))
i = j
continue
if ch == "[":
j = rest.find("]", i)
tokens.append(rest[i : j + 1])
i = j + 1
continue
i += 1
action = pred.get("fix_action", "")
if action == "rename_key":
dst = pred.get("fix_value") or "_renamed"
try:
return rename_key(payload, tokens, dst)
except Exception:
return None
if action == "cast_number":
return cast_number(payload, tokens)
if action == "cast_bool":
return cast_bool(payload, tokens)
if action == "parse_date_iso":
return parse_date_iso(payload, tokens)
if action == "map_enum":
enums = collect_enums(schema)
allowed = enums.get(path.replace("$", ""), [])
return map_enum(payload, tokens, allowed)
# fill_default or unknown → skip
return None
def run_validation(
    schema: Dict[str, Any],
    payload: Any,
    *,
    apply_fixes: bool = True,
    max_fixes: int = 5,
    backend: str = "local",
) -> Report:
    """Validate *payload* against *schema* and optionally try predicted fixes.

    Flow:
      1. Run ``validate_with_jsonschema``; a valid payload is returned as-is.
      2. With ``backend == "rules-only"`` the rule errors are returned without
         invoking the reasoner or applying fixes.
      3. Otherwise ``SemanticReasoner(backend=backend).predict`` is called
         with the JSON-serialized schema and payload plus the rule errors.
      4. If ``apply_fixes`` is true, the first ``max_fixes`` predictions are
         tried in order on a copy of the payload. A fix is kept when the
         result validates (which ends the loop) or when it does not increase
         the number of rule errors.

    Args:
        schema: JSON schema to validate against.
        payload: Document to validate; never mutated past step 2 because all
            fixes operate on deep copies.
        apply_fixes: When false, predictions are reported but not applied.
        max_fixes: Upper bound on how many predictions are attempted; values
            below zero are treated as zero.
        backend: ``"rules-only"`` skips the reasoner; any other value is
            forwarded to ``SemanticReasoner``.

    Returns:
        A report mapping with keys ``valid``, ``rule_errors``,
        ``ml_predictions``, ``applied_fixes`` and ``corrected_json``.
    """
    is_valid, errors = validate_with_jsonschema(schema, payload)
    if is_valid:
        return {
            "valid": True,
            "rule_errors": [],
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    # Honor explicit rules-only backend: do not invoke ML or apply fixes
    if backend == "rules-only":
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    reasoner = SemanticReasoner(backend=backend)
    preds = reasoner.predict(json.dumps(schema), json.dumps(payload), errors)

    applied: List[Prediction] = []
    corrected = copy.deepcopy(payload)

    if not apply_fixes:
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": preds,
            "applied_fixes": [],
            "corrected_json": corrected,
        }

    # A negative stop index would slice from the end of the list instead of
    # meaning "no fixes", so clamp the budget at zero.
    budget = max(max_fixes, 0)
    for pred in preds[:budget]:
        # Work on a copy so a rejected fix leaves `corrected` untouched.
        candidate = copy.deepcopy(corrected)
        # NOTE(review): only `candidate` is validated below, so this assumes
        # the fix helpers mutate it in place and use the return value merely
        # as a success signal — TODO confirm against .fixes.
        if _apply_fix(schema, candidate, pred) is None:
            continue

        # Validate once and reuse both the verdict and the error list.
        now_valid, new_errs = validate_with_jsonschema(schema, candidate)
        if now_valid:
            corrected = candidate
            applied.append(pred)
            break

        # Keep the fix as long as it does not increase the number of errors.
        # NOTE(review): an equal count is accepted too; tighten to `<` if only
        # strict improvements should be kept.
        if len(new_errs) <= len(errors):
            corrected = candidate
            applied.append(pred)
            errors = new_errs

    final_valid, final_errors = validate_with_jsonschema(schema, corrected)
    return {
        "valid": final_valid,
        "rule_errors": final_errors if not final_valid else [],
        "ml_predictions": preds,
        "applied_fixes": applied,
        "corrected_json": corrected,
    }