# NOTE(review): the three lines here ("Spaces: / Sleeping / Sleeping") are
# Hugging Face Spaces page residue from extraction, not part of the module.
#!/usr/bin/env python3
"""
Query Engine - FAISS + SentenceTransformer
Simple and efficient
"""
import json
import time
from pathlib import Path

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
class QueryEngine:
    """Semantic search engine backed by a FAISS index and a SentenceTransformer model.

    Vectors live in ``index.faiss`` under *faiss_index_path*; one JSON record per
    vector lives in ``metadata*.jsonl`` files under *data_path* (same order as the
    index, so FAISS row *i* maps to ``self.metadata[i]``).
    """

    def __init__(self, faiss_index_path, data_path):
        """Load the embedding model, the FAISS index and the JSONL metadata.

        Args:
            faiss_index_path: Directory containing ``index.faiss``.
            data_path: Directory containing ``metadata*.jsonl`` files.

        Raises:
            FileNotFoundError: If ``index.faiss`` does not exist.
        """
        print("🔧 Inicializando QueryEngine...", flush=True)
        self.faiss_index_path = Path(faiss_index_path)
        self.data_path = Path(data_path)

        # Load the embedding model — must match the model used to build the index.
        print(" 📥 Carregando modelo...", flush=True)
        self.model = SentenceTransformer("paraphrase-multilingual-mpnet-base-v2")
        print(" ✅ Modelo carregado", flush=True)

        # Load the FAISS index from disk.
        index_file = self.faiss_index_path / "index.faiss"
        if not index_file.exists():
            raise FileNotFoundError(f"FAISS index não encontrado: {index_file}")
        print(" 📥 Carregando FAISS index...", flush=True)
        self.index = faiss.read_index(str(index_file))
        print(f" ✅ FAISS: {self.index.ntotal} vetores", flush=True)

        # Load metadata; supports multiple JSONL files, sorted by name so the
        # record order stays aligned with the vector order in the index.
        print(" 📥 Carregando metadata...", flush=True)
        self.metadata = []
        metadata_files = sorted(self.data_path.glob("metadata*.jsonl"))
        for metadata_file in metadata_files:
            print(f" 📄 {metadata_file.name}", flush=True)
            with open(metadata_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        self.metadata.append(json.loads(line))
        print(f" ✅ Metadata: {len(self.metadata)} registros", flush=True)

        # Warn (but don't fail) if metadata and index got out of sync.
        if len(self.metadata) != self.index.ntotal:
            print(f" ⚠️ AVISO: Metadata ({len(self.metadata)}) != FAISS ({self.index.ntotal})")

    def search(self, query: str, top_k: int = 10):
        """Run a semantic search for *query*.

        Args:
            query: Free-text query; embedded with the same model as the index.
            top_k: Maximum number of results to return.

        Returns:
            List of metadata dicts (copies), each augmented with ``score``
            (1 / (1 + distance), higher is better) and ``rank`` (1-based).
        """
        start = time.time()

        # Embed the query and shape it as a (1, dim) float32 batch for FAISS.
        query_vec = self.model.encode([query])[0]
        query_vec = np.array([query_vec], dtype='float32')

        distances, indices = self.index.search(query_vec, top_k)

        results = []
        for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
            # FAISS pads with idx == -1 when fewer than top_k vectors exist;
            # the old `idx < len(...)` check let -1 slip through and wrongly
            # returned metadata[-1]. Require a valid non-negative index.
            if 0 <= idx < len(self.metadata):
                result = self.metadata[idx].copy()
                result['score'] = float(1 / (1 + dist))  # map distance to (0, 1]
                result['rank'] = i + 1
                results.append(result)

        elapsed = time.time() - start
        print(f" ⚡ Busca: {elapsed:.3f}s", flush=True)
        return results

    def add_document(self, text: str, filename: str):
        """Chunk *text*, embed the chunks and append them to the index.

        Persists the updated FAISS index and appends the new metadata records
        to ``metadata_new.jsonl``.

        Args:
            text: Full document text.
            filename: Source name stored in each chunk's metadata.

        Returns:
            Number of chunks added.
        """
        # Fix: the message previously printed the literal "(unknown)" instead
        # of the actual filename.
        print(f" 📄 Adicionando: {filename}", flush=True)

        # Simple fixed-size chunking with overlap.
        chunks = self._chunk_text(text)
        print(f" ✂️ {len(chunks)} chunks", flush=True)

        # Embed all chunks in one batch.
        embeddings = self.model.encode(chunks, show_progress_bar=False)
        embeddings = np.array(embeddings, dtype='float32')

        # Append vectors; remember the offset so metadata stays aligned.
        start_idx = self.index.ntotal
        self.index.add(embeddings)

        for i, chunk in enumerate(chunks):
            self.metadata.append({
                'text': chunk,
                'source': filename,
                'chunk_idx': i,
                'total_idx': start_idx + i
            })

        # Persist (optional — remove if persistence is not needed).
        faiss.write_index(self.index, str(self.faiss_index_path / "index.faiss"))
        with open(self.data_path / "metadata_new.jsonl", 'a', encoding='utf-8') as f:
            for meta in self.metadata[start_idx:]:
                f.write(json.dumps(meta, ensure_ascii=False) + '\n')

        print(" ✅ Adicionado ao índice", flush=True)
        return len(chunks)

    def _chunk_text(self, text: str, chunk_size: int = 800, overlap: int = 200):
        """Split *text* into overlapping chunks.

        Args:
            text: Text to split; empty/whitespace-only input yields ``[]``.
            chunk_size: Maximum characters per chunk.
            overlap: Characters shared between consecutive chunks.

        Returns:
            List of stripped, non-empty chunks.

        Raises:
            ValueError: If ``overlap >= chunk_size`` (would loop forever in
                the old implementation — the step would be <= 0).
        """
        if not text or not text.strip():
            return []
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")

        step = chunk_size - overlap
        chunks = []
        for start in range(0, len(text), step):
            chunk = text[start:start + chunk_size].strip()
            if chunk:
                chunks.append(chunk)
        # Defensive fallback kept from the original; normally unreachable
        # because the early return handles whitespace-only input.
        return chunks if chunks else [text[:chunk_size]]