Spaces:
Sleeping
Sleeping
| import io | |
| import logging | |
| import time | |
| from datetime import datetime | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Query, Body | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| import onnxruntime | |
| import numpy as np | |
| from PIL import Image | |
| import uvicorn | |
| import uuid | |
| import json | |
| import os | |
| import tempfile | |
| from typing import Optional, List, Dict | |
| import subprocess | |
| import platform | |
| import docx | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib import colors | |
| from reportlab.lib.units import inch | |
| from io import BytesIO | |
| from PIL import Image as PILImage | |
| from reportlab.graphics.shapes import Drawing | |
| from reportlab.graphics.charts.barcharts import VerticalBarChart | |
| from reportlab.graphics.charts.legends import Legend | |
| # Import for .env file support | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() # Load environment variables from .env file | |
| ENV_LOADED = True | |
| except ImportError: | |
| ENV_LOADED = False | |
| logging.warning("python-dotenv package not installed. Using environment variables directly.") | |
| # Import for OpenRouter integration | |
| try: | |
| import openai | |
| OPENAI_AVAILABLE = True | |
| except ImportError: | |
| OPENAI_AVAILABLE = False | |
| logging.warning("OpenAI package not installed. AI consultation features will be disabled.") | |
| # Import for report generation | |
| try: | |
| from docx import Document | |
| DOCX_AVAILABLE = True | |
| except ImportError: | |
| DOCX_AVAILABLE = False | |
| logging.warning("python-docx package not installed. Report generation will be disabled.") | |
| # Import prompts and clinical information | |
| from prompts import DR_CLINICAL_INFO, CONSULTATION_SYSTEM_PROMPT, DEFAULT_QUESTIONS, REPORT_SYSTEM_PROMPT | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| ) | |
| logger = logging.getLogger("clarirai-api") | |
| # Store analysis results for later consultation | |
| analysis_cache = {} | |
| app = FastAPI( | |
| title="ClarirAI - Advanced Diabetic Retinopathy Analysis", | |
| description="Next-generation API for detecting and analyzing diabetic retinopathy from retinal images with enhanced metrics", | |
| version="2.0.0" | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=[ | |
| "https://diabetes-detection-zeta.vercel.app", | |
| "https://diabetes-detection-harishvijayasarangank-gmailcoms-projects.vercel.app", | |
| "*" # Allow all origins for development - remove in production | |
| ], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Enhanced classification labels with descriptions | |
| labels = { | |
| 0: {"name": "No DR", "description": "No signs of diabetic retinopathy detected"}, | |
| 1: {"name": "Mild", "description": "Mild nonproliferative diabetic retinopathy"}, | |
| 2: {"name": "Moderate", "description": "Moderate nonproliferative diabetic retinopathy"}, | |
| 3: {"name": "Severe", "description": "Severe nonproliferative diabetic retinopathy"}, | |
| 4: {"name": "Proliferative DR", "description": "Proliferative diabetic retinopathy"}, | |
| } | |
| # Use the imported clinical information instead of defining it here | |
| dr_clinical_info = DR_CLINICAL_INFO | |
| # Model metadata | |
| MODEL_INFO = { | |
| "name": "ClarirAI Model", | |
| "version": "1.2.0", | |
| "architecture": "Densenet121", | |
| "accuracy": "89.8%", | |
| "last_updated": "2025-04-01", | |
| "input_size": [224, 224], | |
| "color_channels": 3, | |
| } | |
| # OpenAI/OpenRouter configuration | |
| OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") | |
| OPENROUTER_API_BASE = "https://openrouter.ai/api/v1" | |
| OPENROUTER_REFERER = os.environ.get("OPENROUTER_REFERER", "https://clarirai.example.com") | |
| OPENROUTER_MODEL = os.environ.get("OPENROUTER_MODEL", "mistralai/mistral-7b-instruct") # Default to a free model | |
| if OPENAI_AVAILABLE and OPENAI_API_KEY: | |
| openai.api_key = OPENAI_API_KEY | |
| openai.api_base = OPENROUTER_API_BASE | |
| logger.info(f"OpenRouter configured with model: {OPENROUTER_MODEL}") | |
| else: | |
| logger.warning("OpenRouter not configured. AI consultation will not be available.") | |
| try: | |
| logger.info("Loading ONNX model...") | |
| start_time = time.time() | |
| session = onnxruntime.InferenceSession('model.onnx') | |
| load_time = time.time() - start_time | |
| logger.info(f"Model loaded successfully in {load_time:.2f} seconds") | |
| MODEL_INFO["load_time_seconds"] = round(load_time, 2) | |
| except Exception as e: | |
| logger.error(f"Error loading model: {e}") | |
| session = None | |
| async def root(): | |
| """ | |
| Root endpoint that provides API information | |
| """ | |
| return { | |
| "name": "ClarirAI: Advanced Diabetic Retinopathy Analysis API", | |
| "version": "2.0.0", | |
| "description": "AI-powered diabetic retinopathy detection and analysis with enhanced metrics and consultation", | |
| "endpoints": [ | |
| "/predict", "/health", "/model-info", "/consult", "/generate-report/{analysis_id}" | |
| ], | |
| "documentation": "/docs" | |
| } | |
| async def health_check(): | |
| """Check the health status of the API and model""" | |
| if session is None: | |
| return JSONResponse( | |
| status_code=503, | |
| content={"status": "unhealthy", "message": "Model failed to load", "timestamp": datetime.now().isoformat()} | |
| ) | |
| return { | |
| "status": "healthy", | |
| "service": "ClarirAI API", | |
| "timestamp": datetime.now().isoformat(), | |
| "model_loaded": session is not None | |
| } | |
| async def get_model_info(): | |
| """Get information about the model being used""" | |
| if session is None: | |
| raise HTTPException(status_code=503, detail="Model not available") | |
| return { | |
| "name": "ClarirAI Diabetic Retinopathy Classifier", | |
| "version": "2.0.0", | |
| "framework": "ONNX Runtime", | |
| "input_shape": [1, 3, 224, 224], | |
| "classes": [labels[i]["name"] for i in labels], | |
| "preprocessing": "Resize to 224x224, normalize with ImageNet mean and std" | |
| } | |
| def transform_image(image): | |
| """Preprocess image for model inference with enhanced normalization""" | |
| image = image.resize((224, 224)) | |
| img_array = np.array(image, dtype=np.float32) / 255.0 | |
| mean = np.array([0.5353, 0.3628, 0.2486], dtype=np.float32) | |
| std = np.array([0.2126, 0.1586, 0.1401], dtype=np.float32) | |
| img_array = (img_array - mean) / std | |
| img_array = np.transpose(img_array, (2, 0, 1)) | |
| return np.expand_dims(img_array, axis=0).astype(np.float32) | |
| def calculate_severity_index(probabilities): | |
| """Calculate a weighted severity index (0-100) based on class probabilities""" | |
| # Weights increase with severity | |
| weights = [0, 25, 50, 75, 100] | |
| # Convert numpy values to Python native float | |
| probs = [float(p) for p in probabilities] | |
| severity_index = sum(weights[i] * probs[i] for i in range(5)) | |
| return round(severity_index, 1) | |
| def get_confidence_level(probability): | |
| """Convert probability to a confidence level description""" | |
| if probability >= 0.90: | |
| return "Very High" | |
| elif probability >= 0.75: | |
| return "High" | |
| elif probability >= 0.50: | |
| return "Moderate" | |
| elif probability >= 0.25: | |
| return "Low" | |
| else: | |
| return "Very Low" | |
| async def predict(file: UploadFile = File(...)): | |
| """ | |
| Predict diabetic retinopathy from retinal image with enhanced metrics | |
| - **file**: Upload a retinal image file | |
| Returns detailed classification with confidence levels, severity index, and recommendations | |
| """ | |
| analysis_id = str(uuid.uuid4())[:8] # Generate a unique ID for this analysis | |
| logger.info(f"Analysis #{analysis_id}: Received image: {file.filename}, content-type: {file.content_type}") | |
| if session is None: | |
| raise HTTPException(status_code=503, detail="Model not available") | |
| if not file.content_type.startswith("image/"): | |
| raise HTTPException(status_code=400, detail="File provided is not an image") | |
| try: | |
| # Start timing for performance metrics | |
| process_start = time.time() | |
| image_data = await file.read() | |
| input_img = Image.open(io.BytesIO(image_data)).convert("RGB") | |
| input_tensor = transform_image(input_img) | |
| input_name = session.get_inputs()[0].name | |
| output_name = session.get_outputs()[0].name | |
| logger.info(f"Analysis #{analysis_id}: Running inference") | |
| inference_start = time.time() | |
| prediction = session.run([output_name], {input_name: input_tensor})[0][0] | |
| inference_time = time.time() - inference_start | |
| exp_preds = np.exp(prediction - np.max(prediction)) | |
| probabilities = exp_preds / exp_preds.sum() | |
| # Convert numpy array to Python list of floats | |
| prob_list = [float(p) for p in probabilities] | |
| # Calculate severity index | |
| severity_index = calculate_severity_index(probabilities) | |
| # Format detailed results with confidence levels | |
| detailed_results = [] | |
| for i in labels: | |
| prob = float(probabilities[i]) # Convert numpy.float32 to Python float | |
| detailed_results.append({ | |
| "class": labels[i]["name"], | |
| "description": labels[i]["description"], | |
| "probability": round(prob, 4), | |
| "percentage": round(prob * 100, 1), | |
| "confidence_level": get_confidence_level(prob) | |
| }) | |
| # Sort by probability (highest first) | |
| detailed_results.sort(key=lambda x: x["probability"], reverse=True) | |
| highest_class = detailed_results[0]["class"] | |
| highest_prob = detailed_results[0]["probability"] | |
| # Generate recommendation based on severity | |
| recommendation = "Regular screening recommended." | |
| if severity_index > 75: | |
| recommendation = "Urgent ophthalmologist consultation required." | |
| elif severity_index > 50: | |
| recommendation = "Prompt ophthalmologist evaluation recommended." | |
| elif severity_index > 25: | |
| recommendation = "Follow-up with ophthalmologist advised." | |
| # Get clinical information for the highest probability class | |
| clinical_info = dr_clinical_info.get(highest_class, {}) | |
| # Calculate binary classification (DR vs No DR) | |
| dr_probability = sum(float(probabilities[i]) for i in range(1, 5)) # Sum of all DR classes, convert to Python float | |
| binary_classification = { | |
| "no_dr": float(probabilities[0]), # Convert to Python float | |
| "dr_detected": float(dr_probability), | |
| "primary_assessment": "DR Detected" if dr_probability > float(probabilities[0]) else "No DR" | |
| } | |
| # Get suggested questions and generate answers if OpenAI is available | |
| suggested_questions = clinical_info.get("suggested_questions", []) | |
| question_answers = [] | |
| if OPENAI_AVAILABLE and OPENAI_API_KEY and suggested_questions: | |
| try: | |
| # Create an OpenAI client | |
| client = openai.OpenAI( | |
| api_key=OPENAI_API_KEY, | |
| base_url=OPENROUTER_API_BASE, | |
| default_headers={"HTTP-Referer": OPENROUTER_REFERER} | |
| ) | |
| # Prepare context for the AI | |
| context = f"""Patient Analysis Summary: | |
| - Diagnosis: {highest_class} (Confidence: {get_confidence_level(highest_prob)}) | |
| - Severity Index: {severity_index}/100 | |
| - Clinical Findings: {clinical_info.get('findings', 'Not available')} | |
| - Associated Risks: {clinical_info.get('risks', 'Not available')} | |
| - Standard Recommendations: {clinical_info.get('recommendations', 'Not available')} | |
| - Standard Follow-up: {clinical_info.get('follow_up', 'Not available')} | |
| """ | |
| # Generate answers for each suggested question | |
| for question in suggested_questions: | |
| try: | |
| response = client.chat.completions.create( | |
| model=OPENROUTER_MODEL, | |
| messages=[ | |
| {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT}, | |
| {"role": "user", "content": f"{context}\n\nPatient Question: {question}"} | |
| ] | |
| ) | |
| answer = response.choices[0].message.content | |
| question_answers.append({ | |
| "question": question, | |
| "answer": answer | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Error generating answer for question '{question}': {e}") | |
| question_answers.append({ | |
| "question": question, | |
| "answer": "Unable to generate answer at this time. Please try asking this question directly." | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Error generating answers for suggested questions: {e}") | |
| # Still include the questions even if answers couldn't be generated | |
| question_answers = [{"question": q, "answer": None} for q in suggested_questions] | |
| else: | |
| # If OpenAI is not available, just include the questions without answers | |
| question_answers = [{"question": q, "answer": None} for q in suggested_questions] | |
| total_time = time.time() - process_start | |
| logger.info(f"Analysis #{analysis_id}: Prediction complete: highest probability class = {highest_class} ({highest_prob:.2f})") | |
| # Prepare response | |
| response = { | |
| "analysis_id": analysis_id, | |
| "timestamp": datetime.now().isoformat(), | |
| "detailed_classification": detailed_results, | |
| "binary_classification": binary_classification, | |
| "highest_probability_class": highest_class, | |
| "severity_index": severity_index, | |
| "recommendation": recommendation, | |
| "clinical_information": clinical_info, | |
| "ai_explanation": clinical_info.get("explanation", "No explanation available for this classification."), | |
| "suggested_questions_with_answers": question_answers, | |
| "performance": { | |
| "inference_time_ms": round(inference_time * 1000, 2), | |
| "total_processing_time_ms": round(total_time * 1000, 2) | |
| } | |
| } | |
| # Cache the analysis result for later consultation | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") | |
| temp_file.write(image_data) | |
| temp_file.close() | |
| analysis_cache[analysis_id] = { | |
| "result": response, | |
| "image_path": temp_file.name, | |
| "analysis_time": datetime.now().isoformat() | |
| } | |
| return response | |
| except Exception as e: | |
| logger.error(f"Analysis #{analysis_id}: Error processing image: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}") | |
| async def get_analysis(analysis_id: str): | |
| """Retrieve a previous analysis by its ID""" | |
| if analysis_id not in analysis_cache: | |
| raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found") | |
| return analysis_cache[analysis_id]["result"] | |
| async def get_ai_consultation( | |
| analysis_id: str = Body(...), | |
| question: Optional[str] = Body(None), | |
| background_tasks: BackgroundTasks = None | |
| ): | |
| """ | |
| Get AI consultation based on analysis results | |
| - analysis_id: ID of the previous analysis | |
| - question: Optional specific question about the diagnosis (if not provided, general consultation is given) | |
| """ | |
| if not OPENAI_AVAILABLE or not OPENAI_API_KEY: | |
| raise HTTPException(status_code=501, detail="AI consultation feature is not available. OpenAI package or API key not configured.") | |
| if analysis_id not in analysis_cache: | |
| raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found") | |
| analysis_data = analysis_cache[analysis_id]["result"] | |
| dr_class = analysis_data["highest_probability_class"] | |
| severity_index = analysis_data["severity_index"] | |
| clinical_info = dr_clinical_info.get(dr_class, {}) | |
| # Prepare context for the AI | |
| context = f"""Patient Analysis Summary: | |
| - Diagnosis: {dr_class} (Confidence: {get_confidence_level(analysis_data['detailed_classification'][0]['probability'])}) | |
| - Severity Index: {severity_index}/100 | |
| - Clinical Findings: {clinical_info.get('findings', 'Not available')} | |
| - Associated Risks: {clinical_info.get('risks', 'Not available')} | |
| - Standard Recommendations: {clinical_info.get('recommendations', 'Not available')} | |
| - Standard Follow-up: {clinical_info.get('follow_up', 'Not available')} | |
| """ | |
| # Default question if none provided | |
| if not question: | |
| question = DEFAULT_QUESTIONS["general"].replace("my diagnosis", f"my diagnosis of {dr_class} diabetic retinopathy with a severity index of {severity_index}") | |
| try: | |
| # Create an OpenAI client with the API key and base URL | |
| client = openai.OpenAI( | |
| api_key=OPENAI_API_KEY, | |
| base_url=OPENROUTER_API_BASE, | |
| default_headers={"HTTP-Referer": OPENROUTER_REFERER} | |
| ) | |
| # Call the LLM through OpenRouter using the new API format | |
| response = client.chat.completions.create( | |
| model=OPENROUTER_MODEL, | |
| messages=[ | |
| {"role": "system", "content": CONSULTATION_SYSTEM_PROMPT}, | |
| {"role": "user", "content": f"{context}\n\nPatient Question: {question}"} | |
| ] | |
| ) | |
| # Extract the content from the response using the new API format | |
| consultation = response.choices[0].message.content | |
| return { | |
| "analysis_id": analysis_id, | |
| "dr_class": dr_class, | |
| "severity_index": severity_index, | |
| "question": question, | |
| "consultation": consultation, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting AI consultation: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error generating consultation: {str(e)}") | |
| async def generate_report(analysis_id: str, include_consultation: bool = True): | |
| """ | |
| Generate a downloadable medical report based on analysis results | |
| - analysis_id: ID of the previous analysis | |
| - include_consultation: Whether to include AI consultation in the report | |
| """ | |
| if not DOCX_AVAILABLE: | |
| raise HTTPException(status_code=501, detail="Report generation is not available. python-docx package not installed.") | |
| if analysis_id not in analysis_cache: | |
| raise HTTPException(status_code=404, detail=f"Analysis with ID {analysis_id} not found") | |
| analysis_data = analysis_cache[analysis_id]["result"] | |
| image_path = analysis_cache[analysis_id].get("image_path") | |
| # Get consultation if requested and available | |
| consultation_text = None | |
| if include_consultation and OPENAI_AVAILABLE and OPENAI_API_KEY: | |
| try: | |
| consultation_response = await get_ai_consultation(analysis_id=analysis_id) | |
| consultation_text = consultation_response.get("consultation") | |
| except Exception as e: | |
| logger.warning(f"Could not get consultation for report: {e}") | |
| try: | |
| # First create a DOCX file (for backup purposes) | |
| doc = Document() | |
| # Add header | |
| doc.add_heading('ClarirAI Medical Report', 0) | |
| doc.add_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| doc.add_paragraph(f"Analysis ID: {analysis_id}") | |
| doc.add_paragraph(f"Original Analysis Date: {analysis_data['timestamp']}") | |
| # Add the analyzed image if available | |
| if image_path and os.path.exists(image_path): | |
| doc.add_heading('Analyzed Retinal Image', level=1) | |
| doc.add_paragraph('Below is the retinal image that was analyzed:') | |
| doc.add_picture(image_path, width=docx.shared.Inches(4.0)) | |
| # Add diagnosis section | |
| doc.add_heading('Diabetic Retinopathy Assessment', level=1) | |
| doc.add_paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}") | |
| doc.add_paragraph(f"Severity Index: {analysis_data['severity_index']}/100") | |
| # Add a page break before the detailed classification table | |
| doc.add_page_break() | |
| # Detailed classification table | |
| doc.add_heading('Detailed Classification', level=2) | |
| table = doc.add_table(rows=1, cols=4) | |
| table.style = 'Table Grid' | |
| hdr_cells = table.rows[0].cells | |
| hdr_cells[0].text = 'Classification' | |
| hdr_cells[1].text = 'Description' | |
| hdr_cells[2].text = 'Probability' | |
| hdr_cells[3].text = 'Confidence Level' | |
| for item in analysis_data['detailed_classification']: | |
| row_cells = table.add_row().cells | |
| row_cells[0].text = item['class'] | |
| row_cells[1].text = item['description'] | |
| row_cells[2].text = f"{item['percentage']}%" | |
| row_cells[3].text = item['confidence_level'] | |
| # Note: We can't add a drawing directly to DOCX using python-docx | |
| # The bar chart will only be available in the PDF version | |
| # Add clinical information | |
| dr_class = analysis_data['highest_probability_class'] | |
| clinical_info = dr_clinical_info.get(dr_class, {}) | |
| if clinical_info: | |
| doc.add_heading('Clinical Information', level=1) | |
| doc.add_heading('Findings', level=2) | |
| doc.add_paragraph(clinical_info.get('findings', 'Not available')) | |
| doc.add_heading('Associated Risks', level=2) | |
| doc.add_paragraph(clinical_info.get('risks', 'Not available')) | |
| doc.add_heading('Recommendations', level=2) | |
| doc.add_paragraph(clinical_info.get('recommendations', 'Not available')) | |
| doc.add_heading('Follow-up', level=2) | |
| doc.add_paragraph(clinical_info.get('follow_up', 'Not available')) | |
| # Add AI explanation | |
| doc.add_heading('AI Analysis Explanation', level=1) | |
| doc.add_paragraph(clinical_info.get('explanation', 'No explanation available for this classification.')) | |
| # Add AI consultation if available | |
| if consultation_text: | |
| doc.add_heading('AI-Generated Medical Consultation', level=1) | |
| doc.add_paragraph(consultation_text) | |
| # Add disclaimer | |
| doc.add_heading('Disclaimer', level=1) | |
| doc.add_paragraph('This report is generated using artificial intelligence and should not replace professional medical advice. The analysis and recommendations provided are based on automated image processing and AI consultation. Please consult with a qualified healthcare provider for proper diagnosis, treatment, and follow-up care.') | |
| # Save DOCX to temp file (as backup) | |
| temp_dir = tempfile.mkdtemp() | |
| docx_path = os.path.join(temp_dir, f"report_{analysis_id}.docx") | |
| doc.save(docx_path) | |
| # Now generate PDF directly using ReportLab | |
| try: | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib import colors | |
| from reportlab.lib.units import inch | |
| from io import BytesIO | |
| from PIL import Image as PILImage | |
| # Create PDF file | |
| pdf_path = os.path.join(temp_dir, f"report_{analysis_id}.pdf") | |
| pdf_doc = SimpleDocTemplate(pdf_path, pagesize=letter) | |
| styles = getSampleStyleSheet() | |
| # Create custom styles | |
| title_style = ParagraphStyle( | |
| 'Title', | |
| parent=styles['Title'], | |
| fontSize=16, | |
| spaceAfter=12 | |
| ) | |
| heading1_style = ParagraphStyle( | |
| 'Heading1', | |
| parent=styles['Heading1'], | |
| fontSize=14, | |
| spaceAfter=10, | |
| spaceBefore=10 | |
| ) | |
| heading2_style = ParagraphStyle( | |
| 'Heading2', | |
| parent=styles['Heading2'], | |
| fontSize=12, | |
| spaceAfter=8, | |
| spaceBefore=8 | |
| ) | |
| normal_style = styles["Normal"] | |
| # Build PDF content | |
| elements = [] | |
| # Title | |
| elements.append(Paragraph('ClarirAI Medical Report', title_style)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Metadata | |
| elements.append(Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", normal_style)) | |
| elements.append(Paragraph(f"Analysis ID: {analysis_id}", normal_style)) | |
| elements.append(Paragraph(f"Original Analysis Date: {analysis_data['timestamp']}", normal_style)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Add the analyzed image if available | |
| if image_path and os.path.exists(image_path): | |
| elements.append(Paragraph('Analyzed Retinal Image', heading1_style)) | |
| elements.append(Paragraph('Below is the retinal image that was analyzed:', normal_style)) | |
| # Process the image for ReportLab | |
| img = PILImage.open(image_path) | |
| img_width, img_height = img.size | |
| aspect = img_height / float(img_width) | |
| rl_img_width = 4 * inch | |
| rl_img_height = rl_img_width * aspect | |
| # Convert PIL Image to ReportLab Image | |
| img_buffer = BytesIO() | |
| img.save(img_buffer, format='JPEG') | |
| img_buffer.seek(0) | |
| rl_img = RLImage(img_buffer, width=rl_img_width, height=rl_img_height) | |
| elements.append(rl_img) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Diagnosis section | |
| elements.append(Paragraph('Diabetic Retinopathy Assessment', heading1_style)) | |
| elements.append(Paragraph(f"Primary Diagnosis: {analysis_data['highest_probability_class']}", normal_style)) | |
| elements.append(Paragraph(f"Severity Index: {analysis_data['severity_index']}/100", normal_style)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Add a page break before the detailed classification table | |
| elements.append(PageBreak()) | |
| # Detailed classification table | |
| elements.append(Paragraph('Detailed Classification', heading2_style)) | |
| # Create table data | |
| table_data = [ | |
| ['Classification', 'Description', 'Probability', 'Confidence Level'] | |
| ] | |
| for item in analysis_data['detailed_classification']: | |
| table_data.append([ | |
| item['class'], | |
| item['description'], | |
| f"{item['percentage']}%", | |
| item['confidence_level'] | |
| ]) | |
| # Create table with adjusted column widths - make description column wider | |
| table = Table(table_data, colWidths=[1.2*inch, 3.0*inch, 0.8*inch, 1.2*inch]) | |
| table.setStyle(TableStyle([ | |
| ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), | |
| ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), | |
| ('ALIGN', (0, 0), (-1, -1), 'CENTER'), | |
| ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), | |
| ('BOTTOMPADDING', (0, 0), (-1, 0), 12), | |
| ('GRID', (0, 0), (-1, -1), 1, colors.black), | |
| ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), | |
| # Align description text to the left for better readability | |
| ('ALIGN', (1, 0), (1, -1), 'LEFT'), | |
| # Add some padding for the description column | |
| ('LEFTPADDING', (1, 0), (1, -1), 6), | |
| ('RIGHTPADDING', (1, 0), (1, -1), 6), | |
| ])) | |
| elements.append(table) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Add bar chart visualization | |
| elements.append(Paragraph('Classification Probability Chart', heading2_style)) | |
| # Create the drawing with a proper size | |
| drawing = Drawing(500, 250) | |
| # Create the bar chart | |
| chart = VerticalBarChart() | |
| chart.x = 50 | |
| chart.y = 50 | |
| chart.height = 150 | |
| chart.width = 350 | |
| # Extract data for the chart | |
| data = [] | |
| categories = [] | |
| for item in analysis_data['detailed_classification']: | |
| data.append(item['percentage']) | |
| categories.append(item['class']) | |
| # Set chart data | |
| chart.data = [data] | |
| chart.categoryAxis.categoryNames = categories | |
| chart.categoryAxis.labels.boxAnchor = 'ne' | |
| chart.categoryAxis.labels.dx = -8 | |
| chart.categoryAxis.labels.dy = -2 | |
| chart.categoryAxis.labels.angle = 30 | |
| # Set value axis properties | |
| chart.valueAxis.valueMin = 0 | |
| chart.valueAxis.valueMax = 100 | |
| chart.valueAxis.valueStep = 10 | |
| # Set bar properties | |
| chart.bars[0].fillColor = colors.skyblue | |
| chart.bars[0].strokeColor = colors.black | |
| chart.bars[0].strokeWidth = 0.5 | |
| # Add a legend | |
| legend = Legend() | |
| legend.alignment = 'right' | |
| legend.x = 400 | |
| legend.y = 150 | |
| legend.colorNamePairs = [(colors.skyblue, 'Probability (%)')] | |
| # Add chart and legend to the drawing | |
| drawing.add(chart) | |
| drawing.add(legend) | |
| # Add the drawing to the PDF | |
| elements.append(drawing) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Clinical information | |
| if clinical_info: | |
| elements.append(Paragraph('Clinical Information', heading1_style)) | |
| elements.append(Paragraph('Findings', heading2_style)) | |
| elements.append(Paragraph(clinical_info.get('findings', 'Not available'), normal_style)) | |
| elements.append(Spacer(1, 0.15*inch)) | |
| elements.append(Paragraph('Associated Risks', heading2_style)) | |
| elements.append(Paragraph(clinical_info.get('risks', 'Not available'), normal_style)) | |
| elements.append(Spacer(1, 0.15*inch)) | |
| elements.append(Paragraph('Recommendations', heading2_style)) | |
| elements.append(Paragraph(clinical_info.get('recommendations', 'Not available'), normal_style)) | |
| elements.append(Spacer(1, 0.15*inch)) | |
| elements.append(Paragraph('Follow-up', heading2_style)) | |
| elements.append(Paragraph(clinical_info.get('follow_up', 'Not available'), normal_style)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # AI explanation | |
| elements.append(Paragraph('AI Analysis Explanation', heading1_style)) | |
| elements.append(Paragraph(clinical_info.get('explanation', 'No explanation available for this classification.'), normal_style)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # AI consultation | |
| if consultation_text: | |
| elements.append(Paragraph('AI-Generated Medical Consultation', heading1_style)) | |
| # Process the consultation text to improve readability | |
| # Split into paragraphs and format each one | |
| paragraphs = consultation_text.split('\n\n') | |
| if len(paragraphs) == 1: # If no paragraph breaks, try to create logical breaks | |
| # Look for common section indicators | |
| for marker in ['1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:', 'Question:']: | |
| consultation_text = consultation_text.replace(f"{marker}", f"\n\n{marker}") | |
| # Try to break long paragraphs | |
| paragraphs = [] | |
| current_text = consultation_text | |
| while len(current_text) > 300: # Break long paragraphs | |
| # Find a good break point (end of sentence) around 250-300 chars | |
| break_point = 250 | |
| while break_point < len(current_text) and break_point < 350: | |
| if current_text[break_point] in ['.', '!', '?'] and ( | |
| break_point + 1 >= len(current_text) or current_text[break_point + 1] == ' ' | |
| ): | |
| break_point += 1 # Include the space after period | |
| break | |
| break_point += 1 | |
| if break_point >= len(current_text) or break_point >= 350: | |
| # If no good break found, just use the whole text | |
| paragraphs.append(current_text) | |
| break | |
| paragraphs.append(current_text[:break_point]) | |
| current_text = current_text[break_point:].strip() | |
| if current_text: # Add any remaining text | |
| paragraphs.append(current_text) | |
| else: | |
| # Clean up any existing paragraphs | |
| paragraphs = [p.strip() for p in paragraphs if p.strip()] | |
| # Add each paragraph with proper spacing | |
| for i, para in enumerate(paragraphs): | |
| # Check if this is a numbered point or recommendation | |
| if any(para.startswith(marker) for marker in ['1.', '2.', '3.', '4.', 'Recommendations:', 'Follow-up:']): | |
| # Use a slightly different style for headings within the consultation | |
| point_style = ParagraphStyle( | |
| 'ConsultationPoint', | |
| parent=normal_style, | |
| fontName='Helvetica-Bold', | |
| spaceBefore=8 | |
| ) | |
| # Split into heading and content if possible | |
| parts = para.split(':', 1) | |
| if len(parts) > 1 and len(parts[0]) < 30: # It's likely a heading:content format | |
| elements.append(Paragraph(parts[0] + ':', point_style)) | |
| elements.append(Paragraph(parts[1].strip(), normal_style)) | |
| else: | |
| elements.append(Paragraph(para, point_style)) | |
| else: | |
| elements.append(Paragraph(para, normal_style)) | |
| # Add a small space between paragraphs, but not after the last one | |
| if i < len(paragraphs) - 1: | |
| elements.append(Spacer(1, 0.1*inch)) | |
| elements.append(Spacer(1, 0.25*inch)) | |
| # Disclaimer | |
| elements.append(Paragraph('Disclaimer', heading1_style)) | |
| elements.append(Paragraph('This report is generated using artificial intelligence and should not replace professional medical advice. The analysis and recommendations provided are based on automated image processing and AI consultation. Please consult with a qualified healthcare provider for proper diagnosis, treatment, and follow-up care.', normal_style)) | |
| # Build the PDF | |
| pdf_doc.build(elements) | |
| # Return the PDF file | |
| return FileResponse( | |
| path=pdf_path, | |
| filename=f"ClarirAI_Report_{analysis_id}.pdf", | |
| media_type="application/pdf" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error generating PDF with ReportLab: {e}") | |
| # Fall back to DOCX if PDF generation fails | |
| return FileResponse( | |
| path=docx_path, | |
| filename=f"ClarirAI_Report_{analysis_id}.docx", | |
| media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error generating report: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error generating report: {str(e)}") | |
| # Run the server | |
| if __name__ == "__main__": | |
| uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True) |