#!/usr/bin/env python3
"""
beeROOT Setup v3.6 - with chunk fragmentation.

Downloads data chunks from GitHub, filters and validates the records they
contain, and builds a FAISS index from the result.
"""
import os
import sys
import json
import subprocess
import logging
import traceback
import time
import tarfile
import re
from pathlib import Path
from datetime import datetime

# Log to stdout and to a debug file so failures can be inspected afterwards.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/setup_debug.log', mode='w')
    ]
)
logger = logging.getLogger(__name__)

STATUS_FILE = Path('/tmp/setup_status.json')  # progress file polled externally
READY_FLAG = Path('/tmp/faiss_ready')         # sentinel: setup already completed


def update_status(status, message, progress=0):
    """Persist the current setup phase to STATUS_FILE and log it.

    Args:
        status: short machine-readable state ('downloading', 'ready', ...).
        message: human-readable description.
        progress: completion percentage (0-100).
    """
    data = {
        'status': status,
        'message': message,
        'progress': progress,
        'timestamp': datetime.now().isoformat()
    }
    with open(STATUS_FILE, 'w') as f:
        json.dump(data, f)
    logger.info(f"STATUS [{progress}%]: {status} - {message}")


def run_cmd(cmd, desc, check=True, timeout=300):
    """Run a shell command with verbose logging.

    Args:
        cmd: command line executed through the shell. NOTE(review): built from
            config values by callers — must not receive untrusted input.
        desc: human-readable description used in the log.
        check: when True, raise CalledProcessError on a non-zero exit code.
        timeout: seconds before subprocess.TimeoutExpired is raised.

    Returns:
        The command's stdout on exit code 0; None on a non-zero exit when
        check is False.

    Raises:
        subprocess.CalledProcessError: non-zero exit with check=True.
        subprocess.TimeoutExpired: the command exceeded `timeout`.
    """
    logger.info("=" * 80)
    logger.info(f"🔧 {desc}")
    logger.info(f"📝 {cmd}")
    logger.info("-" * 80)
    try:
        start = time.time()
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check
        )
        elapsed = time.time() - start
        logger.info(f"⏱️ {elapsed:.2f}s | Exit: {result.returncode}")
        # Truncate captured streams so huge outputs do not flood the log.
        if result.stdout and result.stdout.strip():
            logger.info(f"STDOUT: {result.stdout[:500]}")
        if result.stderr and result.stderr.strip():
            logger.warning(f"STDERR: {result.stderr[:500]}")
        if result.returncode == 0:
            logger.info(f"✅ {desc} - OK")
            return result.stdout
        return None
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ FALHOU: {desc}")
        # Surface the failure cause (the original handler discarded stderr).
        if e.stderr:
            logger.error(f"STDERR: {e.stderr[:500]}")
        raise


def clean_html_text(text):
    """Strip HTML tags and named entities, collapsing whitespace.

    Returns "" for None or non-string input.
    """
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<[^>]+>', '', text)       # remove tags
    text = re.sub(r'&[a-zA-Z]+;', ' ', text)  # named entities -> space
    text = re.sub(r'\s+', ' ', text)          # collapse whitespace runs
    return text.strip()


def is_valid_value(value):
    """Return True when `value` is usable: not None and, for strings, not blank."""
    if value is None:
        return False
    if isinstance(value, str):
        return bool(value.strip())
    # Any other non-None type (int, float, list, ...) is accepted as-is.
    return True


def filter_and_validate_record(record, fields_to_keep):
    """Project `record` onto `fields_to_keep`, normalizing names and values.

    Field lookup is case-tolerant (exact match, then lower / capitalized /
    upper variants). The id field is stored under the key 'id'; every other
    key is lower-cased. The 'ementa' field has its HTML stripped.

    Args:
        record: decoded JSON object for one document.
        fields_to_keep: field names that must all be present and valid.

    Returns:
        (filtered_dict, True) on success; (None, False) when any required
        field is missing, invalid, or whose ementa is empty after cleanup.
    """
    filtered = {}
    for field in fields_to_keep:
        value = None
        # Try common capitalization variants of the field name.
        if field in record:
            value = record[field]
        else:
            for variant in (field.lower(), field.capitalize(), field.upper()):
                if variant in record:
                    value = record[variant]
                    break
        if value is None or not is_valid_value(value):
            return None, False
        # Normalized output key.
        field_name = 'id' if field in ('Id', 'id', 'ID') else field.lower()
        if field.lower() == 'ementa' and isinstance(value, str):
            cleaned_value = clean_html_text(value)
            # Reject records whose ementa is markup only.
            if not cleaned_value.strip():
                return None, False
            filtered[field_name] = cleaned_value
        else:
            filtered[field_name] = value
    return filtered, True


def process_tar_gz(tar_path, output_jsonl, fields_to_keep, max_records_per_file=None):
    """Extract 'jurisprudencias.jsonl' members from a tar.gz archive and append
    valid, filtered records to JSONL output, optionally splitting the output
    into fixed-size fragments named '<stem>_partNNN<suffix>'.

    Args:
        tar_path: Path of the tar.gz archive.
        output_jsonl: Path of the first output JSONL file (opened in append
            mode so several archives can feed the same file).
        fields_to_keep: fields every record must contain
            (see filter_and_validate_record).
        max_records_per_file: records per output fragment; None disables
            fragmentation.

    Returns:
        Number of valid records written.

    Raises:
        Exception: re-raised after logging on any archive/IO failure.
    """
    logger.info(f"📦 {tar_path.name}")
    stats = {'total': 0, 'validos': 0}
    file_counter = 1
    current_file_records = 0
    current_output = output_jsonl
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if not (member.isfile() and member.name.endswith('jurisprudencias.jsonl')):
                    continue
                logger.info(f" ✅ {member.name}")
                file_obj = tar.extractfile(member)
                if file_obj is None:
                    # extractfile() can return None for special members.
                    continue
                with file_obj:
                    content = file_obj.read().decode('utf-8')
                lines = content.strip().split('\n')
                # Accumulate across members (the original overwrote the count).
                stats['total'] += len(lines)
                out_file = open(current_output, 'a', encoding='utf-8')
                try:
                    for line in lines:
                        if not line.strip():
                            continue
                        try:
                            record = json.loads(line)
                            filtered, is_valid = filter_and_validate_record(
                                record, fields_to_keep
                            )
                        except Exception as e:
                            # Malformed line: skip it, but do not hide IO errors
                            # from the write below (original swallowed those too).
                            logger.debug(f"Erro processando linha: {e}")
                            continue
                        if not is_valid:
                            continue
                        # Roll over to a new fragment once the cap is reached.
                        if max_records_per_file and current_file_records >= max_records_per_file:
                            out_file.close()
                            file_counter += 1
                            base = output_jsonl.stem
                            ext = output_jsonl.suffix
                            current_output = output_jsonl.parent / f"{base}_part{file_counter:03d}{ext}"
                            logger.info(f" 📝 Novo fragmento: {current_output.name}")
                            out_file = open(current_output, 'w', encoding='utf-8')
                            current_file_records = 0
                        out_file.write(json.dumps(filtered, ensure_ascii=False) + '\n')
                        stats['validos'] += 1
                        current_file_records += 1
                finally:
                    # Always release the handle (original leaked it on error).
                    out_file.close()
        logger.info(f" ✅ {stats['validos']}/{stats['total']} válidos")
        return stats['validos']
    except Exception as e:
        logger.error(f" ❌ {e}")
        raise


def main():
    """Entry point: download chunks, filter records, build the FAISS index.

    Reads config.yaml from the current directory; writes progress to
    STATUS_FILE and touches READY_FLAG on success. Exits with status 1 on
    any fatal error.
    """
    try:
        logger.info("\n" + "=" * 80)
        logger.info("🐝 beeROOT SETUP v3.6")
        logger.info("=" * 80)

        # Idempotency: a previous successful run leaves READY_FLAG behind.
        if READY_FLAG.exists():
            logger.info("✅ Setup já foi executado anteriormente")
            update_status('ready', 'Ready', 100)
            return

        # Lazy import: PyYAML is only needed here; keeping it out of the
        # module top level lets the helpers above be imported without it.
        import yaml
        with open('config.yaml') as f:
            config = yaml.safe_load(f)

        chunk_start = config['chunk_start']
        chunk_end = config['chunk_end']
        github_repo = config['github_repo']
        campos_filter = config['campos_filter']
        # Optional fragmentation of the filtered JSONL output.
        max_records_per_file = config.get('max_records_per_file', None)
        if max_records_per_file:
            logger.info(f"📊 Fragmentação ativada: {max_records_per_file} registros/arquivo")

        # Map the repository URL to its raw.githubusercontent.com equivalent.
        base_url = github_repo.replace('https://github.com/', 'https://raw.githubusercontent.com/')
        if base_url.endswith('.git'):
            base_url = base_url[:-4]
        base_url = f"{base_url}/main/chunks_dados"

        work_dir = Path('/tmp/work')
        work_dir.mkdir(exist_ok=True)
        output_jsonl = work_dir / 'all_filtered.jsonl'
        if output_jsonl.exists():
            output_jsonl.unlink()

        logger.info("\n📥 DOWNLOAD E PROCESSAMENTO")
        logger.info(f" Chunks: {chunk_start} a {chunk_end}")
        update_status('downloading', f'Processando chunks {chunk_start}-{chunk_end}', 10)

        total_validos = 0
        for chunk_num in range(chunk_start, chunk_end + 1):
            chunk_name = f"chunk_dados_{chunk_num:06d}.tar.gz"
            chunk_url = f"{base_url}/{chunk_name}"
            chunk_path = work_dir / chunk_name
            # Downloads occupy the 10%..60% progress window.
            progress = 10 + int(((chunk_num - chunk_start + 1) / (chunk_end - chunk_start + 1)) * 50)
            update_status('processing', f'Chunk {chunk_num}/{chunk_end}', progress)
            try:
                run_cmd(
                    f"curl -L -f -o {chunk_path} {chunk_url}",
                    f"Download chunk {chunk_num}",
                    timeout=300
                )
                if chunk_path.exists():
                    validos = process_tar_gz(
                        chunk_path, output_jsonl, campos_filter, max_records_per_file
                    )
                    total_validos += validos
                    chunk_path.unlink()
            except Exception as e:
                # Best effort: a failed chunk is logged and skipped.
                logger.error(f" ❌ Erro no chunk {chunk_num}: {e}")
                if chunk_path.exists():
                    chunk_path.unlink()

        logger.info(f"\n✅ Total de registros válidos: {total_validos:,}")
        if total_validos == 0:
            raise Exception("Nenhum registro válido encontrado!")

        logger.info("\n🤖 CONSTRUINDO ÍNDICE FAISS")
        update_status('building', 'Building FAISS index', 70)
        os.chdir('/home/user/app')

        # Feed each fragment to the index builder in a deterministic order
        # (glob order is unspecified).
        jsonl_files = sorted(work_dir.glob('all_filtered*.jsonl'))
        logger.info(f" 📁 {len(jsonl_files)} arquivo(s) JSONL para processar")
        for jsonl_file in jsonl_files:
            logger.info(f" 📝 Processando: {jsonl_file.name}")
            result = subprocess.run(
                f"python3 rag_builder.py --input {jsonl_file}",
                shell=True,
                capture_output=True,
                text=True,
                timeout=900
            )
            if result.stdout:
                for line in result.stdout.split('\n'):
                    if line.strip():
                        logger.info(f" {line}")
            if result.stderr:
                for line in result.stderr.split('\n'):
                    if line.strip():
                        logger.warning(f" {line}")
            if result.returncode != 0:
                raise Exception(f"Build falhou: exit {result.returncode}")
        logger.info("✅ FAISS build completo!")

        # Cleanup of the working directory (non-fatal if it fails).
        run_cmd(f"rm -rf {work_dir}", "Cleanup", check=False)

        update_status('ready', f'{total_validos:,} documentos indexados', 100)
        READY_FLAG.touch()
        logger.info("\n" + "=" * 80)
        logger.info("🎉 SETUP CONCLUÍDO COM SUCESSO!")
        logger.info("=" * 80)

    except Exception as e:
        logger.error(f"\n💥 ERRO FATAL: {e}")
        logger.error(traceback.format_exc())
        update_status('error', str(e), 0)
        sys.exit(1)


if __name__ == "__main__":
    main()