Carlex22's picture
Upload 5 files
82f2cfc verified
#!/usr/bin/env python3
"""
beeROOT Para.AI v4.0 - Setup com BATCHING automático
Evita timeout fragmentando all_filtered.jsonl em batches
"""
import html
import json
import logging
import os
import re
import shlex
import subprocess
import sys
import tarfile
import time
import traceback
from datetime import datetime
from pathlib import Path

import yaml
# Paths used to publish setup progress (STATUS_FILE) and to mark a finished
# setup (READY_FLAG).
STATUS_FILE = Path('/tmp/setup_status.json')
READY_FLAG = Path('/tmp/faiss_ready')

# Log to stdout and to a debug file; mode='w' truncates that file on each run.
_console_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler('/tmp/setup_debug.log', mode='w')
logging.basicConfig(
    handlers=[_console_handler, _file_handler],
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def update_status(status, message, progress=0):
    """Write the current setup state to STATUS_FILE and echo it to the log.

    Args:
        status: Short state identifier, stored under the 'status' key.
        message: Free-text detail, stored under the 'message' key.
        progress: Progress value, stored under 'progress' and logged as a
            percentage.
    """
    payload = dict(
        status=status,
        message=message,
        progress=progress,
        timestamp=datetime.now().isoformat(),
    )
    # The status file is rewritten from scratch on every call.
    with STATUS_FILE.open('w') as handle:
        handle.write(json.dumps(payload))
    logger.info(f"STATUS [{progress}%]: {status} - {message}")
def run_cmd(cmd, desc, check=True, timeout=300):
    """Run a command, log how it went, and return its captured stdout.

    Args:
        cmd: Command to execute. A string is run through the shell; a list or
            tuple of arguments is executed directly, without a shell.
        desc: Human-readable description used in the log lines.
        check: When true, a non-zero exit status raises CalledProcessError.
        timeout: Maximum run time in seconds.

    Returns:
        The command's stdout as text. With check=False this is returned even
        when the exit status is non-zero.

    Raises:
        subprocess.CalledProcessError: Non-zero exit status while check is true.
        subprocess.TimeoutExpired: The command ran longer than `timeout`.
    """
    logger.info("=" * 80)
    logger.info(f"🔧 {desc}")
    logger.info(f"📝 {cmd}")
    logger.info("-" * 80)
    start = time.time()
    try:
        result = subprocess.run(
            cmd,
            # Strings keep going through the shell; argument lists bypass it,
            # which avoids quoting problems for callers that can use them.
            shell=isinstance(cmd, str),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check,
        )
    except subprocess.CalledProcessError as e:
        # Log the exit status and stderr here: the success-path logging below
        # is never reached when check=True turns the failure into an exception.
        logger.error(f"❌ FALHOU: {desc}")
        logger.error(f"Exit: {e.returncode}")
        if e.stderr and e.stderr.strip():
            logger.error(f"STDERR: {e.stderr[:500]}")
        raise
    except subprocess.TimeoutExpired:
        logger.error(f"❌ TIMEOUT: {desc} ({timeout}s)")
        raise

    elapsed = time.time() - start
    logger.info(f"⏱️ {elapsed:.2f}s | Exit: {result.returncode}")
    if result.stdout and result.stdout.strip():
        logger.info(f"STDOUT: {result.stdout[:500]}")
    if result.stderr and result.stderr.strip():
        logger.warning(f"STDERR: {result.stderr[:500]}")
    if result.returncode == 0:
        logger.info(f"✅ {desc} - OK")
    return result.stdout
def clean_html_text(text):
    """Return `text` as plain text: tags removed, entities decoded, spaces collapsed.

    Character references are decoded with html.unescape instead of being
    replaced by a space, so accented words survive ("decis&atilde;o" becomes
    "decisão") and numeric references such as "&#231;" are handled too.

    Args:
        text: Raw value, possibly containing HTML markup.

    Returns:
        The cleaned string, or "" when `text` is empty or not a string.
    """
    if not text or not isinstance(text, str):
        return ""
    without_tags = re.sub(r'<[^>]+>', '', text)
    # Decode after stripping tags so that escaped markup (&lt;b&gt;) is kept
    # as literal text rather than being removed as a tag.
    decoded = html.unescape(without_tags)
    # \s also matches the non-breaking space produced by &nbsp;.
    return re.sub(r'\s+', ' ', decoded).strip()
def is_valid_value(value):
    """Tell whether a field value counts as present.

    None is rejected, a string must contain at least one non-whitespace
    character, and any other value (numbers, lists, dicts, ...) is accepted.
    """
    if value is None:
        return False
    if not isinstance(value, str):
        return True
    return value.strip() != ""
def filter_and_validate_record(record, fields_to_keep):
    """Project `record` onto `fields_to_keep`, rejecting it if any field is unusable.

    Each requested field is looked up under its exact name first, then under
    its lower-case, capitalized and upper-case spellings. Output keys are the
    lower-cased field names. A string 'ementa' value is passed through
    clean_html_text and must still be non-empty afterwards.

    Args:
        record: Mapping holding the raw record.
        fields_to_keep: Iterable of field names that must all be present.

    Returns:
        (filtered_dict, True) when every field is present and valid,
        otherwise (None, False).
    """
    kept = {}
    for field in fields_to_keep:
        candidates = (field, field.lower(), field.capitalize(), field.upper())
        # The first spelling present in the record wins, even if its value is None.
        value = next((record[name] for name in candidates if name in record), None)
        if value is None or not is_valid_value(value):
            return None, False

        out_key = field.lower()
        if out_key == 'ementa' and isinstance(value, str):
            value = clean_html_text(value)
            if not value.strip():
                return None, False
        kept[out_key] = value
    return kept, True
def process_tar_gz(tar_path, output_jsonl, fields_to_keep):
    """Append the valid records of one .tar.gz chunk to `output_jsonl`.

    The first regular archive member whose name ends with
    'jurisprudencias.jsonl' is read line by line. Each line is parsed as JSON,
    filtered with filter_and_validate_record and, when valid, appended to
    `output_jsonl` as one JSON line.

    Lines that are not valid UTF-8, not valid JSON, or not JSON objects are
    skipped and counted instead of being silently swallowed.

    Args:
        tar_path: Path of the gzip-compressed tar archive.
        output_jsonl: Path of the JSONL file to append to.
        fields_to_keep: Field names passed on to filter_and_validate_record.

    Returns:
        Number of records written, or 0 when no matching member exists.

    Raises:
        Exception: Any error opening or reading the archive is logged and
            re-raised.
    """
    logger.info(f"📦 {tar_path.name}")
    stats = {'total': 0, 'validos': 0, 'malformados': 0}
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            for member in tar:
                if not (member.isfile()
                        and member.name.endswith('jurisprudencias.jsonl')):
                    continue
                logger.info(f" ✅ {member.name}")
                # Stream the member instead of loading it whole into memory.
                with tar.extractfile(member) as src, \
                        open(output_jsonl, 'a', encoding='utf-8') as out:
                    for raw_line in src:
                        if not raw_line.strip():
                            continue
                        stats['total'] += 1
                        try:
                            record = json.loads(raw_line.decode('utf-8'))
                        except ValueError:
                            # Covers UnicodeDecodeError and JSONDecodeError.
                            stats['malformados'] += 1
                            continue
                        if not isinstance(record, dict):
                            stats['malformados'] += 1
                            continue
                        filtered, is_valid = filter_and_validate_record(
                            record, fields_to_keep
                        )
                        if is_valid:
                            out.write(json.dumps(filtered, ensure_ascii=False) + '\n')
                            stats['validos'] += 1
                if stats['malformados']:
                    logger.warning(
                        f" ⚠️ {stats['malformados']} linhas malformadas ignoradas"
                    )
                logger.info(f" ✅ {stats['validos']}/{stats['total']} válidos")
                # Only the first matching member is processed.
                return stats['validos']
        return 0
    except Exception as e:
        logger.error(f" ❌ {e}")
        raise
# ============================================================================
# NEW FUNCTION: AUTOMATIC FRAGMENTATION
# ============================================================================
def _write_fragment(output_dir, batch_num, lines):
    """Write one batch of lines to part_NNN.jsonl in `output_dir` and return its path."""
    fragment_path = output_dir / f"part_{batch_num:03d}.jsonl"
    with open(fragment_path, 'w', encoding='utf-8') as out:
        out.writelines(lines)
    logger.info(f" ✅ {fragment_path.name}: {len(lines)} registros")
    return fragment_path


def fragment_jsonl(input_file, output_dir, batch_size=2000):
    """Split a large JSONL file into smaller files to avoid timeouts.

    Blank lines are dropped. Fragments are named part_001.jsonl,
    part_002.jsonl, ... and each one holds at most `batch_size` records.

    Args:
        input_file: Path of the JSONL file to split.
        output_dir: Directory that receives the fragments (created if missing).
        batch_size: Maximum number of records per fragment; must be >= 1.

    Returns:
        List of fragment paths in creation order; empty when the input holds
        no records.

    Raises:
        ValueError: If `batch_size` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    input_file = Path(input_file)
    logger.info(f"\n📦 FRAGMENTANDO {input_file.name}")
    logger.info(f" Batch size: {batch_size} registros")
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    fragments = []
    current_batch = []
    total_lines = 0
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            current_batch.append(line)
            total_lines += 1
            # Flush the batch as soon as it reaches batch_size.
            if len(current_batch) >= batch_size:
                fragments.append(
                    _write_fragment(output_dir, len(fragments) + 1, current_batch)
                )
                current_batch = []
    # Write the final, partially filled batch (if any).
    if current_batch:
        fragments.append(
            _write_fragment(output_dir, len(fragments) + 1, current_batch)
        )

    # Guard the average: an empty input produces no fragments at all.
    per_fragment = total_lines // len(fragments) if fragments else 0
    logger.info(f"\n 📊 RESUMO FRAGMENTAÇÃO:")
    logger.info(f" Total de registros: {total_lines}")
    logger.info(f" Total de fragmentos: {len(fragments)}")
    logger.info(f" Registros/fragmento: ~{per_fragment}")
    return fragments
# ============================================================================
# NEW FUNCTION: INCREMENTAL FAISS BUILD
# ============================================================================
def build_faiss_incremental(fragments, faiss_dir, timeout_per_batch=600):
    """Build the FAISS index one fragment at a time to avoid timeouts.

    rag_builder.py is invoked once per fragment, relative to the current
    working directory. The first call is made without --append; every later
    call passes --append.

    Args:
        fragments: Ordered list of fragment paths (Path objects).
        faiss_dir: Directory passed to rag_builder.py as --output.
        timeout_per_batch: Maximum run time of each invocation, in seconds.

    Raises:
        subprocess.TimeoutExpired: An invocation exceeded `timeout_per_batch`.
        subprocess.CalledProcessError: An invocation exited with a non-zero
            status.
    """
    logger.info(f"\n🔨 BUILD FAISS INCREMENTAL")
    logger.info(f" Total de fragmentos: {len(fragments)}")
    logger.info(f" Timeout por batch: {timeout_per_batch}s")
    for i, fragment in enumerate(fragments, 1):
        logger.info(f"\n [{i}/{len(fragments)}] Processando {fragment.name}...")
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        cmd = [
            "python3", "rag_builder.py",
            "--input", str(fragment),
            "--output", str(faiss_dir),
        ]
        if i > 1:
            # Fragments after the first one are added with --append.
            cmd.append("--append")

        start = time.time()
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout_per_batch,
                check=True,
            )
        except subprocess.TimeoutExpired:
            logger.error(f" ❌ TIMEOUT em {fragment.name} ({timeout_per_batch}s)")
            raise
        except subprocess.CalledProcessError as e:
            logger.error(f" ❌ ERRO em {fragment.name}")
            if e.stderr:
                # A Python traceback ends with the actual error, so log the
                # tail of stderr rather than its head.
                logger.error(f" {e.stderr[-500:]}")
            raise

        elapsed = time.time() - start
        logger.info(f" ✅ {fragment.name} processado em {elapsed:.1f}s")
        # Relay the output of rag_builder to the log.
        if result.stdout:
            for line in result.stdout.split('\n'):
                if line.strip():
                    logger.info(f" {line}")
    logger.info(f"\n✅ FAISS index completo!")
# ============================================================================
# MAIN
# ============================================================================
def _chunks_base_url(github_repo):
    """Turn a GitHub repository URL into the raw-content URL of its chunks_dados folder."""
    base_url = github_repo.strip().rstrip('/')
    base_url = base_url.replace(
        'https://github.com/', 'https://raw.githubusercontent.com/'
    )
    if base_url.endswith('.git'):
        base_url = base_url[:-4]
    return f"{base_url}/main/chunks_dados"


def _download_and_filter_chunks(base_url, chunk_start, chunk_end, work_dir,
                                output_jsonl, campos_filter):
    """Download each chunk archive, append its valid records, and return their count."""
    total_validos = 0
    failed_chunks = []
    for chunk_num in range(chunk_start, chunk_end + 1):
        chunk_name = f"chunk_dados_{chunk_num:06d}.tar.gz"
        chunk_url = f"{base_url}/{chunk_name}"
        chunk_path = work_dir / chunk_name
        try:
            # run_cmd executes a shell string, so quote both arguments.
            run_cmd(
                f"curl -L -f -o {shlex.quote(str(chunk_path))} {shlex.quote(chunk_url)}",
                f"Chunk {chunk_num}",
                timeout=300
            )
            if chunk_path.exists():
                total_validos += process_tar_gz(
                    chunk_path, output_jsonl, campos_filter
                )
        except Exception as e:
            # Best effort: a failing chunk must not abort the whole setup.
            logger.error(f" ❌ Erro no chunk {chunk_num}: {e}")
            failed_chunks.append(chunk_num)
        finally:
            # The archive is removed whether or not it could be processed.
            if chunk_path.exists():
                chunk_path.unlink()
    if failed_chunks:
        logger.warning(f" ⚠️ Chunks com erro: {failed_chunks}")
    return total_validos


def main():
    """Run the full setup: download and filter chunks, fragment, build FAISS.

    Does nothing but report readiness when READY_FLAG already exists.
    Otherwise reads config.yaml, downloads the configured chunk range,
    filters the records into one JSONL file, splits it into fragments, builds
    the FAISS index incrementally, removes the work directory and creates
    READY_FLAG.

    Any exception is logged with its traceback, recorded via update_status
    with status 'error', and ends the process with exit status 1.
    """
    try:
        logger.info("\n" + "=" * 80)
        logger.info("🚀 beeROOT Para.AI v4.0 - SETUP COM BATCHING")
        logger.info("=" * 80)
        if READY_FLAG.exists():
            logger.info("✅ Já pronto!")
            update_status('ready', 'Ready', 100)
            return

        # Load the configuration.
        with open('config.yaml', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        chunk_start = config['chunk_start']
        chunk_end = config['chunk_end']
        github_repo = config['github_repo']
        campos_filter = config['campos_filter']
        batch_size = config.get('batch_size', 2000)
        timeout_per_batch = config.get('timeout_per_batch', 600)
        logger.info(f"\n⚙️ CONFIGURAÇÃO:")
        # The separator between both bounds was missing in the original log line.
        logger.info(f" Chunks: {chunk_start} → {chunk_end}")
        logger.info(f" Batch size: {batch_size} registros")
        logger.info(f" Timeout/batch: {timeout_per_batch}s")

        base_url = _chunks_base_url(github_repo)

        # Directories.
        work_dir = Path('/tmp/work')
        work_dir.mkdir(exist_ok=True)
        fragments_dir = work_dir / 'fragments'
        faiss_dir = Path('/home/user/app/faiss_index')
        faiss_dir.mkdir(parents=True, exist_ok=True)
        output_jsonl = work_dir / 'all_filtered.jsonl'
        # Start from an empty file: process_tar_gz opens it in append mode.
        if output_jsonl.exists():
            output_jsonl.unlink()

        # PHASE 1: download and filtering.
        logger.info("\n" + "=" * 80)
        logger.info("📥 FASE 1: DOWNLOAD E FILTRAGEM")
        logger.info("=" * 80)
        update_status('downloading', 'Downloading chunks', 10)
        total_validos = _download_and_filter_chunks(
            base_url, chunk_start, chunk_end, work_dir, output_jsonl, campos_filter
        )
        logger.info(f"\n✅ Total de registros válidos: {total_validos}")
        if total_validos == 0:
            raise RuntimeError("Nenhum registro válido encontrado!")

        # PHASE 2: fragmentation.
        logger.info("\n" + "=" * 80)
        logger.info("📦 FASE 2: FRAGMENTAÇÃO")
        logger.info("=" * 80)
        update_status('fragmenting', 'Fragmentando JSONL', 50)
        fragments = fragment_jsonl(output_jsonl, fragments_dir, batch_size)

        # PHASE 3: incremental FAISS build.
        logger.info("\n" + "=" * 80)
        logger.info("🤖 FASE 3: BUILD FAISS INCREMENTAL")
        logger.info("=" * 80)
        update_status('building', 'Building FAISS index', 60)
        # build_faiss_incremental invokes rag_builder.py by relative path.
        os.chdir('/home/user/app')
        build_faiss_incremental(fragments, faiss_dir, timeout_per_batch)

        # Cleanup.
        logger.info("\n🧹 Limpeza...")
        run_cmd(f"rm -rf {shlex.quote(str(work_dir))}", "Cleanup", check=False)

        # Done.
        update_status('ready', f'{total_validos} documentos indexados', 100)
        READY_FLAG.touch()
        logger.info("\n" + "=" * 80)
        logger.info("✅ SETUP COMPLETO!")
        logger.info("=" * 80)
        logger.info(f" 📊 Total: {total_validos} documentos")
        logger.info(f" 📁 FAISS: {faiss_dir}/")
    except Exception as e:
        logger.error(f"\n💥 ERRO FATAL: {type(e).__name__}: {e}")
        logger.error(traceback.format_exc())
        update_status('error', str(e), 0)
        sys.exit(1)
# Run the setup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()