Spaces:
Running on Zero
Running on Zero
| """ | |
| Queen verification cascade. | |
| Two strategies, in priority order: | |
| 1. DEDICATED BINARY CLASSIFIER (preferred) | |
| If a trained EfficientNet-B0 queen-vs-worker classifier is shipped | |
| in weights/queen_classifier.pt, every YOLO bee detection is passed | |
| through it. The crop with the highest queen probability (above a | |
| threshold) is the queen. This is the right architecture: focused, | |
| fast, trained specifically for this task. | |
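| 2. GEOMETRIC OUTLIER FALLBACK | |
| If no classifier weights are available, verify_queens falls back to | |
| queen_locate.locate(), which tags the single most queen-like bee (judged | |
| relative to the other bees on the same frame) as a candidate to confirm | |
| by eye. This replaces the older approach of cropping candidates into a | |
| numbered grid and asking Qwen-VL to pick the queen, which was less | |
| reliable; the grid helpers for that approach are still defined below. | |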
| """ | |
| from __future__ import annotations | |
| import math | |
| import re | |
| from typing import Callable | |
| from PIL import Image, ImageDraw, ImageFont | |
| import queen_clf | |
| import queen_locate | |
# Side length, in pixels, of each square tile in the composite crop grid.
GRID_SIDE_PX: int = 240
# Upper bound on the number of tile columns in the grid.
GRID_COLS: int = 5
# Hard cap on how many crops are ever sent to the VLM.
MAX_CANDIDATES: int = 20
# Pixels of padding added around a bbox before cropping (gives the VLM more context).
PADDING_PX: int = 32
def _font(size: int = 22):
    """Return a font for drawing tile labels.

    Tries a short list of TrueType font files in order and returns the first
    one that loads at ``size``. If none of them can be loaded, returns
    Pillow's built-in default font (requested without a size).
    """
    candidate_paths = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "arial.ttf",
    )
    for font_path in candidate_paths:
        try:
            loaded = ImageFont.truetype(font_path, size)
        except Exception:
            # Best-effort probe: any failure just means "try the next path".
            continue
        return loaded
    return ImageFont.load_default()
def _box_area(bbox: list[float]) -> float:
    """Return the area of ``bbox``, read as ``[x1, y1, x2, y2]``.

    A negative width or height (inverted box) is clamped to zero, so the
    result is never negative.
    """
    width = bbox[2] - bbox[0]
    height = bbox[3] - bbox[1]
    return max(0.0, width) * max(0.0, height)
def _crop_with_padding(
    image: Image.Image, bbox: list[float], pad: int = PADDING_PX
) -> Image.Image:
    """Crop ``bbox`` out of ``image`` with ``pad`` extra pixels on every side.

    The bbox is read as ``(x1, y1, x2, y2)``; each coordinate is truncated
    with ``int()`` before padding, and the padded region is clamped to the
    image bounds.
    """
    left, top, right, bottom = bbox
    region = (
        max(0, int(left) - pad),
        max(0, int(top) - pad),
        min(image.width, int(right) + pad),
        min(image.height, int(bottom) + pad),
    )
    return image.crop(region)
def _make_grid(crops: list[Image.Image]) -> Image.Image:
    """Compose ``crops`` into a single numbered grid image.

    Each crop is letterboxed into a ``GRID_SIDE_PX`` square tile, tiles are
    laid out row by row with at most ``GRID_COLS`` columns, and every tile
    gets its 1-based index drawn on an orange badge in its top-left corner.

    Args:
        crops: Images to place in the grid, in label order.

    Returns:
        A new RGB image holding the grid. An empty ``crops`` list yields a
        single blank tile instead of raising ZeroDivisionError.
    """
    tile = GRID_SIDE_PX
    n = len(crops)
    # max(1, ...) keeps an empty crop list from producing cols == 0 (division
    # by zero below) or a zero-sized canvas.
    cols = max(1, min(GRID_COLS, n))
    rows = max(1, math.ceil(n / cols))
    grid = Image.new("RGB", (cols * tile, rows * tile), (20, 16, 8))
    draw = ImageDraw.Draw(grid)

    font_px = 28
    font = _font(font_px)
    # _font() can fall back to ImageFont.load_default(), which may return a
    # bitmap font object without a `.size` attribute; use the requested
    # pixel size for the badge height in that case.
    label_h = getattr(font, "size", font_px)

    for idx, crop in enumerate(crops):
        row, col = divmod(idx, cols)
        origin_x, origin_y = col * tile, row * tile

        # Letterbox-resize the crop and centre it inside its tile.
        fitted = _fit_into_square(crop, tile - 4)
        paste_x = origin_x + (tile - fitted.width) // 2
        paste_y = origin_y + (tile - fitted.height) // 2
        grid.paste(fitted, (paste_x, paste_y))

        # Draw the 1-based index badge in the top-left corner of the tile.
        label = str(idx + 1)
        tx, ty = origin_x + 6, origin_y + 4
        tw = draw.textlength(label, font=font)
        draw.rectangle(
            [tx - 2, ty - 2, tx + tw + 6, ty + label_h + 4],
            fill=(244, 163, 0),
        )
        draw.text((tx + 2, ty), label, fill=(20, 16, 8), font=font)
    return grid
def _fit_into_square(img: Image.Image, side: int) -> Image.Image:
    """Return an RGB copy of ``img`` scaled so its longer edge is ``side`` px.

    Aspect ratio is preserved (up to ``int()`` truncation) and each output
    dimension is at least 1 pixel. ``resize`` is called with its default
    resampling filter.
    """
    rgb = img.convert("RGB")
    width, height = rgb.size
    scale = side / max(width, height)
    new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
    return rgb.resize(new_size)
# Instruction text for the VLM grid strategy. It tells the model that at most
# one tile in the numbered grid shows the queen, lists the visual cues to use
# (size, abdomen, colour, posture), and demands a one-token reply: either a
# single cell number or the word 'none'. The adjacent string literals below
# are concatenated by Python into one prompt string.
# NOTE(review): no function visible in this file references _GRID_PROMPT;
# confirm whether another module still uses it.
_GRID_PROMPT = (
    "Look at this numbered grid of bee crops from a single hive frame. "
    "AT MOST ONE bee in the grid is the queen. The rest are workers."
    "\n\n"
    "How to identify the queen visually:"
    "\n 1. SIZE - she is the longest bee. Her body is noticeably longer "
    "than every worker bee in the grid."
    "\n 2. ABDOMEN - her abdomen is elongated and extends WELL PAST the "
    "wing tips. Workers' wings cover most of their abdomen."
    "\n 3. COLOR - her abdomen is often DARKER (deep amber, brown, or "
    "almost black) compared to the more uniformly striped workers. "
    "Sometimes she has a colored paint dot on her thorax."
    "\n 4. POSTURE - she often appears in the centre of a small cluster "
    "of attending workers."
    "\n\n"
    "Be conservative. Only pick a cell if that bee CLEARLY stands out "
    "from the others on size + abdomen length. If every bee looks "
    "roughly the same, there is no queen visible."
    "\n\n"
    "Respond with EXACTLY ONE of these formats and nothing else:"
    "\n - a single cell number, e.g. '3'"
    "\n - the single word 'none'"
    "\nNo explanation. No list of numbers. Just one number or 'none'."
)
# Matches a reply that consists solely of a one- or two-digit number
# (surrounding whitespace allowed).
_SINGLE_NUM_RE = re.compile(r"^\s*(\d{1,2})\s*$")
# Matches the first standalone one- or two-digit number anywhere in a string.
# Not used by _parse_queen_indices, which deliberately accepts only a clean
# single-number reply.
_FIRST_NUM_RE = re.compile(r"\b(\d{1,2})\b")


def _parse_queen_indices(response: str) -> set[int]:
    """Parse the VLM's queen pick into a set of 1-based grid cell numbers.

    ULTRA strict: only a clean one-token numeric answer is accepted.
    Anything else -> no queen (better to miss than lie).

    Args:
        response: Raw text returned by the VLM; may be empty or None.

    Returns:
        ``{n}`` when the whole reply is a single one- or two-digit number
        ``n >= 1``; otherwise an empty set. Replies that start with "none"
        or contain "no queen" also give an empty set.
    """
    if not response:
        return set()
    text = response.strip().lower()
    if not text or text.startswith("none") or "no queen" in text:
        return set()
    # ONLY accept a clean single-number response - nothing else.
    match = _SINGLE_NUM_RE.match(text)
    if match is None:
        return set()
    cell = int(match.group(1))
    # Grid tiles are labelled starting at 1, so "0" / "00" can never name a
    # real cell; treat it as "no queen" rather than returning a bogus index.
    return {cell} if cell >= 1 else set()
def verify_queens(
    image: Image.Image,
    detections: list[dict],
    qwen_caller: Callable[[Image.Image, str], str] | None = None,
) -> tuple[list[dict], dict]:
    """
    Verify which YOLO bee detection (if any) is the queen.

    Two strategies, in priority order:

    A. Binary classifier. If ``queen_clf.is_available()``, every detection
       whose class is "bee" or "queen" is cropped (with padding) and scored
       by ``queen_clf.classify_crops``. The single highest-scoring crop that
       is also among the largest boxes (area at or above the 70th-percentile
       rank) becomes "queen" if its probability reaches
       ``queen_clf.QUEEN_PROB_THRESHOLD``; every other candidate is set to
       "bee". Each candidate copy gains a "queen_prob" key.
    B. Geometric outlier locator. Otherwise ``queen_locate.locate`` is run on
       copies of all detections; classes are left untouched and the chosen
       detection (if any) is tagged with "queen_candidate" and
       "queen_standout".

    Args:
        image: Frame the detections refer to; only used for cropping in
            path A.
        detections: Detection dicts. Each must have a "class" key;
            candidates also need a "bbox" of ``[x1, y1, x2, y2]``.
        qwen_caller: Accepted but not used by this function.

    Returns:
        ``(new_detections, info)``.

        * No "bee"/"queen" detections: the input list itself and an info
          dict with ``method="skip"``.
        * Path A: candidate copies first (in input order), followed by the
          remaining detections (same objects, not copied); info has
          ``method="classifier"``, "n_candidates", "queen_prob" (best
          size-eligible probability), "queen_found", "top_3_probs" (best
          three over all candidates) and "raw_response".
        * Path B: shallow copies of all detections in input order; info has
          ``method="geometric"`` plus the values reported by
          ``queen_locate.locate``.
    """
    # Partition once by class. This is equivalent to the former
    # `d not in candidates` filter (a non-candidate can never compare equal
    # to a candidate because their "class" values differ), but avoids
    # O(n*m) deep dict comparisons.
    candidate_classes = ("bee", "queen")
    candidates: list[dict] = []
    others: list[dict] = []
    for det in detections:
        (candidates if det["class"] in candidate_classes else others).append(det)

    if not candidates:
        return detections, {
            "method": "skip", "n_candidates": 0,
            "queen_prob": 0.0, "raw_response": "",
        }

    # ---- Path A: binary classifier on every YOLO bee crop ----
    if queen_clf.is_available():
        crops = [_crop_with_padding(image, d["bbox"]) for d in candidates]
        probs = queen_clf.classify_crops(crops)
        n_cand = len(candidates)
        queen_probs = [probs[i]["queen_prob"] for i in range(n_cand)]

        # SIZE FILTER: real queens are almost always among the largest bees
        # in the frame. Take the box area at the 70% rank as the cut-off and
        # only allow boxes at or above it to be promoted. `candidates` is
        # non-empty here, so the index is always valid.
        areas = [_box_area(d["bbox"]) for d in candidates]
        sorted_areas = sorted(areas)
        size_threshold = sorted_areas[int(len(sorted_areas) * 0.70)]

        # Rank all candidates by queen probability, descending (stable, so
        # ties keep input order).
        ranked = sorted(range(n_cand), key=queen_probs.__getitem__, reverse=True)

        # Top queen pick must ALSO be size-eligible (top ~30% by area).
        top_idx = next((i for i in ranked if areas[i] >= size_threshold), None)
        top_prob = queen_probs[top_idx] if top_idx is not None else 0.0

        # Promotion rule: ONLY the single highest-scoring, size-eligible crop
        # above QUEEN_PROB_THRESHOLD becomes "queen". Everything else stays a
        # plain "bee" - one clean queen or none.
        promoted_queen_idx = (
            top_idx if top_prob >= queen_clf.QUEEN_PROB_THRESHOLD else None
        )

        new_detections = []
        for i, d in enumerate(candidates):
            new_d = dict(d)
            new_d["queen_prob"] = queen_probs[i]
            new_d["class"] = "queen" if i == promoted_queen_idx else "bee"
            new_detections.append(new_d)
        new_detections.extend(others)

        return new_detections, {
            "method": "classifier",
            "n_candidates": n_cand,
            "queen_prob": top_prob,
            "queen_found": promoted_queen_idx is not None,
            "top_3_probs": [queen_probs[i] for i in ranked[:3]],
            "raw_response": "",
        }

    # ---- Path B: geometric outlier locator (no weights, no VLM) ----
    #
    # This replaces the old VLM-grid pick, which was unreliable because it
    # asked the model to judge each crop in isolation. Here we keep every
    # bee's class unchanged and instead TAG the single most queen-like bee
    # as a *candidate* to confirm by eye - judged relative to the other
    # bees on this same frame. If none stands out, nothing is tagged.
    new_detections = [dict(d) for d in detections]
    # NOTE(review): `chosen` is presumably one of the dicts in
    # new_detections, so tagging it shows up in the returned list - confirm
    # against queen_locate.locate.
    info, chosen = queen_locate.locate(new_detections)
    if chosen is not None:
        chosen["queen_candidate"] = True
        chosen["queen_standout"] = info["score"]
    return new_detections, {
        "method": "geometric",
        "n_candidates": info["n_pool"],
        "queen_candidate": info["candidate"],
        "standout": info["score"],
        "margin": info["margin"],
        "length_ratio": info["length_ratio"],
        "raw_response": "",
    }