"""Gradio app that builds AI-assisted medical reports.

The app looks up a patient by ABHA ID (Google Sheets, or an offline fallback
table), analyses uploaded images with Gemini, synthesises a report, renders it
to PDF with ReportLab and writes the results back to the sheet.
"""

import asyncio
import functools
import json  # Added for loading credentials from a string
import os
import re
import tempfile
import time
import traceback
from xml.sax.saxutils import escape as xml_escape

import google.generativeai as genai
import gradio as gr
import gspread
import pandas as pd
from google.oauth2.service_account import Credentials
from gradio.themes.base import Base
from PIL import Image

# --- PDF Generation Imports ---
from reportlab.lib.colors import navy, black, dimgrey
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as RLImage, PageBreak

# ==============================================================================
# 1. AUTHENTICATION & CONFIGURATION
# ==============================================================================
MAX_IMAGES = 5
SHEET_COLUMN_MAP = {
    "image1_summary": "J",
    "image2_summary": "K",
    "image3_summary": "L",
    "image4_summary": "M",
    "image5_summary": "N",
    "executive_summary": "O",
}

# --- Google Sheets ---
# MODIFIED FOR HUGGING FACE DEPLOYMENT
is_sheets_authenticated = False
try:
    # Get the JSON credentials content from the Hugging Face Secret (environment variable)
    creds_json_str = os.getenv("GOOGLE_SHEETS_CREDS_JSON")
    if not creds_json_str:
        raise ValueError("GOOGLE_SHEETS_CREDS_JSON secret not found. App will use offline fallback data.")

    # Parse the JSON string into a dictionary
    creds_info = json.loads(creds_json_str)
    SCOPES = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"]

    # Authenticate from the dictionary info instead of a file
    creds = Credentials.from_service_account_info(creds_info, scopes=SCOPES)
    gc = gspread.authorize(creds)
    sh = gc.open("PatientData")  # Make sure your sheet is named "PatientData"
    ws = sh.get_worksheet(0)
    print("✅ Google Sheets authenticated successfully.")
    is_sheets_authenticated = True
except Exception as e:
    print(f"⚠️ Could not authenticate with Google Sheets: {e}. Using offline fallback data.")
    # NOTE: when authentication fails, `ws` is a pandas DataFrame rather than
    # a gspread worksheet; callers must check `is_sheets_authenticated`.
    ws = pd.DataFrame({
        "abha_id": ["12345678901233"],
        "full_name": ["Pashwiwi Sharma"],
        "Age": [22],
        "weight_kg": ["64"],
        "reason_for_visit": ["Allergy on right hand..."],
        "allergies": ["Pollen"],
        "Medication": ["None"],
        "symptoms_description": ["Unsure of cause..."],
        "Summary": ["Patient presents with an acute allergic reaction..."],
    })

# --- Gemini API ---
# MODIFIED FOR HUGGING FACE DEPLOYMENT
# The model name can be overridden without a code change; the default is the
# model this app was originally written against.
GEMINI_MODEL_NAME = os.getenv("GEMINI_MODEL_NAME", "gemini-1.5-flash-latest")
is_gemini_configured = False
try:
    # Get the API key from the Hugging Face Secret (environment variable)
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    if not GOOGLE_API_KEY:
        raise ValueError("GOOGLE_API_KEY secret not found. AI features will be disabled.")
    genai.configure(api_key=GOOGLE_API_KEY)
    gemini_model = genai.GenerativeModel(GEMINI_MODEL_NAME)
    print("✅ Gemini API configured successfully.")
    is_gemini_configured = True
except Exception as e:
    print(f"⚠️ Could not configure Gemini API: {e}. AI features will be disabled.")
    gemini_model = None

# --- UI status strings ---
# Shared between the functions that emit them and the filter that recognises
# them, so the two can never drift apart.
PLACEHOLDER_DEMOGRAPHICS = "*Patient details will appear here.*"
PLACEHOLDER_SUMMARY = "*Patient history will appear here.*"
STATUS_PENDING = "⌛ Pending analysis..."
STATUS_ANALYZING_PREFIX = "⏳ Analyzing Image"
STATUS_DISABLED = "### Analysis Disabled\nGemini API not configured."
STATUS_FAILED_PREFIX = "### Analysis Failed"
_NON_RESULT_PREFIXES = (
    STATUS_PENDING,
    STATUS_ANALYZING_PREFIX,
    "### Analysis Disabled",
    STATUS_FAILED_PREFIX,
)

# ==============================================================================
# 2. SYSTEM PROMPTS
# ==============================================================================
SYSTEM_PROMPT_IMAGE_ANALYSIS = """
You are a highly skilled medical imaging expert AI.
Analyze the provided medical image, prescription, or report and structure your response according to the following points using clear markdown formatting.

0. **Report Information** (if applicable): Doctor/Clinic Name, Date, Hospital/Facility, Patient Details (Age, Sex, etc.) if visible.
1. **Image Type & Region**: Modality (X-ray, MRI, CT, Ultrasound, Photo, etc.), anatomical region, and positioning.
2. **Key Findings**: Systematically list primary observations and potential abnormalities with detailed descriptions.
3. **Diagnostic Assessment**: Provide a primary assessment or impression. List differential diagnoses if applicable. Highlight any critical/urgent findings.
4. **Patient-Friendly Explanation**: Simplify the findings in clear, non-technical language.

---
***Disclaimer:** This AI-generated analysis is for informational purposes only and is NOT a substitute for professional medical advice, diagnosis, or treatment. A qualified healthcare professional must perform the final interpretation.*
"""

SYSTEM_PROMPT_DETAILED_REPORT = """You are an expert medical scribe AI. Your task is to create a single, comprehensive, and data-rich patient report by synthesizing all the provided information.

**Your Goal:**
Weave the patient's demographics, their past medical summary, the reason for their current visit, and the findings from new medical images into a cohesive and professional narrative.

**Required Structure:**
Generate the report in Markdown format using the exact following headings:

### Patient Information
(Summarize the patient's key demographic details: ABHA ID, Name, Age, Weight.)

### Medical History & Previous Summary
(Detail the patient's known allergies, current medications, and the summary from their previous visits. This provides historical context.)

### Current Visit Details
(Describe the primary reason for the current visit and the specific symptoms the patient is experiencing now.)

### Comprehensive Image Analysis
(Integrate the findings from all the provided image analyses. For each image, present its key findings and diagnostic assessment in a clear, organized manner. If there are multiple images, address each one.)

### Overall Synthesis & Impression
(This is the most important section. Provide a concise, professional synthesis that connects the dots. Correlate the patient's history and current symptoms with the new findings from the image analysis. Formulate a concluding impression based on the totality of the information.)
"""


# ==============================================================================
# SHARED HELPERS
# ==============================================================================
async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    The gspread and ReportLab calls are synchronous; calling them directly
    inside an ``async`` handler would stall the event loop for every user.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def _is_usable_analysis(text):
    """Return True when *text* is a real analysis, not a status placeholder.

    Placeholders are recognised by the exact prefixes this app emits rather
    than by searching for words such as "Failed", which can legitimately
    occur inside a genuine analysis.
    """
    if not isinstance(text, str):
        return False
    stripped = text.strip()
    return bool(stripped) and not stripped.startswith(_NON_RESULT_PREFIXES)


def _as_path(file_obj):
    """Return the filesystem path of an uploaded file.

    Accepts either a plain path string or an object exposing ``.name``
    (such as a temp-file wrapper), so both upload shapes work.
    """
    return file_obj if isinstance(file_obj, str) else file_obj.name


# ==============================================================================
# 3. PDF GENERATION ENGINE
# ==============================================================================
_BOLD_RE = re.compile(r"\*\*(.*?)\*\*")
_HEADING_RE = re.compile(r"#{1,6}\s+(.*)")
_BULLET_RE = re.compile(r"[*-]\s+(.*)")
# The heading may be followed by a colon and/or text on the same line, so the
# separator after the closing ** is optional whitespace rather than a strict
# newline.
_ASSESSMENT_RE = re.compile(
    r"3\.\s*\*\*Diagnostic Assessment:?\*\*:?\s*(.*?)(?=\n\s*\n|\n\s*4\.|\Z)",
    re.DOTALL | re.IGNORECASE,
)
_FINDINGS_RE = re.compile(
    r"2\.\s*\*\*Key Findings:?\*\*:?\s*(.*?)(?=\n\s*\n|\n\s*3\.|\Z)",
    re.DOTALL | re.IGNORECASE,
)


def _plain(text):
    """Strip ``**bold**`` markers and escape the text for ReportLab.

    ``Paragraph`` parses its input as XML-like markup, so a raw ``<`` or
    ``&`` in model output would otherwise raise and abort the whole PDF.
    """
    return xml_escape(_BOLD_RE.sub(r"\1", text))


def _extract_caption(analysis_text):
    """Pick a short caption for an image from its analysis text.

    Prefers the "Diagnostic Assessment" section, falls back to "Key
    Findings", and finally to a fixed default when neither yields text.
    """
    for pattern in (_ASSESSMENT_RE, _FINDINGS_RE):
        match = pattern.search(analysis_text or "")
        if match and match.group(1).strip():
            return match.group(1).strip()
    return "No specific assessment found."


def create_report_pdf(markdown_text, image_paths, image_analyses):
    """Render the Markdown report (plus an image appendix) to a PDF file.

    Args:
        markdown_text: Report body. ``#``-style headings and ``* ``/``- ``
            bullets are styled; every other non-empty line becomes a
            justified paragraph.
        image_paths: Paths of the uploaded images, in upload order.
        image_analyses: Analysis text per image, positionally aligned with
            ``image_paths``. Empty or ``None`` entries get a default caption;
            images with no corresponding entry are omitted.

    Returns:
        The path of the generated PDF, or ``None`` if generation failed
        (the traceback is printed).
    """
    pdf_path = None
    try:
        # A unique temp file avoids name collisions between concurrent
        # requests, which a timestamp-based name cannot guarantee.
        fd, pdf_path = tempfile.mkstemp(prefix="temp_report_", suffix=".pdf")
        os.close(fd)

        doc = SimpleDocTemplate(
            pdf_path,
            pagesize=(8.5 * inch, 11 * inch),
            topMargin=0.75 * inch,
            bottomMargin=0.75 * inch,
            leftMargin=0.75 * inch,
            rightMargin=0.75 * inch,
        )
        styles = getSampleStyleSheet()
        styles.add(ParagraphStyle(name='TitleStyle', fontName='Helvetica-Bold', fontSize=18,
                                  alignment=TA_CENTER, textColor=navy, spaceAfter=24))
        styles.add(ParagraphStyle(name='HeadingStyle', fontName='Helvetica-Bold', fontSize=14,
                                  textColor=navy, spaceBefore=12, spaceAfter=6))
        styles.add(ParagraphStyle(name='Justify', parent=styles['Normal'], alignment=TA_JUSTIFY))
        styles.add(ParagraphStyle(name='BulletStyle', parent=styles['Justify'], leftIndent=20, spaceAfter=4))
        styles.add(ParagraphStyle(name='ImageTitle', parent=styles['Normal'], alignment=TA_CENTER,
                                  spaceBefore=18, spaceAfter=4, fontName='Helvetica-Bold'))
        styles.add(ParagraphStyle(name='ImageCaption', parent=styles['Normal'], alignment=TA_CENTER,
                                  spaceAfter=12, fontName='Helvetica-Oblique', textColor=dimgrey, fontSize=9))

        story = [Paragraph("Comprehensive Medical Report", styles['TitleStyle'])]

        for raw_line in (markdown_text or "").split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            heading = _HEADING_RE.fullmatch(line)
            bullet = _BULLET_RE.fullmatch(line)
            if heading:
                story.append(Paragraph(_plain(heading.group(1)), styles['HeadingStyle']))
            elif bullet:
                story.append(Paragraph(f"• {_plain(bullet.group(1))}", styles['BulletStyle']))
            else:
                story.append(Paragraph(_plain(line), styles['Justify']))

        if image_paths and image_analyses:
            story.append(PageBreak())
            story.append(Paragraph("Appendix: Medical Images & Findings", styles['HeadingStyle']))
            # zip() stops at the shorter sequence, so images without a
            # corresponding analysis slot are left out of the appendix.
            for number, (img_path, analysis_text) in enumerate(zip(image_paths, image_analyses), start=1):
                caption_text = _extract_caption(analysis_text)
                story.append(Paragraph(f"Image {number}", styles['ImageTitle']))
                story.append(Paragraph(f"Summary: {_plain(caption_text)}", styles['ImageCaption']))
                try:
                    img = RLImage(img_path, width=5.5 * inch, height=5.5 * inch, kind='proportional')
                    img.hAlign = 'CENTER'
                    story.append(img)
                except Exception:
                    traceback.print_exc()
                    story.append(Paragraph(f"Error: Could not display image {number}.", styles['Normal']))

        doc.build(story)
        return pdf_path
    except Exception:
        traceback.print_exc()
        # Do not leave a half-written file behind.
        if pdf_path and os.path.exists(pdf_path):
            try:
                os.remove(pdf_path)
            except OSError:
                pass
        return None


# ==============================================================================
# 4. CORE LOGIC
# ==============================================================================
async def update_google_sheet(abha_id, report_text, *image_analyses):
    """Write the report and per-image analyses to the patient's sheet row.

    Args:
        abha_id: Patient identifier, looked up in the first sheet column.
        report_text: Text written to the ``executive_summary`` column.
        *image_analyses: Analysis text per image slot. The entry at position
            ``i`` goes to column ``image{i+1}_summary``; empty or placeholder
            entries are skipped so existing cell content is left untouched.

    Returns:
        A human-readable status message. Failures are reported through the
        return value and a UI warning rather than by raising.
    """
    if not is_sheets_authenticated:
        gr.Warning("Google Sheets not authenticated. Skipping database update.")
        return "Could not update Sheet: Authentication failed."

    # Normalise the ID the same way fetch_patient_data does.
    abha_id = str(abha_id).strip()
    try:
        print(f"Attempting to update Google Sheet for ABHA ID: {abha_id}")
        # Search only in the first column for efficiency
        cell = await _run_blocking(ws.find, abha_id, in_column=1)
        if not cell:
            gr.Warning(f"ABHA ID {abha_id} not found in Sheet. Skipping database update.")
            return f"Could not update Sheet: ABHA ID {abha_id} not found."

        row_number = cell.row

        # Prepare image analysis updates
        updates_to_make = [
            {
                'range': f"{SHEET_COLUMN_MAP[f'image{slot}_summary']}{row_number}",
                'values': [[analysis]],
            }
            for slot, analysis in enumerate(image_analyses[:MAX_IMAGES], start=1)
            if _is_usable_analysis(analysis)
        ]

        # Prepare executive summary update
        updates_to_make.append({
            'range': f"{SHEET_COLUMN_MAP['executive_summary']}{row_number}",
            'values': [[report_text]],
        })

        await _run_blocking(ws.batch_update, updates_to_make)
        print(f"✅ Successfully updated row {row_number} for ABHA ID: {abha_id}")
        return "✅ Database update complete."
    except Exception as e:
        print(f"❌ FAILED to update Google Sheet: {e}")
        traceback.print_exc()
        # gr.Error only has an effect when raised; since this function
        # reports failure through its return value, surface a warning instead.
        gr.Warning(f"Failed to update Google Sheet: {e}")
        return "❌ Database update failed. See console for details."


def _load_patient_table():
    """Return the patient table as a DataFrame, or None if the sheet is empty.

    Blocking: reads the whole worksheet when Sheets is authenticated.
    Otherwise returns a copy of the offline fallback table so callers cannot
    mutate the module-level data.
    """
    if not is_sheets_authenticated:
        return ws.copy()
    # ROBUST METHOD: Use get_all_values() to avoid header errors.
    all_values = ws.get_all_values()
    if len(all_values) < 2:
        return None
    # Manually construct the DataFrame
    return pd.DataFrame(all_values[1:], columns=all_values[0])


async def _fetch_patient_record(abha_id):
    """Look up one patient row.

    Args:
        abha_id: Already-stripped, non-empty ABHA ID.

    Returns:
        ``(record, None)`` on success, where ``record`` is a pandas Series,
        or ``(None, message)`` with a Markdown message describing why the
        lookup failed.
    """
    try:
        df = await _run_blocking(_load_patient_table)
        if df is None:
            return None, "Spreadsheet has no data records."

        if list(df.columns).count("abha_id") != 1:
            return None, ("**Error:** The spreadsheet must have exactly one `abha_id` "
                          "header column in its first row.")

        ids = df["abha_id"].astype(str).str.strip()
        matches = df[ids == abha_id]
        if matches.empty:
            return None, f"**Status:** No record found for ABHA ID: `{abha_id}`"
        return matches.iloc[0], None
    except Exception as e:
        traceback.print_exc()
        if "GSpreadException" in str(e):
            return None, ("**Error:** Could not read the spreadsheet. Please ensure the first row has "
                          "unique, non-empty headers for all columns.")
        return None, f"**Error:** An error occurred while fetching data: {e}"


def _format_patient_record(record):
    """Render a patient record as (demographics_md, history_md).

    Fields are separated by blank lines so each renders as its own Markdown
    paragraph and the ``---`` rules are not parsed as heading underlines.
    """
    patient_info_md = "\n\n".join([
        f"**ABHA ID:** {record.get('abha_id', 'N/A')}",
        f"**Name:** {record.get('full_name', 'N/A')}",
        f"**Age:** {record.get('Age', 'N/A')}",
        f"**Weight:** {record.get('weight_kg', 'N/A')} kg",
        "---",
        f"**Reason for Visit:** {record.get('reason_for_visit', 'N/A')}",
        f"**Symptoms:** {record.get('symptoms_description', 'N/A')}",
    ])
    summary_text = "\n\n".join([
        f"**Known Allergies:** {record.get('allergies', 'N/A')}",
        f"**Current Medications:** {record.get('Medication', 'N/A')}",
        "---",
        f"**Previous Visit Summary:** {record.get('Summary', 'No previous summary available.')}",
    ])
    return patient_info_md, summary_text


async def fetch_patient_data(abha_id):
    """Fetch and format a patient's details for display.

    Args:
        abha_id: ABHA ID as typed by the user; surrounding whitespace is
            ignored.

    Returns:
        ``(demographics_md, history_md)``. For an empty ID both values are
        the UI placeholders; on lookup failure the first value carries the
        status or error message and the second is an empty string.
    """
    abha_id = (abha_id or "").strip()
    if not abha_id:
        return PLACEHOLDER_DEMOGRAPHICS, PLACEHOLDER_SUMMARY

    record, problem = await _fetch_patient_record(abha_id)
    if record is None:
        return problem, ""
    return _format_patient_record(record)


async def analyze_images_on_upload(files):
    """Analyse uploaded images one by one, streaming UI updates.

    Yields tuples of ``(gallery, *rows, *images, *markdowns)`` updates, with
    ``MAX_IMAGES`` entries in each starred group, matching the outputs wired
    to the upload event. Only the first ``MAX_IMAGES`` files are analysed.

    Args:
        files: Uploaded files (path strings or objects with ``.name``), or a
            falsy value when the upload is cleared.
    """
    gallery_update = gr.update(value=None, visible=False)
    row_updates = [gr.update(visible=False) for _ in range(MAX_IMAGES)]
    image_updates = [gr.update(value=None) for _ in range(MAX_IMAGES)]
    markdown_updates = [gr.update(value="") for _ in range(MAX_IMAGES)]

    def snapshot():
        return (gallery_update, *row_updates, *image_updates, *markdown_updates)

    if not files:
        yield snapshot()
        return

    if len(files) > MAX_IMAGES:
        gr.Warning(f"Max {MAX_IMAGES} images allowed. Analyzing the first {MAX_IMAGES}.")
        files = files[:MAX_IMAGES]

    filepaths = [_as_path(f) for f in files]
    gallery_update = gr.update(value=filepaths, visible=True)

    # Slots beyond the uploaded files keep their hidden/empty defaults.
    for slot, path in enumerate(filepaths):
        row_updates[slot] = gr.update(visible=True)
        image_updates[slot] = gr.update(value=path)
        markdown_updates[slot] = gr.update(value=STATUS_PENDING)
    yield snapshot()

    if not is_gemini_configured:
        for slot in range(len(filepaths)):
            markdown_updates[slot] = gr.update(value=STATUS_DISABLED)
        yield snapshot()
        return

    for slot, path in enumerate(filepaths):
        markdown_updates[slot] = gr.update(value=f"{STATUS_ANALYZING_PREFIX} {slot + 1}...")
        yield snapshot()
        try:
            # The context manager closes the image file once the request
            # has been sent.
            with Image.open(path) as img:
                response = await gemini_model.generate_content_async(
                    [SYSTEM_PROMPT_IMAGE_ANALYSIS, img],
                    generation_config=genai.GenerationConfig(temperature=0.1),
                )
            markdown_updates[slot] = gr.update(value=response.text)
        except Exception as e:
            traceback.print_exc()
            markdown_updates[slot] = gr.update(value=f"{STATUS_FAILED_PREFIX}\nAn error occurred: {e}")
        yield snapshot()


async def generate_detailed_report(abha_id, uploaded_files, *image_analyses):
    """Generate the synthesised report, its PDF, and update the sheet.

    Yields ``(report_markdown, pdf_file_update, status_update)`` tuples so
    the UI can show progress.

    Args:
        abha_id: Patient ABHA ID from the text box.
        uploaded_files: Files currently in the uploader (may be ``None``).
        *image_analyses: Current text of each analysis panel, one per image
            slot. Positions are preserved throughout so analysis ``i`` always
            stays paired with image ``i``.
    """
    hidden = gr.update(visible=False)
    yield "⏳ Generating report...", hidden, gr.update(visible=False, value="")

    if not is_gemini_configured:
        yield ("### Report Generation Disabled\nGemini API not configured.",
               hidden, gr.update(visible=False, value=""))
        return

    clean_id = (abha_id or "").strip()
    record = None
    if clean_id:
        record, _problem = await _fetch_patient_record(clean_id)
    if record is None:
        # Covers a blank ID, an empty sheet, a missing record and read errors.
        yield ("### Report Generation Failed\nPlease fetch a valid patient record first.",
               hidden, gr.update(visible=False, value=""))
        return
    patient_info, visit_summary = _format_patient_record(record)

    # Keep one entry per image slot; unusable entries become "" instead of
    # being dropped, so indices still line up with images and sheet columns.
    analyses = [text if _is_usable_analysis(text) else "" for text in image_analyses[:MAX_IMAGES]]

    context_parts = [
        "Here is all the available information for a patient...\n",
        f"## PATIENT DETAILS & CURRENT VISIT INFO:\n{patient_info}\n\n",
        f"## PAST MEDICAL SUMMARY:\n{visit_summary}\n\n",
        "## NEW IMAGE ANALYSIS FINDINGS:\n",
    ]
    numbered = [(slot, text) for slot, text in enumerate(analyses, start=1) if text]
    if numbered:
        context_parts.extend(f"### Analysis of Image {slot}\n{text}\n\n" for slot, text in numbered)
    else:
        context_parts.append("No successful image analyses were performed.\n\n")
    final_prompt = [SYSTEM_PROMPT_DETAILED_REPORT, "".join(context_parts)]

    try:
        response = await gemini_model.generate_content_async(
            final_prompt,
            generation_config=genai.GenerationConfig(temperature=0.4),
        )
        markdown_report = response.text

        valid_image_paths = [_as_path(f) for f in uploaded_files[:MAX_IMAGES]] if uploaded_files else []
        pdf_path = await _run_blocking(create_report_pdf, markdown_report, valid_image_paths, analyses)
        pdf_update = gr.update(value=pdf_path, visible=True) if pdf_path else gr.update(visible=False)

        yield markdown_report, pdf_update, gr.update(visible=True, value="🔄 Updating database...")

        status_message = await update_google_sheet(clean_id, markdown_report, *analyses)
        yield markdown_report, pdf_update, gr.update(visible=True, value=status_message)

        # Leave the status visible briefly, then hide it.
        await asyncio.sleep(3)
        yield markdown_report, pdf_update, gr.update(visible=False)
    except Exception as e:
        traceback.print_exc()
        yield (f"### Report Generation Failed\nAn error occurred: {e}",
               gr.update(visible=False), gr.update(visible=False))


# ==============================================================================
# 5. GRADIO UI LAYOUT
# ==============================================================================
with gr.Blocks(theme=Base(), title="Advanced Medical Report Generator") as app:
    gr.Markdown("# Advanced Medical Report Generator")

    with gr.Row():
        abha_id_input = gr.Textbox(label="Enter Patient ABHA ID", scale=3)
        fetch_button = gr.Button("Fetch Patient Details", variant="primary", scale=1)

    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            with gr.Accordion("Patient Demographics & Current Visit", open=True):
                patient_info_output = gr.Markdown(PLACEHOLDER_DEMOGRAPHICS)
        with gr.Column(scale=1):
            with gr.Accordion("Medical History & Visit Summary", open=True):
                summary_output = gr.Markdown(PLACEHOLDER_SUMMARY)

    gr.Markdown("---")
    gr.Markdown("### 1. Upload Scans & View AI Analysis")
    with gr.Column(variant="panel"):
        image_uploader = gr.File(label=f"Upload up to {MAX_IMAGES} images",
                                 file_count="multiple", file_types=["image"])
        image_gallery = gr.Gallery(label="Image Preview", visible=False, columns=5, height="auto")

        # One hidden row per image slot; rows are revealed as files arrive.
        analysis_rows, analysis_images, analysis_markdowns = [], [], []
        for i in range(MAX_IMAGES):
            with gr.Row(visible=False, variant='panel') as row:
                with gr.Column(scale=1, min_width=200):
                    img = gr.Image(interactive=False, show_label=False)
                with gr.Column(scale=2):
                    md = gr.Markdown()
            analysis_rows.append(row)
            analysis_images.append(img)
            analysis_markdowns.append(md)

    gr.Markdown("---")
    gr.Markdown("### 2. Generate Final Synthesized Report")
    with gr.Column(variant='panel'):
        generate_report_button = gr.Button("Generate Detailed Report & Update Database", variant="primary")
        status_output = gr.Markdown(visible=False)
        gr.Markdown("#### Report Preview")
        report_preview_output = gr.Markdown(
            "*Click the button above to generate a comprehensive, synthesized report.*"
        )
        download_report_button = gr.File(label="Download Report (PDF)", visible=False)

    # ==========================================================================
    # 6. EVENT LISTENERS
    # ==========================================================================
    fetch_button.click(
        fn=fetch_patient_data,
        inputs=[abha_id_input],
        outputs=[patient_info_output, summary_output],
    )

    image_uploader.change(
        fn=analyze_images_on_upload,
        inputs=[image_uploader],
        outputs=[image_gallery, *analysis_rows, *analysis_images, *analysis_markdowns],
    )

    generate_report_button.click(
        fn=generate_detailed_report,
        inputs=[abha_id_input, image_uploader, *analysis_markdowns],
        outputs=[report_preview_output, download_report_button, status_output],
    )

# ==============================================================================
# 7. LAUNCH APP
# ==============================================================================
if __name__ == "__main__":
    app.launch(debug=True)