Spaces:
Runtime error
Runtime error
| ##PARA.AI/core/validator.py | |
| """ | |
| Schema Validator - Validação com jsonschema V13.6 | |
| """ | |
| import logging | |
| import json | |
| from typing import Dict, Any, Tuple, List | |
| try: | |
| import jsonschema | |
| from jsonschema import validate, ValidationError | |
| HAS_JSONSCHEMA = True | |
| except ImportError: | |
| HAS_JSONSCHEMA = False | |
| logger = logging.getLogger(__name__) | |
class SchemaValidator:
    """Validate JSON-like data against a JSON Schema loaded from disk.

    Validation is best-effort: when the ``jsonschema`` package could not be
    imported, or the main schema could not be loaded, ``validate`` reports
    success instead of failing.

    Attributes:
        schema_path: Path the main schema was loaded from.
        schema: Parsed schema, or ``{}`` when loading failed.
        enabled: ``True`` when ``jsonschema`` was imported successfully.
    """

    def __init__(self, schema_path: str):
        """Load the schema at *schema_path* and record jsonschema availability."""
        self.schema_path = schema_path
        self.schema = self._load_schema(schema_path)
        self.enabled = HAS_JSONSCHEMA

        if not self.enabled:
            logger.warning("⚠️ jsonschema não instalado")
        else:
            logger.info("✅ SchemaValidator: %s", schema_path)

    def _load_schema(self, schema_path: str) -> Dict[str, Any]:
        """Read and parse the JSON schema file.

        Returns:
            The parsed schema, or ``{}`` when the file is missing, unreadable
            or not valid JSON. Failures are logged, never raised, so that
            constructing the validator cannot crash on a bad schema file.
        """
        try:
            with open(schema_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.error("❌ Schema não encontrado: %s", schema_path)
        except json.JSONDecodeError as e:
            logger.error("❌ Erro ao parsear schema: %s", e)
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems, a directory passed as path, or a file that
            # is not UTF-8: degrade the same way as a missing schema.
            logger.error("❌ Erro ao ler schema %s: %s", schema_path, e)
        return {}

    @staticmethod
    def _format_error(error) -> str:
        """Render a jsonschema ``ValidationError`` as ``message (campo: a.b.c)``."""
        field_path = '.'.join(map(str, error.path))
        return f"{error.message} (campo: {field_path})"

    def validate(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """Validate *data* against the main schema.

        Returns:
            ``(True, [])`` on success, or when validation is skipped because
            jsonschema is unavailable or no schema was loaded.
            ``(False, [message])`` when validation fails or raises.
        """
        if not self.enabled or not self.schema:
            return True, []

        try:
            validate(instance=data, schema=self.schema)
        except ValidationError as e:
            return False, [self._format_error(e)]
        except Exception as e:
            # e.g. an invalid schema (SchemaError); reported, not raised.
            return False, [str(e)]

        logger.info("✅ Validação: SUCESSO")
        return True, []

    def validate_partial(
        self,
        data: Dict[str, Any],
        partial_schema_path: str
    ) -> Tuple[bool, List[str]]:
        """Validate *data* against the schema stored at *partial_schema_path*.

        Returns:
            ``(True, [])`` on success, when jsonschema is unavailable, or when
            the partial schema file does not exist (a warning is logged).
            ``(False, [message])`` when validation fails or any other error
            occurs while reading or applying the schema.
        """
        if not self.enabled:
            return True, []

        try:
            with open(partial_schema_path, 'r', encoding='utf-8') as f:
                partial_schema = json.load(f)
            validate(instance=data, schema=partial_schema)
        except FileNotFoundError:
            # A missing partial schema is tolerated, but leave a trace so a
            # mistyped path does not go unnoticed.
            logger.warning(
                "⚠️ Schema parcial não encontrado: %s", partial_schema_path
            )
            return True, []
        except ValidationError as e:
            return False, [self._format_error(e)]
        except Exception as e:
            return False, [str(e)]

        return True, []

    def validate_required_fields(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """Check only the top-level ``required`` keys of the main schema.

        Does not need jsonschema. A payload that is not a dict (for example
        ``None``) is reported as missing every required field instead of
        raising ``TypeError``.

        Returns:
            ``(True, [])`` when nothing is missing or no schema is loaded,
            otherwise ``(False, missing_field_names)``.
        """
        if not self.schema:
            return True, []

        required_fields = self.schema.get('required', [])
        if isinstance(data, dict):
            missing = [name for name in required_fields if name not in data]
        else:
            # No usable payload: nothing can be present.
            missing = list(required_fields)

        if missing:
            return False, missing
        return True, []