Spaces:
Sleeping
Sleeping
| # chroma.py (minimal, no visualization, WITH sentence-transformers, with .env) | |
| import os | |
| import warnings | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import pandas as pd # (currently unused but kept if you need it later) | |
| from dotenv import load_dotenv | |
| from llama_parse import LlamaParse | |
| from llama_index.core.node_parser import SentenceSplitter | |
| import chromadb | |
| from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction | |
| from openai import OpenAI | |
| import nest_asyncio | |
| nest_asyncio.apply() | |
| warnings.filterwarnings("ignore") | |
# ---------- LOAD .env ----------
# Pull OPENAI_API_KEY / LLAMA_CLOUD_API_KEY / EMB_DEVICE from a local .env
# file into the process environment before anything reads os.environ.
load_dotenv()
# ---------- CONFIG ----------
# Central pipeline configuration; every function below reads from this dict.
CONFIG = {
    "pdf_directory": r"C:\Users\Legion\Documents\Ominimo Job\Pdfs for RAG",
    "output_directory": "./output/",
    # Chat model used only for document summaries (not for embeddings).
    "llm_model": "gpt-4.1-mini",
    # Sentence-level chunking parameters (tokens, per SentenceSplitter).
    "chunk_size": 512,
    "chunk_overlap": 50,
    "top_k_retrieval": 3,
    # ✅ SentenceTransformer embedding model (384-D for MiniLM)
    # Must match your retrieval embedding model.
    "embedding_model": "all-MiniLM-L6-v2",
    # Optional: force device ("cpu" or "cuda")
    "embedding_device": os.getenv("EMB_DEVICE", "cpu"),
}
# Ensure the output directory (ChromaDB persistence location) exists up front.
Path(CONFIG["output_directory"]).mkdir(parents=True, exist_ok=True)
# ---------- OPENAI CLIENT (for summaries only) ----------
# Fail fast at import time if the key is missing, rather than mid-pipeline.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set in the environment or .env file.")
client = OpenAI(api_key=OPENAI_API_KEY)
# Per-file summary cache, keyed by PDF filename; filled by
# parse_pdfs_with_llamaparse() as documents are processed.
document_summaries: Dict[str, str] = {}
def summarize_document(text: str, client: OpenAI, model: str) -> str:
    """Generate a summary of the document using OpenAI (used only for summaries).

    Only the first 4000 characters of *text* are sent to the model to keep
    the request bounded; the model's reply text is returned verbatim.
    """
    system_message = {
        "role": "system",
        "content": (
            "You are a helpful assistant that creates concise "
            "summaries of documents."
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Please provide a comprehensive summary of the "
            "following document:\n\n"
            f"{text[:4000]}"
        ),
    }
    completion = client.chat.completions.create(
        model=model,
        messages=[system_message, user_message],
        temperature=0.3,  # low temperature → stable, factual summaries
        max_tokens=500,
    )
    return completion.choices[0].message.content
| # ---------- PDF PARSING ---------- | |
| def parse_pdfs_with_llamaparse(pdf_directory: str) -> List[Dict]: | |
| """Parse PDFs using LlamaParse with batch processing.""" | |
| pdf_files = list(Path(pdf_directory).glob("*.pdf")) | |
| print(f"Found {len(pdf_files)} PDF files") | |
| llama_key = os.environ.get("LLAMA_CLOUD_API_KEY") | |
| if not llama_key: | |
| raise RuntimeError("LLAMA_CLOUD_API_KEY is not set in the environment or .env.") | |
| parser = LlamaParse( | |
| api_key=llama_key, | |
| result_type="markdown", | |
| verbose=True, | |
| language="en", | |
| num_workers=4, | |
| ) | |
| all_documents: List[Dict] = [] | |
| try: | |
| print("\nParsing all PDFs in batch...") | |
| pdf_paths = [str(pdf) for pdf in pdf_files] | |
| documents_batch = parser.load_data(pdf_paths) | |
| print(f"✓ Successfully parsed {len(documents_batch)} document sections") | |
| doc_index = 0 | |
| for pdf_path in pdf_files: | |
| print(f"\nProcessing: {pdf_path.name}") | |
| pdf_docs = [] | |
| while doc_index < len(documents_batch): | |
| doc = documents_batch[doc_index] | |
| if hasattr(doc, "metadata") and doc.metadata.get("file_path"): | |
| if pdf_path.name in doc.metadata.get("file_path", ""): | |
| pdf_docs.append(doc) | |
| doc_index += 1 | |
| else: | |
| break | |
| else: | |
| pdf_docs.append(doc) | |
| doc_index += 1 | |
| if doc_index >= len(documents_batch): | |
| break | |
| if pdf_docs: | |
| full_text = " ".join([d.text for d in pdf_docs]) | |
| summary = summarize_document(full_text, client, CONFIG["llm_model"]) | |
| document_summaries[pdf_path.name] = summary | |
| print(f"Summary for {pdf_path.name}:") | |
| print(summary[:200] + "...\n") | |
| for d in pdf_docs: | |
| all_documents.append( | |
| { | |
| "text": d.text, | |
| "source": pdf_path.name, | |
| "metadata": d.metadata if hasattr(d, "metadata") else {}, | |
| } | |
| ) | |
| else: | |
| print(f"Warning: No content extracted from {pdf_path.name}") | |
| document_summaries[pdf_path.name] = "No content extracted" | |
| except Exception as e: | |
| print(f"Batch processing failed: {str(e)}") | |
| print("\nFalling back to individual file processing with sleep delays...") | |
| import time | |
| for pdf_path in pdf_files: | |
| print(f"\nParsing: {pdf_path.name}") | |
| try: | |
| time.sleep(2) | |
| documents = parser.load_data(str(pdf_path)) | |
| if documents: | |
| full_text = " ".join([d.text for d in documents]) | |
| summary = summarize_document(full_text, client, CONFIG["llm_model"]) | |
| document_summaries[pdf_path.name] = summary | |
| print(f"Summary for {pdf_path.name}:") | |
| print(summary[:200] + "...\n") | |
| for d in documents: | |
| all_documents.append( | |
| { | |
| "text": d.text, | |
| "source": pdf_path.name, | |
| "metadata": d.metadata if hasattr(d, "metadata") else {}, | |
| } | |
| ) | |
| else: | |
| print(f"Warning: No content extracted from {pdf_path.name}") | |
| document_summaries[pdf_path.name] = "No content extracted" | |
| except Exception as e2: | |
| print(f"Error parsing {pdf_path.name}: {str(e2)}") | |
| document_summaries[pdf_path.name] = f"Failed to parse: {str(e2)}" | |
| continue | |
| return all_documents | |
| # ---------- CHUNKING ---------- | |
def chunk_documents(
    documents: List[Dict],
    chunk_size: int = 512,
    chunk_overlap: int = 50,
) -> List[Dict]:
    """Chunk documents using semantic splitting.

    Each input document's text is split with SentenceSplitter; every piece
    becomes a dict carrying a globally sequential ``chunk_id`` plus the
    originating document's ``source`` and ``metadata``.
    """
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    results: List[Dict] = []
    for document in documents:
        for piece in splitter.split_text(document["text"]):
            # len(results) doubles as the running chunk counter: it grows by
            # exactly one per appended chunk, so ids stay sequential.
            results.append(
                {
                    "chunk_id": f"chunk_{len(results)}",
                    "text": piece,
                    "source": document["source"],
                    "metadata": document["metadata"],
                }
            )
    return results
# ---------- CHROMA (SBERT EMBEDDINGS, 384-D) ----------
def create_chromadb_collection(
    chunks: List[Dict],
    collection_name: str = "rag_documents",
) -> chromadb.Collection:
    """Create and populate ChromaDB collection using SentenceTransformer embeddings.

    Drops any pre-existing collection of the same name (to avoid mixing
    embedding dimensions), recreates it with the configured SBERT embedding
    function, and inserts the chunks in batches of 100.
    """
    embed_fn = SentenceTransformerEmbeddingFunction(
        model_name=CONFIG["embedding_model"],
        device=CONFIG["embedding_device"],
    )
    store = chromadb.PersistentClient(
        path=os.path.join(CONFIG["output_directory"], "chromadb")
    )
    # ✅ Delete existing collection to avoid old 1536-D vectors
    try:
        store.delete_collection(collection_name)
        print(f"Deleted existing collection: {collection_name}")
    except Exception:
        pass  # best-effort: collection may simply not exist yet
    collection = store.create_collection(
        name=collection_name,
        metadata={
            "description": "RAG document chunks",
            "embedding_model": CONFIG["embedding_model"],
            "embedding_dim": 384,  # MiniLM dim
        },
        embedding_function=embed_fn,
    )
    ids = [c["chunk_id"] for c in chunks]
    documents = [c["text"] for c in chunks]
    metadatas = [{"source": c["source"], **(c["metadata"] or {})} for c in chunks]
    batch_size = 100
    total_batches = (len(ids) - 1) // batch_size + 1
    for start in range(0, len(ids), batch_size):
        stop = min(start + batch_size, len(ids))
        collection.add(
            ids=ids[start:stop],
            documents=documents[start:stop],
            metadatas=metadatas[start:stop],
        )
        print(f"Added batch {start // batch_size + 1}/{total_batches}")
    print(f"✓ ChromaDB collection created with {len(ids)} chunks")
    return collection
# ---------- MAIN ----------
def main():
    """Run the full pipeline: parse PDFs → chunk → index into ChromaDB."""
    print("✓ Starting pipeline with .env configuration (SentenceTransformer embeddings)")
    print("Starting PDF parsing...")
    docs = parse_pdfs_with_llamaparse(CONFIG["pdf_directory"])
    print(f"\n✓ Parsed {len(docs)} document sections from PDFs")
    doc_chunks = chunk_documents(
        docs,
        CONFIG["chunk_size"],
        CONFIG["chunk_overlap"],
    )
    print(f"✓ Created {len(doc_chunks)} chunks")
    if doc_chunks:
        print("\nSample chunk:")
        print(doc_chunks[0])
    create_chromadb_collection(doc_chunks)
    print("ChromaDB collection ready for querying.")
if __name__ == "__main__":
    main()