Spaces:
Runtime error
Runtime error
| import os | |
| import io | |
| import sys | |
| import json | |
| import base64 | |
| import certifi | |
| import requests | |
| from PIL import Image | |
| from bson import ObjectId | |
| from gridfs import GridFS | |
| from pymongo import MongoClient | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| import uvicorn | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import letter, A4 | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
| from reportlab.lib.units import inch | |
| from datetime import datetime | |
| import hashlib | |
| import random | |
| import string | |
| API_URL = "https://n0bcq3t2emgrp7ip.us-east-1.aws.endpoints.huggingface.cloud" | |
| headers = { | |
| "Accept": "application/json", | |
| "Authorization": f"Bearer {os.getenv('HF_TOKEN')}", | |
| "Content-Type": "application/json" | |
| } | |
| reportsaved= [] | |
| # Disable verbose tracebacks | |
| sys.tracebacklimit = 0 | |
| # Configure Python path | |
| parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| sys.path.append(parent_dir) | |
| def get_reports_with_empty_findings(): | |
| reportsaved = [] | |
| try: | |
| # Use environment variable for MongoDB connection string | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where(), | |
| serverSelectionTimeoutMS=5000 | |
| ) | |
| db = client.radiologyDB | |
| reports = [str(doc["_id"]) for doc in db.reports.find({"findings": ""})] | |
| reportsaved = reports | |
| # β First do all processing (analyze, reconstruct, etc.) | |
| for report_id in reportsaved: | |
| analyze(report_id) # this includes image reconstruction and update | |
| # β Then print JSON as final stdout response | |
| print(json.dumps({ | |
| "success": True, | |
| "count": len(reportsaved), | |
| "reports": reportsaved | |
| }), flush=True) | |
| except Exception as e: | |
| print(json.dumps({ | |
| "success": False, | |
| "error": str(e), | |
| "count": 0, | |
| "reports": [] | |
| }), flush=True) | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| def update_findings(report_id, new_findings_text): | |
| """Updates findings for a specific report""" | |
| try: | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where() | |
| ) | |
| db = client.radiologyDB | |
| result = db.reports.update_one( | |
| {"_id": ObjectId(report_id)}, | |
| {"$set": {"findings": new_findings_text.strip()}} | |
| ) | |
| return { | |
| "success": result.modified_count > 0, | |
| "modified_count": result.modified_count, | |
| "message": "Updated" if result.modified_count else "No document found" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "message": f"Error: {str(e)}" | |
| } | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| def reconstruct_image_if_not_exists(report_id): | |
| try: | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where() | |
| ) | |
| db = client["radiologyDB"] | |
| fs = GridFS(db, collection="images") # β Correct bucket | |
| report = db.reports.find_one({"_id": ObjectId(report_id)}) | |
| if not report: | |
| print(f"β Report not found for ID: {report_id}", file=sys.stderr) | |
| return | |
| if "images" not in report or len(report["images"]) == 0: | |
| print(f"β No image ID found in report: {report_id}", file=sys.stderr) | |
| return | |
| image_id = report["images"][0] # Assuming one image per report | |
| output_dir = os.path.join(os.path.dirname(__file__), "images") | |
| os.makedirs(output_dir, exist_ok=True) | |
| output_path = os.path.join(output_dir, f"{image_id}.jpg") | |
| # Check if image already exists | |
| if os.path.exists(output_path): | |
| print(f"β Image already exists: {output_path}", file=sys.stderr) | |
| return | |
| # Get the file from GridFS | |
| grid_out = fs.get(ObjectId(image_id)) | |
| with open(output_path, "wb") as f: | |
| f.write(grid_out.read()) | |
| print(f"β Reconstructed and saved image: {output_path}", file=sys.stderr) | |
| except Exception as e: | |
| print(f"β Error reconstructing image: {e}", file=sys.stderr) | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| def query(payload): | |
| response = requests.post(API_URL, headers=headers, json=payload) | |
| print(f"Hugging Face API Response: {response.text}", file=sys.stderr) | |
| return response.json() | |
| def analyze_image(image_path: str): | |
| """ | |
| Analyze medical image using Hugging Face API. | |
| Takes an image path and a prompt, then returns AI-generated findings. | |
| """ | |
| try: | |
| # Load the image using PIL | |
| image = Image.open(image_path) | |
| # Convert image to base64 string | |
| buffer = io.BytesIO() | |
| image.save(buffer, format="PNG") | |
| img_str = base64.b64encode(buffer.getvalue()).decode("utf-8") | |
| # Send to Hugging Face API | |
| output = query({ | |
| "inputs": img_str, | |
| }) | |
| return output | |
| except Exception as e: | |
| # Fallback response in case of API errors | |
| return f"Error analyzing image: {str(e)}. Using fallback analysis." | |
| def analyze(id): | |
| """ | |
| Analyze medical images using Hugging Face API for findings extraction. | |
| """ | |
| try: | |
| # First reconstruct the image | |
| reconstruct_image_if_not_exists(id) | |
| # Get the path to the reconstructed image | |
| image_path = os.path.join(os.path.dirname(__file__), "images", f"{id}.jpg") | |
| # Analyze the image using Hugging Face API | |
| findings = analyze_image(image_path) | |
| # If we got a valid response, update the findings | |
| if isinstance(findings, str): | |
| template_response = findings | |
| else: | |
| # Format the API response into a readable string (no prefix) | |
| template_response = json.dumps(findings, indent=2) | |
| # Update the findings in the database | |
| update_findings(id, template_response) | |
| return template_response | |
| except Exception as e: | |
| error_msg = f"Error during analysis: {str(e)}" | |
| print(error_msg, file=sys.stderr) | |
| return error_msg | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Mount static files directory | |
| # Mount static files directory - removed since using remote image | |
| # app.mount("/static", StaticFiles(directory=os.path.dirname(__file__)), name="static") | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy", "service": "radiology-backend"} | |
| async def root(): | |
| """Root endpoint with service info""" | |
| return { | |
| "service": "MediVision Radiology Backend", | |
| "version": "1.0.0", | |
| "status": "running", | |
| "endpoints": { | |
| "analyze": "/analyze/", | |
| "health": "/health" | |
| } | |
| } | |
| async def analyze_endpoint(file: UploadFile = File(...)): | |
| """ | |
| Accepts an uploaded image file and returns AI-generated findings. | |
| """ | |
| try: | |
| # Validate file type | |
| if not file.content_type.startswith('image/'): | |
| return {"error": "Invalid file type. Please upload an image file.", "findings": "File type error"} | |
| contents = await file.read() | |
| if len(contents) == 0: | |
| return {"error": "Empty file uploaded.", "findings": "Empty file error"} | |
| # Process image | |
| image = Image.open(io.BytesIO(contents)) | |
| # Convert to RGB if necessary | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| # Resize image if too large (optional optimization) | |
| max_size = (1024, 1024) | |
| if image.size[0] > max_size[0] or image.size[1] > max_size[1]: | |
| image.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| buffer = io.BytesIO() | |
| image.save(buffer, format="PNG", optimize=True) | |
| img_str = base64.b64encode(buffer.getvalue()).decode("utf-8") | |
| # Get response from Hugging Face API | |
| print(f"Sending image to Hugging Face API (size: {len(img_str)} chars)", file=sys.stderr) | |
| output = query({"inputs": img_str}) | |
| # Enhanced response processing | |
| findings = "" | |
| if isinstance(output, dict): | |
| if "error" in output: | |
| findings = f"API Error: {output['error']}" | |
| elif "generated_text" in output: | |
| findings = output["generated_text"] | |
| else: | |
| findings = str(output) | |
| elif isinstance(output, list) and len(output) > 0: | |
| first_result = output[0] | |
| if isinstance(first_result, dict) and "generated_text" in first_result: | |
| findings = first_result["generated_text"] | |
| else: | |
| findings = str(first_result) | |
| elif isinstance(output, str): | |
| findings = output | |
| else: | |
| findings = f"Unexpected response format: {type(output)}" | |
| print(f"Analysis complete. Findings length: {len(findings)} chars", file=sys.stderr) | |
| # Return formatted findings | |
| return {"findings": findings, "status": "success"} | |
| except Exception as e: | |
| error_msg = f"Error in analyze_endpoint: {str(e)}" | |
| print(error_msg, file=sys.stderr) | |
| return {"error": error_msg, "findings": "Error analyzing image", "status": "error"} | |
| def generate_pdf_report(patient_name, patient_info, findings): | |
| """Generate a PDF report for the patient with optimized layout""" | |
| try: | |
| # Create reports directory if it doesn't exist | |
| reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports") | |
| os.makedirs(reports_dir, exist_ok=True) | |
| # Create filename | |
| safe_name = "".join(c for c in patient_name if c.isalnum() or c in (' ', '-', '_')).rstrip() | |
| pdf_filename = f"{safe_name}_report.pdf" | |
| pdf_path = os.path.join(reports_dir, pdf_filename) | |
| # Create PDF document with optimized margins | |
| doc = SimpleDocTemplate(pdf_path, pagesize=A4, | |
| topMargin=0.8*inch, bottomMargin=0.6*inch, | |
| leftMargin=0.8*inch, rightMargin=0.8*inch) | |
| styles = getSampleStyleSheet() | |
| story = [] | |
| # Custom styles for better space utilization | |
| compact_style = styles['Normal'].clone('CompactStyle') | |
| compact_style.fontSize = 10 | |
| compact_style.leading = 12 | |
| compact_style.spaceAfter = 6 | |
| header_style = styles['Heading1'].clone('CompactHeader') | |
| header_style.fontSize = 16 | |
| header_style.alignment = 1 # Center | |
| header_style.spaceAfter = 8 | |
| section_style = styles['Heading2'].clone('CompactSection') | |
| section_style.fontSize = 12 | |
| section_style.spaceAfter = 4 | |
| section_style.spaceBefore = 8 | |
| # Compact header | |
| story.append(Paragraph("π¬ <b>MediVision Radiology Center</b>", header_style)) | |
| story.append(Paragraph("<i>AI-Powered Medical Imaging Analysis</i>", compact_style)) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # Patient Information in compact table format | |
| story.append(Paragraph("<b>RADIOLOGY REPORT</b>", section_style)) | |
| # Create a compact patient info section | |
| current_date = datetime.now().strftime("%m/%d/%Y") | |
| current_time = datetime.now().strftime("%H:%M") | |
| patient_info_compact = f""" | |
| <b>Patient:</b> {patient_info.get('name', 'N/A')} | |
| <b>MRN:</b> {patient_info.get('medical_record_number', 'N/A')} | |
| <b>DOB:</b> {patient_info.get('date_of_birth', 'N/A')}<br/> | |
| <b>Gender:</b> {patient_info.get('gender', 'N/A')} | |
| <b>Study Date:</b> {patient_info.get('date_of_study', current_date)} | |
| <b>Physician:</b> Dr. {patient_info.get('referring_physician', 'N/A')}<br/> | |
| <b>Report Date:</b> {current_date} {current_time} | |
| <b>Radiologist:</b> MediVision AI | |
| """ | |
| story.append(Paragraph(patient_info_compact, compact_style)) | |
| story.append(Spacer(1, 0.15*inch)) | |
| # Process findings to remove duplicates and optimize content | |
| if "answer:" in findings.lower(): | |
| parts = findings.split("answer:", 1) | |
| if len(parts) == 2: | |
| findings_text = parts[1].strip() | |
| else: | |
| findings_text = findings | |
| else: | |
| findings_text = findings | |
| # Clean up findings - remove redundant phrases | |
| findings_text = findings_text.replace("Brief Structured Report:", "") | |
| findings_text = findings_text.replace("The chest X-ray is a useful tool for diagnosing conditions like pneumonia, lung cancer, or other lung diseases.", "") | |
| # Format findings with better structure | |
| story.append(Paragraph("<b>FINDINGS & IMPRESSION:</b>", section_style)) | |
| # Split findings into structured sections if possible | |
| lines = findings_text.strip().split('\n') | |
| formatted_findings = "" | |
| for line in lines: | |
| line = line.strip() | |
| if line: | |
| # Check if it's a numbered item or bullet point | |
| if line.startswith(('1.', '2.', '3.', '4.', '5.')): | |
| formatted_findings += f"<b>{line}</b><br/>" | |
| elif line.upper().startswith(('EXAM TYPE:', 'FINDINGS:', 'IMPRESSION:')): | |
| formatted_findings += f"<b>{line}</b><br/>" | |
| else: | |
| formatted_findings += f"{line}<br/>" | |
| if not formatted_findings.strip(): | |
| formatted_findings = findings_text.replace('\n', '<br/>') | |
| story.append(Paragraph(formatted_findings, compact_style)) | |
| story.append(Spacer(1, 0.15*inch)) | |
| # Compact footer with essential info only | |
| footer_text = f""" | |
| <i>Generated by MediVision AI β’ {current_date} {current_time} β’ | |
| This AI-generated report requires radiologist review for clinical use.</i> | |
| """ | |
| footer_style = compact_style.clone('FooterStyle') | |
| footer_style.fontSize = 8 | |
| footer_style.alignment = 1 # Center | |
| footer_style.textColor = 'grey' | |
| story.append(Paragraph(footer_text, footer_style)) | |
| # Build PDF | |
| doc.build(story) | |
| return pdf_path | |
| except Exception as e: | |
| print(f"Error generating PDF: {str(e)}", file=sys.stderr) | |
| return None | |
| async def generate_mrn_endpoint(patient_name: str, date_of_birth: str): | |
| """Generate MRN for a patient""" | |
| try: | |
| mrn = generate_mrn(patient_name, date_of_birth) | |
| return { | |
| "success": True, | |
| "mrn": mrn, | |
| "patient_name": patient_name, | |
| "date_of_birth": date_of_birth | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error generating MRN: {str(e)}" | |
| } | |
| async def get_patient_by_mrn(mrn: str): | |
| """Get patient information by MRN""" | |
| result = search_patient_by_mrn(mrn) | |
| if result["found"]: | |
| return result["patient"] | |
| else: | |
| raise HTTPException(status_code=404, detail=result.get("message", "Patient not found")) | |
| async def list_patients(): | |
| """List all patients""" | |
| result = list_all_patients() | |
| return result | |
| async def generate_report_endpoint( | |
| file: UploadFile = File(...), | |
| patient_name: str = Form(...), | |
| medical_record_number: str = Form(""), | |
| date_of_birth: str = Form(""), | |
| gender: str = Form(""), | |
| referring_physician: str = Form(""), | |
| date_of_study: str = Form("") | |
| ): | |
| """Generate report with image analysis""" | |
| try: | |
| # First, analyze the image | |
| findings = "" | |
| if file: | |
| # Validate file type | |
| if not file.content_type.startswith('image/'): | |
| return {"error": "Invalid file type", "success": False} | |
| contents = await file.read() | |
| if len(contents) == 0: | |
| return {"error": "Empty file", "success": False} | |
| # Process image for analysis | |
| image = Image.open(io.BytesIO(contents)) | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| # Resize if too large | |
| max_size = (1024, 1024) | |
| if image.size[0] > max_size[0] or image.size[1] > max_size[1]: | |
| image.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| buffer = io.BytesIO() | |
| image.save(buffer, format="PNG", optimize=True) | |
| img_str = base64.b64encode(buffer.getvalue()).decode("utf-8") | |
| # Get findings from Hugging Face API | |
| output = query({"inputs": img_str}) | |
| # Process findings | |
| if isinstance(output, dict): | |
| if "error" in output: | |
| findings = f"API Error: {output['error']}" | |
| elif "generated_text" in output: | |
| findings = output["generated_text"] | |
| else: | |
| findings = str(output) | |
| elif isinstance(output, list) and len(output) > 0: | |
| first_result = output[0] | |
| if isinstance(first_result, dict) and "generated_text" in first_result: | |
| findings = first_result["generated_text"] | |
| else: | |
| findings = str(first_result) | |
| elif isinstance(output, str): | |
| findings = output | |
| else: | |
| findings = f"Unexpected response format: {type(output)}" | |
| # Prepare patient info | |
| patient_info = { | |
| "name": patient_name, | |
| "medical_record_number": medical_record_number, | |
| "date_of_birth": date_of_birth, | |
| "gender": gender, | |
| "referring_physician": referring_physician, | |
| "date_of_study": date_of_study | |
| } | |
| # Generate MRN if not provided | |
| mrn = medical_record_number | |
| if not mrn or len(mrn) == 0: | |
| mrn = generate_mrn(patient_name, date_of_birth) | |
| # Save patient record to database | |
| save_result = save_patient_record(patient_info, findings, mrn) | |
| if not save_result.get("success", False): | |
| return { | |
| "success": False, | |
| "error": "Failed to save patient record" | |
| } | |
| # Generate PDF report | |
| pdf_path = generate_pdf_report(patient_name, patient_info, findings) | |
| if pdf_path and os.path.exists(pdf_path): | |
| return { | |
| "success": True, | |
| "message": "Report generated successfully", | |
| "pdf_filename": os.path.basename(pdf_path), | |
| "mrn": mrn, | |
| "findings": findings | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "Failed to generate PDF report" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error generating report: {str(e)}" | |
| } | |
| async def download_pdf_report(patient_name: str): | |
| """Download PDF report for a patient""" | |
| try: | |
| reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports") | |
| safe_name = "".join(c for c in patient_name if c.isalnum() or c in (' ', '-', '_')).rstrip() | |
| pdf_filename = f"{safe_name}_report.pdf" | |
| pdf_path = os.path.join(reports_dir, pdf_filename) | |
| if os.path.exists(pdf_path): | |
| return FileResponse( | |
| path=pdf_path, | |
| filename=pdf_filename, | |
| media_type='application/pdf' | |
| ) | |
| else: | |
| raise HTTPException(status_code=404, detail=f"PDF report not found for patient: {patient_name}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error retrieving PDF: {str(e)}") | |
| async def list_reports(): | |
| """List all available PDF reports""" | |
| try: | |
| reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports") | |
| if not os.path.exists(reports_dir): | |
| return {"reports": []} | |
| pdf_files = [f for f in os.listdir(reports_dir) if f.endswith('.pdf')] | |
| reports = [] | |
| for pdf_file in pdf_files: | |
| patient_name = pdf_file.replace('_report.pdf', '').replace('_', ' ') | |
| pdf_path = os.path.join(reports_dir, pdf_file) | |
| file_stats = os.stat(pdf_path) | |
| reports.append({ | |
| "patient_name": patient_name, | |
| "filename": pdf_file, | |
| "created_date": datetime.fromtimestamp(file_stats.st_ctime).strftime('%Y-%m-%d %H:%M:%S'), | |
| "size": file_stats.st_size | |
| }) | |
| return {"reports": reports} | |
| except Exception as e: | |
| return {"error": f"Error listing reports: {str(e)}", "reports": []} | |
| def generate_mrn(patient_name, date_of_birth): | |
| """Generate a unique Medical Record Number (MRN) for a patient""" | |
| try: | |
| # Create a hash from patient name and DOB for consistency | |
| patient_data = f"{patient_name.lower().strip()}{date_of_birth}".encode('utf-8') | |
| hash_object = hashlib.md5(patient_data) | |
| hash_hex = hash_object.hexdigest() | |
| # Extract first 8 characters and convert to a medical record format | |
| # Format: MV-YYYYMMDD-XXXX (MV = MediVision, YYYY = year, MM = month, DD = day, XXXX = hash) | |
| current_date = datetime.now() | |
| date_part = current_date.strftime("%Y%m%d") | |
| hash_part = hash_hex[:4].upper() | |
| mrn = f"MV-{date_part}-{hash_part}" | |
| return mrn | |
| except Exception as e: | |
| # Fallback: Generate random MRN | |
| current_date = datetime.now().strftime("%Y%m%d") | |
| random_part = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) | |
| return f"MV-{current_date}-{random_part}" | |
| def save_patient_record(patient_info, findings, mrn): | |
| """Save patient record to database with MRN for future searches""" | |
| try: | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where(), | |
| serverSelectionTimeoutMS=5000 | |
| ) | |
| db = client.radiologyDB | |
| # Create patient record | |
| patient_record = { | |
| "mrn": mrn, | |
| "patient_name": patient_info.get("name", ""), | |
| "date_of_birth": patient_info.get("date_of_birth", ""), | |
| "gender": patient_info.get("gender", ""), | |
| "referring_physician": patient_info.get("referring_physician", ""), | |
| "date_of_study": patient_info.get("date_of_study", ""), | |
| "findings": findings, | |
| "report_date": datetime.now().isoformat(), | |
| "status": "completed" | |
| } | |
| # Check if patient already exists (by MRN) | |
| existing_patient = db.patient_reports.find_one({"mrn": mrn}) | |
| if existing_patient: | |
| # Update existing record | |
| result = db.patient_reports.update_one( | |
| {"mrn": mrn}, | |
| {"$set": patient_record} | |
| ) | |
| return {"action": "updated", "mrn": mrn, "success": True} | |
| else: | |
| # Insert new record | |
| result = db.patient_reports.insert_one(patient_record) | |
| return {"action": "created", "mrn": mrn, "success": True} | |
| except Exception as e: | |
| print(f"Error saving patient record: {str(e)}", file=sys.stderr) | |
| return {"action": "error", "error": str(e), "success": False} | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| def search_patient_by_mrn(mrn): | |
| """Search for patient records by MRN""" | |
| try: | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where(), | |
| serverSelectionTimeoutMS=5000 | |
| ) | |
| db = client.radiologyDB | |
| # Search for patient record | |
| patient_record = db.patient_reports.find_one({"mrn": mrn}) | |
| if patient_record: | |
| # Convert ObjectId to string for JSON serialization | |
| patient_record["_id"] = str(patient_record["_id"]) | |
| return {"found": True, "patient": patient_record} | |
| else: | |
| return {"found": False, "message": f"No patient found with MRN: {mrn}"} | |
| except Exception as e: | |
| print(f"Error searching patient: {str(e)}", file=sys.stderr) | |
| return {"found": False, "error": str(e)} | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| def list_all_patients(): | |
| """List all patients in the database""" | |
| try: | |
| mongo_uri = os.getenv('MONGODB_URI', | |
| "mongodb://localhost:27017/radiologyDB" # Fallback to local MongoDB | |
| ) | |
| client = MongoClient( | |
| mongo_uri, | |
| tlsCAFile=certifi.where(), | |
| serverSelectionTimeoutMS=5000 | |
| ) | |
| db = client.radiologyDB | |
| # Get all patient records, sorted by report date (newest first) | |
| patients = list(db.patient_reports.find().sort("report_date", -1)) | |
| # Convert ObjectId to string for JSON serialization | |
| for patient in patients: | |
| patient["_id"] = str(patient["_id"]) | |
| return {"success": True, "patients": patients, "count": len(patients)} | |
| except Exception as e: | |
| print(f"Error listing patients: {str(e)}", file=sys.stderr) | |
| return {"success": False, "error": str(e), "patients": []} | |
| finally: | |
| if 'client' in locals(): | |
| client.close() | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--get-empty', action='store_true', help='Get empty reports') | |
| parser.add_argument('--update', nargs=2, metavar=('ID', 'TEXT'), help='Update findings') | |
| parser.add_argument('--serve', action='store_true', help='Run FastAPI server') | |
| args = parser.parse_args() | |
| if args.serve: | |
| uvicorn.run("service:app", host="0.0.0.0", port=8001, reload=True) | |
| sys.exit(0) | |
| if args.get_empty: | |
| get_reports_with_empty_findings() # β DON'T capture or print the return value | |
| sys.exit(0) | |
| elif args.update: | |
| report_id, findings_text = args.update | |
| result = update_findings(report_id, findings_text) | |
| print(json.dumps(result)) | |
| sys.exit(0) | |
| else: | |
| print(json.dumps({"error": "No valid command specified"})) | |
| sys.exit(1) | |