#!/usr/bin/env python3
"""beeROOT Para.AI v4.0 - setup with automatic BATCHING.

Avoids timeouts by fragmenting all_filtered.jsonl into smaller batches and
building the FAISS index incrementally, one batch at a time.
"""
import json
import logging
import os
import re
import subprocess
import sys
import tarfile
import time
import traceback
from datetime import datetime
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/setup_debug.log', mode='w'),
    ],
)
logger = logging.getLogger(__name__)

# Files polled externally to track setup progress / readiness.
STATUS_FILE = Path('/tmp/setup_status.json')
READY_FLAG = Path('/tmp/faiss_ready')

# Patterns used by clean_html_text, compiled once instead of on every call.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_HTML_ENTITY_RE = re.compile(r'&[a-zA-Z]+;')
_WHITESPACE_RE = re.compile(r'\s+')


def update_status(status, message, progress=0):
    """Write the current setup status to STATUS_FILE and log it.

    Args:
        status: short machine-readable state ('downloading', 'ready', ...).
        message: human-readable description.
        progress: completion percentage (0-100).
    """
    data = {
        'status': status,
        'message': message,
        'progress': progress,
        'timestamp': datetime.now().isoformat(),
    }
    with open(STATUS_FILE, 'w') as f:
        json.dump(data, f)
    logger.info(f"STATUS [{progress}%]: {status} - {message}")


def run_cmd(cmd, desc, check=True, timeout=300):
    """Run a shell command with verbose logging.

    Args:
        cmd: command line executed through the shell.
        desc: human-readable description used in log messages.
        check: when True, raise CalledProcessError on a non-zero exit.
        timeout: seconds before the command is killed (TimeoutExpired).

    Returns:
        The command's stdout (may be empty).

    Raises:
        subprocess.CalledProcessError: non-zero exit with check=True.
        subprocess.TimeoutExpired: command exceeded `timeout`.
    """
    logger.info("=" * 80)
    logger.info(f"🔧 {desc}")
    logger.info(f"📝 {cmd}")
    logger.info("-" * 80)
    try:
        start = time.time()
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check,
        )
        elapsed = time.time() - start
        logger.info(f"⏱️ {elapsed:.2f}s | Exit: {result.returncode}")
        if result.stdout and result.stdout.strip():
            logger.info(f"STDOUT: {result.stdout[:500]}")
        if result.stderr and result.stderr.strip():
            logger.warning(f"STDERR: {result.stderr[:500]}")
        if result.returncode == 0:
            logger.info(f"✅ {desc} - OK")
        return result.stdout
    except subprocess.TimeoutExpired:
        # Previously the timeout propagated silently; log it before re-raising.
        logger.error(f"❌ FALHOU: {desc}")
        raise
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ FALHOU: {desc}")
        # `result` is never assigned on failure, so log stderr from the exception.
        if e.stderr:
            logger.error(f"STDERR: {e.stderr[:500]}")
        raise


def clean_html_text(text):
    """Strip HTML tags/entities from `text` and collapse whitespace.

    Returns "" for None or non-string input.
    """
    if not text or not isinstance(text, str):
        return ""
    text = _HTML_TAG_RE.sub('', text)
    # Named entities (&amp;, &nbsp;, ...) become spaces, then get collapsed.
    text = _HTML_ENTITY_RE.sub(' ', text)
    text = _WHITESPACE_RE.sub(' ', text)
    return text.strip()


def is_valid_value(value):
    """Return True when `value` is usable: non-None, non-blank string, or any
    other type (numbers always valid; containers accepted as-is)."""
    if value is None:
        return False
    if isinstance(value, str):
        return bool(value.strip())
    if isinstance(value, (int, float)):
        return True
    return True


def filter_and_validate_record(record, fields_to_keep):
    """Project `record` onto `fields_to_keep`, normalizing keys and cleaning HTML.

    Field lookup is case-tolerant (exact, lower, capitalized, upper). The id
    field is normalized to 'id'; all other kept keys are lowercased. The
    'ementa' field has HTML stripped and must remain non-empty afterwards.

    Returns:
        (filtered_dict, True) when every requested field is present and valid,
        (None, False) otherwise.
    """
    filtered = {}
    for field in fields_to_keep:
        value = None
        if field in record:
            value = record[field]
        else:
            for variant in [field.lower(), field.capitalize(), field.upper()]:
                if variant in record:
                    value = record[variant]
                    break
        if value is None:
            return None, False
        if not is_valid_value(value):
            return None, False
        field_name = 'id' if field in ['Id', 'id', 'ID'] else field.lower()
        if field.lower() == 'ementa' and isinstance(value, str):
            cleaned_value = clean_html_text(value)
            if not cleaned_value or not cleaned_value.strip():
                return None, False
            filtered[field_name] = cleaned_value
        else:
            filtered[field_name] = value
    return filtered, True


def process_tar_gz(tar_path, output_jsonl, fields_to_keep):
    """Extract the jurisprudencias.jsonl member of a .tar.gz, filter each
    record, and append valid ones to `output_jsonl`.

    Only the first matching member is processed. Returns the number of valid
    records appended (0 when no matching member exists).
    """
    logger.info(f"📦 {tar_path.name}")
    stats = {'total': 0, 'validos': 0}
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if member.name.endswith('jurisprudencias.jsonl') and member.isfile():
                    logger.info(f" ✅ {member.name}")
                    file_obj = tar.extractfile(member)
                    content = file_obj.read().decode('utf-8')
                    lines = content.strip().split('\n')
                    stats['total'] = len(lines)
                    with open(output_jsonl, 'a', encoding='utf-8') as out:
                        for line in lines:
                            if not line.strip():
                                continue
                            try:
                                record = json.loads(line)
                                filtered, is_valid = filter_and_validate_record(
                                    record, fields_to_keep
                                )
                                if is_valid:
                                    out.write(json.dumps(filtered, ensure_ascii=False) + '\n')
                                    stats['validos'] += 1
                            except Exception:
                                # Best-effort: skip malformed lines. Was a bare
                                # `except:` which also swallowed KeyboardInterrupt.
                                pass
                    logger.info(f" ✅ {stats['validos']}/{stats['total']} válidos")
                    return stats['validos']
        return 0
    except Exception as e:
        logger.error(f" ❌ {e}")
        raise


# ============================================================================
# AUTOMATIC FRAGMENTATION
# ============================================================================
def fragment_jsonl(input_file, output_dir, batch_size=2000):
    """Split a large JSONL file into fragments of at most `batch_size` records.

    Blank lines are skipped. Fragments are named part_001.jsonl, part_002.jsonl,
    ... inside `output_dir` (created if missing).

    Returns:
        List of fragment Paths, in order (empty list for an empty input file).
    """
    logger.info(f"\n📦 FRAGMENTANDO {input_file.name}")
    logger.info(f" Batch size: {batch_size} registros")
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    fragments = []
    current_batch = []
    batch_num = 1
    total_lines = 0

    def _save_fragment(batch):
        # Write one batch to the next part_NNN.jsonl file.
        nonlocal batch_num
        fragment_path = output_dir / f"part_{batch_num:03d}.jsonl"
        with open(fragment_path, 'w', encoding='utf-8') as out:
            out.writelines(batch)
        fragments.append(fragment_path)
        logger.info(f" ✅ {fragment_path.name}: {len(batch)} registros")
        batch_num += 1

    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            current_batch.append(line)
            total_lines += 1
            if len(current_batch) >= batch_size:
                _save_fragment(current_batch)
                current_batch = []

    # Save the final partial batch, if any.
    if current_batch:
        _save_fragment(current_batch)

    logger.info(f"\n 📊 RESUMO FRAGMENTAÇÃO:")
    logger.info(f" Total de registros: {total_lines}")
    logger.info(f" Total de fragmentos: {len(fragments)}")
    if fragments:
        # Guard: an empty input previously crashed here with ZeroDivisionError.
        logger.info(f" Registros/fragmento: ~{total_lines // len(fragments)}")
    return fragments


# ============================================================================
# INCREMENTAL FAISS BUILD
# ============================================================================
def build_faiss_incremental(fragments, faiss_dir, timeout_per_batch=600):
    """Build the FAISS index one fragment at a time to avoid timeouts.

    The first fragment creates a fresh index; subsequent fragments are passed
    --append (assumes rag_builder.py supports appending — confirm against its
    CLI). Raises on the first fragment that times out or fails.
    """
    logger.info(f"\n🔨 BUILD FAISS INCREMENTAL")
    logger.info(f" Total de fragmentos: {len(fragments)}")
    logger.info(f" Timeout por batch: {timeout_per_batch}s")
    for i, fragment in enumerate(fragments, 1):
        logger.info(f"\n [{i}/{len(fragments)}] Processando {fragment.name}...")
        start = time.time()
        try:
            # First fragment creates the index; the rest append to it.
            append_flag = "--append" if i > 1 else ""
            # NOTE: paths are locally generated, so shell interpolation is safe here.
            cmd = f"python3 rag_builder.py --input {fragment} --output {faiss_dir} {append_flag}"
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout_per_batch,
                check=True,
            )
            elapsed = time.time() - start
            logger.info(f" ✅ {fragment.name} processado em {elapsed:.1f}s")
            # Relay rag_builder output into our log.
            if result.stdout:
                for line in result.stdout.split('\n'):
                    if line.strip():
                        logger.info(f" {line}")
        except subprocess.TimeoutExpired:
            logger.error(f" ❌ TIMEOUT em {fragment.name} ({timeout_per_batch}s)")
            raise
        except subprocess.CalledProcessError as e:
            logger.error(f" ❌ ERRO em {fragment.name}")
            if e.stderr:
                logger.error(f" {e.stderr[:500]}")
            raise
    logger.info(f"\n✅ FAISS index completo!")


# ============================================================================
# MAIN
# ============================================================================
def main():
    """Entry point: download chunks, filter records, fragment, build FAISS.

    On success touches READY_FLAG; on any fatal error writes an 'error'
    status and exits with code 1.
    """
    try:
        logger.info("\n" + "=" * 80)
        logger.info("🚀 beeROOT Para.AI v4.0 - SETUP COM BATCHING")
        logger.info("=" * 80)

        # Idempotency: skip everything if a previous run already finished.
        if READY_FLAG.exists():
            logger.info("✅ Já pronto!")
            update_status('ready', 'Ready', 100)
            return

        # PyYAML is only needed at run time; importing it here keeps the
        # module importable (e.g. for tests) without the dependency installed.
        import yaml

        # Load configuration
        with open('config.yaml') as f:
            config = yaml.safe_load(f)
        chunk_start = config['chunk_start']
        chunk_end = config['chunk_end']
        github_repo = config['github_repo']
        campos_filter = config['campos_filter']
        batch_size = config.get('batch_size', 2000)
        timeout_per_batch = config.get('timeout_per_batch', 600)

        logger.info(f"\n⚙️ CONFIGURAÇÃO:")
        logger.info(f" Chunks: {chunk_start} → {chunk_end}")
        logger.info(f" Batch size: {batch_size} registros")
        logger.info(f" Timeout/batch: {timeout_per_batch}s")

        # Derive the raw-content base URL from the GitHub repository URL.
        base_url = github_repo.replace(
            'https://github.com/', 'https://raw.githubusercontent.com/'
        )
        if base_url.endswith('.git'):
            base_url = base_url[:-4]
        base_url = f"{base_url}/main/chunks_dados"

        # Working directories
        work_dir = Path('/tmp/work')
        work_dir.mkdir(exist_ok=True)
        fragments_dir = work_dir / 'fragments'
        faiss_dir = Path('/home/user/app/faiss_index')
        faiss_dir.mkdir(parents=True, exist_ok=True)
        output_jsonl = work_dir / 'all_filtered.jsonl'
        if output_jsonl.exists():
            output_jsonl.unlink()

        # PHASE 1: download and filtering
        logger.info("\n" + "=" * 80)
        logger.info("📥 FASE 1: DOWNLOAD E FILTRAGEM")
        logger.info("=" * 80)
        update_status('downloading', 'Downloading chunks', 10)

        total_validos = 0
        for chunk_num in range(chunk_start, chunk_end + 1):
            chunk_name = f"chunk_dados_{chunk_num:06d}.tar.gz"
            chunk_url = f"{base_url}/{chunk_name}"
            chunk_path = work_dir / chunk_name
            try:
                run_cmd(
                    f"curl -L -f -o {chunk_path} {chunk_url}",
                    f"Chunk {chunk_num}",
                    timeout=300,
                )
                if chunk_path.exists():
                    validos = process_tar_gz(chunk_path, output_jsonl, campos_filter)
                    total_validos += validos
                    chunk_path.unlink()
            except Exception as e:
                # Best-effort: a failed chunk is logged and skipped.
                logger.error(f" ❌ Erro no chunk {chunk_num}: {e}")
                if chunk_path.exists():
                    chunk_path.unlink()

        logger.info(f"\n✅ Total de registros válidos: {total_validos}")
        if total_validos == 0:
            raise RuntimeError("Nenhum registro válido encontrado!")

        # PHASE 2: fragmentation
        logger.info("\n" + "=" * 80)
        logger.info("📦 FASE 2: FRAGMENTAÇÃO")
        logger.info("=" * 80)
        update_status('fragmenting', 'Fragmentando JSONL', 50)
        fragments = fragment_jsonl(output_jsonl, fragments_dir, batch_size)

        # PHASE 3: incremental FAISS build
        logger.info("\n" + "=" * 80)
        logger.info("🤖 FASE 3: BUILD FAISS INCREMENTAL")
        logger.info("=" * 80)
        update_status('building', 'Building FAISS index', 60)
        # rag_builder.py is resolved relative to the app directory.
        os.chdir('/home/user/app')
        build_faiss_incremental(fragments, faiss_dir, timeout_per_batch)

        # Cleanup
        logger.info("\n🧹 Limpeza...")
        run_cmd(f"rm -rf {work_dir}", "Cleanup", check=False)

        # Done!
        update_status('ready', f'{total_validos} documentos indexados', 100)
        READY_FLAG.touch()
        logger.info("\n" + "=" * 80)
        logger.info("✅ SETUP COMPLETO!")
        logger.info("=" * 80)
        logger.info(f" 📊 Total: {total_validos} documentos")
        logger.info(f" 📁 FAISS: {faiss_dir}/")
    except Exception as e:
        logger.error(f"\n💥 ERRO FATAL: {type(e).__name__}: {e}")
        logger.error(traceback.format_exc())
        update_status('error', str(e), 0)
        sys.exit(1)


if __name__ == "__main__":
    main()