# ClariDoc — app/services/RAG_service.py
# Author: Kshitijk20
# Last commit: "refine response instructions for unanswered queries in RAGService" (b63d969)
from app.utils.model_loader import ModelLoader
from app.ingestion.file_loader import FileLoader
from app.ingestion.text_splitter import splitting_text
from app.retrieval.retriever import Retriever
from app.embedding.embeder import QueryEmbedding
from app.embedding.vectore_store import VectorStore
from app.metadata_extraction.metadata_ext import MetadataExtractor
from app.utils.metadata_utils import MetadataService
from langchain_core.documents import Document
import json
from langchain_community.retrievers import BM25Retriever
# Module-level singleton so the (expensive) embedding model is loaded only once
# per process, regardless of how many RAGService instances are created.
_embedding_model = None


def get_models():
    """Return the shared HuggingFace embedding model, loading it on first use.

    Subsequent calls return the cached instance without reloading.
    """
    global _embedding_model
    if _embedding_model is not None:
        return _embedding_model
    print("Loading models (one-time initialization)...")
    loader = ModelLoader(model_provider="huggingface")
    _embedding_model = loader.load_llm()
    return _embedding_model
class RAGService:
    """End-to-end RAG pipeline orchestrator.

    Responsibilities, in call order:
      1. ``load_and_split_document`` — load a PDF/Word doc and chunk it.
      2. ``create_vector_store``     — build dense (Pinecone) + sparse (BM25) indexes.
      3. ``retrive_documents``       — hybrid retrieval for a raw query.
      4. ``answer_query``            — answer the query with the LLM over retrieved clauses.
    """

    def __init__(self):
        """Load models and reset all per-document pipeline state."""
        print("[RAGService] Initializing service...")
        self._init_models()
        # NOTE(review): misspelled duplicate of ``Document_Type``; never written
        # elsewhere in this file, kept only in case external code reads it.
        self.Docuement_Type = None
        self.Pinecone_index = None
        self.Document_path = None
        self.Document_Type = None        # schema/model object for the detected doc type
        self.DocumentTypeScheme = None   # raw type label returned by the file loader
        self.url = None
        self.chunks = None               # chunked Documents from the splitter
        self.vector_store = None
        self.index = None                # Pinecone index handle
        self.namespace = None
        self.retriever = None
        # Initialized here so retrive_documents() fails with a clear error
        # (None passed through) instead of AttributeError when called before
        # create_vector_store().
        self.sparse_retriever = None
        self.metadataservice = MetadataService()
        print("[RAGService] Initialization complete.")

    def _init_models(self):
        """Load the Groq chat LLM and the shared HuggingFace embedding model."""
        print("[RAGService] Loading LLM model (groq)...")
        self.model_loader = ModelLoader(model_provider="groq")
        self.llm = self.model_loader.load_llm()
        print("[RAGService] LLM model loaded.")
        print("[RAGService] Loading embedding model (huggingface)...")
        # Embedding model is a process-wide singleton — see get_models().
        self.embedding_model = get_models()
        print("[RAGService] Embedding model loaded.")

    def load_and_split_document(self, type: str, path: str = None, url: str = None):
        """Load a document and split it into metadata-aware chunks.

        Args:
            type: ``"pdf"`` or ``"word"``. (Parameter name shadows the builtin
                  but is kept so existing keyword callers don't break.)
            path: local filesystem path (the only option for Word docs).
            url:  remote URL (PDF only).

        Raises:
            ValueError: unsupported ``type``, or no usable source given.
        """
        print(f"[RAGService] Loading document. Type: {type}, Path: {path}, URL: {url}")
        file_loader = FileLoader(llm=self.llm)
        if type == "pdf":
            if path:
                print(f"[RAGService] Loading PDF from path: {path}")
                doc = file_loader.load_pdf(path)
            elif url:
                print(f"[RAGService] Loading PDF from URL: {url}")
                doc = file_loader.load_documents_from_url(url)
            else:
                print("[RAGService] Error: Either path or url must be provided for PDF.")
                raise ValueError("Either path or url must be provided for PDF.")
        elif type == "word":
            if path:
                print(f"[RAGService] Loading Word document from path: {path}")
                doc = file_loader.load_word_document(path)
            elif url:
                print("[RAGService] Error: URL loading not supported for Word documents.")
                raise ValueError("URL loading not supported for Word documents.")
            else:
                print("[RAGService] Error: Path must be provided for Word document.")
                raise ValueError("Path must be provided for Word document.")
        else:
            print("[RAGService] Error: Unsupported document type.")
            raise ValueError("Unsupported document type. Use 'pdf' or 'word'.")
        print("[RAGService] Detecting document type scheme...")
        # The first two pages/sections are enough for type classification.
        self.DocumentTypeScheme = file_loader.detect_document_type(doc[0:2])
        print(f"[RAGService] Document type scheme detected: {self.DocumentTypeScheme}")
        self.Document_Type = self.metadataservice.Return_document_model(self.DocumentTypeScheme)
        print(f"[RAGService] Document type model: {self.Document_Type}")
        self.splitter = splitting_text(
            documentTypeSchema=self.Document_Type,
            llm=self.llm,
            embedding_model=self.embedding_model,
        )
        print("[RAGService] Splitting document into chunks...")
        self.chunks = self.splitter.text_splitting(doc)
        print(f"[RAGService] Total chunks created: {len(self.chunks)}")

    def create_query_embedding(self, query: str):
        """Embed the query and extract Pinecone-safe filter metadata for it.

        Sets ``self.query_embedding`` and ``self.query_metadata``.
        """
        print("[RAGService] Creating query embedding...")
        self.query = query
        self.query_embedder = QueryEmbedding(query=query, embedding_model=self.embedding_model)
        self.query_embedding = self.query_embedder.get_embedding()
        print(f"[RAGService] Query embedding created: {self.query_embedding}")
        langchain_doc = Document(page_content=query)
        print("[RAGService] Extracting metadata for the query...")
        self.metadataExtractor = MetadataExtractor(llm=self.llm)
        # Keywords collected during splitting steer the query metadata extractor.
        with open(self.splitter.Keywordsfile_path, "r", encoding="utf-8") as f:
            known_keywords = json.load(f)
        raw_metadata = self.metadataExtractor.extractMetadata_query(
            self.Document_Type, langchain_doc, known_keywords=known_keywords
        )
        print(f"[RAGService] Query metadata extracted: {raw_metadata}")
        metadata_dict = raw_metadata.model_dump(exclude_none=True)
        formatted_metadata = self.metadataservice.format_metadata_for_pinecone(metadata_dict)
        # Drop fields that break Pinecone filter serialization.
        self.query_metadata = {
            k: v for k, v in formatted_metadata.items()
            if k not in ["obligations", "exclusions", "notes", "added_new_keyword"]
        }
        print(f"[RAGService] Query metadata type: {type(self.query_metadata)}")
        print(f"[RAGService] Query metadata: {self.query_metadata}")

    def create_vector_store(self):
        """Build the Pinecone dense index and the BM25 sparse retriever.

        Raises:
            ValueError: if no chunks exist (load_and_split_document not run).
        """
        if not self.chunks:
            raise ValueError(
                "No chunks available. Call load_and_split_document() before create_vector_store()."
            )
        print("[RAGService] Creating vector store...")
        self.vector_store_class_instance = VectorStore(self.chunks, self.embedding_model)
        self.index, self.namespace, self.vector_store = (
            self.vector_store_class_instance.create_vectorestore()
        )
        print(f"[RAGService] Vector store created. Index: {self.index}, Namespace: {self.namespace}")
        # Sparse (lexical) retriever complements dense retrieval for hybrid search.
        self.sparse_retriever = BM25Retriever.from_documents(self.chunks)
        self.sparse_retriever.k = 3  # top-k documents to retrieve

    def retrive_documents(self, raw_query: str):
        """Run hybrid (dense + sparse) retrieval; stores hits in ``self.result``."""
        print("[RAGService] Retrieving documents from vector store...")
        self.create_query_embedding(raw_query)
        self.retriever = Retriever(
            self.index,
            raw_query,
            self.query_metadata,
            self.namespace,
            self.vector_store,
            sparse_retriever=self.sparse_retriever,
            llm=self.llm,
        )
        self.result = self.retriever.retrieval_from_pinecone_vectoreStore()

    # Correctly-spelled alias; the original misspelled name is kept for callers.
    retrieve_documents = retrive_documents

    def answer_query(self, raw_query: str) -> str:
        """Answer ``raw_query`` with the LLM, grounded in retrieved clauses.

        Returns the LLM response as a plain string (handles message objects,
        raw strings, and any other return type via ``str()``).
        """
        print(f"[RAGService] Answering query: {raw_query}")
        context_clauses = [doc.page_content for doc in self.result]
        print(f"context_clauses: {context_clauses}")
        # Join with blank lines so separate clauses don't fuse into one run of text.
        context_text = "\n\n".join(context_clauses)
        prompt = f"""
You are a legal/insurance domain expert and policy analyst.
Use the following extracted clauses from policy documents to answer the question.
If you can't find the answer, either say it is not available in the provided context or respond with whatever you can find gracefully.
Provide a concise and clear answer based on the context.
Context clauses:
{context_text}
Question: {raw_query}
"""
        print("[RAGService] Invoking LLM with prompt...")
        response = self.llm.invoke(prompt)
        print(f"[RAGService] LLM response: {response}")
        # Normalize: chat models return message objects with .content.
        if hasattr(response, 'content'):
            return response.content
        elif isinstance(response, str):
            return response
        else:
            return str(response)