davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
10 kB
"""
Signal-based triage scoring for the annotation queue.
Captures a per-item quality signal — an agent error, a production thumbs-down, a
low automated score, or any custom field — and turns it into a numeric
*triage priority* so the assignment queue can surface the worst / most-suspect
traces first instead of annotating in arrival (FIFO) order.
The same scorer runs over statically loaded data and over traces ingested at
runtime (webhook / Langfuse), because both funnel through
``ItemStateManager.add_item``. The priority is stored on the item's metadata
(``triage_priority`` / ``triage_reason`` / ``triage_rule``); the ``priority``
assignment strategy reads it; the inline badge and the admin queue page surface it.
Config (all optional):
triage:
enabled: true
order: desc # high priority first (default); 'asc' = low first
default_priority: 0 # items matching no rule
show_badge: true # show a "why prioritized" banner during annotation
signal_field: null # read a numeric priority straight from this field
invert_signal: false # if true, a LOWER field value => HIGHER priority
rules: # evaluated in order; highest matching priority wins
- name: "Agent errored"
when: {field: "status", equals: "error"}
priority: 100
badge: "Agent errored"
- name: "Negative feedback"
when: {field: "feedback", in: ["thumbs_down", "negative"]}
priority: 80
- name: "Low score"
when: {field: "score", lt: 0.5}
priority: 60
When ``enabled`` with no ``rules`` and no ``signal_field``, a turnkey set of
built-in defaults is used (error status, negative feedback, low score) so
ingested traces are triaged out of the box.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Turnkey defaults applied when triage is enabled but no rules/signal_field are
# configured. These match the signals ingested traces most commonly carry.
DEFAULT_RULES = [
{"name": "Agent errored", "badge": "Agent errored", "priority": 100,
"when": {"field": "status", "in": ["error", "failed", "failure"]}},
{"name": "Negative feedback", "badge": "Negative feedback", "priority": 80,
"when": {"field": "feedback", "in": ["thumbs_down", "negative", "down", "👎"]}},
{"name": "Low score", "badge": "Low score", "priority": 60,
"when": {"field": "score", "lt": 0.5}},
]
@dataclass
class TriageScore:
"""The triage outcome for one item."""
priority: float
reason: str | None = None # human-readable badge text (None when unflagged)
rule: str | None = None # the rule name that matched (None for default/field)
def to_metadata(self) -> dict:
return {
"triage_priority": self.priority,
"triage_reason": self.reason,
"triage_rule": self.rule,
}
def _lookup(data: dict, field: str):
"""Resolve a possibly dotted field path against an item dict.
Supports nested dicts (``metadata.score``). Returns None if any segment is
missing or a non-dict is traversed.
"""
cur = data
for part in str(field).split("."):
if not isinstance(cur, dict) or part not in cur:
return None
cur = cur[part]
return cur
def _as_number(value):
"""Coerce a value to float, or None if it isn't numeric."""
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value.strip())
except (ValueError, AttributeError):
return None
return None
def _matches(condition: dict, data: dict) -> bool:
"""Evaluate a single rule's ``when`` condition against item data.
Supported operators: equals, in, lt, lte, gt, gte, exists, contains.
String comparisons for equals/in are case-insensitive. Numeric comparisons
coerce both sides. ``contains`` tests membership in a list/string field.
"""
field = condition.get("field")
if field is None:
return False
value = _lookup(data, field)
if "exists" in condition:
present = value is not None
return present == bool(condition["exists"])
# Absent fields never match value-based operators.
if value is None:
return False
if "equals" in condition:
target = condition["equals"]
if isinstance(value, str) and isinstance(target, str):
return value.strip().lower() == target.strip().lower()
return value == target
if "in" in condition:
options = condition["in"] or []
norm = [o.lower() if isinstance(o, str) else o for o in options]
v = value.lower() if isinstance(value, str) else value
return v in norm
if "contains" in condition:
target = condition["contains"]
if isinstance(value, (list, tuple, set)):
tnorm = target.lower() if isinstance(target, str) else target
return any(
(item.lower() if isinstance(item, str) else item) == tnorm
for item in value
)
if isinstance(value, str) and isinstance(target, str):
return target.lower() in value.lower()
return False
for op, py in (("lt", "<"), ("lte", "<="), ("gt", ">"), ("gte", ">=")):
if op in condition:
lhs, rhs = _as_number(value), _as_number(condition[op])
if lhs is None or rhs is None:
return False
if op == "lt":
return lhs < rhs
if op == "lte":
return lhs <= rhs
if op == "gt":
return lhs > rhs
return lhs >= rhs
return False
class TriageScorer:
"""Scores items into a triage priority from the ``triage`` config block."""
def __init__(self, triage_config: dict):
cfg = triage_config or {}
self.enabled = bool(cfg.get("enabled", False))
self.order = str(cfg.get("order", "desc")).lower()
self.default_priority = float(cfg.get("default_priority", 0) or 0)
self.show_badge = bool(cfg.get("show_badge", True))
self.signal_field = cfg.get("signal_field")
self.invert_signal = bool(cfg.get("invert_signal", False))
rules = cfg.get("rules")
if not rules and not self.signal_field:
rules = DEFAULT_RULES
self.rules = rules or []
def score(self, item_data: dict) -> TriageScore:
"""Return the TriageScore for one item (highest matching rule wins)."""
if not self.enabled:
return TriageScore(priority=self.default_priority)
best: TriageScore | None = None
for rule in self.rules:
cond = rule.get("when") or {}
try:
if _matches(cond, item_data):
pr = float(rule.get("priority", 0) or 0)
if best is None or pr > best.priority:
badge = rule.get("badge") or rule.get("name")
best = TriageScore(priority=pr, reason=badge, rule=rule.get("name"))
except Exception as e: # a malformed rule must never break loading
logger.warning(f"Triage rule {rule.get('name')!r} failed: {e}")
if best is not None:
return best
# No rule matched: optionally read a direct numeric signal.
if self.signal_field is not None:
raw = _as_number(_lookup(item_data, self.signal_field))
if raw is not None:
pr = -raw if self.invert_signal else raw
return TriageScore(priority=pr, reason=None, rule=None)
return TriageScore(priority=self.default_priority)
def build_scorer(config: dict) -> TriageScorer | None:
"""Build a TriageScorer from a server config, or None when triage is off."""
triage_cfg = (config or {}).get("triage") or {}
if not triage_cfg.get("enabled"):
return None
return TriageScorer(triage_cfg)
def compute_triage_queue(config: dict) -> dict:
"""Build the admin triage-queue report from the live ItemStateManager.
Returns the remaining (incomplete) items ranked by triage priority, with the
reason/rule that flagged them, current annotation count, and whether they are
already assigned. Used by the ``/admin/triage-queue`` page.
"""
from potato.item_state_management import get_item_state_manager
scorer = build_scorer(config)
order = (scorer.order if scorer else "desc")
reverse = order != "asc"
ism = get_item_state_manager()
rows = []
# Preserve the configured/global ordering as the deterministic tie-break.
ordering = {iid: i for i, iid in enumerate(ism.instance_id_ordering)}
for iid in ism.instance_id_ordering:
item = ism.get_item(iid)
if item is None:
continue
# Skip items that have reached their annotation cap.
try:
if ism._item_is_saturated(iid):
continue
except Exception:
pass
priority = item.get_metadata("triage_priority")
if priority is None:
priority = scorer.default_priority if scorer else 0
n_ann = len(ism.instance_annotators.get(iid, set()))
rows.append({
"id": iid,
"priority": priority,
"reason": item.get_metadata("triage_reason"),
"rule": item.get_metadata("triage_rule"),
"annotations": n_ann,
"assigned": n_ann > 0,
"_order": ordering.get(iid, 0),
})
rows.sort(key=lambda r: (r["priority"], -r["_order"]), reverse=reverse)
for r in rows:
r.pop("_order", None)
return {
"enabled": bool(scorer),
"order": order,
"n_items": len(rows),
"n_flagged": sum(1 for r in rows if r["reason"]),
"items": rows,
}