# ClarirAI / main.py
# (Hugging Face file-viewer header removed: uploaded by Surajkumaar, commit 6ba56bf, verified)
import io
import logging
import time
from datetime import datetime
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Query, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import onnxruntime
import numpy as np
from PIL import Image
import uvicorn
import uuid
import json
import os
import tempfile
from typing import Optional, List, Dict
import subprocess
import platform
import docx
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.lib.units import inch
from io import BytesIO
from PIL import Image as PILImage
from reportlab.graphics.shapes import Drawing
from reportlab.graphics.charts.barcharts import VerticalBarChart
from reportlab.graphics.charts.legends import Legend
# Import for .env file support (optional: falls back to raw environment variables)
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load environment variables from .env file
    ENV_LOADED = True
except ImportError:
    ENV_LOADED = False
    logging.warning("python-dotenv package not installed. Using environment variables directly.")
# Import for OpenRouter integration (optional: AI consultation is disabled when missing)
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    logging.warning("OpenAI package not installed. AI consultation features will be disabled.")
# Import for report generation (optional: report endpoint is disabled when missing)
# NOTE(review): the unconditional `import docx` near the top of the file already
# hard-requires python-docx, so this guard can never actually trip — confirm
# whether that top-level import should be removed instead.
try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    logging.warning("python-docx package not installed. Report generation will be disabled.")
# Import prompts and clinical information (project-local module)
from prompts import DR_CLINICAL_INFO, CONSULTATION_SYSTEM_PROMPT, DEFAULT_QUESTIONS, REPORT_SYSTEM_PROMPT
# Configure root logging once for the whole service.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("clarirai-api")
# Store analysis results for later consultation.
# NOTE(review): purely in-memory and per-process — entries are lost on restart,
# not shared between workers, and never evicted (see /predict, which also
# writes a temp image file per entry).
analysis_cache = {}
# FastAPI application instance; interactive docs are served at /docs.
app = FastAPI(
    title="ClarirAI - Advanced Diabetic Retinopathy Analysis",
    description="Next-generation API for detecting and analyzing diabetic retinopathy from retinal images with enhanced metrics",
    version="2.0.0"
)
# CORS configuration for the known front-end deployments.
# NOTE(review): the "*" entry makes the two explicit origins redundant, and
# wildcard origins combined with allow_credentials=True is unsafe per the CORS
# spec — remove "*" before production, as the inline comment already says.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://diabetes-detection-zeta.vercel.app",
        "https://diabetes-detection-harishvijayasarangank-gmailcoms-projects.vercel.app",
        "*"  # Allow all origins for development - remove in production
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Enhanced classification labels with descriptions.
# Keys are the model's output class indices (0 = healthiest, 4 = most severe);
# index order also drives the severity weights in calculate_severity_index().
labels = {
    0: {"name": "No DR", "description": "No signs of diabetic retinopathy detected"},
    1: {"name": "Mild", "description": "Mild nonproliferative diabetic retinopathy"},
    2: {"name": "Moderate", "description": "Moderate nonproliferative diabetic retinopathy"},
    3: {"name": "Severe", "description": "Severe nonproliferative diabetic retinopathy"},
    4: {"name": "Proliferative DR", "description": "Proliferative diabetic retinopathy"},
}
# Use the imported clinical information instead of defining it here
dr_clinical_info = DR_CLINICAL_INFO
# Model metadata.
# NOTE(review): these values are hand-maintained, not read from model.onnx —
# confirm they stay in sync when the model file is replaced.
MODEL_INFO = {
    "name": "ClarirAI Model",
    "version": "1.2.0",
    "architecture": "Densenet121",
    "accuracy": "89.8%",
    "last_updated": "2025-04-01",
    "input_size": [224, 224],
    "color_channels": 3,
}
# OpenAI/OpenRouter configuration, read from the environment (optionally via .env).
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_REFERER = os.environ.get("OPENROUTER_REFERER", "https://clarirai.example.com")
OPENROUTER_MODEL = os.environ.get("OPENROUTER_MODEL", "mistralai/mistral-7b-instruct")  # Default to a free model
if OPENAI_AVAILABLE and OPENAI_API_KEY:
    # Every call site builds an explicit v1 client via
    # openai.OpenAI(api_key=..., base_url=..., default_headers=...), so the
    # legacy pre-1.0 module-level globals (`openai.api_key`, `openai.api_base`)
    # previously set here were dead configuration — `api_base` was removed in
    # openai>=1.0 and neither global is consulted by the v1 client. They are
    # intentionally no longer assigned.
    logger.info(f"OpenRouter configured with model: {OPENROUTER_MODEL}")
else:
    logger.warning("OpenRouter not configured. AI consultation will not be available.")
# Load the ONNX inference session once at import time.
try:
    logger.info("Loading ONNX model...")
    start_time = time.time()
    # Expects model.onnx in the working directory.
    session = onnxruntime.InferenceSession('model.onnx')
    load_time = time.time() - start_time
    logger.info(f"Model loaded successfully in {load_time:.2f} seconds")
    MODEL_INFO["load_time_seconds"] = round(load_time, 2)
except Exception as e:
    # Keep the app importable even without the model; endpoints then return 503.
    logger.error(f"Error loading model: {e}")
    session = None
@app.get("/")
async def root():
    """
    Root endpoint that provides API information
    """
    # Keep the endpoint listing in one place so it is easy to extend.
    endpoint_list = [
        "/predict",
        "/health",
        "/model-info",
        "/consult",
        "/generate-report/{analysis_id}",
    ]
    return {
        "name": "ClarirAI: Advanced Diabetic Retinopathy Analysis API",
        "version": "2.0.0",
        "description": "AI-powered diabetic retinopathy detection and analysis with enhanced metrics and consultation",
        "endpoints": endpoint_list,
        "documentation": "/docs",
    }
@app.get("/health")
async def health_check():
    """Check the health status of the API and model"""
    model_ready = session is not None
    if not model_ready:
        # 503 so load balancers / orchestrators treat the instance as down.
        failure_payload = {
            "status": "unhealthy",
            "message": "Model failed to load",
            "timestamp": datetime.now().isoformat(),
        }
        return JSONResponse(status_code=503, content=failure_payload)
    return {
        "status": "healthy",
        "service": "ClarirAI API",
        "timestamp": datetime.now().isoformat(),
        "model_loaded": model_ready,
    }
@app.get("/model-info")
async def get_model_info():
    """
    Get information about the model being used.

    Returns:
        dict: Static model metadata (name, version, framework, input shape,
        ordered class names, and a preprocessing summary).

    Raises:
        HTTPException: 503 if the ONNX session failed to load at startup.
    """
    if session is None:
        raise HTTPException(status_code=503, detail="Model not available")
    return {
        "name": "ClarirAI Diabetic Retinopathy Classifier",
        "version": "2.0.0",
        "framework": "ONNX Runtime",
        "input_shape": [1, 3, 224, 224],
        "classes": [labels[i]["name"] for i in labels],
        # Fixed: transform_image() normalizes with dataset-specific statistics
        # (mean [0.5353, 0.3628, 0.2486], std [0.2126, 0.1586, 0.1401]); the
        # previous text incorrectly claimed the ImageNet constants.
        "preprocessing": "Resize to 224x224, normalize with dataset-specific mean and std"
    }
def transform_image(image):
    """Convert an input image into a normalized NCHW float32 tensor.

    The image is resized to 224x224, scaled to [0, 1], normalized per channel
    with the model's training statistics, and returned with shape
    (1, 3, 224, 224).
    """
    resized = image.resize((224, 224))
    pixels = np.array(resized, dtype=np.float32) / 255.0
    # Per-channel statistics of the training data (not the ImageNet defaults).
    channel_mean = np.array([0.5353, 0.3628, 0.2486], dtype=np.float32)
    channel_std = np.array([0.2126, 0.1586, 0.1401], dtype=np.float32)
    normalized = (pixels - channel_mean) / channel_std
    # HWC -> CHW, then prepend the batch axis.
    chw = normalized.transpose(2, 0, 1)
    return chw[np.newaxis, ...].astype(np.float32)
def calculate_severity_index(probabilities):
    """Return a weighted severity score in [0, 100].

    Each class probability is weighted by its DR grade (0, 25, 50, 75, 100),
    so a certain "No DR" scores 0 and a certain "Proliferative DR" scores 100.
    """
    grade_weights = (0, 25, 50, 75, 100)
    # Cast each probability to a plain float so numpy scalars never leak out.
    score = sum(weight * float(prob) for weight, prob in zip(grade_weights, probabilities))
    return round(score, 1)
def get_confidence_level(probability):
    """Map a probability in [0, 1] to a qualitative confidence label."""
    # Ordered highest-first; the first cutoff the value meets wins.
    bands = (
        (0.90, "Very High"),
        (0.75, "High"),
        (0.50, "Moderate"),
        (0.25, "Low"),
    )
    for cutoff, label in bands:
        if probability >= cutoff:
            return label
    return "Very Low"
@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """
    Predict diabetic retinopathy from retinal image with enhanced metrics

    - **file**: Upload a retinal image file

    Returns detailed classification with confidence levels, severity index,
    recommendations, clinical information, and (when OpenRouter is configured)
    pre-generated answers to suggested patient questions. The result is cached
    in-memory under `analysis_id` for the /consult and /generate-report
    endpoints.
    """
    analysis_id = str(uuid.uuid4())[:8]  # Generate a unique ID for this analysis
    logger.info(f"Analysis #{analysis_id}: Received image: {file.filename}, content-type: {file.content_type}")
    if session is None:
        raise HTTPException(status_code=503, detail="Model not available")
    if not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="File provided is not an image")
    try:
        # Start timing for performance metrics
        process_start = time.time()
        image_data = await file.read()
        input_img = Image.open(io.BytesIO(image_data)).convert("RGB")
        input_tensor = transform_image(input_img)
        input_name = session.get_inputs()[0].name
        output_name = session.get_outputs()[0].name
        logger.info(f"Analysis #{analysis_id}: Running inference")
        inference_start = time.time()
        prediction = session.run([output_name], {input_name: input_tensor})[0][0]
        inference_time = time.time() - inference_start
        # Numerically stable softmax over the raw logits.
        exp_preds = np.exp(prediction - np.max(prediction))
        probabilities = exp_preds / exp_preds.sum()
        # Convert numpy array to Python list of floats
        prob_list = [float(p) for p in probabilities]
        # Calculate severity index
        severity_index = calculate_severity_index(probabilities)
        # Format detailed results with confidence levels
        detailed_results = []
        for i in labels:
            prob = float(probabilities[i])  # Convert numpy.float32 to Python float
            detailed_results.append({
                "class": labels[i]["name"],
                "description": labels[i]["description"],
                "probability": round(prob, 4),
                "percentage": round(prob * 100, 1),
                "confidence_level": get_confidence_level(prob)
            })
        # Sort by probability (highest first)
        detailed_results.sort(key=lambda x: x["probability"], reverse=True)
        highest_class = detailed_results[0]["class"]
        highest_prob = detailed_results[0]["probability"]
        # Generate recommendation based on severity
        recommendation = "Regular screening recommended."
        if severity_index > 75:
            recommendation = "Urgent ophthalmologist consultation required."
        elif severity_index > 50:
            recommendation = "Prompt ophthalmologist evaluation recommended."
        elif severity_index > 25:
            recommendation = "Follow-up with ophthalmologist advised."
        # Get clinical information for the highest probability class
        clinical_info = dr_clinical_info.get(highest_class, {})
        # Calculate binary classification (DR vs No DR)
        dr_probability = sum(float(probabilities[i]) for i in range(1, 5))  # Sum of all DR classes, convert to Python float
        binary_classification = {
            "no_dr": float(probabilities[0]),  # Convert to Python float
            "dr_detected": float(dr_probability),
            "primary_assessment": "DR Detected" if dr_probability > float(probabilities[0]) else "No DR"
        }
        # Get suggested questions and generate answers if OpenAI is available.
        # NOTE(review): one blocking LLM round-trip per question inside this
        # request handler — confirm the latency is acceptable or move this to a
        # background task.
        suggested_questions = clinical_info.get("suggested_questions", [])
        question_answers = []
        if OPENAI_AVAILABLE and OPENAI_API_KEY and suggested_questions:
            try:
                # Create an OpenAI client (v1 API) pointed at OpenRouter.
                client = openai.OpenAI(
                    api_key=OPENAI_API_KEY,
                    base_url=OPENROUTER_API_BASE,
                    default_headers={"HTTP-Referer": OPENROUTER_REFERER}
                )
                # Prepare context for the AI
                context = f"""Patient Analysis Summary:
- Diagnosis: {highest_class} (Confidence: {get_confidence_level(highest_prob)})
- Severity Index: {severity_index}/100
- Clinical Findings: {clinical_info.get('findings', 'Not available')}
- Associated Risks: {clinical_info.get('risks', 'Not available')}
- Standard Recommendations: {clinical_info.get('recommendations', 'Not available')}
- Standard Follow-up: {clinical_info.get('follow_up', 'Not available')}
"""
                # Generate answers for each suggested question
                for question in suggested_questions:
                    try:
                        response = client.chat.completions.create(
                            model=OPENROUTER_MODEL,
                            messages=[
                                {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT},
                                {"role": "user", "content": f"{context}\n\nPatient Question: {question}"}
                            ]
                        )
                        answer = response.choices[0].message.content
                        question_answers.append({
                            "question": question,
                            "answer": answer
                        })
                    except Exception as e:
                        # Per-question failure: keep going, record a placeholder answer.
                        logger.warning(f"Error generating answer for question '{question}': {e}")
                        question_answers.append({
                            "question": question,
                            "answer": "Unable to generate answer at this time. Please try asking this question directly."
                        })
            except Exception as e:
                logger.warning(f"Error generating answers for suggested questions: {e}")
                # Still include the questions even if answers couldn't be generated
                question_answers = [{"question": q, "answer": None} for q in suggested_questions]
        else:
            # If OpenAI is not available, just include the questions without answers
            question_answers = [{"question": q, "answer": None} for q in suggested_questions]
        total_time = time.time() - process_start
        logger.info(f"Analysis #{analysis_id}: Prediction complete: highest probability class = {highest_class} ({highest_prob:.2f})")
        # Prepare response
        response = {
            "analysis_id": analysis_id,
            "timestamp": datetime.now().isoformat(),
            "detailed_classification": detailed_results,
            "binary_classification": binary_classification,
            "highest_probability_class": highest_class,
            "severity_index": severity_index,
            "recommendation": recommendation,
            "clinical_information": clinical_info,
            "ai_explanation": clinical_info.get("explanation", "No explanation available for this classification."),
            "suggested_questions_with_answers": question_answers,
            "performance": {
                "inference_time_ms": round(inference_time * 1000, 2),
                "total_processing_time_ms": round(total_time * 1000, 2)
            }
        }
        # Cache the analysis result for later consultation.
        # NOTE(review): the temp image file is never deleted and cache entries
        # are never evicted, so a long-running process leaks disk and memory.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
        temp_file.write(image_data)
        temp_file.close()
        analysis_cache[analysis_id] = {
            "result": response,
            "image_path": temp_file.name,
            "analysis_time": datetime.now().isoformat()
        }
        return response
    except Exception as e:
        logger.error(f"Analysis #{analysis_id}: Error processing image: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")
@app.get("/analysis/{analysis_id}")
async def get_analysis(analysis_id: str):
    """Retrieve a previous analysis by its ID"""
    cached_entry = analysis_cache.get(analysis_id)
    if cached_entry is None:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    return cached_entry["result"]
@app.post("/consult")
async def get_ai_consultation(
    analysis_id: str = Body(...),
    question: Optional[str] = Body(None),
    background_tasks: BackgroundTasks = None
):
    """
    Get AI consultation based on analysis results

    - analysis_id: ID of the previous analysis
    - question: Optional specific question about the diagnosis (if not provided, general consultation is given)
    """
    # NOTE(review): background_tasks is accepted but never used — presumably
    # reserved for future async work; confirm before removing.
    if not OPENAI_AVAILABLE or not OPENAI_API_KEY:
        raise HTTPException(status_code=501, detail="AI consultation feature is not available. OpenAI package or API key not configured.")
    if analysis_id not in analysis_cache:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    analysis_data = analysis_cache[analysis_id]["result"]
    dr_class = analysis_data["highest_probability_class"]
    severity_index = analysis_data["severity_index"]
    clinical_info = dr_clinical_info.get(dr_class, {})
    # Prepare context for the AI (detailed_classification is sorted, so [0] is
    # the top class).
    context = f"""Patient Analysis Summary:
- Diagnosis: {dr_class} (Confidence: {get_confidence_level(analysis_data['detailed_classification'][0]['probability'])})
- Severity Index: {severity_index}/100
- Clinical Findings: {clinical_info.get('findings', 'Not available')}
- Associated Risks: {clinical_info.get('risks', 'Not available')}
- Standard Recommendations: {clinical_info.get('recommendations', 'Not available')}
- Standard Follow-up: {clinical_info.get('follow_up', 'Not available')}
"""
    # Default question if none provided
    if not question:
        question = DEFAULT_QUESTIONS["general"].replace("my diagnosis", f"my diagnosis of {dr_class} diabetic retinopathy with a severity index of {severity_index}")
    try:
        # Create an OpenAI client with the API key and base URL (v1 API)
        client = openai.OpenAI(
            api_key=OPENAI_API_KEY,
            base_url=OPENROUTER_API_BASE,
            default_headers={"HTTP-Referer": OPENROUTER_REFERER}
        )
        # Call the LLM through OpenRouter using the new API format
        response = client.chat.completions.create(
            model=OPENROUTER_MODEL,
            messages=[
                {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT},
                {"role": "user", "content": f"{context}\n\nPatient Question: {question}"}
            ]
        )
        # Extract the content from the response using the new API format
        consultation = response.choices[0].message.content
        return {
            "analysis_id": analysis_id,
            "dr_class": dr_class,
            "severity_index": severity_index,
            "question": question,
            "consultation": consultation,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting AI consultation: {e}")
        raise HTTPException(status_code=500, detail=f"Error generating consultation: {str(e)}")
@app.get("/generate-report/{analysis_id}")
async def generate_report(analysis_id: str, include_consultation: bool = True):
    """
    Generate a downloadable medical report based on analysis results

    - analysis_id: ID of the previous analysis
    - include_consultation: Whether to include AI consultation in the report

    Builds a DOCX version first (kept as a fallback), then renders a PDF with
    ReportLab and returns it; if PDF generation fails, the DOCX is returned
    instead.
    """
    if not DOCX_AVAILABLE:
        raise HTTPException(status_code=501, detail="Report generation is not available. python-docx package not installed.")
    if analysis_id not in analysis_cache:
        raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found")
    analysis_data = analysis_cache[analysis_id]["result"]
    image_path = analysis_cache[analysis_id].get("image_path")
    # Get consultation if requested and available (best-effort; report still
    # renders without it).
    consultation_text = None
    if include_consultation and OPENAI_AVAILABLE and OPENAI_API_KEY:
        try:
            consultation_response = await get_ai_consultation(analysis_id=analysis_id)
            consultation_text = consultation_response.get("consultation")
        except Exception as e:
            logger.warning(f"Could not get consultation for report: {e}")
    try:
        # First create a DOCX file (for backup purposes)
        doc = Document()
        # Add header
        doc.add_heading('ClarirAI Medical Report', 0)
        doc.add_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        doc.add_paragraph(f"Analysis ID: {analysis_id}")
        doc.add_paragraph(f"Original Analysis Date: {analysis_data['timestamp']}")
        # Add the analyzed image if available
        if image_path and os.path.exists(image_path):
            doc.add_heading('Analyzed Retinal Image', level=1)
            doc.add_paragraph('Below is the retinal image that was analyzed:')
            doc.add_picture(image_path, width=docx.shared.Inches(4.0))
        # Add diagnosis section
        doc.add_heading('Diabetic Retinopathy Assessment', level=1)
        doc.add_paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}")
        doc.add_paragraph(f"Severity Index: {analysis_data['severity_index']}/100")
        # Add a page break before the detailed classification table
        doc.add_page_break()
        # Detailed classification table
        doc.add_heading('Detailed Classification', level=2)
        table = doc.add_table(rows=1, cols=4)
        table.style = 'Table Grid'
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = 'Classification'
        hdr_cells[1].text = 'Description'
        hdr_cells[2].text = 'Probability'
        hdr_cells[3].text = 'Confidence Level'
        for item in analysis_data['detailed_classification']:
            row_cells = table.add_row().cells
            row_cells[0].text = item['class']
            row_cells[1].text = item['description']
            row_cells[2].text = f"{item['percentage']}%"
            row_cells[3].text = item['confidence_level']
        # Note: We can't add a drawing directly to DOCX using python-docx
        # The bar chart will only be available in the PDF version
        # Add clinical information
        dr_class = analysis_data['highest_probability_class']
        clinical_info = dr_clinical_info.get(dr_class, {})
        if clinical_info:
            doc.add_heading('Clinical Information', level=1)
            doc.add_heading('Findings', level=2)
            doc.add_paragraph(clinical_info.get('findings', 'Not available'))
            doc.add_heading('Associated Risks', level=2)
            doc.add_paragraph(clinical_info.get('risks', 'Not available'))
            doc.add_heading('Recommendations', level=2)
            doc.add_paragraph(clinical_info.get('recommendations', 'Not available'))
            doc.add_heading('Follow-up', level=2)
            doc.add_paragraph(clinical_info.get('follow_up', 'Not available'))
        # Add AI explanation (clinical_info is always a dict, possibly empty,
        # so .get() is safe even when the section above was skipped)
        doc.add_heading('AI Analysis Explanation', level=1)
        doc.add_paragraph(clinical_info.get('explanation', 'No explanation available for this classification.'))
        # Add AI consultation if available
        if consultation_text:
            doc.add_heading('AI-Generated Medical Consultation', level=1)
            doc.add_paragraph(consultation_text)
        # Add disclaimer
        doc.add_heading('Disclaimer', level=1)
        doc.add_paragraph('This report is generated using artificial intelligence and should not replace professional medical advice. The analysis and recommendations provided are based on automated image processing and AI consultation. Please consult with a qualified healthcare provider for proper diagnosis, treatment, and follow-up care.')
        # Save DOCX to temp file (as backup).
        # NOTE(review): temp_dir is never removed, so report files accumulate
        # on disk across requests.
        temp_dir = tempfile.mkdtemp()
        docx_path = os.path.join(temp_dir, f"report_{analysis_id}.docx")
        doc.save(docx_path)
        # Now generate PDF directly using ReportLab
        try:
            # These local imports duplicate the module-level ReportLab imports;
            # kept so the DOCX fallback below still works even if they fail here.
            from reportlab.lib.pagesizes import letter
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib import colors
            from reportlab.lib.units import inch
            from io import BytesIO
            from PIL import Image as PILImage
            # Create PDF file
            pdf_path = os.path.join(temp_dir, f"report_{analysis_id}.pdf")
            pdf_doc = SimpleDocTemplate(pdf_path, pagesize=letter)
            styles = getSampleStyleSheet()
            # Create custom styles
            title_style = ParagraphStyle(
                'Title',
                parent=styles['Title'],
                fontSize=16,
                spaceAfter=12
            )
            heading1_style = ParagraphStyle(
                'Heading1',
                parent=styles['Heading1'],
                fontSize=14,
                spaceAfter=10,
                spaceBefore=10
            )
            heading2_style = ParagraphStyle(
                'Heading2',
                parent=styles['Heading2'],
                fontSize=12,
                spaceAfter=8,
                spaceBefore=8
            )
            normal_style = styles["Normal"]
            # Build PDF content
            elements = []
            # Title
            elements.append(Paragraph('ClarirAI Medical Report', title_style))
            elements.append(Spacer(1, 0.25*inch))
            # Metadata
            elements.append(Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", normal_style))
            elements.append(Paragraph(f"Analysis ID: {analysis_id}", normal_style))
            elements.append(Paragraph(f"Original Analysis Date: {analysis_data['timestamp']}", normal_style))
            elements.append(Spacer(1, 0.25*inch))
            # Add the analyzed image if available
            if image_path and os.path.exists(image_path):
                elements.append(Paragraph('Analyzed Retinal Image', heading1_style))
                elements.append(Paragraph('Below is the retinal image that was analyzed:', normal_style))
                # Process the image for ReportLab, preserving aspect ratio
                img = PILImage.open(image_path)
                img_width, img_height = img.size
                aspect = img_height / float(img_width)
                rl_img_width = 4 * inch
                rl_img_height = rl_img_width * aspect
                # Convert PIL Image to ReportLab Image
                img_buffer = BytesIO()
                img.save(img_buffer, format='JPEG')
                img_buffer.seek(0)
                rl_img = RLImage(img_buffer, width=rl_img_width, height=rl_img_height)
                elements.append(rl_img)
                elements.append(Spacer(1, 0.25*inch))
            # Diagnosis section
            elements.append(Paragraph('Diabetic Retinopathy Assessment', heading1_style))
            elements.append(Paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}", normal_style))
            elements.append(Paragraph(f"Severity Index: {analysis_data['severity_index']}/100", normal_style))
            elements.append(Spacer(1, 0.25*inch))
            # Add a page break before the detailed classification table
            elements.append(PageBreak())
            # Detailed classification table
            elements.append(Paragraph('Detailed Classification', heading2_style))
            # Create table data
            table_data = [
                ['Classification', 'Description', 'Probability', 'Confidence Level']
            ]
            for item in analysis_data['detailed_classification']:
                table_data.append([
                    item['class'],
                    item['description'],
                    f"{item['percentage']}%",
                    item['confidence_level']
                ])
            # Create table with adjusted column widths - make description column wider
            table = Table(table_data, colWidths=[1.2*inch, 3.0*inch, 0.8*inch, 1.2*inch])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
                ('GRID', (0, 0), (-1, -1), 1, colors.black),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                # Align description text to the left for better readability
                ('ALIGN', (1, 0), (1, -1), 'LEFT'),
                # Add some padding for the description column
                ('LEFTPADDING', (1, 0), (1, -1), 6),
                ('RIGHTPADDING', (1, 0), (1, -1), 6),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 0.25*inch))
            # Add bar chart visualization (uses the module-level reportlab.graphics imports)
            elements.append(Paragraph('Classification Probability Chart', heading2_style))
            # Create the drawing with a proper size
            drawing = Drawing(500, 250)
            # Create the bar chart
            chart = VerticalBarChart()
            chart.x = 50
            chart.y = 50
            chart.height = 150
            chart.width = 350
            # Extract data for the chart
            data = []
            categories = []
            for item in analysis_data['detailed_classification']:
                data.append(item['percentage'])
                categories.append(item['class'])
            # Set chart data
            chart.data = [data]
            chart.categoryAxis.categoryNames = categories
            chart.categoryAxis.labels.boxAnchor = 'ne'
            chart.categoryAxis.labels.dx = -8
            chart.categoryAxis.labels.dy = -2
            chart.categoryAxis.labels.angle = 30
            # Set value axis properties
            chart.valueAxis.valueMin = 0
            chart.valueAxis.valueMax = 100
            chart.valueAxis.valueStep = 10
            # Set bar properties
            chart.bars[0].fillColor = colors.skyblue
            chart.bars[0].strokeColor = colors.black
            chart.bars[0].strokeWidth = 0.5
            # Add a legend
            legend = Legend()
            legend.alignment = 'right'
            legend.x = 400
            legend.y = 150
            legend.colorNamePairs = [(colors.skyblue, 'Probability (%)')]
            # Add chart and legend to the drawing
            drawing.add(chart)
            drawing.add(legend)
            # Add the drawing to the PDF
            elements.append(drawing)
            elements.append(Spacer(1, 0.25*inch))
            # Clinical information (clinical_info was resolved in the DOCX section above)
            if clinical_info:
                elements.append(Paragraph('Clinical Information', heading1_style))
                elements.append(Paragraph('Findings', heading2_style))
                elements.append(Paragraph(clinical_info.get('findings', 'Not available'), normal_style))
                elements.append(Spacer(1, 0.15*inch))
                elements.append(Paragraph('Associated Risks', heading2_style))
                elements.append(Paragraph(clinical_info.get('risks', 'Not available'), normal_style))
                elements.append(Spacer(1, 0.15*inch))
                elements.append(Paragraph('Recommendations', heading2_style))
                elements.append(Paragraph(clinical_info.get('recommendations', 'Not available'), normal_style))
                elements.append(Spacer(1, 0.15*inch))
                elements.append(Paragraph('Follow-up', heading2_style))
                elements.append(Paragraph(clinical_info.get('follow_up', 'Not available'), normal_style))
                elements.append(Spacer(1, 0.25*inch))
            # AI explanation
            elements.append(Paragraph('AI Analysis Explanation', heading1_style))
            elements.append(Paragraph(clinical_info.get('explanation', 'No explanation available for this classification.'), normal_style))
            elements.append(Spacer(1, 0.25*inch))
            # AI consultation
            if consultation_text:
                elements.append(Paragraph('AI-Generated Medical Consultation', heading1_style))
                # Process the consultation text to improve readability
                # Split into paragraphs and format each one
                paragraphs = consultation_text.split('\n\n')
                if len(paragraphs) == 1:  # If no paragraph breaks, try to create logical breaks
                    # Look for common section indicators.
                    # NOTE(review): plain substring replace — "1." etc. can also
                    # match mid-sentence numbers and insert spurious breaks.
                    for marker in ['1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:', 'Question:']:
                        consultation_text = consultation_text.replace(f"{marker}", f"\n\n{marker}")
                    # Try to break long paragraphs
                    paragraphs = []
                    current_text = consultation_text
                    while len(current_text) > 300:  # Break long paragraphs
                        # Find a good break point (end of sentence) around 250-300 chars
                        break_point = 250
                        while break_point < len(current_text) and break_point < 350:
                            if current_text[break_point] in ['.', '!', '?'] and (
                                break_point + 1 >= len(current_text) or current_text[break_point + 1] == ' '
                            ):
                                break_point += 1  # Include the space after period
                                break
                            break_point += 1
                        if break_point >= len(current_text) or break_point >= 350:
                            # If no good break found, just use the whole text
                            paragraphs.append(current_text)
                            break
                        paragraphs.append(current_text[:break_point])
                        current_text = current_text[break_point:].strip()
                    if current_text:  # Add any remaining text
                        paragraphs.append(current_text)
                else:
                    # Clean up any existing paragraphs
                    paragraphs = [p.strip() for p in paragraphs if p.strip()]
                # Add each paragraph with proper spacing
                for i, para in enumerate(paragraphs):
                    # Check if this is a numbered point or recommendation
                    if any(para.startswith(marker) for marker in ['1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:']):
                        # Use a slightly different style for headings within the consultation
                        point_style = ParagraphStyle(
                            'ConsultationPoint',
                            parent=normal_style,
                            fontName='Helvetica-Bold',
                            spaceBefore=8
                        )
                        # Split into heading and content if possible
                        parts = para.split(':', 1)
                        if len(parts) > 1 and len(parts[0]) < 30:  # It's likely a heading:content format
                            elements.append(Paragraph(parts[0] + ':', point_style))
                            elements.append(Paragraph(parts[1].strip(), normal_style))
                        else:
                            elements.append(Paragraph(para, point_style))
                    else:
                        elements.append(Paragraph(para, normal_style))
                    # Add a small space between paragraphs, but not after the last one
                    if i < len(paragraphs) - 1:
                        elements.append(Spacer(1, 0.1*inch))
                elements.append(Spacer(1, 0.25*inch))
            # Disclaimer
            elements.append(Paragraph('Disclaimer', heading1_style))
            elements.append(Paragraph('This report is generated using artificial intelligence and should not replace professional medical advice. The analysis and recommendations provided are based on automated image processing and AI consultation. Please consult with a qualified healthcare provider for proper diagnosis, treatment, and follow-up care.', normal_style))
            # Build the PDF
            pdf_doc.build(elements)
            # Return the PDF file
            return FileResponse(
                path=pdf_path,
                filename=f"ClarirAI_Report_{analysis_id}.pdf",
                media_type="application/pdf"
            )
        except Exception as e:
            logger.error(f"Error generating PDF with ReportLab: {e}")
            # Fall back to DOCX if PDF generation fails
            return FileResponse(
                path=docx_path,
                filename=f"ClarirAI_Report_{analysis_id}.docx",
                media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            )
    except Exception as e:
        logger.error(f"Error generating report: {e}")
        raise HTTPException(status_code=500, detail=f"Error generating report: {str(e)}")
# Run the server when executed directly (e.g. `python main.py`).
if __name__ == "__main__":
    # reload=True is a development convenience; disable it in production.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)