import os
import io
import sys
import json
import base64
import certifi
import requests
from PIL import Image
from bson import ObjectId
from gridfs import GridFS
from pymongo import MongoClient
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
import uvicorn
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.units import inch
from datetime import datetime
import hashlib
import random
import string
# Hugging Face inference endpoint that every image-analysis request is POSTed to.
API_URL = "https://n0bcq3t2emgrp7ip.us-east-1.aws.endpoints.huggingface.cloud"
# Default HTTP headers sent with each request to API_URL. The bearer token is
# read from the HF_TOKEN environment variable once, when this module is imported.
# NOTE(review): if HF_TOKEN is unset, os.getenv returns None and the header
# becomes the literal "Bearer None" — confirm deployments always set it.
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {os.getenv('HF_TOKEN')}",
"Content-Type": "application/json"
}
# Module-level list. get_reports_with_empty_findings() assigns a *local*
# variable with the same name, so nothing visible in this file mutates this one.
reportsaved= []
# Disable verbose tracebacks
# NOTE(review): this is process-wide and also hides stack frames for genuine
# bugs — confirm that is acceptable when running the API server.
sys.tracebacklimit = 0
# Configure Python path: append the directory one level above this file's
# directory to sys.path.
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
def get_reports_with_empty_findings():
    """Analyze every report with empty findings, then print a JSON summary.

    Queries ``radiologyDB.reports`` for documents whose ``findings`` field is
    the empty string, calls ``analyze`` on each id, and finally writes one
    JSON object to stdout with the keys ``success``, ``count`` and
    ``reports``. Exceptions are not raised; they are reported as a JSON
    object with ``success`` set to False. Nothing is returned.
    """
    connection = None
    try:
        connection = MongoClient(
            # MONGODB_URI takes precedence; otherwise use a local MongoDB.
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            serverSelectionTimeoutMS=5000,
        )
        pending_ids = []
        for document in connection.radiologyDB.reports.find({"findings": ""}):
            pending_ids.append(str(document["_id"]))

        # Run all processing first so the JSON summary is the last thing
        # written to stdout.
        for pending_id in pending_ids:
            analyze(pending_id)

        summary = {
            "success": True,
            "count": len(pending_ids),
            "reports": pending_ids,
        }
        print(json.dumps(summary), flush=True)
    except Exception as e:
        failure = {
            "success": False,
            "error": str(e),
            "count": 0,
            "reports": [],
        }
        print(json.dumps(failure), flush=True)
    finally:
        if connection is not None:
            connection.close()
def update_findings(report_id, new_findings_text):
    """Set the ``findings`` field of one report document.

    Args:
        report_id: String form of the report's ObjectId.
        new_findings_text: New findings text; surrounding whitespace is
            stripped before it is stored.

    Returns:
        A dict with ``success`` (True only when the stored value actually
        changed), ``modified_count``, ``matched_count`` and a human-readable
        ``message``. On any exception the dict instead carries
        ``success: False``, ``error`` and ``message``.
    """
    client = None
    try:
        client = MongoClient(
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            # Same bound the other database helpers in this file use, so an
            # unreachable server fails fast instead of after the default wait.
            serverSelectionTimeoutMS=5000,
        )
        result = client.radiologyDB.reports.update_one(
            {"_id": ObjectId(report_id)},
            {"$set": {"findings": new_findings_text.strip()}},
        )
        # modified_count is 0 both when nothing matched and when the stored
        # text was already identical; matched_count tells the two apart.
        if result.modified_count:
            message = "Updated"
        elif result.matched_count:
            message = "No changes (findings already up to date)"
        else:
            message = "No document found"
        return {
            "success": result.modified_count > 0,
            "modified_count": result.modified_count,
            "matched_count": result.matched_count,
            "message": message,
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Error: {str(e)}",
        }
    finally:
        if client is not None:
            client.close()
def reconstruct_image_if_not_exists(report_id):
    """Make sure the first image of a report exists on local disk.

    The image is read from the ``images`` GridFS bucket of ``radiologyDB``
    and written to ``<this file's directory>/images/<image_id>.jpg``. Note
    that the file is named after the *image* id, not the report id.

    Args:
        report_id: String form of the report's ObjectId.

    Returns:
        The path of the image file when it exists after the call, otherwise
        None (report missing, report has no images, or any error). Progress
        and errors are logged to stderr; exceptions are not raised.
    """
    client = None
    try:
        client = MongoClient(
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            # Same bound the other database helpers in this file use.
            serverSelectionTimeoutMS=5000,
        )
        db = client["radiologyDB"]
        report = db.reports.find_one({"_id": ObjectId(report_id)})
        if not report:
            print(f"❌ Report not found for ID: {report_id}", file=sys.stderr)
            return None

        image_ids = report.get("images")
        if not image_ids:
            print(f"❌ No image ID found in report: {report_id}", file=sys.stderr)
            return None

        image_id = image_ids[0]  # Assuming one image per report
        output_dir = os.path.join(os.path.dirname(__file__), "images")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{image_id}.jpg")

        if os.path.exists(output_path):
            print(f"✅ Image already exists: {output_path}", file=sys.stderr)
            return output_path

        # Read the whole file from GridFS *before* touching the filesystem,
        # then publish it with an atomic rename. A failed read or write can
        # therefore never leave an empty/truncated "<image_id>.jpg" behind
        # that later calls would mistake for a finished download.
        fs = GridFS(db, collection="images")
        data = fs.get(ObjectId(image_id)).read()
        partial_path = output_path + ".part"
        with open(partial_path, "wb") as f:
            f.write(data)
        os.replace(partial_path, output_path)

        print(f"✅ Reconstructed and saved image: {output_path}", file=sys.stderr)
        return output_path
    except Exception as e:
        print(f"❌ Error reconstructing image: {e}", file=sys.stderr)
        return None
    finally:
        if client is not None:
            client.close()
def query(payload, timeout=120):
    """POST ``payload`` as JSON to the Hugging Face endpoint and return the decoded reply.

    Args:
        payload: JSON-serialisable request body.
        timeout: Seconds to wait for the server before giving up. Requests
            has no default timeout, so without this a stalled endpoint would
            block the caller indefinitely.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
        ValueError: If the response body is not valid JSON.
    """
    response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
    # The raw body is logged to stderr so stdout stays reserved for JSON output.
    print(f"Hugging Face API Response: {response.text}", file=sys.stderr)
    return response.json()
def analyze_image(image_path: str):
    """Send the image at ``image_path`` to the Hugging Face endpoint.

    The image is normalised to RGB (as the upload endpoints in this file
    do), re-encoded as PNG, base64-encoded and submitted via ``query``.

    Args:
        image_path: Filesystem path of the image to analyze.

    Returns:
        The decoded API response on success. On any failure (unreadable
        file, network error, ...) a string starting with
        ``"Error analyzing image:"`` is returned instead of raising.
    """
    try:
        # The context manager closes the underlying file handle, which a bare
        # Image.open() would keep open until garbage collection.
        with Image.open(image_path) as source:
            image = source if source.mode == "RGB" else source.convert("RGB")
            buffer = io.BytesIO()
            image.save(buffer, format="PNG")
        img_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return query({"inputs": img_str})
    except Exception as e:
        # Fallback response in case of file or API errors
        return f"Error analyzing image: {str(e)}. Using fallback analysis."
def analyze(id):
    """Run AI analysis for one report and store the resulting findings.

    Steps: make sure the report's image is on disk, send it to the Hugging
    Face endpoint, and write the formatted response into the report's
    ``findings`` field.

    Args:
        id: String form of the report's ObjectId. (The name shadows the
            builtin but is kept for compatibility with existing callers.)

    Returns:
        The findings text (or the error text when analysis failed). An error
        message string is returned, not raised, on unexpected exceptions.
    """
    try:
        # The reconstructed file is named after the *image* id, so prefer the
        # path reported by the reconstruction step. When that step reports
        # nothing, fall back to the historical "<report id>.jpg" location.
        saved_path = reconstruct_image_if_not_exists(id)
        image_path = saved_path or os.path.join(
            os.path.dirname(__file__), "images", f"{id}.jpg"
        )

        findings = analyze_image(image_path)
        if isinstance(findings, str):
            template_response = findings
        else:
            # Format the API response into a readable string (no prefix)
            template_response = json.dumps(findings, indent=2)

        # Never persist a failure as clinical findings: besides being wrong,
        # it would make the report non-empty and exclude it from later retries.
        analysis_failed = (
            isinstance(findings, str) and findings.startswith("Error analyzing image:")
        ) or (isinstance(findings, dict) and "error" in findings)
        if analysis_failed:
            print(
                f"Analysis failed for report {id}; findings left unchanged: {template_response}",
                file=sys.stderr,
            )
        else:
            update_findings(id, template_response)
        return template_response
    except Exception as e:
        error_msg = f"Error during analysis: {str(e)}"
        print(error_msg, file=sys.stderr)
        return error_msg
# FastAPI application instance that all route decorators in this file attach to.
app = FastAPI()
# CORS: accept requests from any origin, with any method and any header.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Static files directory mount - removed since using remote image.
# The original call is kept below for reference only.
# app.mount("/static", StaticFiles(directory=os.path.dirname(__file__)), name="static")
@app.get("/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    status_payload = dict(status="healthy", service="radiology-backend")
    return status_payload
@app.get("/")
async def root():
    """Describe the service: name, version, status and the main endpoints."""
    advertised_endpoints = {"analyze": "/analyze/", "health": "/health"}
    service_info = {
        "service": "MediVision Radiology Backend",
        "version": "1.0.0",
        "status": "running",
    }
    service_info["endpoints"] = advertised_endpoints
    return service_info
def _encode_upload_as_png_base64(contents, max_size=(1024, 1024)):
    """Decode raw image bytes, normalise to RGB, downscale if needed, return base64 PNG text."""
    with Image.open(io.BytesIO(contents)) as source:
        image = source if source.mode == 'RGB' else source.convert('RGB')
        # Resize image if too large (optional optimization); thumbnail()
        # keeps the aspect ratio and works in place.
        if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
            image.thumbnail(max_size, Image.Resampling.LANCZOS)
        buffer = io.BytesIO()
        image.save(buffer, format="PNG", optimize=True)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def _extract_findings_text(output):
    """Turn the endpoint's decoded JSON reply (dict, list or str) into findings text."""
    if isinstance(output, dict):
        if "error" in output:
            return f"API Error: {output['error']}"
        if "generated_text" in output:
            return output["generated_text"]
        return str(output)
    if isinstance(output, list) and len(output) > 0:
        first_result = output[0]
        if isinstance(first_result, dict) and "generated_text" in first_result:
            return first_result["generated_text"]
        return str(first_result)
    if isinstance(output, str):
        return output
    return f"Unexpected response format: {type(output)}"


@app.post("/analyze/")
async def analyze_endpoint(file: UploadFile = File(...)):
    """Accept an uploaded image file and return AI-generated findings.

    Args:
        file: Uploaded image (multipart form field ``file``).

    Returns:
        A JSON object with ``findings`` and ``status``. Validation failures,
        API errors and unexpected exceptions are reported in the body (with
        ``status: "error"``) rather than through an HTTP error status.
    """
    # NOTE(review): query() performs a blocking HTTP call inside this async
    # handler, which stalls the event loop while waiting — consider running
    # it in a worker thread.
    try:
        # content_type may be None when the client sends no part header;
        # treat that as "not an image" instead of crashing on .startswith.
        if not (file.content_type or "").startswith('image/'):
            return {
                "error": "Invalid file type. Please upload an image file.",
                "findings": "File type error",
                "status": "error",
            }

        contents = await file.read()
        if not contents:
            return {
                "error": "Empty file uploaded.",
                "findings": "Empty file error",
                "status": "error",
            }

        img_str = _encode_upload_as_png_base64(contents)

        print(f"Sending image to Hugging Face API (size: {len(img_str)} chars)", file=sys.stderr)
        output = query({"inputs": img_str})

        findings = _extract_findings_text(output)
        print(f"Analysis complete. Findings length: {len(findings)} chars", file=sys.stderr)

        # An error object from the API is not a successful analysis, even
        # though its text is still returned in "findings".
        api_failed = isinstance(output, dict) and "error" in output
        return {"findings": findings, "status": "error" if api_failed else "success"}
    except Exception as e:
        error_msg = f"Error in analyze_endpoint: {str(e)}"
        print(error_msg, file=sys.stderr)
        return {"error": error_msg, "findings": "Error analyzing image", "status": "error"}
def generate_pdf_report(patient_name, patient_info, findings):
    """Generate a compact A4 PDF report for a patient and return its path.

    The file is written to ``<parent of this file's directory>/pdf_reports``
    as ``<sanitised patient name>_report.pdf``; an existing file with the
    same name is overwritten.

    Args:
        patient_name: Used (after removing characters other than
            alphanumerics, space, ``-`` and ``_``) to build the file name.
        patient_info: Dict read with the keys ``name``,
            ``medical_record_number``, ``date_of_birth``, ``gender``,
            ``date_of_study`` and ``referring_physician``.
        findings: Findings text. Anything up to and including the first
            "answer:" marker (any letter case) is discarded.

    Returns:
        The path of the generated PDF, or None if anything failed (the error
        is logged to stderr).
    """
    # Function-scope stdlib imports keep this block self-contained.
    import re
    from xml.sax.saxutils import escape

    try:
        # Create reports directory if it doesn't exist
        reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Create filename
        safe_name = "".join(c for c in patient_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
        pdf_filename = f"{safe_name}_report.pdf"
        pdf_path = os.path.join(reports_dir, pdf_filename)

        # Create PDF document with optimized margins
        doc = SimpleDocTemplate(
            pdf_path, pagesize=A4,
            topMargin=0.8*inch, bottomMargin=0.6*inch,
            leftMargin=0.8*inch, rightMargin=0.8*inch,
        )
        styles = getSampleStyleSheet()
        story = []

        # Custom styles for better space utilization
        compact_style = styles['Normal'].clone('CompactStyle')
        compact_style.fontSize = 10
        compact_style.leading = 12
        compact_style.spaceAfter = 6

        header_style = styles['Heading1'].clone('CompactHeader')
        header_style.fontSize = 16
        header_style.alignment = 1  # Center
        header_style.spaceAfter = 8

        section_style = styles['Heading2'].clone('CompactSection')
        section_style.fontSize = 12
        section_style.spaceAfter = 4
        section_style.spaceBefore = 8

        # Compact header
        story.append(Paragraph("🔬 MediVision Radiology Center", header_style))
        story.append(Paragraph("AI-Powered Medical Imaging Analysis", compact_style))
        story.append(Spacer(1, 0.2*inch))

        story.append(Paragraph("RADIOLOGY REPORT", section_style))

        # One clock reading so date and time cannot straddle midnight.
        now = datetime.now()
        current_date = now.strftime("%m/%d/%Y")
        current_time = now.strftime("%H:%M")

        def info(key, default='N/A'):
            # Paragraph parses its text as XML-like markup, so user-supplied
            # values must be escaped or a stray "<" / "&" breaks the build.
            return escape(str(patient_info.get(key, default)))

        # Paragraph collapses plain newlines; explicit <br/> tags keep each
        # field on its own line.
        patient_lines = [
            f"Patient: {info('name')}",
            f"MRN: {info('medical_record_number')}",
            f"DOB: {info('date_of_birth')}",
            f"Gender: {info('gender')}",
            f"Study Date: {info('date_of_study', current_date)}",
            f"Physician: Dr. {info('referring_physician')}",
            f"Report Date: {current_date} {current_time}",
            "Radiologist: MediVision AI",
        ]
        story.append(Paragraph("<br/>".join(patient_lines), compact_style))
        story.append(Spacer(1, 0.15*inch))

        # Keep only the text after the "answer:" marker. The search is
        # case-insensitive so "Answer:" is handled the same as "answer:".
        marker = re.search(r"answer:", findings, flags=re.IGNORECASE)
        findings_text = findings[marker.end():].strip() if marker else findings

        # Clean up findings - remove redundant phrases
        findings_text = findings_text.replace("Brief Structured Report:", "")
        findings_text = findings_text.replace("The chest X-ray is a useful tool for diagnosing conditions like pneumonia, lung cancer, or other lung diseases.", "")

        # "&amp;" renders as "&"; a bare ampersand is not valid markup.
        story.append(Paragraph("FINDINGS &amp; IMPRESSION:", section_style))

        # One output line per non-empty input line, separated by <br/>.
        # NOTE(review): the line-break markup in this section was missing from
        # the source as received; "<br/>" is the reconstruction — confirm
        # whether numbered items / section headers had extra styling.
        stripped_lines = (line.strip() for line in findings_text.strip().split('\n'))
        formatted_findings = "<br/>".join(escape(line) for line in stripped_lines if line)
        if not formatted_findings.strip():
            formatted_findings = escape(findings_text).replace('\n', '<br/>')
        story.append(Paragraph(formatted_findings, compact_style))
        story.append(Spacer(1, 0.15*inch))

        # Compact footer with essential info only
        footer_text = (
            f"Generated by MediVision AI • {current_date} {current_time} • "
            "This AI-generated report requires radiologist review for clinical use."
        )
        footer_style = compact_style.clone('FooterStyle')
        footer_style.fontSize = 8
        footer_style.alignment = 1  # Center
        footer_style.textColor = 'grey'
        story.append(Paragraph(footer_text, footer_style))

        # Build PDF
        doc.build(story)
        return pdf_path
    except Exception as e:
        print(f"Error generating PDF: {str(e)}", file=sys.stderr)
        return None
@app.get("/generate-mrn/")
async def generate_mrn_endpoint(patient_name: str, date_of_birth: str):
    """Return an MRN derived from the given patient name and date of birth.

    The response echoes both inputs next to the generated ``mrn``. Failures
    are reported in the body with ``success`` set to False.
    """
    try:
        response = dict(
            success=True,
            mrn=generate_mrn(patient_name, date_of_birth),
            patient_name=patient_name,
            date_of_birth=date_of_birth,
        )
    except Exception as e:
        response = dict(success=False, error=f"Error generating MRN: {str(e)}")
    return response
@app.get("/patient/{mrn}")
async def get_patient_by_mrn(mrn: str):
    """Return the stored patient record for ``mrn``.

    Raises:
        HTTPException: 404 when no record has this MRN; 500 when the lookup
            itself failed (the search helper reports that through an
            ``error`` key), so an outage is not mistaken for "no such patient".
    """
    result = search_patient_by_mrn(mrn)
    if result["found"]:
        return result["patient"]
    if "error" in result:
        raise HTTPException(
            status_code=500,
            detail=f"Error searching patient: {result['error']}",
        )
    raise HTTPException(status_code=404, detail=result.get("message", "Patient not found"))
@app.get("/patients")
async def list_patients():
    """Return the result of ``list_all_patients`` unchanged."""
    return list_all_patients()
@app.post("/generate-report/")
async def generate_report_endpoint(
    file: UploadFile = File(...),
    patient_name: str = Form(...),
    medical_record_number: str = Form(""),
    date_of_birth: str = Form(""),
    gender: str = Form(""),
    referring_physician: str = Form(""),
    date_of_study: str = Form("")
):
    """Analyze an uploaded image, store the patient record and build the PDF.

    Args:
        file: Uploaded image to analyze.
        patient_name: Patient name; also determines the PDF file name.
        medical_record_number: Existing MRN; one is generated when empty.
        date_of_birth, gender, referring_physician, date_of_study: Optional
            patient details copied into the record and the PDF.

    Returns:
        On success: ``success``, ``message``, ``pdf_filename``, ``mrn`` and
        ``findings``. On failure: ``success: False`` and ``error`` (plus
        ``details`` when saving the record failed). Errors are reported in
        the body rather than through an HTTP error status.
    """
    # NOTE(review): query() performs a blocking HTTP call inside this async
    # handler, which stalls the event loop while waiting.
    try:
        # First, analyze the image
        findings = ""
        if file:
            # content_type may be None; treat that as "not an image" rather
            # than crashing on .startswith.
            if not (file.content_type or "").startswith('image/'):
                return {"error": "Invalid file type", "success": False}

            contents = await file.read()
            if not contents:
                return {"error": "Empty file", "success": False}

            # Normalise to RGB, cap the size, and encode as base64 PNG.
            max_size = (1024, 1024)
            with Image.open(io.BytesIO(contents)) as source:
                image = source if source.mode == 'RGB' else source.convert('RGB')
                if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
                    image.thumbnail(max_size, Image.Resampling.LANCZOS)
                buffer = io.BytesIO()
                image.save(buffer, format="PNG", optimize=True)
            img_str = base64.b64encode(buffer.getvalue()).decode("utf-8")

            # Get findings from Hugging Face API
            output = query({"inputs": img_str})

            # The reply may be a dict, a list of dicts, or a plain string.
            if isinstance(output, dict):
                if "error" in output:
                    findings = f"API Error: {output['error']}"
                elif "generated_text" in output:
                    findings = output["generated_text"]
                else:
                    findings = str(output)
            elif isinstance(output, list) and len(output) > 0:
                first_result = output[0]
                if isinstance(first_result, dict) and "generated_text" in first_result:
                    findings = first_result["generated_text"]
                else:
                    findings = str(first_result)
            elif isinstance(output, str):
                findings = output
            else:
                findings = f"Unexpected response format: {type(output)}"

        # Generate MRN if not provided
        mrn = medical_record_number or generate_mrn(patient_name, date_of_birth)

        # Use the effective MRN here so a generated one also appears in the
        # PDF (the submitted value is empty exactly when one was generated).
        patient_info = {
            "name": patient_name,
            "medical_record_number": mrn,
            "date_of_birth": date_of_birth,
            "gender": gender,
            "referring_physician": referring_physician,
            "date_of_study": date_of_study
        }

        # Save patient record to database
        save_result = save_patient_record(patient_info, findings, mrn)
        if not save_result.get("success", False):
            return {
                "success": False,
                "error": "Failed to save patient record",
                "details": save_result.get("error", ""),
            }

        # Generate PDF report
        pdf_path = generate_pdf_report(patient_name, patient_info, findings)
        if pdf_path and os.path.exists(pdf_path):
            return {
                "success": True,
                "message": "Report generated successfully",
                "pdf_filename": os.path.basename(pdf_path),
                "mrn": mrn,
                "findings": findings
            }
        return {
            "success": False,
            "error": "Failed to generate PDF report"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error generating report: {str(e)}"
        }
@app.get("/reports/{patient_name}/pdf")
async def download_pdf_report(patient_name: str):
    """Download the PDF report stored for ``patient_name``.

    The file name is derived from the patient name with the same
    sanitisation that report generation uses.

    Raises:
        HTTPException: 404 when no report file exists for the patient;
            500 for any other failure while preparing the response.
    """
    try:
        reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports")
        safe_name = "".join(c for c in patient_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
        pdf_filename = f"{safe_name}_report.pdf"
        pdf_path = os.path.join(reports_dir, pdf_filename)

        if not os.path.exists(pdf_path):
            raise HTTPException(status_code=404, detail=f"PDF report not found for patient: {patient_name}")

        return FileResponse(
            path=pdf_path,
            filename=pdf_filename,
            media_type='application/pdf'
        )
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 404
        # above would be caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving PDF: {str(e)}")
@app.get("/reports")
async def list_reports():
    """List the PDF files found in the ``pdf_reports`` directory.

    Each entry carries a patient name derived from the file name, the file
    name itself, a timestamp taken from the file's ``st_ctime`` and the size
    in bytes. Failures are reported in the body with an empty list.
    """
    try:
        reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "pdf_reports")
        if not os.path.exists(reports_dir):
            return {"reports": []}

        def describe(pdf_file):
            # Undo the naming scheme: drop the suffix, underscores become spaces.
            stats = os.stat(os.path.join(reports_dir, pdf_file))
            return {
                "patient_name": pdf_file.replace('_report.pdf', '').replace('_', ' '),
                "filename": pdf_file,
                "created_date": datetime.fromtimestamp(stats.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
                "size": stats.st_size,
            }

        entries = [
            describe(name)
            for name in os.listdir(reports_dir)
            if name.endswith('.pdf')
        ]
        return {"reports": entries}
    except Exception as e:
        return {"error": f"Error listing reports: {str(e)}", "reports": []}
def generate_mrn(patient_name, date_of_birth):
    """Build a Medical Record Number of the form ``MV-YYYYMMDD-XXXX``.

    ``YYYYMMDD`` is today's date. ``XXXX`` is the first four hex digits
    (upper-cased) of the MD5 digest of the lower-cased, stripped patient
    name concatenated with the date of birth. If building the hash fails,
    four random uppercase letters/digits are used instead.
    """
    today = datetime.now().strftime("%Y%m%d")
    try:
        identity = f"{patient_name.lower().strip()}{date_of_birth}"
        suffix = hashlib.md5(identity.encode('utf-8')).hexdigest()[:4].upper()
    except Exception:
        # Fallback: random suffix when the inputs cannot be hashed.
        alphabet = string.ascii_uppercase + string.digits
        suffix = ''.join(random.choices(alphabet, k=4))
    return f"MV-{today}-{suffix}"
def save_patient_record(patient_info, findings, mrn):
    """Create or update the patient report stored under ``mrn``.

    Args:
        patient_info: Dict read with the keys ``name``, ``date_of_birth``,
            ``gender``, ``referring_physician`` and ``date_of_study``
            (missing keys are stored as empty strings).
        findings: Findings text to store.
        mrn: Medical Record Number identifying the record.

    Returns:
        ``{"action": "created" | "updated", "mrn": mrn, "success": True}``
        on success, or ``{"action": "error", "error": ..., "success": False}``
        on failure (the error is also logged to stderr).
    """
    client = None
    try:
        client = MongoClient(
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            serverSelectionTimeoutMS=5000,
        )
        db = client.radiologyDB

        patient_record = {
            "mrn": mrn,
            "patient_name": patient_info.get("name", ""),
            "date_of_birth": patient_info.get("date_of_birth", ""),
            "gender": patient_info.get("gender", ""),
            "referring_physician": patient_info.get("referring_physician", ""),
            "date_of_study": patient_info.get("date_of_study", ""),
            "findings": findings,
            "report_date": datetime.now().isoformat(),
            "status": "completed"
        }

        # A single upsert replaces the former find-then-insert/update pair:
        # one round trip, and two concurrent requests for the same MRN can
        # no longer both take the "insert" branch.
        result = db.patient_reports.update_one(
            {"mrn": mrn},
            {"$set": patient_record},
            upsert=True,
        )
        # upserted_id is set only when the operation inserted a new document.
        action = "created" if result.upserted_id is not None else "updated"
        return {"action": action, "mrn": mrn, "success": True}
    except Exception as e:
        print(f"Error saving patient record: {str(e)}", file=sys.stderr)
        return {"action": "error", "error": str(e), "success": False}
    finally:
        if client is not None:
            client.close()
def search_patient_by_mrn(mrn):
    """Look up one patient report by its MRN.

    Returns ``{"found": True, "patient": <record>}`` with the record's
    ``_id`` converted to a string, ``{"found": False, "message": ...}`` when
    nothing matches, or ``{"found": False, "error": ...}`` when the lookup
    raised (the error is also logged to stderr).
    """
    connection = None
    try:
        connection = MongoClient(
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            serverSelectionTimeoutMS=5000,
        )
        match = connection.radiologyDB.patient_reports.find_one({"mrn": mrn})
        if not match:
            return {"found": False, "message": f"No patient found with MRN: {mrn}"}
        # ObjectId is not JSON serialisable; expose it as a string.
        match["_id"] = str(match["_id"])
        return {"found": True, "patient": match}
    except Exception as e:
        print(f"Error searching patient: {str(e)}", file=sys.stderr)
        return {"found": False, "error": str(e)}
    finally:
        if connection is not None:
            connection.close()
def list_all_patients():
    """Return every patient report, newest ``report_date`` first.

    On success the result holds ``success: True``, the ``patients`` list
    (each ``_id`` converted to a string) and ``count``. On failure it holds
    ``success: False``, ``error`` and an empty ``patients`` list; the error
    is also logged to stderr.
    """
    connection = None
    try:
        connection = MongoClient(
            os.getenv('MONGODB_URI', "mongodb://localhost:27017/radiologyDB"),
            tlsCAFile=certifi.where(),
            serverSelectionTimeoutMS=5000,
        )
        cursor = connection.radiologyDB.patient_reports.find().sort("report_date", -1)
        patients = []
        for record in cursor:
            # ObjectId is not JSON serialisable; expose it as a string.
            record["_id"] = str(record["_id"])
            patients.append(record)
        return {"success": True, "patients": patients, "count": len(patients)}
    except Exception as e:
        print(f"Error listing patients: {str(e)}", file=sys.stderr)
        return {"success": False, "error": str(e), "patients": []}
    finally:
        if connection is not None:
            connection.close()
if __name__ == "__main__":
    # Command-line entry point. Exactly one action runs, in this priority
    # order: --serve, --get-empty, --update. With no action an error object
    # is printed and the exit status is 1; every other path exits with 0.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--get-empty', action='store_true', help='Get empty reports')
    cli.add_argument('--update', nargs=2, metavar=('ID', 'TEXT'), help='Update findings')
    cli.add_argument('--serve', action='store_true', help='Run FastAPI server')
    options = cli.parse_args()

    if options.serve:
        uvicorn.run("service:app", host="0.0.0.0", port=8001, reload=True)
        exit_status = 0
    elif options.get_empty:
        # The function prints its own JSON summary; its return value is unused.
        get_reports_with_empty_findings()
        exit_status = 0
    elif options.update:
        target_id, replacement_text = options.update
        print(json.dumps(update_findings(target_id, replacement_text)))
        exit_status = 0
    else:
        print(json.dumps({"error": "No valid command specified"}))
        exit_status = 1
    sys.exit(exit_status)