#!/usr/bin/env python3
"""
Pathology RAG System - Hugging Face Deployment

Features:
- Global RAG across all reports
- Report-specific RAG via dropdown selection
- No PDF or OCR text stored
- Fixed top_k = 5 (internal)
"""

import os
import sys
from pathlib import Path
from datetime import datetime

import gradio as gr

# --------------------------------------------------
# Environment setup
# --------------------------------------------------
# Must be set before the project modules below are imported so that they
# see no CUDA devices.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
# Project modules (retriever, document_processor) are imported from ./src,
# resolved relative to the current working directory.
sys.path.append("src")

# --------------------------------------------------
# Configuration
# --------------------------------------------------
FAISS_DB_PATH = "output/biomedbert_vector_db"
EMBEDDING_MODEL = "microsoft/BiomedNLP-BiomedBERT-base-uncased-abstract-fulltext"

# Dropdown entry meaning "do not restrict the query to a single report".
ALL_REPORTS_LABEL = "All reports"
# Maximum number of characters of chunk text shown per source.
SOURCE_PREVIEW_CHARS = 300

# --------------------------------------------------
# Sanity check
# --------------------------------------------------
if not Path(FAISS_DB_PATH).exists():
    print("Error: FAISS database not found.")
    sys.exit(1)

# --------------------------------------------------
# Imports
# --------------------------------------------------
try:
    from retriever import CompleteRAGPipeline
    from document_processor import DynamicRAGUpdater
except ImportError as e:
    print(f"Import Error: {e}")
    sys.exit(1)


# ==================================================
# UI CLASS
# ==================================================
class PathologyRAGUI:
    """Gradio front end for uploading pathology PDFs and querying them.

    Holds one ``CompleteRAGPipeline`` (used for questions) and one
    ``DynamicRAGUpdater`` (used for uploads), both pointed at the same
    vector database path, plus simple per-process counters.
    """

    def __init__(self, faiss_db_path: str, embedding_model: str):
        """Create the pipeline and updater for the given database and model.

        Args:
            faiss_db_path: Path passed to both the pipeline and the updater.
            embedding_model: Embedding model name passed to both.
        """
        print("Initializing Pathology RAG System...")

        self.faiss_db_path = faiss_db_path
        self.embedding_model = embedding_model

        self.pipeline = CompleteRAGPipeline(
            faiss_db_path=faiss_db_path,
            embedding_model=embedding_model,
        )
        self.updater = DynamicRAGUpdater(
            vector_db_path=faiss_db_path,
            embedding_model=embedding_model,
        )

        # Counters are kept in memory only and reset on restart.
        self.query_count = 0
        self.upload_count = 0

        print("System ready.")

    # --------------------------------------------------
    # Helpers
    # --------------------------------------------------
    def get_report_choices(self):
        """Return the dropdown choices: "All reports" followed by the reports
        reported by the pipeline."""
        reports = self.pipeline.get_available_reports()
        # list(...) so a non-list iterable from the pipeline still concatenates.
        return [ALL_REPORTS_LABEL] + list(reports)

    @staticmethod
    def _resolve_upload_path(file) -> str:
        """Return the filesystem path of an uploaded file.

        Accepts a plain path (``str`` / ``os.PathLike``) as well as objects
        that expose the path through a ``path`` or ``name`` attribute, since
        the value handed over by ``gr.File`` differs between Gradio versions.

        Raises:
            ValueError: If no path can be determined from ``file``.
        """
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        for attr in ("path", "name"):
            value = getattr(file, attr, None)
            if value:
                return str(value)
        raise ValueError("Could not determine the path of the uploaded file.")

    @staticmethod
    def _dropdown_update(choices):
        """Build a dropdown update.

        Returns a no-op update when ``choices`` is ``None``; otherwise an
        update that replaces the choices and resets the selection to
        "All reports". A fresh object is built on every call so that one
        update is never shared between two output components.
        """
        if choices is None:
            return gr.update()
        return gr.update(choices=list(choices), value=ALL_REPORTS_LABEL)

    def _ingest_pdf(self, file):
        """Process an uploaded PDF and reload the query pipeline.

        Returns:
            A tuple ``(status, stats_text, choices)``. ``choices`` is the new
            list of dropdown choices on success and ``None`` when nothing was
            uploaded or processing failed.
        """
        if file is None:
            return "No file uploaded.", "", None

        try:
            pdf_path = self._resolve_upload_path(file)
            stats = self.updater.process_and_add_pdf(pdf_path)

            # Rebuild the pipeline so later queries use the updated database.
            self.pipeline = CompleteRAGPipeline(
                faiss_db_path=self.faiss_db_path,
                embedding_model=self.embedding_model,
            )
            self.upload_count += 1

            status = (
                f"Upload successful\n\n"
                f"Chunks created: {stats['num_chunks']}\n"
                f"Vectors added: {stats['vectors_added']}\n"
                f"Processing time: {stats['processing_time_seconds']:.2f}s"
            )
            return (
                status,
                f"Total uploads: {self.upload_count}",
                self.get_report_choices(),
            )
        except Exception as e:
            # UI boundary: report the failure to the user instead of raising.
            return f"Error: {str(e)}", "", None

    # --------------------------------------------------
    # Upload PDF
    # --------------------------------------------------
    def upload_pdf(self, file):
        """Handle a PDF upload.

        Args:
            file: Value produced by the ``gr.File`` component, or ``None``.

        Returns:
            A tuple ``(status, stats_text, dropdown_update)``. On failure or
            when no file is given, ``stats_text`` is empty and the dropdown
            update is a no-op.
        """
        status, stats_text, choices = self._ingest_pdf(file)
        return status, stats_text, self._dropdown_update(choices)

    def _upload_and_sync_dropdowns(self, file):
        """Upload handler used by the UI.

        Same as ``upload_pdf`` but returns one dropdown update per report
        dropdown, so the dropdown on the query tab also lists the new report.
        """
        status, stats_text, choices = self._ingest_pdf(file)
        return (
            status,
            stats_text,
            self._dropdown_update(choices),
            self._dropdown_update(choices),
        )

    # --------------------------------------------------
    # Process Query
    # --------------------------------------------------
    def process_query(self, question: str, selected_report: str):
        """Answer a question, optionally restricted to one report.

        Args:
            question: The user's question; blank input is rejected.
            selected_report: Dropdown value. "All reports" (or an empty
                selection) queries without a report filter.

        Returns:
            A tuple ``(answer, sources_text, metadata)``. If the pipeline
            fails, ``answer`` carries an ``Error: ...`` message and the other
            two values are empty strings.
        """
        if not question or not question.strip():
            return "Please enter a question.", "", ""

        self.query_count += 1

        # A cleared dropdown is treated the same as "All reports".
        scope = selected_report or ALL_REPORTS_LABEL
        report_name = None if scope == ALL_REPORTS_LABEL else scope

        try:
            result = self.pipeline.ask(
                question,
                report_name=report_name,
            )
            answer = result.get("answer", "No answer generated.")
            sources_text = self.format_sources(result.get("sources", []))
        except Exception as e:
            # Same error style as upload_pdf: show the failure in the UI.
            return f"Error: {str(e)}", "", ""

        metadata = (
            f"Query number: {self.query_count}\n"
            f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Scope: {scope}"
        )
        return answer, sources_text, metadata

    # --------------------------------------------------
    # Format sources
    # --------------------------------------------------
    def format_sources(self, sources):
        """Render retrieved sources as display text.

        Each source is read as a mapping with a ``chunk`` mapping
        (``filename``, ``text``) and a score under ``ce_score`` or, failing
        that, ``score``. Chunk text is cut to a fixed preview length and an
        ellipsis is appended only when it was actually cut.

        Returns:
            The formatted sources separated by ``---`` lines, or
            "No sources available." when ``sources`` is empty.
        """
        if not sources:
            return "No sources available."

        formatted = []
        for i, src in enumerate(sources, 1):
            # Tolerate a missing/empty chunk rather than failing the query.
            chunk = src.get("chunk") or {}
            score = src.get("ce_score", src.get("score", 0.0))

            text = chunk.get("text", "") or ""
            preview = text[:SOURCE_PREVIEW_CHARS]
            if len(text) > SOURCE_PREVIEW_CHARS:
                preview += "..."

            formatted.append(
                f"Source {i}\n"
                f"File: {chunk.get('filename', 'N/A')}\n"
                f"Score: {score:.3f}\n"
                f"{preview}\n"
            )
        return "\n---\n".join(formatted)

    # --------------------------------------------------
    # UI
    # --------------------------------------------------
    def create_interface(self):
        """Build and return the Gradio ``Blocks`` app (upload and query tabs)."""
        with gr.Blocks(title="Pathology RAG System") as demo:
            gr.Markdown(
                "# Pathology Report Analysis System\n"
                "Search and question answering over pathology reports"
            )

            with gr.Tabs():
                # Upload
                with gr.Tab("Upload Report"):
                    file_upload = gr.File(
                        label="Select PDF",
                        file_types=[".pdf"],
                    )
                    upload_btn = gr.Button("Process and Add", variant="primary")
                    upload_status = gr.Markdown("")
                    upload_stats = gr.Markdown("")
                    report_dropdown = gr.Dropdown(
                        choices=self.get_report_choices(),
                        value=ALL_REPORTS_LABEL,
                        label="Select report",
                    )

                # Query
                with gr.Tab("Search and Ask"):
                    question_input = gr.Textbox(
                        label="Ask a question",
                        lines=3,
                    )
                    report_dropdown_query = gr.Dropdown(
                        choices=self.get_report_choices(),
                        value=ALL_REPORTS_LABEL,
                        label="Select report",
                    )
                    submit_btn = gr.Button("Search", variant="primary")
                    answer_output = gr.Markdown("")
                    sources_output = gr.Markdown("")
                    metadata_output = gr.Markdown("")

                    submit_btn.click(
                        fn=self.process_query,
                        inputs=[question_input, report_dropdown_query],
                        outputs=[answer_output, sources_output, metadata_output],
                    )

            # Wired after both tabs exist so an upload can refresh the
            # dropdown on the query tab as well as the one on the upload tab.
            upload_btn.click(
                fn=self._upload_and_sync_dropdowns,
                inputs=[file_upload],
                outputs=[
                    upload_status,
                    upload_stats,
                    report_dropdown,
                    report_dropdown_query,
                ],
            )

            gr.Markdown(
                "---\nFor research and educational purposes only."
            )

        return demo


# ==================================================
# MAIN
# ==================================================
def main():
    """Build the UI with the module-level configuration and launch it."""
    ui = PathologyRAGUI(
        faiss_db_path=FAISS_DB_PATH,
        embedding_model=EMBEDDING_MODEL,
    )
    demo = ui.create_interface()
    demo.launch()


if __name__ == "__main__":
    main()