PARA.AI / api /services /validation_service.py
Carlex22's picture
Revert "ParaAIV3.1"
1f24745
"""
Serviço de validação de dados para para.AI API v3.0
Valida formato de acórdãos, arquivos JSONL e outros dados
"""
from typing import Dict, List, Optional, Any, Tuple
import json
import re
from datetime import datetime
from pathlib import Path
from api.utils.exceptions import ValidationError
from api.utils.logger import setup_logger
logger = setup_logger(__name__)
class ValidationService:
"""
Serviço centralizado de validação de dados.
Valida:
- Arquivos JSONL
- Registros de acórdãos
- Dados de upload
- Formatos de campo
"""
# ========================================================================
# CONSTANTES
# ========================================================================
# Campos obrigatórios em acórdãos
REQUIRED_FIELDS = ["ementa", "integra"]
# Campos opcionais mas recomendados
OPTIONAL_FIELDS = [
"acordao_id", "numero", "tribunal", "ano", "data_julgamento",
"relator", "orgao_julgador", "tipo_decisao", "classe_processual"
]
# Siglas válidas de tribunais (principais)
VALID_TRIBUNAIS = [
"STF", "STJ", "TST", "TSE", "STM", # Superiores
"TRF1", "TRF2", "TRF3", "TRF4", "TRF5", "TRF6", # Federais
"TJAC", "TJAL", "TJAP", "TJAM", "TJBA", "TJCE", "TJDF", "TJES",
"TJGO", "TJMA", "TJMT", "TJMS", "TJMG", "TJPA", "TJPB", "TJPR",
"TJPE", "TJPI", "TJRJ", "TJRN", "TJRS", "TJRO", "TJRR", "TJSC",
"TJSP", "TJSE", "TJTO" # Estaduais
]
# Tamanhos mínimos de texto
MIN_EMENTA_LENGTH = 50
MIN_INTEGRA_LENGTH = 100
# Tamanhos máximos de texto
MAX_EMENTA_LENGTH = 50000 # ~50KB
MAX_INTEGRA_LENGTH = 500000 # ~500KB
# Padrões regex
PATTERN_NUMERO_PROCESSO = re.compile(
r'^\d{7}-\d{2}\.\d{4}\.\d{1}\.\d{2}\.\d{4}$' # CNJ
)
PATTERN_ANO = re.compile(r'^(19|20)\d{2}$')
PATTERN_DATA = re.compile(r'^\d{4}-\d{2}-\d{2}')
# ========================================================================
# VALIDAÇÃO DE ARQUIVO JSONL
# ========================================================================
def validate_jsonl_file(
self,
file_path: str,
max_size_mb: int = 500
) -> Tuple[bool, Optional[str], Dict[str, Any]]:
"""
Valida arquivo JSONL completo.
Args:
file_path: Caminho do arquivo
max_size_mb: Tamanho máximo permitido em MB
Returns:
Tuple[bool, Optional[str], Dict]: (is_valid, error_message, metadata)
"""
try:
path = Path(file_path)
# Verificar se existe
if not path.exists():
return False, f"Arquivo não encontrado: {file_path}", {}
# Verificar extensão
if path.suffix.lower() not in ['.jsonl', '.json']:
return False, "Arquivo deve ter extensão .jsonl ou .json", {}
# Verificar tamanho
size_mb = path.stat().st_size / (1024 * 1024)
if size_mb > max_size_mb:
return False, f"Arquivo muito grande: {size_mb:.2f}MB (máx: {max_size_mb}MB)", {}
# Validar conteúdo
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
valid, error, metadata = self.validate_jsonl_content(content)
metadata['file_size_mb'] = size_mb
metadata['file_path'] = str(path)
return valid, error, metadata
except Exception as e:
logger.error(f"Error validating JSONL file: {e}")
return False, f"Erro ao validar arquivo: {str(e)}", {}
def validate_jsonl_content(
self,
content: str
) -> Tuple[bool, Optional[str], Dict[str, Any]]:
"""
Valida conteúdo JSONL (string).
Args:
content: Conteúdo do arquivo JSONL
Returns:
Tuple[bool, Optional[str], Dict]: (is_valid, error_message, metadata)
"""
try:
lines = content.strip().split('\n')
total_lines = len(lines)
valid_records = 0
invalid_records = 0
errors = []
for line_num, line in enumerate(lines, 1):
# Pular linhas vazias
if not line.strip():
continue
# Tentar fazer parse JSON
try:
record = json.loads(line)
except json.JSONDecodeError as e:
invalid_records += 1
error_msg = f"Linha {line_num}: JSON inválido - {str(e)}"
errors.append(error_msg)
if len(errors) <= 5: # Limitar erros reportados
logger.warning(error_msg)
continue
# Validar estrutura do registro
is_valid, error = self.validate_acordao_record(record)
if is_valid:
valid_records += 1
else:
invalid_records += 1
error_msg = f"Linha {line_num}: {error}"
errors.append(error_msg)
if len(errors) <= 5:
logger.warning(error_msg)
# Verificar se tem pelo menos 1 registro válido
if valid_records == 0:
return False, "Nenhum registro válido encontrado", {
"total_lines": total_lines,
"valid_records": 0,
"invalid_records": invalid_records,
"errors": errors[:10] # Primeiros 10 erros
}
# Se mais de 50% inválidos, considerar arquivo inválido
if invalid_records > valid_records:
return False, f"Muitos registros inválidos ({invalid_records}/{total_lines})", {
"total_lines": total_lines,
"valid_records": valid_records,
"invalid_records": invalid_records,
"errors": errors[:10]
}
metadata = {
"total_lines": total_lines,
"valid_records": valid_records,
"invalid_records": invalid_records,
"success_rate": (valid_records / (valid_records + invalid_records)) * 100
}
if errors:
metadata["errors"] = errors[:10] # Primeiros 10 erros
return True, None, metadata
except Exception as e:
logger.error(f"Error validating JSONL content: {e}")
return False, f"Erro ao validar conteúdo: {str(e)}", {}
# ========================================================================
# VALIDAÇÃO DE REGISTRO DE ACÓRDÃO
# ========================================================================
def validate_acordao_record(
self,
record: Dict[str, Any]
) -> Tuple[bool, Optional[str]]:
"""
Valida um registro de acórdão individual.
Args:
record: Dicionário com dados do acórdão
Returns:
Tuple[bool, Optional[str]]: (is_valid, error_message)
"""
# Verificar se é dict
if not isinstance(record, dict):
return False, "Registro deve ser um objeto JSON"
# Verificar campos obrigatórios
for field in self.REQUIRED_FIELDS:
if field not in record:
return False, f"Campo obrigatório ausente: '{field}'"
if not record[field] or not str(record[field]).strip():
return False, f"Campo obrigatório vazio: '{field}'"
# Validar ementa
ementa = str(record.get("ementa", "")).strip()
if len(ementa) < self.MIN_EMENTA_LENGTH:
return False, f"Ementa muito curta (mín: {self.MIN_EMENTA_LENGTH} chars)"
if len(ementa) > self.MAX_EMENTA_LENGTH:
return False, f"Ementa muito longa (máx: {self.MAX_EMENTA_LENGTH} chars)"
# Validar integra
integra = str(record.get("integra", "")).strip()
if len(integra) < self.MIN_INTEGRA_LENGTH:
return False, f"Íntegra muito curta (mín: {self.MIN_INTEGRA_LENGTH} chars)"
if len(integra) > self.MAX_INTEGRA_LENGTH:
return False, f"Íntegra muito longa (máx: {self.MAX_INTEGRA_LENGTH} chars)"
# Validar campos opcionais (se presentes)
if "tribunal" in record:
tribunal = str(record["tribunal"]).upper().strip()
if tribunal and tribunal not in self.VALID_TRIBUNAIS:
logger.warning(f"Tribunal não reconhecido: {tribunal}")
if "numero" in record and record["numero"]:
numero = str(record["numero"]).strip()
if not self.PATTERN_NUMERO_PROCESSO.match(numero):
logger.warning(f"Número de processo não segue padrão CNJ: {numero}")
if "ano" in record and record["ano"]:
ano = str(record["ano"]).strip()
if not self.PATTERN_ANO.match(ano):
return False, f"Ano inválido: {ano}"
return True, None
# ========================================================================
# VALIDAÇÃO DE CAMPOS ESPECÍFICOS
# ========================================================================
def validate_tribunal(self, sigla: str) -> bool:
"""Valida sigla de tribunal."""
return sigla.upper().strip() in self.VALID_TRIBUNAIS
def validate_numero_processo(self, numero: str) -> bool:
"""Valida número de processo (padrão CNJ)."""
return bool(self.PATTERN_NUMERO_PROCESSO.match(numero.strip()))
def validate_ano(self, ano: str) -> bool:
"""Valida ano (1900-2099)."""
return bool(self.PATTERN_ANO.match(str(ano).strip()))
def validate_data(self, data: str) -> bool:
"""Valida data (formato ISO: YYYY-MM-DD)."""
try:
datetime.fromisoformat(data.strip())
return True
except:
return False
# ========================================================================
# SANITIZAÇÃO
# ========================================================================
def sanitize_text(self, text: str, max_length: Optional[int] = None) -> str:
"""
Sanitiza texto removendo caracteres problemáticos.
Args:
text: Texto a sanitizar
max_length: Tamanho máximo (trunca se necessário)
Returns:
Texto sanitizado
"""
if not text:
return ""
# Remover caracteres de controle (exceto \n, \r, \t)
text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)
# Normalizar espaços
text = re.sub(r'\s+', ' ', text)
# Truncar se necessário
if max_length and len(text) > max_length:
text = text[:max_length]
return text.strip()
def sanitize_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
"""
Sanitiza todos os campos de texto de um registro.
Args:
record: Registro a sanitizar
Returns:
Registro sanitizado
"""
sanitized = {}
for key, value in record.items():
if isinstance(value, str):
if key == "ementa":
sanitized[key] = self.sanitize_text(value, self.MAX_EMENTA_LENGTH)
elif key == "integra":
sanitized[key] = self.sanitize_text(value, self.MAX_INTEGRA_LENGTH)
else:
sanitized[key] = self.sanitize_text(value, 1000)
else:
sanitized[key] = value
return sanitized
# ========================================================================
# HELPERS
# ========================================================================
def get_validation_summary(
self,
valid_count: int,
invalid_count: int,
errors: List[str]
) -> Dict[str, Any]:
"""
Gera sumário de validação.
Args:
valid_count: Número de registros válidos
invalid_count: Número de registros inválidos
errors: Lista de erros encontrados
Returns:
Dicionário com sumário
"""
total = valid_count + invalid_count
return {
"total": total,
"valid": valid_count,
"invalid": invalid_count,
"success_rate": (valid_count / total * 100) if total > 0 else 0,
"errors_count": len(errors),
"errors_sample": errors[:5] if errors else []
}
# ============================================================================
# INSTÂNCIA GLOBAL
# ============================================================================
_validation_service = None
def get_validation_service() -> ValidationService:
"""
Retorna instância singleton do ValidationService.
Returns:
ValidationService: Instância do serviço
"""
global _validation_service
if _validation_service is None:
_validation_service = ValidationService()
return _validation_service