"""MediVision radiology backend.

Exposes a FastAPI application that sends medical images to a Hugging Face
inference endpoint, stores the resulting findings in MongoDB and renders PDF
reports with ReportLab.  The module can also be run as a script to back-fill
reports whose ``findings`` field is empty or to update a single report.
"""
import base64
import hashlib
import io
import json
import os
import random
import re
import string
import sys
from contextlib import contextmanager
from datetime import datetime
from xml.sax.saxutils import escape

import certifi
import requests
import uvicorn
from bson import ObjectId
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from gridfs import GridFS
from PIL import Image
from pymongo import MongoClient
from reportlab.lib.pagesizes import A4, letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer

API_URL = "https://n0bcq3t2emgrp7ip.us-east-1.aws.endpoints.huggingface.cloud"
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {os.getenv('HF_TOKEN')}",
    "Content-Type": "application/json",
}

# Upper bound (seconds) for a single inference request; without one,
# ``requests.post`` waits indefinitely on an unresponsive endpoint.
HF_REQUEST_TIMEOUT = 120

# Used when the MONGODB_URI environment variable is not set.
DEFAULT_MONGO_URI = "mongodb://localhost:27017/radiologyDB"

# Uploaded images larger than this are downscaled before being sent to the API.
MAX_IMAGE_SIZE = (1024, 1024)

reportsaved = []

# Disable verbose tracebacks
sys.tracebacklimit = 0

# Configure Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

@contextmanager
def _mongo_db():
    """Yield the ``radiologyDB`` database and always close the client."""
    client = MongoClient(
        os.getenv("MONGODB_URI", DEFAULT_MONGO_URI),
        tlsCAFile=certifi.where(),
        serverSelectionTimeoutMS=5000,
    )
    try:
        yield client.radiologyDB
    finally:
        client.close()


def _images_dir():
    """Return the directory (next to this file) that holds reconstructed images."""
    return os.path.join(os.path.dirname(__file__), "images")


def _pdf_reports_dir():
    """Return the ``pdf_reports`` directory one level above this file."""
    return os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports")


def _pdf_filename(patient_name):
    """Return the report file name derived from a patient name.

    Only alphanumerics, spaces, ``-`` and ``_`` are kept, so the name cannot
    contain path separators.
    """
    safe_name = "".join(
        c for c in patient_name if c.isalnum() or c in (" ", "-", "_")
    ).rstrip()
    return f"{safe_name}_report.pdf"


def _is_image_upload(file):
    """Return True when the upload declares an ``image/*`` content type."""
    # content_type may be None when the client sends no Content-Type part header.
    return (file.content_type or "").startswith("image/")


def _encode_upload_for_api(contents):
    """Convert raw upload bytes to a base64-encoded RGB PNG of bounded size."""
    image = Image.open(io.BytesIO(contents))
    if image.mode != "RGB":
        image = image.convert("RGB")
    if image.size[0] > MAX_IMAGE_SIZE[0] or image.size[1] > MAX_IMAGE_SIZE[1]:
        image.thumbnail(MAX_IMAGE_SIZE, Image.Resampling.LANCZOS)

    buffer = io.BytesIO()
    image.save(buffer, format="PNG", optimize=True)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def _extract_findings(output):
    """Reduce the decoded API response to a single findings string."""
    if isinstance(output, dict):
        if "error" in output:
            return f"API Error: {output['error']}"
        if "generated_text" in output:
            return output["generated_text"]
        return str(output)
    if isinstance(output, list) and output:
        first_result = output[0]
        if isinstance(first_result, dict) and "generated_text" in first_result:
            return first_result["generated_text"]
        return str(first_result)
    if isinstance(output, str):
        return output
    return f"Unexpected response format: {type(output)}"


# ---------------------------------------------------------------------------
# Batch / CLI operations on the ``reports`` collection
# ---------------------------------------------------------------------------

def get_reports_with_empty_findings():
    """Analyze every report whose ``findings`` is empty and print a JSON summary.

    The summary is the only thing written to stdout; on failure a JSON object
    with ``success: False`` is printed instead.  Nothing is returned.
    """
    try:
        with _mongo_db() as db:
            cursor = db.reports.find({"findings": ""}, {"_id": 1})
            report_ids = [str(doc["_id"]) for doc in cursor]

        # Do all processing first so the JSON below is the final stdout output.
        for report_id in report_ids:
            analyze(report_id)

        print(json.dumps({
            "success": True,
            "count": len(report_ids),
            "reports": report_ids,
        }), flush=True)
    except Exception as e:
        print(json.dumps({
            "success": False,
            "error": str(e),
            "count": 0,
            "reports": [],
        }), flush=True)


def update_findings(report_id, new_findings_text):
    """Set the ``findings`` field of one report.

    Returns a dict with ``success``, ``modified_count`` and ``message``; on
    error the dict carries ``success: False``, ``error`` and ``message``.
    """
    try:
        with _mongo_db() as db:
            result = db.reports.update_one(
                {"_id": ObjectId(report_id)},
                {"$set": {"findings": new_findings_text.strip()}},
            )
        return {
            "success": result.modified_count > 0,
            "modified_count": result.modified_count,
            "message": "Updated" if result.modified_count else "No document found",
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Error: {str(e)}",
        }


def reconstruct_image_if_not_exists(report_id):
    """Materialize the first image of a report from GridFS onto disk.

    The file is written to ``images/<image_id>.jpg`` next to this module.

    Returns:
        The path of the image file, or ``None`` when the report or its image
        is missing or an error occurred (details are printed to stderr).
    """
    try:
        with _mongo_db() as db:
            report = db.reports.find_one({"_id": ObjectId(report_id)})
            if not report:
                print(f"❌ Report not found for ID: {report_id}", file=sys.stderr)
                return None

            image_ids = report.get("images")
            if not image_ids:
                print(f"❌ No image ID found in report: {report_id}", file=sys.stderr)
                return None

            image_id = image_ids[0]  # Assuming one image per report
            output_dir = _images_dir()
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f"{image_id}.jpg")

            if os.path.exists(output_path):
                print(f"✅ Image already exists: {output_path}", file=sys.stderr)
                return output_path

            grid_out = GridFS(db, collection="images").get(ObjectId(image_id))
            # Write to a temporary name first: a failed write must not leave a
            # truncated file that later calls would treat as already present.
            partial_path = f"{output_path}.part"
            with open(partial_path, "wb") as f:
                f.write(grid_out.read())
            os.replace(partial_path, output_path)

            print(f"✅ Reconstructed and saved image: {output_path}", file=sys.stderr)
            return output_path
    except Exception as e:
        print(f"❌ Error reconstructing image: {e}", file=sys.stderr)
        return None


# ---------------------------------------------------------------------------
# Hugging Face inference
# ---------------------------------------------------------------------------

def query(payload):
    """POST ``payload`` to the inference endpoint and return the decoded JSON.

    Raises whatever ``requests`` raises on network failure or timeout, and a
    ``ValueError`` subclass when the response body is not JSON.
    """
    response = requests.post(
        API_URL, headers=headers, json=payload, timeout=HF_REQUEST_TIMEOUT
    )
    print(f"Hugging Face API Response: {response.text}", file=sys.stderr)
    return response.json()


def analyze_image(image_path: str):
    """Send the image at ``image_path`` to the Hugging Face API.

    Returns the decoded API response, or an error string when the image
    cannot be read or the request fails.
    """
    try:
        buffer = io.BytesIO()
        with Image.open(image_path) as image:
            image.save(buffer, format="PNG")
        img_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return query({"inputs": img_str})
    except Exception as e:
        # Fallback response in case of API errors
        return f"Error analyzing image: {str(e)}. Using fallback analysis."


def analyze(id):
    """Reconstruct a report's image, analyze it and store the findings.

    Args:
        id: Report ``_id`` as a string.

    Returns:
        The findings text that was stored, or an error message.
    """
    try:
        # The image file is named after the image id, not the report id, so
        # the path must come from the reconstruction step.
        image_path = reconstruct_image_if_not_exists(id)
        if image_path is None:
            error_msg = f"Error during analysis: no image available for report {id}"
            print(error_msg, file=sys.stderr)
            return error_msg

        findings = analyze_image(image_path)
        if isinstance(findings, str):
            template_response = findings
        else:
            # Format the API response into a readable string (no prefix)
            template_response = json.dumps(findings, indent=2)

        update_findings(id, template_response)
        return template_response
    except Exception as e:
        error_msg = f"Error during analysis: {str(e)}"
        print(error_msg, file=sys.stderr)
        return error_msg


# ---------------------------------------------------------------------------
# FastAPI application
# ---------------------------------------------------------------------------

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files directory - removed since using remote image
# app.mount("/static", StaticFiles(directory=os.path.dirname(__file__)), name="static")


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "radiology-backend"}


@app.get("/")
async def root():
    """Root endpoint with service info"""
    return {
        "service": "MediVision Radiology Backend",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "analyze": "/analyze/",
            "health": "/health",
        },
    }


@app.post("/analyze/")
async def analyze_endpoint(file: UploadFile = File(...)):
    """Accept an uploaded image file and return AI-generated findings.

    Errors are reported in the JSON body (``error`` key), not via the HTTP
    status code.
    """
    try:
        if not _is_image_upload(file):
            return {
                "error": "Invalid file type. Please upload an image file.",
                "findings": "File type error",
            }

        contents = await file.read()
        if len(contents) == 0:
            return {"error": "Empty file uploaded.", "findings": "Empty file error"}

        img_str = _encode_upload_for_api(contents)

        print(
            f"Sending image to Hugging Face API (size: {len(img_str)} chars)",
            file=sys.stderr,
        )
        findings = _extract_findings(query({"inputs": img_str}))

        print(
            f"Analysis complete. Findings length: {len(findings)} chars",
            file=sys.stderr,
        )
        return {"findings": findings, "status": "success"}
    except Exception as e:
        error_msg = f"Error in analyze_endpoint: {str(e)}"
        print(error_msg, file=sys.stderr)
        return {"error": error_msg, "findings": "Error analyzing image", "status": "error"}


def generate_pdf_report(patient_name, patient_info, findings):
    """Render a one-page style PDF report for a patient.

    Args:
        patient_name: Used to derive the output file name.
        patient_info: Dict with optional keys ``name``,
            ``medical_record_number``, ``date_of_birth``, ``gender``,
            ``date_of_study`` and ``referring_physician``.
        findings: Findings text; anything before a case-insensitive
            ``answer:`` marker is dropped.

    Returns:
        The path of the written PDF, or ``None`` if generation failed.
    """
    try:
        reports_dir = _pdf_reports_dir()
        os.makedirs(reports_dir, exist_ok=True)
        pdf_path = os.path.join(reports_dir, _pdf_filename(patient_name))

        doc = SimpleDocTemplate(
            pdf_path,
            pagesize=A4,
            topMargin=0.8 * inch,
            bottomMargin=0.6 * inch,
            leftMargin=0.8 * inch,
            rightMargin=0.8 * inch,
        )
        styles = getSampleStyleSheet()
        story = []

        # Custom styles for better space utilization
        compact_style = styles['Normal'].clone('CompactStyle')
        compact_style.fontSize = 10
        compact_style.leading = 12
        compact_style.spaceAfter = 6

        header_style = styles['Heading1'].clone('CompactHeader')
        header_style.fontSize = 16
        header_style.alignment = 1  # Center
        header_style.spaceAfter = 8

        section_style = styles['Heading2'].clone('CompactSection')
        section_style.fontSize = 12
        section_style.spaceAfter = 4
        section_style.spaceBefore = 8

        story.append(Paragraph("🔬 MediVision Radiology Center", header_style))
        story.append(Paragraph("AI-Powered Medical Imaging Analysis", compact_style))
        story.append(Spacer(1, 0.2 * inch))

        story.append(Paragraph("RADIOLOGY REPORT", section_style))

        now = datetime.now()
        current_date = now.strftime("%m/%d/%Y")
        current_time = now.strftime("%H:%M")

        def field(key, default='N/A'):
            # Empty form values fall back to the default as well, and the
            # value is escaped because Paragraph parses its text as markup.
            return escape(str(patient_info.get(key) or default))

        # Paragraph collapses runs of ordinary spaces, so the visual gap
        # between fields is made of non-breaking spaces.
        gap = "\u00a0" * 5
        patient_info_compact = "<br/>".join([
            f"Patient: {field('name')}{gap}"
            f"MRN: {field('medical_record_number')}{gap}"
            f"DOB: {field('date_of_birth')}",
            f"Gender: {field('gender')}{gap}"
            f"Study Date: {field('date_of_study', current_date)}{gap}"
            f"Physician: Dr. {field('referring_physician')}",
            f"Report Date: {current_date} {current_time}{gap}"
            f"Radiologist: MediVision AI",
        ])
        story.append(Paragraph(patient_info_compact, compact_style))
        story.append(Spacer(1, 0.15 * inch))

        # Keep only the text after the "answer:" marker.  The match is
        # case-insensitive so "Answer:" is handled like "answer:".
        marker = re.search(r"answer:", findings, re.IGNORECASE)
        findings_text = findings[marker.end():].strip() if marker else findings

        # Clean up findings - remove redundant phrases
        findings_text = findings_text.replace("Brief Structured Report:", "")
        findings_text = findings_text.replace(
            "The chest X-ray is a useful tool for diagnosing conditions like "
            "pneumonia, lung cancer, or other lung diseases.",
            "",
        )

        story.append(Paragraph("FINDINGS & IMPRESSION:", section_style))

        # One output line per non-blank input line, separated with <br/>.
        stripped_lines = (line.strip() for line in findings_text.strip().split('\n'))
        formatted_findings = "".join(
            f"{escape(line)}<br/>" for line in stripped_lines if line
        )
        if not formatted_findings.strip():
            formatted_findings = escape(findings_text).replace('\n', '<br/>')

        story.append(Paragraph(formatted_findings, compact_style))
        story.append(Spacer(1, 0.15 * inch))

        footer_text = (
            f" Generated by MediVision AI • {current_date} {current_time} • "
            "This AI-generated report requires radiologist review for clinical use. "
        )
        footer_style = compact_style.clone('FooterStyle')
        footer_style.fontSize = 8
        footer_style.alignment = 1  # Center
        footer_style.textColor = 'grey'
        story.append(Paragraph(footer_text, footer_style))

        doc.build(story)
        return pdf_path
    except Exception as e:
        print(f"Error generating PDF: {str(e)}", file=sys.stderr)
        return None


@app.get("/generate-mrn/")
async def generate_mrn_endpoint(patient_name: str, date_of_birth: str):
    """Generate MRN for a patient"""
    try:
        mrn = generate_mrn(patient_name, date_of_birth)
        return {
            "success": True,
            "mrn": mrn,
            "patient_name": patient_name,
            "date_of_birth": date_of_birth,
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error generating MRN: {str(e)}",
        }


@app.get("/patient/{mrn}")
async def get_patient_by_mrn(mrn: str):
    """Return the stored patient record for ``mrn``.

    Raises:
        HTTPException: 404 when no record exists, 500 when the lookup failed.
    """
    result = search_patient_by_mrn(mrn)
    if result["found"]:
        return result["patient"]
    if "error" in result:
        # A failed lookup is not the same as a missing patient.
        raise HTTPException(
            status_code=500,
            detail=f"Error searching patient: {result['error']}",
        )
    raise HTTPException(
        status_code=404,
        detail=result.get("message", "Patient not found"),
    )


@app.get("/patients")
async def list_patients():
    """List all patients"""
    return list_all_patients()


@app.post("/generate-report/")
async def generate_report_endpoint(
    file: UploadFile = File(...),
    patient_name: str = Form(...),
    medical_record_number: str = Form(""),
    date_of_birth: str = Form(""),
    gender: str = Form(""),
    referring_physician: str = Form(""),
    date_of_study: str = Form(""),
):
    """Analyze an uploaded image, store the patient record and build the PDF.

    Returns a JSON object with ``success``; on success it also carries
    ``pdf_filename``, ``mrn`` and ``findings``.
    """
    try:
        if not _is_image_upload(file):
            return {"error": "Invalid file type", "success": False}

        contents = await file.read()
        if len(contents) == 0:
            return {"error": "Empty file", "success": False}

        img_str = _encode_upload_for_api(contents)
        findings = _extract_findings(query({"inputs": img_str}))

        # Generate MRN if not provided
        mrn = medical_record_number or generate_mrn(patient_name, date_of_birth)

        patient_info = {
            "name": patient_name,
            # Use the effective MRN so the PDF does not show a blank one
            # when it had to be generated.
            "medical_record_number": mrn,
            "date_of_birth": date_of_birth,
            "gender": gender,
            "referring_physician": referring_physician,
            "date_of_study": date_of_study,
        }

        save_result = save_patient_record(patient_info, findings, mrn)
        if not save_result.get("success", False):
            return {
                "success": False,
                "error": "Failed to save patient record",
            }

        pdf_path = generate_pdf_report(patient_name, patient_info, findings)
        if pdf_path and os.path.exists(pdf_path):
            return {
                "success": True,
                "message": "Report generated successfully",
                "pdf_filename": os.path.basename(pdf_path),
                "mrn": mrn,
                "findings": findings,
            }
        return {
            "success": False,
            "error": "Failed to generate PDF report",
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error generating report: {str(e)}",
        }


@app.get("/reports/{patient_name}/pdf")
async def download_pdf_report(patient_name: str):
    """Download PDF report for a patient.

    Raises:
        HTTPException: 404 when no report exists for the patient, 500 on any
            other failure.
    """
    try:
        pdf_filename = _pdf_filename(patient_name)
        pdf_path = os.path.join(_pdf_reports_dir(), pdf_filename)

        if not os.path.exists(pdf_path):
            raise HTTPException(
                status_code=404,
                detail=f"PDF report not found for patient: {patient_name}",
            )
        return FileResponse(
            path=pdf_path,
            filename=pdf_filename,
            media_type='application/pdf',
        )
    except HTTPException:
        # Let the 404 above through instead of converting it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving PDF: {str(e)}")


@app.get("/reports")
async def list_reports():
    """List all available PDF reports"""
    try:
        reports_dir = _pdf_reports_dir()
        if not os.path.exists(reports_dir):
            return {"reports": []}

        reports = []
        for pdf_file in os.listdir(reports_dir):
            if not pdf_file.endswith('.pdf'):
                continue
            file_stats = os.stat(os.path.join(reports_dir, pdf_file))
            reports.append({
                "patient_name": pdf_file.replace('_report.pdf', '').replace('_', ' '),
                "filename": pdf_file,
                "created_date": datetime.fromtimestamp(
                    file_stats.st_ctime
                ).strftime('%Y-%m-%d %H:%M:%S'),
                "size": file_stats.st_size,
            })
        return {"reports": reports}
    except Exception as e:
        return {"error": f"Error listing reports: {str(e)}", "reports": []}


# ---------------------------------------------------------------------------
# Patient records (``patient_reports`` collection)
# ---------------------------------------------------------------------------

def generate_mrn(patient_name, date_of_birth):
    """Generate a Medical Record Number (MRN) for a patient.

    Format: ``MV-YYYYMMDD-XXXX`` where the date is *today's* date and ``XXXX``
    is the first four hex digits of an MD5 over the lower-cased, stripped name
    plus the date of birth.

    NOTE(review): because the current date is part of the value, the same
    patient gets a different MRN on a different day, and the 16-bit hash part
    can collide between patients on the same day.
    """
    try:
        patient_data = f"{patient_name.lower().strip()}{date_of_birth}".encode('utf-8')
        hash_part = hashlib.md5(patient_data).hexdigest()[:4].upper()
        date_part = datetime.now().strftime("%Y%m%d")
        return f"MV-{date_part}-{hash_part}"
    except Exception:
        # Fallback: Generate random MRN
        current_date = datetime.now().strftime("%Y%m%d")
        random_part = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
        return f"MV-{current_date}-{random_part}"


def save_patient_record(patient_info, findings, mrn):
    """Create or update the patient record identified by ``mrn``.

    Returns a dict with ``action`` (``created``, ``updated`` or ``error``),
    ``success`` and either ``mrn`` or ``error``.
    """
    try:
        patient_record = {
            "mrn": mrn,
            "patient_name": patient_info.get("name", ""),
            "date_of_birth": patient_info.get("date_of_birth", ""),
            "gender": patient_info.get("gender", ""),
            "referring_physician": patient_info.get("referring_physician", ""),
            "date_of_study": patient_info.get("date_of_study", ""),
            "findings": findings,
            "report_date": datetime.now().isoformat(),
            "status": "completed",
        }

        with _mongo_db() as db:
            # A single upsert replaces the find-then-insert/update sequence,
            # which could race between two concurrent requests.
            result = db.patient_reports.update_one(
                {"mrn": mrn},
                {"$set": patient_record},
                upsert=True,
            )

        action = "created" if result.upserted_id is not None else "updated"
        return {"action": action, "mrn": mrn, "success": True}
    except Exception as e:
        print(f"Error saving patient record: {str(e)}", file=sys.stderr)
        return {"action": "error", "error": str(e), "success": False}


def search_patient_by_mrn(mrn):
    """Look up a patient record by MRN.

    Returns ``{"found": True, "patient": ...}`` on a hit, otherwise a dict
    with ``found: False`` and either ``message`` (no match) or ``error``.
    """
    try:
        with _mongo_db() as db:
            patient_record = db.patient_reports.find_one({"mrn": mrn})

        if not patient_record:
            return {"found": False, "message": f"No patient found with MRN: {mrn}"}

        # Convert ObjectId to string for JSON serialization
        patient_record["_id"] = str(patient_record["_id"])
        return {"found": True, "patient": patient_record}
    except Exception as e:
        print(f"Error searching patient: {str(e)}", file=sys.stderr)
        return {"found": False, "error": str(e)}


def list_all_patients():
    """Return all patient records, newest ``report_date`` first."""
    try:
        with _mongo_db() as db:
            patients = list(db.patient_reports.find().sort("report_date", -1))

        # Convert ObjectId to string for JSON serialization
        for patient in patients:
            patient["_id"] = str(patient["_id"])

        return {"success": True, "patients": patients, "count": len(patients)}
    except Exception as e:
        print(f"Error listing patients: {str(e)}", file=sys.stderr)
        return {"success": False, "error": str(e), "patients": []}


# ---------------------------------------------------------------------------
# Command-line entry point
# ---------------------------------------------------------------------------

def main():
    """Dispatch the ``--serve``, ``--get-empty`` and ``--update`` commands."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--get-empty', action='store_true', help='Get empty reports')
    parser.add_argument('--update', nargs=2, metavar=('ID', 'TEXT'), help='Update findings')
    parser.add_argument('--serve', action='store_true', help='Run FastAPI server')
    args = parser.parse_args()

    if args.serve:
        uvicorn.run("service:app", host="0.0.0.0", port=8001, reload=True)
        sys.exit(0)

    if args.get_empty:
        # The function prints its own JSON; don't capture or print a return value.
        get_reports_with_empty_findings()
        sys.exit(0)
    elif args.update:
        report_id, findings_text = args.update
        result = update_findings(report_id, findings_text)
        print(json.dumps(result))
        sys.exit(0)
    else:
        print(json.dumps({"error": "No valid command specified"}))
        sys.exit(1)


if __name__ == "__main__":
    main()