File size: 6,194 Bytes
bb6a42c 4396f57 bb6a42c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
from typing import List, Dict, Optional
from pdf_parser import extract_text_from_pdfs
from vector_store import VectorStore
from embeddings import CLIPEmbedder
from multimodal_model import Gemma3Model # β Changed from GemmaVisionModel
import logging
logger = logging.getLogger(__name__)
class RAGPipeline:
    """Retrieval-augmented question answering over a directory of PDFs.

    ``__init__`` builds three collaborators: a ``CLIPEmbedder``, a
    ``VectorStore`` and a ``Gemma3Model``. Text extracted from the PDFs is
    added to the vector store by :meth:`index_pdfs`; questions are answered
    by retrieving stored text and handing it to the model as context.
    """

    # Character budgets applied when assembling prompts and responses.
    _MAX_CHARS_PER_CONTEXT_DOC = 500
    _MAX_CONTEXT_CHARS = 2000
    _MAX_ANSWER_CHARS = 1000
    _MAX_CHARS_PER_SUMMARY_DOC = 200

    def __init__(self, pdf_dir: str, chroma_dir: str = "./chroma_db", device: str = "cpu") -> None:
        """Create the embedder, vector store and language model.

        Args:
            pdf_dir: Directory passed to ``extract_text_from_pdfs`` when indexing.
            chroma_dir: Persistence directory handed to ``VectorStore``.
            device: Device string forwarded to the embedder and the model.

        Raises:
            Exception: Any error raised while constructing a component is
                logged with its traceback and re-raised unchanged.
        """
        self.pdf_dir = pdf_dir
        self.device = device
        logger.info("β Initializing RAG Pipeline...")
        try:
            logger.debug("Loading embedder...")
            # NOTE(review): self.embedder is not referenced by any other
            # method of this class; presumably kept for external callers.
            self.embedder = CLIPEmbedder(
                model_name="openai/clip-vit-base-patch32",
                device=device,
            )

            logger.debug("Initializing vector store...")
            self.vector_store = VectorStore(persist_dir=chroma_dir)
            self.vector_store.get_or_create_collection()

            logger.debug("Loading Gemma 3 1B model...")
            self.llm = Gemma3Model(model_name="unsloth/gemma-3-1b-pt", device=device)
            logger.info("β RAG Pipeline initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize pipeline: %s", e, exc_info=True)
            raise

    def index_pdfs(self) -> None:
        """Extract text from ``self.pdf_dir`` and add it to the vector store.

        Returns without touching the store when extraction yields nothing.

        Raises:
            Exception: Extraction or storage errors are logged with their
                traceback and re-raised unchanged.
        """
        logger.info("β Starting PDF indexing...")
        try:
            documents, metadatas = extract_text_from_pdfs(self.pdf_dir)
            if not documents:
                logger.warning("No documents extracted")
                return

            logger.debug("Extracted %s document chunks", len(documents))
            # NOTE(review): IDs restart at doc_0 on every call; how repeated
            # IDs are treated depends on VectorStore.add_documents - confirm.
            ids = [f"doc_{i}" for i in range(len(documents))]
            self.vector_store.add_documents(documents, metadatas, ids)
            logger.info("β Indexed %s document chunks", len(documents))
        except Exception as e:
            logger.error("Error during indexing: %s", e, exc_info=True)
            raise

    def retrieve_documents(self, query: str, n_results: int = 5) -> List[Dict]:
        """Search the vector store and return the matching documents.

        Args:
            query: Search text forwarded to ``VectorStore.search``.
            n_results: Number of results requested from the store.

        Returns:
            A list of ``{"content": ..., "source": ...}`` dicts where
            ``source`` is ``"<filename> (p<page>)"``. An empty list is
            returned when the search or result handling raises; the error is
            logged rather than propagated.
        """
        try:
            logger.debug("Searching for: %s...", query[:50])
            results = self.vector_store.search(query, n_results=n_results)
            retrieved_docs = []
            for doc, metadata in zip(results["documents"][0], results["metadatas"][0]):
                # Guard against a None metadata entry so that a single such
                # item does not discard every result via the except below.
                metadata = metadata or {}
                retrieved_docs.append({
                    "content": doc,
                    "source": f"{metadata.get('filename', 'Unknown')} (p{metadata.get('page', '?')})",
                })
            logger.debug("Retrieved %s documents", len(retrieved_docs))
            return retrieved_docs
        except Exception as e:
            logger.error("Error retrieving documents: %s", e, exc_info=True)
            return []

    def answer_question(self, question: str, n_context_docs: int = 3) -> Dict:
        """Answer ``question`` using retrieved documents as context.

        Args:
            question: The user's question.
            n_context_docs: Number of documents to retrieve for context.

        Returns:
            A dict with keys ``"answer"`` (truncated to 1000 characters),
            ``"sources"`` (source labels of the retrieved documents) and
            ``"context_used"`` (how many documents were retrieved). When
            nothing is retrieved, or an unexpected error occurs, ``sources``
            is empty, ``context_used`` is 0 and ``answer`` carries an
            explanatory message.
        """
        logger.info("Processing question: %s...", question[:50])
        try:
            logger.debug("Retrieving %s documents...", n_context_docs)
            retrieved_docs = self.retrieve_documents(question, n_results=n_context_docs)
            if not retrieved_docs:
                logger.warning("No documents retrieved")
                return {
                    "answer": "No relevant documents found.",
                    "sources": [],
                    "context_used": 0,
                }
            logger.debug("Retrieved %s documents", len(retrieved_docs))

            # Cap each document and the combined context to bound prompt size.
            context = "\n\n".join(
                f"[{doc['source']}]\n{doc['content'][:self._MAX_CHARS_PER_CONTEXT_DOC]}"
                for doc in retrieved_docs
            )[:self._MAX_CONTEXT_CHARS]

            logger.debug("Generating answer with Gemma 3...")
            try:
                # Primary path: let the model answer from the retrieved context.
                answer = self.llm.answer_question(question, context)
            except Exception as e:
                logger.warning("Answer generation failed (%s), using greedy fallback...", e)
                # Fallback: context-free greedy generation from a bare prompt.
                answer = self.llm.generate_response_greedy(
                    f"Q: {question}\nA:"
                )

            # Keep only the text after the last "Answer:" marker, if present.
            if "Answer:" in answer:
                answer = answer.split("Answer:")[-1].strip()

            logger.info("β Answer generated successfully")
            return {
                "answer": answer[:self._MAX_ANSWER_CHARS],
                "sources": [doc["source"] for doc in retrieved_docs],
                "context_used": len(retrieved_docs),
            }
        except Exception as e:
            logger.error("Error in answer_question: %s", e, exc_info=True)
            return {
                "answer": f"Error generating answer: {str(e)}",
                "sources": [],
                "context_used": 0,
            }

    def summarize_documents(self) -> str:
        """Summarize a sample of the indexed documents.

        Returns:
            ``"No documents to summarize"`` when the collection reports a
            ``document_count`` of 0 (or omits it); otherwise whatever
            ``self.llm.summarize_text`` returns for the sampled text.
        """
        collection_info = self.vector_store.get_collection_info()
        if collection_info.get("document_count", 0) == 0:
            return "No documents to summarize"

        # Sample a handful of documents with a generic query.
        results = self.vector_store.search("main topic summary", n_results=5)
        # Truncate each document's *text*; slicing the result list itself
        # would leave every document at full length.
        sampled_content = " ".join(
            doc[:self._MAX_CHARS_PER_SUMMARY_DOC]
            for docs in results["documents"]
            for doc in docs
        )
        return self.llm.summarize_text(sampled_content)

    def get_collection_info(self) -> Dict:
        """Return the statistics reported by ``VectorStore.get_collection_info``."""
        return self.vector_store.get_collection_info()
|