Apiarist / cascade.py
Apiarist Dev
remove cyan queen candidate boxes - only confident green QUEEN box now
8ec418b
Raw
History Blame Contribute Delete
8.94 kB
"""
Queen verification cascade.
Two strategies, in priority order:
1. DEDICATED BINARY CLASSIFIER (preferred)
If a trained EfficientNet-B0 queen-vs-worker classifier is shipped
in weights/queen_classifier.pt, every YOLO bee detection is passed
through it. The crop with the highest queen probability (above a
threshold) is the queen. This is the right architecture: focused,
fast, trained specifically for this task.
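2. GEOMETRIC OUTLIER FALLBACK
If no classifier weights are available, fall back to queen_locate,
which tags the single most queen-like bee - judged relative to the
other bees on the same frame - as a candidate to confirm by eye.
No bee is relabelled as "queen" on this path. This replaces the older
approach of cropping candidates into a numbered grid and asking
Qwen-VL to pick the queen, which was less reliable.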
"""
from __future__ import annotations
import math
import re
from typing import Callable
from PIL import Image, ImageDraw, ImageFont
import queen_clf
import queen_locate
# Tunables for turning bee detections into padded crops and a numbered tile grid.
GRID_SIDE_PX = 240 # side length, in pixels, of each square tile in the composite grid
GRID_COLS = 5 # maximum number of tile columns in the grid
# NOTE(review): MAX_CANDIDATES is not referenced in the visible part of this
# file - confirm the cap is still enforced by whoever builds the grid.
MAX_CANDIDATES = 20 # never send more than this many crops to the VLM
PADDING_PX = 32 # pixels added on each side of a bbox before cropping (extra visual context)
def _font(size: int = 22):
    """Return a font for drawing labels, preferring a TrueType face.

    Each known font location is tried in order and the first one that
    ``ImageFont.truetype`` manages to load at ``size`` is returned. When
    none of them loads, Pillow's ``load_default()`` font is returned
    instead (it is requested without a size).
    """
    search_order = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "arial.ttf",
    )
    for font_path in search_order:
        try:
            loaded = ImageFont.truetype(font_path, size)
        except Exception:
            # Best-effort lookup: any failure just means "try the next path".
            continue
        return loaded
    return ImageFont.load_default()
def _box_area(bbox: list[float]) -> float:
    """Return the area of an ``[x1, y1, x2, y2]`` box.

    A side whose end coordinate is below its start coordinate counts as
    zero length, so inverted or degenerate boxes have area 0.
    """
    width = bbox[2] - bbox[0]
    height = bbox[3] - bbox[1]
    return max(0.0, width) * max(0.0, height)
def _crop_with_padding(
    image: Image.Image, bbox: list[float], pad: int = PADDING_PX
) -> Image.Image:
    """Crop ``bbox`` out of ``image`` after growing it by ``pad`` pixels per side.

    The box coordinates are truncated to ints, expanded by ``pad`` and then
    clamped to ``[0, image.width]`` / ``[0, image.height]`` before cropping.
    """
    raw_left, raw_top, raw_right, raw_bottom = bbox
    crop_box = (
        max(0, int(raw_left) - pad),
        max(0, int(raw_top) - pad),
        min(image.width, int(raw_right) + pad),
        min(image.height, int(raw_bottom) + pad),
    )
    return image.crop(crop_box)
def _make_grid(crops: list[Image.Image]) -> Image.Image:
    """Compose ``crops`` into one numbered grid image.

    Every crop is letterboxed into a ``GRID_SIDE_PX`` square tile. Tiles are
    laid out row by row with at most ``GRID_COLS`` columns, and each tile
    gets a 1-based index badge in its top-left corner.

    An empty ``crops`` list yields a single blank tile rather than raising
    ``ZeroDivisionError`` from the row computation.
    """
    tile = GRID_SIDE_PX
    background = (20, 16, 8)
    n = len(crops)
    if n == 0:
        # With no crops the column count would be 0 and n / cols would fail.
        return Image.new("RGB", (tile, tile), background)

    cols = min(GRID_COLS, n)
    rows = math.ceil(n / cols)
    grid = Image.new("RGB", (cols * tile, rows * tile), background)
    draw = ImageDraw.Draw(grid)

    requested_font_px = 28
    font = _font(requested_font_px)
    # The fallback font from ImageFont.load_default() may be a bitmap font
    # without a ``size`` attribute; use the requested size for the badge
    # height in that case instead of raising AttributeError.
    font_px = getattr(font, "size", requested_font_px)

    for idx, crop in enumerate(crops):
        row, col = divmod(idx, cols)
        origin_x, origin_y = col * tile, row * tile

        # Letterbox-resize the crop (4 px smaller than the tile) and centre it.
        fitted = _fit_into_square(crop, tile - 4)
        paste_x = origin_x + (tile - fitted.width) // 2
        paste_y = origin_y + (tile - fitted.height) // 2
        grid.paste(fitted, (paste_x, paste_y))

        # Draw the 1-based index on an amber badge in the tile's top-left corner.
        label = str(idx + 1)
        tx, ty = origin_x + 6, origin_y + 4
        tw = draw.textlength(label, font=font)
        draw.rectangle(
            [tx - 2, ty - 2, tx + tw + 6, ty + font_px + 4],
            fill=(244, 163, 0),
        )
        draw.text((tx + 2, ty), label, fill=background, font=font)
    return grid
def _fit_into_square(img: Image.Image, side: int) -> Image.Image:
    """Return an RGB copy of ``img`` scaled to fit a ``side`` x ``side`` square.

    The aspect ratio is kept: the longer edge is scaled to ``side`` and the
    shorter edge by the same factor, with each dimension at least 1 pixel.
    """
    rgb = img.convert("RGB")
    width, height = rgb.size
    scale = side / max(width, height)
    target_size = (
        max(1, int(width * scale)),
        max(1, int(height * scale)),
    )
    return rgb.resize(target_size)
# Instruction text meant to accompany the numbered crop grid. It describes
# how to recognise a queen and asks for exactly one cell number or the word
# 'none' - the answer shapes that _parse_queen_indices is written to handle.
# NOTE(review): nothing in the visible part of this file sends this prompt;
# confirm whether it is still used elsewhere.
_GRID_PROMPT = (
"Look at this numbered grid of bee crops from a single hive frame. "
"AT MOST ONE bee in the grid is the queen. The rest are workers."
"\n\n"
"How to identify the queen visually:"
"\n 1. SIZE - she is the longest bee. Her body is noticeably longer "
"than every worker bee in the grid."
"\n 2. ABDOMEN - her abdomen is elongated and extends WELL PAST the "
"wing tips. Workers' wings cover most of their abdomen."
"\n 3. COLOR - her abdomen is often DARKER (deep amber, brown, or "
"almost black) compared to the more uniformly striped workers. "
"Sometimes she has a colored paint dot on her thorax."
"\n 4. POSTURE - she often appears in the centre of a small cluster "
"of attending workers."
"\n\n"
"Be conservative. Only pick a cell if that bee CLEARLY stands out "
"from the others on size + abdomen length. If every bee looks "
"roughly the same, there is no queen visible."
"\n\n"
"Respond with EXACTLY ONE of these formats and nothing else:"
"\n - a single cell number, e.g. '3'"
"\n - the single word 'none'"
"\nNo explanation. No list of numbers. Just one number or 'none'."
)
# A response that is nothing but a one- or two-digit number (optional
# surrounding whitespace).
_SINGLE_NUM_RE = re.compile(r"^\s*(\d{1,2})\s*$")
# A standalone one- or two-digit number anywhere in a string. The strict
# parser below does not use it.
_FIRST_NUM_RE = re.compile(r"\b(\d{1,2})\b")


def _parse_queen_indices(response: str) -> set[int]:
    """Parse the VLM's queen pick. ULTRA strict: only accept a clean
    one-token answer. Anything else -> no queen (better to miss than lie).

    Returns a one-element set holding the 1-based cell number, or an empty
    set when the response is empty, starts with "none", contains
    "no queen", is not a bare one- or two-digit number, or names cell 0
    (grid cells are numbered from 1, so 0 cannot refer to any tile).
    """
    if not response:
        return set()
    text = response.strip().lower()
    if not text or text.startswith("none") or "no queen" in text:
        return set()
    # ONLY accept a clean single-number response - nothing else.
    m = _SINGLE_NUM_RE.match(text)
    if m is None:
        return set()
    cell = int(m.group(1))
    if cell < 1:
        # "0" / "00" match the pattern but are not valid 1-based cell labels.
        return set()
    return {cell}
def verify_queens(
    image: Image.Image,
    detections: list[dict],
    qwen_caller: Callable[[Image.Image, str], str] | None = None,
) -> tuple[list[dict], dict]:
    """
    Verify which YOLO bee detection (if any) is the queen.

    Prefers the dedicated binary classifier (queen_clf). When the classifier
    is not available, falls back to the geometric outlier locator
    (queen_locate), which only tags a candidate and does not change any
    detection's class.

    Args:
        image: The frame the detections refer to; cropped per bbox for the
            classifier path.
        detections: Detection dicts. Each must have a "class" key; those
            with class "bee" or "queen" are the queen candidates and must
            have a "bbox" of [x1, y1, x2, y2] on the classifier path.
        qwen_caller: Not used by this function; kept in the signature so
            existing callers keep working.

    Returns:
        (new_detections, info).

        * No candidates: the input list is returned unchanged and
          info["method"] is "skip".
        * Classifier path: candidates come first (as copies carrying
          "queen_prob", with "class" set to "queen" for at most one of them
          and "bee" for the rest), followed by the non-candidate detections;
          info["method"] is "classifier".
        * Geometric path: shallow copies of all detections in their original
          order, with at most one tagged "queen_candidate";
          info["method"] is "geometric".
    """
    # Split detections into queen candidates and everything else in one pass.
    candidates: list[dict] = []
    others: list[dict] = []
    for det in detections:
        if det["class"] in ("bee", "queen"):
            candidates.append(det)
        else:
            others.append(det)

    if not candidates:
        return detections, {
            "method": "skip", "n_candidates": 0,
            "queen_prob": 0.0, "raw_response": "",
        }

    # ---- Path A: binary classifier on every YOLO bee crop ----
    if queen_clf.is_available():
        crops = [_crop_with_padding(image, d["bbox"]) for d in candidates]
        probs = queen_clf.classify_crops(crops)
        queen_probs = [p["queen_prob"] for p in probs]

        # SIZE FILTER: real queens are almost always among the largest bees
        # in the frame. Take the area at the 70th-percentile position and
        # disqualify queen promotion for every bee smaller than that.
        # `candidates` is non-empty here, so the index is always in range.
        areas = [_box_area(d["bbox"]) for d in candidates]
        size_threshold = sorted(areas)[int(len(areas) * 0.70)]
        size_eligible = {i for i, a in enumerate(areas) if a >= size_threshold}

        # Rank all candidates by queen probability, descending.
        ranked = sorted(
            range(len(candidates)),
            key=lambda i: queen_probs[i],
            reverse=True,
        )

        # Top queen pick must ALSO be size-eligible (top ~30% by area).
        top_idx = next((i for i in ranked if i in size_eligible), None)
        top_prob = queen_probs[top_idx] if top_idx is not None else 0.0

        # Promotion rule: ONLY the single highest-scoring, size-eligible crop
        # above QUEEN_PROB_THRESHOLD becomes "queen". Everything else stays a
        # plain "bee". No cyan candidate boxes - one clean green queen or none.
        promoted_queen_idx = (
            top_idx if top_prob >= queen_clf.QUEEN_PROB_THRESHOLD else None
        )

        new_detections = []
        for i, d in enumerate(candidates):
            new_d = dict(d)
            new_d["queen_prob"] = queen_probs[i]
            new_d["class"] = "queen" if i == promoted_queen_idx else "bee"
            new_detections.append(new_d)
        new_detections.extend(others)

        return new_detections, {
            "method": "classifier",
            "n_candidates": len(candidates),
            "queen_prob": top_prob,
            "queen_found": promoted_queen_idx is not None,
            # Top three probabilities regardless of size eligibility.
            "top_3_probs": [queen_probs[i] for i in ranked[:3]],
            "raw_response": "",
        }

    # ---- Path B: geometric outlier locator (no weights, no VLM) ----
    #
    # This replaces the old VLM-grid pick, which was unreliable because it
    # asked the model to judge each crop in isolation. Here no detection's
    # class is changed; instead the single most queen-like bee is TAGGED
    # as a *candidate* to confirm by eye - judged relative to the other
    # bees on this same frame. If none stands out, nothing is tagged.
    new_detections = [dict(d) for d in detections]
    # Presumably `chosen` is one of the dicts in new_detections, so tagging it
    # shows up in the returned list - confirm against queen_locate.locate.
    info, chosen = queen_locate.locate(new_detections)
    if chosen is not None:
        chosen["queen_candidate"] = True
        chosen["queen_standout"] = info["score"]
    return new_detections, {
        "method": "geometric",
        "n_candidates": info["n_pool"],
        "queen_candidate": info["candidate"],
        "standout": info["score"],
        "margin": info["margin"],
        "length_ratio": info["length_ratio"],
        "raw_response": "",
    }