#!/usr/bin/env python3
"""
beeROOT Setup v3.6 - Com fragmentação de chunks
Processa chunks do GitHub e constrói índice FAISS
"""
import os, sys, yaml, json, subprocess, logging, traceback, time, tarfile, re
from pathlib import Path
from datetime import datetime
# Log simultaneously to stdout (container/console logs) and to a debug file
# so the full run can be inspected afterwards at /tmp/setup_debug.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('/tmp/setup_debug.log', mode='w')
    ]
)
logger = logging.getLogger(__name__)

# Status file polled by the front-end; flag file marks a completed setup so
# re-runs can short-circuit.
STATUS_FILE = Path('/tmp/setup_status.json')
READY_FLAG = Path('/tmp/faiss_ready')
def update_status(status, message, progress=0):
    """Persist the current setup state to STATUS_FILE and log it.

    Args:
        status: Short machine-readable state (e.g. 'downloading', 'ready', 'error').
        message: Human-readable detail shown to the user.
        progress: Completion percentage, 0-100.
    """
    data = {
        'status': status,
        'message': message,
        'progress': progress,
        'timestamp': datetime.now().isoformat()
    }
    with open(STATUS_FILE, 'w') as f:
        json.dump(data, f)
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info("STATUS [%s%%]: %s - %s", progress, status, message)
def run_cmd(cmd, desc, check=True, timeout=300):
    """Run a shell command, logging its output, duration and exit code.

    Args:
        cmd: Command line executed with shell=True — pass trusted input only.
        desc: Human-readable description used in the log banner.
        check: If True, raise CalledProcessError on a non-zero exit.
        timeout: Seconds before the command is killed.

    Returns:
        The command's captured stdout (also on non-zero exit when check=False).

    Raises:
        subprocess.CalledProcessError: non-zero exit and check=True.
        subprocess.TimeoutExpired: the command exceeded `timeout`.
    """
    logger.info("=" * 80)
    logger.info(f"🔧 {desc}")
    logger.info(f"📝 {cmd}")
    logger.info("-" * 80)
    start = time.time()
    try:
        result = subprocess.run(
            cmd,
            shell=True,  # NOTE(review): shell string — commands are built internally; keep them trusted
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check
        )
    except subprocess.CalledProcessError:
        logger.error(f"❌ FALHOU: {desc}")
        raise
    except subprocess.TimeoutExpired:
        # Previously timeouts propagated without any log entry; make them visible.
        logger.error(f"❌ FALHOU: {desc}")
        raise
    elapsed = time.time() - start
    logger.info(f"⏱️ {elapsed:.2f}s | Exit: {result.returncode}")
    if result.stdout and result.stdout.strip():
        # Truncate to keep the debug log readable.
        logger.info(f"STDOUT: {result.stdout[:500]}")
    if result.stderr and result.stderr.strip():
        logger.warning(f"STDERR: {result.stderr[:500]}")
    if result.returncode == 0:
        logger.info(f"✅ {desc} - OK")
    return result.stdout
def clean_html_text(text):
    """Strip HTML tags and entities from *text* and collapse whitespace.

    Returns "" for None, empty, or non-string input.
    """
    if not text or not isinstance(text, str):
        return ""
    # Drop tags first, then replace entities with a space.
    text = re.sub(r'<[^>]+>', '', text)
    # Generalized: handles named (&nbsp;), decimal (&#160;) and hex (&#xA0;)
    # character references — the original pattern only matched named ones.
    text = re.sub(r'&#?[0-9a-zA-Z]+;', ' ', text)
    # Collapse any run of whitespace into a single space.
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def is_valid_value(value):
    """Return False for None and blank strings; anything else is valid."""
    if value is None:
        return False
    # Strings must contain at least one non-whitespace character; every
    # other type (numbers, lists, dicts, ...) is accepted as-is.
    return bool(value.strip()) if isinstance(value, str) else True
def filter_and_validate_record(record, fields_to_keep):
    """Project *record* onto *fields_to_keep*, validating every value.

    Field names are matched under common capitalization variants (exact,
    lower, Capitalized, UPPER). Output keys are lowercased, with any
    spelling of "Id" normalized to 'id'. The 'ementa' field has its HTML
    stripped before being stored.

    Returns:
        (filtered_dict, True) when every field is present and valid,
        otherwise (None, False).
    """
    result = {}
    for field in fields_to_keep:
        # Resolve the field trying the name as given, then case variants.
        candidates = (field, field.lower(), field.capitalize(), field.upper())
        value = next((record[name] for name in candidates if name in record), None)
        if value is None or not is_valid_value(value):
            return None, False
        # Normalized output key.
        key = 'id' if field in ('Id', 'id', 'ID') else field.lower()
        if key == 'ementa' and isinstance(value, str):
            # Ementas arrive as HTML; reject records whose text vanishes
            # once the markup is stripped.
            cleaned = clean_html_text(value)
            if not cleaned:
                return None, False
            result[key] = cleaned
        else:
            result[key] = value
    return result, True
def process_tar_gz(tar_path, output_jsonl, fields_to_keep, max_records_per_file=None):
    """Extract and filter records from a tar.gz archive into JSONL output.

    Scans the archive for members named '*jurisprudencias.jsonl', runs each
    JSON line through filter_and_validate_record(), and appends the valid
    records to *output_jsonl*. When *max_records_per_file* is set, output is
    fragmented into '<stem>_partNNN<suffix>' files once the limit is hit.

    Args:
        tar_path: Path of the tar.gz archive.
        output_jsonl: Path of the (first) JSONL output file.
        fields_to_keep: Field names to retain in each record.
        max_records_per_file: Max records per output file (None = no limit).

    Returns:
        Number of valid records written from this archive.

    Raises:
        Exception: re-raised after logging if the archive cannot be processed.
    """
    logger.info(f"📦 {tar_path.name}")
    stats = {'total': 0, 'validos': 0}
    file_counter = 1
    current_file_records = 0
    current_output = output_jsonl
    out_file = None
    try:
        with tarfile.open(tar_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if not (member.isfile() and member.name.endswith('jurisprudencias.jsonl')):
                    continue
                logger.info(f" ✅ {member.name}")
                file_obj = tar.extractfile(member)
                if file_obj is None:
                    # extractfile() returns None for unreadable members;
                    # the original would crash with AttributeError here.
                    continue
                lines = file_obj.read().decode('utf-8').strip().split('\n')
                # Accumulate (was overwritten per member in the original).
                stats['total'] += len(lines)
                # Open output lazily, once, in append mode — the original
                # re-opened it per member and leaked the previous handle.
                if out_file is None:
                    out_file = open(current_output, 'a', encoding='utf-8')
                for line in lines:
                    if not line.strip():
                        continue
                    try:
                        record = json.loads(line)
                        filtered, is_valid = filter_and_validate_record(
                            record,
                            fields_to_keep
                        )
                        if not is_valid:
                            continue
                        # Fragment by record count when requested.
                        if max_records_per_file and current_file_records >= max_records_per_file:
                            out_file.close()
                            file_counter += 1
                            base = output_jsonl.stem
                            ext = output_jsonl.suffix
                            current_output = output_jsonl.parent / f"{base}_part{file_counter:03d}{ext}"
                            logger.info(f" 📝 Novo fragmento: {current_output.name}")
                            out_file = open(current_output, 'w', encoding='utf-8')
                            current_file_records = 0
                        out_file.write(json.dumps(filtered, ensure_ascii=False) + '\n')
                        stats['validos'] += 1
                        current_file_records += 1
                    except Exception as e:
                        # Best-effort: skip malformed lines, keep processing.
                        logger.debug(f"Erro processando linha: {e}")
        logger.info(f" ✅ {stats['validos']}/{stats['total']} válidos")
        return stats['validos']
    except Exception as e:
        logger.error(f" ❌ {e}")
        raise
    finally:
        # Always close the output, even when an exception escapes
        # (the original leaked the handle on any mid-loop failure).
        if out_file is not None:
            out_file.close()
def main():
    """Orchestrate the full setup: download chunks, filter records, build FAISS.

    Reads config.yaml, downloads chunk archives from the GitHub raw-content
    URL, filters their records into JSONL file(s) under /tmp/work, runs
    rag_builder.py on every fragment, and touches READY_FLAG on success.
    On any fatal error the status file is set to 'error' and the process
    exits with code 1.
    """
    try:
        logger.info("\n" + "="*80)
        logger.info("🐝 beeROOT SETUP v3.6")
        logger.info("="*80)
        # Short-circuit if a previous run already completed.
        if READY_FLAG.exists():
            logger.info("✅ Setup já foi executado anteriormente")
            update_status('ready', 'Ready', 100)
            return
        # Load configuration.
        with open('config.yaml') as f:
            config = yaml.safe_load(f)
        chunk_start = config['chunk_start']
        chunk_end = config['chunk_end']
        github_repo = config['github_repo']
        campos_filter = config['campos_filter']
        # Optional fragmentation: cap the number of records per output file.
        max_records_per_file = config.get('max_records_per_file', None)
        if max_records_per_file:
            logger.info(f"📊 Fragmentação ativada: {max_records_per_file} registros/arquivo")
        # Build the raw.githubusercontent.com base URL for the chunk files.
        base_url = github_repo.replace('https://github.com/', 'https://raw.githubusercontent.com/')
        if base_url.endswith('.git'):
            base_url = base_url[:-4]
        base_url = f"{base_url}/main/chunks_dados"
        # Working directory for downloads and intermediate JSONL output.
        work_dir = Path('/tmp/work')
        work_dir.mkdir(exist_ok=True)
        output_jsonl = work_dir / 'all_filtered.jsonl'
        if output_jsonl.exists():
            output_jsonl.unlink()
        logger.info("\n📥 DOWNLOAD E PROCESSAMENTO")
        logger.info(f" Chunks: {chunk_start} a {chunk_end}")
        update_status('downloading', f'Processando chunks {chunk_start}-{chunk_end}', 10)
        total_validos = 0
        for chunk_num in range(chunk_start, chunk_end + 1):
            chunk_name = f"chunk_dados_{chunk_num:06d}.tar.gz"
            chunk_url = f"{base_url}/{chunk_name}"
            chunk_path = work_dir / chunk_name
            # Download/processing phase maps to 10%..60% of the progress bar.
            progress = 10 + int(((chunk_num - chunk_start + 1) / (chunk_end - chunk_start + 1)) * 50)
            update_status('processing', f'Chunk {chunk_num}/{chunk_end}', progress)
            try:
                # Download the chunk archive.
                run_cmd(
                    f"curl -L -f -o {chunk_path} {chunk_url}",
                    f"Download chunk {chunk_num}",
                    timeout=300
                )
                # Extract, filter and append its records; delete the archive after.
                if chunk_path.exists():
                    validos = process_tar_gz(
                        chunk_path,
                        output_jsonl,
                        campos_filter,
                        max_records_per_file
                    )
                    total_validos += validos
                    chunk_path.unlink()
            except Exception as e:
                # A failed chunk is logged and skipped; the run continues.
                logger.error(f" ❌ Erro no chunk {chunk_num}: {e}")
                if chunk_path.exists():
                    chunk_path.unlink()
        logger.info(f"\n✅ Total de registros válidos: {total_validos:,}")
        if total_validos == 0:
            raise Exception("Nenhum registro válido encontrado!")
        # Build the FAISS index.
        logger.info("\n🤖 CONSTRUINDO ÍNDICE FAISS")
        update_status('building', 'Building FAISS index', 70)
        os.chdir('/home/user/app')
        # Process every JSONL fragment produced above (one builder run each).
        jsonl_files = list(work_dir.glob('all_filtered*.jsonl'))
        logger.info(f" 📁 {len(jsonl_files)} arquivo(s) JSONL para processar")
        for jsonl_file in jsonl_files:
            logger.info(f" 📝 Processando: {jsonl_file.name}")
            result = subprocess.run(
                f"python3 rag_builder.py --input {jsonl_file}",
                shell=True,
                capture_output=True,
                text=True,
                timeout=900
            )
            if result.stdout:
                for line in result.stdout.split('\n'):
                    if line.strip():
                        logger.info(f" {line}")
            if result.stderr:
                for line in result.stderr.split('\n'):
                    if line.strip():
                        logger.warning(f" {line}")
            if result.returncode != 0:
                raise Exception(f"Build falhou: exit {result.returncode}")
        logger.info("✅ FAISS build completo!")
        # Best-effort cleanup of the working directory.
        run_cmd(f"rm -rf {work_dir}", "Cleanup", check=False)
        # Mark the setup as complete so future runs short-circuit.
        update_status('ready', f'{total_validos:,} documentos indexados', 100)
        READY_FLAG.touch()
        logger.info("\n" + "="*80)
        logger.info("🎉 SETUP CONCLUÍDO COM SUCESSO!")
        logger.info("="*80)
    except Exception as e:
        # Top-level boundary: log the full traceback, publish the error
        # status, and exit non-zero.
        logger.error(f"\n💥 ERRO FATAL: {e}")
        logger.error(traceback.format_exc())
        update_status('error', str(e), 0)
        sys.exit(1)
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()