iBrood-app / app.py
Rozu1726's picture
Update app.py
68ecae0 verified
from fastapi import FastAPI, UploadFile, File, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from ultralytics import YOLO
import os
import logging
from PIL import Image
import io
import base64
import cv2
import numpy as np
# ==================== INITIALIZE APP ====================
app = FastAPI(title="iBrood Detection API", version="1.0.0")
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ==================== CORS ====================
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== LOAD MODELS ====================
# Set environment variables for headless operation
os.environ['DISPLAY'] = ':0'
os.environ['QT_QPA_PLATFORM'] = 'offscreen'
os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0'
# Queen Cell Model (Segmentation)
queen_model = None
# Brood Model (Object Detection)
brood_model = None
try:
import cv2
logger.info("OpenCV imported successfully")
# Load Queen Cell Model (best-seg.pt)
if os.path.exists('best-seg.pt'):
file_size = os.path.getsize('best-seg.pt')
logger.info(f"Queen model file size: {file_size} bytes")
if file_size > 1000:
queen_model = YOLO('best-seg.pt')
logger.info("Queen Cell model (best-seg.pt) loaded successfully")
else:
logger.error("Queen model file too small, likely corrupted")
else:
logger.error("Queen model file 'best-seg.pt' not found")
# Load Brood Model (best-od.pt)
if os.path.exists('best-od.pt'):
file_size = os.path.getsize('best-od.pt')
logger.info(f"Brood model file size: {file_size} bytes")
if file_size > 1000:
brood_model = YOLO('best-od.pt')
logger.info("Brood model (best-od.pt) loaded successfully")
else:
logger.error("Brood model file too small, likely corrupted")
else:
logger.error("Brood model file 'best-od.pt' not found")
except ImportError as e:
logger.error(f"OpenCV import error: {e}")
except Exception as e:
logger.error(f"Error loading models: {e}")
# ==================== CLASS CONFIGURATIONS ====================
# Queen Cell Classes
QUEEN_CLASS_NAMES = {
0: 'Capped Cell',
1: 'Failed Cell',
2: 'Matured Cell',
3: 'Open Cell',
4: 'Semi-Matured Cell'
}
QUEEN_COLORS = {
0: (0, 93, 253), # Orange for Capped
1: (0, 0, 255), # Red for Failed
2: (255, 0, 119), # Purple for Matured
3: (255, 0, 25), # Blue for Open
4: (236, 229, 10) # Cyan for Semi-Mature
}
# Brood Classes - ONLY 3 CLASSES
BROOD_CLASS_NAMES = {
0: 'egg',
1: 'larva',
2: 'pupa'
}
# Vibrant/Neon colors matching queen cell style (BGR format)
BROOD_COLORS = {
0: (0, 165, 255), # Orange for Egg (BGR)
1: (255, 255, 0), # Cyan for Larva (BGR)
2: (255, 0, 119) # Purple/Magenta for Pupa (BGR)
}
# Text colors for labels (matching box colors)
BROOD_TEXT_COLORS = {
0: (0, 165, 255), # Orange for Egg
1: (255, 255, 0), # Cyan for Larva
2: (255, 0, 119) # Purple for Pupa
}
BROOD_CLASS_ATTRIBUTES = {
"egg": {
"display_name": "Egg",
"description": "Early development stage - tiny white elongated shape",
"age": "1-3 days old",
"health": "HEALTHY"
},
"larva": {
"display_name": "Larva",
"description": "Active growth stage - C-shaped white grub",
"age": "3-8 days old",
"health": "HEALTHY"
},
"pupa": {
"display_name": "Pupa",
"description": "Pre-emergence stage - capped cell with developing bee",
"age": "8-21 days old",
"health": "HEALTHY"
}
}
# ==================== ROUTES ====================
@app.get("/", response_class=HTMLResponse)
async def home():
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>iBrood Detection API</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
.container { text-align: center; }
.upload-section { margin: 20px 0; padding: 20px; border: 2px dashed #ccc; }
button { background: #007bff; color: white; padding: 10px 20px; border: none; cursor: pointer; }
button:hover { background: #0056b3; }
.result { margin: 20px 0; padding: 10px; background: #f8f9fa; }
</style>
</head>
<body>
<div class="container">
<h1> iBrood Detection API</h1>
<p>Upload an image to detect queens or brood in beehive frames</p>
<div class="upload-section">
<h3>Queen Detection</h3>
<input type="file" id="queenFile" accept="image/*">
<button onclick="detectQueen()">Detect Queen</button>
<div id="queenResult" class="result"></div>
</div>
<div class="upload-section">
<h3>Brood Detection</h3>
<input type="file" id="broodFile" accept="image/*">
<button onclick="detectBrood()">Detect Brood</button>
<div id="broodResult" class="result"></div>
</div>
</div>
<script>
async function detectQueen() {
const fileInput = document.getElementById('queenFile');
const resultDiv = document.getElementById('queenResult');
if (!fileInput.files[0]) {
resultDiv.innerHTML = 'Please select an image first';
return;
}
const formData = new FormData();
formData.append('file', fileInput.files[0]);
resultDiv.innerHTML = 'Processing...';
try {
const response = await fetch('/queen_detect', {
method: 'POST',
body: formData
});
const result = await response.json();
resultDiv.innerHTML = `<strong>Result:</strong> ${result.count} queen(s) detected<br><pre>${JSON.stringify(result, null, 2)}</pre>`;
} catch (error) {
resultDiv.innerHTML = `Error: ${error.message}`;
}
}
async function detectBrood() {
const fileInput = document.getElementById('broodFile');
const resultDiv = document.getElementById('broodResult');
if (!fileInput.files[0]) {
resultDiv.innerHTML = 'Please select an image first';
return;
}
const formData = new FormData();
formData.append('file', fileInput.files[0]);
resultDiv.innerHTML = 'Processing...';
try {
const response = await fetch('/brood_detect', {
method: 'POST',
body: formData
});
const result = await response.json();
resultDiv.innerHTML = `<strong>Result:</strong> ${result.count} brood(s) detected<br><pre>${JSON.stringify(result, null, 2)}</pre>`;
} catch (error) {
resultDiv.innerHTML = `Error: ${error.message}`;
}
}
</script>
</body>
</html>
"""
return HTMLResponse(content=html_content)
@app.get("/health")
async def health_check():
# List files in current directory for debugging
current_dir_files = os.listdir('.') if os.path.exists('.') else []
return {
"status": "healthy",
"message": "iBrood Detection API is running",
"queen_model_loaded": queen_model is not None,
"brood_model_loaded": brood_model is not None,
"queen_model_file_exists": os.path.exists('best-seg.pt'),
"brood_model_file_exists": os.path.exists('best-od.pt'),
"files_in_directory": current_dir_files
}
# ==================== HELPER FUNCTIONS ====================
def optimize_image_for_inference(image, max_size=1280):
"""Resize image if too large to speed up inference"""
width, height = image.size
if max(width, height) > max_size:
ratio = max_size / max(width, height)
new_size = (int(width * ratio), int(height * ratio))
return image.resize(new_size, Image.LANCZOS), ratio
return image, 1.0
# ==================== DETECTION FUNCTIONS ====================
def process_queen_detection(results, original_image):
"""Process YOLO results for Queen Cell detection with segmentation masks"""
detections = []
img_array = np.array(original_image)
if len(img_array.shape) == 3:
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
for result in results:
if result.boxes is not None:
boxes_data = result.boxes
masks_data = result.masks if hasattr(result, 'masks') and result.masks is not None else None
for idx, box in enumerate(boxes_data):
cls = int(box.cls[0])
conf = float(box.conf[0])
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
detection = {
"confidence": conf,
"class": cls,
"bbox": [x1, y1, x2, y2]
}
# Extract segmentation mask if available
if masks_data is not None and idx < len(masks_data.data):
try:
mask = masks_data.data[idx].cpu().numpy()
mask_height, mask_width = mask.shape
# Convert mask to polygon points for frontend rendering
binary_mask = (mask * 255).astype(np.uint8)
contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
# Get the largest contour (main mask area)
largest_contour = max(contours, key=cv2.contourArea)
# Simplify the contour to reduce points
epsilon = 0.005 * cv2.arcLength(largest_contour, True)
approx = cv2.approxPolyDP(largest_contour, epsilon, True)
# Scale contour points to original image size
# PIL Image.size returns (width, height)
img_width, img_height = original_image.size
scale_x = img_width / mask_width
scale_y = img_height / mask_height
# Convert to list of [x, y] points
polygon_points = []
for point in approx:
px, py = point[0]
polygon_points.append([float(px * scale_x), float(py * scale_y)])
detection["mask"] = {
"type": "polygon",
"points": polygon_points,
"imageShape": [img_height, img_width]
}
except Exception as e:
logger.warning(f"Mask extraction failed: {e}")
detections.append(detection)
color = QUEEN_COLORS.get(cls, (255, 255, 255))
cv2.rectangle(img_array, (x1, y1), (x2, y2), color, 2)
label = f"{QUEEN_CLASS_NAMES.get(cls, 'Unknown')} {conf:.0%}"
cv2.putText(img_array, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
if len(img_array.shape) == 3:
img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)
annotated_image = Image.fromarray(img_array)
buffered = io.BytesIO()
annotated_image.save(buffered, format="JPEG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
return {
"detections": detections,
"count": len(detections),
"annotated_image": f"data:image/jpeg;base64,{img_base64}"
}
def process_brood_detection(results, original_image, show_labels=True):
"""Process YOLO results for Brood detection with health assessment"""
detections = []
counts = {"egg": 0, "larva": 0, "pupa": 0, "empty_comb": 0}
img_array = np.array(original_image)
if len(img_array.shape) == 3:
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
# Styling - THIN lines for clean modern look like the screenshot
thickness = 1 # Thin line
font_scale = 0.35 # Small font
font_thickness = 1
for result in results:
if result.boxes is not None:
boxes_data = result.boxes
for idx, box in enumerate(boxes_data):
cls = int(box.cls[0])
conf = float(box.conf[0])
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
class_name = BROOD_CLASS_NAMES.get(cls, 'unknown')
detection = {
"confidence": conf,
"class": cls,
"class_name": class_name,
"bbox": [x1, y1, x2, y2],
"attributes": BROOD_CLASS_ATTRIBUTES.get(class_name, {})
}
detections.append(detection)
# Update counts
if class_name in counts:
counts[class_name] += 1
elif class_name == "empty_comb":
counts["empty_comb"] += 1
# Draw THIN bounding box - clean modern style
color = BROOD_COLORS.get(cls, (255, 255, 255))
cv2.rectangle(img_array, (x1, y1), (x2, y2), color, thickness)
# Only draw labels if show_labels is True - JUST PERCENTAGE
if show_labels:
label = f"{int(conf * 100)}%" # Just percentage, no class name
text_color = BROOD_TEXT_COLORS.get(cls, (255, 255, 255))
# Position inside box at top-left corner
cv2.putText(img_array, label, (x1 + 2, y1 + 12), cv2.FONT_HERSHEY_SIMPLEX, font_scale, text_color, font_thickness)
if len(img_array.shape) == 3:
img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)
annotated_image = Image.fromarray(img_array)
buffered = io.BytesIO()
annotated_image.save(buffered, format="PNG") # PNG for better quality
img_base64 = base64.b64encode(buffered.getvalue()).decode()
# Calculate health assessment
total_brood = counts["egg"] + counts["larva"] + counts["pupa"]
# Try to get empty cells if available, otherwise fallback to only brood
total_cells = total_brood
if "empty_comb" in counts:
total_cells += counts["empty_comb"]
health_status = "UNKNOWN"
health_score = 0
recommendations = []
if total_cells > 0:
# Calculate brood coverage as a percentage
brood_coverage = (total_brood / total_cells) * 100
# Calculate health based on brood distribution (actual computation)
egg_ratio = counts["egg"] / total_brood if total_brood > 0 else 0
larva_ratio = counts["larva"] / total_brood if total_brood > 0 else 0
pupa_ratio = counts["pupa"] / total_brood if total_brood > 0 else 0
# Score for balanced distribution (max 60)
ideal = 1/3
balance_penalty = (
abs(egg_ratio - ideal) + abs(larva_ratio - ideal) + abs(pupa_ratio - ideal)
)
balance_score = max(0, 60 - int(balance_penalty * 90))
# Score for brood coverage (max 40 for 100% coverage)
brood_score = min(40, int(brood_coverage * 0.4))
# Penalty for missing any stage
missing_penalty = 0
if counts["egg"] == 0 or counts["larva"] == 0 or counts["pupa"] == 0:
missing_penalty = 15
health_score = max(0, balance_score + brood_score - missing_penalty)
# Assign status
if health_score >= 85:
health_status = "EXCELLENT"
recommendations.append("Colony is thriving with excellent brood pattern")
elif health_score >= 70:
health_status = "GOOD"
recommendations.append("Healthy brood pattern - continue regular monitoring")
elif health_score >= 50:
health_status = "FAIR"
recommendations.append("Moderate brood presence - check queen activity")
else:
health_status = "POOR"
recommendations.append("Low brood count - inspect for queen issues")
# Additional recommendations based on counts
if counts["egg"] == 0 and total_brood > 0:
recommendations.append("No eggs detected - verify queen is laying")
if counts["larva"] > counts["pupa"] * 3:
recommendations.append("High larva count - ensure adequate food supply")
if counts["pupa"] == 0 and counts["larva"] > 0:
recommendations.append("No pupae detected - monitor for development issues")
return {
"detections": detections,
"count": len(detections),
"counts": counts,
"health": {
"status": health_status,
"score": health_score,
"total_brood": total_brood,
"total_cells": total_cells
},
"recommendations": recommendations if recommendations else ["Continue regular monitoring"],
"annotated_image": f"data:image/png;base64,{img_base64}"
}
# ==================== QUEEN DETECTION ====================
@app.post("/queen_detect")
async def detect_queen(file: UploadFile = File(...)):
try:
if queen_model is None:
return JSONResponse({
"error": "Queen model not loaded",
"message": "Model file 'best-seg.pt' may be missing or corrupted."
}, status_code=500)
logger.info("Starting Queen Cell Detection...")
file_content = await file.read()
image = Image.open(io.BytesIO(file_content))
# Optimize for faster inference
optimized_image, _ = optimize_image_for_inference(image, max_size=1280)
results = queen_model(optimized_image, verbose=False)
response = process_queen_detection(results, optimized_image)
logger.info(f"Queen detection completed: {response['count']} detections")
return response
except Exception as e:
logger.error(f"Error in queen detection: {str(e)}")
return JSONResponse({"error": str(e)}, status_code=500)
def process_brood_detection_optimized(results, original_image, optimized_image, scale_ratio):
"""Optimized: Process YOLO results and generate both annotated versions in one pass"""
detections = []
counts = {"egg": 0, "larva": 0, "pupa": 0}
# Work on original image for output quality
img_array = np.array(original_image)
if len(img_array.shape) == 3:
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
img_with_labels = img_array.copy()
img_no_labels = img_array.copy()
thickness = 1
font_scale = 0.35
font_thickness = 1
# Estimate total detectable cells using grid approach
img_height, img_width = img_array.shape[:2]
# Assume average cell size ~40x40 pixels (adjust based on your images)
avg_cell_size = 40
grid_cols = img_width // avg_cell_size
grid_rows = img_height // avg_cell_size
estimated_total_cells = grid_cols * grid_rows
for result in results:
if result.boxes is not None:
for idx, box in enumerate(result.boxes):
cls = int(box.cls[0])
conf = float(box.conf[0])
# Scale coordinates back to original image size
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
if scale_ratio != 1.0:
x1 = int(x1 / scale_ratio)
y1 = int(y1 / scale_ratio)
x2 = int(x2 / scale_ratio)
y2 = int(y2 / scale_ratio)
class_name = BROOD_CLASS_NAMES.get(cls, 'unknown')
detection = {
"confidence": conf,
"class": cls,
"class_name": class_name,
"bbox": [x1, y1, x2, y2],
"attributes": BROOD_CLASS_ATTRIBUTES.get(class_name, {})
}
detections.append(detection)
if class_name in counts:
counts[class_name] += 1
color = BROOD_COLORS.get(cls, (255, 255, 255))
text_color = BROOD_TEXT_COLORS.get(cls, (255, 255, 255))
# Draw on both images
cv2.rectangle(img_no_labels, (x1, y1), (x2, y2), color, thickness)
cv2.rectangle(img_with_labels, (x1, y1), (x2, y2), color, thickness)
# Labels only on one version
label = f"{int(conf * 100)}%"
cv2.putText(img_with_labels, label, (x1 + 2, y1 + 12), cv2.FONT_HERSHEY_SIMPLEX, font_scale, text_color, font_thickness)
# Convert back to RGB
img_no_labels = cv2.cvtColor(img_no_labels, cv2.COLOR_BGR2RGB)
img_with_labels = cv2.cvtColor(img_with_labels, cv2.COLOR_BGR2RGB)
# Encode images
buf1 = io.BytesIO()
Image.fromarray(img_no_labels).save(buf1, format="PNG", optimize=True)
img_no_labels_b64 = base64.b64encode(buf1.getvalue()).decode()
buf2 = io.BytesIO()
Image.fromarray(img_with_labels).save(buf2, format="PNG", optimize=True)
img_with_labels_b64 = base64.b64encode(buf2.getvalue()).decode()
# Health assessment with DATA-DRIVEN brood coverage
total_brood = sum(counts.values())
health_status, health_score, brood_coverage, recommendations = "UNKNOWN", 0, 0, []
if total_brood > 0:
# DATA-DRIVEN: Brood Coverage = (Detected Brood / Estimated Total Cells) × 100
brood_coverage = min(100, round((total_brood / estimated_total_cells) * 100, 1))
egg_r = counts["egg"] / total_brood
larva_r = counts["larva"] / total_brood
pupa_r = counts["pupa"] / total_brood
# Base score for having brood present (30 points)
base_score = 30
# Score for total brood count (max 25 points)
count_score = min(25, int(total_brood * 0.8))
# Score for brood coverage (max 20 points)
coverage_score = min(20, int(brood_coverage * 2))
# Score for balanced distribution (max 25 points)
ideal = 1/3
balance_penalty = abs(egg_r - ideal) + abs(larva_r - ideal) + abs(pupa_r - ideal)
balance_score = max(0, 25 - int(balance_penalty * 40))
# Penalty for missing stages (only if NO eggs at all - critical issue)
missing_penalty = 15 if counts["egg"] == 0 else 0
health_score = max(0, min(100, base_score + count_score + coverage_score + balance_score - missing_penalty))
# Assign status based on score
if health_score >= 85:
health_status = "EXCELLENT"
recommendations.append("Colony is thriving with excellent brood pattern")
elif health_score >= 70:
health_status = "GOOD"
recommendations.append("Healthy brood pattern - continue regular monitoring")
elif health_score >= 50:
health_status = "FAIR"
recommendations.append("Moderate brood presence - check queen activity")
else:
health_status = "POOR"
recommendations.append("Try capturing the whole frame to detect more cells or ensure image is clear")
# Additional recommendations
if counts["egg"] == 0 and total_brood > 0:
recommendations.append("No eggs detected - verify queen is laying")
if counts["larva"] > counts["pupa"] * 3 and counts["pupa"] > 0:
recommendations.append("High larva count - ensure adequate food supply")
if counts["pupa"] == 0 and counts["larva"] > 0:
recommendations.append("No pupae detected - monitor for development issues")
return {
"detections": detections,
"count": len(detections),
"counts": counts,
"health": {"status": health_status, "score": health_score, "total_brood": total_brood, "total_cells": estimated_total_cells},
"broodCoverage": brood_coverage,
"recommendations": recommendations or ["Continue regular monitoring"],
"annotated_image": f"data:image/png;base64,{img_no_labels_b64}",
"annotated_image_with_labels": f"data:image/png;base64,{img_with_labels_b64}"
}
# ==================== BROOD DETECTION ====================
@app.post("/brood_detect")
async def detect_brood(file: UploadFile = File(...), show_labels: bool = False):
try:
if brood_model is None:
return JSONResponse({
"error": "Brood model not loaded",
"message": "Model file 'best-od.pt' may be missing or corrupted."
}, status_code=500)
logger.info("Starting Brood Detection...")
file_content = await file.read()
image = Image.open(io.BytesIO(file_content))
# Optimize image size for faster inference
optimized_image, scale_ratio = optimize_image_for_inference(image, max_size=1280)
# Run inference ONCE
results = brood_model(optimized_image, verbose=False)
# Process results and generate BOTH annotated versions in one pass
response = process_brood_detection_optimized(results, image, optimized_image, scale_ratio)
logger.info(f"Brood detection completed: {response['count']} detections")
return response
except Exception as e:
logger.error(f"Error in brood detection: {str(e)}")
return JSONResponse({"error": str(e)}, status_code=500)
# ==================== ANALYZE ENDPOINT (Frontend Compatibility) ====================
@app.post("/analyze")
async def analyze_image(request: Request):
"""
Compatibility endpoint for frontend - expects JSON with base64 image
Returns queen cell analysis with segmentation masks
"""
try:
logger.info("STARTING ANALYSIS FROM FRONTEND...")
data = await request.json()
image_data = data.get('image', '')
if not image_data or 'data:image' not in image_data:
return JSONResponse(
content={"error": "Invalid image format"},
status_code=400
)
# Extract base64 data
image_base64 = image_data.split(',')[1]
image_bytes = base64.b64decode(image_base64)
# Convert to PIL Image
image = Image.open(io.BytesIO(image_bytes))
img_width, img_height = image.size
if queen_model is None:
return JSONResponse(
content={"error": "Queen model not loaded"},
status_code=500
)
# Optimize image for faster inference
optimized_image, scale_ratio = optimize_image_for_inference(image, max_size=1280)
# Run YOLO inference with verbose=False for speed
results = queen_model(optimized_image, verbose=False)
cells = []
maturity_distribution = {
"open": 0,
"capped": 0,
"mature": 0,
"semiMature": 0,
"failed": 0
}
# Class info for recommendations
class_info = {
"Open Cell": {"days": 5, "desc": "Newly formed queen cell, larva visible", "maturity": 20},
"Capped Cell": {"days": 4, "desc": "Sealed cell, pupa developing inside", "maturity": 50},
"Semi-Matured Cell": {"days": 2, "desc": "Development progressing, darkening tip", "maturity": 75},
"Matured Cell": {"days": 1, "desc": "Ready to emerge, dark conical tip", "maturity": 95},
"Failed Cell": {"days": 0, "desc": "Development stopped, cell failed", "maturity": 0}
}
for result in results:
if result.boxes is not None:
boxes_data = result.boxes
masks_data = result.masks if hasattr(result, 'masks') and result.masks is not None else None
for idx, box in enumerate(boxes_data):
cls = int(box.cls[0])
conf = float(box.conf[0])
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
# Scale coordinates back to original image size
if scale_ratio != 1.0:
x1 = int(x1 / scale_ratio)
y1 = int(y1 / scale_ratio)
x2 = int(x2 / scale_ratio)
y2 = int(y2 / scale_ratio)
class_name = QUEEN_CLASS_NAMES.get(cls, 'Unknown')
info = class_info.get(class_name, {"days": 3, "desc": "Unknown cell type", "maturity": 50})
# Update distribution
if class_name == "Open Cell":
maturity_distribution["open"] += 1
elif class_name == "Capped Cell":
maturity_distribution["capped"] += 1
elif class_name == "Semi-Matured Cell":
maturity_distribution["semiMature"] += 1
elif class_name == "Matured Cell":
maturity_distribution["mature"] += 1
elif class_name == "Failed Cell":
maturity_distribution["failed"] += 1
cell = {
"id": idx + 1,
"type": class_name,
"confidence": round(conf * 100),
"bbox": [x1, y1, x2 - x1, y2 - y1], # Convert to [x, y, width, height]
"maturityPercentage": info["maturity"],
"estimatedHatchingDays": info["days"],
"description": info["desc"]
}
# Extract segmentation mask if available
if masks_data is not None and idx < len(masks_data.data):
try:
mask = masks_data.data[idx].cpu().numpy()
mask_height, mask_width = mask.shape
# Convert mask to polygon points
binary_mask = (mask * 255).astype(np.uint8)
contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest_contour = max(contours, key=cv2.contourArea)
# Less aggressive simplification for smoother masks
epsilon = 0.002 * cv2.arcLength(largest_contour, True)
approx = cv2.approxPolyDP(largest_contour, epsilon, True)
# Scale to original image size (accounting for optimization resize)
opt_w, opt_h = optimized_image.size
scale_x = (img_width / mask_width)
scale_y = (img_height / mask_height)
polygon_points = []
for point in approx:
px, py = point[0]
polygon_points.append([float(px * scale_x), float(py * scale_y)])
cell["mask"] = {
"type": "polygon",
"points": polygon_points,
"imageShape": [img_height, img_width]
}
except Exception as e:
logger.warning(f"Mask extraction failed for cell {idx + 1}: {e}")
cells.append(cell)
# Generate recommendations
recommendations = []
if maturity_distribution["mature"] > 0:
recommendations.append(f"{maturity_distribution['mature']} mature cell(s) ready to emerge - monitor closely!")
if maturity_distribution["semiMature"] > 0:
recommendations.append(f"{maturity_distribution['semiMature']} semi-mature cell(s) - emergence in 1-2 days")
if maturity_distribution["failed"] > 0:
recommendations.append(f"Remove {maturity_distribution['failed']} failed cell(s) to prevent disease")
if len(cells) > 5:
recommendations.append("High queen cell count detected - consider swarm prevention")
if not recommendations:
recommendations.append("Continue regular monitoring of queen cell development")
response = {
"totalQueenCells": len(cells),
"cells": cells,
"maturityDistribution": maturity_distribution,
"recommendations": recommendations,
"imagePreview": image_data
}
logger.info(f"Analysis complete: {len(cells)} cells detected")
return JSONResponse(content=response)
except Exception as e:
logger.error(f"Error in analyze endpoint: {str(e)}")
import traceback
traceback.print_exc()
return JSONResponse(
content={"error": str(e)},
status_code=500
)