import gradio as gr # Try to import the original (heavy) dependencies; if they fail (e.g. torch DLL issues), # fall back to lightweight implementations that avoid torch/transformers. try: from langchain_community.document_loaders import PyPDFLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import FAISS from langchain_community.llms import Ollama from langchain_core.prompts import PromptTemplate HEAVY_BACKEND = True except Exception as _err: HEAVY_BACKEND = False print("Falling back to lightweight PDF loader/retriever due to import error:", _err) # Lightweight PDF loader using pypdf from pypdf import PdfReader import re class _SimpleDoc: def __init__(self, text, page_index=0): self.page_content = text self.metadata = {"page": page_index} def PyPDFLoader(path): class L: def __init__(self, p): self.p = p def load(self): reader = PdfReader(self.p) docs = [] for i, page in enumerate(reader.pages): text = page.extract_text() or "" docs.append(_SimpleDoc(text, i)) return docs return L(path) # Simple character splitter class RecursiveCharacterTextSplitter: def __init__(self, chunk_size=500, chunk_overlap=100): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def split_documents(self, documents): out = [] for d in documents: text = d.page_content if not text: continue start = 0 while start < len(text): end = start + self.chunk_size chunk = text[start:end] out.append(_SimpleDoc(chunk, d.metadata.get("page", 0))) start = max(end - self.chunk_overlap, end) return out # Simple retriever using TF-IDF if available, otherwise substring match try: from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity class SimpleRetriever: def __init__(self, docs): self.docs = docs self.texts = [d.page_content for d in docs] self.vectorizer = TfidfVectorizer().fit(self.texts) self.vectors = self.vectorizer.transform(self.texts) def invoke(self, query, topk=3): qv = self.vectorizer.transform([query]) sims = cosine_similarity(qv, self.vectors)[0] idxs = sims.argsort()[::-1][:topk] return [self.docs[i] for i in idxs] except Exception: class SimpleRetriever: def __init__(self, docs): self.docs = docs def invoke(self, query, topk=3): hits = [d for d in self.docs if query.lower() in d.page_content.lower()] return hits[:topk] # Lightweight LLM fallback (echo / context-based) if Ollama unavailable class Ollama: def __init__(self, model=None): self.model = model def invoke(self, prompt): # Very small heuristic: return the context first 1000 chars as an answer stub if "Context:" in prompt: parts = prompt.split("Context:") if len(parts) > 1: ctx = parts[1].split("Question:")[0].strip() return ctx[:1000] or "(no context found)" return "(LLM fallback)" vectorstore = None retriever = None llm = None latest_text = None plan_terms = {} def process_pdf(file): global vectorstore, retriever, llm global latest_text, plan_terms import traceback def _resolve_path(f): # Accept a file path string, a file-like with .name, or a Gradio dict if isinstance(f, str): return f if isinstance(f, dict): return f.get("name") or f.get("tmp_path") or f.get("file") if hasattr(f, "name"): return f.name return None try: path = _resolve_path(file) print(" PDF received:", path) if not path: raise ValueError("Could not resolve uploaded file path") # Load PDF loader = PyPDFLoader(path) documents = loader.load() print(" Loaded pages:", len(documents)) # concatenate raw text for parsing latest_text = "\n\n".join([d.page_content for d in documents]) # Split text splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=100 ) chunks = splitter.split_documents(documents) print(" Created chunks:", len(chunks)) # Create embeddings print(" Creating embeddings...") embeddings = None if HEAVY_BACKEND: embeddings = HuggingFaceEmbeddings( model_name="all-MiniLM-L6-v2" ) # Create vector DB if HEAVY_BACKEND and embeddings is not None: vectorstore = FAISS.from_documents(chunks, embeddings) retriever = vectorstore.as_retriever() else: # lightweight retriever retriever = SimpleRetriever(chunks) print(" Vector DB ready!") # Load LLM llm = Ollama(model="llama3") print(" Ollama LLM ready!") # parse plan terms for numeric Q&A try: plan_terms = parse_plan_terms(latest_text) print('Parsed plan terms:', plan_terms) except Exception: plan_terms = {} return "PDF processed successfully! You can now ask questions." except Exception as e: tb = traceback.format_exc() print(tb) return f"ERROR processing PDF: {e}\n{tb}" def chat_with_pdf(question): global retriever, llm global latest_text, plan_terms import traceback try: if retriever is None: return "Please upload and process a PDF first." print(" Question:", question) docs = retriever.invoke(question) print(" Retrieved chunks:", len(docs)) context = "\n\n".join([doc.page_content for doc in docs]) prompt = f""" You are a helpful assistant. Answer ONLY from the provided context. Context: {context} Question: {question} Answer: """ print(" Sending to LLM...") # detect direct numeric cost questions and answer using parsed plan terms m = re.search(r"\$(\s?[0-9,]+)", question) if m and plan_terms: # get numeric value amt = float(re.sub(r"[^0-9.]", "", m.group(0))) # basic detection for hospital if re.search(r"hospital|facility|inpatient|delivery", question, re.I): est = estimate_member_payment(amt, service_type='hospital', network='network', plan=plan_terms) return est response = llm.invoke(prompt) print(" Response generated.") return response except Exception as e: tb = traceback.format_exc() print(tb) return f"ERROR in chat: {e}\n{tb}" with gr.Blocks() as demo: gr.Markdown("# Local RAG Chatbot (Modern Version)") gr.Markdown("Upload a PDF, process it, then ask questions.") file_input = gr.File(label="Upload PDF", file_types=[".pdf"]) process_button = gr.Button("Process PDF") status_output = gr.Textbox(label="Status") question_input = gr.Textbox(label="Ask a Question") answer_output = gr.Textbox(label="Answer") process_button.click(process_pdf, inputs=file_input, outputs=status_output) question_input.submit(chat_with_pdf, inputs=question_input, outputs=answer_output) if __name__ == '__main__': demo.launch() def parse_plan_terms(text: str) -> dict: """Extract common plan numeric terms from SBC text. Returns keys: overall_deductible_network_individual, out_of_pocket_limit_network_individual, specialist_copay, pcp_copay, urgent_copay, hospital_coinsurance, other_coinsurance """ import re terms = {} # overall deductible (network) individual m = re.search(r"For network providers\s*\$\s?([0-9,]+)\s*individual", text, re.I) if m: terms['overall_deductible_network_individual'] = float(m.group(1).replace(',', '')) else: # fallback: first occurrence of 'deductible' followed by $xxx m2 = re.search(r"deductible[^\$]{0,40}\$\s?([0-9,]+)", text, re.I) if m2: terms['overall_deductible_network_individual'] = float(m2.group(1).replace(',', '')) # out-of-pocket limit network individual m = re.search(r"out-of-pocket limit[\s\S]{0,80}For network providers\s*\$\s?([0-9,]+)\s*individual", text, re.I) if m: terms['out_of_pocket_limit_network_individual'] = float(m.group(1).replace(',', '')) else: m2 = re.search(r"out-of-pocket limit[\s\S]{0,80}\$\s?([0-9,]+)\s*individual", text, re.I) if m2: terms['out_of_pocket_limit_network_individual'] = float(m2.group(1).replace(',', '')) # alternative pattern: "For network providers $8,000 individual / $16,000 family" m_alt = re.search(r"For network providers\s*\$\s?([0-9,]+)\s*individual\s*/\s*\$\s?([0-9,]+)\s*family", text, re.I) if m_alt: terms['out_of_pocket_limit_network_individual'] = float(m_alt.group(1).replace(',', '')) # copays m = re.search(r"Primary care visit[\s\S]{0,80}\$\s?([0-9,]+)", text, re.I) if m: terms['pcp_copay'] = float(m.group(1).replace(',', '')) m = re.search(r"Specialist\s*Visit[\s\S]{0,80}\$\s?([0-9,]+)", text, re.I) if m: terms['specialist_copay'] = float(m.group(1).replace(',', '')) m = re.search(r"Urgent care[\s\S]{0,80}\$\s?([0-9,]+)", text, re.I) if m: terms['urgent_copay'] = float(m.group(1).replace(',', '')) # coinsurance percentages (hospital/other) # find all percent coinsurance occurrences and choose the one nearest 'hospital' or 'facility' for mm in re.finditer(r"([0-9]{1,3})%\s*(?:\n|\s)*Coinsurance", text, re.I): pct = float(mm.group(1)) / 100.0 head = text[max(0, mm.start()-80):mm.start()].lower() if any(k in head for k in ('hospital', 'facility', 'hospital (facility)', 'facility fee')): terms['hospital_coinsurance'] = pct break # if not found, try generic 'Other' context if 'hospital_coinsurance' not in terms: for mm in re.finditer(r"([0-9]{1,3})%\s*(?:\n|\s)*Coinsurance", text, re.I): pct = float(mm.group(1)) / 100.0 head = text[max(0, mm.start()-80):mm.start()].lower() if 'other' in head: terms['other_coinsurance'] = pct break # fallback coinsurance general if 'hospital_coinsurance' not in terms: m = re.search(r"([0-9]{1,3})%\s*Coinsurance", text, re.I) if m: terms['other_coinsurance'] = float(m.group(1)) / 100.0 return terms def estimate_member_payment(bill_amount: float, service_type: str, network: str, plan: dict) -> str: """Estimate member payment for a single service given plan terms. Simplified rules: - Member pays deductible first up to overall deductible - After deductible, coinsurance applies to remaining amount - Copays are ignored for facility inpatient calculations - Cap at out-of-pocket limit if available """ ded = plan.get('overall_deductible_network_individual', 0.0) oop = plan.get('out_of_pocket_limit_network_individual', None) if service_type == 'hospital': coin = plan.get('hospital_coinsurance', plan.get('other_coinsurance', 0.0)) else: coin = plan.get('other_coinsurance', 0.0) remaining = max(0.0, bill_amount - ded) member_after_ded = coin * remaining member_total = min(ded, bill_amount) + member_after_ded if oop is not None: # cap at out-of-pocket member_total_capped = min(member_total, oop) else: member_total_capped = member_total return f"Estimate for ${bill_amount:,.0f} {('in-network' if network=='network' else '')} {service_type} bill: member pays ${member_total_capped:,.2f} (raw calc ${member_total:,.2f})"