InsureChat / single_pdf_input.py
nkeer1's picture
Clean snapshot: remove PDF for HF main (including asyncio fix)
47372da
Raw
History Blame Contribute Delete
12.5 kB
import gradio as gr
# Try to import the original (heavy) dependencies; if they fail (e.g. torch DLL issues),
# fall back to lightweight implementations that avoid torch/transformers.
try:
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate
HEAVY_BACKEND = True
except Exception as _err:
HEAVY_BACKEND = False
print("Falling back to lightweight PDF loader/retriever due to import error:", _err)
# Lightweight PDF loader using pypdf
from pypdf import PdfReader
import re
class _SimpleDoc:
    """Minimal document record used by the lightweight fallback backend.

    Exposes the two attributes the rest of this file reads from a document:
    ``page_content`` (the text) and ``metadata`` (a dict holding the page index
    under the ``"page"`` key).
    """

    def __init__(self, text, page_index=0):
        # Store the page index in a dict so callers can use ``metadata.get("page")``.
        self.metadata = dict(page=page_index)
        self.page_content = text
def PyPDFLoader(path):
    """Return a loader object for ``path`` backed by ``pypdf``.

    The returned object keeps the path on its ``p`` attribute and offers a
    ``load()`` method that yields one ``_SimpleDoc`` per PDF page, in page order.
    """

    class _PdfPageLoader:
        def __init__(self, p):
            self.p = p

        def load(self):
            """Read every page; pages with no extractable text become ""."""
            pages = PdfReader(self.p).pages
            return [
                _SimpleDoc(page.extract_text() or "", number)
                for number, page in enumerate(pages)
            ]

    return _PdfPageLoader(path)
# Simple character splitter
class RecursiveCharacterTextSplitter:
    """Fixed-size character splitter used when LangChain's splitter is unavailable.

    Each document's text is cut into windows of ``chunk_size`` characters.
    Consecutive windows share ``chunk_overlap`` characters, so a sentence that
    straddles a boundary appears whole in at least one chunk.

    Raises:
        ValueError: if ``chunk_size`` is not positive or ``chunk_overlap`` is
            negative (a non-positive size can never make progress).
    """

    def __init__(self, chunk_size=500, chunk_overlap=100):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must not be negative")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_documents(self, documents):
        """Split every non-empty document into overlapping ``_SimpleDoc`` chunks.

        Each chunk inherits the ``"page"`` metadata of its source document
        (0 when the source has none). Documents with empty text are skipped.
        """
        # Advance by size minus overlap; clamp to 1 so an overlap >= size
        # still makes forward progress instead of looping forever.
        step = max(self.chunk_size - self.chunk_overlap, 1)
        out = []
        for d in documents:
            text = d.page_content
            if not text:
                continue
            page = d.metadata.get("page", 0)
            for start in range(0, len(text), step):
                end = start + self.chunk_size
                out.append(_SimpleDoc(text[start:end], page))
                if end >= len(text):
                    # This window already reached the end of the text; a further
                    # window would only repeat the overlap tail.
                    break
        return out
# Simple retriever using TF-IDF if available, otherwise substring match
try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    class SimpleRetriever:
        """Rank chunks by TF-IDF cosine similarity to the query."""

        def __init__(self, docs):
            self.docs = docs
            self.texts = [d.page_content for d in docs]
            try:
                self.vectorizer = TfidfVectorizer().fit(self.texts)
                self.vectors = self.vectorizer.transform(self.texts)
            except ValueError:
                # scikit-learn raises ValueError (empty vocabulary) when there
                # are no chunks or none contains a token, e.g. a scanned PDF.
                # Keep the retriever usable; it simply returns nothing.
                self.vectorizer = None
                self.vectors = None

        def invoke(self, query, topk=3):
            """Return up to ``topk`` chunks, most similar first."""
            if self.vectorizer is None:
                return []
            qv = self.vectorizer.transform([query])
            sims = cosine_similarity(qv, self.vectors)[0]
            idxs = sims.argsort()[::-1][:topk]
            return [self.docs[i] for i in idxs]
except Exception:
    import re

    class SimpleRetriever:
        """Keyword-overlap retriever used when scikit-learn cannot be imported.

        Chunks are scored by how many distinct query words they contain; a
        chunk containing the whole query verbatim gets an extra bonus so exact
        phrase hits still rank first. Chunks sharing nothing with the query are
        not returned.
        """

        def __init__(self, docs):
            self.docs = docs
            # Tokenise each chunk once so queries only pay for set intersections.
            self._words = [
                set(re.findall(r"\w+", d.page_content.lower())) for d in docs
            ]

        def invoke(self, query, topk=3):
            """Return up to ``topk`` chunks, best score first (ties keep document order)."""
            phrase = (query or "").lower().strip()
            query_words = set(re.findall(r"\w+", phrase))
            scored = []
            for position, (doc, words) in enumerate(zip(self.docs, self._words)):
                score = len(query_words & words)
                if phrase and phrase in doc.page_content.lower():
                    score += len(query_words) + 1
                if score:
                    scored.append((-score, position))
            scored.sort()
            return [self.docs[position] for _, position in scored[:topk]]
# Lightweight LLM fallback (echo / context-based) if Ollama unavailable
class Ollama:
    """Offline stand-in for the Ollama LLM client.

    No model is called: ``invoke`` echoes back (at most 1000 characters of) the
    text found between the ``Context:`` and ``Question:`` markers of the prompt.
    """

    def __init__(self, model=None):
        # Kept only so the constructor matches the real client's call site.
        self.model = model

    def invoke(self, prompt):
        """Return the prompt's context section as a stub answer."""
        marker = "Context:"
        if marker not in prompt:
            return "(LLM fallback)"
        # Text between the first and second "Context:" markers (or to the end).
        after_marker = prompt.split(marker, 2)[1]
        context, _, _ = after_marker.partition("Question:")
        snippet = context.strip()[:1000]
        return snippet if snippet else "(no context found)"
# Module-level state shared by the Gradio callbacks below. Everything starts
# empty and is filled in by process_pdf() once a PDF has been processed.
vectorstore = retriever = llm = latest_text = None
# Numeric plan terms parsed from the PDF text (empty until a PDF is processed).
plan_terms = {}
def process_pdf(file):
    """Load an uploaded PDF, index it, and prepare the module state for Q&A.

    Args:
        file: a path string, a dict carrying the path under ``"name"``,
            ``"tmp_path"`` or ``"file"``, or any object with a ``.name``
            attribute.

    Returns:
        A status string for the UI: a success message, or
        ``"ERROR processing PDF: ..."`` followed by the traceback.

    The module globals (``vectorstore``, ``retriever``, ``llm``,
    ``latest_text``, ``plan_terms``) are only replaced after every step has
    succeeded, so a failed upload leaves the previously processed PDF usable.
    """
    global vectorstore, retriever, llm
    global latest_text, plan_terms
    import traceback

    def _resolve_path(f):
        # Accept a file path string, a file-like with .name, or a Gradio dict
        if isinstance(f, str):
            return f
        if isinstance(f, dict):
            return f.get("name") or f.get("tmp_path") or f.get("file")
        if hasattr(f, "name"):
            return f.name
        return None

    try:
        path = _resolve_path(file)
        print(" PDF received:", path)
        if not path:
            raise ValueError("Could not resolve uploaded file path")

        # Load PDF
        documents = PyPDFLoader(path).load()
        print(" Loaded pages:", len(documents))
        # Concatenate raw page text for plan-term parsing.
        new_text = "\n\n".join(d.page_content for d in documents)

        # Split text
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
        chunks = splitter.split_documents(documents)
        print(" Created chunks:", len(chunks))

        # Build the retriever: FAISS + embeddings when the heavy backend
        # imported successfully, otherwise the lightweight retriever.
        print(" Creating embeddings...")
        if HEAVY_BACKEND:
            embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
            new_store = FAISS.from_documents(chunks, embeddings)
            new_retriever = new_store.as_retriever()
        else:
            new_store = None
            new_retriever = SimpleRetriever(chunks)
        print(" Vector DB ready!")

        # Load LLM
        new_llm = Ollama(model="llama3")
        print(" Ollama LLM ready!")

        # Plan-term parsing is best-effort: a failure must not block Q&A,
        # but it is reported so the cause is not hidden.
        try:
            new_terms = parse_plan_terms(new_text)
            print('Parsed plan terms:', new_terms)
        except Exception as parse_err:
            print("WARNING: could not parse plan terms:", repr(parse_err))
            new_terms = {}

        # Commit everything at once now that nothing else can fail.
        vectorstore, retriever, llm = new_store, new_retriever, new_llm
        latest_text, plan_terms = new_text, new_terms
        return "PDF processed successfully! You can now ask questions."
    except Exception as e:
        tb = traceback.format_exc()
        print(tb)
        return f"ERROR processing PDF: {e}\n{tb}"
def chat_with_pdf(question):
    """Answer a question about the most recently processed PDF.

    Args:
        question: the user's question text.

    Returns:
        - a reminder string when no PDF has been processed yet;
        - a cost estimate from ``estimate_member_payment`` when the question
          names a dollar amount, mentions a hospital-type service, and plan
          terms were parsed;
        - otherwise the LLM's response to a prompt built from retrieved chunks;
        - ``"ERROR in chat: ..."`` plus the traceback if anything raises.
    """
    global retriever, llm
    global latest_text, plan_terms
    # Imported here because the module-level ``import re`` only executes on the
    # lightweight-fallback import path; without this, ``re`` is undefined
    # whenever the heavy backend imports successfully.
    import re
    import traceback

    try:
        if retriever is None:
            return "Please upload and process a PDF first."
        print(" Question:", question)

        # Direct numeric cost questions are answered from the parsed plan terms.
        # Checked before retrieval so no retrieval work is done for an answer
        # that does not use it. The amount must start with a digit and may
        # carry thousands separators and cents.
        m = re.search(r"\$\s?([0-9][0-9,]*(?:\.[0-9]+)?)", question)
        if m and plan_terms:
            # basic detection for hospital
            if re.search(r"hospital|facility|inpatient|delivery", question, re.I):
                amt = float(m.group(1).replace(",", ""))
                return estimate_member_payment(
                    amt, service_type='hospital', network='network', plan=plan_terms
                )

        docs = retriever.invoke(question)
        print(" Retrieved chunks:", len(docs))
        context = "\n\n".join(doc.page_content for doc in docs)
        prompt = f"""
You are a helpful assistant.
Answer ONLY from the provided context.
Context:
{context}
Question:
{question}
Answer:
"""
        print(" Sending to LLM...")
        response = llm.invoke(prompt)
        print(" Response generated.")
        return response
    except Exception as e:
        tb = traceback.format_exc()
        print(tb)
        return f"ERROR in chat: {e}\n{tb}"
with gr.Blocks() as demo:
    # Page header.
    gr.Markdown("# Local RAG Chatbot (Modern Version)")
    gr.Markdown("Upload a PDF, process it, then ask questions.")

    # Upload and indexing controls.
    file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
    process_button = gr.Button("Process PDF")
    status_output = gr.Textbox(label="Status")

    # Question and answer controls.
    question_input = gr.Textbox(label="Ask a Question")
    answer_output = gr.Textbox(label="Answer")

    # Event wiring: the button indexes the uploaded file; pressing Enter in the
    # question box sends the question to chat_with_pdf.
    process_button.click(
        fn=process_pdf,
        inputs=[file_input],
        outputs=[status_output],
    )
    question_input.submit(
        fn=chat_with_pdf,
        inputs=[question_input],
        outputs=[answer_output],
    )
def parse_plan_terms(text: str) -> dict:
    """Extract common plan numeric terms from SBC text.

    Returns a dict containing whichever of these keys could be found:
    overall_deductible_network_individual, out_of_pocket_limit_network_individual,
    specialist_copay, pcp_copay, urgent_copay, hospital_coinsurance,
    other_coinsurance. Dollar amounts are floats; coinsurance values are
    fractions (20% -> 0.2). ``other_coinsurance`` is only reported when no
    hospital/facility coinsurance was found. Empty or ``None`` text yields ``{}``.
    """
    import re

    if not text:
        return {}

    money = r"\$\s?([0-9,]+)"

    def _first_amount(*patterns, start=0):
        """Return the amount captured by the first matching pattern, else None."""
        for pattern in patterns:
            found = re.compile(pattern, re.I).search(text, start)
            if found:
                digits = found.group(1).replace(',', '')
                # The capture may consist of commas only; skip it rather than
                # letting float('') raise.
                if digits:
                    return float(digits)
        return None

    terms = {}

    # overall deductible (network) individual; fallback: first 'deductible'
    # followed by $xxx within 40 characters
    deductible = _first_amount(
        r"For network providers\s*" + money + r"\s*individual",
        r"deductible[^\$]{0,40}" + money,
    )
    if deductible is not None:
        terms['overall_deductible_network_individual'] = deductible

    # out-of-pocket limit network individual. The windows are lazy ({0,80}?)
    # so the amount nearest the label wins, not the last one in the window.
    oop_limit = _first_amount(
        r"out-of-pocket limit[\s\S]{0,80}?For network providers\s*" + money + r"\s*individual",
        r"out-of-pocket limit[\s\S]{0,80}?" + money + r"\s*individual",
    )
    if oop_limit is None:
        # Last resort: "For network providers $8,000 individual / $16,000 family".
        # Only searched after the first out-of-pocket mention and never allowed
        # to override a value found above, because the first such line in the
        # text may describe the deductible instead.
        anchor = re.search(r"out-of-pocket", text, re.I)
        if anchor:
            oop_limit = _first_amount(
                r"For network providers\s*" + money
                + r"\s*individual\s*/\s*\$\s?[0-9,]+\s*family",
                start=anchor.end(),
            )
    if oop_limit is not None:
        terms['out_of_pocket_limit_network_individual'] = oop_limit

    # copays: nearest dollar amount within 80 characters after the label
    copay_patterns = (
        ('pcp_copay', r"Primary care visit[\s\S]{0,80}?" + money),
        ('specialist_copay', r"Specialist\s*Visit[\s\S]{0,80}?" + money),
        ('urgent_copay', r"Urgent care[\s\S]{0,80}?" + money),
    )
    for key, pattern in copay_patterns:
        value = _first_amount(pattern)
        if value is not None:
            terms[key] = value

    # coinsurance percentages: prefer the first one whose preceding 80
    # characters mention hospital/facility; otherwise the first one preceded by
    # 'other'; otherwise the first percentage found at all.
    first_pct = None
    other_pct = None
    for mm in re.finditer(r"([0-9]{1,3})%\s*Coinsurance", text, re.I):
        pct = float(mm.group(1)) / 100.0
        head = text[max(0, mm.start() - 80):mm.start()].lower()
        if 'hospital' in head or 'facility' in head:
            terms['hospital_coinsurance'] = pct
            break
        if first_pct is None:
            first_pct = pct
        if other_pct is None and 'other' in head:
            other_pct = pct
    if 'hospital_coinsurance' not in terms:
        chosen = other_pct if other_pct is not None else first_pct
        if chosen is not None:
            terms['other_coinsurance'] = chosen
    return terms


def estimate_member_payment(bill_amount: float, service_type: str, network: str, plan: dict) -> str:
    """Estimate member payment for a single service given plan terms.

    Simplified rules:
    - Member pays the bill up to the overall deductible first
    - After the deductible, coinsurance applies to the remaining amount
    - Copays are ignored
    - The total is capped at the out-of-pocket limit if the plan has one

    Args:
        bill_amount: billed amount in dollars; negative values are treated as 0.
        service_type: ``'hospital'`` uses ``hospital_coinsurance`` (falling back
            to ``other_coinsurance``); anything else uses ``other_coinsurance``.
        network: ``'network'`` labels the estimate as in-network; the
            calculation always uses the network terms in ``plan``.
        plan: dict as produced by ``parse_plan_terms``; missing keys default to
            a 0 deductible, 0 coinsurance and no out-of-pocket cap.

    Returns:
        A one-line human-readable estimate.
    """
    plan = plan or {}
    bill_amount = max(0.0, bill_amount)
    ded = plan.get('overall_deductible_network_individual', 0.0)
    oop = plan.get('out_of_pocket_limit_network_individual', None)
    if service_type == 'hospital':
        coin = plan.get('hospital_coinsurance', plan.get('other_coinsurance', 0.0))
    else:
        coin = plan.get('other_coinsurance', 0.0)

    remaining = max(0.0, bill_amount - ded)
    member_total = min(ded, bill_amount) + coin * remaining
    # cap at out-of-pocket
    member_total_capped = member_total if oop is None else min(member_total, oop)

    # Join only the non-empty label parts so an out-of-network estimate does
    # not contain a double space.
    label = " ".join(
        part for part in ('in-network' if network == 'network' else '', service_type) if part
    )
    return (
        f"Estimate for ${bill_amount:,.0f} {label} bill: "
        f"member pays ${member_total_capped:,.2f} (raw calc ${member_total:,.2f})"
    )


if __name__ == '__main__':
    # Launch last: demo.launch() blocks when run as a script, so every helper
    # the callbacks use must already be defined by this point.
    demo.launch()