import streamlit as st import requests import json from PIL import Image import pandas as pd from datetime import datetime import os import time import base64 import io import re import html # Must be the first Streamlit command st.set_page_config( page_title="MediVision - Radiology Report System", page_icon="🔬", layout="wide", initial_sidebar_state="expanded" ) # Configuration FASTAPI_BASE_URL = "http://localhost:8001" UPLOAD_ENDPOINT = f"{FASTAPI_BASE_URL}/upload/" REPORTS_ENDPOINT = f"{FASTAPI_BASE_URL}/reports" # ============================================================================ # CUSTOM STYLING (Based on Hospital System) # ============================================================================ st.markdown(""" """, unsafe_allow_html=True) # ============================================================================ def main(): # Modern header with MediVision branding st.markdown("""

🔬 MediVision

Advanced Radiology Report System - Powered by AI Technology

""", unsafe_allow_html=True) # Sidebar navigation st.sidebar.markdown("## 📋 Navigation") page = st.sidebar.selectbox("Choose Page", [ "📤 Upload & Generate Report", "📊 View Reports", "⚙️ System Status" ]) if "Upload" in page: upload_page() elif "View Reports" in page: view_reports_page() elif "System Status" in page: system_status_page() def upload_page(): st.markdown('
📤 Upload Medical Images & Generate Reports
', unsafe_allow_html=True) # Create two columns for better layout col1, col2 = st.columns([2, 1]) with col1: st.markdown('
', unsafe_allow_html=True) st.markdown("### 👤 Patient Information") # Patient form with modern styling with st.form("patient_form"): name = st.text_input("Patient Name *", placeholder="Enter full name") date_of_birth = st.date_input("Date of Birth *") gender = st.selectbox("Gender *", ["Male", "Female", "Other"]) # MRN field (auto-generated, but allow manual override) col1, col2 = st.columns([3, 1]) with col1: medical_record_number = st.text_input( "Medical Record Number (MRN)", placeholder="Auto-generated if left empty", help="Leave empty to auto-generate MRN based on patient name and DOB" ) with col2: st.markdown("
", unsafe_allow_html=True) # Add spacing if st.form_submit_button("🔢 Generate MRN", use_container_width=True): if name and date_of_birth: try: response = requests.get( f"{FASTAPI_BASE_URL}/generate-mrn/", params={"patient_name": name, "date_of_birth": str(date_of_birth)}, timeout=10 ) if response.status_code == 200: data = response.json() if data.get("success"): st.session_state.generated_mrn = data.get("mrn") st.success(f"✅ Generated MRN: {data.get('mrn')}") st.rerun() except Exception as e: st.error(f"❌ Error generating MRN: {str(e)}") else: st.error("⚠️ Please enter patient name and date of birth first") # Show generated MRN if available if hasattr(st.session_state, 'generated_mrn') and st.session_state.generated_mrn: medical_record_number = st.text_input( "Generated MRN", value=st.session_state.generated_mrn, disabled=True, help="Auto-generated MRN - you can edit the field above to override" ) referring_physician = st.text_input("Referring Physician *", placeholder="Enter physician name") date_of_study = st.date_input("Date of Study *", value=datetime.now().date()) st.markdown("### 🖼️ Medical Image") st.markdown('
', unsafe_allow_html=True) uploaded_file = st.file_uploader( "Choose medical image", type=['png', 'jpg', 'jpeg', 'dcm'], help="Upload X-ray, CT, MRI, (Max 10MB)", key="medical_image_upload" ) st.markdown('
', unsafe_allow_html=True) # Display uploaded image with modern styling (NO API CALLS) if uploaded_file is not None: try: # Validate file size only (no API calls) file_size = uploaded_file.size if file_size > 10 * 1024 * 1024: # 10MB st.error("⚠️ File size too large. Please upload an image smaller than 10MB.") else: # Display success message and file info st.success(f"✅ File uploaded successfully: {uploaded_file.name}") st.info(f"📁 File size: {file_size/1024:.1f} KB") # Display image preview (local only, no backend calls) if uploaded_file.type.startswith('image/'): image = Image.open(uploaded_file) st.image(image, caption="Uploaded Medical Image", use_column_width=True) else: st.info("📄 DICOM file uploaded successfully") # Reset file pointer for later use uploaded_file.seek(0) except Exception as e: st.error(f"❌ Error processing image: {str(e)}") st.info("💡 Please ensure the file is a valid image format (PNG, JPG, JPEG, DCM)") else: st.info("👆 Please upload a medical image to begin analysis") submit_button = st.form_submit_button("🔍 Generate Report", use_container_width=True) if submit_button: # Get the MRN (either generated or manually entered) final_mrn = medical_record_number if hasattr(st.session_state, 'generated_mrn') and st.session_state.generated_mrn and not medical_record_number: final_mrn = st.session_state.generated_mrn # Validation (MRN is now optional as it can be auto-generated) if not all([name, date_of_birth, gender, referring_physician, uploaded_file]): st.error("⚠️ Please fill all required fields and upload an image") else: generate_report(name, str(date_of_birth), gender, final_mrn, referring_physician, str(date_of_study), uploaded_file) st.markdown('
', unsafe_allow_html=True) def generate_report(name, date_of_birth, gender, medical_record_number, referring_physician, date_of_study, uploaded_file): """Generate report by sending data to backend and displaying the result""" try: if uploaded_file is not None: with st.spinner("🔄 Analyzing medical image..."): # Validate file before sending if uploaded_file.size > 10 * 1024 * 1024: # 10MB limit st.error("⚠️ File size too large. Please upload an image smaller than 10MB.") return # Reset file pointer to beginning uploaded_file.seek(0) # Prepare file for upload files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)} try: # Call analyze endpoint analyze_response = requests.post( f"{FASTAPI_BASE_URL}/analyze", files=files, timeout=60 # 60 second timeout ) if analyze_response.status_code == 200: response_data = analyze_response.json() # Handle successful response if response_data.get("success"): findings = response_data.get("analysis", "Analysis completed") # Parse thinking and answer sections thinking_text = "" report_text = findings if "thinking:" in findings and "answer:" in findings: parts = findings.split("answer:", 1) thinking_text = parts[0].replace("thinking:", "").strip() report_text = parts[1].strip() st.success("✅ Analysis completed successfully!") stream_response(thinking_text, report_text, { "name": name, "medical_record_number": medical_record_number, "referring_physician": referring_physician, "date_of_study": date_of_study }) else: # Handle failed analysis with fallback st.warning("⚠️ AI analysis encountered issues, using fallback analysis") fallback_findings = response_data.get("analysis", "Medical image processed. Professional review recommended.") stream_response("Fallback analysis used", fallback_findings, { "name": name, "medical_record_number": medical_record_number, "referring_physician": referring_physician, "date_of_study": date_of_study }) findings = fallback_findings else: st.error(f"❌ Backend Error (Status {analyze_response.status_code})") st.error(f"Response: {analyze_response.text}") return except requests.exceptions.Timeout: st.error("⏱️ Request timed out. The analysis is taking too long. Please try again.") return except requests.exceptions.ConnectionError: st.error("🔌 Cannot connect to backend service.") st.error("Please ensure the backend is running at: http://localhost:8001") st.info("💡 To start the backend, run: `cd backend && python service.py`") return except requests.exceptions.RequestException as e: st.error(f"❌ Request failed: {str(e)}") return # Generate PDF report with st.spinner("📄 Generating PDF report..."): try: pdf_response = requests.post( f"{FASTAPI_BASE_URL}/generate-report/", data={ "patient_name": name, "medical_record_number": medical_record_number or "AUTO-GENERATED", "date_of_birth": date_of_birth, "gender": gender, "referring_physician": referring_physician, "study_date": date_of_study, "findings": findings }, timeout=30 ) if pdf_response.status_code == 200: pdf_data = pdf_response.json() if pdf_data.get("success"): pdf_url = f"{FASTAPI_BASE_URL}/reports/{name.replace(' ', '_')}/pdf" st.markdown(f'''

✅ PDF Report Generated Successfully!

📄 Download PDF Report
''', unsafe_allow_html=True) else: st.error(f"❌ Failed to generate PDF: {pdf_data.get('message', 'Unknown error')}") else: st.error(f"❌ PDF generation failed with status {pdf_response.status_code}") except requests.exceptions.Timeout: st.error("⏱️ PDF generation timed out. Please try again.") except Exception as e: st.error(f"❌ Error generating PDF: {str(e)}") except Exception as e: st.error(f"❌ Unexpected error: {str(e)}") st.info("💡 Troubleshooting tips:") st.info(" • Ensure backend service is running on port 8001") st.info(" • Check that the uploaded file is a valid image") st.info(" • Try refreshing the page and uploading again") def view_reports_page(): st.markdown('
📊 View Patient Reports - Search by MRN
', unsafe_allow_html=True) # Search section with modern styling st.markdown('
', unsafe_allow_html=True) col1, col2 = st.columns([3, 1]) with col1: patient_mrn = st.text_input("🔍 Search by MRN", placeholder="Enter Medical Record Number (e.g., MV-20250618-A1B2)") with col2: st.markdown("
", unsafe_allow_html=True) # Add spacing search_button = st.button("🔍 Search", use_container_width=True) st.markdown('
', unsafe_allow_html=True) # Action buttons col1, col2 = st.columns(2) with col1: if search_button and patient_mrn: search_reports_by_mrn(patient_mrn) with col2: if st.button("📋 Show All Patients", use_container_width=True): show_all_patients() def search_reports_by_mrn(mrn): """Search for reports by MRN with modern styling""" with st.spinner(f"🔍 Searching for patient with MRN: {mrn}..."): try: response = requests.get(f"{FASTAPI_BASE_URL}/patient/{mrn}", timeout=10) if response.status_code == 200: patient_data = response.json() st.markdown(f"""

✅ Patient Found: {patient_data.get('patient_name', 'N/A')}

MRN: {patient_data.get('mrn', 'N/A')}

""", unsafe_allow_html=True) # Display patient details display_patient_details(patient_data) elif response.status_code == 404: st.markdown(f"""

⚠️ No Patient Found

No patient record found with MRN: {mrn}

Please check the MRN and try again.

""", unsafe_allow_html=True) else: st.error(f"❌ Error searching patient: HTTP {response.status_code}") except requests.exceptions.ConnectionError: st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.") except requests.exceptions.Timeout: st.error("❌ Search request timed out. Please try again.") except Exception as e: st.error(f"❌ An error occurred: {str(e)}") def show_all_patients(): """Show all patients in the system""" with st.spinner("📋 Loading all patient records..."): try: response = requests.get(f"{FASTAPI_BASE_URL}/patients", timeout=15) if response.status_code == 200: data = response.json() patients = data.get('patients', []) if patients: st.markdown(f"""

📊 Found {len(patients)} Patient Records

""", unsafe_allow_html=True) # Display patients in a table format for patient in patients: display_patient_summary(patient) else: st.info("📋 No patient records found in the system yet.") else: st.error(f"❌ Error retrieving patients: {response.text}") except requests.exceptions.ConnectionError: st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.") except Exception as e: st.error(f"❌ An error occurred: {str(e)}") def display_patient_details(patient_data): """Display detailed patient information""" st.markdown('
', unsafe_allow_html=True) st.markdown("### 👤 Patient Details") col1, col2 = st.columns(2) with col1: st.markdown(f"**Name:** {patient_data.get('patient_name', 'N/A')}") st.markdown(f"**MRN:** {patient_data.get('mrn', 'N/A')}") st.markdown(f"**Date of Birth:** {patient_data.get('date_of_birth', 'N/A')}") st.markdown(f"**Gender:** {patient_data.get('gender', 'N/A')}") with col2: st.markdown(f"**Referring Physician:** Dr. {patient_data.get('referring_physician', 'N/A')}") st.markdown(f"**Study Date:** {patient_data.get('date_of_study', 'N/A')}") st.markdown(f"**Report Date:** {patient_data.get('report_date', 'N/A')}") st.markdown(f"**Status:** {patient_data.get('status', 'N/A').title()}") # Display findings if patient_data.get('findings'): st.markdown("### 🔍 Findings") findings = patient_data.get('findings', '') # Process findings to show only the answer part if "answer:" in findings.lower(): parts = findings.split("answer:", 1) if len(parts) == 2: findings_display = parts[1].strip() else: findings_display = findings else: findings_display = findings st.markdown(findings_display) # PDF download link patient_name = patient_data.get('patient_name', 'Unknown') pdf_url = f"{FASTAPI_BASE_URL}/reports/{patient_name}/pdf" st.markdown(f""" 📄 Download PDF Report """, unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) def display_patient_summary(patient_data): """Display patient summary in a card format""" st.markdown('
', unsafe_allow_html=True) col1, col2, col3 = st.columns([2, 2, 1]) with col1: st.markdown(f"**{patient_data.get('patient_name', 'N/A')}**") st.markdown(f"MRN: {patient_data.get('mrn', 'N/A')}") with col2: st.markdown(f"Study: {patient_data.get('date_of_study', 'N/A')}") st.markdown(f"Physician: Dr. {patient_data.get('referring_physician', 'N/A')}") with col3: # Search button for this specific patient if st.button(f"🔍 View", key=f"view_{patient_data.get('mrn', 'unknown')}"): search_reports_by_mrn(patient_data.get('mrn', '')) st.markdown('
', unsafe_allow_html=True) # Remove old search functions - they are replaced by MRN-based search functions above def display_report_details(report): """Display detailed report information with modern cards""" st.markdown('
', unsafe_allow_html=True) col1, col2 = st.columns(2) with col1: st.markdown("**👤 معلومات المريض - Patient Information:**") st.markdown(f""" - **الاسم - Name:** {report.get('name', 'غير متاح - N/A')} - **تاريخ الميلاد - DOB:** {report.get('date_of_birth', 'غير متاح - N/A')} - **الجنس - Gender:** {report.get('gender', 'غير متاح - N/A')} - **رقم السجل - MRN:** {report.get('medical_record_number', 'غير متاح - N/A')} """) with col2: st.markdown("**🏥 معلومات الفحص - Study Information:**") st.markdown(f""" - **الطبيب المحيل - Physician:** {report.get('referring_physician', 'غير متاح - N/A')} - **تاريخ الفحص - Study Date:** {report.get('date_of_study', 'غير متاح - N/A')} - **النتائج - Findings:** {report.get('findings', 'غير متاح - N/A')} """) # PDF download button with modern styling (always use backend endpoint) if 'name' in report: pdf_url = f"{FASTAPI_BASE_URL}/reports/{report.get('name')}/pdf" st.markdown(f"""
📄 تحميل التقرير - Download PDF Report
""", unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) def show_all_reports(): """Show all reports using the FastAPI backend""" with st.spinner("📋 Loading all reports..."): try: response = requests.get(f"{REPORTS_ENDPOINT}/") if response.status_code == 200: data = response.json() reports = data.get('reports', []) count = data.get('count', 0) if reports: st.success(f"✅ Found {count} total report(s) in the system") # Create a summary table if reports: df_data = [] for report in reports: df_data.append({ 'Patient Name': report.get('name', 'N/A'), 'Date of Birth': report.get('date_of_birth', 'N/A'), 'Gender': report.get('gender', 'N/A'), 'Study Date': report.get('date_of_study', 'N/A'), 'Physician': report.get('referring_physician', 'N/A') }) df = pd.DataFrame(df_data) st.dataframe(df, use_container_width=True) # Show detailed reports st.subheader("Detailed Reports") for i, report in enumerate(reports): with st.expander(f"{report.get('name', 'Unknown')} - {report.get('date_of_study', 'Unknown Date')}"): display_report_details(report) else: st.info("📋 No reports found in the system yet.") else: st.error(f"❌ Error retrieving reports: {response.text}") except requests.exceptions.ConnectionError: st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.") except Exception as e: st.error(f"❌ An error occurred: {str(e)}") def system_status_page(): """System status and connectivity check page""" st.markdown('
⚙️ System Status & Diagnostics
', unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) st.markdown("### 🔍 Backend Service Status") col1, col2 = st.columns([3, 1]) with col2: if st.button("🔄 Check Status", use_container_width=True): check_backend_status() with col1: st.info("Click 'Check Status' to test backend connectivity") st.markdown('
', unsafe_allow_html=True) # Show system information st.markdown('
', unsafe_allow_html=True) st.markdown("### 📊 System Information") col1, col2 = st.columns(2) with col1: st.metric("Frontend", "Streamlit", "Running ✅") st.metric("Backend URL", FASTAPI_BASE_URL, "Configured") with col2: st.metric("Upload Endpoint", "/analyze/", "Ready") st.metric("Reports Endpoint", "/reports", "Ready") st.markdown('
', unsafe_allow_html=True) def check_backend_status(): """Check if backend service is running and accessible""" try: with st.spinner("Checking backend connectivity..."): # Test health endpoint response = requests.get(f"{FASTAPI_BASE_URL}/health", timeout=10) if response.status_code == 200: data = response.json() st.success(f"✅ Backend service is healthy!") st.json(data) else: st.error(f"❌ Backend returned status {response.status_code}") except requests.exceptions.ConnectionError: st.error("❌ Cannot connect to backend service") st.info("💡 Make sure the backend is running:") st.code("cd backend && python service.py --serve") except requests.exceptions.Timeout: st.error("❌ Backend request timed out") except Exception as e: st.error(f"❌ Unexpected error: {str(e)}") def stream_response(thinking_text, response_text, patient_info): """ Streams the AI response with proper formatting and bullet points. """ # --- Helper function for formatting --- def format_text_to_html(text): """Converts plain text with newlines and list-like structures to HTML, keeping section headers as normal text.""" section_headers = [ "Initial Assessment:", "Key Findings:", "Clinical Significance:", "Impression:", "Conclusion:", "Recommendations:", "Technical Details:", "Comparison:", "History:", "Exam Type:", "Findings:", "Clinical History:", "Technique:", "Indication:", "Result:", "Results:", "Summary:", "Assessment:", "Plan:", "Follow-up:", "Brief Structured Report:", "Report:", "Diagnosis:", "Opinion:" ] text = html.escape(text) lines = text.split('\n') html_lines = [] in_list = False for line in lines: stripped_line = line.strip() # Remove leading bullets/numbers for header checking clean_line = re.sub(r'^[\d+\.]*[\-\*\•]?\s*', '', stripped_line) # Also try removing just the number prefix for numbered headers clean_line_no_number = re.sub(r'^\d+\.?\s*', '', stripped_line) # Check if this is a section header (case-insensitive, flexible matching) is_section_header = any( clean_line.lower().startswith(header.lower()) or clean_line.lower() == header.lower().rstrip(':') or clean_line_no_number.lower().startswith(header.lower()) or clean_line_no_number.lower() == header.lower().rstrip(':') or # Handle cases like "EXAM TYPE:" matching "Exam Type:" clean_line.lower().replace(' ', '').startswith(header.lower().replace(' ', '').rstrip(':')) or clean_line_no_number.lower().replace(' ', '').startswith(header.lower().replace(' ', '').rstrip(':')) for header in section_headers ) if is_section_header: # Close any open list if in_list: html_lines.append('' if 'ol>' in str(html_lines[-3:]) else '') in_list = False # Use the version without numbers/bullets for display display_text = clean_line_no_number if clean_line_no_number.lower().endswith(':') else clean_line html_lines.append(f'

{display_text}

') elif re.match(r'^[\-\*\•]\s', stripped_line) and not is_section_header: # This is a bullet point (not a section header) if not in_list: html_lines.append('') return ''.join(html_lines) # --- Thinking Process --- thinking_container = st.empty() thinking_container.markdown("""
AI is analyzing
""", unsafe_allow_html=True) st.markdown('
🤔 AI Analysis Process:
', unsafe_allow_html=True) thinking_output = st.empty() thinking_text_container = "" for char in thinking_text: thinking_text_container += char formatted_thinking = format_text_to_html(thinking_text_container) thinking_output.markdown(f'
{formatted_thinking}
', unsafe_allow_html=True) time.sleep(0.005) thinking_container.empty() # --- Define Report Components --- current_date = datetime.now().strftime("%B %d, %Y") current_time = datetime.now().strftime("%I:%M %p") report_header = ( '
' '
' '
🔬 MediVision Radiology Center
' '
Advanced AI-Powered Medical Imaging Analysis
' '
' '

📋 RADIOLOGY REPORT

' '
' f'
Patient Name:{patient_info["name"]}
' f'
MRN:{patient_info["medical_record_number"]}
' f'
Study Date:{patient_info.get("date_of_study", current_date)}
' f'
Report Date:{current_date} at {current_time}
' f'
Referring Physician:Dr. {patient_info["referring_physician"]}
' f'
Radiologist:MediVision AI Assistant
' '
' '
' ) report_footer = ( '' ) # --- Stream the response --- report_container = st.empty() response_text_accumulated = "" for char in response_text: response_text_accumulated += char formatted_content = format_text_to_html(response_text_accumulated) content_html = ( '
' f'{formatted_content}' '
' ) full_report_so_far = f'
{report_header}{content_html}
' report_container.markdown(full_report_so_far, unsafe_allow_html=True) time.sleep(0.01) # Final report with footer final_formatted_content = format_text_to_html(response_text_accumulated) final_content_html = ( '
' f'{final_formatted_content}' '
' ) final_full_report = f'
{report_header}{final_content_html}{report_footer}
' report_container.markdown(final_full_report, unsafe_allow_html=True) if __name__ == "__main__": main()