import os
import json

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings


class PolicyRAG:
    """In-memory retrieval helper over the policy terms and adjudication rules.

    The two admin data files are loaded, split into chunks and indexed in a
    FAISS vector store by :meth:`initialize`; :meth:`get_relevant_context`
    then returns the chunks most similar to a query as one string.
    """

    def __init__(self):
        """Create the embedding model and resolve the data file paths.

        No files are read and no index is built here; ``vector_store`` stays
        ``None`` until :meth:`initialize` succeeds.
        """
        self.vector_store = None

        # Small sentence-embedding model used for local embeddings.
        self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

        # Admin data lives in "<parent of this file's directory>/data".
        self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.policy_file = os.path.join(self.base_dir, "data", "policy_terms.json")
        self.rules_file = os.path.join(self.base_dir, "data", "adjudication_rules.md")

    def _load_policy_document(self):
        """Return the policy terms as a ``Document``, or ``None`` if the file is missing."""
        if not os.path.exists(self.policy_file):
            print("⚠️ Warning: policy_terms.json not found.")
            return None

        # Explicit UTF-8: the platform default encoding is locale-dependent
        # and can fail or garble non-ASCII policy text.
        with open(self.policy_file, "r", encoding="utf-8") as f:
            policy_data = json.load(f)

        # Re-serialise as indented text so the content is readable in a
        # prompt. ensure_ascii=False keeps non-ASCII characters literal
        # instead of turning them into \uXXXX escapes before embedding.
        policy_text = json.dumps(policy_data, indent=2, ensure_ascii=False)
        return Document(
            page_content=f"POLICY TERMS AND LIMITS:\n{policy_text}",
            metadata={"source": "policy_terms"},
        )

    def _load_rules_document(self):
        """Return the adjudication rules as a ``Document``, or ``None`` if the file is missing."""
        if not os.path.exists(self.rules_file):
            print("⚠️ Warning: adjudication_rules.md not found.")
            return None

        # Explicit UTF-8 for the same reason as the policy file.
        with open(self.rules_file, "r", encoding="utf-8") as f:
            rules_text = f.read()

        return Document(
            page_content=f"ADJUDICATION RULES:\n{rules_text}",
            metadata={"source": "adjudication_rules"},
        )

    @staticmethod
    def _split_into_chunks(documents):
        """Split documents into overlapping chunks, preferring markdown heading breaks."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            # Tried in order: section headings first, then lines, words, characters.
            separators=["\n## ", "\n### ", "\n", " ", ""],
        )
        return text_splitter.split_documents(documents)

    def initialize(self):
        """Load the policy and rules files, chunk them, and build the FAISS index.

        A missing file is reported with a warning and skipped. If no chunks
        are produced at all, a failure message is printed and
        ``vector_store`` is left unchanged.

        NOTE(review): the original author's note says this is meant to run
        only once, when the FastAPI server starts — confirm against the
        application's startup code.

        Raises:
            json.JSONDecodeError: if the policy file exists but is not valid JSON.
            UnicodeDecodeError: if either file is not valid UTF-8.
        """
        print("🧠 Initializing LangChain RAG Vector Database...")

        # 1. Policy terms (JSON), 2. adjudication rules (Markdown).
        # Order is preserved so warnings and chunk order match the load order.
        loaded = (self._load_policy_document(), self._load_rules_document())
        documents = [doc for doc in loaded if doc is not None]

        # 3. Chunk the documents so retrieval can return only relevant sections.
        chunks = self._split_into_chunks(documents)

        # 4. Build the FAISS vector database.
        if not chunks:
            print("❌ RAG initialization failed: No documents loaded.")
            return

        self.vector_store = FAISS.from_documents(chunks, self.embeddings)
        print(f"✅ RAG initialized successfully with {len(chunks)} knowledge chunks.")

    def get_relevant_context(self, query: str, k: int = 3) -> str:
        """Return the ``k`` chunks most similar to ``query``, joined into one string.

        Args:
            query: Free-text query to match against the indexed chunks.
            k: Number of chunks to request from the similarity search.

        Returns:
            The matching chunks' text separated by ``---`` divider lines, or
            the literal ``"No policy context available."`` when the index has
            not been built.
        """
        if self.vector_store is None:
            return "No policy context available."

        # Retrieve the top-k matching chunks.
        docs = self.vector_store.similarity_search(query, k=k)

        # Combine the chunk contents into a single string for the prompt.
        return "\n\n---\n\n".join(doc.page_content for doc in docs)


# Singleton instance to be imported by the API router.
# NOTE: constructing it here instantiates the embedding model at import time.
rag_engine = PolicyRAG()