File size: 2,965 Bytes
f1e8dc8
 
 
 
 
 
 
 
 
 
 
 
144312a
 
f1e8dc8
2b9d72c
f1e8dc8
 
 
2b9d72c
f1e8dc8
 
 
 
 
 
 
2b9d72c
f1e8dc8
2b9d72c
f1e8dc8
 
 
 
 
 
 
 
 
 
 
 
 
 
2b9d72c
 
f1e8dc8
 
 
 
2b9d72c
f1e8dc8
 
 
 
 
 
 
 
 
 
 
 
2b9d72c
f1e8dc8
 
 
 
 
 
 
 
 
 
2b9d72c
f1e8dc8
 
 
 
 
 
 
2b9d72c
f1e8dc8
 
 
 
2b9d72c
f1e8dc8
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
##PARA.AI/core/validator.py
"""
Schema Validator - validates data with the jsonschema library against the V13.6 schema
"""
import logging
import json
from typing import Dict, Any, Tuple, List

try:
    import jsonschema
    from jsonschema import validate, ValidationError
    HAS_JSONSCHEMA = True
except ImportError:
    HAS_JSONSCHEMA = False

logger = logging.getLogger(__name__)


class SchemaValidator:
    """Validate JSON-like data against a JSON Schema loaded from disk.

    The validator is best-effort: when the ``jsonschema`` package could not be
    imported (module-level ``HAS_JSONSCHEMA`` is false) or the main schema
    could not be loaded, ``validate`` reports success instead of failing.

    Every public check returns ``(ok, errors)`` where ``errors`` is a list of
    strings and is empty when ``ok`` is true.
    """

    def __init__(self, schema_path: str):
        """Load the main schema and record whether jsonschema is available.

        Args:
            schema_path: Path of the JSON Schema file used by ``validate`` and
                ``validate_required_fields``.
        """
        self.schema_path = schema_path
        self.schema = self._load_schema(schema_path)
        self.enabled = HAS_JSONSCHEMA

        if not self.enabled:
            logger.warning("⚠️ jsonschema não instalado")
        else:
            logger.info(f"✅ SchemaValidator: {schema_path}")

    def _load_schema(self, schema_path: str) -> Dict[str, Any]:
        """Read and parse the JSON schema file.

        Args:
            schema_path: Path of the schema file, read as UTF-8.

        Returns:
            The parsed JSON document, or an empty dict when the file does not
            exist or is not valid JSON (the problem is logged as an error).
        """
        try:
            with open(schema_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.error(f"❌ Schema não encontrado: {schema_path}")
            return {}
        except json.JSONDecodeError as e:
            logger.error(f"❌ Erro ao parsear schema: {e}")
            return {}

    @staticmethod
    def _format_error(error) -> str:
        """Render a validation error as ``"<message> (campo: <dotted path>)"``."""
        # ``absolute_path`` is rooted at the validated instance.  The plain
        # ``path`` attribute is relative to the parent error, so it drops the
        # leading components when jsonschema reports a sub-error (for example
        # one branch of an ``anyOf``/``oneOf``).
        location = '.'.join(map(str, error.absolute_path))
        return f"{error.message} (campo: {location})"

    def _check(self, data: Any, schema: Any) -> Tuple[bool, List[str]]:
        """Run jsonschema on *data* and convert the outcome to ``(ok, errors)``."""
        try:
            validate(instance=data, schema=schema)
        except ValidationError as e:
            return False, [self._format_error(e)]
        except Exception as e:
            # Anything else raised by jsonschema (e.g. an invalid schema) is
            # reported as a validation failure rather than propagated.
            return False, [str(e)]
        return True, []

    def validate(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """Validate complete data against the main (V13.6) schema.

        Args:
            data: The instance to validate.

        Returns:
            ``(True, [])`` when the data is valid, or when validation is
            skipped because jsonschema is unavailable or no schema was loaded.
            Otherwise ``(False, [message])`` with a single error message.
        """
        if not self.enabled or not self.schema:
            return True, []

        outcome = self._check(data, self.schema)
        if outcome[0]:
            logger.info("✅ Validação: SUCESSO")
        return outcome

    def validate_partial(
        self,
        data: Dict[str, Any],
        partial_schema_path: str
    ) -> Tuple[bool, List[str]]:
        """Validate partial data against a specialist schema file.

        The schema file is read on every call.

        Args:
            data: The partial instance to validate.
            partial_schema_path: Path of the specialist JSON Schema file.

        Returns:
            ``(True, [])`` when the data is valid, when jsonschema is
            unavailable, or when the schema file does not exist.  Otherwise
            ``(False, [message])``; an unreadable or malformed schema file is
            also reported this way.
        """
        if not self.enabled:
            return True, []

        try:
            with open(partial_schema_path, 'r', encoding='utf-8') as f:
                partial_schema = json.load(f)
        except FileNotFoundError:
            # A missing specialist schema means there is nothing to check.
            return True, []
        except Exception as e:
            return False, [str(e)]

        return self._check(data, partial_schema)

    def validate_required_fields(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """Check only the top-level ``required`` list of the main schema.

        This check does not need jsonschema.

        Args:
            data: The instance to inspect.  If it is not a dict, every
                required field is reported as missing.

        Returns:
            ``(True, [])`` when nothing is missing or the schema declares no
            usable ``required`` list; otherwise ``(False, missing_names)``.
        """
        # A non-object schema (e.g. the boolean schema ``true``) has no
        # ``required`` keyword to consult.
        if not self.schema or not isinstance(self.schema, dict):
            return True, []

        required_fields = self.schema.get('required', [])
        if isinstance(data, dict):
            missing = [f for f in required_fields if f not in data]
        else:
            # ``in`` on a str/list would do substring/element matching and on
            # None would raise, so non-dict data cannot satisfy any field.
            missing = list(required_fields)

        if missing:
            return False, missing
        return True, []