Ragcore / app /core /bm25.py
NinjainPJs's picture
Initial deploy: RagCore RAG system with hybrid search and Gradio UI
a34068e
import logging
import re
import time
from rank_bm25 import BM25Okapi
from app.models.document import Chunk
logger = logging.getLogger(__name__)
STOP_WORDS = {
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "can", "shall", "to", "of", "in", "for",
"on", "with", "at", "by", "from", "as", "into", "through", "during",
"before", "after", "and", "but", "or", "not", "no", "if", "then",
"than", "that", "this", "it", "its", "he", "she", "they", "we", "you",
}
def tokenize(text: str) -> list[str]:
text = text.lower()
words = re.findall(r"\b\w+\b", text)
return [w for w in words if w not in STOP_WORDS and len(w) > 1]
class BM25Index:
def __init__(self):
self.documents: list[dict] = []
self.index: BM25Okapi | None = None
def build_index(self, chunks: list[Chunk]) -> None:
self.documents = [
{
"chunk_id": chunk.chunk_id,
"document_id": chunk.document_id,
"text": chunk.text,
"tokens": tokenize(chunk.text),
"metadata": chunk.metadata.model_dump() if chunk.metadata else {},
}
for chunk in chunks
]
if self.documents:
corpus = [doc["tokens"] for doc in self.documents]
self.index = BM25Okapi(corpus)
logger.info(f"Built BM25 index with {len(self.documents)} documents")
def add_documents(self, chunks: list[Chunk]) -> None:
new_docs = [
{
"chunk_id": chunk.chunk_id,
"document_id": chunk.document_id,
"text": chunk.text,
"tokens": tokenize(chunk.text),
"metadata": chunk.metadata.model_dump() if chunk.metadata else {},
}
for chunk in chunks
]
self.documents.extend(new_docs)
if self.documents:
corpus = [doc["tokens"] for doc in self.documents]
self.index = BM25Okapi(corpus)
logger.info(f"BM25 index updated: {len(self.documents)} total documents")
def search(self, query: str, top_k: int = 10) -> list[dict]:
if not self.index or not self.documents:
return []
tokens = tokenize(query)
if not tokens:
return []
scores = self.index.get_scores(tokens)
scored_docs = [
(score, doc) for score, doc in zip(scores, self.documents) if score > 0
]
scored_docs.sort(key=lambda x: x[0], reverse=True)
return [
{
"chunk_id": doc["chunk_id"],
"document_id": doc["document_id"],
"text": doc["text"],
"score": float(score),
"metadata": doc["metadata"],
}
for score, doc in scored_docs[:top_k]
]
def rebuild_from_vectorstore(self, vectorstore) -> None:
start = time.perf_counter()
all_points = vectorstore.scroll_all()
self.documents = [
{
"chunk_id": p["chunk_id"],
"document_id": p["document_id"],
"text": p["text"],
"tokens": tokenize(p["text"]),
"metadata": p["metadata"],
}
for p in all_points
if p.get("text")
]
if self.documents:
corpus = [doc["tokens"] for doc in self.documents]
self.index = BM25Okapi(corpus)
elapsed = (time.perf_counter() - start) * 1000
logger.info(
f"Rebuilt BM25 index from vectorstore: {len(self.documents)} docs in {elapsed:.0f}ms"
)
@property
def doc_count(self) -> int:
return len(self.documents)
_bm25: BM25Index | None = None
def get_bm25() -> BM25Index:
global _bm25
if _bm25 is None:
_bm25 = BM25Index()
return _bm25