| import gradio as gr |
|
|
| |
| |
# ``re`` is used elsewhere in this module on both backends, so it is imported
# unconditionally instead of only on the fallback path.
import re

# Backend selection: prefer the LangChain stack; on any import failure define
# lightweight stand-ins under the same names so the rest of the module can use
# either backend through one interface.
try:
    from langchain_community.document_loaders import PyPDFLoader
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    from langchain_community.embeddings import HuggingFaceEmbeddings
    from langchain_community.vectorstores import FAISS
    from langchain_community.llms import Ollama
    from langchain_core.prompts import PromptTemplate
    HEAVY_BACKEND = True
except Exception as _err:
    HEAVY_BACKEND = False
    print("Falling back to lightweight PDF loader/retriever due to import error:", _err)

    # A missing pypdf should not prevent the module from importing; the error
    # is raised when a PDF is actually loaded instead.
    try:
        from pypdf import PdfReader
    except ImportError:
        PdfReader = None

    class _SimpleDoc:
        """Minimal document record: text plus a ``{"page": index}`` metadata dict."""

        def __init__(self, text, page_index=0):
            self.page_content = text
            self.metadata = {"page": page_index}

    class _PdfPageLoader:
        """Loader object returned by the fallback ``PyPDFLoader``."""

        def __init__(self, p):
            self.p = p

        def load(self):
            """Return one ``_SimpleDoc`` per PDF page (empty text if none extracted)."""
            if PdfReader is None:
                raise ImportError("pypdf is required by the lightweight PDF loader")
            reader = PdfReader(self.p)
            return [
                _SimpleDoc(page.extract_text() or "", index)
                for index, page in enumerate(reader.pages)
            ]

    def PyPDFLoader(path):
        """Return a loader whose ``load()`` yields one document per page of *path*."""
        return _PdfPageLoader(path)

    class RecursiveCharacterTextSplitter:
        """Fixed-width character splitter with overlapping windows."""

        def __init__(self, chunk_size=500, chunk_overlap=100):
            if chunk_size <= 0:
                raise ValueError("chunk_size must be a positive integer")
            self.chunk_size = chunk_size
            self.chunk_overlap = chunk_overlap

        def split_documents(self, documents):
            """Split each document's text into chunks of at most ``chunk_size`` chars.

            Consecutive chunks of a document share ``chunk_overlap`` characters.
            Documents with empty text are skipped. Each chunk keeps the source
            document's page number.
            """
            # Clamp the overlap so the window always advances by >= 1 char.
            overlap = max(0, min(self.chunk_overlap, self.chunk_size - 1))
            stride = self.chunk_size - overlap
            out = []
            for d in documents:
                text = d.page_content
                if not text:
                    continue
                page = d.metadata.get("page", 0)
                for start in range(0, len(text), stride):
                    out.append(_SimpleDoc(text[start:start + self.chunk_size], page))
                    # Stop once a window reaches the end; a further window
                    # would only repeat the tail of this one.
                    if start + self.chunk_size >= len(text):
                        break
            return out

    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
    except Exception:
        class SimpleRetriever:
            """Keyword retriever used when scikit-learn cannot be imported."""

            def __init__(self, docs):
                self.docs = docs

            def invoke(self, query, topk=3):
                """Return up to *topk* docs ranked by how many query words they contain."""
                needle = query.lower()
                words = set(re.findall(r"\w+", needle))
                if not words:
                    # Nothing to tokenise: fall back to a plain substring match.
                    return [d for d in self.docs if needle in d.page_content.lower()][:topk]
                scored = []
                for position, d in enumerate(self.docs):
                    haystack = d.page_content.lower()
                    score = sum(1 for w in words if w in haystack)
                    if score:
                        scored.append((-score, position, d))
                # Best score first; ties keep document order.
                scored.sort(key=lambda item: item[:2])
                return [d for _, _, d in scored[:topk]]
    else:
        class SimpleRetriever:
            """TF-IDF / cosine-similarity retriever over a list of documents."""

            def __init__(self, docs):
                self.docs = docs
                self.texts = [d.page_content for d in docs]
                self.vectorizer = None
                self.vectors = None
                # An empty corpus cannot be fitted; leave the index unset so
                # invoke() simply returns no hits.
                if self.texts:
                    self.vectorizer = TfidfVectorizer().fit(self.texts)
                    self.vectors = self.vectorizer.transform(self.texts)

            def invoke(self, query, topk=3):
                """Return the *topk* documents most similar to *query*."""
                if self.vectorizer is None:
                    return []
                qv = self.vectorizer.transform([query])
                sims = cosine_similarity(qv, self.vectors)[0]
                idxs = sims.argsort()[::-1][:topk]
                return [self.docs[i] for i in idxs]

    class Ollama:
        """Stand-in LLM that echoes the prompt's context instead of generating text."""

        def __init__(self, model=None):
            self.model = model

        def invoke(self, prompt):
            """Return the text between ``Context:`` and ``Question:`` (max 1000 chars)."""
            _, marker, rest = prompt.partition("Context:")
            if not marker:
                return "(LLM fallback)"
            ctx = rest.partition("Question:")[0].strip()
            return ctx[:1000] or "(no context found)"
|
|
# Module-level state shared by the handlers: written by process_pdf(),
# read by chat_with_pdf().
plan_terms = {}        # numeric plan terms parsed from the last PDF
latest_text = None     # full text of the last processed PDF
llm = None             # LLM client created by process_pdf()
retriever = None       # object exposing invoke(question) -> documents
vectorstore = None     # FAISS store when the heavy backend is active
|
|
def _resolve_upload_path(upload):
    """Return a filesystem path for an uploaded-file value, or None if unknown.

    Accepts a plain path string, a dict carrying the path under one of a few
    known keys, or any object exposing a ``name`` attribute.
    """
    if isinstance(upload, str):
        return upload
    if isinstance(upload, dict):
        for key in ("name", "tmp_path", "file", "path"):
            if upload.get(key):
                return upload[key]
        return None
    return getattr(upload, "name", None)


def process_pdf(file):
    """Load a PDF, index it for retrieval, and publish the shared module state.

    Args:
        file: Upload value from the UI (path string, dict, or file-like object).

    Returns:
        A status string: a success message, or ``"ERROR processing PDF: ..."``
        followed by the traceback when any step fails.

    The module globals (vectorstore, retriever, llm, latest_text, plan_terms)
    are only replaced after every step has succeeded, so a failed upload
    leaves the previously processed PDF fully usable.
    """
    global vectorstore, retriever, llm
    global latest_text, plan_terms

    import traceback

    try:
        path = _resolve_upload_path(file)
        print(" PDF received:", path)
        if not path:
            raise ValueError("Could not resolve uploaded file path")

        documents = PyPDFLoader(path).load()
        print(" Loaded pages:", len(documents))

        full_text = "\n\n".join(d.page_content for d in documents)

        splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100,
        )
        chunks = splitter.split_documents(documents)
        print(" Created chunks:", len(chunks))
        if not chunks:
            # Typically a scanned/image-only PDF; indexing nothing would only
            # fail later with a less helpful message.
            raise ValueError("No extractable text found in the PDF")

        print(" Creating embeddings...")
        if HEAVY_BACKEND:
            embeddings = HuggingFaceEmbeddings(
                model_name="all-MiniLM-L6-v2"
            )
            new_store = FAISS.from_documents(chunks, embeddings)
            new_retriever = new_store.as_retriever()
        else:
            # No vector store on the lightweight path; clear any stale one.
            new_store = None
            new_retriever = SimpleRetriever(chunks)
        print(" Vector DB ready!")

        new_llm = Ollama(model="llama3")
        print(" Ollama LLM ready!")

        # Plan-term parsing is best effort: a failure must not block Q&A,
        # but it is reported rather than silently discarded.
        try:
            new_terms = parse_plan_terms(full_text)
            print('Parsed plan terms:', new_terms)
        except Exception as parse_err:
            print("Plan term parsing failed:", parse_err)
            new_terms = {}

        # Commit everything at once so the globals never describe two
        # different documents.
        vectorstore, retriever, llm = new_store, new_retriever, new_llm
        latest_text, plan_terms = full_text, new_terms

        return "PDF processed successfully! You can now ask questions."
    except Exception as e:
        tb = traceback.format_exc()
        print(tb)
        return f"ERROR processing PDF: {e}\n{tb}"
|
|
|
|
def chat_with_pdf(question):
    """Answer *question* from the processed PDF.

    If the question contains a dollar amount, mentions a hospital-type service
    and plan terms were parsed, a member-payment estimate is returned directly.
    Otherwise the retrieved chunks are placed in a prompt and sent to the LLM.

    Args:
        question: The user's question text.

    Returns:
        The answer string, a hint when no PDF or question is available, or
        ``"ERROR in chat: ..."`` followed by the traceback on failure.
    """
    # Imported locally so the handler does not depend on whether the module
    # level happened to import ``re`` on the active backend path.
    import re
    import traceback

    try:
        if retriever is None:
            return "Please upload and process a PDF first."
        if not question or not question.strip():
            return "Please enter a question."

        print(" Question:", question)

        # Estimate shortcut, checked before retrieval so no retrieval or
        # prompt work is done when the answer is computed from plan terms.
        # The amount must start with a digit and may carry cents.
        money = re.search(r"\$\s?([0-9][0-9,]*(?:\.[0-9]+)?)", question)
        if (
            money
            and plan_terms
            and re.search(r"hospital|facility|inpatient|delivery", question, re.I)
        ):
            amt = float(money.group(1).replace(",", ""))
            return estimate_member_payment(
                amt, service_type='hospital', network='network', plan=plan_terms
            )

        docs = retriever.invoke(question)
        print(" Retrieved chunks:", len(docs))

        context = "\n\n".join(doc.page_content for doc in docs)

        prompt = (
            "\n"
            "You are a helpful assistant.\n"
            "Answer ONLY from the provided context.\n"
            "\n"
            "Context:\n"
            f"{context}\n"
            "\n"
            "Question:\n"
            f"{question}\n"
            "\n"
            "Answer:\n"
        )

        print(" Sending to LLM...")
        response = llm.invoke(prompt)
        print(" Response generated.")
        return response
    except Exception as e:
        tb = traceback.format_exc()
        print(tb)
        return f"ERROR in chat: {e}\n{tb}"
|
|
|
|
# Build the UI. Statement order inside the Blocks context is kept exactly as
# written: presumably components render in creation order — confirm against
# the Gradio Blocks documentation before reordering anything here.
with gr.Blocks() as demo:
    gr.Markdown("# Local RAG Chatbot (Modern Version)")
    gr.Markdown("Upload a PDF, process it, then ask questions.")

    # Upload + indexing controls.
    file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
    process_button = gr.Button("Process PDF")
    status_output = gr.Textbox(label="Status")

    # Question / answer controls.
    question_input = gr.Textbox(label="Ask a Question")
    answer_output = gr.Textbox(label="Answer")

    # Clicking the button indexes the uploaded file; submitting the question
    # textbox runs chat_with_pdf and shows its return value.
    process_button.click(process_pdf, inputs=file_input, outputs=status_output)
    question_input.submit(chat_with_pdf, inputs=question_input, outputs=answer_output)


# NOTE(review): parse_plan_terms and estimate_member_payment are defined
# further down the file, after this guard. If launch() blocks when the file is
# run as a script (believed to be the default — confirm), those names are not
# yet bound while the handlers run; this guard should move to the end of the
# file.
if __name__ == '__main__':
    demo.launch()
|
|
def parse_plan_terms(text: str) -> dict:
    """Extract common plan numeric terms from SBC text.

    Args:
        text: Plain text of the plan document; empty or None yields ``{}``.

    Returns:
        A dict containing whichever of these keys could be found:
        overall_deductible_network_individual,
        out_of_pocket_limit_network_individual, pcp_copay, specialist_copay,
        urgent_copay (dollar amounts as floats), and hospital_coinsurance or
        other_coinsurance (fractions, e.g. 0.2 for 20%). The "other" rate is
        only looked for when no hospital/facility rate was found.
    """
    import re

    terms = {}
    if not text:
        return terms

    # A dollar amount must start with a digit so a stray "$," cannot match
    # and then fail float conversion.
    amount = r"\$\s?([0-9][0-9,]*)"
    network_individual = r"For network providers\s*" + amount + r"\s*individual"
    # Lazy window: take the amount nearest to the label. A greedy window
    # would capture the farthest amount within 80 characters instead.
    nearby = r"[\s\S]{0,80}?"

    def find_amount(patterns, haystack):
        """Return group 1 of the first matching pattern as a float, else None."""
        for pattern in patterns:
            hit = re.search(pattern, haystack, re.I)
            if hit:
                return float(hit.group(1).replace(",", ""))
        return None

    deductible = find_amount(
        (network_individual, r"deductible[^\$]{0,40}" + amount),
        text,
    )
    if deductible is not None:
        terms['overall_deductible_network_individual'] = deductible

    oop_limit = find_amount(
        (
            r"out-of-pocket limit" + nearby + network_individual,
            r"out-of-pocket limit" + nearby + amount + r"\s*individual",
        ),
        text,
    )
    if oop_limit is None:
        # Last resort only, so it can never overwrite a value found next to
        # the heading. Start at the first "out-of-pocket" mention when there
        # is one, so an earlier "individual / family" line is not picked up.
        heading = re.search(r"out-of-pocket", text, re.I)
        tail = text[heading.start():] if heading else text
        oop_limit = find_amount(
            (network_individual + r"\s*/\s*" + amount + r"\s*family",),
            tail,
        )
    if oop_limit is not None:
        terms['out_of_pocket_limit_network_individual'] = oop_limit

    copay_labels = (
        ('pcp_copay', r"Primary care visit"),
        ('specialist_copay', r"Specialist\s*Visit"),
        ('urgent_copay', r"Urgent care"),
    )
    for key, label in copay_labels:
        copay = find_amount((label + nearby + amount,), text)
        if copay is not None:
            terms[key] = copay

    # Collect every "NN% Coinsurance" with the 80 characters preceding it,
    # which are used to guess which service the rate belongs to.
    rates = [
        (float(hit.group(1)) / 100.0, text[max(0, hit.start() - 80):hit.start()].lower())
        for hit in re.finditer(r"([0-9]{1,3})%\s*Coinsurance", text, re.I)
    ]
    hospital = next(
        (rate for rate, lead in rates if 'hospital' in lead or 'facility' in lead),
        None,
    )
    if hospital is not None:
        terms['hospital_coinsurance'] = hospital
    elif rates:
        # Prefer a rate labelled "other"; fall back to the first rate only
        # when no such label exists, so the specific match is never clobbered.
        other = next((rate for rate, lead in rates if 'other' in lead), None)
        terms['other_coinsurance'] = other if other is not None else rates[0][0]

    return terms
|
|
def estimate_member_payment(bill_amount: float, service_type: str, network: str, plan: dict) -> str:
    """Estimate member payment for a single service given plan terms.

    Simplified rules:
    - Member pays the bill up to the overall deductible first.
    - Coinsurance applies to the amount above the deductible.
    - Copays are ignored.
    - The total is capped at the out-of-pocket limit when the plan has one.

    Args:
        bill_amount: Billed amount in dollars; must not be negative.
        service_type: ``'hospital'`` uses the hospital coinsurance (falling
            back to the "other" rate); anything else uses the "other" rate.
        network: ``'network'`` labels the estimate as in-network. It only
            affects the wording; the same plan terms are used either way.
        plan: Plan terms as produced by ``parse_plan_terms``; missing keys
            default to a zero deductible / zero coinsurance / no cap.

    Returns:
        A one-line human-readable estimate.

    Raises:
        ValueError: If *bill_amount* is negative.
    """
    if bill_amount < 0:
        raise ValueError("bill_amount must not be negative")
    plan = plan or {}

    ded = plan.get('overall_deductible_network_individual', 0.0)
    oop = plan.get('out_of_pocket_limit_network_individual', None)
    other_rate = plan.get('other_coinsurance', 0.0)
    coin = plan.get('hospital_coinsurance', other_rate) if service_type == 'hospital' else other_rate

    member_total = min(ded, bill_amount) + coin * max(0.0, bill_amount - ded)
    member_total_capped = member_total if oop is None else min(member_total, oop)

    # Build the description from its non-empty parts so a missing network
    # label does not leave a double space in the message.
    label_parts = ['in-network' if network == 'network' else '', service_type, 'bill']
    label = " ".join(part for part in label_parts if part)

    return (
        f"Estimate for ${bill_amount:,.0f} {label}: "
        f"member pays ${member_total_capped:,.2f} (raw calc ${member_total:,.2f})"
    )
|
|
|
|