from __future__ import annotations from copy import deepcopy try: from .config import DECISION_PHASE_LABELS, INTENT_TYPE_LABELS, PROJECT_VERSION, SUBTYPE_LABELS # type: ignore except ImportError: from config import DECISION_PHASE_LABELS, INTENT_TYPE_LABELS, PROJECT_VERSION, SUBTYPE_LABELS API_SCHEMA_VERSION = "2026-03-22" ALLOWED_MONETIZATION_ELIGIBILITY = { "allowed", "allowed_with_caution", "restricted", "not_allowed", } ALLOWED_DECISION_BASIS = { "score_threshold", "fallback_low_confidence", "fallback_ambiguous_intent", "fallback_policy_default", } ALLOWED_SENSITIVITY = {"low", "medium", "high"} ALLOWED_OPPORTUNITY_TYPES = { "none", "transaction_trigger", "decision_moment", "comparison_slot", "soft_recommendation", } ALLOWED_OPPORTUNITY_STRENGTHS = {"low", "medium", "high"} ALLOWED_FALLBACK_REASONS = {"ambiguous_query", "policy_default", "confidence_below_threshold"} ALLOWED_IAB_MAPPING_MODES = {"exact", "nearest_equivalent", "internal_extension"} class SchemaValidationError(Exception): def __init__(self, code: str, details: list[dict]): super().__init__(code) self.code = code self.details = details def _detail(field: str, message: str, error_type: str = "validation_error") -> dict: return {"field": field, "message": message, "type": error_type} def _expect_dict(value, field: str, errors: list[dict]) -> dict | None: if not isinstance(value, dict): errors.append(_detail(field, "must be an object", "type_error")) return None return value def _expect_list(value, field: str, errors: list[dict]) -> list | None: if not isinstance(value, list): errors.append(_detail(field, "must be an array", "type_error")) return None return value def _expect_bool(value, field: str, errors: list[dict]) -> bool | None: if not isinstance(value, bool): errors.append(_detail(field, "must be a boolean", "type_error")) return None return value def _expect_str(value, field: str, errors: list[dict], *, min_length: int = 0, max_length: int | None = None) -> str | None: if not isinstance(value, str): errors.append(_detail(field, "must be a string", "type_error")) return None cleaned = value.strip() if len(cleaned) < min_length: errors.append(_detail(field, f"must be at least {min_length} characters", "value_error")) if max_length is not None and len(cleaned) > max_length: errors.append(_detail(field, f"must be at most {max_length} characters", "value_error")) return cleaned def _expect_float(value, field: str, errors: list[dict], *, minimum: float = 0.0, maximum: float = 1.0) -> float | None: if not isinstance(value, (int, float)) or isinstance(value, bool): errors.append(_detail(field, "must be a number", "type_error")) return None coerced = float(value) if coerced < minimum or coerced > maximum: errors.append(_detail(field, f"must be between {minimum} and {maximum}", "value_error")) return coerced def _expect_member(value, field: str, allowed: set[str] | tuple[str, ...], errors: list[dict]) -> str | None: member = _expect_str(value, field, errors, min_length=1) if member is not None and member not in allowed: allowed_values = ", ".join(sorted(allowed)) errors.append(_detail(field, f"must be one of: {allowed_values}", "value_error")) return member def validate_classify_request(payload) -> dict: errors: list[dict] = [] payload_dict = _expect_dict(payload, "body", errors) if payload_dict is None: raise SchemaValidationError("request_validation_failed", errors) extra_keys = sorted(set(payload_dict) - {"text"}) if extra_keys: errors.append(_detail("body", f"unexpected fields: {', '.join(extra_keys)}", "value_error")) text = _expect_str(payload_dict.get("text"), "text", errors, min_length=1, max_length=5000) if errors: raise SchemaValidationError("request_validation_failed", errors) return {"text": text} def _validate_head_confidence(payload, field: str, labels: tuple[str, ...], errors: list[dict]) -> None: data = _expect_dict(payload, field, errors) if data is None: return _expect_member(data.get("label"), f"{field}.label", labels, errors) _expect_float(data.get("confidence"), f"{field}.confidence", errors) _expect_float(data.get("raw_confidence"), f"{field}.raw_confidence", errors) _expect_float(data.get("confidence_threshold"), f"{field}.confidence_threshold", errors) _expect_bool(data.get("calibrated"), f"{field}.calibrated", errors) _expect_bool(data.get("meets_threshold"), f"{field}.meets_threshold", errors) def _validate_iab_level(payload, field: str, errors: list[dict]) -> None: data = _expect_dict(payload, field, errors) if data is None: return _expect_str(data.get("id"), f"{field}.id", errors, min_length=1) _expect_str(data.get("label"), f"{field}.label", errors, min_length=1) def _validate_iab_content(payload, field: str, errors: list[dict]) -> None: data = _expect_dict(payload, field, errors) if data is None: return taxonomy = _expect_str(data.get("taxonomy"), f"{field}.taxonomy", errors, min_length=1) if taxonomy is not None and taxonomy != "IAB Content Taxonomy": errors.append(_detail(f"{field}.taxonomy", "must equal 'IAB Content Taxonomy'", "value_error")) _expect_str(data.get("taxonomy_version"), f"{field}.taxonomy_version", errors, min_length=1) _validate_iab_level(data.get("tier1"), f"{field}.tier1", errors) tier2 = data.get("tier2") if tier2 is not None: _validate_iab_level(tier2, f"{field}.tier2", errors) tier3 = data.get("tier3") if tier3 is not None: _validate_iab_level(tier3, f"{field}.tier3", errors) tier4 = data.get("tier4") if tier4 is not None: _validate_iab_level(tier4, f"{field}.tier4", errors) _expect_member(data.get("mapping_mode"), f"{field}.mapping_mode", ALLOWED_IAB_MAPPING_MODES, errors) _expect_float(data.get("mapping_confidence"), f"{field}.mapping_confidence", errors) def _validate_fallback(payload, field: str, errors: list[dict]) -> None: if payload is None: return data = _expect_dict(payload, field, errors) if data is None: return _expect_bool(data.get("applied"), f"{field}.applied", errors) _expect_member(data.get("fallback_intent_type"), f"{field}.fallback_intent_type", INTENT_TYPE_LABELS, errors) _expect_member( data.get("fallback_monetization_eligibility"), f"{field}.fallback_monetization_eligibility", {"not_allowed"}, errors, ) _expect_member(data.get("reason"), f"{field}.reason", ALLOWED_FALLBACK_REASONS, errors) failed_components = _expect_list(data.get("failed_components"), f"{field}.failed_components", errors) if failed_components is not None: for index, item in enumerate(failed_components): _expect_member( item, f"{field}.failed_components[{index}]", {"intent_type", "intent_subtype", "decision_phase"}, errors, ) def _validate_policy(payload, field: str, errors: list[dict]) -> None: data = _expect_dict(payload, field, errors) if data is None: return _expect_member(data.get("monetization_eligibility"), f"{field}.monetization_eligibility", ALLOWED_MONETIZATION_ELIGIBILITY, errors) _expect_str(data.get("eligibility_reason"), f"{field}.eligibility_reason", errors, min_length=1) _expect_member(data.get("decision_basis"), f"{field}.decision_basis", ALLOWED_DECISION_BASIS, errors) thresholds = _expect_dict(data.get("applied_thresholds"), f"{field}.applied_thresholds", errors) if thresholds is not None: _expect_float(thresholds.get("commercial_score_min"), f"{field}.applied_thresholds.commercial_score_min", errors) _expect_float(thresholds.get("intent_type_confidence_min"), f"{field}.applied_thresholds.intent_type_confidence_min", errors) _expect_float( thresholds.get("intent_subtype_confidence_min"), f"{field}.applied_thresholds.intent_subtype_confidence_min", errors, ) _expect_float(thresholds.get("decision_phase_confidence_min"), f"{field}.applied_thresholds.decision_phase_confidence_min", errors) _expect_member(data.get("sensitivity"), f"{field}.sensitivity", ALLOWED_SENSITIVITY, errors) _expect_bool(data.get("regulated_vertical"), f"{field}.regulated_vertical", errors) def _validate_opportunity(payload, field: str, errors: list[dict]) -> None: data = _expect_dict(payload, field, errors) if data is None: return _expect_member(data.get("type"), f"{field}.type", ALLOWED_OPPORTUNITY_TYPES, errors) _expect_member(data.get("strength"), f"{field}.strength", ALLOWED_OPPORTUNITY_STRENGTHS, errors) def validate_classify_response(payload) -> dict: errors: list[dict] = [] response = _expect_dict(payload, "response", errors) if response is None: raise SchemaValidationError("response_validation_failed", errors) model_output = _expect_dict(response.get("model_output"), "model_output", errors) if model_output is not None: classification = _expect_dict(model_output.get("classification"), "model_output.classification", errors) if classification is not None: _validate_iab_content( classification.get("iab_content"), "model_output.classification.iab_content", errors, ) intent = _expect_dict(classification.get("intent"), "model_output.classification.intent", errors) if intent is not None: _expect_member(intent.get("type"), "model_output.classification.intent.type", INTENT_TYPE_LABELS, errors) _expect_member(intent.get("subtype"), "model_output.classification.intent.subtype", SUBTYPE_LABELS, errors) _expect_member( intent.get("decision_phase"), "model_output.classification.intent.decision_phase", DECISION_PHASE_LABELS, errors, ) _expect_float(intent.get("confidence"), "model_output.classification.intent.confidence", errors) _expect_float(intent.get("commercial_score"), "model_output.classification.intent.commercial_score", errors) _expect_str(intent.get("summary"), "model_output.classification.intent.summary", errors, min_length=1) component_confidence = _expect_dict( intent.get("component_confidence"), "model_output.classification.intent.component_confidence", errors, ) if component_confidence is not None: _validate_head_confidence( component_confidence.get("intent_type"), "model_output.classification.intent.component_confidence.intent_type", INTENT_TYPE_LABELS, errors, ) _validate_head_confidence( component_confidence.get("intent_subtype"), "model_output.classification.intent.component_confidence.intent_subtype", SUBTYPE_LABELS, errors, ) _validate_head_confidence( component_confidence.get("decision_phase"), "model_output.classification.intent.component_confidence.decision_phase", DECISION_PHASE_LABELS, errors, ) _expect_member( component_confidence.get("overall_strategy"), "model_output.classification.intent.component_confidence.overall_strategy", {"min_required_component_confidence"}, errors, ) _validate_fallback(model_output.get("fallback"), "model_output.fallback", errors) system_decision = _expect_dict(response.get("system_decision"), "system_decision", errors) if system_decision is not None: _validate_policy(system_decision.get("policy"), "system_decision.policy", errors) _validate_opportunity(system_decision.get("opportunity"), "system_decision.opportunity", errors) intent_trajectory = _expect_list(system_decision.get("intent_trajectory"), "system_decision.intent_trajectory", errors) if intent_trajectory is not None: for index, item in enumerate(intent_trajectory): _expect_member(item, f"system_decision.intent_trajectory[{index}]", DECISION_PHASE_LABELS, errors) meta = _expect_dict(response.get("meta"), "meta", errors) if meta is not None: _expect_str(meta.get("system_version"), "meta.system_version", errors, min_length=1) _expect_bool(meta.get("calibration_enabled"), "meta.calibration_enabled", errors) _expect_bool(meta.get("iab_mapping_is_placeholder"), "meta.iab_mapping_is_placeholder", errors) if errors: raise SchemaValidationError("response_validation_failed", errors) return deepcopy(response) def validate_health_response(payload) -> dict: errors: list[dict] = [] response = _expect_dict(payload, "response", errors) if response is None: raise SchemaValidationError("response_validation_failed", errors) _expect_member(response.get("status"), "status", {"ok"}, errors) _expect_str(response.get("system_version"), "system_version", errors, min_length=1) heads = _expect_list(response.get("heads"), "heads", errors) if heads is not None: for index, item in enumerate(heads): head = _expect_dict(item, f"heads[{index}]", errors) if head is None: continue _expect_str(head.get("head"), f"heads[{index}].head", errors, min_length=1) _expect_str(head.get("model_path"), f"heads[{index}].model_path", errors, min_length=1) _expect_str(head.get("calibration_path"), f"heads[{index}].calibration_path", errors, min_length=1) _expect_bool(head.get("ready"), f"heads[{index}].ready", errors) _expect_bool(head.get("calibrated"), f"heads[{index}].calibrated", errors) if errors: raise SchemaValidationError("response_validation_failed", errors) return deepcopy(response) def validate_version_response(payload) -> dict: errors: list[dict] = [] response = _expect_dict(payload, "response", errors) if response is None: raise SchemaValidationError("response_validation_failed", errors) _expect_str(response.get("system_version"), "system_version", errors, min_length=1) _expect_member(response.get("api_schema_version"), "api_schema_version", {API_SCHEMA_VERSION}, errors) if errors: raise SchemaValidationError("response_validation_failed", errors) return deepcopy(response) def default_version_payload() -> dict: return {"system_version": PROJECT_VERSION, "api_schema_version": API_SCHEMA_VERSION}