import streamlit as st
import requests
import json
from PIL import Image
import pandas as pd
from datetime import datetime
import os
import time
import base64
import io
import re
import html
# Must be the first Streamlit command
st.set_page_config(
page_title="MediVision - Radiology Report System",
page_icon="🔬",
layout="wide",
initial_sidebar_state="expanded"
)
# Configuration
FASTAPI_BASE_URL = "http://localhost:8001"
UPLOAD_ENDPOINT = f"{FASTAPI_BASE_URL}/upload/"
REPORTS_ENDPOINT = f"{FASTAPI_BASE_URL}/reports"
# ============================================================================
# CUSTOM STYLING (Based on Hospital System)
# ============================================================================
st.markdown("""
""", unsafe_allow_html=True)
# ============================================================================
def main():
# Modern header with MediVision branding
st.markdown("""
""", unsafe_allow_html=True)
# Sidebar navigation
st.sidebar.markdown("## 📋 Navigation")
page = st.sidebar.selectbox("Choose Page", [
"📤 Upload & Generate Report",
"📊 View Reports",
"⚙️ System Status"
])
if "Upload" in page:
upload_page()
elif "View Reports" in page:
view_reports_page()
elif "System Status" in page:
system_status_page()
def upload_page():
st.markdown('', unsafe_allow_html=True)
# Create two columns for better layout
col1, col2 = st.columns([2, 1])
with col1:
st.markdown('', unsafe_allow_html=True)
st.markdown("### 👤 Patient Information")
# Patient form with modern styling
with st.form("patient_form"):
name = st.text_input("Patient Name *", placeholder="Enter full name")
date_of_birth = st.date_input("Date of Birth *")
gender = st.selectbox("Gender *", ["Male", "Female", "Other"])
# MRN field (auto-generated, but allow manual override)
col1, col2 = st.columns([3, 1])
with col1:
medical_record_number = st.text_input(
"Medical Record Number (MRN)",
placeholder="Auto-generated if left empty",
help="Leave empty to auto-generate MRN based on patient name and DOB"
)
with col2:
st.markdown("
", unsafe_allow_html=True) # Add spacing
if st.form_submit_button("🔢 Generate MRN", use_container_width=True):
if name and date_of_birth:
try:
response = requests.get(
f"{FASTAPI_BASE_URL}/generate-mrn/",
params={"patient_name": name, "date_of_birth": str(date_of_birth)},
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get("success"):
st.session_state.generated_mrn = data.get("mrn")
st.success(f"✅ Generated MRN: {data.get('mrn')}")
st.rerun()
except Exception as e:
st.error(f"❌ Error generating MRN: {str(e)}")
else:
st.error("⚠️ Please enter patient name and date of birth first")
# Show generated MRN if available
if hasattr(st.session_state, 'generated_mrn') and st.session_state.generated_mrn:
medical_record_number = st.text_input(
"Generated MRN",
value=st.session_state.generated_mrn,
disabled=True,
help="Auto-generated MRN - you can edit the field above to override"
)
referring_physician = st.text_input("Referring Physician *", placeholder="Enter physician name")
date_of_study = st.date_input("Date of Study *", value=datetime.now().date())
st.markdown("### 🖼️ Medical Image")
st.markdown('
', unsafe_allow_html=True)
uploaded_file = st.file_uploader(
"Choose medical image",
type=['png', 'jpg', 'jpeg', 'dcm'],
help="Upload X-ray, CT, MRI, (Max 10MB)",
key="medical_image_upload"
)
st.markdown('
', unsafe_allow_html=True)
# Display uploaded image with modern styling (NO API CALLS)
if uploaded_file is not None:
try:
# Validate file size only (no API calls)
file_size = uploaded_file.size
if file_size > 10 * 1024 * 1024: # 10MB
st.error("⚠️ File size too large. Please upload an image smaller than 10MB.")
else:
# Display success message and file info
st.success(f"✅ File uploaded successfully: {uploaded_file.name}")
st.info(f"📁 File size: {file_size/1024:.1f} KB")
# Display image preview (local only, no backend calls)
if uploaded_file.type.startswith('image/'):
image = Image.open(uploaded_file)
st.image(image, caption="Uploaded Medical Image", use_column_width=True)
else:
st.info("📄 DICOM file uploaded successfully")
# Reset file pointer for later use
uploaded_file.seek(0)
except Exception as e:
st.error(f"❌ Error processing image: {str(e)}")
st.info("💡 Please ensure the file is a valid image format (PNG, JPG, JPEG, DCM)")
else:
st.info("👆 Please upload a medical image to begin analysis")
submit_button = st.form_submit_button("🔍 Generate Report", use_container_width=True)
if submit_button:
# Get the MRN (either generated or manually entered)
final_mrn = medical_record_number
if hasattr(st.session_state, 'generated_mrn') and st.session_state.generated_mrn and not medical_record_number:
final_mrn = st.session_state.generated_mrn
# Validation (MRN is now optional as it can be auto-generated)
if not all([name, date_of_birth, gender, referring_physician, uploaded_file]):
st.error("⚠️ Please fill all required fields and upload an image")
else:
generate_report(name, str(date_of_birth), gender, final_mrn,
referring_physician, str(date_of_study), uploaded_file)
st.markdown('
', unsafe_allow_html=True)
def generate_report(name, date_of_birth, gender, medical_record_number,
referring_physician, date_of_study, uploaded_file):
"""Generate report by sending data to backend and displaying the result"""
try:
if uploaded_file is not None:
with st.spinner("🔄 Analyzing medical image..."):
# Validate file before sending
if uploaded_file.size > 10 * 1024 * 1024: # 10MB limit
st.error("⚠️ File size too large. Please upload an image smaller than 10MB.")
return
# Reset file pointer to beginning
uploaded_file.seek(0)
# Prepare file for upload
files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}
try:
# Call analyze endpoint
analyze_response = requests.post(
f"{FASTAPI_BASE_URL}/analyze",
files=files,
timeout=60 # 60 second timeout
)
if analyze_response.status_code == 200:
response_data = analyze_response.json()
# Handle successful response
if response_data.get("success"):
findings = response_data.get("analysis", "Analysis completed")
# Parse thinking and answer sections
thinking_text = ""
report_text = findings
if "thinking:" in findings and "answer:" in findings:
parts = findings.split("answer:", 1)
thinking_text = parts[0].replace("thinking:", "").strip()
report_text = parts[1].strip()
st.success("✅ Analysis completed successfully!")
stream_response(thinking_text, report_text, {
"name": name,
"medical_record_number": medical_record_number,
"referring_physician": referring_physician,
"date_of_study": date_of_study
})
else:
# Handle failed analysis with fallback
st.warning("⚠️ AI analysis encountered issues, using fallback analysis")
fallback_findings = response_data.get("analysis", "Medical image processed. Professional review recommended.")
stream_response("Fallback analysis used", fallback_findings, {
"name": name,
"medical_record_number": medical_record_number,
"referring_physician": referring_physician,
"date_of_study": date_of_study
})
findings = fallback_findings
else:
st.error(f"❌ Backend Error (Status {analyze_response.status_code})")
st.error(f"Response: {analyze_response.text}")
return
except requests.exceptions.Timeout:
st.error("⏱️ Request timed out. The analysis is taking too long. Please try again.")
return
except requests.exceptions.ConnectionError:
st.error("🔌 Cannot connect to backend service.")
st.error("Please ensure the backend is running at: http://localhost:8001")
st.info("💡 To start the backend, run: `cd backend && python service.py`")
return
except requests.exceptions.RequestException as e:
st.error(f"❌ Request failed: {str(e)}")
return
# Generate PDF report
with st.spinner("📄 Generating PDF report..."):
try:
pdf_response = requests.post(
f"{FASTAPI_BASE_URL}/generate-report/",
data={
"patient_name": name,
"medical_record_number": medical_record_number or "AUTO-GENERATED",
"date_of_birth": date_of_birth,
"gender": gender,
"referring_physician": referring_physician,
"study_date": date_of_study,
"findings": findings
},
timeout=30
)
if pdf_response.status_code == 200:
pdf_data = pdf_response.json()
if pdf_data.get("success"):
pdf_url = f"{FASTAPI_BASE_URL}/reports/{name.replace(' ', '_')}/pdf"
st.markdown(f'''
''', unsafe_allow_html=True)
else:
st.error(f"❌ Failed to generate PDF: {pdf_data.get('message', 'Unknown error')}")
else:
st.error(f"❌ PDF generation failed with status {pdf_response.status_code}")
except requests.exceptions.Timeout:
st.error("⏱️ PDF generation timed out. Please try again.")
except Exception as e:
st.error(f"❌ Error generating PDF: {str(e)}")
except Exception as e:
st.error(f"❌ Unexpected error: {str(e)}")
st.info("💡 Troubleshooting tips:")
st.info(" • Ensure backend service is running on port 8001")
st.info(" • Check that the uploaded file is a valid image")
st.info(" • Try refreshing the page and uploading again")
def view_reports_page():
st.markdown('', unsafe_allow_html=True)
# Search section with modern styling
st.markdown('', unsafe_allow_html=True)
col1, col2 = st.columns([3, 1])
with col1:
patient_mrn = st.text_input("🔍 Search by MRN",
placeholder="Enter Medical Record Number (e.g., MV-20250618-A1B2)")
with col2:
st.markdown("
", unsafe_allow_html=True) # Add spacing
search_button = st.button("🔍 Search", use_container_width=True)
st.markdown('
', unsafe_allow_html=True)
# Action buttons
col1, col2 = st.columns(2)
with col1:
if search_button and patient_mrn:
search_reports_by_mrn(patient_mrn)
with col2:
if st.button("📋 Show All Patients", use_container_width=True):
show_all_patients()
def search_reports_by_mrn(mrn):
"""Search for reports by MRN with modern styling"""
with st.spinner(f"🔍 Searching for patient with MRN: {mrn}..."):
try:
response = requests.get(f"{FASTAPI_BASE_URL}/patient/{mrn}", timeout=10)
if response.status_code == 200:
patient_data = response.json()
st.markdown(f"""
✅ Patient Found: {patient_data.get('patient_name', 'N/A')}
MRN: {patient_data.get('mrn', 'N/A')}
""", unsafe_allow_html=True)
# Display patient details
display_patient_details(patient_data)
elif response.status_code == 404:
st.markdown(f"""
⚠️ No Patient Found
No patient record found with MRN: {mrn}
Please check the MRN and try again.
""", unsafe_allow_html=True)
else:
st.error(f"❌ Error searching patient: HTTP {response.status_code}")
except requests.exceptions.ConnectionError:
st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.")
except requests.exceptions.Timeout:
st.error("❌ Search request timed out. Please try again.")
except Exception as e:
st.error(f"❌ An error occurred: {str(e)}")
def show_all_patients():
"""Show all patients in the system"""
with st.spinner("📋 Loading all patient records..."):
try:
response = requests.get(f"{FASTAPI_BASE_URL}/patients", timeout=15)
if response.status_code == 200:
data = response.json()
patients = data.get('patients', [])
if patients:
st.markdown(f"""
📊 Found {len(patients)} Patient Records
""", unsafe_allow_html=True)
# Display patients in a table format
for patient in patients:
display_patient_summary(patient)
else:
st.info("📋 No patient records found in the system yet.")
else:
st.error(f"❌ Error retrieving patients: {response.text}")
except requests.exceptions.ConnectionError:
st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.")
except Exception as e:
st.error(f"❌ An error occurred: {str(e)}")
def display_patient_details(patient_data):
"""Display detailed patient information"""
st.markdown('', unsafe_allow_html=True)
st.markdown("### 👤 Patient Details")
col1, col2 = st.columns(2)
with col1:
st.markdown(f"**Name:** {patient_data.get('patient_name', 'N/A')}")
st.markdown(f"**MRN:** {patient_data.get('mrn', 'N/A')}")
st.markdown(f"**Date of Birth:** {patient_data.get('date_of_birth', 'N/A')}")
st.markdown(f"**Gender:** {patient_data.get('gender', 'N/A')}")
with col2:
st.markdown(f"**Referring Physician:** Dr. {patient_data.get('referring_physician', 'N/A')}")
st.markdown(f"**Study Date:** {patient_data.get('date_of_study', 'N/A')}")
st.markdown(f"**Report Date:** {patient_data.get('report_date', 'N/A')}")
st.markdown(f"**Status:** {patient_data.get('status', 'N/A').title()}")
# Display findings
if patient_data.get('findings'):
st.markdown("### 🔍 Findings")
findings = patient_data.get('findings', '')
# Process findings to show only the answer part
if "answer:" in findings.lower():
parts = findings.split("answer:", 1)
if len(parts) == 2:
findings_display = parts[1].strip()
else:
findings_display = findings
else:
findings_display = findings
st.markdown(findings_display)
# PDF download link
patient_name = patient_data.get('patient_name', 'Unknown')
pdf_url = f"{FASTAPI_BASE_URL}/reports/{patient_name}/pdf"
st.markdown(f"""
📄 Download PDF Report
""", unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
def display_patient_summary(patient_data):
"""Display patient summary in a card format"""
st.markdown('', unsafe_allow_html=True)
col1, col2, col3 = st.columns([2, 2, 1])
with col1:
st.markdown(f"**{patient_data.get('patient_name', 'N/A')}**")
st.markdown(f"MRN: {patient_data.get('mrn', 'N/A')}")
with col2:
st.markdown(f"Study: {patient_data.get('date_of_study', 'N/A')}")
st.markdown(f"Physician: Dr. {patient_data.get('referring_physician', 'N/A')}")
with col3:
# Search button for this specific patient
if st.button(f"🔍 View", key=f"view_{patient_data.get('mrn', 'unknown')}"):
search_reports_by_mrn(patient_data.get('mrn', ''))
st.markdown('
', unsafe_allow_html=True)
# Remove old search functions - they are replaced by MRN-based search functions above
def display_report_details(report):
"""Display detailed report information with modern cards"""
st.markdown('', unsafe_allow_html=True)
col1, col2 = st.columns(2)
with col1:
st.markdown("**👤 معلومات المريض - Patient Information:**")
st.markdown(f"""
- **الاسم - Name:** {report.get('name', 'غير متاح - N/A')}
- **تاريخ الميلاد - DOB:** {report.get('date_of_birth', 'غير متاح - N/A')}
- **الجنس - Gender:** {report.get('gender', 'غير متاح - N/A')}
- **رقم السجل - MRN:** {report.get('medical_record_number', 'غير متاح - N/A')}
""")
with col2:
st.markdown("**🏥 معلومات الفحص - Study Information:**")
st.markdown(f"""
- **الطبيب المحيل - Physician:** {report.get('referring_physician', 'غير متاح - N/A')}
- **تاريخ الفحص - Study Date:** {report.get('date_of_study', 'غير متاح - N/A')}
- **النتائج - Findings:** {report.get('findings', 'غير متاح - N/A')}
""")
# PDF download button with modern styling (always use backend endpoint)
if 'name' in report:
pdf_url = f"{FASTAPI_BASE_URL}/reports/{report.get('name')}/pdf"
st.markdown(f"""
""", unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
def show_all_reports():
"""Show all reports using the FastAPI backend"""
with st.spinner("📋 Loading all reports..."):
try:
response = requests.get(f"{REPORTS_ENDPOINT}/")
if response.status_code == 200:
data = response.json()
reports = data.get('reports', [])
count = data.get('count', 0)
if reports:
st.success(f"✅ Found {count} total report(s) in the system")
# Create a summary table
if reports:
df_data = []
for report in reports:
df_data.append({
'Patient Name': report.get('name', 'N/A'),
'Date of Birth': report.get('date_of_birth', 'N/A'),
'Gender': report.get('gender', 'N/A'),
'Study Date': report.get('date_of_study', 'N/A'),
'Physician': report.get('referring_physician', 'N/A')
})
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True)
# Show detailed reports
st.subheader("Detailed Reports")
for i, report in enumerate(reports):
with st.expander(f"{report.get('name', 'Unknown')} - {report.get('date_of_study', 'Unknown Date')}"):
display_report_details(report)
else:
st.info("📋 No reports found in the system yet.")
else:
st.error(f"❌ Error retrieving reports: {response.text}")
except requests.exceptions.ConnectionError:
st.error("❌ Cannot connect to the backend server. Please make sure the FastAPI server is running.")
except Exception as e:
st.error(f"❌ An error occurred: {str(e)}")
def system_status_page():
"""System status and connectivity check page"""
st.markdown('', unsafe_allow_html=True)
st.markdown('', unsafe_allow_html=True)
st.markdown("### 🔍 Backend Service Status")
col1, col2 = st.columns([3, 1])
with col2:
if st.button("🔄 Check Status", use_container_width=True):
check_backend_status()
with col1:
st.info("Click 'Check Status' to test backend connectivity")
st.markdown('
', unsafe_allow_html=True)
# Show system information
st.markdown('', unsafe_allow_html=True)
st.markdown("### 📊 System Information")
col1, col2 = st.columns(2)
with col1:
st.metric("Frontend", "Streamlit", "Running ✅")
st.metric("Backend URL", FASTAPI_BASE_URL, "Configured")
with col2:
st.metric("Upload Endpoint", "/analyze/", "Ready")
st.metric("Reports Endpoint", "/reports", "Ready")
st.markdown('
', unsafe_allow_html=True)
def check_backend_status():
"""Check if backend service is running and accessible"""
try:
with st.spinner("Checking backend connectivity..."):
# Test health endpoint
response = requests.get(f"{FASTAPI_BASE_URL}/health", timeout=10)
if response.status_code == 200:
data = response.json()
st.success(f"✅ Backend service is healthy!")
st.json(data)
else:
st.error(f"❌ Backend returned status {response.status_code}")
except requests.exceptions.ConnectionError:
st.error("❌ Cannot connect to backend service")
st.info("💡 Make sure the backend is running:")
st.code("cd backend && python service.py --serve")
except requests.exceptions.Timeout:
st.error("❌ Backend request timed out")
except Exception as e:
st.error(f"❌ Unexpected error: {str(e)}")
def stream_response(thinking_text, response_text, patient_info):
"""
Streams the AI response with proper formatting and bullet points.
"""
# --- Helper function for formatting ---
def format_text_to_html(text):
"""Converts plain text with newlines and list-like structures to HTML, keeping section headers as normal text."""
section_headers = [
"Initial Assessment:",
"Key Findings:",
"Clinical Significance:",
"Impression:",
"Conclusion:",
"Recommendations:",
"Technical Details:",
"Comparison:",
"History:",
"Exam Type:",
"Findings:",
"Clinical History:",
"Technique:",
"Indication:",
"Result:",
"Results:",
"Summary:",
"Assessment:",
"Plan:",
"Follow-up:",
"Brief Structured Report:",
"Report:",
"Diagnosis:",
"Opinion:"
]
text = html.escape(text)
lines = text.split('\n')
html_lines = []
in_list = False
for line in lines:
stripped_line = line.strip()
# Remove leading bullets/numbers for header checking
clean_line = re.sub(r'^[\d+\.]*[\-\*\•]?\s*', '', stripped_line)
# Also try removing just the number prefix for numbered headers
clean_line_no_number = re.sub(r'^\d+\.?\s*', '', stripped_line)
# Check if this is a section header (case-insensitive, flexible matching)
is_section_header = any(
clean_line.lower().startswith(header.lower()) or
clean_line.lower() == header.lower().rstrip(':') or
clean_line_no_number.lower().startswith(header.lower()) or
clean_line_no_number.lower() == header.lower().rstrip(':') or
# Handle cases like "EXAM TYPE:" matching "Exam Type:"
clean_line.lower().replace(' ', '').startswith(header.lower().replace(' ', '').rstrip(':')) or
clean_line_no_number.lower().replace(' ', '').startswith(header.lower().replace(' ', '').rstrip(':'))
for header in section_headers
)
if is_section_header:
# Close any open list
if in_list:
html_lines.append('' if 'ol>' in str(html_lines[-3:]) else '')
in_list = False
# Use the version without numbers/bullets for display
display_text = clean_line_no_number if clean_line_no_number.lower().endswith(':') else clean_line
html_lines.append(f'{display_text}
')
elif re.match(r'^[\-\*\•]\s', stripped_line) and not is_section_header:
# This is a bullet point (not a section header)
if not in_list:
html_lines.append('')
in_list = True
item_text = re.sub(r'^[\-\*\•]\s*', '', stripped_line)
html_lines.append(f'- {item_text}
')
elif re.match(r'^\d+\.?\s', stripped_line) and not is_section_header:
# This is a numbered list item (not a section header)
if not in_list:
html_lines.append('') # Convert numbered lists to bullet lists for consistency
in_list = True
item_text = re.sub(r'^\d+\.?\s*', '', stripped_line)
html_lines.append(f'- {item_text}
')
else:
# Regular text line
if in_list:
html_lines.append('
')
in_list = False
if stripped_line:
html_lines.append(f'{line}
')
else:
html_lines.append('
') # Preserve empty lines as breaks
# Close any remaining open list
if in_list:
html_lines.append('
')
return ''.join(html_lines)
# --- Thinking Process ---
thinking_container = st.empty()
thinking_container.markdown("""
""", unsafe_allow_html=True)
st.markdown('🤔 AI Analysis Process:
', unsafe_allow_html=True)
thinking_output = st.empty()
thinking_text_container = ""
for char in thinking_text:
thinking_text_container += char
formatted_thinking = format_text_to_html(thinking_text_container)
thinking_output.markdown(f'{formatted_thinking}
', unsafe_allow_html=True)
time.sleep(0.005)
thinking_container.empty()
# --- Define Report Components ---
current_date = datetime.now().strftime("%B %d, %Y")
current_time = datetime.now().strftime("%I:%M %p")
report_header = (
''
)
report_footer = (
''
)
# --- Stream the response ---
report_container = st.empty()
response_text_accumulated = ""
for char in response_text:
response_text_accumulated += char
formatted_content = format_text_to_html(response_text_accumulated)
content_html = (
''
f'{formatted_content}'
'
'
)
full_report_so_far = f'{report_header}{content_html}
'
report_container.markdown(full_report_so_far, unsafe_allow_html=True)
time.sleep(0.01)
# Final report with footer
final_formatted_content = format_text_to_html(response_text_accumulated)
final_content_html = (
''
f'{final_formatted_content}'
'
'
)
final_full_report = f'{report_header}{final_content_html}{report_footer}
'
report_container.markdown(final_full_report, unsafe_allow_html=True)
if __name__ == "__main__":
main()