Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
class PolicyRAG:
    """In-memory retrieval helper over the policy terms and adjudication rules.

    ``initialize`` reads ``data/policy_terms.json`` and
    ``data/adjudication_rules.md``, splits them into chunks and indexes the
    chunks in a FAISS vector store. ``get_relevant_context`` then returns the
    chunks most similar to a query, joined into a single string.
    """

    def __init__(self) -> None:
        """Create the embedding backend and resolve the data file paths.

        No data file is read here; the index stays unset until
        ``initialize`` is called.
        """
        # None means "index not built"; get_relevant_context checks for it.
        self.vector_store = None

        # Embedding model selected by name; handed to FAISS.from_documents
        # when the index is built.
        self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

        # Two dirname() steps up from this file: the parent of the directory
        # that contains this module. The data files live under <base>/data.
        self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.policy_file = os.path.join(self.base_dir, "data", "policy_terms.json")
        self.rules_file = os.path.join(self.base_dir, "data", "adjudication_rules.md")

    def initialize(self) -> None:
        """Load the policy and rules files, chunk them and build the FAISS index.

        A missing file is reported with a printed warning and skipped. If no
        chunk is produced at all, ``self.vector_store`` is left as it was and
        a failure message is printed.

        Every call re-reads the files and rebuilds the index; nothing in this
        class prevents repeated calls. (The original author's note says it is
        meant to run once, when the FastAPI server starts.)

        Raises:
            json.JSONDecodeError: If ``policy_terms.json`` exists but does not
                contain valid JSON.
            UnicodeDecodeError: If either file is not valid UTF-8.
        """
        print("🧠 Initializing LangChain RAG Vector Database...")
        documents = []

        # 1. Load and parse the JSON policy terms.
        if os.path.exists(self.policy_file):
            # Explicit UTF-8: without it the decoding depends on the
            # platform's locale encoding.
            with open(self.policy_file, "r", encoding="utf-8") as f:
                policy_data = json.load(f)
            # Re-serialise as indented text. ensure_ascii=False keeps
            # non-ASCII characters literal instead of turning them into
            # \uXXXX escapes in the text that gets embedded and retrieved.
            policy_text = json.dumps(policy_data, indent=2, ensure_ascii=False)
            documents.append(
                Document(
                    page_content=f"POLICY TERMS AND LIMITS:\n{policy_text}",
                    metadata={"source": "policy_terms"},
                )
            )
        else:
            print("⚠️ Warning: policy_terms.json not found.")

        # 2. Load the Markdown adjudication rules verbatim.
        if os.path.exists(self.rules_file):
            with open(self.rules_file, "r", encoding="utf-8") as f:
                rules_text = f.read()
            documents.append(
                Document(
                    page_content=f"ADJUDICATION RULES:\n{rules_text}",
                    metadata={"source": "adjudication_rules"},
                )
            )
        else:
            print("⚠️ Warning: adjudication_rules.md not found.")

        # 3. Chunk the documents so retrieval returns sections rather than
        #    whole files. Markdown headings are tried as split points first,
        #    then newlines, spaces and finally individual characters.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            separators=["\n## ", "\n### ", "\n", " ", ""],
        )
        chunks = text_splitter.split_documents(documents)

        # 4. Build the FAISS index; skipped when there is nothing to index.
        if chunks:
            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
            print(f"✅ RAG initialized successfully with {len(chunks)} knowledge chunks.")
        else:
            print("❌ RAG initialization failed: No documents loaded.")

    def get_relevant_context(self, query: str, k: int = 3) -> str:
        """Return the chunks most similar to ``query`` as a single string.

        Args:
            query: Text to search the index with.
            k: Number of chunks requested from the similarity search.

        Returns:
            The ``page_content`` of each retrieved chunk, joined by a
            ``---`` separator surrounded by blank lines, or the literal
            ``"No policy context available."`` when the index has not been
            built.
        """
        # Explicit None check: only "index never built" takes this path, not
        # a store object that merely evaluates as falsy.
        if self.vector_store is None:
            return "No policy context available."

        docs = self.vector_store.similarity_search(query, k=k)

        # One string, ready to be embedded in a prompt.
        return "\n\n---\n\n".join(doc.page_content for doc in docs)
# Module-level shared instance: the API router is expected to import this
# object instead of constructing its own PolicyRAG.
rag_engine = PolicyRAG()