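# NOTE: "Spaces: Sleeping" appears to be page-status residue from where this
# file was copied; it is not part of the program.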
import base64
import os
from io import BytesIO

import fitz  # PyMuPDF
import requests
import streamlit as st
from PIL import Image, ImageDraw
# Configuration.
# The API key is read from Streamlit secrets (key "GEMINI_API_KEY"); it is not
# hard-coded in this file.
GEMINI_API_KEY = st.secrets["GEMINI_API_KEY"]
GEMINI_MODEL = "gemini-2.0-flash"

# Categories offered to the model when classifying an uploaded document.
# Kept as a list: its repr is interpolated verbatim into the classify prompt.
DOCUMENT_TYPES = [
    "Insurance Policies",
    "Explanation of Benefits (EOBs)",
    "Claims (Approved, Denied, or Pending)",
    "Visit Summaries",
    "Test Results (Lab Reports, Imaging Reports)",
    "Prescriptions (E-Prescriptions, Handwritten)",
    "Discharge Summaries",
    "Medical Bills",
    "Payment Statements",
    "Pharmacy Receipts",
    "Prior Authorization Requests",
    "Consent Forms",
    "Referral Letters",
    "Others",
]
def initialize_session_state():
    """Seed every session-state key the app relies on, if not already set.

    Keys and their initial values:
      - ``chat_history``: empty list
      - ``processed_doc``: ``None``
      - ``doc_preview``: ``None``
    """
    # Factories (not shared values) so each session gets its own fresh list.
    initial_values = (
        ("chat_history", list),
        ("processed_doc", lambda: None),
        ("doc_preview", lambda: None),
    )
    for state_key, make_default in initial_values:
        if state_key not in st.session_state:
            st.session_state[state_key] = make_default()
def _pdf_first_page_image(file_bytes):
    """Render the first page of a PDF (given as bytes) to a PIL RGB image."""
    # filetype is given explicitly so PyMuPDF does not have to guess the
    # format of an in-memory stream; the context manager closes the document.
    with fitz.open(stream=file_bytes, filetype="pdf") as pdf:
        pix = pdf[0].get_pixmap()
        return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)


def _text_to_image(text):
    """Draw plain text onto a fixed 800x600 canvas and return the image."""
    canvas = Image.new('RGB', (800, 600), color=(73, 109, 137))
    ImageDraw.Draw(canvas).text((10, 10), text, fill=(255, 255, 0))
    return canvas


def _image_to_jpeg_b64(img):
    """Serialize a PIL image as JPEG and return it base64-encoded (str)."""
    # JPEG cannot store alpha or palette modes (e.g. RGBA/LA/P from PNG or
    # GIF); saving them raises OSError, so normalize those to RGB first.
    if img.mode not in ("RGB", "L", "CMYK"):
        img = img.convert("RGB")
    buffer = BytesIO()
    img.save(buffer, format='JPEG')
    return base64.b64encode(buffer.getvalue()).decode('utf-8')


def encode_file(uploaded_file):
    """Encode an uploaded file as a base64 string.

    PDFs (first page only), images and plain-text files are converted to a
    JPEG image before encoding. Any other MIME type is base64-encoded as-is.

    Args:
        uploaded_file: object exposing ``getvalue()`` (raw bytes) and ``type``
            (MIME type string), as provided by ``st.file_uploader``.

    Returns:
        The base64 text, or ``None`` if processing failed (the error is
        reported through ``st.error``).
    """
    try:
        file_bytes = uploaded_file.getvalue()
        mime = uploaded_file.type

        if mime == "application/pdf":
            img = _pdf_first_page_image(file_bytes)
        elif mime in ("image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"):
            img = Image.open(BytesIO(file_bytes))
        elif mime == "text/plain":
            # Undecodable bytes are replaced rather than aborting the upload.
            img = _text_to_image(file_bytes.decode('utf-8', errors='replace'))
        else:
            # Unknown type: pass the raw bytes through unchanged.
            return base64.b64encode(file_bytes).decode('utf-8')

        return _image_to_jpeg_b64(img)
    except Exception as e:
        # UI boundary: surface the problem to the user instead of crashing.
        st.error(f"File processing error: {str(e)}")
        return None
def query_gemini(prompt, image_b64=None):
    """Send a prompt (and optionally a JPEG image) to the Gemini API.

    Args:
        prompt: text prompt to send.
        image_b64: optional base64-encoded image; it is declared to the API
            as ``image/jpeg``.

    Returns:
        The generated text (all text parts of the first candidate joined), or
        ``None`` if the request failed or the response held no text. Failures
        are reported through ``st.error``.
    """
    # The key travels in a header rather than the URL: request exceptions
    # include the URL in their message, which is shown to the user below.
    url = f"https://generativelanguage.googleapis.com/v1/models/{GEMINI_MODEL}:generateContent"
    parts = [{"text": prompt}]
    if image_b64:
        parts.append({
            "inline_data": {
                "mime_type": "image/jpeg",
                "data": image_b64
            }
        })

    try:
        response = requests.post(
            url,
            json={"contents": [{"parts": parts}]},
            headers={
                "Content-Type": "application/json",
                "x-goog-api-key": GEMINI_API_KEY,
            },
            timeout=30
        )
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        st.error(f"API Error: {str(e)}")
        return None

    try:
        answer_parts = payload["candidates"][0]["content"]["parts"]
        texts = [part["text"] for part in answer_parts if "text" in part]
    except (KeyError, IndexError, TypeError):
        # Payload without the expected candidates/content/parts structure.
        texts = []

    if not texts:
        st.error("API Error: response contained no text")
        return None
    return "".join(texts)
def _reset_document_state():
    """Clear everything derived from the previously processed document."""
    st.session_state.processed_doc = None
    st.session_state.doc_preview = None
    st.session_state.chat_history = []


def _build_preview(uploaded_file):
    """Return a PIL image previewing the upload, or None for unknown types."""
    mime = uploaded_file.type
    if mime == "application/pdf":
        # Context manager closes the PDF once the first page is rendered.
        with fitz.open(stream=uploaded_file.getvalue(), filetype="pdf") as pdf:
            pix = pdf[0].get_pixmap()
            return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    if mime in ("image/webp", "image/avif", "image/jpeg", "image/png", "image/gif"):
        return Image.open(uploaded_file)
    if mime == "text/plain":
        text = uploaded_file.getvalue().decode('utf-8', errors='replace')
        canvas = Image.new('RGB', (800, 600), color=(73, 109, 137))
        ImageDraw.Draw(canvas).text((10, 10), text, fill=(255, 255, 0))
        return canvas
    return None


def process_document():
    """Process the file held in ``st.session_state.uploaded_file``.

    Used as the ``on_change`` callback of the file uploader. Encodes the
    file, builds a preview, asks Gemini for a document type and a summary,
    and stores the results in session state:

      - ``doc_preview``: PIL image (or ``None``)
      - ``processed_doc``: dict with keys ``type``, ``content``, ``summary``
      - ``chat_history``: reset to empty, since earlier answers refer to the
        previous document

    If the file was removed or processing fails, all three are cleared so the
    UI never shows results for a document that is no longer loaded.
    """
    uploaded_file = st.session_state.uploaded_file
    if not uploaded_file:
        _reset_document_state()
        return

    try:
        with st.spinner("Analyzing document..."):
            # Convert to base64
            image_b64 = encode_file(uploaded_file)
            if not image_b64:
                _reset_document_state()
                return

            # Generate preview
            st.session_state.doc_preview = _build_preview(uploaded_file)

            # Classify document
            classify_prompt = f"Classify this healthcare document into one of: {DOCUMENT_TYPES}. Respond only with the category name."
            answer = query_gemini(classify_prompt, image_b64)
            # Strip stray whitespace/newlines from the model's answer.
            doc_type = (answer.strip() if answer else "") or "Others"

            summary = query_gemini(
                "Create a detailed structured summary of this healthcare document.",
                image_b64,
            )

            # Store results
            st.session_state.processed_doc = {
                "type": doc_type,
                "content": image_b64,
                # Avoid storing None, which would be rendered as "None".
                "summary": summary or "Summary could not be generated.",
            }
            st.session_state.chat_history = []
    except Exception as e:
        # UI boundary: report and leave the app in a clean "no document" state.
        st.error(f"Processing failed: {str(e)}")
        _reset_document_state()
def handle_chat_query():
    """Answer the question in ``st.session_state.chat_input`` about the document.

    Does nothing when the input is empty or no document has been processed.
    Otherwise sends the question, with the stored type and summary as
    context and the stored document content as the image, to Gemini, then
    appends the ("user", question) and ("assistant", answer) pairs to
    ``st.session_state.chat_history``.
    """
    question = st.session_state.chat_input
    if not (question and st.session_state.processed_doc):
        return

    doc = st.session_state.processed_doc
    # NOTE(review): the text this was recovered from had lost its indentation;
    # the prompt lines are reproduced without leading whitespace.
    prompt = (
        "\n"
        "Document Context:\n"
        f"- Type: {doc['type']}\n"
        f"- Summary: {doc['summary']}\n"
        f"Question: {question}\n"
        'Answer concisely and factually. If unsure, state "Information not found".\n'
    )

    with st.spinner("Generating response..."):
        answer = query_gemini(prompt, doc['content'])
        st.session_state.chat_history.extend([
            ("user", question),
            ("assistant", answer or "Could not generate response"),
        ])
# UI Layout
def main():
    """Render the page: sidebar (preview, uploader, type) and main panel."""
    st.set_page_config(page_title="Healthcare Document Assistant", layout="wide")
    initialize_session_state()

    # ---- Sidebar ----
    with st.sidebar:
        st.header("Document Management")

        # Preview sits above the upload widget.
        preview = st.session_state.doc_preview
        if preview:
            st.subheader("Preview")
            st.image(preview, use_container_width=True)

        # Uploading (or removing) a file triggers process_document.
        st.file_uploader(
            "Upload Document",
            type=["pdf", "webp", "avif", "jpg", "jpeg", "png", "gif", "txt"],
            key="uploaded_file",
            on_change=process_document
        )

        # Detected document type sits below the upload widget.
        if st.session_state.processed_doc:
            st.divider()
            st.subheader("Document Type")
            st.markdown(f"**{st.session_state.processed_doc['type']}**")

    # ---- Main panel ----
    st.title("Healthcare Document Assistant")

    if not st.session_state.processed_doc:
        st.info("Please upload a document to begin analysis")
        return

    # Summary of the processed document.
    st.subheader("Document Summary")
    st.markdown(st.session_state.processed_doc['summary'])

    # Q&A section: past exchanges first, then the input box.
    st.divider()
    st.subheader("Document Q&A")

    for speaker, text in st.session_state.chat_history:
        with st.chat_message(speaker.capitalize()):
            st.markdown(text)

    st.chat_input(
        "Ask about the document...",
        key="chat_input",
        on_submit=handle_chat_query
    )


if __name__ == "__main__":
    main()