""" Queen verification cascade. Two strategies, in priority order: 1. DEDICATED BINARY CLASSIFIER (preferred) If a trained EfficientNet-B0 queen-vs-worker classifier is shipped in weights/queen_classifier.pt, every YOLO bee detection is passed through it. The crop with the highest queen probability (above a threshold) is the queen. This is the right architecture: focused, fast, trained specifically for this task. 2. VLM GRID FALLBACK If no classifier weights are available, fall back to the older approach of cropping candidates into a numbered grid and asking Qwen-VL to pick the queen. Less reliable but works zero-shot. """ from __future__ import annotations import math import re from typing import Callable from PIL import Image, ImageDraw, ImageFont import queen_clf import queen_locate GRID_SIDE_PX = 240 # each crop tile this size in the composite grid GRID_COLS = 5 # max cols in the grid MAX_CANDIDATES = 20 # never send more than this many crops to the VLM PADDING_PX = 32 # bbox padding before cropping (gives the VLM more context) def _font(size: int = 22): for path in ( "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "arial.ttf", ): try: return ImageFont.truetype(path, size) except Exception: continue return ImageFont.load_default() def _box_area(bbox: list[float]) -> float: return max(0.0, bbox[2] - bbox[0]) * max(0.0, bbox[3] - bbox[1]) def _crop_with_padding( image: Image.Image, bbox: list[float], pad: int = PADDING_PX ) -> Image.Image: x1, y1, x2, y2 = bbox x1 = max(0, int(x1) - pad) y1 = max(0, int(y1) - pad) x2 = min(image.width, int(x2) + pad) y2 = min(image.height, int(y2) + pad) return image.crop((x1, y1, x2, y2)) def _make_grid(crops: list[Image.Image]) -> Image.Image: n = len(crops) cols = min(GRID_COLS, n) rows = math.ceil(n / cols) tile = GRID_SIDE_PX grid_w = cols * tile grid_h = rows * tile grid = Image.new("RGB", (grid_w, grid_h), (20, 16, 8)) draw = ImageDraw.Draw(grid) font = _font(28) for idx, c in enumerate(crops): col = idx % cols row = idx // cols # Letterbox-resize the crop into a tile c_resized = _fit_into_square(c, tile - 4) x = col * tile + (tile - c_resized.width) // 2 y = row * tile + (tile - c_resized.height) // 2 grid.paste(c_resized, (x, y)) # Draw the index number in the top-left corner of the tile label = str(idx + 1) tx, ty = col * tile + 6, row * tile + 4 tw = draw.textlength(label, font=font) draw.rectangle( [tx - 2, ty - 2, tx + tw + 6, ty + font.size + 4], fill=(244, 163, 0), ) draw.text((tx + 2, ty), label, fill=(20, 16, 8), font=font) return grid def _fit_into_square(img: Image.Image, side: int) -> Image.Image: img = img.convert("RGB") w, h = img.size ratio = side / max(w, h) return img.resize((max(1, int(w * ratio)), max(1, int(h * ratio)))) _GRID_PROMPT = ( "Look at this numbered grid of bee crops from a single hive frame. " "AT MOST ONE bee in the grid is the queen. The rest are workers." "\n\n" "How to identify the queen visually:" "\n 1. SIZE - she is the longest bee. Her body is noticeably longer " "than every worker bee in the grid." "\n 2. ABDOMEN - her abdomen is elongated and extends WELL PAST the " "wing tips. Workers' wings cover most of their abdomen." "\n 3. COLOR - her abdomen is often DARKER (deep amber, brown, or " "almost black) compared to the more uniformly striped workers. " "Sometimes she has a colored paint dot on her thorax." "\n 4. POSTURE - she often appears in the centre of a small cluster " "of attending workers." "\n\n" "Be conservative. Only pick a cell if that bee CLEARLY stands out " "from the others on size + abdomen length. If every bee looks " "roughly the same, there is no queen visible." "\n\n" "Respond with EXACTLY ONE of these formats and nothing else:" "\n - a single cell number, e.g. '3'" "\n - the single word 'none'" "\nNo explanation. No list of numbers. Just one number or 'none'." ) _SINGLE_NUM_RE = re.compile(r"^\s*(\d{1,2})\s*$") _FIRST_NUM_RE = re.compile(r"\b(\d{1,2})\b") def _parse_queen_indices(response: str) -> set[int]: """Parse the VLM's queen pick. ULTRA strict: only accept a clean one-token answer. Anything else -> no queen (better to miss than lie).""" if not response: return set() text = response.strip().lower() if not text or text.startswith("none") or "no queen" in text: return set() # ONLY accept a clean single-number response - nothing else. m = _SINGLE_NUM_RE.match(text) if m: return {int(m.group(1))} return set() def verify_queens( image: Image.Image, detections: list[dict], qwen_caller: Callable[[Image.Image, str], str] | None = None, ) -> tuple[list[dict], dict]: """ Verify which YOLO bee detection (if any) is the queen. Prefers the dedicated binary classifier (queen_clf). Falls back to a VLM-grid cascade only if the classifier weights aren't present. Returns (new_detections, info). """ candidates = [d for d in detections if d["class"] in ("bee", "queen")] if len(candidates) < 1: return detections, { "method": "skip", "n_candidates": 0, "queen_prob": 0.0, "raw_response": "", } # ---- Path A: binary classifier on every YOLO bee crop ---- if queen_clf.is_available(): crops = [_crop_with_padding(image, d["bbox"]) for d in candidates] probs = queen_clf.classify_crops(crops) # SIZE FILTER: real queens are almost always among the largest bees # in the frame. Compute the size-rank percentile of each detection, # then disqualify queen promotions for bees in the smallest 70%. areas = [_box_area(d["bbox"]) for d in candidates] sorted_areas = sorted(areas) size_threshold = sorted_areas[int(len(sorted_areas) * 0.70)] if sorted_areas else 0 size_eligible = {i for i, a in enumerate(areas) if a >= size_threshold} # Rank all candidates by queen probability, descending. ranked = sorted( range(len(candidates)), key=lambda i: probs[i]["queen_prob"], reverse=True, ) # Top queen pick must ALSO be size-eligible (top 30% by area). size_eligible_ranked = [i for i in ranked if i in size_eligible] top_idx = size_eligible_ranked[0] if size_eligible_ranked else None top_prob = probs[top_idx]["queen_prob"] if top_idx is not None else 0.0 # Promotion rule: ONLY the single highest-scoring, size-eligible crop # above QUEEN_PROB_THRESHOLD becomes "queen". Everything else stays a # plain "bee". No cyan candidate boxes - one clean green queen or none. promoted_queen_idx = top_idx if top_prob >= queen_clf.QUEEN_PROB_THRESHOLD else None new_detections = [] others = [d for d in detections if d not in candidates] for i, d in enumerate(candidates): new_d = dict(d) new_d["queen_prob"] = probs[i]["queen_prob"] new_d["class"] = "queen" if i == promoted_queen_idx else "bee" new_detections.append(new_d) new_detections.extend(others) top_probs = [probs[i]["queen_prob"] for i in ranked[:3]] return new_detections, { "method": "classifier", "n_candidates": len(candidates), "queen_prob": top_prob, "queen_found": promoted_queen_idx is not None, "top_3_probs": top_probs, "raw_response": "", } # ---- Path B: geometric outlier locator (no weights, no VLM) ---- # # This replaces the old VLM-grid pick, which was unreliable because it # asked the model to judge each crop in isolation. Here we keep every # bee's class as "bee" and instead TAG the single most queen-like bee # as a *candidate* to confirm by eye - judged relative to the other # bees on this same frame. If none stands out, nothing is tagged. new_detections = [dict(d) for d in detections] info, chosen = queen_locate.locate(new_detections) if chosen is not None: chosen["queen_candidate"] = True chosen["queen_standout"] = info["score"] return new_detections, { "method": "geometric", "n_candidates": info["n_pool"], "queen_candidate": info["candidate"], "standout": info["score"], "margin": info["margin"], "length_ratio": info["length_ratio"], "raw_response": "", }