Spaces:
Runtime error
Runtime error
| """ | |
| Serviço de validação de dados para para.AI API v3.0 | |
| Valida formato de acórdãos, arquivos JSONL e outros dados | |
| """ | |
| from typing import Dict, List, Optional, Any, Tuple | |
| import json | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from api.utils.exceptions import ValidationError | |
| from api.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class ValidationService: | |
| """ | |
| Serviço centralizado de validação de dados. | |
| Valida: | |
| - Arquivos JSONL | |
| - Registros de acórdãos | |
| - Dados de upload | |
| - Formatos de campo | |
| """ | |
| # ======================================================================== | |
| # CONSTANTES | |
| # ======================================================================== | |
| # Campos obrigatórios em acórdãos | |
| REQUIRED_FIELDS = ["ementa", "integra"] | |
| # Campos opcionais mas recomendados | |
| OPTIONAL_FIELDS = [ | |
| "acordao_id", "numero", "tribunal", "ano", "data_julgamento", | |
| "relator", "orgao_julgador", "tipo_decisao", "classe_processual" | |
| ] | |
| # Siglas válidas de tribunais (principais) | |
| VALID_TRIBUNAIS = [ | |
| "STF", "STJ", "TST", "TSE", "STM", # Superiores | |
| "TRF1", "TRF2", "TRF3", "TRF4", "TRF5", "TRF6", # Federais | |
| "TJAC", "TJAL", "TJAP", "TJAM", "TJBA", "TJCE", "TJDF", "TJES", | |
| "TJGO", "TJMA", "TJMT", "TJMS", "TJMG", "TJPA", "TJPB", "TJPR", | |
| "TJPE", "TJPI", "TJRJ", "TJRN", "TJRS", "TJRO", "TJRR", "TJSC", | |
| "TJSP", "TJSE", "TJTO" # Estaduais | |
| ] | |
| # Tamanhos mínimos de texto | |
| MIN_EMENTA_LENGTH = 50 | |
| MIN_INTEGRA_LENGTH = 100 | |
| # Tamanhos máximos de texto | |
| MAX_EMENTA_LENGTH = 50000 # ~50KB | |
| MAX_INTEGRA_LENGTH = 500000 # ~500KB | |
| # Padrões regex | |
| PATTERN_NUMERO_PROCESSO = re.compile( | |
| r'^\d{7}-\d{2}\.\d{4}\.\d{1}\.\d{2}\.\d{4}$' # CNJ | |
| ) | |
| PATTERN_ANO = re.compile(r'^(19|20)\d{2}$') | |
| PATTERN_DATA = re.compile(r'^\d{4}-\d{2}-\d{2}') | |
| # ======================================================================== | |
| # VALIDAÇÃO DE ARQUIVO JSONL | |
| # ======================================================================== | |
| def validate_jsonl_file( | |
| self, | |
| file_path: str, | |
| max_size_mb: int = 500 | |
| ) -> Tuple[bool, Optional[str], Dict[str, Any]]: | |
| """ | |
| Valida arquivo JSONL completo. | |
| Args: | |
| file_path: Caminho do arquivo | |
| max_size_mb: Tamanho máximo permitido em MB | |
| Returns: | |
| Tuple[bool, Optional[str], Dict]: (is_valid, error_message, metadata) | |
| """ | |
| try: | |
| path = Path(file_path) | |
| # Verificar se existe | |
| if not path.exists(): | |
| return False, f"Arquivo não encontrado: {file_path}", {} | |
| # Verificar extensão | |
| if path.suffix.lower() not in ['.jsonl', '.json']: | |
| return False, "Arquivo deve ter extensão .jsonl ou .json", {} | |
| # Verificar tamanho | |
| size_mb = path.stat().st_size / (1024 * 1024) | |
| if size_mb > max_size_mb: | |
| return False, f"Arquivo muito grande: {size_mb:.2f}MB (máx: {max_size_mb}MB)", {} | |
| # Validar conteúdo | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| valid, error, metadata = self.validate_jsonl_content(content) | |
| metadata['file_size_mb'] = size_mb | |
| metadata['file_path'] = str(path) | |
| return valid, error, metadata | |
| except Exception as e: | |
| logger.error(f"Error validating JSONL file: {e}") | |
| return False, f"Erro ao validar arquivo: {str(e)}", {} | |
| def validate_jsonl_content( | |
| self, | |
| content: str | |
| ) -> Tuple[bool, Optional[str], Dict[str, Any]]: | |
| """ | |
| Valida conteúdo JSONL (string). | |
| Args: | |
| content: Conteúdo do arquivo JSONL | |
| Returns: | |
| Tuple[bool, Optional[str], Dict]: (is_valid, error_message, metadata) | |
| """ | |
| try: | |
| lines = content.strip().split('\n') | |
| total_lines = len(lines) | |
| valid_records = 0 | |
| invalid_records = 0 | |
| errors = [] | |
| for line_num, line in enumerate(lines, 1): | |
| # Pular linhas vazias | |
| if not line.strip(): | |
| continue | |
| # Tentar fazer parse JSON | |
| try: | |
| record = json.loads(line) | |
| except json.JSONDecodeError as e: | |
| invalid_records += 1 | |
| error_msg = f"Linha {line_num}: JSON inválido - {str(e)}" | |
| errors.append(error_msg) | |
| if len(errors) <= 5: # Limitar erros reportados | |
| logger.warning(error_msg) | |
| continue | |
| # Validar estrutura do registro | |
| is_valid, error = self.validate_acordao_record(record) | |
| if is_valid: | |
| valid_records += 1 | |
| else: | |
| invalid_records += 1 | |
| error_msg = f"Linha {line_num}: {error}" | |
| errors.append(error_msg) | |
| if len(errors) <= 5: | |
| logger.warning(error_msg) | |
| # Verificar se tem pelo menos 1 registro válido | |
| if valid_records == 0: | |
| return False, "Nenhum registro válido encontrado", { | |
| "total_lines": total_lines, | |
| "valid_records": 0, | |
| "invalid_records": invalid_records, | |
| "errors": errors[:10] # Primeiros 10 erros | |
| } | |
| # Se mais de 50% inválidos, considerar arquivo inválido | |
| if invalid_records > valid_records: | |
| return False, f"Muitos registros inválidos ({invalid_records}/{total_lines})", { | |
| "total_lines": total_lines, | |
| "valid_records": valid_records, | |
| "invalid_records": invalid_records, | |
| "errors": errors[:10] | |
| } | |
| metadata = { | |
| "total_lines": total_lines, | |
| "valid_records": valid_records, | |
| "invalid_records": invalid_records, | |
| "success_rate": (valid_records / (valid_records + invalid_records)) * 100 | |
| } | |
| if errors: | |
| metadata["errors"] = errors[:10] # Primeiros 10 erros | |
| return True, None, metadata | |
| except Exception as e: | |
| logger.error(f"Error validating JSONL content: {e}") | |
| return False, f"Erro ao validar conteúdo: {str(e)}", {} | |
| # ======================================================================== | |
| # VALIDAÇÃO DE REGISTRO DE ACÓRDÃO | |
| # ======================================================================== | |
| def validate_acordao_record( | |
| self, | |
| record: Dict[str, Any] | |
| ) -> Tuple[bool, Optional[str]]: | |
| """ | |
| Valida um registro de acórdão individual. | |
| Args: | |
| record: Dicionário com dados do acórdão | |
| Returns: | |
| Tuple[bool, Optional[str]]: (is_valid, error_message) | |
| """ | |
| # Verificar se é dict | |
| if not isinstance(record, dict): | |
| return False, "Registro deve ser um objeto JSON" | |
| # Verificar campos obrigatórios | |
| for field in self.REQUIRED_FIELDS: | |
| if field not in record: | |
| return False, f"Campo obrigatório ausente: '{field}'" | |
| if not record[field] or not str(record[field]).strip(): | |
| return False, f"Campo obrigatório vazio: '{field}'" | |
| # Validar ementa | |
| ementa = str(record.get("ementa", "")).strip() | |
| if len(ementa) < self.MIN_EMENTA_LENGTH: | |
| return False, f"Ementa muito curta (mín: {self.MIN_EMENTA_LENGTH} chars)" | |
| if len(ementa) > self.MAX_EMENTA_LENGTH: | |
| return False, f"Ementa muito longa (máx: {self.MAX_EMENTA_LENGTH} chars)" | |
| # Validar integra | |
| integra = str(record.get("integra", "")).strip() | |
| if len(integra) < self.MIN_INTEGRA_LENGTH: | |
| return False, f"Íntegra muito curta (mín: {self.MIN_INTEGRA_LENGTH} chars)" | |
| if len(integra) > self.MAX_INTEGRA_LENGTH: | |
| return False, f"Íntegra muito longa (máx: {self.MAX_INTEGRA_LENGTH} chars)" | |
| # Validar campos opcionais (se presentes) | |
| if "tribunal" in record: | |
| tribunal = str(record["tribunal"]).upper().strip() | |
| if tribunal and tribunal not in self.VALID_TRIBUNAIS: | |
| logger.warning(f"Tribunal não reconhecido: {tribunal}") | |
| if "numero" in record and record["numero"]: | |
| numero = str(record["numero"]).strip() | |
| if not self.PATTERN_NUMERO_PROCESSO.match(numero): | |
| logger.warning(f"Número de processo não segue padrão CNJ: {numero}") | |
| if "ano" in record and record["ano"]: | |
| ano = str(record["ano"]).strip() | |
| if not self.PATTERN_ANO.match(ano): | |
| return False, f"Ano inválido: {ano}" | |
| return True, None | |
| # ======================================================================== | |
| # VALIDAÇÃO DE CAMPOS ESPECÍFICOS | |
| # ======================================================================== | |
| def validate_tribunal(self, sigla: str) -> bool: | |
| """Valida sigla de tribunal.""" | |
| return sigla.upper().strip() in self.VALID_TRIBUNAIS | |
| def validate_numero_processo(self, numero: str) -> bool: | |
| """Valida número de processo (padrão CNJ).""" | |
| return bool(self.PATTERN_NUMERO_PROCESSO.match(numero.strip())) | |
| def validate_ano(self, ano: str) -> bool: | |
| """Valida ano (1900-2099).""" | |
| return bool(self.PATTERN_ANO.match(str(ano).strip())) | |
| def validate_data(self, data: str) -> bool: | |
| """Valida data (formato ISO: YYYY-MM-DD).""" | |
| try: | |
| datetime.fromisoformat(data.strip()) | |
| return True | |
| except: | |
| return False | |
| # ======================================================================== | |
| # SANITIZAÇÃO | |
| # ======================================================================== | |
| def sanitize_text(self, text: str, max_length: Optional[int] = None) -> str: | |
| """ | |
| Sanitiza texto removendo caracteres problemáticos. | |
| Args: | |
| text: Texto a sanitizar | |
| max_length: Tamanho máximo (trunca se necessário) | |
| Returns: | |
| Texto sanitizado | |
| """ | |
| if not text: | |
| return "" | |
| # Remover caracteres de controle (exceto \n, \r, \t) | |
| text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text) | |
| # Normalizar espaços | |
| text = re.sub(r'\s+', ' ', text) | |
| # Truncar se necessário | |
| if max_length and len(text) > max_length: | |
| text = text[:max_length] | |
| return text.strip() | |
| def sanitize_record(self, record: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Sanitiza todos os campos de texto de um registro. | |
| Args: | |
| record: Registro a sanitizar | |
| Returns: | |
| Registro sanitizado | |
| """ | |
| sanitized = {} | |
| for key, value in record.items(): | |
| if isinstance(value, str): | |
| if key == "ementa": | |
| sanitized[key] = self.sanitize_text(value, self.MAX_EMENTA_LENGTH) | |
| elif key == "integra": | |
| sanitized[key] = self.sanitize_text(value, self.MAX_INTEGRA_LENGTH) | |
| else: | |
| sanitized[key] = self.sanitize_text(value, 1000) | |
| else: | |
| sanitized[key] = value | |
| return sanitized | |
| # ======================================================================== | |
| # HELPERS | |
| # ======================================================================== | |
| def get_validation_summary( | |
| self, | |
| valid_count: int, | |
| invalid_count: int, | |
| errors: List[str] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Gera sumário de validação. | |
| Args: | |
| valid_count: Número de registros válidos | |
| invalid_count: Número de registros inválidos | |
| errors: Lista de erros encontrados | |
| Returns: | |
| Dicionário com sumário | |
| """ | |
| total = valid_count + invalid_count | |
| return { | |
| "total": total, | |
| "valid": valid_count, | |
| "invalid": invalid_count, | |
| "success_rate": (valid_count / total * 100) if total > 0 else 0, | |
| "errors_count": len(errors), | |
| "errors_sample": errors[:5] if errors else [] | |
| } | |
| # ============================================================================ | |
| # INSTÂNCIA GLOBAL | |
| # ============================================================================ | |
| _validation_service = None | |
| def get_validation_service() -> ValidationService: | |
| """ | |
| Retorna instância singleton do ValidationService. | |
| Returns: | |
| ValidationService: Instância do serviço | |
| """ | |
| global _validation_service | |
| if _validation_service is None: | |
| _validation_service = ValidationService() | |
| return _validation_service | |