# suryaprakash01's picture
# Update app.py
# 970ef10 verified
#!/usr/bin/env python3
"""
Pathology RAG System - Hugging Face Deployment
Features:
- Global RAG across all reports
- Report-specific RAG via dropdown selection
- No PDF or OCR text stored
- Fixed top_k = 5 (internal)
"""
import os
import sys
from pathlib import Path
from datetime import datetime
import gradio as gr
# --------------------------------------------------
# Environment setup
# --------------------------------------------------
# Hide CUDA devices from libraries that honor this variable. It is set before
# the project modules below are imported so it is in effect when they load.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
# Make the project modules in "src" importable (relative to the working dir).
sys.path.append("src")
# --------------------------------------------------
# Configuration
# --------------------------------------------------
FAISS_DB_PATH = "output/biomedbert_vector_db"
EMBEDDING_MODEL = "microsoft/BiomedNLP-BiomedBERT-base-uncased-abstract-fulltext"
# --------------------------------------------------
# Sanity check
# --------------------------------------------------
# Fail fast when the vector database is missing; diagnostics go to stderr so
# they are not mixed into regular program output.
if not Path(FAISS_DB_PATH).exists():
    print("Error: FAISS database not found.", file=sys.stderr)
    sys.exit(1)
# --------------------------------------------------
# Imports
# --------------------------------------------------
# Project imports are deferred until here because they depend on the
# sys.path entry added above.
try:
    from retriever import CompleteRAGPipeline
    from document_processor import DynamicRAGUpdater
except ImportError as e:
    print(f"Import Error: {e}", file=sys.stderr)
    sys.exit(1)
# ==================================================
# UI CLASS
# ==================================================
class PathologyRAGUI:
    """Gradio front end for the pathology RAG system.

    Holds a ``CompleteRAGPipeline`` (question answering) and a
    ``DynamicRAGUpdater`` (PDF ingestion) configured with the same FAISS
    database path and embedding model, plus per-instance counters for
    queries and uploads.
    """

    # Dropdown entry meaning "do not restrict the query to one report".
    ALL_REPORTS = "All reports"
    # Maximum number of characters of chunk text shown per source.
    SOURCE_EXCERPT_CHARS = 300

    def __init__(self, faiss_db_path: str, embedding_model: str):
        """Create the pipeline and updater and reset the counters.

        Args:
            faiss_db_path: Path of the FAISS vector database.
            embedding_model: Name of the embedding model to load.
        """
        print("Initializing Pathology RAG System...")
        self.faiss_db_path = faiss_db_path
        self.embedding_model = embedding_model
        self.pipeline = self._build_pipeline()
        self.updater = DynamicRAGUpdater(
            vector_db_path=faiss_db_path,
            embedding_model=embedding_model,
        )
        self.query_count = 0
        self.upload_count = 0
        print("System ready.")

    # --------------------------------------------------
    # Helpers
    # --------------------------------------------------
    def _build_pipeline(self):
        """Return a new pipeline for the configured database and model."""
        return CompleteRAGPipeline(
            faiss_db_path=self.faiss_db_path,
            embedding_model=self.embedding_model,
        )

    @staticmethod
    def _resolve_upload_path(file):
        """Return the filesystem path for an uploaded file value.

        Accepts a plain path (``str`` / ``os.PathLike``) or an object that
        exposes the path as a ``path`` or ``name`` attribute, so the upload
        handler works with any of these forms.

        Raises:
            TypeError: If no path can be determined from ``file``.
        """
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        for attr in ("path", "name"):
            candidate = getattr(file, attr, None)
            if candidate:
                return os.fspath(candidate)
        raise TypeError(f"Unsupported upload value: {type(file).__name__}")

    def _refresh_report_choices(self):
        """Return a dropdown update carrying the current report choices."""
        return gr.update(choices=self.get_report_choices())

    def get_report_choices(self):
        """Return the dropdown choices: "All reports" followed by each report."""
        reports = self.pipeline.get_available_reports()
        return [self.ALL_REPORTS, *(reports or [])]

    # --------------------------------------------------
    # Upload PDF
    # --------------------------------------------------
    def upload_pdf(self, file):
        """Ingest an uploaded PDF and report the outcome.

        Args:
            file: Value delivered by the file-upload component, or ``None``
                when nothing was selected.

        Returns:
            A 3-tuple ``(status_text, upload_stats_text, dropdown_update)``.
            On failure the status text starts with ``"Error: "`` and the
            dropdown is left unchanged.
        """
        if file is None:
            return "No file uploaded.", "", gr.update()
        try:
            pdf_path = self._resolve_upload_path(file)
            stats = self.updater.process_and_add_pdf(pdf_path)
            # Rebuild the pipeline after ingestion (presumably so that it
            # loads the updated database -- confirm against the pipeline).
            self.pipeline = self._build_pipeline()
            self.upload_count += 1
            status = (
                f"Upload successful\n\n"
                f"Chunks created: {stats['num_chunks']}\n"
                f"Vectors added: {stats['vectors_added']}\n"
                f"Processing time: {stats['processing_time_seconds']:.2f}s"
            )
            return (
                status,
                f"Total uploads: {self.upload_count}",
                gr.update(
                    choices=self.get_report_choices(),
                    value=self.ALL_REPORTS,
                ),
            )
        except Exception as e:
            # UI boundary: show the failure to the user instead of raising.
            return f"Error: {e}", "", gr.update()

    # --------------------------------------------------
    # Process Query
    # --------------------------------------------------
    def process_query(self, question: str, selected_report: str):
        """Answer a question, optionally restricted to one report.

        Args:
            question: The user's question; blank input is rejected.
            selected_report: Dropdown value. "All reports" or an empty
                selection means no report restriction.

        Returns:
            A 3-tuple ``(answer, sources_text, metadata_text)``. When the
            pipeline fails, the answer starts with ``"Error: "`` and the
            other two fields are empty.
        """
        if not question or not question.strip():
            return "Please enter a question.", "", ""
        # A cleared dropdown yields an empty value; treat it as global scope.
        scope = selected_report or self.ALL_REPORTS
        report_name = None if scope == self.ALL_REPORTS else scope
        self.query_count += 1
        try:
            result = self.pipeline.ask(question, report_name=report_name)
            answer = result.get("answer", "No answer generated.")
            sources_text = self.format_sources(result.get("sources", []))
        except Exception as e:
            # UI boundary: same reporting style as upload_pdf.
            return f"Error: {e}", "", ""
        metadata = (
            f"Query number: {self.query_count}\n"
            f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Scope: {scope}"
        )
        return answer, sources_text, metadata

    # --------------------------------------------------
    # Format sources
    # --------------------------------------------------
    def format_sources(self, sources):
        """Render retrieved sources as display text.

        Args:
            sources: Sequence of mappings, each with a ``"chunk"`` mapping
                (``"filename"`` and ``"text"`` are read from it) and an
                optional ``"ce_score"`` or ``"score"``.

        Returns:
            One entry per source joined by ``"\\n---\\n"``, or
            ``"No sources available."`` when ``sources`` is empty.
        """
        if not sources:
            return "No sources available."
        limit = self.SOURCE_EXCERPT_CHARS
        formatted = []
        for i, src in enumerate(sources, 1):
            chunk = src["chunk"]
            # Prefer "ce_score" and fall back to "score", then to 0.0.
            score = src.get("ce_score")
            if score is None:
                score = src.get("score")
            if score is None:
                score = 0.0
            text = chunk.get("text", "")
            # Only mark the excerpt with an ellipsis when it was cut.
            excerpt = text[:limit] + ("..." if len(text) > limit else "")
            formatted.append(
                f"Source {i}\n"
                f"File: {chunk.get('filename', 'N/A')}\n"
                f"Score: {score:.3f}\n"
                f"{excerpt}\n"
            )
        return "\n---\n".join(formatted)

    # --------------------------------------------------
    # UI
    # --------------------------------------------------
    def create_interface(self):
        """Build and return the Gradio Blocks app (upload tab and query tab)."""
        report_choices = self.get_report_choices()
        with gr.Blocks(title="Pathology RAG System") as demo:
            gr.Markdown(
                "# Pathology Report Analysis System\n"
                "Search and question answering over pathology reports"
            )
            with gr.Tabs():
                # Upload
                with gr.Tab("Upload Report"):
                    file_upload = gr.File(
                        label="Select PDF",
                        file_types=[".pdf"],
                    )
                    upload_btn = gr.Button("Process and Add", variant="primary")
                    upload_status = gr.Markdown("")
                    upload_stats = gr.Markdown("")
                    report_dropdown = gr.Dropdown(
                        choices=report_choices,
                        value=self.ALL_REPORTS,
                        label="Select report",
                    )
                # Query
                with gr.Tab("Search and Ask"):
                    question_input = gr.Textbox(
                        label="Ask a question",
                        lines=3,
                    )
                    report_dropdown_query = gr.Dropdown(
                        choices=report_choices,
                        value=self.ALL_REPORTS,
                        label="Select report",
                    )
                    submit_btn = gr.Button("Search", variant="primary")
                    answer_output = gr.Markdown("")
                    sources_output = gr.Markdown("")
                    metadata_output = gr.Markdown("")
                    submit_btn.click(
                        fn=self.process_query,
                        inputs=[question_input, report_dropdown_query],
                        outputs=[answer_output, sources_output, metadata_output],
                    )
            # Wired after both tabs exist so the upload can also refresh the
            # query tab's dropdown; otherwise a newly added report could not
            # be selected for a report-specific question.
            upload_btn.click(
                fn=self.upload_pdf,
                inputs=[file_upload],
                outputs=[upload_status, upload_stats, report_dropdown],
            ).then(
                fn=self._refresh_report_choices,
                inputs=None,
                outputs=[report_dropdown_query],
            )
            gr.Markdown(
                "---\nFor research and educational purposes only."
            )
        return demo
# ==================================================
# MAIN
# ==================================================
def main():
    """Build the UI from the module-level configuration and launch it."""
    app = PathologyRAGUI(
        embedding_model=EMBEDDING_MODEL,
        faiss_db_path=FAISS_DB_PATH,
    )
    app.create_interface().launch()


# Run the app only when executed as a script, not when imported.
if __name__ == "__main__":
    main()