#!/usr/bin/env python3
"""
beeROOT Para.AI v4.0 - Setup with automatic BATCHING
Avoids timeouts by splitting all_filtered.jsonl into batches
"""
import os, sys, yaml, json, subprocess, logging, traceback, time, tarfile, re
from pathlib import Path
from datetime import datetime
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/setup_debug.log', mode='w')
    ]
)
logger = logging.getLogger(__name__)

STATUS_FILE = Path('/tmp/setup_status.json')
READY_FLAG = Path('/tmp/faiss_ready')
def update_status(status, message, progress=0):
    """Write the current setup status to STATUS_FILE and log it."""
    data = {
        'status': status,
        'message': message,
        'progress': progress,
        'timestamp': datetime.now().isoformat()
    }
    with open(STATUS_FILE, 'w') as f:
        json.dump(data, f)
    logger.info(f"STATUS [{progress}%]: {status} - {message}")
def run_cmd(cmd, desc, check=True, timeout=300):
    """Run a shell command, logging its duration, exit code and truncated output."""
    logger.info("=" * 80)
    logger.info(f"🔧 {desc}")
    logger.info(f"📝 {cmd}")
    logger.info("-" * 80)
    try:
        start = time.time()
        result = subprocess.run(
            cmd, shell=True, capture_output=True, text=True,
            timeout=timeout, check=check
        )
        elapsed = time.time() - start
        logger.info(f"⏱️ {elapsed:.2f}s | Exit: {result.returncode}")
        if result.stdout and result.stdout.strip():
            logger.info(f"STDOUT: {result.stdout[:500]}")
        if result.stderr and result.stderr.strip():
            logger.warning(f"STDERR: {result.stderr[:500]}")
        if result.returncode == 0:
            logger.info(f"✅ {desc} - OK")
        return result.stdout
    except subprocess.TimeoutExpired:
        logger.error(f"❌ TIMEOUT ({timeout}s): {desc}")
        raise
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ FAILED: {desc}")
        if e.stderr:
            logger.error(f"STDERR: {e.stderr[:500]}")
        raise
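# Usage sketch (hypothetical command): the return value is the command's stdout.
#   out = run_cmd("ls -la /tmp", "List /tmp", timeout=30)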
def clean_html_text(text):
    """Strip HTML tags and entities, collapsing whitespace."""
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<[^>]+>', '', text)        # drop HTML tags
    text = re.sub(r'&[a-zA-Z]+;', ' ', text)   # drop named HTML entities
    text = re.sub(r'\s+', ' ', text)           # collapse whitespace
    return text.strip()
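# Example (illustrative input):
#   clean_html_text("<p>Habeas&nbsp;corpus  deferido.</p>") returns "Habeas corpus deferido."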
def is_valid_value(value):
    """A value is valid if it is not None and, for strings, not blank."""
    if value is None:
        return False
    if isinstance(value, str):
        return bool(value.strip())
    return True
def filter_and_validate_record(record, fields_to_keep):
    """Keep only fields_to_keep; reject the record if any field is missing or empty."""
    filtered = {}
    for field in fields_to_keep:
        value = None
        if field in record:
            value = record[field]
        else:
            # Tolerate case variations of the field name
            for variant in [field.lower(), field.capitalize(), field.upper()]:
                if variant in record:
                    value = record[variant]
                    break
        if value is None:
            return None, False
        if not is_valid_value(value):
            return None, False
        # Normalize field names: 'Id'/'ID' -> 'id', everything else lowercased
        field_name = 'id' if field in ['Id', 'id', 'ID'] else field.lower()
        if field.lower() == 'ementa' and isinstance(value, str):
            cleaned_value = clean_html_text(value)
            if not cleaned_value or not cleaned_value.strip():
                return None, False
            filtered[field_name] = cleaned_value
        else:
            filtered[field_name] = value
    return filtered, True
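# Illustrative example (hypothetical record and field list):
#   filter_and_validate_record(
#       {"Id": 123, "Ementa": "<p>Recurso provido.</p>", "extra": "ignored"},
#       ["Id", "ementa"],
#   )
# returns ({"id": 123, "ementa": "Recurso provido."}, True); a record missing any requested
# field, or with an empty "ementa" after HTML cleaning, returns (None, False).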
def process_tar_gz(tar_path, output_jsonl, fields_to_keep):
    """Extract jurisprudencias.jsonl from a .tar.gz, filter its records, append them to output_jsonl."""
    logger.info(f"📦 {tar_path.name}")
    stats = {'total': 0, 'validos': 0}
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if member.name.endswith('jurisprudencias.jsonl') and member.isfile():
                    logger.info(f"  ✅ {member.name}")
                    file_obj = tar.extractfile(member)
                    content = file_obj.read().decode('utf-8')
                    lines = content.strip().split('\n')
                    stats['total'] = len(lines)
                    with open(output_jsonl, 'a', encoding='utf-8') as out:
                        for line in lines:
                            if not line.strip():
                                continue
                            try:
                                record = json.loads(line)
                                filtered, is_valid = filter_and_validate_record(
                                    record, fields_to_keep
                                )
                                if is_valid:
                                    out.write(json.dumps(filtered, ensure_ascii=False) + '\n')
                                    stats['validos'] += 1
                            except json.JSONDecodeError:
                                continue
                    logger.info(f"  ✅ {stats['validos']}/{stats['total']} valid")
                    return stats['validos']
        return 0
    except Exception as e:
        logger.error(f"  ❌ {e}")
        raise
# ============================================================================
# NEW FUNCTION: AUTOMATIC FRAGMENTATION
# ============================================================================
def fragment_jsonl(input_file, output_dir, batch_size=2000):
    """Split a large JSONL file into smaller parts to avoid timeouts."""
    logger.info(f"\n📦 FRAGMENTING {input_file.name}")
    logger.info(f"  Batch size: {batch_size} records")
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    fragments = []
    current_batch = []
    batch_num = 1
    total_lines = 0
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            current_batch.append(line)
            total_lines += 1
            # When batch_size is reached, write the fragment to disk
            if len(current_batch) >= batch_size:
                fragment_path = output_dir / f"part_{batch_num:03d}.jsonl"
                with open(fragment_path, 'w', encoding='utf-8') as out:
                    out.writelines(current_batch)
                fragments.append(fragment_path)
                logger.info(f"  ✅ {fragment_path.name}: {len(current_batch)} records")
                current_batch = []
                batch_num += 1
    # Write the last (partial) batch, if any
    if current_batch:
        fragment_path = output_dir / f"part_{batch_num:03d}.jsonl"
        with open(fragment_path, 'w', encoding='utf-8') as out:
            out.writelines(current_batch)
        fragments.append(fragment_path)
        logger.info(f"  ✅ {fragment_path.name}: {len(current_batch)} records")
    logger.info(f"\n  📊 FRAGMENTATION SUMMARY:")
    logger.info(f"  Total records: {total_lines}")
    logger.info(f"  Total fragments: {len(fragments)}")
    if fragments:
        logger.info(f"  Records/fragment: ~{total_lines // len(fragments)}")
    return fragments
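# Illustrative example (assumed counts): with 5000 valid records and batch_size=2000,
# fragment_jsonl writes part_001.jsonl (2000), part_002.jsonl (2000) and part_003.jsonl (1000),
# returning the three paths in order.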
# ============================================================================
# NEW FUNCTION: INCREMENTAL FAISS BUILD
# ============================================================================
def build_faiss_incremental(fragments, faiss_dir, timeout_per_batch=600):
    """Build the FAISS index incrementally, one fragment at a time, to avoid timeouts."""
    logger.info(f"\n🔨 INCREMENTAL FAISS BUILD")
    logger.info(f"  Total fragments: {len(fragments)}")
    logger.info(f"  Timeout per batch: {timeout_per_batch}s")
    for i, fragment in enumerate(fragments, 1):
        logger.info(f"\n  [{i}/{len(fragments)}] Processing {fragment.name}...")
        start = time.time()
        try:
            # First fragment: create a new index
            # Remaining fragments: append to the existing one
            append_flag = "--append" if i > 1 else ""
            cmd = f"python3 rag_builder.py --input {fragment} --output {faiss_dir} {append_flag}"
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout_per_batch,
                check=True
            )
            elapsed = time.time() - start
            logger.info(f"  ✅ {fragment.name} processed in {elapsed:.1f}s")
            # Log rag_builder output
            if result.stdout:
                for line in result.stdout.split('\n'):
                    if line.strip():
                        logger.info(f"    {line}")
        except subprocess.TimeoutExpired:
            logger.error(f"  ❌ TIMEOUT on {fragment.name} ({timeout_per_batch}s)")
            raise
        except subprocess.CalledProcessError as e:
            logger.error(f"  ❌ ERROR on {fragment.name}")
            if e.stderr:
                logger.error(f"    {e.stderr[:500]}")
            raise
    logger.info(f"\n✅ FAISS index complete!")
# ============================================================================
# MAIN
# ============================================================================
def main():
    try:
        logger.info("\n" + "=" * 80)
        logger.info("🚀 beeROOT Para.AI v4.0 - SETUP WITH BATCHING")
        logger.info("=" * 80)

        if READY_FLAG.exists():
            logger.info("✅ Already set up!")
            update_status('ready', 'Ready', 100)
            return

        # Load config
        with open('config.yaml') as f:
            config = yaml.safe_load(f)
        chunk_start = config['chunk_start']
        chunk_end = config['chunk_end']
        github_repo = config['github_repo']
        campos_filter = config['campos_filter']
        batch_size = config.get('batch_size', 2000)
        timeout_per_batch = config.get('timeout_per_batch', 600)
| logger.info(f"\n⚙️ CONFIGURAÇÃO:") | |
| logger.info(f" Chunks: {chunk_start} → {chunk_end}") | |
| logger.info(f" Batch size: {batch_size} registros") | |
| logger.info(f" Timeout/batch: {timeout_per_batch}s") | |
        # Prepare the raw.githubusercontent.com base URL for the chunk files
        base_url = github_repo.replace('https://github.com/', 'https://raw.githubusercontent.com/')
        if base_url.endswith('.git'):
            base_url = base_url[:-4]
        base_url = f"{base_url}/main/chunks_dados"
        # Directories
        work_dir = Path('/tmp/work')
        work_dir.mkdir(exist_ok=True)
        fragments_dir = work_dir / 'fragments'
        faiss_dir = Path('/home/user/app/faiss_index')
        faiss_dir.mkdir(parents=True, exist_ok=True)

        output_jsonl = work_dir / 'all_filtered.jsonl'
        if output_jsonl.exists():
            output_jsonl.unlink()
        # PHASE 1: Download and filtering
        logger.info("\n" + "=" * 80)
        logger.info("📥 PHASE 1: DOWNLOAD AND FILTERING")
        logger.info("=" * 80)
        update_status('downloading', 'Downloading chunks', 10)

        total_validos = 0
        for chunk_num in range(chunk_start, chunk_end + 1):
            chunk_name = f"chunk_dados_{chunk_num:06d}.tar.gz"
            chunk_url = f"{base_url}/{chunk_name}"
            chunk_path = work_dir / chunk_name
            try:
                run_cmd(
                    f"curl -L -f -o {chunk_path} {chunk_url}",
                    f"Chunk {chunk_num}",
                    timeout=300
                )
                if chunk_path.exists():
                    validos = process_tar_gz(chunk_path, output_jsonl, campos_filter)
                    total_validos += validos
                    chunk_path.unlink()
            except Exception as e:
                logger.error(f"  ❌ Error in chunk {chunk_num}: {e}")
                if chunk_path.exists():
                    chunk_path.unlink()

        logger.info(f"\n✅ Total valid records: {total_validos}")
        if total_validos == 0:
            raise Exception("No valid records found!")
        # PHASE 2: Fragmentation
        logger.info("\n" + "=" * 80)
        logger.info("📦 PHASE 2: FRAGMENTATION")
        logger.info("=" * 80)
        update_status('fragmenting', 'Fragmenting JSONL', 50)
        fragments = fragment_jsonl(output_jsonl, fragments_dir, batch_size)
        # PHASE 3: Incremental FAISS build
        logger.info("\n" + "=" * 80)
        logger.info("🤖 PHASE 3: INCREMENTAL FAISS BUILD")
        logger.info("=" * 80)
        update_status('building', 'Building FAISS index', 60)
        os.chdir('/home/user/app')
        build_faiss_incremental(fragments, faiss_dir, timeout_per_batch)

        # Cleanup
        logger.info("\n🧹 Cleaning up...")
        run_cmd(f"rm -rf {work_dir}", "Cleanup", check=False)

        # Done!
        update_status('ready', f'{total_validos} documents indexed', 100)
        READY_FLAG.touch()
        logger.info("\n" + "=" * 80)
        logger.info("✅ SETUP COMPLETE!")
        logger.info("=" * 80)
        logger.info(f"  📊 Total: {total_validos} documents")
        logger.info(f"  📁 FAISS: {faiss_dir}/")
    except Exception as e:
        logger.error(f"\n💥 FATAL ERROR: {type(e).__name__}: {e}")
        logger.error(traceback.format_exc())
        update_status('error', str(e), 0)
        sys.exit(1)

if __name__ == "__main__":
    main()