import io
import logging
import time
from datetime import datetime
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Query, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import onnxruntime
import numpy as np
from PIL import Image
import uvicorn
import uuid
import json
import os
import tempfile
from typing import Optional, List, Dict
import subprocess
import platform
import docx
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.lib.units import inch
from io import BytesIO
from PIL import Image as PILImage
from reportlab.graphics.shapes import Drawing
from reportlab.graphics.charts.barcharts import VerticalBarChart
from reportlab.graphics.charts.legends import Legend
# Configure logging before anything can emit a record. A module-level
# logging call made while the root logger has no handlers installs a default
# handler, after which logging.basicConfig() silently does nothing - so the
# warnings below must not run before this call.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("clarirai-api")

# Import for .env file support
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load environment variables from .env file
    ENV_LOADED = True
except ImportError:
    ENV_LOADED = False
    logger.warning("python-dotenv package not installed. Using environment variables directly.")

# Import for OpenRouter integration
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    logger.warning("OpenAI package not installed. AI consultation features will be disabled.")

# Import for report generation
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logger.warning("python-docx package not installed. Report generation will be disabled.")

# Import prompts and clinical information
from prompts import DR_CLINICAL_INFO, CONSULTATION_SYSTEM_PROMPT, DEFAULT_QUESTIONS, REPORT_SYSTEM_PROMPT
# Store analysis results for later consultation.
# Maps analysis_id -> {"result": ..., "image_path": ..., "analysis_time": ...};
# entries are kept in process memory only.
analysis_cache = {}

app = FastAPI(
    title="ClarirAI - Advanced Diabetic Retinopathy Analysis",
    description="Next-generation API for detecting and analyzing diabetic retinopathy from retinal images with enhanced metrics",
    version="2.0.0"
)

# Origins used when CORS_ALLOWED_ORIGINS is not set. The "*" entry allows all
# origins for development - remove in production by setting the variable.
_DEFAULT_CORS_ORIGINS = [
    "https://diabetes-detection-zeta.vercel.app",
    "https://diabetes-detection-harishvijayasarangank-gmailcoms-projects.vercel.app",
    "*",
]

# CORS_ALLOWED_ORIGINS is an optional comma-separated list of origins that
# replaces the defaults above, so a deployment can drop the wildcard without
# a code change.
cors_allowed_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
] or _DEFAULT_CORS_ORIGINS

if "*" in cors_allowed_origins:
    logger.warning(
        "CORS allows all origins together with credentials; "
        "set CORS_ALLOWED_ORIGINS to restrict this in production."
    )

app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Enhanced classification labels with descriptions.
# Each (name, description) pair is keyed by its position, which is the index
# used to look up the matching probability.
_GRADES = (
    ("No DR", "No signs of diabetic retinopathy detected"),
    ("Mild", "Mild nonproliferative diabetic retinopathy"),
    ("Moderate", "Moderate nonproliferative diabetic retinopathy"),
    ("Severe", "Severe nonproliferative diabetic retinopathy"),
    ("Proliferative DR", "Proliferative diabetic retinopathy"),
)
labels = {
    class_index: {"name": grade_name, "description": grade_description}
    for class_index, (grade_name, grade_description) in enumerate(_GRADES)
}
# Use the imported clinical information instead of defining it here.
# The endpoints in this file look entries up with .get(<class name>, {}),
# where the class name is one of the labels[...]["name"] values.
dr_clinical_info = DR_CLINICAL_INFO
# Model metadata (static description of the bundled classifier).
MODEL_INFO = dict(
    name="ClarirAI Model",
    version="1.2.0",
    architecture="Densenet121",
    accuracy="89.8%",
    last_updated="2025-04-01",
    input_size=[224, 224],
    color_channels=3,
)
# OpenAI/OpenRouter configuration: everything except the API base URL can be
# supplied through the environment.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_REFERER = os.getenv("OPENROUTER_REFERER", "https://clarirai.example.com")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "mistralai/mistral-7b-instruct")  # Default to a free model

if not (OPENAI_AVAILABLE and OPENAI_API_KEY):
    # Either the package is missing or no key was provided.
    logger.warning("OpenRouter not configured. AI consultation will not be available.")
else:
    openai.api_key = OPENAI_API_KEY
    openai.api_base = OPENROUTER_API_BASE
    logger.info(f"OpenRouter configured with model: {OPENROUTER_MODEL}")
# Location of the ONNX model file. Defaults to "model.onnx" (resolved against
# the working directory); override with the MODEL_PATH environment variable.
MODEL_PATH = os.environ.get("MODEL_PATH", "model.onnx")

try:
    logger.info("Loading ONNX model...")
    start_time = time.time()
    session = onnxruntime.InferenceSession(MODEL_PATH)
    load_time = time.time() - start_time
    logger.info(f"Model loaded successfully in {load_time:.2f} seconds")
    MODEL_INFO["load_time_seconds"] = round(load_time, 2)
except Exception as e:
    # Leave the app importable with session=None; the endpoints check for
    # that and answer 503 instead of crashing at startup.
    logger.error(f"Error loading model: {e}", exc_info=True)
    session = None
@app.get("/")
async def root():
    """
    Root endpoint that provides API information
    """
    return {
        "name": "ClarirAI: Advanced Diabetic Retinopathy Analysis API",
        "version": "2.0.0",
        "description": "AI-powered diabetic retinopathy detection and analysis with enhanced metrics and consultation",
        # Keep this list in step with the routes registered in this file.
        "endpoints": [
            "/predict",
            "/health",
            "/model-info",
            "/analysis/{analysis_id}",
            "/consult",
            "/generate-report/{analysis_id}",
        ],
        "documentation": "/docs"
    }
@app.get("/health")
async def health_check():
    """Check the health status of the API and model"""
    model_ready = session is not None
    if model_ready:
        return {
            "status": "healthy",
            "service": "ClarirAI API",
            "timestamp": datetime.now().isoformat(),
            "model_loaded": model_ready
        }

    # No inference session: report the service as unavailable.
    unhealthy_payload = {
        "status": "unhealthy",
        "message": "Model failed to load",
        "timestamp": datetime.now().isoformat(),
    }
    return JSONResponse(status_code=503, content=unhealthy_payload)
@app.get("/model-info")
async def get_model_info():
    """Get information about the model being used"""
    if session is None:
        raise HTTPException(status_code=503, detail="Model not available")
    return {
        "name": "ClarirAI Diabetic Retinopathy Classifier",
        "version": "2.0.0",
        "framework": "ONNX Runtime",
        "input_shape": [1, 3, 224, 224],
        "classes": [entry["name"] for entry in labels.values()],
        # transform_image() normalizes with its own per-channel constants,
        # which are not the ImageNet statistics, so describe it accurately.
        "preprocessing": "Resize to 224x224, scale to [0, 1], normalize with dataset-specific per-channel mean and std"
    }
def transform_image(image):
    """Turn an image into a normalized float32 batch of shape (1, C, 224, 224).

    The image is resized to 224x224, scaled to [0, 1], normalized per channel
    with fixed mean/std constants and reordered from HWC to CHW.
    """
    channel_mean = np.array([0.5353, 0.3628, 0.2486], dtype=np.float32)
    channel_std = np.array([0.2126, 0.1586, 0.1401], dtype=np.float32)

    resized = image.resize((224, 224))
    pixels = np.array(resized, dtype=np.float32) / 255.0
    normalized = (pixels - channel_mean) / channel_std

    # HWC -> CHW, then prepend the batch axis.
    channels_first = normalized.transpose(2, 0, 1)
    return channels_first[np.newaxis, ...].astype(np.float32)
def calculate_severity_index(probabilities):
    """Return the probability-weighted severity score, rounded to one decimal.

    The five classes are weighted 0, 25, 50, 75 and 100 in order, so the
    result lies between 0 and 100 when the probabilities sum to one.
    """
    class_weights = (0, 25, 50, 75, 100)
    # Plain Python floats keep numpy scalar types out of the result.
    native_probs = [float(value) for value in probabilities]

    weighted_total = 0
    for position, weight in enumerate(class_weights):
        weighted_total += weight * native_probs[position]
    return round(weighted_total, 1)
def get_confidence_level(probability):
    """Map a probability to its confidence label.

    Bands (lower bound inclusive): 0.90 "Very High", 0.75 "High",
    0.50 "Moderate", 0.25 "Low"; anything below is "Very Low".
    """
    bands = (
        (0.90, "Very High"),
        (0.75, "High"),
        (0.50, "Moderate"),
        (0.25, "Low"),
    )
    # Bands are ordered from highest to lowest, so the first match wins.
    for lower_bound, band_label in bands:
        if probability >= lower_bound:
            return band_label
    return "Very Low"
def _predict_llm_context(highest_class, highest_prob, severity_index, clinical_info):
    """Format the analysis summary that is sent to the LLM as context."""
    return f"""Patient Analysis Summary:
- Diagnosis: {highest_class} (Confidence: {get_confidence_level(highest_prob)})
- Severity Index: {severity_index}/100
- Clinical Findings: {clinical_info.get('findings', 'Not available')}
- Associated Risks: {clinical_info.get('risks', 'Not available')}
- Standard Recommendations: {clinical_info.get('recommendations', 'Not available')}
- Standard Follow-up: {clinical_info.get('follow_up', 'Not available')}
"""


def _answer_suggested_questions(suggested_questions, context):
    """Return a {"question", "answer"} dict per suggested question; never raises.

    Answers are None when the LLM is not configured or the client cannot be
    created; a single failing question gets a fixed fallback answer instead.
    """
    unanswered = [{"question": q, "answer": None} for q in suggested_questions]
    if not (OPENAI_AVAILABLE and OPENAI_API_KEY and suggested_questions):
        # If OpenAI is not available, just include the questions without answers
        return unanswered

    try:
        client = openai.OpenAI(
            api_key=OPENAI_API_KEY,
            base_url=OPENROUTER_API_BASE,
            default_headers={"HTTP-Referer": OPENROUTER_REFERER}
        )
    except Exception as e:
        logger.warning(f"Error generating answers for suggested questions: {e}")
        # Still include the questions even if answers couldn't be generated
        return unanswered

    question_answers = []
    for question in suggested_questions:
        try:
            completion = client.chat.completions.create(
                model=OPENROUTER_MODEL,
                messages=[
                    {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT},
                    {"role": "user", "content": f"{context}\n\nPatient Question: {question}"}
                ]
            )
            answer = completion.choices[0].message.content
        except Exception as e:
            logger.warning(f"Error generating answer for question '{question}': {e}")
            answer = "Unable to generate answer at this time. Please try asking this question directly."
        question_answers.append({"question": question, "answer": answer})
    return question_answers


@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """
    Predict diabetic retinopathy from retinal image with enhanced metrics
    - **file**: Upload a retinal image file
    Returns detailed classification with confidence levels, severity index, and recommendations

    Responds with 503 when the model is not loaded, 400 when the upload is not
    a decodable image, and 500 for any other processing error.
    """
    analysis_id = str(uuid.uuid4())[:8]  # Generate a unique ID for this analysis
    logger.info(f"Analysis #{analysis_id}: Received image: {file.filename}, content-type: {file.content_type}")
    if session is None:
        raise HTTPException(status_code=503, detail="Model not available")
    # content_type is None when the upload part carries no Content-Type header.
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="File provided is not an image")
    try:
        # Start timing for performance metrics
        process_start = time.time()
        image_data = await file.read()
        try:
            input_img = Image.open(io.BytesIO(image_data)).convert("RGB")
        except (OSError, ValueError) as e:
            # The declared content type was image/*, but the bytes cannot be
            # decoded: that is a client error, not a server failure.
            logger.warning(f"Analysis #{analysis_id}: Uploaded file could not be decoded as an image: {e}")
            raise HTTPException(status_code=400, detail="File provided is not a valid image") from e
        input_tensor = transform_image(input_img)
        input_name = session.get_inputs()[0].name
        output_name = session.get_outputs()[0].name
        logger.info(f"Analysis #{analysis_id}: Running inference")
        inference_start = time.time()
        prediction = session.run([output_name], {input_name: input_tensor})[0][0]
        inference_time = time.time() - inference_start
        # Softmax; subtracting the maximum first keeps exp() from overflowing.
        exp_preds = np.exp(prediction - np.max(prediction))
        probabilities = exp_preds / exp_preds.sum()
        # Calculate severity index
        severity_index = calculate_severity_index(probabilities)
        # Format detailed results with confidence levels
        detailed_results = []
        for i in labels:
            prob = float(probabilities[i])  # Convert numpy.float32 to Python float
            detailed_results.append({
                "class": labels[i]["name"],
                "description": labels[i]["description"],
                "probability": round(prob, 4),
                "percentage": round(prob * 100, 1),
                "confidence_level": get_confidence_level(prob)
            })
        # Sort by probability (highest first)
        detailed_results.sort(key=lambda x: x["probability"], reverse=True)
        highest_class = detailed_results[0]["class"]
        highest_prob = detailed_results[0]["probability"]
        # Generate recommendation based on severity
        recommendation = "Regular screening recommended."
        if severity_index > 75:
            recommendation = "Urgent ophthalmologist consultation required."
        elif severity_index > 50:
            recommendation = "Prompt ophthalmologist evaluation recommended."
        elif severity_index > 25:
            recommendation = "Follow-up with ophthalmologist advised."
        # Get clinical information for the highest probability class
        clinical_info = dr_clinical_info.get(highest_class, {})
        # Calculate binary classification (DR vs No DR)
        no_dr_probability = float(probabilities[0])
        dr_probability = sum(float(probabilities[i]) for i in range(1, 5))  # Sum of all DR classes
        binary_classification = {
            "no_dr": no_dr_probability,
            "dr_detected": float(dr_probability),
            "primary_assessment": "DR Detected" if dr_probability > no_dr_probability else "No DR"
        }
        # Get suggested questions and generate answers if OpenAI is available
        suggested_questions = clinical_info.get("suggested_questions", [])
        question_answers = _answer_suggested_questions(
            suggested_questions,
            _predict_llm_context(highest_class, highest_prob, severity_index, clinical_info),
        )
        total_time = time.time() - process_start
        logger.info(f"Analysis #{analysis_id}: Prediction complete: highest probability class = {highest_class} ({highest_prob:.2f})")
        # Prepare response
        response = {
            "analysis_id": analysis_id,
            "timestamp": datetime.now().isoformat(),
            "detailed_classification": detailed_results,
            "binary_classification": binary_classification,
            "highest_probability_class": highest_class,
            "severity_index": severity_index,
            "recommendation": recommendation,
            "clinical_information": clinical_info,
            "ai_explanation": clinical_info.get("explanation", "No explanation available for this classification."),
            "suggested_questions_with_answers": question_answers,
            "performance": {
                "inference_time_ms": round(inference_time * 1000, 2),
                "total_processing_time_ms": round(total_time * 1000, 2)
            }
        }
        # Cache the analysis result for later consultation. The raw upload is
        # kept on disk (delete=False) so the report endpoint can embed it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            temp_file.write(image_data)
        analysis_cache[analysis_id] = {
            "result": response,
            "image_path": temp_file.name,
            "analysis_time": datetime.now().isoformat()
        }
        return response
    except HTTPException:
        # Deliberate HTTP errors raised above must not be turned into a 500.
        raise
    except Exception as e:
        logger.error(f"Analysis #{analysis_id}: Error processing image: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")
@app.get("/analysis/{analysis_id}")
async def get_analysis(analysis_id: str):
    """Return the stored result of an earlier analysis, or 404 if the ID is unknown."""
    try:
        cached_entry = analysis_cache[analysis_id]
    except KeyError:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    return cached_entry["result"]
@app.post("/consult")
async def get_ai_consultation(
    analysis_id: str = Body(...),
    question: Optional[str] = Body(None),
    background_tasks: BackgroundTasks = None
):
    """
    Get AI consultation based on analysis results
    - analysis_id: ID of the previous analysis
    - question: Optional specific question about the diagnosis (if not provided, general consultation is given)

    Responds with 501 when the LLM integration is not configured, 404 when the
    analysis ID is unknown, and 500 when the LLM call fails.
    """
    if not OPENAI_AVAILABLE or not OPENAI_API_KEY:
        raise HTTPException(status_code=501, detail="AI consultation feature is not available. OpenAI package or API key not configured.")
    if analysis_id not in analysis_cache:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    analysis_data = analysis_cache[analysis_id]["result"]
    dr_class = analysis_data["highest_probability_class"]
    severity_index = analysis_data["severity_index"]
    clinical_info = dr_clinical_info.get(dr_class, {})
    # Prepare context for the AI
    context = f"""Patient Analysis Summary:
- Diagnosis: {dr_class} (Confidence: {get_confidence_level(analysis_data['detailed_classification'][0]['probability'])})
- Severity Index: {severity_index}/100
- Clinical Findings: {clinical_info.get('findings', 'Not available')}
- Associated Risks: {clinical_info.get('risks', 'Not available')}
- Standard Recommendations: {clinical_info.get('recommendations', 'Not available')}
- Standard Follow-up: {clinical_info.get('follow_up', 'Not available')}
"""
    # Default question if none provided. When this coroutine is awaited
    # directly instead of being dispatched by FastAPI, an omitted `question`
    # is still the Body(None) default object (which is truthy), so anything
    # that is not a non-empty string counts as "no question supplied".
    if not isinstance(question, str) or not question:
        question = DEFAULT_QUESTIONS["general"].replace("my diagnosis", f"my diagnosis of {dr_class} diabetic retinopathy with a severity index of {severity_index}")
    try:
        # Create an OpenAI client with the API key and base URL
        client = openai.OpenAI(
            api_key=OPENAI_API_KEY,
            base_url=OPENROUTER_API_BASE,
            default_headers={"HTTP-Referer": OPENROUTER_REFERER}
        )
        # Call the LLM through OpenRouter using the new API format
        response = client.chat.completions.create(
            model=OPENROUTER_MODEL,
            messages=[
                {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT},
                {"role": "user", "content": f"{context}\n\nPatient Question: {question}"}
            ]
        )
        # Extract the content from the response using the new API format
        consultation = response.choices[0].message.content
        return {
            "analysis_id": analysis_id,
            "dr_class": dr_class,
            "severity_index": severity_index,
            "question": question,
            "consultation": consultation,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting AI consultation: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error generating consultation: {str(e)}")
# Shared by the DOCX and PDF versions of the report.
_REPORT_DISCLAIMER = 'This report is generated using artificial intelligence and should not replace professional medical advice. The analysis and recommendations provided are based on automated image processing and AI consultation. Please consult with a qualified healthcare provider for proper diagnosis, treatment, and follow-up care.'

# Markers before which a paragraph break is inserted in unbroken consultation text.
_CONSULTATION_BREAK_MARKERS = ('1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:', 'Question:')

# Paragraphs starting with one of these are rendered in bold in the PDF.
_CONSULTATION_HEADING_MARKERS = ('1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:')


def _split_consultation_paragraphs(consultation_text):
    """Split consultation text into paragraphs for the PDF.

    Text that already contains blank-line breaks is split on them. Otherwise
    it is cut after a sentence end found between characters 250 and 350 of
    the remaining text; if no such point exists the rest stays one paragraph.
    """
    paragraphs = consultation_text.split('\n\n')
    if len(paragraphs) > 1:
        # Clean up any existing paragraphs
        return [p.strip() for p in paragraphs if p.strip()]

    for marker in _CONSULTATION_BREAK_MARKERS:
        consultation_text = consultation_text.replace(marker, f"\n\n{marker}")

    paragraphs = []
    remaining = consultation_text
    while len(remaining) > 300:
        cut = None
        for pos in range(250, min(len(remaining), 350)):
            ends_sentence = remaining[pos] in '.!?'
            if ends_sentence and (pos + 1 >= len(remaining) or remaining[pos + 1] == ' '):
                cut = pos + 1  # Cut just after the sentence-ending character
                break
        if cut is None or cut >= len(remaining) or cut >= 350:
            # No usable break point: the rest is emitted once, below.
            break
        paragraphs.append(remaining[:cut])
        remaining = remaining[cut:].strip()
    if remaining:
        paragraphs.append(remaining)
    return paragraphs


def _consultation_flowables(consultation_text, normal_style):
    """Return the PDF flowables for the AI consultation text."""
    # Function-scope import: only this helper needs it.
    from xml.sax.saxutils import escape

    point_style = ParagraphStyle(
        'ConsultationPoint',
        parent=normal_style,
        fontName='Helvetica-Bold',
        spaceBefore=8
    )
    paragraphs = _split_consultation_paragraphs(consultation_text)
    flowables = []
    for i, para in enumerate(paragraphs):
        # Paragraph() interprets its text as markup, so free-form LLM output
        # is escaped to keep characters such as '<' and '&' literal.
        if para.startswith(_CONSULTATION_HEADING_MARKERS):
            heading, separator, content = para.partition(':')
            if separator and len(heading) < 30:  # It's likely a heading:content format
                flowables.append(Paragraph(escape(heading + ':'), point_style))
                flowables.append(Paragraph(escape(content.strip()), normal_style))
            else:
                flowables.append(Paragraph(escape(para), point_style))
        else:
            flowables.append(Paragraph(escape(para), normal_style))
        # Add a small space between paragraphs, but not after the last one
        if i < len(paragraphs) - 1:
            flowables.append(Spacer(1, 0.1*inch))
    return flowables


def _probability_chart(classification):
    """Return a Drawing holding a bar chart of the class percentages and a legend."""
    drawing = Drawing(500, 250)
    chart = VerticalBarChart()
    chart.x = 50
    chart.y = 50
    chart.height = 150
    chart.width = 350
    chart.data = [[item['percentage'] for item in classification]]
    chart.categoryAxis.categoryNames = [item['class'] for item in classification]
    chart.categoryAxis.labels.boxAnchor = 'ne'
    chart.categoryAxis.labels.dx = -8
    chart.categoryAxis.labels.dy = -2
    chart.categoryAxis.labels.angle = 30
    chart.valueAxis.valueMin = 0
    chart.valueAxis.valueMax = 100
    chart.valueAxis.valueStep = 10
    chart.bars[0].fillColor = colors.skyblue
    chart.bars[0].strokeColor = colors.black
    chart.bars[0].strokeWidth = 0.5
    legend = Legend()
    legend.alignment = 'right'
    legend.x = 400
    legend.y = 150
    legend.colorNamePairs = [(colors.skyblue, 'Probability (%)')]
    drawing.add(chart)
    drawing.add(legend)
    return drawing


def _write_docx_report(docx_path, analysis_id, analysis_data, image_path, clinical_info, consultation_text):
    """Build the DOCX version of the report and save it to docx_path."""
    doc = Document()
    # Add header
    doc.add_heading('ClarirAI Medical Report', 0)
    doc.add_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    doc.add_paragraph(f"Analysis ID: {analysis_id}")
    doc.add_paragraph(f"Original Analysis Date: {analysis_data['timestamp']}")
    # Add the analyzed image if available
    if image_path and os.path.exists(image_path):
        doc.add_heading('Analyzed Retinal Image', level=1)
        doc.add_paragraph('Below is the retinal image that was analyzed:')
        doc.add_picture(image_path, width=docx.shared.Inches(4.0))
    # Add diagnosis section
    doc.add_heading('Diabetic Retinopathy Assessment', level=1)
    doc.add_paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}")
    doc.add_paragraph(f"Severity Index: {analysis_data['severity_index']}/100")
    # Add a page break before the detailed classification table
    doc.add_page_break()
    # Detailed classification table
    doc.add_heading('Detailed Classification', level=2)
    table = doc.add_table(rows=1, cols=4)
    table.style = 'Table Grid'
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = 'Classification'
    hdr_cells[1].text = 'Description'
    hdr_cells[2].text = 'Probability'
    hdr_cells[3].text = 'Confidence Level'
    for item in analysis_data['detailed_classification']:
        row_cells = table.add_row().cells
        row_cells[0].text = item['class']
        row_cells[1].text = item['description']
        row_cells[2].text = f"{item['percentage']}%"
        row_cells[3].text = item['confidence_level']
    # Note: the bar chart is only available in the PDF version.
    # Add clinical information
    if clinical_info:
        doc.add_heading('Clinical Information', level=1)
        doc.add_heading('Findings', level=2)
        doc.add_paragraph(clinical_info.get('findings', 'Not available'))
        doc.add_heading('Associated Risks', level=2)
        doc.add_paragraph(clinical_info.get('risks', 'Not available'))
        doc.add_heading('Recommendations', level=2)
        doc.add_paragraph(clinical_info.get('recommendations', 'Not available'))
        doc.add_heading('Follow-up', level=2)
        doc.add_paragraph(clinical_info.get('follow_up', 'Not available'))
    # Add AI explanation
    doc.add_heading('AI Analysis Explanation', level=1)
    doc.add_paragraph(clinical_info.get('explanation', 'No explanation available for this classification.'))
    # Add AI consultation if available
    if consultation_text:
        doc.add_heading('AI-Generated Medical Consultation', level=1)
        doc.add_paragraph(consultation_text)
    # Add disclaimer
    doc.add_heading('Disclaimer', level=1)
    doc.add_paragraph(_REPORT_DISCLAIMER)
    doc.save(docx_path)


def _write_pdf_report(pdf_path, analysis_id, analysis_data, image_path, clinical_info, consultation_text):
    """Build the PDF version of the report with ReportLab and write it to pdf_path."""
    pdf_doc = SimpleDocTemplate(pdf_path, pagesize=letter)
    styles = getSampleStyleSheet()
    # Create custom styles
    title_style = ParagraphStyle('Title', parent=styles['Title'], fontSize=16, spaceAfter=12)
    heading1_style = ParagraphStyle('Heading1', parent=styles['Heading1'], fontSize=14, spaceAfter=10, spaceBefore=10)
    heading2_style = ParagraphStyle('Heading2', parent=styles['Heading2'], fontSize=12, spaceAfter=8, spaceBefore=8)
    normal_style = styles["Normal"]

    elements = []
    # Title
    elements.append(Paragraph('ClarirAI Medical Report', title_style))
    elements.append(Spacer(1, 0.25*inch))
    # Metadata
    elements.append(Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", normal_style))
    elements.append(Paragraph(f"Analysis ID: {analysis_id}", normal_style))
    elements.append(Paragraph(f"Original Analysis Date: {analysis_data['timestamp']}", normal_style))
    elements.append(Spacer(1, 0.25*inch))
    # Add the analyzed image if available
    if image_path and os.path.exists(image_path):
        elements.append(Paragraph('Analyzed Retinal Image', heading1_style))
        elements.append(Paragraph('Below is the retinal image that was analyzed:', normal_style))
        img_buffer = BytesIO()
        with PILImage.open(image_path) as img:
            img_width, img_height = img.size
            # The cached file holds the upload as received, which may be in
            # a mode JPEG cannot store (alpha, palette); convert first.
            img.convert("RGB").save(img_buffer, format='JPEG')
        img_buffer.seek(0)
        rl_img_width = 4 * inch
        rl_img_height = rl_img_width * (img_height / float(img_width))
        elements.append(RLImage(img_buffer, width=rl_img_width, height=rl_img_height))
        elements.append(Spacer(1, 0.25*inch))
    # Diagnosis section
    elements.append(Paragraph('Diabetic Retinopathy Assessment', heading1_style))
    elements.append(Paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}", normal_style))
    elements.append(Paragraph(f"Severity Index: {analysis_data['severity_index']}/100", normal_style))
    elements.append(Spacer(1, 0.25*inch))
    # Add a page break before the detailed classification table
    elements.append(PageBreak())
    # Detailed classification table
    elements.append(Paragraph('Detailed Classification', heading2_style))
    table_data = [['Classification', 'Description', 'Probability', 'Confidence Level']]
    for item in analysis_data['detailed_classification']:
        table_data.append([
            item['class'],
            item['description'],
            f"{item['percentage']}%",
            item['confidence_level']
        ])
    # Adjusted column widths - the description column is the widest
    table = Table(table_data, colWidths=[1.2*inch, 3.0*inch, 0.8*inch, 1.2*inch])
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        # Align description text to the left for better readability
        ('ALIGN', (1, 0), (1, -1), 'LEFT'),
        # Add some padding for the description column
        ('LEFTPADDING', (1, 0), (1, -1), 6),
        ('RIGHTPADDING', (1, 0), (1, -1), 6),
    ]))
    elements.append(table)
    elements.append(Spacer(1, 0.25*inch))
    # Add bar chart visualization
    elements.append(Paragraph('Classification Probability Chart', heading2_style))
    elements.append(_probability_chart(analysis_data['detailed_classification']))
    elements.append(Spacer(1, 0.25*inch))
    # Clinical information
    if clinical_info:
        elements.append(Paragraph('Clinical Information', heading1_style))
        elements.append(Paragraph('Findings', heading2_style))
        elements.append(Paragraph(clinical_info.get('findings', 'Not available'), normal_style))
        elements.append(Spacer(1, 0.15*inch))
        elements.append(Paragraph('Associated Risks', heading2_style))
        elements.append(Paragraph(clinical_info.get('risks', 'Not available'), normal_style))
        elements.append(Spacer(1, 0.15*inch))
        elements.append(Paragraph('Recommendations', heading2_style))
        elements.append(Paragraph(clinical_info.get('recommendations', 'Not available'), normal_style))
        elements.append(Spacer(1, 0.15*inch))
        elements.append(Paragraph('Follow-up', heading2_style))
        elements.append(Paragraph(clinical_info.get('follow_up', 'Not available'), normal_style))
        elements.append(Spacer(1, 0.25*inch))
    # AI explanation
    elements.append(Paragraph('AI Analysis Explanation', heading1_style))
    elements.append(Paragraph(clinical_info.get('explanation', 'No explanation available for this classification.'), normal_style))
    elements.append(Spacer(1, 0.25*inch))
    # AI consultation
    if consultation_text:
        elements.append(Paragraph('AI-Generated Medical Consultation', heading1_style))
        elements.extend(_consultation_flowables(consultation_text, normal_style))
        elements.append(Spacer(1, 0.25*inch))
    # Disclaimer
    elements.append(Paragraph('Disclaimer', heading1_style))
    elements.append(Paragraph(_REPORT_DISCLAIMER, normal_style))
    # Build the PDF
    pdf_doc.build(elements)


def _remove_dir_after_response(temp_dir):
    """Return a background task that deletes temp_dir once the response is sent."""
    import shutil

    cleanup = BackgroundTasks()
    cleanup.add_task(shutil.rmtree, temp_dir, ignore_errors=True)
    return cleanup


@app.get("/generate-report/{analysis_id}")
async def generate_report(analysis_id: str, include_consultation: bool = True):
    """
    Generate a downloadable medical report based on analysis results
    - analysis_id: ID of the previous analysis
    - include_consultation: Whether to include AI consultation in the report

    Returns the PDF report, or the DOCX version if PDF generation fails.
    Responds with 501 when python-docx is missing, 404 for an unknown
    analysis ID and 500 when the report cannot be generated at all.
    """
    if not DOCX_AVAILABLE:
        raise HTTPException(status_code=501, detail="Report generation is not available. python-docx package not installed.")
    if analysis_id not in analysis_cache:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    analysis_data = analysis_cache[analysis_id]["result"]
    image_path = analysis_cache[analysis_id].get("image_path")
    # Get consultation if requested and available
    consultation_text = None
    if include_consultation and OPENAI_AVAILABLE and OPENAI_API_KEY:
        try:
            # question must be passed explicitly: on a direct call the
            # parameter default is the Body(None) marker object, not None.
            consultation_response = await get_ai_consultation(analysis_id=analysis_id, question=None)
            consultation_text = consultation_response.get("consultation")
        except Exception as e:
            # Best effort: the report is still produced without a consultation.
            logger.warning(f"Could not get consultation for report: {e}")
    try:
        clinical_info = dr_clinical_info.get(analysis_data['highest_probability_class'], {})
        temp_dir = tempfile.mkdtemp()
        # First create a DOCX file (for backup purposes)
        docx_path = os.path.join(temp_dir, f"report_{analysis_id}.docx")
        _write_docx_report(docx_path, analysis_id, analysis_data, image_path, clinical_info, consultation_text)
        # Now generate PDF directly using ReportLab
        try:
            pdf_path = os.path.join(temp_dir, f"report_{analysis_id}.pdf")
            _write_pdf_report(pdf_path, analysis_id, analysis_data, image_path, clinical_info, consultation_text)
            return FileResponse(
                path=pdf_path,
                filename=f"ClarirAI_Report_{analysis_id}.pdf",
                media_type="application/pdf",
                background=_remove_dir_after_response(temp_dir)
            )
        except Exception as e:
            logger.error(f"Error generating PDF with ReportLab: {e}")
            # Fall back to DOCX if PDF generation fails
            return FileResponse(
                path=docx_path,
                filename=f"ClarirAI_Report_{analysis_id}.docx",
                media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                background=_remove_dir_after_response(temp_dir)
            )
    except Exception as e:
        logger.error(f"Error generating report: {e}")
        raise HTTPException(status_code=500, detail=f"Error generating report: {str(e)}")
# Run the server
if __name__ == "__main__":
    # The listening port can be overridden with the PORT environment
    # variable; 7860 remains the default.
    uvicorn.run("main:app", host="0.0.0.0", port=int(os.environ.get("PORT", "7860")), reload=True)