Spaces:
Sleeping
Sleeping
| import os, pickle, hashlib | |
| from pathlib import Path | |
| import numpy as np | |
| import chromadb | |
| from sentence_transformers import SentenceTransformer | |
| from rank_bm25 import BM25Okapi | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from config import ( | |
| DOCS_DIR, CHROMA_PATH, CHROMA_COLLECTION, | |
| BM25_PATH, EMBEDDER_NAME, CHUNK_SIZE, CHUNK_OVERLAP | |
| ) | |
# ── helpers ───────────────────────────────────────────
def read_pdf_text(fpath):
    """Extract the plain text of every page of a PDF.

    Args:
        fpath: Path of the PDF file to open.

    Returns:
        The text of all pages joined with newlines, with leading and
        trailing whitespace stripped.
    """
    # Imported inside the function, so PyMuPDF is only required when a PDF
    # is actually read.
    import fitz

    # The context manager closes the document (and its underlying file
    # handle) even if text extraction raises part-way through.
    with fitz.open(fpath) as doc:
        return "\n".join(page.get_text() for page in doc).strip()
def clean_text(text):
    """Collapse every run of whitespace in *text* into a single space.

    Leading and trailing whitespace is removed as a side effect of the
    split; newlines and tabs are treated like any other whitespace.
    """
    words = text.split()
    return " ".join(words)
def doc_hash(text: str) -> str:
    """Return a short content fingerprint for *text*.

    The fingerprint is the first 16 hex characters of the SHA-256 digest of
    the encoded text; it is used to skip duplicate ingestion.
    """
    full_digest = hashlib.sha256(text.encode()).hexdigest()
    return full_digest[:16]
# ── loading ───────────────────────────────────────────
def load_documents():
    """Load every ``.txt`` and ``.pdf`` file found directly in ``DOCS_DIR``.

    The directory is created if it does not exist. Text files are read as
    UTF-8 and PDFs through ``read_pdf_text``; every document is normalised
    with ``clean_text``. Files that fail to load, or that contain no text
    after cleaning, are reported and skipped rather than aborting the run.

    Returns:
        A ``(docs, filenames)`` pair of parallel lists: the cleaned text of
        each loaded document and the corresponding file name.

    Raises:
        FileNotFoundError: If no document could be loaded.
    """
    docs, filenames = [], []
    path = Path(DOCS_DIR)
    # parents=True so a nested DOCS_DIR (e.g. "data/docs") can be created.
    path.mkdir(parents=True, exist_ok=True)

    def read_txt(fpath):
        return fpath.read_text(encoding="utf-8")

    # All text files first, then all PDFs; each group is sorted because
    # glob() yields files in arbitrary, filesystem-dependent order.
    for kind, reader in (("txt", read_txt), ("pdf", read_pdf_text)):
        for fpath in sorted(path.glob(f"*.{kind}")):
            try:
                text = clean_text(reader(fpath))
            except Exception as e:
                # Deliberate best-effort: one unreadable file must not stop
                # the rest of the ingestion.
                print(f" Skipped {fpath.name}: {e}")
                continue
            if not text:
                # An empty document yields no chunks, so skip it for both
                # file types instead of only for PDFs.
                print(f" Skipped {fpath.name}: no extractable text")
                continue
            docs.append(text)
            filenames.append(fpath.name)
            print(f" Loaded {kind}: {fpath.name}")

    if not docs:
        raise FileNotFoundError(
            f"No .txt or .pdf files found in '{DOCS_DIR}'."
        )
    print(f"\nLoaded {len(docs)} document(s)")
    return docs, filenames
# ── chunking ──────────────────────────────────────────
def semantic_chunk(docs, filenames):
    """Split each document into overlapping chunks and record their source.

    Splitting is done by a ``RecursiveCharacterTextSplitter`` configured
    with ``CHUNK_SIZE``, ``CHUNK_OVERLAP`` and a paragraph / line /
    sentence / word separator hierarchy.

    Args:
        docs: Document texts.
        filenames: File name of each document, parallel to ``docs``.

    Returns:
        A ``(chunks, sources)`` pair of parallel lists: every chunk of every
        document, and the file name each chunk came from. Both lists are
        empty when no document produces a chunk.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ". ", " "],
    )
    all_chunks, all_sources = [], []
    for doc, fname in zip(docs, filenames):
        chunks = splitter.split_text(doc)
        all_chunks.extend(chunks)
        all_sources.extend([fname] * len(chunks))

    # Guard the average: with no chunks at all the division would raise
    # ZeroDivisionError instead of reporting an empty result.
    if all_chunks:
        avg = sum(len(c) for c in all_chunks) // len(all_chunks)
    else:
        avg = 0
    print(f"Created {len(all_chunks)} chunks (avg {avg} chars)")
    return all_chunks, all_sources
# ── indexing ──────────────────────────────────────────
def build_and_save_indexes(chunks, sources, model=None):
    """Add new chunks to ChromaDB and rebuild the pickled BM25 index.

    Chunks are identified by ``doc_<content hash>``. A chunk whose ID is
    already in the collection, or that repeats earlier in ``chunks``, is
    not added again. Only the remaining chunks are embedded. The BM25 index
    is then rebuilt from everything stored in the collection and written to
    ``BM25_PATH`` together with the chunk texts and their sources.

    Args:
        chunks: Chunk texts to index.
        sources: Source file name of each chunk, parallel to ``chunks``.
        model: Optional embedder exposing ``encode``. When omitted, a
            ``SentenceTransformer(EMBEDDER_NAME)`` is created, but only if
            there is something new to embed.
    """
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    collection = client.get_or_create_collection(
        name=CHROMA_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )

    # ── dedup by content hash ──
    # IDs are always returned; an empty ``include`` avoids pulling every
    # stored document and metadata dict just to learn which IDs exist.
    seen_ids = set(collection.get(include=[])["ids"])
    new_chunks, new_ids, new_meta = [], [], []
    for chunk, src in zip(chunks, sources):
        chunk_id = f"doc_{doc_hash(chunk)}"
        if chunk_id in seen_ids:
            continue
        # Record the ID immediately so identical chunks within this batch
        # (e.g. repeated boilerplate) do not produce duplicate IDs in add().
        seen_ids.add(chunk_id)
        new_chunks.append(chunk)
        new_ids.append(chunk_id)
        new_meta.append({"source": src})

    # ── ChromaDB ──
    if new_chunks:
        # Load the model and embed only when there is new content.
        if model is None:
            model = SentenceTransformer(EMBEDDER_NAME)
        print("\nBuilding embeddings...")
        new_embeddings = model.encode(
            new_chunks, show_progress_bar=True, batch_size=32
        ).tolist()

        # Chroma limits how many records one add() call may carry; the exact
        # limit depends on the installation, so use a conservative batch.
        add_batch = 5000
        for start in range(0, len(new_chunks), add_batch):
            stop = start + add_batch
            collection.add(
                documents=new_chunks[start:stop],
                embeddings=new_embeddings[start:stop],
                ids=new_ids[start:stop],
                metadatas=new_meta[start:stop],
            )
        print(f"Added {len(new_chunks)} new chunks to ChromaDB")
    else:
        print("No new chunks — all already indexed")

    # ── BM25 (full rebuild, cheap) ──
    # A single get() keeps documents and metadatas aligned and avoids
    # fetching the whole collection twice.
    stored = collection.get(include=["documents", "metadatas"])
    all_chunks_in_db = stored["documents"]
    all_sources_in_db = [m["source"] for m in stored["metadatas"]]
    if not all_chunks_in_db:
        # BM25Okapi divides by the corpus size, so an empty corpus cannot be
        # indexed; leave any existing BM25 file untouched.
        print("BM25 not saved — collection is empty")
        return

    tokenized = [c.lower().split() for c in all_chunks_in_db]
    bm25_index = BM25Okapi(tokenized)
    # NOTE: the index is stored with pickle; only load BM25_PATH from a
    # trusted location, since unpickling can execute arbitrary code.
    with open(BM25_PATH, "wb") as f:
        pickle.dump({
            "bm25": bm25_index,
            "chunks": all_chunks_in_db,
            "sources": all_sources_in_db,
        }, f)
    print(f"BM25 saved — {len(all_chunks_in_db)} total chunks")
# ── entry point ───────────────────────────────────────
def run_ingestion(model=None):
    """Run the full pipeline: load documents, chunk them, build the indexes.

    Args:
        model: Optional embedder forwarded unchanged to
            ``build_and_save_indexes``.
    """
    print("=== Starting ingestion ===\n")

    texts, names = load_documents()
    pieces, origins = semantic_chunk(texts, names)
    build_and_save_indexes(pieces, origins, model=model)

    print("\n=== Ingestion complete ===")
# Run the ingestion pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_ingestion()